commented

This commit is contained in:
William Mantly 2020-08-21 20:23:18 -04:00
parent 37e724cd76
commit e71803cb3d
Signed by: wmantly
GPG Key ID: 186A8370EFF937CA
2 changed files with 60 additions and 18 deletions

76
p2p.js
View File

@ -3,24 +3,38 @@ const net = require('net');
class P2P { class P2P {
constructor(args){ constructor(args){
// The list of clients this peer wants to connect to
this.wantedPeers = new Set(); this.wantedPeers = new Set();
// The peers we are currently connected to
this.connectedPeers = {}; this.connectedPeers = {};
// Random ID for this peer
this.peerID = crypto.randomBytes(16).toString("hex"); this.peerID = crypto.randomBytes(16).toString("hex");
// Kick of the interval to connect to peers and send hear beats
this.connectInterval = this.__peerInterval(); this.connectInterval = this.__peerInterval();
// Hold the data callbacks when a message is received
this.onDataCallbacks = []; this.onDataCallbacks = [];
// If a listen port was specified, have this peer listen for incoming
// connection
if(args.listenPort){ if(args.listenPort){
this.server = this.__listen(args.listenPort); this.server = this.__listen(args.listenPort);
} }
// If a list of peers in supplied, add them.
if(args.peers){ if(args.peers){
this.addPeer(args.peers); this.addPeer(args.peers);
} }
} }
// Take a peer as <host>:<port> and add it the `wantedPeers` list
addPeer(peer){ addPeer(peer){
// If `peer` is a list, call `addPeer` with each item
if(Array.isArray(peer)){ if(Array.isArray(peer)){
for(let address of peer){ for(let address of peer){
this.addPeer(address); this.addPeer(address);
@ -28,22 +42,25 @@ class P2P {
return; return;
} }
if(this.wantedPeers.has(peer)) return true;
this.wantedPeers.add(peer); this.wantedPeers.add(peer);
} }
// Close a connection to a peer and remove it from the `wantedPeers` list
removePeer(peer){ removePeer(peer){
if(this.wantedPeers.has(peer)){ if(this.wantedPeers.has(peer)){
this.wantedPeers.delete(peer); this.wantedPeers.delete(peer);
// find the peer in the `connectedPeers` object
for(let peerID in this.connectedPeers){ for(let peerID in this.connectedPeers){
if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue; if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue;
this.connectedPeers[peerID].end(); this.connectedPeers[peerID].end();
delete this.connectedPeers[peerID]; delete this.connectedPeers[peerID];
} }
} }
} }
// Connect to a remote peer
__connectPeer(address, port) { __connectPeer(address, port) {
if(!port){ if(!port){
let parse = address.split(':'); let parse = address.split(':');
@ -57,6 +74,9 @@ class P2P {
peer.on('connect', function(){ peer.on('connect', function(){
console.info(`Peer ${address} is now connected.`); console.info(`Peer ${address} is now connected.`);
peer.peerConnectAddress = `${address}:${port}` peer.peerConnectAddress = `${address}:${port}`
// When a connection is started, send a message informing the remote
// peer of our ID
peer.write(JSON.stringify({type:"register", id: p2p.peerID})); peer.write(JSON.stringify({type:"register", id: p2p.peerID}));
}); });
@ -80,15 +100,29 @@ class P2P {
__peerInterval(interval){ __peerInterval(interval){
return setInterval(function(p2p){ this.count = 1;
let connected = Object.keys(p2p.connectedPeers).map(function(peerID){
return p2p.connectedPeers[peerID].peerConnectAddress
});
for(let peer of p2p.wantedPeers){ return setInterval(function(p2p){
if(!connected.includes(peer)){ // Copy the wanted peers list do we can reduce it.
p2p.__connectPeer(peer); let tryConnectionSet = new Set(p2p.wantedPeers);
}
// loop over all the connected peers
for(let peerID in p2p.connectedPeers){
let peer = p2p.connectedPeers[peerID];
// If the peer does not a `peerConnactAddress`, it
if(! peer.peerConnectAddress) continue;
// Remove connected peers from the list
tryConnectionSet.delete(peer.peerConnectAddress);
// Every once and while send a heart beat keep the socket open.
if((p2p.count % 10) == 0) peer.write(JSON.stringify({type:"heartbeat"}));
}
// loop over the unconnected peers, and try to connect to them.
for(let peer of tryConnectionSet){
p2p.__connectPeer(peer);
} }
}, interval || 1000, this); }, interval || 1000, this);
} }
@ -99,19 +133,18 @@ class P2P {
let serverSocket = new net.Server(function (clientSocket) { let serverSocket = new net.Server(function (clientSocket) {
console.info(`server ${port}`)
clientSocket.on('error', function(){ clientSocket.on('error', function(){
console.log('server-client EVENT error:', arguments); console.log('server-client EVENT error:', arguments);
}); });
}); });
serverSocket.on('connection', function(clientSocket){ serverSocket.on('connection', function(clientSocket){
console.log('server EVENT connection from client:', clientSocket.remoteAddress); console.log('server EVENT connection from client:', clientSocket.remoteAddress);
// When a connection is started, send a message informing the remote
// peer of our ID
clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID})); clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID}));
clientSocket.on('data', function(data){ clientSocket.on('data', function(data){
@ -142,25 +175,34 @@ class P2P {
exclude = [...exclude || [], ...[this.peerID]] exclude = [...exclude || [], ...[this.peerID]]
let sentTo = [] let sentTo = []
// Build a list of peers to send this message too
for(let _peerID in this.connectedPeers){ for(let _peerID in this.connectedPeers){
if(exclude.includes(_peerID)) continue; if(exclude.includes(_peerID)) continue;
sentTo.push(_peerID); sentTo.push(_peerID);
} }
// Attach a list of peers this message has been send to, including our
// peerID
message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])] message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])]
message.from = message.from || this.peerID; message.from = message.from || this.peerID;
// Send the message to the connected peers in the `sentTo` list.
for(let _peerID of sentTo){ for(let _peerID of sentTo){
this.connectedPeers[_peerID].write(JSON.stringify(message)); this.connectedPeers[_peerID].write(JSON.stringify(message));
} }
} }
__read(message, from, socket){ __read(message, from, socket){
// Parse an incoming message
// Drop heart beats
if(message.type === "heartbeat"){ if(message.type === "heartbeat"){
console.log('heartbeat from', from); console.log('heartbeat from', from);
return ; return ;
} }
// Register new clients with this peer, drop them if a connection
// already exists
if(message.type === "register"){ if(message.type === "register"){
console.log('registering peer', message.id, socket.remoteAddress) console.log('registering peer', message.id, socket.remoteAddress)
@ -169,6 +211,8 @@ class P2P {
socket.end(); socket.end();
}else{ }else{
socket.peerID = message.id socket.peerID = message.id
// Add the peer to 'connectedPeers' object with the remote
// peerID as the key and the socket as it' value.
this.connectedPeers[message.id] = socket; this.connectedPeers[message.id] = socket;
} }
return ; return ;
@ -182,7 +226,6 @@ class P2P {
this.onDataCallbacks.forEach(function(callback){ this.onDataCallbacks.forEach(function(callback){
callback(message); callback(message);
}) })
} }
onData(callback){ onData(callback){
@ -190,7 +233,6 @@ class P2P {
this.onDataCallbacks.push(callback); this.onDataCallbacks.push(callback);
} }
} }
} }
module.exports = {P2P}; module.exports = {P2P};

View File

@ -7,5 +7,5 @@
"test": "echo \"Error: no test specified\" && exit 1" "test": "echo \"Error: no test specified\" && exit 1"
}, },
"author": "", "author": "",
"license": "ISC" "license": "MIT"
} }