This commit is contained in:
William Mantly 2020-08-24 12:04:54 -04:00
parent 0ba7f86fc1
commit 7bd9fa8308
Signed by: wmantly
GPG Key ID: 186A8370EFF937CA
5 changed files with 203 additions and 53 deletions

View File

@ -1,2 +1,101 @@
# node-p2p-simple
Peer to peer JSON Pub/Sub with no extremal dependencies. Each peer can act as
server and client, just a client or a simple relay peer. Topics can be
subscribed to using a simple string or regex pattern.
## Install
`npm install <Name Soon!> --save`
## Usage
Instantiate a `P2PSub` instance:
```
const {P2PSub} = require('<Name Soon!>');
const p2p = new P2PSub({
listenPort: 7575,
peers:[
'10.10.10.11:7575',
'10.10.10.11:7575',
'172.16.24.2:8637'
]
});
```
Now we can listen for and publish topics across the whole network:
```
// Local peer
p2p.publish('announcement', {message: 'p2p pubsub is awesome!'});
```
```
// Remote peer
p2p.subscribe('announcement', (data)=> console.log(data.message));
# p2p pubsub is awesome!
```
We can also use regex patterns in our subscribes to catch more, or all topics:
```
// Local peer
p2p.publish('announcement', {message: 'p2p pubsub is awesome!'});
p2p.publish('announcement-group1', {message: 'Mesh is the future!'});
p2p.publish('resource-block-added', {id: '123', name:'block0'});
```
```
// Remote peer
p2p.subscribe(\^announcement\, (data)=> console.log(data.message));
# p2p pubsub is awesome!
# Mesh is the future!
```
```
// Another remote peer
p2p.subscribe(\.\, (data)=> console.log(data));
# {message: 'p2p pubsub is awesome!'}
# {message: 'Mesh is the future!'}
# {id: '123', name:'block0'}
```
### P2PSub instance options
`listenPort` Optional, type Number or String. Sets the incoming TCP port for the
local peer to listen on. If this is left blank, the local peer will not accept
incoming peers. Default is `undefined`
`peers`: Optional, type Array of Strings. The list of peers this peer will try
to connect with. Peers should be specified as `{host}:{port{}`. The host can be
an IP or a hostname the system can resolve. Default is `[]`
`logLevel`: Optional, type Array of Strings or `false`. Sets the how the
instance log to STDIN out. Options are `info`, `warn` and `error` if one or more
is passed in the `logLevel` Array, messages ill be printed to STDIN/STDERR.
Passing `false` or leaving it blank will suppress all message except if the
listening port is in use.
### CLI usage
A simple relay peer can be set up using just the CLI, no code required. This
peer will only relay messages to all its connected peers. The logging level is
set to `info`.
```./app.js 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575 ...
```
The first argument is the listen port, optionally followed by space separated
list of peers to connect with.

88
app.js
View File

@ -1,47 +1,53 @@
#!/usr/bin/env nodejs
const app = {};
const pubsub = new (require('./pubsub')).PubSub();
module.exports = app;
const {P2P} = require('./p2p')
const {PubSub} = require('./pubsub');
const {P2P} = require('./p2p');
const args = process.argv.slice(1);
class P2PSub{
constructor(...args){
this.p2p = new P2P(...args);
const exec_name = args[0].split('/').pop();
const server_port = args[1];
const clients_list = args.slice(2);
this.pubsub = new PubSub();
if(server_port === "help"){
console.error('Please supply the server port and list of clients to connect too;')
console.error(`${exec_name} <server port> <client 1> <client 2> <client 3> ...` )
console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` )
process.exit(0)
this.pubsub.subscribe(/.*/gi, function(data, topic){
if(data.__local) return false;
this.p2p.broadcast({
type:'topic',
body:{
topic: topic,
data: data
}
});
});
this.p2p.onData(function(data){
data.__local = true;
if(data.type === 'topic') this.pubsub.publish(data.body.topic, data.body.data, true);
});
}
subscribe(){
return this.subscribe.apply(this.pubsub, arguments);
}
publish(){
return this.publish.apply(this.pubsub, arguments);
}
broadcast(){
return this.broadcast.apply(this.p2p, arguments);
}
onData(){
return this.onData.apply(this.p2p, arguments);
}
addPeer(){
return this.addPeer.apply(this.p2p, arguments);
}
removePeer(){
return this.removePeer.apply(this.p2p, arguments);
}
}
console.log('port:', server_port, 'clients:', clients_list)
app.p2p = new P2P({
listenPort: server_port,
peers: clients_list
})
app.pub = pubsub.pub;
app.sub = pubsub.sub;
app.sub(/.*/gi, function(data, topic){
if(data.__local) return false;
data.__local = true;
app.p2p.broadcast({
type:'topic',
body:{
topic: topic,
data: data
}
});
});
app.p2p.onData(function(data){
if(data.type === 'topic') app.publish(data.body.topic, data.body.data, true);
});
module.exports = {P2PSub, P2P, PubSub};

25
cli.js Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env nodejs
const {P2PSub} = require('./app');
const args = process.argv.slice(1);
const exec_name = args[0].split('/').pop();
const listenPort = args[1];
const peers = args.slice(2);
if(P2PSub === "help"){
console.error('Please supply the server port and list of clients to connect too;')
console.error(`${exec_name} <server port> <client 1> <client 2> <client 3> ...` )
console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` )
process.exit(0)
}
// console.log('port:', server_port, 'clients:', clients_list)
let instance = new P2PSub({
listenPort,
peers,
logLevel: ['info']
});

40
p2p.js
View File

@ -3,6 +3,7 @@ const net = require('net');
class P2P {
constructor(args){
args = args || {};
// The list of clients this peer wants to connect to
this.wantedPeers = new Set();
@ -19,6 +20,9 @@ class P2P {
// Hold the data callbacks when a message is received
this.onDataCallbacks = [];
// Set the logging level
this.logLevel = args.logLevel || false;
// If a listen port was specified, have this peer listen for incoming
// connection
if(args.listenPort){
@ -29,6 +33,14 @@ class P2P {
if(args.peers){
this.addPeer(args.peers);
}
this.__log('info', 'Local peerID', this.peerID);
}
__log(type, ...message){
if(this.logLevel || this.logLevel === 'all' || this.logLevel.includes(type)){
console[type](...message);
}
}
// Take a peer as <host>:<port> and add it the `wantedPeers` list
@ -72,7 +84,7 @@ class P2P {
let p2p = this;
peer.on('connect', function(){
console.info(`Peer ${address} is now connected.`);
p2p.__log('info', `Peer ${address} is now connected.`);
peer.peerConnectAddress = `${address}:${port}`
// When a connection is started, send a message informing the remote
@ -81,7 +93,7 @@ class P2P {
});
peer.on('close', function(){
console.info(`Client Peer ${address}, ${peer.peerID} droped.`);
p2p.__log('info', `Client Peer ${address}, ${peer.peerID} droped.`);
delete p2p.connectedPeers[peer.peerID];
});
@ -91,9 +103,9 @@ class P2P {
peer.on('error', function(error){
if(error.syscall === 'connect' && error.code === 'ECONNREFUSED'){
console.info(`Peer ${error.address}:${error.port} connection refussed!`);
p2p.__log('info', `Peer ${error.address}:${error.port} connection refussed!`);
}else{
console.warn('client EVENT error:', arguments);
p2p.__log('warn', 'client EVENT error:', arguments);
}
});
}
@ -152,18 +164,22 @@ class P2P {
});
clientSocket.on('close', function(){
console.info(`server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`);
p2p.__log('info', `server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`);
delete p2p.connectedPeers[clientSocket.peerID];
});
});
serverSocket.on('listening', function(){
console.log('p2p server listening on', port,)
p2p.__log('info','p2p server listening on', port,)
});
serverSocket.on('error', function(){
console.log('server EVENT error:', arguments);
serverSocket.on('error', function(error){
if(error.syscall === 'listen' && error.code === 'EADDRINUSE'){
console.error('ERROR: Server listening port', port, 'address already in use.')
process.exit(2)
}
p2p.__log('error','server EVENT error:', arguments);
});
serverSocket.listen(Number(port));
@ -197,17 +213,17 @@ class P2P {
// Drop heart beats
if(message.type === "heartbeat"){
console.log('heartbeat from', from);
this.__log('info', 'heartbeat from', from);
return ;
}
// Register new clients with this peer, drop them if a connection
// already exists
if(message.type === "register"){
console.log('registering peer', message.id, socket.remoteAddress)
this.__log('info', 'registering peer', message.id, socket.remoteAddress)
if(Object.keys(this.connectedPeers).includes(message.id)){
console.log(`Dropping ${message.id}, already connected`)
this.__log('info', `Dropping ${message.id}, already connected`)
socket.end();
}else{
socket.peerID = message.id
@ -218,6 +234,8 @@ class P2P {
return ;
}
this.__log('info', 'message', message);
// forward the message to other peers
this.broadcast(message, message.sentTo);
// console.log('p2p read:', message)

View File

@ -9,6 +9,8 @@ class PubSub{
topic = "__REGEX__";
}
// console.log(this)
//
// create the topic if not yet created
if(!this.topics[topic]) this.topics[topic] = [];
@ -17,7 +19,7 @@ class PubSub{
}
matchTopics(topic){
let topics = [... this.topics[topic] ? this.topics[topic] : []]
let topics = [... this.topics[topic] ? this.topics[topic] : []];
// console.log(this.topics)
if(!this.topics['__REGEX__']) return topics;