Compare commits
9 Commits
not-workin
...
master
Author | SHA1 | Date | |
---|---|---|---|
d443e7d583 | |||
685e905fbd | |||
e3e55b8a92 | |||
c9ec277cc3 | |||
15e04039a3 | |||
54f74e4a49 | |||
7bd9fa8308 | |||
0ba7f86fc1 | |||
e71803cb3d |
169
README.md
169
README.md
@ -1,2 +1,171 @@
|
|||||||
# node-p2p-simple
|
# node-p2p-simple
|
||||||
|
|
||||||
|
Mesh peer to peer JSON Pub/Sub with no extremal dependencies. Each peer can act as
|
||||||
|
server and client, just a client or a simple relay peer. Topics can be
|
||||||
|
subscribed to using a simple string or regex pattern.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
* Mesh peer to peer network forwards messages to all connected, even if they are
|
||||||
|
not directly connected.
|
||||||
|
* Peers can be added and removed on the fly.
|
||||||
|
* PubSub topics can be subscribed using patter RegExp.
|
||||||
|
* Peer to peer and pub/sub is 2 separate importable classes for even more
|
||||||
|
customization.
|
||||||
|
* **No dependences.**
|
||||||
|
|
||||||
|
## Install
|
||||||
|
|
||||||
|
`npm install <Name Soon!> --save`
|
||||||
|
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
Instantiate a `P2PSub` instance:
|
||||||
|
|
||||||
|
```
|
||||||
|
const {P2PSub} = require('<Name Soon!>');
|
||||||
|
|
||||||
|
const p2p = new P2PSub({
|
||||||
|
listenPort: 7575,
|
||||||
|
peers:[
|
||||||
|
'10.10.10.11:7575',
|
||||||
|
'10.10.10.11:7575',
|
||||||
|
'172.16.24.2:8637'
|
||||||
|
]
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
Now we can listen for and publish topics across the whole network:
|
||||||
|
|
||||||
|
```
|
||||||
|
// Local peer
|
||||||
|
|
||||||
|
p2p.publish('announcement', {message: 'p2p pubsub is awesome!'});
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
// Remote peer
|
||||||
|
|
||||||
|
p2p.subscribe('announcement', (data)=> console.log(data.message));
|
||||||
|
|
||||||
|
# p2p pubsub is awesome!
|
||||||
|
```
|
||||||
|
|
||||||
|
We can also use regex patterns in our subscribes to catch more, or all topics:
|
||||||
|
|
||||||
|
```
|
||||||
|
// Local peer
|
||||||
|
|
||||||
|
p2p.publish('announcement', {message: 'p2p pubsub is awesome!'});
|
||||||
|
p2p.publish('announcement-group1', {message: 'Mesh is the future!'});
|
||||||
|
p2p.publish('resource-block-added', {id: '123', name:'block0'});
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
// Remote peer
|
||||||
|
|
||||||
|
p2p.subscribe(\^announcement\, (data)=> console.log(data.message));
|
||||||
|
|
||||||
|
# p2p pubsub is awesome!
|
||||||
|
# Mesh is the future!
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
// Another remote peer
|
||||||
|
|
||||||
|
p2p.subscribe(\.\, (data)=> console.log(data));
|
||||||
|
|
||||||
|
# {message: 'p2p pubsub is awesome!'}
|
||||||
|
# {message: 'Mesh is the future!'}
|
||||||
|
# {id: '123', name:'block0'}
|
||||||
|
```
|
||||||
|
|
||||||
|
### P2PSub instance options
|
||||||
|
|
||||||
|
All of these are provided, and passed to the `P2P` class. `PubSub` takes no
|
||||||
|
instance variables.
|
||||||
|
|
||||||
|
`listenPort` Optional, type Number or String. Sets the incoming TCP port for the
|
||||||
|
local peer to listen on. If this is left blank, the local peer will not accept
|
||||||
|
incoming peers. Default is `undefined`
|
||||||
|
|
||||||
|
`peers`: Optional, type Array of Strings. The list of peers this peer will try
|
||||||
|
to connect with. Peers should be specified as `{host}:{port{}`. The host can be
|
||||||
|
an IP or a hostname the system can resolve. Default is `[]`
|
||||||
|
|
||||||
|
`logLevel`: Optional, type Array of Strings or `false`. Sets the how the
|
||||||
|
instance log to STDIN out. Options are `info`, `warn` and `error` if one or more
|
||||||
|
is passed in the `logLevel` Array, messages ill be printed to STDIN/STDERR.
|
||||||
|
Passing `false` or leaving it blank will suppress all message except if the
|
||||||
|
listening port is in use.
|
||||||
|
|
||||||
|
## CLI usage
|
||||||
|
|
||||||
|
A simple relay peer can be set up using just the CLI, no code required. This
|
||||||
|
peer will only relay messages to all its connected peers. The logging level is
|
||||||
|
set to `info`.
|
||||||
|
|
||||||
|
```
|
||||||
|
./app.js 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575 ...
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
The first argument is the listening port, optionally followed by space separated
|
||||||
|
list of peers to connect with.
|
||||||
|
|
||||||
|
## Methods and attributes
|
||||||
|
|
||||||
|
Methods are provided by either the `P2P` or `PubSub` classes. The `P2PSub` class
|
||||||
|
is a mix of both classes and merges the Pub/Sub with p2p functions.
|
||||||
|
|
||||||
|
The `P2PSub` class provides `subscribe()`, `publish()`, `addPeer()` and
|
||||||
|
`removePeer()`. Please see the methods below for detailed usage.
|
||||||
|
|
||||||
|
#### Provided by `PubSub` class
|
||||||
|
|
||||||
|
* `subscribe(topic, callback-function)` Sets a function to be called on `topic`
|
||||||
|
publish. A RegExp patter can be passed as the topic and will match to
|
||||||
|
`topic` on publish. a String will perform an exact match.
|
||||||
|
|
||||||
|
* `publish(topic, JSON-body)` Executes each callback attached to the passed
|
||||||
|
`topic`. To prevent a publication from propagating across the network, pass
|
||||||
|
`__local = true` in the message body.
|
||||||
|
|
||||||
|
* `topics` An instance attribute holding an Object on topics and bound callbacks.
|
||||||
|
This is not exposed by the `P2PSub` class.
|
||||||
|
|
||||||
|
#### Provided by `P2P` class
|
||||||
|
|
||||||
|
* `addPeer(peer)` Adds a remote peer to the local peers connections. `peer` can
|
||||||
|
passed as a single sting with a hostname and port, `'remotehost.com:7575'`,
|
||||||
|
or an Array of Strings with peers `['remotehost.com:7575', '10.10.2.1:7575']`.
|
||||||
|
Duplicate and existing peers will be ignored.
|
||||||
|
|
||||||
|
* `removePeer(peer)` Disconnect the passed peer and removes it from the list of
|
||||||
|
peers this instances auto reconnects to. `peer` is passed as a single sting
|
||||||
|
with a hostname and port, `'remotehost.com:7575'`
|
||||||
|
|
||||||
|
* `onData(callback)` Pass a listener to called when this peer receives a message
|
||||||
|
from the network. The message is passed to the callback as native JSON
|
||||||
|
object. This is not exposed by the `P2PSub` class, `subscribe` should be
|
||||||
|
used.
|
||||||
|
|
||||||
|
* `broadcast(message, <excludes>)` Sends a JSON message to all connected peers.
|
||||||
|
`message` is JSON object that will be stringified. `excludes` is a Array of
|
||||||
|
Strings containing peerID's that this broadcast should not be sent you, and
|
||||||
|
is for internal use at this time. This is not exposed by the `P2PSub` class,
|
||||||
|
`publish` should be used.
|
||||||
|
|
||||||
|
* `peerID` An instance attribute holding a String for the local `peerID`. This
|
||||||
|
is randomly generated and is for internal use. This is not exposed by the
|
||||||
|
`P2PSub` class.
|
||||||
|
|
||||||
|
## Todo
|
||||||
|
|
||||||
|
* Add timestamps to each message.
|
||||||
|
* Change the parsing of the message move `sentTo` in the prototype before
|
||||||
|
passing the class.
|
||||||
|
* Add optional TSL
|
||||||
|
* Internal ability to publish new peers across the network.
|
||||||
|
* Add config file for CLI mode.
|
||||||
|
69
app.js
69
app.js
@ -1,28 +1,53 @@
|
|||||||
#!/usr/bin/env nodejs
|
const {PubSub} = require('./pubsub');
|
||||||
const app = {};
|
const {P2P} = require('./p2p');
|
||||||
module.exports = app;
|
|
||||||
|
|
||||||
const {P2P} = require('./p2p')
|
|
||||||
|
|
||||||
|
|
||||||
const args = process.argv.slice(1);
|
class P2PSub{
|
||||||
|
constructor(...args){
|
||||||
|
this.p2p = new P2P(...args);
|
||||||
|
|
||||||
const exec_name = args[0].split('/').pop();
|
this.pubsub = new PubSub();
|
||||||
const server_port = args[1];
|
|
||||||
const clients_list = args.slice(2);
|
|
||||||
|
|
||||||
if(server_port === "help"){
|
this.pubsub.subscribe(/.*/gi, function(data, topic){
|
||||||
console.error('Please supply the server port and list of clients to connect too;')
|
if(data.__local) return false;
|
||||||
console.error(`${exec_name} <server port> <client 1> <client 2> <client 3> ...` )
|
this.p2p.broadcast({
|
||||||
console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` )
|
type:'topic',
|
||||||
process.exit(0)
|
body:{
|
||||||
|
topic: topic,
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
this.p2p.onData(function(data){
|
||||||
|
data.__local = true;
|
||||||
|
if(data.type === 'topic') this.pubsub.publish(data.body.topic, data.body.data, true);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribe(){
|
||||||
|
return this.subscribe.apply(this.pubsub, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
publish(){
|
||||||
|
return this.publish.apply(this.pubsub, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
broadcast(){
|
||||||
|
return this.broadcast.apply(this.p2p, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
onData(){
|
||||||
|
return this.onData.apply(this.p2p, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
addPeer(){
|
||||||
|
return this.addPeer.apply(this.p2p, arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
removePeer(){
|
||||||
|
return this.removePeer.apply(this.p2p, arguments);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log('port:', server_port, 'clients:', clients_list)
|
module.exports = {P2PSub, P2P, PubSub};
|
||||||
|
|
||||||
app.p2p = new P2P({
|
|
||||||
listenPort: server_port,
|
|
||||||
peers: clients_list
|
|
||||||
})
|
|
||||||
|
|
||||||
app.pubsub = require('./pubsub')
|
|
||||||
|
25
cli.js
Executable file
25
cli.js
Executable file
@ -0,0 +1,25 @@
|
|||||||
|
#!/usr/bin/env nodejs
|
||||||
|
|
||||||
|
|
||||||
|
const {P2PSub} = require('./app');
|
||||||
|
|
||||||
|
const args = process.argv.slice(1);
|
||||||
|
|
||||||
|
const exec_name = args[0].split('/').pop();
|
||||||
|
const listenPort = args[1];
|
||||||
|
const peers = args.slice(2);
|
||||||
|
|
||||||
|
if(P2PSub === "help"){
|
||||||
|
console.error('Please supply the server port and list of clients to connect too;')
|
||||||
|
console.error(`${exec_name} <server port> <client 1> <client 2> <client 3> ...` )
|
||||||
|
console.error(`${exec_name} 7575 10.1.0.1:7575 10.2.0.1:7575 10.3.0.1:7575` )
|
||||||
|
process.exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// console.log('port:', server_port, 'clients:', clients_list)
|
||||||
|
|
||||||
|
let instance = new P2PSub({
|
||||||
|
listenPort,
|
||||||
|
peers,
|
||||||
|
logLevel: ['info']
|
||||||
|
});
|
112
p2p.js
112
p2p.js
@ -3,24 +3,50 @@ const net = require('net');
|
|||||||
|
|
||||||
class P2P {
|
class P2P {
|
||||||
constructor(args){
|
constructor(args){
|
||||||
|
args = args || {};
|
||||||
|
|
||||||
|
// The list of clients this peer wants to connect to
|
||||||
this.wantedPeers = new Set();
|
this.wantedPeers = new Set();
|
||||||
|
|
||||||
|
// The peers we are currently connected to
|
||||||
this.connectedPeers = {};
|
this.connectedPeers = {};
|
||||||
|
|
||||||
|
// Random ID for this peer
|
||||||
this.peerID = crypto.randomBytes(16).toString("hex");
|
this.peerID = crypto.randomBytes(16).toString("hex");
|
||||||
|
|
||||||
|
// Kick of the interval to connect to peers and send hear beats
|
||||||
this.connectInterval = this.__peerInterval();
|
this.connectInterval = this.__peerInterval();
|
||||||
|
|
||||||
|
// Hold the data callbacks when a message is received
|
||||||
this.onDataCallbacks = [];
|
this.onDataCallbacks = [];
|
||||||
|
|
||||||
|
// Set the logging level
|
||||||
|
this.logLevel = args.logLevel || false;
|
||||||
|
|
||||||
|
// If a listen port was specified, have this peer listen for incoming
|
||||||
|
// connection
|
||||||
if(args.listenPort){
|
if(args.listenPort){
|
||||||
this.server = this.__listen(args.listenPort);
|
this.server = this.__listen(args.listenPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a list of peers in supplied, add them.
|
||||||
if(args.peers){
|
if(args.peers){
|
||||||
this.addPeer(args.peers);
|
this.addPeer(args.peers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.__log('info', 'Local peerID', this.peerID);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
__log(type, ...message){
|
||||||
|
if(this.logLevel || this.logLevel === 'all' || this.logLevel.includes(type)){
|
||||||
|
console[type](...message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take a peer as <host>:<port> and add it the `wantedPeers` list
|
||||||
addPeer(peer){
|
addPeer(peer){
|
||||||
|
|
||||||
|
// If `peer` is a list, call `addPeer` with each item
|
||||||
if(Array.isArray(peer)){
|
if(Array.isArray(peer)){
|
||||||
for(let address of peer){
|
for(let address of peer){
|
||||||
this.addPeer(address);
|
this.addPeer(address);
|
||||||
@ -28,22 +54,25 @@ class P2P {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(this.wantedPeers.has(peer)) return true;
|
|
||||||
this.wantedPeers.add(peer);
|
this.wantedPeers.add(peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close a connection to a peer and remove it from the `wantedPeers` list
|
||||||
removePeer(peer){
|
removePeer(peer){
|
||||||
if(this.wantedPeers.has(peer)){
|
if(this.wantedPeers.has(peer)){
|
||||||
this.wantedPeers.delete(peer);
|
this.wantedPeers.delete(peer);
|
||||||
|
|
||||||
|
// find the peer in the `connectedPeers` object
|
||||||
for(let peerID in this.connectedPeers){
|
for(let peerID in this.connectedPeers){
|
||||||
if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue;
|
if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue;
|
||||||
|
|
||||||
this.connectedPeers[peerID].end();
|
this.connectedPeers[peerID].end();
|
||||||
delete this.connectedPeers[peerID];
|
delete this.connectedPeers[peerID];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Connect to a remote peer
|
||||||
__connectPeer(address, port) {
|
__connectPeer(address, port) {
|
||||||
if(!port){
|
if(!port){
|
||||||
let parse = address.split(':');
|
let parse = address.split(':');
|
||||||
@ -55,13 +84,16 @@ class P2P {
|
|||||||
let p2p = this;
|
let p2p = this;
|
||||||
|
|
||||||
peer.on('connect', function(){
|
peer.on('connect', function(){
|
||||||
console.info(`Peer ${address} is now connected.`);
|
p2p.__log('info', `Peer ${address} is now connected.`);
|
||||||
peer.peerConnectAddress = `${address}:${port}`
|
peer.peerConnectAddress = `${address}:${port}`
|
||||||
|
|
||||||
|
// When a connection is started, send a message informing the remote
|
||||||
|
// peer of our ID
|
||||||
peer.write(JSON.stringify({type:"register", id: p2p.peerID}));
|
peer.write(JSON.stringify({type:"register", id: p2p.peerID}));
|
||||||
});
|
});
|
||||||
|
|
||||||
peer.on('close', function(){
|
peer.on('close', function(){
|
||||||
console.info(`Client Peer ${address}, ${peer.peerID} droped.`);
|
p2p.__log('info', `Client Peer ${address}, ${peer.peerID} droped.`);
|
||||||
delete p2p.connectedPeers[peer.peerID];
|
delete p2p.connectedPeers[peer.peerID];
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -71,24 +103,38 @@ class P2P {
|
|||||||
|
|
||||||
peer.on('error', function(error){
|
peer.on('error', function(error){
|
||||||
if(error.syscall === 'connect' && error.code === 'ECONNREFUSED'){
|
if(error.syscall === 'connect' && error.code === 'ECONNREFUSED'){
|
||||||
console.info(`Peer ${error.address}:${error.port} connection refussed!`);
|
p2p.__log('info', `Peer ${error.address}:${error.port} connection refussed!`);
|
||||||
}else{
|
}else{
|
||||||
console.warn('client EVENT error:', arguments);
|
p2p.__log('warn', 'client EVENT error:', arguments);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
__peerInterval(interval){
|
__peerInterval(interval){
|
||||||
|
|
||||||
return setInterval(function(p2p){
|
this.count = 1;
|
||||||
let connected = Object.keys(p2p.connectedPeers).map(function(peerID){
|
|
||||||
return p2p.connectedPeers[peerID].peerConnectAddress
|
|
||||||
});
|
|
||||||
|
|
||||||
for(let peer of p2p.wantedPeers){
|
return setInterval(function(p2p){
|
||||||
if(!connected.includes(peer)){
|
// Copy the wanted peers list do we can reduce it.
|
||||||
p2p.__connectPeer(peer);
|
let tryConnectionSet = new Set(p2p.wantedPeers);
|
||||||
}
|
|
||||||
|
// loop over all the connected peers
|
||||||
|
for(let peerID in p2p.connectedPeers){
|
||||||
|
let peer = p2p.connectedPeers[peerID];
|
||||||
|
|
||||||
|
// If the peer does not a `peerConnactAddress`, it
|
||||||
|
if(! peer.peerConnectAddress) continue;
|
||||||
|
|
||||||
|
// Remove connected peers from the list
|
||||||
|
tryConnectionSet.delete(peer.peerConnectAddress);
|
||||||
|
|
||||||
|
// Every once and while send a heart beat keep the socket open.
|
||||||
|
if((p2p.count % 10) == 0) peer.write(JSON.stringify({type:"heartbeat"}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// loop over the unconnected peers, and try to connect to them.
|
||||||
|
for(let peer of tryConnectionSet){
|
||||||
|
p2p.__connectPeer(peer);
|
||||||
}
|
}
|
||||||
}, interval || 1000, this);
|
}, interval || 1000, this);
|
||||||
}
|
}
|
||||||
@ -99,19 +145,18 @@ class P2P {
|
|||||||
|
|
||||||
let serverSocket = new net.Server(function (clientSocket) {
|
let serverSocket = new net.Server(function (clientSocket) {
|
||||||
|
|
||||||
console.info(`server ${port}`)
|
|
||||||
|
|
||||||
|
|
||||||
clientSocket.on('error', function(){
|
clientSocket.on('error', function(){
|
||||||
console.log('server-client EVENT error:', arguments);
|
console.log('server-client EVENT error:', arguments);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
serverSocket.on('connection', function(clientSocket){
|
serverSocket.on('connection', function(clientSocket){
|
||||||
|
|
||||||
console.log('server EVENT connection from client:', clientSocket.remoteAddress);
|
console.log('server EVENT connection from client:', clientSocket.remoteAddress);
|
||||||
|
|
||||||
|
// When a connection is started, send a message informing the remote
|
||||||
|
// peer of our ID
|
||||||
clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID}));
|
clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID}));
|
||||||
|
|
||||||
clientSocket.on('data', function(data){
|
clientSocket.on('data', function(data){
|
||||||
@ -119,18 +164,22 @@ class P2P {
|
|||||||
});
|
});
|
||||||
|
|
||||||
clientSocket.on('close', function(){
|
clientSocket.on('close', function(){
|
||||||
console.info(`server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`);
|
p2p.__log('info', `server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`);
|
||||||
delete p2p.connectedPeers[clientSocket.peerID];
|
delete p2p.connectedPeers[clientSocket.peerID];
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
serverSocket.on('listening', function(){
|
serverSocket.on('listening', function(){
|
||||||
console.log('p2p server listening on', port,)
|
p2p.__log('info','p2p server listening on', port,)
|
||||||
});
|
});
|
||||||
|
|
||||||
serverSocket.on('error', function(){
|
serverSocket.on('error', function(error){
|
||||||
console.log('server EVENT error:', arguments);
|
if(error.syscall === 'listen' && error.code === 'EADDRINUSE'){
|
||||||
|
console.error('ERROR: Server listening port', port, 'address already in use.')
|
||||||
|
process.exit(2)
|
||||||
|
}
|
||||||
|
p2p.__log('error','server EVENT error:', arguments);
|
||||||
});
|
});
|
||||||
|
|
||||||
serverSocket.listen(Number(port));
|
serverSocket.listen(Number(port));
|
||||||
@ -142,38 +191,51 @@ class P2P {
|
|||||||
exclude = [...exclude || [], ...[this.peerID]]
|
exclude = [...exclude || [], ...[this.peerID]]
|
||||||
let sentTo = []
|
let sentTo = []
|
||||||
|
|
||||||
|
// Build a list of peers to send this message too
|
||||||
for(let _peerID in this.connectedPeers){
|
for(let _peerID in this.connectedPeers){
|
||||||
if(exclude.includes(_peerID)) continue;
|
if(exclude.includes(_peerID)) continue;
|
||||||
sentTo.push(_peerID);
|
sentTo.push(_peerID);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attach a list of peers this message has been send to, including our
|
||||||
|
// peerID
|
||||||
message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])]
|
message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])]
|
||||||
message.from = message.from || this.peerID;
|
message.from = message.from || this.peerID;
|
||||||
|
|
||||||
|
// Send the message to the connected peers in the `sentTo` list.
|
||||||
for(let _peerID of sentTo){
|
for(let _peerID of sentTo){
|
||||||
this.connectedPeers[_peerID].write(JSON.stringify(message));
|
this.connectedPeers[_peerID].write(JSON.stringify(message));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
__read(message, from, socket){
|
__read(message, from, socket){
|
||||||
|
// Parse an incoming message
|
||||||
|
|
||||||
|
// Drop heart beats
|
||||||
if(message.type === "heartbeat"){
|
if(message.type === "heartbeat"){
|
||||||
console.log('heartbeat from', from);
|
this.__log('info', 'heartbeat from', from);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register new clients with this peer, drop them if a connection
|
||||||
|
// already exists
|
||||||
if(message.type === "register"){
|
if(message.type === "register"){
|
||||||
console.log('registering peer', message.id, socket.remoteAddress)
|
this.__log('info', 'registering peer', message.id, socket.remoteAddress)
|
||||||
|
|
||||||
if(Object.keys(this.connectedPeers).includes(message.id)){
|
if(Object.keys(this.connectedPeers).includes(message.id)){
|
||||||
console.log(`Dropping ${message.id}, already connected`)
|
this.__log('info', `Dropping ${message.id}, already connected`)
|
||||||
socket.end();
|
socket.end();
|
||||||
}else{
|
}else{
|
||||||
socket.peerID = message.id
|
socket.peerID = message.id
|
||||||
|
// Add the peer to 'connectedPeers' object with the remote
|
||||||
|
// peerID as the key and the socket as it' value.
|
||||||
this.connectedPeers[message.id] = socket;
|
this.connectedPeers[message.id] = socket;
|
||||||
}
|
}
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.__log('info', 'message', message);
|
||||||
|
|
||||||
// forward the message to other peers
|
// forward the message to other peers
|
||||||
this.broadcast(message, message.sentTo);
|
this.broadcast(message, message.sentTo);
|
||||||
// console.log('p2p read:', message)
|
// console.log('p2p read:', message)
|
||||||
@ -182,7 +244,6 @@ class P2P {
|
|||||||
this.onDataCallbacks.forEach(function(callback){
|
this.onDataCallbacks.forEach(function(callback){
|
||||||
callback(message);
|
callback(message);
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
onData(callback){
|
onData(callback){
|
||||||
@ -190,7 +251,6 @@ class P2P {
|
|||||||
this.onDataCallbacks.push(callback);
|
this.onDataCallbacks.push(callback);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {P2P};
|
module.exports = {P2P};
|
||||||
|
@ -7,5 +7,5 @@
|
|||||||
"test": "echo \"Error: no test specified\" && exit 1"
|
"test": "echo \"Error: no test specified\" && exit 1"
|
||||||
},
|
},
|
||||||
"author": "",
|
"author": "",
|
||||||
"license": "ISC"
|
"license": "MIT"
|
||||||
}
|
}
|
||||||
|
48
pubsub.js
48
pubsub.js
@ -1,11 +1,45 @@
|
|||||||
const app = require('./app');
|
class PubSub{
|
||||||
|
constructor(){
|
||||||
|
this.topics = {};
|
||||||
|
}
|
||||||
|
|
||||||
app.p2p.onData(function(data){
|
subscribe(topic, listener) {
|
||||||
console.log('app, data:', data)
|
if(topic instanceof RegExp){
|
||||||
})
|
listener.match = topic;
|
||||||
|
topic = "__REGEX__";
|
||||||
|
}
|
||||||
|
|
||||||
|
// console.log(this)
|
||||||
|
//
|
||||||
|
// create the topic if not yet created
|
||||||
|
if(!this.topics[topic]) this.topics[topic] = [];
|
||||||
|
|
||||||
setTimeout(function(){
|
// add the listener
|
||||||
app.p2p.broadcast({type:'topic', body:"yolo"})
|
this.topics[topic].push(listener);
|
||||||
}, 10000);
|
}
|
||||||
|
|
||||||
|
matchTopics(topic){
|
||||||
|
let topics = [... this.topics[topic] ? this.topics[topic] : []];
|
||||||
|
|
||||||
|
// console.log(this.topics)
|
||||||
|
if(!this.topics['__REGEX__']) return topics;
|
||||||
|
|
||||||
|
for(let listener of this.topics['__REGEX__']){
|
||||||
|
if(topic.match(listener.match)) topics.push(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
return topics;
|
||||||
|
}
|
||||||
|
|
||||||
|
publish(topic, data) {
|
||||||
|
|
||||||
|
// send the event to all listeners
|
||||||
|
this.matchTopics(topic).forEach(function(listener) {
|
||||||
|
setTimeout(function(data, topic){
|
||||||
|
listener(data || {}, topic);
|
||||||
|
}, 0, data, topic);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {PubSub};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user