From 164a972faaa5f20442cd0e3f3e176a5161e51a59 Mon Sep 17 00:00:00 2001 From: William Mantly Date: Fri, 21 Aug 2020 16:54:15 -0400 Subject: [PATCH] classes! --- app.js | 18 ++--- client.js | 38 ----------- p2p.js | 196 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ peers.js | 126 ----------------------------------- pubsub.js | 11 +++ server.js | 51 -------------- 6 files changed, 216 insertions(+), 224 deletions(-) delete mode 100644 client.js create mode 100644 p2p.js delete mode 100644 peers.js create mode 100644 pubsub.js delete mode 100644 server.js diff --git a/app.js b/app.js index dca7a95..a02ecce 100755 --- a/app.js +++ b/app.js @@ -1,6 +1,9 @@ #!/usr/bin/env nodejs +const app = {}; +module.exports = app; + +const {P2P} = require('./p2p') -const p2p = require('./peers') const args = process.argv.slice(1); @@ -18,13 +21,10 @@ if(server_port){ process.exit(0) } -p2p.listen(server_port); -for(let client of clients_list){ - p2p.addPeer(client); -} +app.p2p = new P2P({ + listenPort: server_port, + peers: clients_list +}) - -setTimeout(function(){ - p2p.broadcast({type:'topic', body:"yolo"}) -}, 10000); \ No newline at end of file +app.pubsub = require('./pubsub') \ No newline at end of file diff --git a/client.js b/client.js deleted file mode 100644 index 71875a5..0000000 --- a/client.js +++ /dev/null @@ -1,38 +0,0 @@ -const net = require('net'); - -var connect = function(address, port){ - if(!port){ - let parse = address.split(':'); - address = parse[0]; - port = parse[1]; - } - - let socket = new net.Socket().connect(port, address); - - // socket.on('connect', function(){ - // console.log('client EVENT connect:', arguments); - // }); - - // socket.on('close', function(){ - // console.log('client EVENT close:', arguments); - // }); - - // socket.on('ready', function(){ - // console.log('client EVENT ready:', arguments); - // }); - - // socket.on('data', function(data){ - // console.log('client EVENT data:',data.toString(), socket.remoteAddress); - // }); - - - socket.on('error', function(){ - console.log('client EVENT error:', arguments); - }); - - - return socket; - -} - -module.exports = {connect} diff --git a/p2p.js b/p2p.js new file mode 100644 index 0000000..4a4450a --- /dev/null +++ b/p2p.js @@ -0,0 +1,196 @@ +const crypto = require("crypto"); +const net = require('net'); + +class P2P { + constructor(args){ + this.wantedPeers = new Set(); + this.connectedPeers = {}; + this.peerID = crypto.randomBytes(16).toString("hex"); + this.connectInterval = this.__peerInterval(); + this.onDataCallbacks = []; + + + if(args.listenPort){ + this.server = this.__listen(args.listenPort); + } + + if(args.peers){ + this.addPeer(args.peers); + } + + } + + addPeer(peer){ + if(Array.isArray(peer)){ + for(let address of peer){ + this.addPeer(address); + } + return; + } + + if(this.wantedPeers.has(peer)) return true; + this.wantedPeers.add(peer); + } + + removePeer(peer){ + if(this.wantedPeers.has(peer)){ + this.wantedPeers.delete(peer); + + for(let peerID in this.connectedPeers){ + if(this.connectedPeers[peerID].peerConnectAddress !== peer) continue; + this.connectedPeers[peerID].end(); + delete this.connectedPeers[peerID]; + } + } + } + + __connectPeer(address, port) { + if(!port){ + let parse = address.split(':'); + address = parse[0]; + port = parse[1]; + } + + let peer = new net.Socket().connect(port, address); + let p2p = this; + + peer.on('connect', function(){ + console.info(`Peer ${address} is now connected.`); + peer.peerConnectAddress = `${address}:${port}` + peer.write(JSON.stringify({type:"register", id: p2p.peerID})); + }); + + peer.on('close', function(){ + console.info(`Client Peer ${address}, ${peer.peerID} droped.`); + delete p2p.connectedPeers[peer.peerID]; + }); + + peer.on('data', function(data){ + p2p.__read(JSON.parse(data.toString()), peer.remoteAddress, peer); + }); + + peer.on('error', function(error){ + if(error.syscall === 'connect' && error.code === 'ECONNREFUSED'){ + console.info(`Peer ${error.address}:${error.port} connection refussed!`); + }else{ + console.warn('client EVENT error:', arguments); + } + }); + } + + __peerInterval(interval){ + + return setInterval(function(p2p){ + let connected = Object.keys(p2p.connectedPeers).map(function(peerID){ + return p2p.connectedPeers[peerID].peerConnectAddress + }); + + for(let peer of p2p.wantedPeers){ + if(!connected.includes(peer)){ + p2p.__connectPeer(peer); + } + } + }, interval || 1000, this); + } + + __listen (port){ + + let p2p = this; + + let serverSocket = new net.Server(function (clientSocket) { + + console.info(`server ${port}`) + + + clientSocket.on('error', function(){ + console.log('server-client EVENT error:', arguments); + }); + + + }); + + serverSocket.on('connection', function(clientSocket){ + + console.log('server EVENT connection from client:', clientSocket.remoteAddress); + clientSocket.write(JSON.stringify({type:"register", id: p2p.peerID})); + + clientSocket.on('data', function(data){ + p2p.__read(JSON.parse(data.toString()), clientSocket.remoteAddress, clientSocket); + }); + + clientSocket.on('close', function(){ + console.info(`server Peer ${clientSocket.remoteAddress} - ${clientSocket.peerID} droped.`); + delete p2p.connectedPeers[clientSocket.peerID]; + }); + + }); + + serverSocket.on('listening', function(){ + console.log('p2p server listening on', port,) + }); + + serverSocket.on('error', function(){ + console.log('server EVENT error:', arguments); + }); + + serverSocket.listen(Number(port)); + + return serverSocket; + } + + broadcast(message, exclude){ + exclude = [...exclude || [], ...[this.peerID]] + let sentTo = [] + + for(let _peerID in this.connectedPeers){ + if(exclude.includes(_peerID)) continue; + sentTo.push(_peerID); + } + + message.sentTo = [...new Set([...message.sentTo || [], ...sentTo, ...[this.peerID]])] + message.from = message.from || this.peerID; + + for(let _peerID of sentTo){ + this.connectedPeers[_peerID].write(JSON.stringify(message)); + } + } + + __read(message, from, socket){ + if(message.type === "heartbeat"){ + console.log('heartbeat from', from); + return ; + } + + if(message.type === "register"){ + console.log('registering peer', message.id, socket.remoteAddress) + + if(Object.keys(this.connectedPeers).includes(message.id)){ + console.log(`Dropping ${message.id}, already connected`) + socket.end(); + }else{ + socket.peerID = message.id + this.connectedPeers[message.id] = socket; + } + return ; + } + + // forward the message to other peers + this.broadcast(message, message.sentTo); + // console.log('p2p read:', message) + + // pass message to local callbacks + this.onDataCallbacks.forEach(function(callback){ + callback(message); + }) + + } + + onData(callback){ + if(callback instanceof Function){ + this.onDataCallbacks.push(callback); + } + } + +} + +module.exports = {P2P}; diff --git a/peers.js b/peers.js deleted file mode 100644 index 1e68afb..0000000 --- a/peers.js +++ /dev/null @@ -1,126 +0,0 @@ -const TcpServer = require('./server'); -const TcpClient = require('./client'); -const crypto = require("crypto"); - -const peerID = crypto.randomBytes(16).toString("hex"); - -console.log('Peer ID:', peerID) - -let connectedPeers = {} -let peers = [] - -let addPeer = function(peer){ - if(peers.includes(peer)) return true; - peers.push(peer) -} - -setInterval(function(){ - let connected = Object.keys(connectedPeers).map(function(i){return connectedPeers[i].peerConnectAddress}) - console.log('interval', connected, Object.keys(connectedPeers)) - for(let peer of peers){ - if(!connected.includes(peer)){ - connectPeer(peer); - } - } -}, 1000); - - -let connectPeer = function(address, port) { - if(!port){ - let parse = address.split(':'); - address = parse[0]; - port = parse[1]; - } - - let peer = TcpClient.connect(address, port); - - peer.on('connect', function(){ - console.info(`Peer ${address} is now connected.`); - peer.peerConnectAddress = `${address}:${port}` - peer.write(JSON.stringify({type:"register", id: peerID})); - }); - - peer.on('close', function(){ - delete connectedPeers[peer.peerID]; - console.info(`Peer ${address} droped.`); - }); - - peer.on('data', function(data){ - read(JSON.parse(data.toString()), peer.remoteAddress, peer); - }); - -} - -let removePeer = function(peer){ - if(peer[peer.address]){ - peer.end(); - delete peer[peer.address] - } -} - -let listen = function(port){ - console.log('p2p listen', port) - let serverSocket = TcpServer(port); - - serverSocket.on('connection', function(clientSocket){ - console.log('server EVENT connection from client:', clientSocket.remoteAddress); - clientSocket.write(JSON.stringify({type:"register", id: peerID})); - - clientSocket.on('data', function(data){ - read(JSON.parse(data.toString()), clientSocket.remoteAddress, clientSocket); - }); - - clientSocket.on('close', function(){ - delete connectedPeers[clientSocket.peerID]; - console.info(`Peer ${clientSocket.remoteAddress} droped.`); - }); - - }); -} - - -let broadcast = function(message, exclude){ - exclude = [...exclude || [], ...[peerID]] - let sentTo = [] - - for(let _peerID in connectedPeers){ - if(exclude.includes(_peerID)) continue; - sentTo.push(_peerID); - } - - message.sentTo = [...new Set([...message.sentTo || [] ,...sentTo, ...[peerID]])] - message.from = message.from || peerID; - - for(let _peerID of sentTo){ - connectedPeers[_peerID].write(JSON.stringify(message)); - } -} - -let read = function(message, from, socket){ - if(message.type === "heartbeat"){ - console.log('heartbeat from', from); - return ; - } - - if(message.type === "register"){ - console.log('registering net peer', message.id, socket.remoteAddress) - if(Object.keys(connectedPeers).includes(message.id)){ - console.log(`Dropping ${message.id}, already connected`) - socket.end(); - }else{ - connectedPeers[message.id] = socket; - connectedPeers[message.id].peerID = message.id - } - } - - if(message.type === 'topic'){ - broadcast(message, message.sentTo); - console.log('p2p read:', message) - - // publish message locally - //publish(message.top, message.body) - } -} - - -module.exports = {broadcast, connectPeer, listen, addPeer} diff --git a/pubsub.js b/pubsub.js new file mode 100644 index 0000000..18bef03 --- /dev/null +++ b/pubsub.js @@ -0,0 +1,11 @@ +const app = require('./app'); + +app.p2p.onData(function(data){ + console.log('app, data:', data) +}) + + +setTimeout(function(){ + app.p2p.broadcast({type:'topic', body:"yolo"}) +}, 10000); + diff --git a/server.js b/server.js deleted file mode 100644 index b0b81d8..0000000 --- a/server.js +++ /dev/null @@ -1,51 +0,0 @@ -const net = require('net') - -const server = function(port){ - - let serverSocket = new net.Server(function (clientSocket) { - - console.info(`server ${port}`) - - // clientSocket.on('connection', function(){ - // console.log('server-client EVENT connection:', arguments); - // }); - - // clientSocket.on('listening', function(){ - // console.log('server-client EVENT listening:', arguments); - // }); - - clientSocket.on('error', function(){ - console.log('server-client EVENT error:', arguments); - }); - - // clientSocket.on('data', function (data) { - // console.log('server-client EVENT data', data.toString(), clientSocket.remoteAddress); - // }); - - }) - serverSocket.listen(Number(port)); - - // serverSocket.on('connection', function(clientSocket){ - // console.log('server EVENT connection:', clientSocket.remoteAddress); - // }); - - // serverSocket.on('listening', function(){ - // console.log('server EVENT listening:', arguments); - // }); - - serverSocket.on('error', function(){ - console.log('server EVENT error:', arguments); - }); - - // serverSocket.on('data', function (data) { - // console.info('here...') - // // console.log(arguments, serverSocket); - // }); - - - return serverSocket; - -} - - -module.exports = server; \ No newline at end of file