diff --git a/allocate_runners.sh b/allocate_runners.sh new file mode 100644 index 0000000..79e018c --- /dev/null +++ b/allocate_runners.sh @@ -0,0 +1,33 @@ +# maxMemoryUsage must be defined + +function usedMemoryPercent () { + memoryAvailable=$(head /proc/meminfo|grep MemAvail|grep -Po '\d+'); + totalMemory=$(head /proc/meminfo|grep MemTotal|grep -Po '\d+'); + difference=$(expr $totalMemory - $memoryAvailable); + difference=$(expr $difference \* 100); + memory=$(expr $difference / $totalMemory); +} + +function buildRunners () { + baseName="crunner0"; + namePrefix="crunner-batch-${RANDOM}"; + runners=""; + usedMemoryPercent; + + # maxMemoryUsage must be defined + until [[ $memory -gt $maxMemoryUsage ]]; do + + runnerName="${namePrefix}-id-${RANDOM}"; + lxc-start-ephemeral -o $baseName -n $runnerName --union-type overlayfs -d; + + if [[ $? -eq 0 ]]; then + runners="${runners};${runnerName}"; + fi + usedMemoryPercent; + done +} +buildRunners; + +# Add curl to manager here. Sending status report to manager +# curl -X POST "${PHONE_HOME}" -d'{"memory":${memory}, "runnersShort": "${runners}", "runnersLong": "'"$(lxc-ls --fancy)"'", "id": "${WORKER_UUID}"}' +exit 0; \ No newline at end of file diff --git a/docs/lxc-world.md b/docs/lxc-world.md new file mode 100644 index 0000000..3edc391 --- /dev/null +++ b/docs/lxc-world.md @@ -0,0 +1,19 @@ +##### LXC Commands + + +`lxc-top`: + + Realtime Meta data about containers(not machine friendly). Monitor container statistics. + +`lxc-ls --fancy`: + + list of existing containers + +`lxc-start-ephemeral`, `lxc-copy`: + + start an ephemeral copy of an existing container + + +`lxc-attach`: + + start a process inside a running container.(enter the droplet) \ No newline at end of file diff --git a/docs/memory-managment.md b/docs/memory-managment.md new file mode 100644 index 0000000..79caf7d --- /dev/null +++ b/docs/memory-managment.md @@ -0,0 +1,27 @@ +##### Memory Managment + + +`free`, `free -h`: + + display amount of free and used memory in the system. + * Disclamers: + - DO NOT USE IN SCRIPT + +`/proc/`: + + It is actually an kernel application. Looks like a file, but is actually generated on demand by the kernel. All "files" in proc are kernal commands. Interact with these commands as if they were files. + + +`/proc/meminfo`: + + File containing realtime memory info. + +`/proc/cpuinfo`: + + cpu info + + + +`/dev`: + + DANGER This folder contains physical interfaces to hardware on or connected to the machine. Just don't. Read first. Except `/dev/null` it is a blackhole. \ No newline at end of file diff --git a/lxc.js b/lxc.js index ab544d1..6018fe4 100644 --- a/lxc.js +++ b/lxc.js @@ -5,7 +5,7 @@ var exec = require('child_process').exec; function sysExec(command, ip, callback){ ip = ip || '104.236.77.157'; command = new Buffer(command).toString('base64') - command = 'ssh -i ~/.ssh/clw_rsa -o StrictHostKeyChecking=no virt@'+ ip + ' "echo ' + command + '|base64 --decode|bash"'; + command = 'ssh -i ~/.ssh/clw_rsa -o "StrictHostKeyChecking no" virt@'+ ip + ' "echo ' + command + '|base64 --decode|bash"'; // command = 'unset XDG_SESSION_ID XDG_RUNTIME_DIR; cgm movepid all virt $$; ' + command; return exec(command, (function(callback){ diff --git a/routes/api.js b/routes/api.js index bf929de..45f6736 100644 --- a/routes/api.js +++ b/routes/api.js @@ -2,377 +2,18 @@ var express = require('express'); var router = express.Router(); +// what is util for?? var util = require('util'); -var request = require('request'); -var jsonfile = require('jsonfile'); var lxc = require('../lxc'); var doapi = require('../doapi')(); -var settings = require('./workers.json'); -// mapping of current used runners for quick loop up based on runner label -var label2runner = {}; +var workers = require('./worker_collection.js'); -// -var tagPrefix = settings.tagPrefix || 'clwV'; - -var workers = (function(){ - // works array constructor. This will hold the works(order by creation) and all - // the methods interacting with the workers. - - // base array that will be the workers objects. - var workers = []; - - // persistent settings object - // .image is the currently used Digital Ocean snap shot ID - // .lastSnapShotId is the previous ID used Digital Ocean snap shot - // .version is the current worker version - // .size is the base Droplet size for worker creation - // .min is the minimum amount of workers that should exist - // .max is the maximum amount of works that ca exist - // .minAvail is the amount of empty workers there should be - workers.settings = settings; - - // How many droplets are currently in the process of being created. It takes - // about 3 minutes to create a worker. - workers.currentCreating = 0; - - workers.create = function(){ - // manages the creation of a work from first call to all runners seeded - - // dont create more workers then the settings file allows - if(workers.currentCreating > workers.settings.max ) return false; - workers.currentCreating++; - - doapi.dropletToActive({ - name: 'clw'+workers.settings.version+'-'+(Math.random()*100).toString().slice(-4), - image: workers.settings.image, - size: workers.settings.size, - onCreate: function(data){ - doapi.dropletSetTag(tagPrefix+workers.settings.version, data.droplet.id); - }, - onActive: function(worker, args){ - workers.startRunners({ - worker: workers.makeWorkerObj(worker), - onStart: function(runner, args){ - workers.push(args.worker); - doapi.domianAddRecord({ - domain: "codeland.us", - type: "A", - name: "*."+worker.name+".workers", - data: worker.publicIP - }); - args.onStart = function(){}; - }, - onDone: function(args){ - console.log("Seeded runners on", worker.name); - workers.currentCreating--; - } - }); - } - }); - - }; - - workers.makeWorkerObj = function(worker){ - // Create object for each worker. - - worker.networks.v4.forEach(function(value){ - worker[value.type+'IP'] = value.ip_address; - }); - - worker.availrunners = []; - worker.ip = worker.publicIP; - worker.usedrunners = 0; - worker.index = workers.length; - - worker.getRunner = function(){ - if(this.availrunners.length === 0) return false; - // console.log('getting runner from ', worker.name, ' avail length ', this.availrunners.length); - var runner = this.availrunners.pop(); - this.usedrunners++; - runnerTimeout(runner); - - return runner; - } - - return worker; - }; - - workers.__workersId = function(argument){ - // create array of all current worker Digital Ocean ID - return workers.map(function(item){ - return item.id; - }); - - }; - - workers.destroy = function(worker){ - // todo: If worker is passed, check for it in the workers array and - // remove it if found. - - var worker = worker || workers.pop(); - return doapi.dropletDestroy(worker.id, function(body) { - console.log('Deleted worker', worker.name); - }); - }; - - workers.destroyByTag = function(tag){ - // Delete works that with - - tag = tag || tagPrefix + workers.settings.version; - let currentIDs = workers.__workersId(); - - let deleteDroplets = function(droplets){ - if(droplets.length === 0) return true; - let droplet = droplets.pop(); - if(~currentIDs.indexOf(droplet.id)) return deleteDroplets(droplets); - - doapi.dropletDestroy(droplet.id, function(body){ - setTimeout(deleteDroplets, 1000, droplets); - if(!droplets.length) console.log(`Finished deleting workers tagged ${tag}.`); - }); - } - - doapi.dropletsByTag(tag, function(data){ - data = JSON.parse(data); - console.log(`Deleting ${data['droplets'].length} workers tagged ${tag}. Workers`, - data['droplets'].map(function(item){ - return item.name+' | '+item.id; - }) - ); - - deleteDroplets(data['droplets']); - }); - }; - - workers.startRunners = function(args){ - // console.log('starting runners on', args.worker.name, args.worker.ip) - - // dont make runners on out dated workers - if(!args.worker || workers.settings.image > args.worker.image.id){ - console.log(`Blocked outdated worker(${args.worker.image.id}), current image ${workers.settings.image}.`) - return ; - } - - // percent of used RAM to stop runner creation - args.stopPercent = args.stopPercent || 80; - args.onStart = args.onStart || function(){}; - args.onDone = args.onDone || function(){}; - - ramPercentUsed(args.worker.ip, function(usedMemPercent){ - if(usedMemPercent > args.stopPercent ){ - console.log('using', String(usedMemPercent).trim(), - 'percent memory, stopping runner creation!', args.worker.availrunners.length, - 'created on ', args.worker.name - ); - args.onDone(args); - return ; - } - - var name = 'crunner-'+(Math.random()*100).toString().slice(-4); - // console.log('Free ram check passed!') - lxc.startEphemeral(name, 'crunner0', args.worker.ip, function(data){ - if(!data.ip) return setTimeout(workers.startRunners, 0, args); - // console.log('started runner on', args.worker.name) - - var runner = { - ip: data.ip, - name: name, - worker: args.worker, - label: args.worker.name + ':' + name - }; - args.onStart(runner, args); - - args.worker.availrunners.push(runner); - - setTimeout(workers.startRunners, 0, args); - }); - }); - }; - - workers.checkForZombies = function(){ - // check to make sure all works are used or usable. - - let zombies = 0; - - for(let worker of workers){ - console.log(`Checking if ${worker.name} is a zombie worker.`); - // if a runner has no available runners and no used runners, its a - // zombie. This should happen when a newer image ID has been added - // and old workers slowly lose there usefulness. - if(worker.availrunners.length === 0 && worker.usedrunners === 0){ - workers.splice(workers.indexOf(worker), 1); - console.log(`Zombie! Worker ${worker.name}, destroying.`); - workers.destroy(worker); - zombies++; - } - } - - return zombies; - }; - - workers.checkBalance = function(){ - console.log(`Checking balance.`); - - workers.checkForZombies(); - - // if there are workers being created, stop scale up and down check - if(workers.currentCreating+workers.length < workers.settings.min) null; - else if(workers.currentCreating) - return console.log(`Killing balance, workers are being created.`); - - // hold amount of workers with no used runners - var lastMinAval = 0; - - // check to make sure the `workers.settings.minAvail` have free runners - for(let worker of workers.slice(-workers.settings.minAvail)){ - if(worker.usedrunners === 0){ - lastMinAval++; - }else{ - // no need to keep counting, workers need to be created - break; - } - } - - if(lastMinAval > workers.settings.minAvail){ - // Remove workers if there are more then the settings states - console.log( - `Last ${workers.settings.minAvail} workers not used, killing last worker`, - 'lastMinAval:', lastMinAval, - 'minAvail:', workers.settings.minAvail, - 'workers:', workers.length - ); - - return workers.destroy(); - - } else if(lastMinAval < workers.settings.minAvail){ - // creates workers if the settings file demands it - console.log( - 'last 3 workers have no free runners, starting worker', - 'lastMinAval:', lastMinAval, - 'minAvail:', workers.settings.minAvail, - 'workers:', workers.length - ); - - return workers.create(); - } - - }; - workers.settingsSave = function(){ - // save the live settings file to disk - - jsonfile.writeFile('./workers.json', workers.settings, {spaces: 2}, function(err) { - console.error(err); - }); - }; - - workers.add = function(newWorkers){ - newWorkers.forEach(function(worker){ - workers.push(worker); - }); - }; - - // make sure Digital Ocean has a tag for the current worker version - doapi.tagCreate(tagPrefix+workers.settings.version); - - return workers; - -})(); - -var ramPercentUsed = function(ip, callback){ - // checks the percent of ram used on a worker. - - return lxc.exec( - "python3 -c \"a=`head /proc/meminfo|grep MemAvail|grep -Po '\\d+'`;t=`head /proc/meminfo|grep MemTotal|grep -Po '\\d+'`;print(round(((t-a)/t)*100, 2))\"", - ip, - callback - ); -}; - -var runnerTimeout = function(runner, time){ - time = time || 60000; // 1 minutes - - if(runner.hasOwnProperty('timeout')){ - clearTimeout(runner.timeout); - } - - return runner.timeout = setTimeout(runnerFree, time, runner); -}; - -var runnerFree = function(runner){ - lxc.stop(runner.name, runner.worker.ip); - runner.worker.usedrunners--; - if(runner.hasOwnProperty('timeout')){ - clearTimeout(runner.timeout); - } - delete label2runner[runner.label]; - - console.log(`Runner freed ${runner.label}.`, runner.worker); - workers.startRunners({worker: runner.worker}); -}; - -var getAvailrunner = function(runner){ - for(let worker of workers){ - if(worker.availrunners.length === 0) continue; - if(runner && runner.worker.index <= worker.index) break; - if(runner) runnerFree(runner); - - return worker.getRunner(); - } - - if(runner) return runner; -}; - -var run = function(req, res, runner, count){ - count = count || 0; - console.log(`Runner starting attempt ${count}.`); - - if(!runner){ - console.log(`No runner available!`); - res.status(503); - return res.json({error: 'No runners, try again soon.'}); - } - - if(count > 2){ - console.log(`Runner attempt failed, to many requests!`); - return res.status(400).json({error: 'Runner restarted to many times'}); - } - - var httpOptions = { - url: 'http://' + runner.worker.ip, - headers: { - Host: runner.name - }, - body: JSON.stringify({ - code: req.body.code - }) - }; - - return request.post(httpOptions, function(error, response, body){ - // console.log('runner response:', arguments) - if(error || response.statusCode !== 200) return run(req, res, getAvailrunner(), ++count); - body = JSON.parse(body); - - if(req.query.once){ - res.json(body); - return runnerFree(runner, 0); - } - - label2runner[runner.label] = runner; - body['ip'] = runner.label; - body['rname'] = runner.name; - body['wname'] = runner.worker.name; - res.json(body); - - runnerTimeout(runner); - }); -}; - -console.log('========STARTING===========') -setInterval(workers.checkBalance, 15000); -workers.destroyByTag(); +console.log('========STARTING==========='); +workers.start(); +// Why is this a GET? router.get('/stop/:name', function(req, res, next){ return lxc.stop(req.params.name, function(data){ console.log('stop', arguments); @@ -383,79 +24,13 @@ router.get('/stop/:name', function(req, res, next){ } }); }); + +// Why is this a GET? router.get('/destroyByTag', function(req, res, next) { workers.destroyByTag(); res.send('?'); }); -router.post('/updateID', function(req, res, next){ - var newWorkers = { - workers: [], - image: req.query.image, - target: req.query.target || workers.length, - size: req.query.size || workers.settings.size, - version: workers.settings.version+1, - min: req.query.min || workers.settings, - minAvail: req.query.minAvail || workers.settings - }; - - doapi.tagCreate(tagPrefix+newWorkers.version); - workers.destroyByTag(tagPrefix+newWorkers.version); - - for(var i=0; i= args.newWorkers.target){ - console.log('upgrade complete!') - workers.settings.image = args.newWorkers.image; - workers.settings.size = args.newWorkers.size; - workers.settings.min = args.newWorkers.min; - workers.settings.minAvail = args.newWorkers.minAvail; - - workers.forEach(function(worker){ - worker.availrunners.forEach(function(runner){ - lxc.stop(runner.name, runner.worker.ip); - }); - worker.availrunners = []; - }); - - workers.add(args.newWorkers.workers); - workers.settingsSave(); - workers.checkBalance(); - } - } - - }); - } - }); - } - res.json({status: "maybe?"}); -}); router.get('/liststuff', function(req, res, next){ var obj = util.inspect(workers, {depth: 4}); @@ -463,22 +38,36 @@ router.get('/liststuff', function(req, res, next){

Workers

${obj}

label2runner

-
${util.inspect(label2runner)}
+
${util.inspect(workers.runnerMap)}

DO calls

${doapi.calls} `); }); router.get('/ping/:runner', function(req, res, next){ - var runner = label2runner[req.params.runner]; - runnerTimeout(runner); + var runner = workers.getRunner(req.params.runner); + // runnerTimeout(runner); + runner.setTimeout(); res.json({res:''}); }); router.post('/run/:runner?', function (req, res, next){ console.log(`Request runner route!`); - var runner = getAvailrunner(label2runner[req.params.runner]); - return run(req, res, runner); + + return workers.attemptRun( + req.body.code, req.query.once, req.params.runner, + (body) => { + res.json(body); + }, + (error, statusCode) => { + if (statusCode === 503){ + res.status(503); + return res.json({error: 'No runners, try again soon.'}); + } else if (statusCode === 400){ + return res.status(400).json({error: 'Runner restarted too many times'}); + } + } + ); }); module.exports = router; diff --git a/routes/worker_collection.js b/routes/worker_collection.js new file mode 100644 index 0000000..95f6cd1 --- /dev/null +++ b/routes/worker_collection.js @@ -0,0 +1,607 @@ +'use strict'; + +var jsonfile = require('jsonfile'); +var request = require('request'); +var lxc = require('../lxc'); +var doapi = require('../doapi')(); +var settings = require('./workers.json'); +var fs = require('fs'); +settings.tagPrefix = settings.tagPrefix || 'clwV'; + +var utils = (function(){ + return { + + "uuid": function(){ + return (Math.random()*100).toString().slice(-4); + } + + }; + +})(); + + + +var Runner = (function(){ + var proto = {}; + var __empty = function(){}; + + proto.runnerMap = {}; + + proto.cleanUp = function(label){ + delete proto.runnerMap[label]; + }; + + proto.set = function(runner){ + proto.runnerMap[runner.label] = runner; + proto.runnerMap[runner.label] = runner; + }; + + proto.get = function(label){ + return proto.runnerMap[label]; + }; + + + proto.create = function(config){ + var runner = Object.create(proto); + Object.assign(runner, config); + return runner; + }; + + proto.free = function(){ + // track how often this happens in a minute + var runner = this; + + lxc.stop(runner.name, runner.worker.ip); + // this should be done by the worker + runner.worker.usedrunners--; + + if(runner.hasOwnProperty('timeout')){ + clearTimeout(runner.timeout); + } + + if(runner.hasOwnProperty('cleanUp')){ + runner.cleanUp(); + } + // worker has more space now + // worker should track this + setTimeout(()=>{ + runner.worker.populate(); + }, 0); + }; + + proto.setTimeout = function(time){ + time = time || 60000; // 1 minutes + var runner = this; + if(runner.hasOwnProperty('timeout')){ + clearTimeout(runner.timeout); + } + + return runner.timeout = setTimeout(function(){ + runner.free(); + }, time); + }; + + return proto; +})(); + + +var Worker = (function(){ + var proto = {}; + var __empty = function(){}; + + // settings should probably be retrieved via a function + proto.settings = settings; + + var maxSyncAttempts = 6; + + proto.create = function(config){ + var worker = Object.create(proto); + Object.assign(worker, config); + worker.networks.v4.forEach(function(value){ + worker[ value.type + 'IP' ] = value.ip_address; + }); + + worker.availrunners = []; + worker.ip = worker.publicIP; + worker.usedrunners = 0; + worker.age = +(new Date()); + worker.canSchedule = true; + worker.isBuildingRunners = false; + worker.isSyncing = false; + worker.syncAttempts = 0; + + + return worker; + }; + + proto.populate = function(callback){ + callback = callback || __empty; + return lxc.startEphemeral( + "crunner-batch-$RANDOM-id-$RANDOM", "crunner0", this.ip, callback + ); + }; + + + proto.getRunner = function(){ + if(this.availrunners.length === 0) return false; + // console.log('getting runner from ', worker.name, ' avail length ', this.availrunners.length); + var runner = this.availrunners.pop(); + this.usedrunners++; + runner.setTimeout(); + return runner; + }; + + + proto.ramPercentUsed = function(callback){ + // checks the percent of ram used on a worker. + return lxc.exec( + "python3 -c \"a=`head /proc/meminfo|grep MemAvail|grep -Po '\\d+'`;t=`head /proc/meminfo|grep MemTotal|grep -Po '\\d+'`;print(round(((t-a)/t)*100, 2))\"", + this.ip, + callback + ); + }; + + proto.destroy = function(){ + var worker = this; + worker.canSchedule = false; + return doapi.dropletDestroy(this.id, function(body) { + console.log('Deleted worker', worker.name); + }); + }; + + proto.isZombie = function(){ + return this.availrunners.length === 0 && this.usedrunners === 0 && !this.isBuildingRunners; + }; + + proto.register = function(){ + var worker = this; + doapi.domianAddRecord({ + domain: "codeland.us", + type: "A", + name: "*." + this.name + ".workers", + data: this.publicIP + }); + }; + + + // When should this be called + proto.sync = function(callback, errorCallback, maxAttempts){ + var worker = this; + + maxAttempts = maxAttempts || maxSyncAttempts; + worker.isSyncing = true; + callback = callback || __empty; + errorCallback = errorCallback || __empty; + // this will call the droplet or the droplet will send the data using a cron job + + // mainly to update the active runners on the worker + // potentially collect stats about the droplet as well + // - check memory and check runners + + lxc.exec('lxc-ls --fancy', worker.ip, function(data, error, stderr){ + if (error){ + console.log("Sync Error: \n", error); + if (worker.syncAttempts > maxAttempts){ + setTimeout(function(){ + errorCallback(error, worker); + }, 0); + } else { + console.log("Waiting 15 seconds") + worker.syncAttempts++; + setTimeout(function(){ + worker.sync(maxAttempts, callback, errorCallback); + }, 15000); + } + } else { + + var output = data.split("\n"); + var keys = output.splice(0,1)[0].split(/\s+/).slice(0,-1); + var runners = []; + + keys = keys.map(function(v){return v.toLowerCase()}); + output = output.slice(0).slice(0,-1); + + for(var i in output){ + if(output[i].match(/^-/)) continue; // compatibility with 1.x and 2.x output + + var aIn = output[i].split(/\s+/).slice(0,-1); + var mapOut = {}; + aIn.map(function(value,idx){ + mapOut[keys[idx]] = value; + }); + runners.push(mapOut); + + } + console.log(`RUNNERS FOUND[=> ${worker.ip}`); + console.log(`RUNNERS FOUND[=>`, runners); + worker.availrunners = []; + + for (let idx = 0, stop = runners.length; idx < stop; idx++){ + if(runners[idx].state === "STOPPED" || Runner.get(worker.name + ':' + runners[idx].name)){ + continue; + } else { + + var runner = Runner.create({ + "name": runners[idx].name, + "ipv4": runners[idx].ipv4, + "worker": worker, + "label": worker.name + ':' + runners[idx].name + }); + + worker.availrunners.push(runner); + } + } + console.log(`RUNNERS AVAILABLE[=>`, worker.availrunners); + // TODO: Determine if this flag is needed anymore + worker.isBuildingRunners = false; + worker.isSyncing = false; + worker.syncAttempts = 0; + callback(null, worker); + } + }); + }; + + proto.initialize = function(params, config){ + // Create droplet + // Once active the droplet begins to create runners + var maxMemoryUsage = params.maxMemoryUsage || config.maxMemoryUsage || 80; + var worker_uuid = utils.uuid(); + var phone_home = config.home || "/worker/ping"; + + + var callback = params.callback || __empty; + var errorCallback = params.errorCallback || empty; + + fs.readFile(__dirname + "/../allocate_runners.sh", function(error, file){ + + doapi.dropletToActive({ + name: config.tagPrefix + (config.version + "") + '-' + utils.uuid(), + image: config.image, + size: config.size, + user_data: proto.__buildCommand(file, maxMemoryUsage, worker_uuid, phone_home), + + onCreate: function(data){ + doapi.dropletSetTag( + config.tagPrefix + config.version, + data.droplet.id + ); + }, + onActive: function(data, args){ + data.worker_uuid = worker_uuid; + var worker = Worker.create(data); + + // wait for boot before syncing runners + setTimeout(function(){ + worker.sync(callback, errorCallback); + }, 75000); + } + }); + }); + }; + + proto.__buildCommand = function(file, maxMemoryUsage, worker_uuid, phone_home){ + var scriptSetup, script, createScript, makeScriptExecutable, setupCrontab; + var interval = 1; + + // worker_uuid and phone_home are only usable with localhost tunnels setup in dev + // cronjobSetup = `export PATH=\${PATH};export WORKER_UUID="${worker_uuid}";export PHONE_HOME=${phone_home};export maxMemoryUsage=${maxMemoryUsage};`; + scriptSetup = `export PATH=\${PATH};export WORKER_UUID="${worker_uuid}";export maxMemoryUsage=${maxMemoryUsage};`; + script = scriptSetup + `echo '${file.toString("base64")}'|base64 --decode|bash`; + + createScript = `echo "${script}" | cat > /home/virt/allocate_runners.sh`; + + makeScriptExecutable = `chmod o+x /home/virt/allocate_runners.sh`; + + setupCrontab = `echo "*/${interval} * * * * /home/virt/allocate_runners.sh > /home/virt/allocate_runners.log 2>&1" | crontab -u virt -`; + + return `#!/bin/bash\n\n${createScript} && ${makeScriptExecutable} && ${setupCrontab};`; + }; + + return proto; +})(); + + +var WorkerCollection = (function(){ + // works array constructor. This will hold the works(order by creation) and all + // the methods interacting with the workers. + + // base array that will be the workers objects. + var workers = []; + + + // persistent settings object + // .image is the currently used Digital Ocean snap shot ID + // .lastSnapShotId is the previous ID used Digital Ocean snap shot + // .version is the current worker version + // .size is the base Droplet size for worker creation + // .min is the minimum amount of workers that should exist + // .max is the maximum amount of works that ca exist + // .minAvail is the amount of empty workers there should be + workers.settings = settings; + + // How many droplets are currently in the process of being created. It takes + // about 3 minutes to create a worker. + workers.currentCreating = 0; + + // REMOVE THIS + workers.runnerMap = Runner.runnerMap; + + workers.setRunner = function(runner){ + Runner.set(runner); + }; + + + workers.getRunner = function(label){ + return Runner.get(label); + }; + + //************************************************** + //************************************************** + + workers.getAvailableRunner = function(runner){ + for(let worker of workers){ + if(worker.availrunners.length === 0) continue; + if(runner && runner.worker.age <= worker.age) break; + if(runner) runner.free(); + + return worker.getRunner(); + } + + if(runner) return runner; + }; + + workers.create = function(config){ + // manages the creation of a work from first call to all runners seeded + + // dont create more workers then the settings file allows + if(workers.length + workers.currentCreating >= workers.settings.max ) return false; + workers.currentCreating++; + + var count = 0; + config = config || workers.settings; + Worker.initialize({ + "callback": function(error, worker){ + console.log("Seeded runners on", worker.name); + workers.push(worker); + worker.register(); + workers.currentCreating--; + }, + "errorCallback": function(error, worker){ + // destroy worker + workers.currentCreating--; + } + }, config); + }; + + workers.__workersId = function(argument){ + // create array of all current worker Digital Ocean ID + return workers.map(function(item){ + return item.id; + }); + }; + + workers.destroy = function(worker){ + // removes last one + // X TODO: If worker is passed, check for it in the workers array and + // remove it if found. + if ( worker ){ + var worker_idx = workers.indexOf(worker); + if (~worker_idx){ + workers.splice(worker_idx, 1); + return worker.destroy(); + } + } else { + worker = workers.pop(); + return worker.destroy(); + } + }; + + workers.destroyByTag = function(tag){ + // Delete works that with + + tag = tag || workers.settings.tagPrefix + workers.settings.version; + let currentIDs = workers.__workersId(); + + let deleteDroplets = function(droplets){ + if(droplets.length === 0) return true; + let droplet = droplets.pop(); + if(~currentIDs.indexOf(droplet.id)) return deleteDroplets(droplets); + + doapi.dropletDestroy(droplet.id, function(body){ + setTimeout(deleteDroplets, 1000, droplets); + if(!droplets.length) console.log(`Finished deleting workers tagged ${tag}.`); + }); + } + + // TODO: move to seperate method + doapi.dropletsByTag(tag, function(data){ + data = JSON.parse(data); + console.log(`Deleting ${data['droplets'].length} workers tagged ${tag}. Workers`, + data['droplets'].map(function(item){ + return item.name+' | '+item.id; + }) + ); + + deleteDroplets(data['droplets']); + }); + }; + + workers.checkForZombies = function(callback){ + // check to make sure all works are used or usable. + if (workers.length === 0) callback(); + let + zombies = 0, + syncedCount = workers.length, + workerCleanUp = function(error, worker){ + console.log(`Zombie! Worker ${worker.name}, destroying.`); + workers.destroy(worker); + zombies++; + if(!--count) callback(); + }; + + + for(let worker of workers){ + console.log(`Checking if ${worker.name} is a zombie worker.`); + // if a runner has no available runners and no used runners, its a + // zombie. This should happen when a newer image ID has been added + // and old workers slowly lose there usefulness. + worker.sync(function(error, worker){ + if(worker.isZombie()) workerCleanUp(error, worker); + }, workerCleanUp); + } + }; + + workers.checkBalance = function(){ + console.log(`${(new Date())} Checking balance.`); + + workers.checkForZombies(function(){ + // if there are workers being created, stop scale up and down check + var skipBalance = workers.currentCreating + workers.length >= workers.settings.min; + if(workers.currentCreating && skipBalance){ + return console.log(`Killing balance, workers are being created.`); + } + + workers.balance(); + }); + }; + + workers.balance = function(){ + console.log(`BALANCING: ${(new Date())}`); + // count workers and locate oldest worker + var oldestWorker, isNotOlder, workerCount = 0; + + for(let worker of workers){ + console.log(` + Checking worker + worker.name: ${worker.name} + worker.usedrunners: ${worker.usedrunners} + worker.availrunners: ${worker.availrunners.length} + workerCount: ${workerCount} + compare: ${worker.usedrunners !== 0} + `); + + if(worker.usedrunners === 0){ + workerCount++; + isNotOlder = oldestWorker && oldestWorker.age < worker.age + oldestWorker = (isNotOlder ? oldestWorker:worker); + } + } + + if(workerCount > workers.settings.minAvail){ + // Remove oldest worker if there are more than the settings file state + console.log(` + Destroying Worker + Last ${workers.settings.minAvail} workers not used, killing last worker + workerCount: ${workerCount} + minAvail: ${workers.settings.minAvail} + workers: ${workers.length} + `); + return workers.destroy(oldestWorker); + + } else if( workerCount < workers.settings.minAvail){ + // Creates worker if there are less than the settings state + console.log(` + Creating Worker + last 3 workers have no free runners, starting worker, + workerCount: ${workerCount} + minAvail: ${workers.settings.minAvail} + workers: ${workers.length} + `); + + return workers.create(); + } else { + console.log(` + Blanced + LMA: ${workerCount} + Settings MA: ${workers.settings.minAvail} + Workers: ${workers.length} + `); + } + + + }; + + workers.start = function(interval){ + setInterval(workers.checkBalance, interval || 15000); + workers.destroyByTag(); + }; + + workers.settingsSave = function(){ + // save the live settings file to disk + + jsonfile.writeFile('./workers.json', workers.settings, {spaces: 2}, function(err) { + console.error(err); + }); + }; + + workers.attemptRun = function(code, once, runnerLabel, callback, errorCallback, count){ + // PARAMS: code, once, runnerLabel, callback, errorCallback, count; + + var runner = workers.getAvailableRunner( + workers.getRunner(runnerLabel) + ); + + if(!runner){ + console.log(`No runner available!`); + return errorCallback(null, 503); + } + + count = count || 0; + + console.log(`Runner starting attempt ${count}.`); + + + if(count > 2){ + console.log(`Runner attempt failed, to many requests!`); + return errorCallback(null, 400); + } + + var httpOptions = { + url: 'http://' + runner.worker.ip, + headers: { + Host: runner.name + }, + body: JSON.stringify({ + code: code + }) + }; + + // Move the http stuff to the runner + return request.post(httpOptions, function(error, response, body){ + // console.log('runner response:', arguments) + + if(error || response.statusCode !== 200) { + return workers.attemptRun(code, once, void 0, callback, errorCallback, ++count); + } + + body = JSON.parse(body); + + if(once){ + runner.free(); + } else { + runner.setTimeout(); + body['ip'] = runner.label; + body['rname'] = runner.name; + body['wname'] = runner.worker.name; + workers.setRunner(runner); + } + return callback(body); + }); + }; + + + workers.add = function(newWorkers){ + newWorkers.forEach(function(worker){ + workers.push(worker); + }); + }; + // does this have to be last? + // make sure Digital Ocean has a tag for the current worker version + doapi.tagCreate(workers.settings.tagPrefix + workers.settings.version); + + return workers; + +})(); + + +module.exports = WorkerCollection; \ No newline at end of file diff --git a/routes/workers.json b/routes/workers.json index 0b060ae..3b22b85 100644 --- a/routes/workers.json +++ b/routes/workers.json @@ -5,5 +5,6 @@ "size":"512mb", "max":100, "min":3, - "minAvail":3 + "minAvail":3, + "tagPrefix": "dev--CLW" } \ No newline at end of file diff --git a/testAPI.js b/testAPI.js index 01b0ca2..30b3c8c 100644 --- a/testAPI.js +++ b/testAPI.js @@ -21,7 +21,7 @@ var callRunner = (function(){ let httpOptions = { - url: 'http://codeland.bytedev.co:2000/api/run?once=true', + url: 'http://localhost:2000/api/run?once=true', form: { code: code || `python3 -c " from time import sleep @@ -37,10 +37,20 @@ sleep(${sleepTime}) noRunner++; }else if(error || response.statusCode !== 200){ errors++; + console.log(` + ID: ${id} + Error: ${error} + `); + } else { body = JSON.parse(body); res = (Buffer.from(body.res, 'base64').toString('ascii')); } - console.log(`${id} with results ${res}. Errors ${errors}. No runner ${noRunner}. Completed ${completed}`); + console.log(` + ${id} with results ${res}. + Errors ${errors}. + No runner ${noRunner}. + Completed ${completed} + `); callback() @@ -58,4 +68,4 @@ let __do = function(till){ setTimeout(__do, 1500, --till); }; -__do(30) +__do(1000);