diff --git a/.gitignore b/.gitignore index aba296a..8e2fee9 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ node_modules # Debug log from npm npm-debug.log .c9 + +# keys +secrets.js \ No newline at end of file diff --git a/doapi.js b/doapi.js new file mode 100644 index 0000000..8847282 --- /dev/null +++ b/doapi.js @@ -0,0 +1,119 @@ +var request = require('request'); + +api = function(key){ + key = key || require('./secrets.js').doAPI; + this.BASEURL = 'https://api.digitalocean.com/v2/'; + this.headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer '+key + } + this.calls = 0; + + this.account = function(callback){ + var options = { + url: this.BASEURL+'account', + headers: this.headers + }; + this.calls++; + + return request.get(options, function(error, response, body){ + return callback(body, response, error); + }); + }; + + this.dropletsByTag = function(tag, callback){ + var options = { + url: this.BASEURL+'droplets?tag_name='+tag, + headers: this.headers + }; + this.calls++; + + return request.get(options, function(error, response, body){ + return callback(body, response, error); + }); + }; + + this.dropletSetTag = function(tag, dropletID, callback) { + var data = { + resources: [ + { + resource_id: dropletID, + resource_type: 'droplet' + } + ] + }; + var options = { + url: this.BASEURL+'tags/'+tag+'/resources', + headers: this.headers, + body: JSON.stringify(data) + }; + this.calls++; + + return request.post(options, function(error, response, body){ + return callback(body, response, error); + }); + }; + + this.dropletCreate = function(args, callback){ + var data = { + name: args.name, // || return false, + region: args.region || 'nyc3', + size: args.size || '512mb', + image: args.image || 'ubuntu-14-04-x64', + ssh_keys: args.ssh_key || null, + backups: args.backup || false, + private_networking: args.private_networking || true, + user_data: args.user_data || null + }; + var options = { + url: this.BASEURL+'droplets', + headers: this.headers, + body: JSON.stringify(data) + }; + this.calls++; + + return request.post(options, function(error, response, body){ + return callback(body, response, error); + }); + } + + this.dropletDestroy = function(dropletID, callback){ + var options = { + url: this.BASEURL+'droplets/'+dropletID, + headers: this.headers + }; + this.calls++; + + return request.del(options, function(error, response, body){ + callback(body, response, error); + }); + }; + + this.dropletInfo = function(dropletID, callback){ + var options = { + url: this.BASEURL+'droplets/'+dropletID, + headers: this.headers + }; + this.calls++; + + return request.get(options, function(error, response, body){ + callback(body, response, error); + }); + }; + + this.tagsList = function(callback){ + var options = { + url: this.BASEURL+'tags', + headers: this.headers + }; + this.calls++; + + return request.get(options, function(e,r,b){ + callback(b,r,e); + }); + }; + + return this; +} + +module.exports = api; diff --git a/lxc.js b/lxc.js index 1baa632..9bf3796 100644 --- a/lxc.js +++ b/lxc.js @@ -1,31 +1,11 @@ 'use strict'; var exec = require('child_process').exec; -var extend = require('node.extend'); - -var parseArgs = function(config){ - var all = Object.keys(config.defaults); - // console.log(all) - for(var i=config.required.length; i--;){ - if(all.indexOf(config.required[i]) !== -1){ - config.required.splice(i, 1); - } - } - - if(config.required.length !== 0) return false; - - var out = ''; - for(var i=0; i< config.takes.length; i++){ - if(all.indexOf(config.takes[i]) !== -1){ - out += '--'+config.takes[i]+' '+config.defaults[config.takes[i]]+' '; - } - } - - return out; -}; - -function sysExec(command, callback){ - command = 'unset XDG_SESSION_ID XDG_RUNTIME_DIR; cgm movepid all virt $$; ' + command; +function sysExec(command, ip, callback){ + ip = ip || '104.236.77.157'; + command = new Buffer(command).toString('base64') + command = 'ssh -o StrictHostKeyChecking=no virt@'+ ip + ' "echo ' + command + '|base64 --decode|bash"'; + // command = 'unset XDG_SESSION_ID XDG_RUNTIME_DIR; cgm movepid all virt $$; ' + command; return exec(command, (function(callback){ return function(err,data,stderr){ @@ -36,237 +16,102 @@ function sysExec(command, callback){ })(callback)); }; +var lxc = { + exec: sysExec, -var Container = function(config){ - this.name = config.name; - this.state = config.state || 'STOPPED'; - this.ip = config.ip || (config.ipv4 || '').replace('-', '') || null; - this.overlayfs = undefined; -} + create: function(name, template, config, callback){ + return sysExec('lxc-create -n '+name+' -t '+template, callback); + }, -Container.prototype.clone = function(callback){ - var overlayfs = this.overlayfs ? ' -B overlayfs -s ' : ''; - - return sysExec('lxc-clone -o '+this.orig+ ' -n '+this.name + overlayfs, callback); -}; + clone: function(name, base_name, callback){ + return sysExec('lxc-clone -o '+base_name+ ' -n '+name +' -B overlayfs -s', callback); + }, -Container.prototype.start = function(callback){ - var args = parseArgs({ - required: ['name'], - takes: ['name'], - defaults: extend({}, this) - - }); + destroy: function(name, callback){ + return sysExec('lxc-destroy -n '+ name, function(data){ + var info = data.match(/Destroyed container/); + // console.log('destroy info:', info); + var args = [true].concat(Array.prototype.slice.call(arguments, 1)); + return callback.apply(this, args); + }); + }, - var that = this; - callback = function(callback){ - that.info(); - return callback; - }; - return sysExec('lxc-start --daemon '+args, callback); -}; + start: function(name, callback){ + return sysExec('lxc-start --name '+name+' --daemon', callback); + }, -Container.prototype.startEphemeral = function(callback){ - var args = parseArgs({ - required: ['orig'], - takes: ['orig', 'name', 'key', 'union-type', 'keep-data'], - defaults: extend({}, this) - - }); - - var command = 'lxc-start-ephemeral --daemon '+args; - return sysExec(command, function(data){ - console.log('startEphemeral', arguments); - if(data.match("doesn't exist.")){ - return callback({status: 500, error: "doesn't exist."}); - } - if(data.match('already exists.')){ - return callback({status: 500, error: 'already exists'}); - } - if(data.match(/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)){ - return callback({status: 200, state:'RUNNING', ip: data.match(/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)[0]}); - } - - callback({'?': '?', data: data, name: name, base_name: base_name}); - }); -}; - -Container.prototype.destroy = function(callback){ - var args = parseArgs({ - required: ['name'], - takes: ['name', 'force'], - defaults: extend({}, this) - }); - - return sysExec('lxc-destroy '+ args, function(data){ - var info = data.match(/Destroyed container/); - console.log('destroy info:', info); - var args = [true].concat(Array.prototype.slice.call(arguments, 1)); - return callback.apply(this, args); - }); -}, - -Container.prototype.stop = function(callback){ - var args = parseArgs({ - required: ['name'], - takes: ['name', 'reboot', 'nowait', 'timeout', 'kill'], - defaults: extend({}, this) - - }); - var that = this; - callback = function(callback){ - that.info(); - return callback; - }; - return sysExec('lxc-stop '+args, callback); -}; - -Container.prototype.freeze = function(callback){ - var args = parseArgs({ - required: ['name'], - takes: ['name', 'force'], - defaults: extend({}, this) - }); - return sysExec('lxc-freeze -n '+name, callback); -}; - -Container.prototype.unfreeze = function(callback){ - - return sysExec('lxc-unfreeze --name '+this.name, callback); -}; - -Container.prototype.info = function(callback){ - var args = parseArgs({ - required: ['name'], - takes: ['name', 'reboot', 'nowait', 'timeout', 'kill'], - defaults: extend({}, this) - - }); - return sysExec('lxc-stop '+args, callback); -}; - -Container.prototype.freeze = function(callback){ - var args = parseArgs({ - required: ['name'], - takes: ['name', 'force'], - defaults: extend({}, this) - }); - return sysExec('lxc-freeze -n '+name, callback); -}; - -Container.prototype.unfreeze = function(callback){ - - return sysExec('lxc-unfreeze --name '+this.name, callback); -}; - -Container.prototype.info = function(callback){ - var that = this; - callback = callback || function(){} - - return sysExec('lxc-info --name '+this.name, function(data){ - // console.log('info', arguments); - if(data.match("doesn't exist")){ - return callback({state: 'NULL'}); - } - - var info = {}; - data = data.replace(/\suse/ig, '').replace(/\sbytes/ig, '').split("\n").slice(0,-1); - for(var i in data){ - var temp = data[i].split(/\:\s+/); - info[temp[0].toLowerCase().trim()] = temp[1].trim(); - } - - that.updateFromInfo(info); - - var args = [info].concat(Array.prototype.slice.call(arguments, 1)); - return callback.apply(that, args); - }); -}; - -Container.prototype.updateFromInfo = function(data){ - var keys = ['state', 'ip', 'total', 'rx', 'tx', 'link', 'kmem', 'memory', 'blkio', 'cpu', 'pid']; - for(var i=keys.length; i--;){ - this[keys[i]] = data[keys[i]]; - } - - return this; -} - - - - - -var lxcORM = function(){ - this.containers = {}; - this.isReady = false; - this.whenReady = []; - var that = this; - - this.list(function(data){ - for(var idx = data.length; idx--;){ - that.containers[data[idx].name] = new Container(data[idx]); - if(idx===0){ - // console.log('call ready!') - that.callReady(); + startEphemeral: function(name, base_name, ip, callback){ + var command = 'lxc-start-ephemeral -o '+base_name+ ' -n '+name +' --union-type overlayfs -d'; + return sysExec(command, ip, function(data){ + // console.log('startEphemeral', arguments); + if(data.match("doesn't exist.")){ + return callback({status: 500, error: "doesn't exist."}); + } + if(data.match('already exists.')){ + return callback({status: 500, error: 'already exists'}); + } + if(data.match(/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)){ + return callback({status: 200, state:'RUNNING', ip: data.match(/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/)[0]}); } - } - }); -}; + callback({'?': '?', data: data, name: name, base_name: base_name}); + }); + }, -lxcORM.prototype.callReady = function(){ - for(var idx=0; idx workers.currentCreatingMax ) return false; + return doapi.dropletCreate({ + name: 'clw'+workerSnapID+'-'+(Math.random()*100).toString().slice(-4), + image: '17575764' + }, function(data){ + data = JSON.parse(data); + workers.currentCreating++; + setTimeout(function(dopletNewID){ + return workers.checkDroplet(dopletNewID); + }, 70000, data.droplet.id); + return doapi.dropletSetTag('clworker', data.droplet.id, function(){}); + }); + + }; + + workers.destroy = function(worker){ + var worker = worker || workers.pop(); + return doapi.dropletDestroy(worker.id, function(){}); + }; + + workers.makeWorkerObj = function(worker){ + worker.networks.v4.forEach(function(value){ + worker[value.type+'IP'] = value.ip_address; + }); + worker.availrunners = []; + worker.ip = worker.privateIP; + worker.usedrunner = 0; + worker.index = workers.length, + worker.getRunner = function(){ + if(this.availrunners.length === 0) return false; + console.log('getting runner from ', worker.name, ' avail length ', this.availrunners.length); + var runner = this.availrunners.pop(); + this.usedrunner++; + label2runner[runner.label] = runner; + + return runner; + } + + return worker; + }; + + workers.destroyOld = function(){ + doapi.dropletsByTag('clworker', function(data){ + data = JSON.parse(data); + data['droplets'].forEach(function(worker){ + console.log('found old droplet, killing it'); + doapi.dropletDestroy(worker.id, function(){}); + }); + }); + }; + + workers.startRunners = function(worker, newWorker, stopPercent){ + // console.log('starting runners on', worker.name, worker.ip) + stopPercent = stopPercent || 80; + ramPercentUsed(worker.ip, function(usedMemPercent){ + if(usedMemPercent < stopPercent ){ + var name = 'crunner-'+(Math.random()*100).toString().slice(-4); + return lxc.startEphemeral(name, 'crunner0', worker.ip, function(data){ + if(!data.ip) return setTimeout(workers.startRunners(worker, newWorker),0); + console.log('started runner on', worker.name) + if(newWorker) worker = workers[workers.push(worker)-1] + + worker.availrunners.push({ + ip: data.ip, + name: name, + worker: worker, + label: worker.name + ':' + name + }); + return setTimeout(workers.startRunners(worker, false ,stopPercent), 0); + }); + }else{ + console.log('using', String(usedMemPercent), 'percent memory, stopping runner creation!', worker.availrunners.length, 'created on ', worker.name); + } + }); + }; + + workers.checkBalance = function(){ + + var minWorkers = 3; + console.log('checking balance'); + + if(workers.length < minWorkers){ + console.log('less then 3 workers, starting a droplet'); + for(var i=minWorkers-workers.length; i--;) workers.create(); + return ; + } + if(workers[workers.length-3].usedrunner !== 0 && workers[workers.length-2].usedrunner !== 0 && workers[workers.length-1].usedrunner !== 0){ + console.log('last 3 workers have no free runners, starting droplet'); + return workers.create(); + } + if(workers.length > minWorkers && workers[workers.length-3].usedrunner === 0 && workers[workers.length-2].usedrunner === 0 && workers[workers.length-1].usedrunner === 0){ + console.log('Last 2 runners not used, killing last runner', workers.length); + return workers.destroy(); + } + + for(let worker of workers){ + if(worker.length <= 3) break; + if(worker.availrunners.length === 0 && worker.usedrunner === 0){ + console.log('found zombie worker, destroying') + workers.destroy(worker); + } + } + + console.log('stopping workers balancing check'); + }; + + return workers; + +})(); + +var ramPercentUsed = function(ip, callback){ + + return lxc.exec( + "python3 -c \"a=`head /proc/meminfo|grep MemAvail|grep -Po '\\d+'`;t=`head /proc/meminfo|grep MemTotal|grep -Po '\\d+'`;print(round(((t-a)/t)*100, 2))\"", + ip, + callback + ); }; - -var lxcTimeout = function(ip, time){ - var name = ip2name[ip]; - console.log(name) - time = time || 900000; // 15 minutes - var keys = Object.keys(timeoutEvents) - if(keys.indexOf(name) !== -1){ - clearTimeout(timeoutEvents[name]) +var runnerFree = function(runner){ + lxc.stop(runner.name, runner.worker.ip); + runner.worker.usedrunner--; + if(runner.hasOwnProperty('timeout')){ + clearTimeout(runner.timeout); } - timeoutEvents[name] = setTimeout(function(){ - lxc.stop(name); + delete label2runner[runner.label]; + + workers.startRunners(runner.worker); +}; + +var lxcTimeout = function(runner, time){ + time = time || 300000; // 5 minutes + + if(runner.hasOwnProperty('timeout')){ + clearTimeout(runner.timeout); + } + + return runner.timeout = setTimeout(function(){ + runnerFree(runner); }, time); -} +}; +var getAvailrunner = function(runner){ + for(let worker of workers){ + if(worker.availrunners.length === 0) continue; + if(runner && runner.worker.index <= worker.index) break; + if(runner) runnerFree(runner); + return worker.getRunner(); + } + if(runner) return runner; + return null; +}; -var runner = function(req, res, ip){ - lxcTimeout(ip); +var run = function(req, res, runner, count){ + count = count || 0; + console.log('run start', count, runner); + + if(!runner){ + console.log('no runner'); + res.status(503); + return res.json({error: 'No runners, try again soon.'}); + } var httpOptions = { - url:'http://' + ip + ':15000', + url: 'http://' + runner.worker.ip, + headers: { + Host: runner.name + }, body: JSON.stringify({ code: req.body.code }) }; + console.log('run', runner); + return request.post(httpOptions, function(error, response, body){ + // console.log('runner response:', arguments) + console.log('in request'); + if(error || response.statusCode !== 200) return run(req, res, getAvailrunner(), ++count); body = JSON.parse(body); - body['ip'] = ip.replace('10.0.', ''); + + body['ip'] = runner.label; + lxcTimeout(runner); return res.json(body); + }); }; -var addToRedis = function(){ - lxc.info(req.params.name, null, function(data){ - var domain = req.query.domain || 'vm42.us'; - domain = req.params.name+'.'+domain; - client.SADD("hosts", domain, function(){}); - - var ip = data.ip + ':5000'; - client.HSET(domain, "ip", ip, redis.print); - client.HSET(domain, "updated", (new Date).getTime(), redis.print); - client.hset(domain, "include", "proxy.include"); - return res.json({status: 200, info: data}); - }); -}; +setTimeout(function(){ + console.log('Starting balance checking in 30 seconds') + setInterval(workers.checkBalance, 15000); +}, 180000); -router.get('/start/:name', function(req, res, next){ - return lxc.start(req.params.name, function(data){ - if(!data){ - return res.json({status: 500, name: req.params.name, message: data}); - }else{ - res.json({}); - } - }); -}); +workers.destroyOld(); +workers.checkBalance(); -router.get('/live/:template/:name', function(req, res, next){ - return lxc.startEphemeral(req.params.name, req.params.template, function (data) { - console.log('live', arguments); - return res.json(data); - }); -}); +// router.get('/start/:name', function(req, res, next){ +// return lxc.start(req.params.name, function(data){ +// if(!data){ +// return res.json({ +// status: 500, +// name: req.params.name, +// message: data +// }); +// }else{ +// res.json({}); +// } +// }); +// }); + +// router.get('/live/:template/:name', function(req, res, next){ +// return lxc.startEphemeral(req.params.name, req.params.template, function (data) { +// console.log('live', arguments); +// return res.json(data); +// }); +// }); + +// router.get('/clone/:template/:name', function(req, res, next){ +// return lxc.clone(req.params.name, req.params.template, function(data){ +// console.log('clone', arguments); +// if( data.match(/Created runner/) ){ +// return res.json({status: 200}); +// }else{ +// return res.json({status: 500, message: data}); +// } +// }); +// }); + +// router.get('/destroy/:name', function(req, res, next){ +// return lxc.destroy(req.params.name, function(data){ +// console.log('destroy', arguments); +// if(data){ +// return res.json({status: 500, message: data}); +// }else{ +// return res.json({status: 200}); +// } +// }); +// }); + +// router.get('/info/:name', function(req, res, next){ +// return lxc.info(req.params.name, function(data){ +// return res.json(data); +// }); +// }); + +// router.get('/list', function(req, res, next) { +// return lxc.list(workers.clworker0.ip, function(data){ +// return res.json(data); +// }); +// }); router.get('/stop/:name', function(req, res, next){ return lxc.stop(req.params.name, function(data){ @@ -110,83 +308,15 @@ router.get('/stop/:name', function(req, res, next){ }); }); -router.get('/clone/:template/:name', function(req, res, next){ - return lxc.clone(req.params.name, req.params.template, function(data){ - console.log('clone', arguments); - if( data.match(/Created container/) ){ - return res.json({status: 200}); - }else{ - return res.json({status: 500, message: data}); - } - }); -}); - -router.get('/destroy/:name', function(req, res, next){ - return lxc.destroy(req.params.name, function(data){ - console.log('destroy', arguments); - if(data){ - return res.json({status: 500, message: data}); - }else{ - return res.json({status: 200}); - } - }); -}); - -router.get('/info/:name', function(req, res, next){ - return lxc.info(req.params.name, function(data){ - return res.json(data); - }); -}); - -router.get('/list', function(req, res, next) { - return lxc.list(function(data){ - return res.json(data); - }); +router.get('/liststuff', function(req, res, next){ + var obj = util.inspect(workers, {depth: 4}); + res.send("

Workers

"+obj+"

label2runner

"+util.inspect(label2runner)+'

DO calls

'+doapi.calls); }); router.post('/run/:ip?', function doRun(req, res, next){ - // check if server is - - return lxc.list(function(data){ - if(!req.params.ip) data = []; - var ip = '10.0.'+ req.params.ip; - var found = false; - - for(var idx=data.length; idx--;){ - if( data[idx]['ipv4'] === ip ){ - found = true; - break; - } - } - - if(found){ - return runner(req, res, ip) - }else{ - return runner(req, res, availContainers.pop()); - } - }); - + console.log('hit runner route'); + var runner = getAvailrunner(label2runner[req.params.ip]); + return run(req, res, runner); }); -// freeMem: 97700 totalmem 513818624 usedMem: 0 -// freeMem: 420,472 totalmem 513,818,624 usedMem: 100 -var startAll = function(){ - getFreeMem(function(freeMem){ - var usedMemPercent = Math.round(( (totalMem-freeMem) /totalMem)*100); - console.log('freeMem:', freeMem, 'totalmem', totalMem, 'usedMemPercent:', usedMemPercent); - if(usedMemPercent < 81 ){ - var name = 'crunner-'+(Math.random()*100).toString().replace('.',''); - return lxc.startEphemeral(name, 'crunner', function(data){ - ip2name[data.ip] = name; - availContainers.push(data.ip); - return startAll(); - }); - }else{ - console.log('using', usedMemPercent, 'percent memory, stopping container creation!', availContainers.length, 'created'); - } - }); -} - -startAll(); - module.exports = router; diff --git a/testAPI.py b/testAPI.py new file mode 100644 index 0000000..e507afd --- /dev/null +++ b/testAPI.py @@ -0,0 +1,19 @@ +import requests as r +import time + +def testAPI(times=100, sleep=2): + errors = 0 + + for i in range(times): + try: + res = r.post( + 'http://codeland.bytedev.co:2000/api/run', + data={'code': 'pwd'} + ) + if res.status_code != 200: errors += 1 + print(i, res.status_code, res.content) + except: + print('caught error') + errors += 1 + time.sleep(sleep) + print('errors ', errors, (errors/times)*100)