diff --git a/routes/api.js b/routes/api.js index 2cc177c..45f6736 100644 --- a/routes/api.js +++ b/routes/api.js @@ -2,8 +2,8 @@ var express = require('express'); var router = express.Router(); +// what is util for?? var util = require('util'); -var request = require('request'); var lxc = require('../lxc'); var doapi = require('../doapi')(); @@ -13,58 +13,6 @@ console.log('========STARTING==========='); workers.start(); -var attemptRun = function(req, res, runner, count){ - count = count || 0; - console.log(`Runner starting attempt ${count}.`); - - if(!runner){ - console.log(`No runner available!`); - res.status(503); - return res.json({error: 'No runners, try again soon.'}); - } - - // TODO: Configurable - if(count > 2){ - console.log(`Runner attempt failed, to many requests!`); - return res.status(400).json({error: 'Runner restarted to many times'}); - } - - var httpOptions = { - url: 'http://' + runner.worker.ip, - headers: { - Host: runner.name - }, - body: JSON.stringify({ - code: req.body.code - }) - }; - - return request.post(httpOptions, function(error, response, body){ - // console.log('runner response:', arguments) - if(error || response.statusCode !== 200) { - return attemptRun(req, res, workers.getAvailableRunner(), ++count); - } - - body = JSON.parse(body); - - if(req.query.once){ - res.json(body); - // 0 here does nothing - // return runnerFree(runner, 0); - return runner.free(); - } - - workers.setRunner(runner); - body['ip'] = runner.label; - body['rname'] = runner.name; - body['wname'] = runner.worker.name; - res.json(body); - - // runnerTimeout(runner); - runner.setTimeout(); - }); -}; - // Why is this a GET? router.get('/stop/:name', function(req, res, next){ return lxc.stop(req.params.name, function(data){ @@ -105,8 +53,21 @@ router.get('/ping/:runner', function(req, res, next){ router.post('/run/:runner?', function (req, res, next){ console.log(`Request runner route!`); - var runner = workers.getAvailableRunner(workers.getRunner(req.params.runner)); - return attemptRun(req, res, runner); + + return workers.attemptRun( + req.body.code, req.query.once, req.params.runner, + (body) => { + res.json(body); + }, + (error, statusCode) => { + if (statusCode === 503){ + res.status(503); + return res.json({error: 'No runners, try again soon.'}); + } else if (statusCode === 400){ + return res.status(400).json({error: 'Runner restarted too many times'}); + } + } + ); }); module.exports = router; diff --git a/routes/worker_collection.js b/routes/worker_collection.js index d93e703..95f6cd1 100644 --- a/routes/worker_collection.js +++ b/routes/worker_collection.js @@ -1,6 +1,7 @@ 'use strict'; var jsonfile = require('jsonfile'); +var request = require('request'); var lxc = require('../lxc'); var doapi = require('../doapi')(); var settings = require('./workers.json'); @@ -38,7 +39,7 @@ var Runner = (function(){ proto.get = function(label){ return proto.runnerMap[label]; }; - + proto.create = function(config){ var runner = Object.create(proto); @@ -47,9 +48,13 @@ var Runner = (function(){ }; proto.free = function(){ + // track how often this happens in a minute var runner = this; + lxc.stop(runner.name, runner.worker.ip); + // this should be done by the worker runner.worker.usedrunners--; + if(runner.hasOwnProperty('timeout')){ clearTimeout(runner.timeout); } @@ -57,8 +62,11 @@ var Runner = (function(){ if(runner.hasOwnProperty('cleanUp')){ runner.cleanUp(); } - // TODO: Determine if this call is even needed - // runner.worker.sync(); + // worker has more space now + // worker should track this + setTimeout(()=>{ + runner.worker.populate(); + }, 0); }; proto.setTimeout = function(time){ @@ -94,13 +102,6 @@ var Worker = (function(){ }); worker.availrunners = []; - // need map of used runners - // need list of all runners - // - sync should probably populate the all runners container - // - availrunners should the diff of used runners and all runners - - // runners should probably indicate when they have been used. - worker.ip = worker.publicIP; worker.usedrunners = 0; worker.age = +(new Date()); @@ -113,6 +114,14 @@ var Worker = (function(){ return worker; }; + proto.populate = function(callback){ + callback = callback || __empty; + return lxc.startEphemeral( + "crunner-batch-$RANDOM-id-$RANDOM", "crunner0", this.ip, callback + ); + }; + + proto.getRunner = function(){ if(this.availrunners.length === 0) return false; // console.log('getting runner from ', worker.name, ' avail length ', this.availrunners.length); @@ -168,7 +177,6 @@ var Worker = (function(){ // mainly to update the active runners on the worker // potentially collect stats about the droplet as well // - check memory and check runners - // - when does start runners get called? lxc.exec('lxc-ls --fancy', worker.ip, function(data, error, stderr){ if (error){ @@ -206,11 +214,13 @@ var Worker = (function(){ } console.log(`RUNNERS FOUND[=> ${worker.ip}`); console.log(`RUNNERS FOUND[=>`, runners); - // bad worker.availrunners = []; for (let idx = 0, stop = runners.length; idx < stop; idx++){ - if(runners[idx].state !== "STOPPED" && !Runner.get(worker.name + ':' + runners[idx].name)){ + if(runners[idx].state === "STOPPED" || Runner.get(worker.name + ':' + runners[idx].name)){ + continue; + } else { + var runner = Runner.create({ "name": runners[idx].name, "ipv4": runners[idx].ipv4, @@ -234,7 +244,7 @@ var Worker = (function(){ proto.initialize = function(params, config){ // Create droplet // Once active the droplet begins to create runners - var maxMemoryUsage = args.maxMemoryUsage || config.maxMemoryUsage || 80; + var maxMemoryUsage = params.maxMemoryUsage || config.maxMemoryUsage || 80; var worker_uuid = utils.uuid(); var phone_home = config.home || "/worker/ping"; @@ -524,6 +534,62 @@ var WorkerCollection = (function(){ }); }; + workers.attemptRun = function(code, once, runnerLabel, callback, errorCallback, count){ + // PARAMS: code, once, runnerLabel, callback, errorCallback, count; + + var runner = workers.getAvailableRunner( + workers.getRunner(runnerLabel) + ); + + if(!runner){ + console.log(`No runner available!`); + return errorCallback(null, 503); + } + + count = count || 0; + + console.log(`Runner starting attempt ${count}.`); + + + if(count > 2){ + console.log(`Runner attempt failed, to many requests!`); + return errorCallback(null, 400); + } + + var httpOptions = { + url: 'http://' + runner.worker.ip, + headers: { + Host: runner.name + }, + body: JSON.stringify({ + code: code + }) + }; + + // Move the http stuff to the runner + return request.post(httpOptions, function(error, response, body){ + // console.log('runner response:', arguments) + + if(error || response.statusCode !== 200) { + return workers.attemptRun(code, once, void 0, callback, errorCallback, ++count); + } + + body = JSON.parse(body); + + if(once){ + runner.free(); + } else { + runner.setTimeout(); + body['ip'] = runner.label; + body['rname'] = runner.name; + body['wname'] = runner.worker.name; + workers.setRunner(runner); + } + return callback(body); + }); + }; + + workers.add = function(newWorkers){ newWorkers.forEach(function(worker){ workers.push(worker);