diff --git a/lib/datastore.js b/lib/datastore.js index 8d45b89..77a19ef 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -90,11 +90,10 @@ class Datastore extends EventEmitter { } loadDatabaseAsync () { - return this.executor.push({ + return this.executor.pushAsync({ this: this.persistence, fn: this.persistence.loadDatabaseAsync, - arguments: arguments, - async: true + arguments: arguments }, true) } diff --git a/lib/executor.js b/lib/executor.js index dd866ec..46b8c3e 100755 --- a/lib/executor.js +++ b/lib/executor.js @@ -2,57 +2,80 @@ * Responsible for sequentially executing actions on the database */ -const makeQueue = execute => { - const tasks = new Map() - let running = false - let drainPromise = Promise.resolve() - const executeNextTask = async (self = false) => { - if (!tasks.size) { - running = false - return - } else if (running && !self) { +class Queue { + constructor (execute) { + this.execute = execute + this.tasks = new Map() + this.buffer = new Map() + this.running = false + this.drainPromise = Promise.resolve() + } + + async executeNextTask (force = false) { + if (!this.tasks.size) { + this.running = false return - } - running = true - const [task, { resolve, reject }] = tasks[Symbol.iterator]().next().value + } else if (this.running && !force) return + this.running = true + const [task, { resolve, reject, async }] = this.tasks[Symbol.iterator]().next().value - tasks.delete(task) + this.tasks.delete(task) try { - resolve(await execute(task)) + resolve(await this.execute(task, async)) } catch (err) { reject(err) } - drainPromise = executeNextTask(true) + this.drainPromise = this.executeNextTask(true) } - return { - push (task) { - let _resolve, _reject - const promise = new Promise((resolve, reject) => { - _reject = reject - _resolve = resolve - }) - tasks.set(task, { resolve: _resolve, reject: _reject }) - if (!running) drainPromise = executeNextTask() - return promise - }, - async drain () { - return drainPromise - } + _push (task, async, map, run = false) { + let _resolve, _reject + const promise = new Promise((resolve, reject) => { + _reject = reject + _resolve = resolve + }) + map.set(task, { async: async, resolve: _resolve, reject: _reject }) + if (run && !this.running) this.drainPromise = this.executeNextTask() + return promise + } + + push (task) { + this._push(task, false, this.tasks, true).then(() => {}, () => {}) // to avoid having unhandledRejection + } + + pushAsync (task) { + return this._push(task, true, this.tasks, true) + } + + addToBuffer (task) { + this._push(task, false, this.buffer, false).then(() => {}, () => {}) // to avoid having unhandledRejection + } + + addToBufferAsync (task) { + return this._push(task, true, this.buffer, false) + } + + processBuffer () { + this.tasks = new Map([...this.tasks, ...this.buffer]) + this.buffer = new Map() + this.drainPromise = this.executeNextTask() + } + + async drain () { + return this.drainPromise } } class Executor { constructor () { - this.buffer = [] this.ready = false - this.queue = makeQueue(async task => { + this.queue = new Queue(async (task, async) => { // task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array const newArguments = Array.from(task.arguments) // If the task isn't async, let's proceed with the old handler - if (!task.async) { + if (!async) { const lastArg = newArguments[newArguments.length - 1] await new Promise(resolve => { if (typeof lastArg === 'function') { @@ -93,7 +116,12 @@ class Executor { */ push (task, forceQueuing) { if (this.ready || forceQueuing) this.queue.push(task) - else this.buffer.push(task) + else this.queue.addToBuffer(task) + } + + pushAsync (task, forceQueuing) { + if (this.ready || forceQueuing) return this.queue.pushAsync(task) + else return this.queue.addToBufferAsync(task) } /** @@ -102,8 +130,7 @@ class Executor { */ processBuffer () { this.ready = true - this.buffer.forEach(task => { this.queue.push(task) }) - this.buffer = [] + this.queue.processBuffer() } }