diff --git a/p2p.js b/p2p.js index 4a4450a..c47aaa5 100644 --- a/p2p.js +++ b/p2p.js @@ -3,24 +3,38 @@ const net = require('net'); class P2P { constructor(args){ + + // The list of clients this peer wants to connect to this.wantedPeers = new Set(); + + // The peers we are currently connected to this.connectedPeers = {}; + + // Random ID for this peer this.peerID = crypto.randomBytes(16).toString("hex"); + + // Kick of the interval to connect to peers and send hear beats this.connectInterval = this.__peerInterval(); + + // Hold the data callbacks when a message is received this.onDataCallbacks = []; - + // If a listen port was specified, have this peer listen for incoming + // connection if(args.listenPort){ this.server = this.__listen(args.listenPort); } + // If a list of peers in supplied, add them. if(args.peers){ this.addPeer(args.peers); } - } + // Take a peer as : and add it the `wantedPeers` list addPeer(peer){ + + // If `peer` is a list, call `addPeer` with each item if(Array.isArray(peer)){ for(let address of peer){ this.addPeer(address); @@ -28,22 +42,25 @@ class P2P { return; } - if(this.wantedPeers.has(peer)) return true; this.wantedPeers.add(peer); } + // Close a connection to a peer and remove it from the `wantedPeers` list removePeer(peer){ if(this.wantedPeers.has(peer)){ this.wantedPeers.delete(peer); + // find the peer in the `connectedPeers` object for(let peerID in this.connectedPeers){ if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue; + this.connectedPeers[peerID].end(); delete this.connectedPeers[peerID]; } } } + // Connect to a remote peer __connectPeer(address, port) { if(!port){ let parse = address.split(':'); @@ -57,6 +74,9 @@ class P2P { peer.on('connect', function(){ console.info(`Peer ${address} is now connected.`); peer.peerConnectAddress = `${address}:${port}` + + // When a connection is started, send a message informing the remote + // peer of our ID peer.write(JSON.stringify({type:"register", id: p2p.peerID})); }); @@ -80,15 +100,29 @@ class P2P { __peerInterval(interval){ - return setInterval(function(p2p){ - let connected = Object.keys(p2p.connectedPeers).map(function(peerID){ - return p2p.connectedPeers[peerID].peerConnectAddress - }); + this.count = 1; - for(let peer of p2p.wantedPeers){ - if(!connected.includes(peer)){ - p2p.__connectPeer(peer); - } + return setInterval(function(p2p){ + // Copy the wanted peers list do we can reduce it. + let tryConnectionSet = new Set(p2p.wantedPeers); + + // loop over all the connected peers + for(let peerID in p2p.connectedPeers){ + let peer = p2p.connectedPeers[peerID]; + + // If the peer does not a `peerConnactAddress`, it + if(! peer.peerConnectAddress) continue; + + // Remove connected peers from the list + tryConnectionSet.delete(peer.peerConnectAddress); + + // Every once and while send a heart beat keep the socket open. + if((p2p.count % 10) == 0) peer.write(JSON.stringify({type:"heartbeat"})); + } + + // loop over the unconnected peers, and try to connect to them. + for(let peer of tryConnectionSet){ + p2p.__connectPeer(peer); } }, interval || 1000, this); } @@ -99,19 +133,18 @@ class P2P { let serverSocket = new net.Server(function (clientSocket) { - console.info(`server ${port}`) - - clientSocket.on('error', function(){ console.log('server-client EVENT error:', arguments); }); - }); serverSocket.on('connection', function(clientSocket){ console.log('server EVENT connection from client:', clientSocket.remoteAddress); + + // When a connection is started, send a message informing the remote + // peer of our ID clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID})); clientSocket.on('data', function(data){ @@ -142,25 +175,34 @@ class P2P { exclude = [...exclude || [], ...[this.peerID]] let sentTo = [] + // Build a list of peers to send this message too for(let _peerID in this.connectedPeers){ if(exclude.includes(_peerID)) continue; sentTo.push(_peerID); } + // Attach a list of peers this message has been send to, including our + // peerID message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])] message.from = message.from || this.peerID; + // Send the message to the connected peers in the `sentTo` list. for(let _peerID of sentTo){ this.connectedPeers[_peerID].write(JSON.stringify(message)); } } __read(message, from, socket){ + // Parse an incoming message + + // Drop heart beats if(message.type === "heartbeat"){ console.log('heartbeat from', from); return ; } + // Register new clients with this peer, drop them if a connection + // already exists if(message.type === "register"){ console.log('registering peer', message.id, socket.remoteAddress) @@ -169,6 +211,8 @@ class P2P { socket.end(); }else{ socket.peerID = message.id + // Add the peer to 'connectedPeers' object with the remote + // peerID as the key and the socket as it' value. this.connectedPeers[message.id] = socket; } return ; @@ -182,7 +226,6 @@ class P2P { this.onDataCallbacks.forEach(function(callback){ callback(message); }) - } onData(callback){ @@ -190,7 +233,6 @@ class P2P { this.onDataCallbacks.push(callback); } } - } module.exports = {P2P}; diff --git a/package.json b/package.json index 2966761..c5bb7a8 100644 --- a/package.json +++ b/package.json @@ -7,5 +7,5 @@ "test": "echo \"Error: no test specified\" && exit 1" }, "author": "", - "license": "ISC" + "license": "MIT" }