WIP: rewrite executor

Timothée Rebours 3 years ago
parent 176d53c4ee
commit f6d7208740
  1. 5
      lib/datastore.js
  2. 83
      lib/executor.js

@ -90,11 +90,10 @@ class Datastore extends EventEmitter {
} }
loadDatabaseAsync () { loadDatabaseAsync () {
return this.executor.push({ return this.executor.pushAsync({
this: this.persistence, this: this.persistence,
fn: this.persistence.loadDatabaseAsync, fn: this.persistence.loadDatabaseAsync,
arguments: arguments, arguments: arguments
async: true
}, true) }, true)
} }

@ -2,57 +2,80 @@
* Responsible for sequentially executing actions on the database * Responsible for sequentially executing actions on the database
*/ */
const makeQueue = execute => { class Queue {
const tasks = new Map() constructor (execute) {
let running = false this.execute = execute
let drainPromise = Promise.resolve() this.tasks = new Map()
const executeNextTask = async (self = false) => { this.buffer = new Map()
if (!tasks.size) { this.running = false
running = false this.drainPromise = Promise.resolve()
return
} else if (running && !self) {
return
} }
running = true
const [task, { resolve, reject }] = tasks[Symbol.iterator]().next().value
tasks.delete(task) async executeNextTask (force = false) {
if (!this.tasks.size) {
this.running = false
return
} else if (this.running && !force) return
this.running = true
const [task, { resolve, reject, async }] = this.tasks[Symbol.iterator]().next().value
this.tasks.delete(task)
try { try {
resolve(await execute(task)) resolve(await this.execute(task, async))
} catch (err) { } catch (err) {
reject(err) reject(err)
} }
drainPromise = executeNextTask(true) this.drainPromise = this.executeNextTask(true)
} }
return { _push (task, async, map, run = false) {
push (task) {
let _resolve, _reject let _resolve, _reject
const promise = new Promise((resolve, reject) => { const promise = new Promise((resolve, reject) => {
_reject = reject _reject = reject
_resolve = resolve _resolve = resolve
}) })
tasks.set(task, { resolve: _resolve, reject: _reject }) map.set(task, { async: async, resolve: _resolve, reject: _reject })
if (!running) drainPromise = executeNextTask() if (run && !this.running) this.drainPromise = this.executeNextTask()
return promise return promise
},
async drain () {
return drainPromise
} }
push (task) {
this._push(task, false, this.tasks, true).then(() => {}, () => {}) // to avoid having unhandledRejection
}
pushAsync (task) {
return this._push(task, true, this.tasks, true)
}
addToBuffer (task) {
this._push(task, false, this.buffer, false).then(() => {}, () => {}) // to avoid having unhandledRejection
}
addToBufferAsync (task) {
return this._push(task, true, this.buffer, false)
}
processBuffer () {
this.tasks = new Map([...this.tasks, ...this.buffer])
this.buffer = new Map()
this.drainPromise = this.executeNextTask()
}
async drain () {
return this.drainPromise
} }
} }
class Executor { class Executor {
constructor () { constructor () {
this.buffer = []
this.ready = false this.ready = false
this.queue = makeQueue(async task => { this.queue = new Queue(async (task, async) => {
// task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array // task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array
const newArguments = Array.from(task.arguments) const newArguments = Array.from(task.arguments)
// If the task isn't async, let's proceed with the old handler // If the task isn't async, let's proceed with the old handler
if (!task.async) { if (!async) {
const lastArg = newArguments[newArguments.length - 1] const lastArg = newArguments[newArguments.length - 1]
await new Promise(resolve => { await new Promise(resolve => {
if (typeof lastArg === 'function') { if (typeof lastArg === 'function') {
@ -93,7 +116,12 @@ class Executor {
*/ */
push (task, forceQueuing) { push (task, forceQueuing) {
if (this.ready || forceQueuing) this.queue.push(task) if (this.ready || forceQueuing) this.queue.push(task)
else this.buffer.push(task) else this.queue.addToBuffer(task)
}
pushAsync (task, forceQueuing) {
if (this.ready || forceQueuing) return this.queue.pushAsync(task)
else return this.queue.addToBufferAsync(task)
} }
/** /**
@ -102,8 +130,7 @@ class Executor {
*/ */
processBuffer () { processBuffer () {
this.ready = true this.ready = true
this.buffer.forEach(task => { this.queue.push(task) }) this.queue.processBuffer()
this.buffer = []
} }
} }

Loading…
Cancel
Save