/** * Responsible for sequentially executing actions on the database */ var async = require('async') ; function Executor () { this.buffer = []; this.ready = false; // This queue will execute all commands, one-by-one in order this.queue = async.queue(function (task, cb) { var callback , lastArg = task.arguments[task.arguments.length - 1] , i, newArguments = [] ; // task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array for (i = 0; i < task.arguments.length; i += 1) { newArguments.push(task.arguments[i]); } // Always tell the queue task is complete. Execute callback if any was given. if (typeof lastArg === 'function') { callback = function () { if (typeof setImmediate === 'function') { setImmediate(cb); } else { process.nextTick(cb); } lastArg.apply(null, arguments); }; newArguments[newArguments.length - 1] = callback; } else { callback = function () { cb(); }; newArguments.push(callback); } task.fn.apply(task.this, newArguments); }, 1); } /** * If executor is ready, queue task (and process it immediately if executor was idle) * If not, buffer task for later processing * @param {Object} task * task.this - Object to use as this * task.fn - Function to execute * task.arguments - Array of arguments * @param {Boolean} forceQueuing Optional (defaults to false) force executor to queue task even if it is not ready */ Executor.prototype.push = function (task, forceQueuing) { if (this.ready || forceQueuing) { this.queue.push(task); } else { this.buffer.push(task); } }; /** * Queue all tasks in buffer (in the same order they came in) * Automatically sets executor as ready */ Executor.prototype.processBuffer = function () { var i; this.ready = true; for (i = 0; i < this.buffer.length; i += 1) { this.queue.push(this.buffer[i]); } this.buffer = []; }; // Interface module.exports = Executor;