diff --git a/lib/cursor.js b/lib/cursor.js index b0eb402..2c318ca 100755 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -172,7 +172,7 @@ class Cursor { } execAsync (...args) { - return this.db.executor.pushAsync({ this: this, fn: this._execAsync, arguments: args }) + return this.db.executor.pushAsync(() => this._execAsync(...args)) } then (onFulfilled, onRejected) { diff --git a/lib/datastore.js b/lib/datastore.js index 14e125c..e6a7518 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -90,11 +90,7 @@ class Datastore extends EventEmitter { } loadDatabaseAsync (...args) { - return this.executor.pushAsync({ - this: this.persistence, - fn: this.persistence.loadDatabaseAsync, - arguments: args - }, true) + return this.executor.pushAsync(() => this.persistence.loadDatabaseAsync(args), true) } /** @@ -607,7 +603,7 @@ class Datastore extends EventEmitter { } updateAsync (...args) { - return this.executor.pushAsync({ this: this, fn: this._updateAsync, arguments: args }) + return this.executor.pushAsync(() => this._updateAsync(args)) } /** @@ -655,7 +651,7 @@ class Datastore extends EventEmitter { } removeAsync (...args) { - return this.executor.pushAsync({ this: this, fn: this._removeAsync, arguments: args }) + return this.executor.pushAsync(() => this._removeAsync(args)) } } diff --git a/lib/executor.js b/lib/executor.js index 1d7f7f6..6233c13 100755 --- a/lib/executor.js +++ b/lib/executor.js @@ -1,104 +1,41 @@ /** * Responsible for sequentially executing actions on the database */ - -class Queue { - constructor (execute) { - this.execute = execute - this.tasks = new Map() - this.buffer = new Map() - this.running = false - this.drainPromise = Promise.resolve() - } - - async executeNextTask (force = false) { - if (!this.tasks.size) { - this.running = false - return - } else if (this.running && !force) return - this.running = true - const [task, { resolve, reject, async }] = this.tasks[Symbol.iterator]().next().value - - this.tasks.delete(task) - try { - resolve(await this.execute(task, async)) - } catch (err) { - reject(err) - } - this.drainPromise = this.executeNextTask(true) - } - - _push (task, async, map, run = false) { - let _resolve, _reject - const promise = new Promise((resolve, reject) => { - _reject = reject - _resolve = resolve - }) - map.set(task, { async: async, resolve: _resolve, reject: _reject }) - if (run && !this.running) this.drainPromise = this.executeNextTask() - return promise - } - - push (task) { - this._push(task, false, this.tasks, true).then(() => {}, () => {}) // to avoid having unhandledRejection - } - - pushAsync (task) { - return this._push(task, true, this.tasks, true) - } - - addToBuffer (task) { - this._push(task, false, this.buffer, false).then(() => {}, () => {}) // to avoid having unhandledRejection +class Waterfall { + constructor () { + this._guardian = Promise.resolve() } - addToBufferAsync (task) { - return this._push(task, true, this.buffer, false) + get guardian () { + return this._guardian } - processBuffer () { - this.tasks = new Map([...this.tasks, ...this.buffer]) - this.buffer = new Map() - this.drainPromise = this.executeNextTask() + waterfall (func) { + return (...args) => { + this._guardian = this.guardian.then(() => { + return func(...args) + .then(result => ({ error: false, result }), result => ({ error: true, result })) + }) + return this.guardian.then(({ error, result }) => { + if (error) return Promise.reject(result) + else return Promise.resolve(result) + }) + } } - async drain () { - return this.drainPromise + chain (promise) { + return this.waterfall(() => promise)() } } class Executor { constructor () { this.ready = false - - this.queue = new Queue(async (task, async) => { - // If the task isn't async, let's proceed with the old handler - if (!async) { - const lastArg = task.arguments[task.arguments.length - 1] - await new Promise(resolve => { - if (typeof lastArg === 'function') { - // We got a callback - task.arguments.pop() // remove original callback - task.fn.apply(task.this, [...task.arguments, function () { - resolve() // triggers next task after next tick // TODO: check if it's at next tick or not - lastArg.apply(null, arguments) // call original callback - }]) - } else if (!lastArg && task.arguments.length !== 0) { - // We got a falsy callback - task.arguments.pop() // remove original callback - task.fn.apply(task.this, [...task.arguments, () => { - resolve() - }]) - } else { - // We don't have a callback - task.fn.apply(task.this, [...task.arguments, () => { - resolve() - }]) - } - }) - } else { - return task.fn.apply(task.this, task.arguments) - } - }) + this._mainWaterfallObject = new Waterfall() + this._bufferWaterfallObject = new Waterfall() + this._bufferWaterfallObject.chain(new Promise(resolve => { + this._resolveBuffer = resolve + })) } /** @@ -112,13 +49,36 @@ class Executor { * @param {Boolean} forceQueuing Optional (defaults to false) force executor to queue task even if it is not ready */ push (task, forceQueuing) { - if (this.ready || forceQueuing) this.queue.push(task) - else this.queue.addToBuffer(task) + const func = async () => { + const lastArg = task.arguments[task.arguments.length - 1] + await new Promise(resolve => { + if (typeof lastArg === 'function') { + // We got a callback + task.arguments.pop() // remove original callback + task.fn.apply(task.this, [...task.arguments, function () { + resolve() // triggers next task after next tick // TODO: check if it's at next tick or not + lastArg.apply(null, arguments) // call original callback + }]) + } else if (!lastArg && task.arguments.length !== 0) { + // We got a falsy callback + task.arguments.pop() // remove original callback + task.fn.apply(task.this, [...task.arguments, () => { + resolve() + }]) + } else { + // We don't have a callback + task.fn.apply(task.this, [...task.arguments, () => { + resolve() + }]) + } + }) + } + this.pushAsync(func, forceQueuing) } pushAsync (task, forceQueuing) { - if (this.ready || forceQueuing) return this.queue.pushAsync(task) - else return this.queue.addToBufferAsync(task) + if (this.ready || forceQueuing) return this._mainWaterfallObject.waterfall(task)() + else return this._bufferWaterfallObject.waterfall(task)() } /** @@ -127,7 +87,8 @@ class Executor { */ processBuffer () { this.ready = true - this.queue.processBuffer() + this._resolveBuffer() + this._mainWaterfallObject.waterfall(() => this._bufferWaterfallObject.guardian) } }