diff --git a/README.md b/README.md index 296e699..6498c72 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,101 @@ # node-p2p-simple +Peer to peer JSON Pub/Sub with no extremal dependencies. Each peer can act as +server and client, just a client or a simple relay peer. Topics can be +subscribed to using a simple string or regex pattern. + +## Install + +`npm install --save` + + +## Usage + +Instantiate a `P2PSub` instance: + +``` +const {P2PSub} = require(''); + +const p2p = new P2PSub({ + listenPort: 7575, + peers:[ + '10.10.10.11:7575', + '10.10.10.11:7575', + '172.16.24.2:8637' + ] +}); +``` + +Now we can listen for and publish topics across the whole network: + +``` +// Local peer + +p2p.publish('announcement', {message: 'p2p pubsub is awesome!'}); +``` + +``` +// Remote peer + +p2p.subscribe('announcement', (data)=> console.log(data.message)); + +# p2p pubsub is awesome! +``` + +We can also use regex patterns in our subscribes to catch more, or all topics: + +``` +// Local peer + +p2p.publish('announcement', {message: 'p2p pubsub is awesome!'}); +p2p.publish('announcement-group1', {message: 'Mesh is the future!'}); +p2p.publish('resource-block-added', {id: '123', name:'block0'}); +``` + +``` +// Remote peer + +p2p.subscribe(\^announcement\, (data)=> console.log(data.message)); + +# p2p pubsub is awesome! +# Mesh is the future! +``` + +``` +// Another remote peer + +p2p.subscribe(\.\, (data)=> console.log(data)); + +# {message: 'p2p pubsub is awesome!'} +# {message: 'Mesh is the future!'} +# {id: '123', name:'block0'} +``` + +### P2PSub instance options + +`listenPort` Optional, type Number or String. Sets the incoming TCP port for the +local peer to listen on. If this is left blank, the local peer will not accept +incoming peers. Default is `undefined` + +`peers`: Optional, type Array of Strings. The list of peers this peer will try +to connect with. Peers should be specified as `{host}:{port{}`. The host can be +an IP or a hostname the system can resolve. Default is `[]` + +`logLevel`: Optional, type Array of Strings or `false`. Sets the how the +instance log to STDIN out. Options are `info`, `warn` and `error` if one or more +is passed in the `logLevel` Array, messages ill be printed to STDIN/STDERR. +Passing `false` or leaving it blank will suppress all message except if the +listening port is in use. + +### CLI usage + +A simple relay peer can be set up using just the CLI, no code required. This +peer will only relay messages to all its connected peers. The logging level is +set to `info`. + +```./app.js 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575 ... + +``` + +The first argument is the listen port, optionally followed by space separated +list of peers to connect with. diff --git a/app.js b/app.js index c1d66f6..1d7b082 100755 --- a/app.js +++ b/app.js @@ -1,47 +1,53 @@ -#!/usr/bin/env nodejs -const app = {}; -const pubsub = new (require('./pubsub')).PubSub(); -module.exports = app; - -const {P2P} = require('./p2p') +const {PubSub} = require('./pubsub'); +const {P2P} = require('./p2p'); -const args = process.argv.slice(1); +class P2PSub{ + constructor(...args){ + this.p2p = new P2P(...args); -const exec_name = args[0].split('/').pop(); -const server_port = args[1]; -const clients_list = args.slice(2); + this.pubsub = new PubSub(); -if(server_port === "help"){ - console.error('Please supply the server port and list of clients to connect too;') - console.error(`${exec_name} ...` ) - console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` ) - process.exit(0) + this.pubsub.subscribe(/.*/gi, function(data, topic){ + if(data.__local) return false; + this.p2p.broadcast({ + type:'topic', + body:{ + topic: topic, + data: data + } + }); + }); + + this.p2p.onData(function(data){ + data.__local = true; + if(data.type === 'topic') this.pubsub.publish(data.body.topic, data.body.data, true); + }); + } + + subscribe(){ + return this.subscribe.apply(this.pubsub, arguments); + } + + publish(){ + return this.publish.apply(this.pubsub, arguments); + } + + broadcast(){ + return this.broadcast.apply(this.p2p, arguments); + } + + onData(){ + return this.onData.apply(this.p2p, arguments); + } + + addPeer(){ + return this.addPeer.apply(this.p2p, arguments); + } + + removePeer(){ + return this.removePeer.apply(this.p2p, arguments); + } } -console.log('port:', server_port, 'clients:', clients_list) - -app.p2p = new P2P({ - listenPort: server_port, - peers: clients_list -}) - -app.pub = pubsub.pub; -app.sub = pubsub.sub; - - -app.sub(/.*/gi, function(data, topic){ - if(data.__local) return false; - data.__local = true; - app.p2p.broadcast({ - type:'topic', - body:{ - topic: topic, - data: data - } - }); -}); - -app.p2p.onData(function(data){ - if(data.type === 'topic') app.publish(data.body.topic, data.body.data, true); -}); \ No newline at end of file +module.exports = {P2PSub, P2P, PubSub}; diff --git a/cli.js b/cli.js new file mode 100755 index 0000000..199fde7 --- /dev/null +++ b/cli.js @@ -0,0 +1,25 @@ +#!/usr/bin/env nodejs + + +const {P2PSub} = require('./app'); + +const args = process.argv.slice(1); + +const exec_name = args[0].split('/').pop(); +const listenPort = args[1]; +const peers = args.slice(2); + +if(P2PSub === "help"){ + console.error('Please supply the server port and list of clients to connect too;') + console.error(`${exec_name} ...` ) + console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` ) + process.exit(0) +} + +// console.log('port:', server_port, 'clients:', clients_list) + +let instance = new P2PSub({ + listenPort, + peers, + logLevel: ['info'] +}); \ No newline at end of file diff --git a/p2p.js b/p2p.js index c47aaa5..2882504 100644 --- a/p2p.js +++ b/p2p.js @@ -3,6 +3,7 @@ const net = require('net'); class P2P { constructor(args){ + args = args || {}; // The list of clients this peer wants to connect to this.wantedPeers = new Set(); @@ -19,6 +20,9 @@ class P2P { // Hold the data callbacks when a message is received this.onDataCallbacks = []; + // Set the logging level + this.logLevel = args.logLevel || false; + // If a listen port was specified, have this peer listen for incoming // connection if(args.listenPort){ @@ -29,6 +33,14 @@ class P2P { if(args.peers){ this.addPeer(args.peers); } + + this.__log('info', 'Local peerID', this.peerID); + } + + __log(type, ...message){ + if(this.logLevel || this.logLevel === 'all' || this.logLevel.includes(type)){ + console[type](...message); + } } // Take a peer as : and add it the `wantedPeers` list @@ -72,7 +84,7 @@ class P2P { let p2p = this; peer.on('connect', function(){ - console.info(`Peer ${address} is now connected.`); + p2p.__log('info', `Peer ${address} is now connected.`); peer.peerConnectAddress = `${address}:${port}` // When a connection is started, send a message informing the remote @@ -81,7 +93,7 @@ class P2P { }); peer.on('close', function(){ - console.info(`Client Peer ${address}, ${peer.peerID} droped.`); + p2p.__log('info', `Client Peer ${address}, ${peer.peerID} droped.`); delete p2p.connectedPeers[peer.peerID]; }); @@ -91,9 +103,9 @@ class P2P { peer.on('error', function(error){ if(error.syscall === 'connect' && error.code === 'ECONNREFUSED'){ - console.info(`Peer ${error.address}:${error.port} connection refussed!`); + p2p.__log('info', `Peer ${error.address}:${error.port} connection refussed!`); }else{ - console.warn('client EVENT error:', arguments); + p2p.__log('warn', 'client EVENT error:', arguments); } }); } @@ -152,18 +164,22 @@ class P2P { }); clientSocket.on('close', function(){ - console.info(`server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`); + p2p.__log('info', `server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`); delete p2p.connectedPeers[clientSocket.peerID]; }); }); serverSocket.on('listening', function(){ - console.log('p2p server listening on', port,) + p2p.__log('info','p2p server listening on', port,) }); - serverSocket.on('error', function(){ - console.log('server EVENT error:', arguments); + serverSocket.on('error', function(error){ + if(error.syscall === 'listen' && error.code === 'EADDRINUSE'){ + console.error('ERROR: Server listening port', port, 'address already in use.') + process.exit(2) + } + p2p.__log('error','server EVENT error:', arguments); }); serverSocket.listen(Number(port)); @@ -197,17 +213,17 @@ class P2P { // Drop heart beats if(message.type === "heartbeat"){ - console.log('heartbeat from', from); + this.__log('info', 'heartbeat from', from); return ; } // Register new clients with this peer, drop them if a connection // already exists if(message.type === "register"){ - console.log('registering peer', message.id, socket.remoteAddress) + this.__log('info', 'registering peer', message.id, socket.remoteAddress) if(Object.keys(this.connectedPeers).includes(message.id)){ - console.log(`Dropping ${message.id}, already connected`) + this.__log('info', `Dropping ${message.id}, already connected`) socket.end(); }else{ socket.peerID = message.id @@ -218,6 +234,8 @@ class P2P { return ; } + this.__log('info', 'message', message); + // forward the message to other peers this.broadcast(message, message.sentTo); // console.log('p2p read:', message) diff --git a/pubsub.js b/pubsub.js index d06bdd7..9f1bd2f 100644 --- a/pubsub.js +++ b/pubsub.js @@ -9,6 +9,8 @@ class PubSub{ topic = "__REGEX__"; } + // console.log(this) +// // create the topic if not yet created if(!this.topics[topic]) this.topics[topic] = []; @@ -17,7 +19,7 @@ class PubSub{ } matchTopics(topic){ - let topics = [... this.topics[topic] ? this.topics[topic] : []] + let topics = [... this.topics[topic] ? this.topics[topic] : []]; // console.log(this.topics) if(!this.topics['__REGEX__']) return topics;