From a3107de459cb2c458ca19bd3b398d90d94b1ddea Mon Sep 17 00:00:00 2001 From: Thomas Harvey Date: Mon, 9 Oct 2017 23:12:48 -0400 Subject: [PATCH] adding concept of Worker and Runner. Moved label2runner to Workers manager. --- routes/api.js | 77 ++++++------ .../{workers_manager.js => worker_manager.js} | 119 +++++++++++++++++- 2 files changed, 150 insertions(+), 46 deletions(-) rename routes/{workers_manager.js => worker_manager.js} (76%) diff --git a/routes/api.js b/routes/api.js index 51f7c66..b9a23ac 100644 --- a/routes/api.js +++ b/routes/api.js @@ -15,51 +15,44 @@ var label2runner = {}; // var tagPrefix = settings.tagPrefix || 'clwV'; -var workers = require('./workers_manager.js'); +var workers = require('./worker_manager.js'); -var ramPercentUsed = function(ip, callback){ - // checks the percent of ram used on a worker. - return lxc.exec( - "python3 -c \"a=`head /proc/meminfo|grep MemAvail|grep -Po '\\d+'`;t=`head /proc/meminfo|grep MemTotal|grep -Po '\\d+'`;print(round(((t-a)/t)*100, 2))\"", - ip, - callback - ); -}; -var runnerTimeout = function(runner, time){ - time = time || 60000; // 1 minutes +// var runnerTimeout = function(runner, time){ +// time = time || 60000; // 1 minutes - if(runner.hasOwnProperty('timeout')){ - clearTimeout(runner.timeout); - } +// if(runner.hasOwnProperty('timeout')){ +// clearTimeout(runner.timeout); +// } - return runner.timeout = setTimeout(runnerFree, time, runner); -}; +// return runner.timeout = setTimeout(runnerFree, time, runner); +// }; -var runnerFree = function(runner){ - lxc.stop(runner.name, runner.worker.ip); - runner.worker.usedrunners--; - if(runner.hasOwnProperty('timeout')){ - clearTimeout(runner.timeout); - } - delete label2runner[runner.label]; +// var runnerFree = function(runner){ +// lxc.stop(runner.name, runner.worker.ip); +// runner.worker.usedrunners--; +// if(runner.hasOwnProperty('timeout')){ +// clearTimeout(runner.timeout); +// } +// delete label2runner[runner.label]; - console.log(`Runner freed ${runner.label}.`, runner.worker); - workers.startRunners({worker: runner.worker}); -}; +// console.log(`Runner freed ${runner.label}.`, runner.worker); +// workers.startRunners({worker: runner.worker}); +// }; -var getAvailrunner = function(runner){ - for(let worker of workers){ - if(worker.availrunners.length === 0) continue; - if(runner && runner.worker.index <= worker.index) break; - if(runner) runnerFree(runner); +// var getAvailrunner = function(runner){ +// for(let worker of workers){ +// if(worker.availrunners.length === 0) continue; +// if(runner && runner.worker.index <= worker.index) break; +// // if(runner) runnerFree(runner); +// if(runner) runner.free(); - return worker.getRunner(); - } +// return worker.getRunner(); +// } - if(runner) return runner; -}; +// if(runner) return runner; +// }; var run = function(req, res, runner, count){ count = count || 0; @@ -88,12 +81,14 @@ var run = function(req, res, runner, count){ return request.post(httpOptions, function(error, response, body){ // console.log('runner response:', arguments) - if(error || response.statusCode !== 200) return run(req, res, getAvailrunner(), ++count); + if(error || response.statusCode !== 200) return run(req, res, workers.getAvailableRunner(), ++count); body = JSON.parse(body); if(req.query.once){ res.json(body); - return runnerFree(runner, 0); + // 0 here does nothing + // return runnerFree(runner, 0); + return runner.free(); } label2runner[runner.label] = runner; @@ -102,7 +97,8 @@ var run = function(req, res, runner, count){ body['wname'] = runner.worker.name; res.json(body); - runnerTimeout(runner); + // runnerTimeout(runner); + runner.setTimeout(); }); }; @@ -209,13 +205,14 @@ router.get('/liststuff', function(req, res, next){ router.get('/ping/:runner', function(req, res, next){ var runner = label2runner[req.params.runner]; - runnerTimeout(runner); + // runnerTimeout(runner); + runner.setTimeout(); res.json({res:''}); }); router.post('/run/:runner?', function (req, res, next){ console.log(`Request runner route!`); - var runner = getAvailrunner(label2runner[req.params.runner]); + var runner = workers.getAvailrunner(workers.getRunner(req.params.runner)); return run(req, res, runner); }); diff --git a/routes/workers_manager.js b/routes/worker_manager.js similarity index 76% rename from routes/workers_manager.js rename to routes/worker_manager.js index b314746..df7d727 100644 --- a/routes/workers_manager.js +++ b/routes/worker_manager.js @@ -7,6 +7,88 @@ var settings = require('./workers.json'); var tagPrefix = settings.tagPrefix || 'clwV'; + +var ramPercentUsed = function(ip, callback){ + // checks the percent of ram used on a worker. + + return lxc.exec( + "python3 -c \"a=`head /proc/meminfo|grep MemAvail|grep -Po '\\d+'`;t=`head /proc/meminfo|grep MemTotal|grep -Po '\\d+'`;print(round(((t-a)/t)*100, 2))\"", + ip, + callback + ); +}; + +var Runner = (function(){ + var proto = {}; + + proto.create = function(config){ + var runner = Object.create(proto); + Object.assign(runner, config); + return runner; + }; + + proto.free = function(){ + var runner = this; + lxc.stop(runner.name, runner.worker.ip); + runner.worker.usedrunners--; + if(runner.hasOwnProperty('timeout')){ + clearTimeout(runner.timeout); + }; + + + // TODO: FIX THIS + delete label2runner[runner.label]; + + console.log(`Runner freed ${runner.label}.`, runner.worker); + // Why does this need to run here? + workers.startRunners({worker: runner.worker}); + }; + + proto.setTimeout = function(time){ + time = time || 60000; // 1 minutes + var runner = this; + if(runner.hasOwnProperty('timeout')){ + clearTimeout(runner.timeout); + } + + return runner.timeout = setTimeout(function(){ + runner.free(); + }, time); + }; + +})(); + + +var Worker = (function(){ + var proto = {}; + + proto.create = function(config){ + var worker = Object.create(proto); + + worker.networks.v4.forEach(function(value){ + worker[value.type+'IP'] = value.ip_address; + }); + + worker.availrunners = []; + worker.ip = worker.publicIP; + worker.usedrunners = 0; + worker.index = workers.length; + + return worker; + }; + + proto.getRunner = function(){ + if(this.availrunners.length === 0) return false; + // console.log('getting runner from ', worker.name, ' avail length ', this.availrunners.length); + var runner = this.availrunners.pop(); + this.usedrunners++; + runner.setTimeout(); + + return runner; + }; +})(); + + var workers = (function(){ // works array constructor. This will hold the works(order by creation) and all // the methods interacting with the workers. @@ -14,6 +96,32 @@ var workers = (function(){ // base array that will be the workers objects. var workers = []; + var runnerMap = {}; + + workers.setRunner = function(runner){ + runnerMap[runner.label] = runner; + }; + + + workers.getRunner = function(label){ + return runnerMap[runner.label]; + }; + + + workers.getAvailableRunner = function(runner){ + for(let worker of workers){ + if(worker.availrunners.length === 0) continue; + if(runner && runner.worker.index <= worker.index) break; + // if(runner) runnerFree(runner); + if(runner) runner.free(); + + return worker.getRunner(); + } + + if(runner) return runner; + }; + + // persistent settings object // .image is the currently used Digital Ocean snap shot ID // .lastSnapShotId is the previous ID used Digital Ocean snap shot @@ -62,7 +170,6 @@ var workers = (function(){ }); } }); - }; workers.makeWorkerObj = function(worker){ @@ -85,7 +192,7 @@ var workers = (function(){ runnerTimeout(runner); return runner; - } + }; return worker; }; @@ -95,7 +202,6 @@ var workers = (function(){ return workers.map(function(item){ return item.id; }); - }; workers.destroy = function(worker){ @@ -167,12 +273,13 @@ var workers = (function(){ if(!data.ip) return setTimeout(workers.startRunners, 0, args); // console.log('started runner on', args.worker.name) - var runner = { + var runner = Runner.create({ ip: data.ip, name: name, worker: args.worker, - label: args.worker.name + ':' + name - }; + label: args.worker.name + ':' + name, + + }); args.onStart(runner, args); args.worker.availrunners.push(runner);