From 0ba7f86fc12e32348ec20374cf1a6496ccb68f13 Mon Sep 17 00:00:00 2001 From: William Mantly Date: Sat, 22 Aug 2020 20:06:28 -0400 Subject: [PATCH] pubsub --- app.js | 21 ++++++++++++++++++++- pubsub.js | 46 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/app.js b/app.js index ac5f150..c1d66f6 100755 --- a/app.js +++ b/app.js @@ -1,5 +1,6 @@ #!/usr/bin/env nodejs const app = {}; +const pubsub = new (require('./pubsub')).PubSub(); module.exports = app; const {P2P} = require('./p2p') @@ -25,4 +26,22 @@ app.p2p = new P2P({ peers: clients_list }) -app.pubsub = require('./pubsub') +app.pub = pubsub.pub; +app.sub = pubsub.sub; + + +app.sub(/.*/gi, function(data, topic){ + if(data.__local) return false; + data.__local = true; + app.p2p.broadcast({ + type:'topic', + body:{ + topic: topic, + data: data + } + }); +}); + +app.p2p.onData(function(data){ + if(data.type === 'topic') app.publish(data.body.topic, data.body.data, true); +}); \ No newline at end of file diff --git a/pubsub.js b/pubsub.js index 18bef03..d06bdd7 100644 --- a/pubsub.js +++ b/pubsub.js @@ -1,11 +1,43 @@ -const app = require('./app'); +class PubSub{ + constructor(){ + this.topics = {}; + } -app.p2p.onData(function(data){ - console.log('app, data:', data) -}) + subscribe(topic, listener) { + if(topic instanceof RegExp){ + listener.match = topic; + topic = "__REGEX__"; + } + // create the topic if not yet created + if(!this.topics[topic]) this.topics[topic] = []; -setTimeout(function(){ - app.p2p.broadcast({type:'topic', body:"yolo"}) -}, 10000); + // add the listener + this.topics[topic].push(listener); + } + matchTopics(topic){ + let topics = [... this.topics[topic] ? this.topics[topic] : []] + + // console.log(this.topics) + if(!this.topics['__REGEX__']) return topics; + + for(let listener of this.topics['__REGEX__']){ + if(topic.match(listener.match)) topics.push(listener); + } + + return topics; + } + + publish(topic, data) { + + // send the event to all listeners + this.matchTopics(topic).forEach(function(listener) { + setTimeout(function(data, topic){ + listener(data || {}, topic); + }, 0, data, topic); + }); + } +} + +module.exports = {PubSub};