|
|
|
@ -2,57 +2,80 @@ |
|
|
|
|
* Responsible for sequentially executing actions on the database |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
const makeQueue = execute => { |
|
|
|
|
const tasks = new Map() |
|
|
|
|
let running = false |
|
|
|
|
let drainPromise = Promise.resolve() |
|
|
|
|
const executeNextTask = async (self = false) => { |
|
|
|
|
if (!tasks.size) { |
|
|
|
|
running = false |
|
|
|
|
return |
|
|
|
|
} else if (running && !self) { |
|
|
|
|
return |
|
|
|
|
class Queue { |
|
|
|
|
constructor (execute) { |
|
|
|
|
this.execute = execute |
|
|
|
|
this.tasks = new Map() |
|
|
|
|
this.buffer = new Map() |
|
|
|
|
this.running = false |
|
|
|
|
this.drainPromise = Promise.resolve() |
|
|
|
|
} |
|
|
|
|
running = true |
|
|
|
|
const [task, { resolve, reject }] = tasks[Symbol.iterator]().next().value |
|
|
|
|
|
|
|
|
|
tasks.delete(task) |
|
|
|
|
async executeNextTask (force = false) { |
|
|
|
|
if (!this.tasks.size) { |
|
|
|
|
this.running = false |
|
|
|
|
return |
|
|
|
|
} else if (this.running && !force) return |
|
|
|
|
this.running = true |
|
|
|
|
const [task, { resolve, reject, async }] = this.tasks[Symbol.iterator]().next().value |
|
|
|
|
|
|
|
|
|
this.tasks.delete(task) |
|
|
|
|
try { |
|
|
|
|
resolve(await execute(task)) |
|
|
|
|
resolve(await this.execute(task, async)) |
|
|
|
|
} catch (err) { |
|
|
|
|
reject(err) |
|
|
|
|
} |
|
|
|
|
drainPromise = executeNextTask(true) |
|
|
|
|
this.drainPromise = this.executeNextTask(true) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return { |
|
|
|
|
push (task) { |
|
|
|
|
_push (task, async, map, run = false) { |
|
|
|
|
let _resolve, _reject |
|
|
|
|
const promise = new Promise((resolve, reject) => { |
|
|
|
|
_reject = reject |
|
|
|
|
_resolve = resolve |
|
|
|
|
}) |
|
|
|
|
tasks.set(task, { resolve: _resolve, reject: _reject }) |
|
|
|
|
if (!running) drainPromise = executeNextTask() |
|
|
|
|
map.set(task, { async: async, resolve: _resolve, reject: _reject }) |
|
|
|
|
if (run && !this.running) this.drainPromise = this.executeNextTask() |
|
|
|
|
return promise |
|
|
|
|
}, |
|
|
|
|
async drain () { |
|
|
|
|
return drainPromise |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
push (task) { |
|
|
|
|
this._push(task, false, this.tasks, true).then(() => {}, () => {}) // to avoid having unhandledRejection
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pushAsync (task) { |
|
|
|
|
return this._push(task, true, this.tasks, true) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
addToBuffer (task) { |
|
|
|
|
this._push(task, false, this.buffer, false).then(() => {}, () => {}) // to avoid having unhandledRejection
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
addToBufferAsync (task) { |
|
|
|
|
return this._push(task, true, this.buffer, false) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
processBuffer () { |
|
|
|
|
this.tasks = new Map([...this.tasks, ...this.buffer]) |
|
|
|
|
this.buffer = new Map() |
|
|
|
|
this.drainPromise = this.executeNextTask() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async drain () { |
|
|
|
|
return this.drainPromise |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class Executor { |
|
|
|
|
constructor () { |
|
|
|
|
this.buffer = [] |
|
|
|
|
this.ready = false |
|
|
|
|
|
|
|
|
|
this.queue = makeQueue(async task => { |
|
|
|
|
this.queue = new Queue(async (task, async) => { |
|
|
|
|
// task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array
|
|
|
|
|
const newArguments = Array.from(task.arguments) |
|
|
|
|
|
|
|
|
|
// If the task isn't async, let's proceed with the old handler
|
|
|
|
|
if (!task.async) { |
|
|
|
|
if (!async) { |
|
|
|
|
const lastArg = newArguments[newArguments.length - 1] |
|
|
|
|
await new Promise(resolve => { |
|
|
|
|
if (typeof lastArg === 'function') { |
|
|
|
@ -93,7 +116,12 @@ class Executor { |
|
|
|
|
*/ |
|
|
|
|
push (task, forceQueuing) { |
|
|
|
|
if (this.ready || forceQueuing) this.queue.push(task) |
|
|
|
|
else this.buffer.push(task) |
|
|
|
|
else this.queue.addToBuffer(task) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pushAsync (task, forceQueuing) { |
|
|
|
|
if (this.ready || forceQueuing) return this.queue.pushAsync(task) |
|
|
|
|
else return this.queue.addToBufferAsync(task) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -102,8 +130,7 @@ class Executor { |
|
|
|
|
*/ |
|
|
|
|
processBuffer () { |
|
|
|
|
this.ready = true |
|
|
|
|
this.buffer.forEach(task => { this.queue.push(task) }) |
|
|
|
|
this.buffer = [] |
|
|
|
|
this.queue.processBuffer() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|