diff --git a/lib/datastore.js b/lib/datastore.js index 57dccc6..8d45b89 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -89,6 +89,15 @@ class Datastore extends EventEmitter { this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true) } + loadDatabaseAsync () { + return this.executor.push({ + this: this.persistence, + fn: this.persistence.loadDatabaseAsync, + arguments: arguments, + async: true + }, true) + } + /** * Get an array of all the data in the database */ @@ -343,19 +352,15 @@ class Datastore extends EventEmitter { * @private */ _insert (newDoc, callback = () => {}) { - let preparedDoc + return callbackify(this._insertAsync.bind(this))(newDoc, callback) + } - try { - preparedDoc = this._prepareDocumentForInsertion(newDoc) - this._insertInCache(preparedDoc) - } catch (e) { - return callback(e) - } + async _insertAsync (newDoc) { + const preparedDoc = this._prepareDocumentForInsertion(newDoc) + this._insertInCache(preparedDoc) - this.persistence.persistNewState(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc], err => { - if (err) return callback(err) - return callback(null, model.deepCopy(preparedDoc)) - }) + await this.persistence.persistNewStateAsync(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc]) + return model.deepCopy(preparedDoc) } /** @@ -433,6 +438,10 @@ class Datastore extends EventEmitter { this.executor.push({ this: this, fn: this._insert, arguments: arguments }) } + insertAsync () { + this.executor.push({ this: this, fn: this._insertAsync, arguments: arguments, async: true }) + } + /** * Count all documents matching the query * @param {Query} query MongoDB-style query diff --git a/lib/executor.js b/lib/executor.js index 98a9c4c..dd866ec 100755 --- a/lib/executor.js +++ b/lib/executor.js @@ -1,41 +1,84 @@ /** * Responsible for sequentially executing actions on the database */ -const async = require('async') + +const makeQueue = execute => { + const tasks = new Map() + let running = false + let drainPromise = Promise.resolve() + const executeNextTask = async (self = false) => { + if (!tasks.size) { + running = false + return + } else if (running && !self) { + return + } + running = true + const [task, { resolve, reject }] = tasks[Symbol.iterator]().next().value + + tasks.delete(task) + try { + resolve(await execute(task)) + } catch (err) { + reject(err) + } + drainPromise = executeNextTask(true) + } + + return { + push (task) { + let _resolve, _reject + const promise = new Promise((resolve, reject) => { + _reject = reject + _resolve = resolve + }) + tasks.set(task, { resolve: _resolve, reject: _reject }) + if (!running) drainPromise = executeNextTask() + return promise + }, + async drain () { + return drainPromise + } + } +} class Executor { constructor () { this.buffer = [] this.ready = false - // This queue will execute all commands, one-by-one in order - this.queue = async.queue((task, cb) => { + this.queue = makeQueue(async task => { // task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array const newArguments = Array.from(task.arguments) - const lastArg = newArguments[newArguments.length - 1] - - // Always tell the queue task is complete. Execute callback if any was given. - if (typeof lastArg === 'function') { - // Callback was supplied - newArguments[newArguments.length - 1] = function () { - if (typeof setImmediate === 'function') { - setImmediate(cb) + // If the task isn't async, let's proceed with the old handler + if (!task.async) { + const lastArg = newArguments[newArguments.length - 1] + await new Promise(resolve => { + if (typeof lastArg === 'function') { + // We got a callback + newArguments.pop() // remove original callback + task.fn.apply(task.this, [...newArguments, function () { + resolve() // triggers next task after next tick + lastArg.apply(null, arguments) // call original callback + }]) + } else if (!lastArg && task.arguments.length !== 0) { + // We got a falsy callback + newArguments.pop() // remove original callback + task.fn.apply(task.this, [...newArguments, () => { + resolve() + }]) } else { - process.nextTick(cb) + // We don't have a callback + task.fn.apply(task.this, [...newArguments, () => { + resolve() + }]) } - lastArg.apply(null, arguments) - } - } else if (!lastArg && task.arguments.length !== 0) { - // false/undefined/null supplied as callback - newArguments[newArguments.length - 1] = () => { cb() } + }) } else { - // Nothing supplied as callback - newArguments.push(() => { cb() }) + await task.fn.apply(task.this, newArguments) } - - task.fn.apply(task.this, newArguments) - }, 1) + }) } /** diff --git a/test/cursor.test.js b/test/cursor.test.js index f1fe16d..42d8566 100755 --- a/test/cursor.test.js +++ b/test/cursor.test.js @@ -29,12 +29,10 @@ describe('Cursor', function () { }) }) }, - function (cb) { - d.loadDatabase(function (err) { - assert.isNull(err) - d.getAllData().length.should.equal(0) - return cb() - }) + async function (cb) { + await d.loadDatabaseAsync() + d.getAllData().length.should.equal(0) + cb() } ], done) })