Compare commits

..

9 Commits

Author SHA1 Message Date
d443e7d583
docs 2020-08-24 17:14:41 -04:00
685e905fbd
docs 2020-08-24 17:05:58 -04:00
e3e55b8a92
docs 2020-08-24 17:03:53 -04:00
c9ec277cc3
docs 2020-08-24 17:02:39 -04:00
15e04039a3
docs 2020-08-24 12:06:36 -04:00
54f74e4a49
docs 2020-08-24 12:05:42 -04:00
7bd9fa8308
docs 2020-08-24 12:04:54 -04:00
0ba7f86fc1
pubsub 2020-08-22 20:06:28 -04:00
e71803cb3d
commented 2020-08-21 20:23:18 -04:00
6 changed files with 369 additions and 57 deletions

169
README.md
View File

@ -1,2 +1,171 @@
# node-p2p-simple
Mesh peer to peer JSON Pub/Sub with no extremal dependencies. Each peer can act as
server and client, just a client or a simple relay peer. Topics can be
subscribed to using a simple string or regex pattern.
## Features
* Mesh peer to peer network forwards messages to all connected, even if they are
not directly connected.
* Peers can be added and removed on the fly.
* PubSub topics can be subscribed using patter RegExp.
* Peer to peer and pub/sub is 2 separate importable classes for even more
customization.
* **No dependences.**
## Install
`npm install <Name Soon!> --save`
## Usage
Instantiate a `P2PSub` instance:
```
const {P2PSub} = require('<Name Soon!>');
const p2p = new P2PSub({
listenPort: 7575,
peers:[
'10.10.10.11:7575',
'10.10.10.11:7575',
'172.16.24.2:8637'
]
});
```
Now we can listen for and publish topics across the whole network:
```
// Local peer
p2p.publish('announcement', {message: 'p2p pubsub is awesome!'});
```
```
// Remote peer
p2p.subscribe('announcement', (data)=> console.log(data.message));
# p2p pubsub is awesome!
```
We can also use regex patterns in our subscribes to catch more, or all topics:
```
// Local peer
p2p.publish('announcement', {message: 'p2p pubsub is awesome!'});
p2p.publish('announcement-group1', {message: 'Mesh is the future!'});
p2p.publish('resource-block-added', {id: '123', name:'block0'});
```
```
// Remote peer
p2p.subscribe(\^announcement\, (data)=> console.log(data.message));
# p2p pubsub is awesome!
# Mesh is the future!
```
```
// Another remote peer
p2p.subscribe(\.\, (data)=> console.log(data));
# {message: 'p2p pubsub is awesome!'}
# {message: 'Mesh is the future!'}
# {id: '123', name:'block0'}
```
### P2PSub instance options
All of these are provided, and passed to the `P2P` class. `PubSub` takes no
instance variables.
`listenPort` Optional, type Number or String. Sets the incoming TCP port for the
local peer to listen on. If this is left blank, the local peer will not accept
incoming peers. Default is `undefined`
`peers`: Optional, type Array of Strings. The list of peers this peer will try
to connect with. Peers should be specified as `{host}:{port{}`. The host can be
an IP or a hostname the system can resolve. Default is `[]`
`logLevel`: Optional, type Array of Strings or `false`. Sets the how the
instance log to STDIN out. Options are `info`, `warn` and `error` if one or more
is passed in the `logLevel` Array, messages ill be printed to STDIN/STDERR.
Passing `false` or leaving it blank will suppress all message except if the
listening port is in use.
## CLI usage
A simple relay peer can be set up using just the CLI, no code required. This
peer will only relay messages to all its connected peers. The logging level is
set to `info`.
```
./app.js 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575 ...
```
The first argument is the listening port, optionally followed by space separated
list of peers to connect with.
## Methods and attributes
Methods are provided by either the `P2P` or `PubSub` classes. The `P2PSub` class
is a mix of both classes and merges the Pub/Sub with p2p functions.
The `P2PSub` class provides `subscribe()`, `publish()`, `addPeer()` and
`removePeer()`. Please see the methods below for detailed usage.
#### Provided by `PubSub` class
* `subscribe(topic, callback-function)` Sets a function to be called on `topic`
publish. A RegExp patter can be passed as the topic and will match to
`topic` on publish. a String will perform an exact match.
* `publish(topic, JSON-body)` Executes each callback attached to the passed
`topic`. To prevent a publication from propagating across the network, pass
`__local = true` in the message body.
* `topics` An instance attribute holding an Object on topics and bound callbacks.
This is not exposed by the `P2PSub` class.
#### Provided by `P2P` class
* `addPeer(peer)` Adds a remote peer to the local peers connections. `peer` can
passed as a single sting with a hostname and port, `'remotehost.com:7575'`,
or an Array of Strings with peers `['remotehost.com:7575', '10.10.2.1:7575']`.
Duplicate and existing peers will be ignored.
* `removePeer(peer)` Disconnect the passed peer and removes it from the list of
peers this instances auto reconnects to. `peer` is passed as a single sting
with a hostname and port, `'remotehost.com:7575'`
* `onData(callback)` Pass a listener to called when this peer receives a message
from the network. The message is passed to the callback as native JSON
object. This is not exposed by the `P2PSub` class, `subscribe` should be
used.
* `broadcast(message, <excludes>)` Sends a JSON message to all connected peers.
`message` is JSON object that will be stringified. `excludes` is a Array of
Strings containing peerID's that this broadcast should not be sent you, and
is for internal use at this time. This is not exposed by the `P2PSub` class,
`publish` should be used.
* `peerID` An instance attribute holding a String for the local `peerID`. This
is randomly generated and is for internal use. This is not exposed by the
`P2PSub` class.
## Todo
* Add timestamps to each message.
* Change the parsing of the message move `sentTo` in the prototype before
passing the class.
* Add optional TSL
* Internal ability to publish new peers across the network.
* Add config file for CLI mode.

70
app.js
View File

@ -1,29 +1,53 @@
#!/usr/bin/env nodejs
const app = {};
const {P2P} = require('./p2p')
const {PubSub} = require('./pubsub');
const {P2P} = require('./p2p');
const args = process.argv.slice(1);
class P2PSub{
constructor(...args){
this.p2p = new P2P(...args);
const exec_name = args[0].split('/').pop();
const server_port = args[1];
const clients_list = args.slice(2);
this.pubsub = new PubSub();
if(server_port === "help"){
console.error('Please supply the server port and list of clients to connect too;')
console.error(`${exec_name} <server port> <client 1> <client 2> <client 3> ...` )
console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` )
process.exit(0)
this.pubsub.subscribe(/.*/gi, function(data, topic){
if(data.__local) return false;
this.p2p.broadcast({
type:'topic',
body:{
topic: topic,
data: data
}
});
});
this.p2p.onData(function(data){
data.__local = true;
if(data.type === 'topic') this.pubsub.publish(data.body.topic, data.body.data, true);
});
}
subscribe(){
return this.subscribe.apply(this.pubsub, arguments);
}
publish(){
return this.publish.apply(this.pubsub, arguments);
}
broadcast(){
return this.broadcast.apply(this.p2p, arguments);
}
onData(){
return this.onData.apply(this.p2p, arguments);
}
addPeer(){
return this.addPeer.apply(this.p2p, arguments);
}
removePeer(){
return this.removePeer.apply(this.p2p, arguments);
}
}
console.log('port:', server_port, 'clients:', clients_list)
app.p2p = new P2P({
listenPort: server_port,
peers: clients_list
})
app.pubsub = require('./pubsub')
module.exports = app;
module.exports = {P2PSub, P2P, PubSub};

25
cli.js Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env nodejs
const {P2PSub} = require('./app');
const args = process.argv.slice(1);
const exec_name = args[0].split('/').pop();
const listenPort = args[1];
const peers = args.slice(2);
if(P2PSub === "help"){
console.error('Please supply the server port and list of clients to connect too;')
console.error(`${exec_name} <server port> <client 1> <client 2> <client 3> ...` )
console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` )
process.exit(0)
}
// console.log('port:', server_port, 'clients:', clients_list)
let instance = new P2PSub({
listenPort,
peers,
logLevel: ['info']
});

112
p2p.js
View File

@ -3,24 +3,50 @@ const net = require('net');
class P2P {
constructor(args){
args = args || {};
// The list of clients this peer wants to connect to
this.wantedPeers = new Set();
// The peers we are currently connected to
this.connectedPeers = {};
// Random ID for this peer
this.peerID = crypto.randomBytes(16).toString("hex");
// Kick of the interval to connect to peers and send hear beats
this.connectInterval = this.__peerInterval();
// Hold the data callbacks when a message is received
this.onDataCallbacks = [];
// Set the logging level
this.logLevel = args.logLevel || false;
// If a listen port was specified, have this peer listen for incoming
// connection
if(args.listenPort){
this.server = this.__listen(args.listenPort);
}
// If a list of peers in supplied, add them.
if(args.peers){
this.addPeer(args.peers);
}
this.__log('info', 'Local peerID', this.peerID);
}
__log(type, ...message){
if(this.logLevel || this.logLevel === 'all' || this.logLevel.includes(type)){
console[type](...message);
}
}
// Take a peer as <host>:<port> and add it the `wantedPeers` list
addPeer(peer){
// If `peer` is a list, call `addPeer` with each item
if(Array.isArray(peer)){
for(let address of peer){
this.addPeer(address);
@ -28,22 +54,25 @@ class P2P {
return;
}
if(this.wantedPeers.has(peer)) return true;
this.wantedPeers.add(peer);
}
// Close a connection to a peer and remove it from the `wantedPeers` list
removePeer(peer){
if(this.wantedPeers.has(peer)){
this.wantedPeers.delete(peer);
// find the peer in the `connectedPeers` object
for(let peerID in this.connectedPeers){
if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue;
this.connectedPeers[peerID].end();
delete this.connectedPeers[peerID];
}
}
}
// Connect to a remote peer
__connectPeer(address, port) {
if(!port){
let parse = address.split(':');
@ -55,13 +84,16 @@ class P2P {
let p2p = this;
peer.on('connect', function(){
console.info(`Peer ${address} is now connected.`);
p2p.__log('info', `Peer ${address} is now connected.`);
peer.peerConnectAddress = `${address}:${port}`
// When a connection is started, send a message informing the remote
// peer of our ID
peer.write(JSON.stringify({type:"register", id: p2p.peerID}));
});
peer.on('close', function(){
console.info(`Client Peer ${address}, ${peer.peerID} droped.`);
p2p.__log('info', `Client Peer ${address}, ${peer.peerID} droped.`);
delete p2p.connectedPeers[peer.peerID];
});
@ -71,24 +103,38 @@ class P2P {
peer.on('error', function(error){
if(error.syscall === 'connect' && error.code === 'ECONNREFUSED'){
console.info(`Peer ${error.address}:${error.port} connection refussed!`);
p2p.__log('info', `Peer ${error.address}:${error.port} connection refussed!`);
}else{
console.warn('client EVENT error:', arguments);
p2p.__log('warn', 'client EVENT error:', arguments);
}
});
}
__peerInterval(interval){
return setInterval(function(p2p){
let connected = Object.keys(p2p.connectedPeers).map(function(peerID){
return p2p.connectedPeers[peerID].peerConnectAddress
});
this.count = 1;
for(let peer of p2p.wantedPeers){
if(!connected.includes(peer)){
p2p.__connectPeer(peer);
}
return setInterval(function(p2p){
// Copy the wanted peers list do we can reduce it.
let tryConnectionSet = new Set(p2p.wantedPeers);
// loop over all the connected peers
for(let peerID in p2p.connectedPeers){
let peer = p2p.connectedPeers[peerID];
// If the peer does not a `peerConnactAddress`, it
if(! peer.peerConnectAddress) continue;
// Remove connected peers from the list
tryConnectionSet.delete(peer.peerConnectAddress);
// Every once and while send a heart beat keep the socket open.
if((p2p.count % 10) == 0) peer.write(JSON.stringify({type:"heartbeat"}));
}
// loop over the unconnected peers, and try to connect to them.
for(let peer of tryConnectionSet){
p2p.__connectPeer(peer);
}
}, interval || 1000, this);
}
@ -99,19 +145,18 @@ class P2P {
let serverSocket = new net.Server(function (clientSocket) {
console.info(`server ${port}`)
clientSocket.on('error', function(){
console.log('server-client EVENT error:', arguments);
});
});
serverSocket.on('connection', function(clientSocket){
console.log('server EVENT connection from client:', clientSocket.remoteAddress);
// When a connection is started, send a message informing the remote
// peer of our ID
clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID}));
clientSocket.on('data', function(data){
@ -119,18 +164,22 @@ class P2P {
});
clientSocket.on('close', function(){
console.info(`server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`);
p2p.__log('info', `server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`);
delete p2p.connectedPeers[clientSocket.peerID];
});
});
serverSocket.on('listening', function(){
console.log('p2p server listening on', port,)
p2p.__log('info','p2p server listening on', port,)
});
serverSocket.on('error', function(){
console.log('server EVENT error:', arguments);
serverSocket.on('error', function(error){
if(error.syscall === 'listen' && error.code === 'EADDRINUSE'){
console.error('ERROR: Server listening port', port, 'address already in use.')
process.exit(2)
}
p2p.__log('error','server EVENT error:', arguments);
});
serverSocket.listen(Number(port));
@ -142,38 +191,51 @@ class P2P {
exclude = [...exclude || [], ...[this.peerID]]
let sentTo = []
// Build a list of peers to send this message too
for(let _peerID in this.connectedPeers){
if(exclude.includes(_peerID)) continue;
sentTo.push(_peerID);
}
// Attach a list of peers this message has been send to, including our
// peerID
message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])]
message.from = message.from || this.peerID;
// Send the message to the connected peers in the `sentTo` list.
for(let _peerID of sentTo){
this.connectedPeers[_peerID].write(JSON.stringify(message));
}
}
__read(message, from, socket){
// Parse an incoming message
// Drop heart beats
if(message.type === "heartbeat"){
console.log('heartbeat from', from);
this.__log('info', 'heartbeat from', from);
return ;
}
// Register new clients with this peer, drop them if a connection
// already exists
if(message.type === "register"){
console.log('registering peer', message.id, socket.remoteAddress)
this.__log('info', 'registering peer', message.id, socket.remoteAddress)
if(Object.keys(this.connectedPeers).includes(message.id)){
console.log(`Dropping ${message.id}, already connected`)
this.__log('info', `Dropping ${message.id}, already connected`)
socket.end();
}else{
socket.peerID = message.id
// Add the peer to 'connectedPeers' object with the remote
// peerID as the key and the socket as it' value.
this.connectedPeers[message.id] = socket;
}
return ;
}
this.__log('info', 'message', message);
// forward the message to other peers
this.broadcast(message, message.sentTo);
// console.log('p2p read:', message)
@ -182,7 +244,6 @@ class P2P {
this.onDataCallbacks.forEach(function(callback){
callback(message);
})
}
onData(callback){
@ -190,7 +251,6 @@ class P2P {
this.onDataCallbacks.push(callback);
}
}
}
module.exports = {P2P};

View File

@ -7,5 +7,5 @@
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC"
"license": "MIT"
}

View File

@ -1,11 +1,45 @@
const app = require('./app');
class PubSub{
constructor(){
this.topics = {};
}
app.p2p.onData(function(data){
console.log('app, data:', data)
})
subscribe(topic, listener) {
if(topic instanceof RegExp){
listener.match = topic;
topic = "__REGEX__";
}
// console.log(this)
//
// create the topic if not yet created
if(!this.topics[topic]) this.topics[topic] = [];
setTimeout(function(){
app.p2p.broadcast({type:'topic', body:"yolo"})
}, 10000);
// add the listener
this.topics[topic].push(listener);
}
matchTopics(topic){
let topics = [... this.topics[topic] ? this.topics[topic] : []];
// console.log(this.topics)
if(!this.topics['__REGEX__']) return topics;
for(let listener of this.topics['__REGEX__']){
if(topic.match(listener.match)) topics.push(listener);
}
return topics;
}
publish(topic, data) {
// send the event to all listeners
this.matchTopics(topic).forEach(function(listener) {
setTimeout(function(data, topic){
listener(data || {}, topic);
}, 0, data, topic);
});
}
}
module.exports = {PubSub};