simplify executor

pull/11/head
Timothée Rebours 3 years ago
parent 42a7d45856
commit 06dc91915c
  1. 2
      lib/cursor.js
  2. 10
      lib/datastore.js
  3. 143
      lib/executor.js

@ -172,7 +172,7 @@ class Cursor {
} }
execAsync (...args) { execAsync (...args) {
return this.db.executor.pushAsync({ this: this, fn: this._execAsync, arguments: args }) return this.db.executor.pushAsync(() => this._execAsync(...args))
} }
then (onFulfilled, onRejected) { then (onFulfilled, onRejected) {

@ -90,11 +90,7 @@ class Datastore extends EventEmitter {
} }
loadDatabaseAsync (...args) { loadDatabaseAsync (...args) {
return this.executor.pushAsync({ return this.executor.pushAsync(() => this.persistence.loadDatabaseAsync(args), true)
this: this.persistence,
fn: this.persistence.loadDatabaseAsync,
arguments: args
}, true)
} }
/** /**
@ -607,7 +603,7 @@ class Datastore extends EventEmitter {
} }
updateAsync (...args) { updateAsync (...args) {
return this.executor.pushAsync({ this: this, fn: this._updateAsync, arguments: args }) return this.executor.pushAsync(() => this._updateAsync(args))
} }
/** /**
@ -655,7 +651,7 @@ class Datastore extends EventEmitter {
} }
removeAsync (...args) { removeAsync (...args) {
return this.executor.pushAsync({ this: this, fn: this._removeAsync, arguments: args }) return this.executor.pushAsync(() => this._removeAsync(args))
} }
} }

@ -1,104 +1,41 @@
/** /**
* Responsible for sequentially executing actions on the database * Responsible for sequentially executing actions on the database
*/ */
class Waterfall {
class Queue { constructor () {
constructor (execute) { this._guardian = Promise.resolve()
this.execute = execute
this.tasks = new Map()
this.buffer = new Map()
this.running = false
this.drainPromise = Promise.resolve()
}
async executeNextTask (force = false) {
if (!this.tasks.size) {
this.running = false
return
} else if (this.running && !force) return
this.running = true
const [task, { resolve, reject, async }] = this.tasks[Symbol.iterator]().next().value
this.tasks.delete(task)
try {
resolve(await this.execute(task, async))
} catch (err) {
reject(err)
}
this.drainPromise = this.executeNextTask(true)
}
_push (task, async, map, run = false) {
let _resolve, _reject
const promise = new Promise((resolve, reject) => {
_reject = reject
_resolve = resolve
})
map.set(task, { async: async, resolve: _resolve, reject: _reject })
if (run && !this.running) this.drainPromise = this.executeNextTask()
return promise
}
push (task) {
this._push(task, false, this.tasks, true).then(() => {}, () => {}) // to avoid having unhandledRejection
}
pushAsync (task) {
return this._push(task, true, this.tasks, true)
}
addToBuffer (task) {
this._push(task, false, this.buffer, false).then(() => {}, () => {}) // to avoid having unhandledRejection
} }
addToBufferAsync (task) { get guardian () {
return this._push(task, true, this.buffer, false) return this._guardian
} }
processBuffer () { waterfall (func) {
this.tasks = new Map([...this.tasks, ...this.buffer]) return (...args) => {
this.buffer = new Map() this._guardian = this.guardian.then(() => {
this.drainPromise = this.executeNextTask() return func(...args)
.then(result => ({ error: false, result }), result => ({ error: true, result }))
})
return this.guardian.then(({ error, result }) => {
if (error) return Promise.reject(result)
else return Promise.resolve(result)
})
}
} }
async drain () { chain (promise) {
return this.drainPromise return this.waterfall(() => promise)()
} }
} }
class Executor { class Executor {
constructor () { constructor () {
this.ready = false this.ready = false
this._mainWaterfallObject = new Waterfall()
this.queue = new Queue(async (task, async) => { this._bufferWaterfallObject = new Waterfall()
// If the task isn't async, let's proceed with the old handler this._bufferWaterfallObject.chain(new Promise(resolve => {
if (!async) { this._resolveBuffer = resolve
const lastArg = task.arguments[task.arguments.length - 1] }))
await new Promise(resolve => {
if (typeof lastArg === 'function') {
// We got a callback
task.arguments.pop() // remove original callback
task.fn.apply(task.this, [...task.arguments, function () {
resolve() // triggers next task after next tick // TODO: check if it's at next tick or not
lastArg.apply(null, arguments) // call original callback
}])
} else if (!lastArg && task.arguments.length !== 0) {
// We got a falsy callback
task.arguments.pop() // remove original callback
task.fn.apply(task.this, [...task.arguments, () => {
resolve()
}])
} else {
// We don't have a callback
task.fn.apply(task.this, [...task.arguments, () => {
resolve()
}])
}
})
} else {
return task.fn.apply(task.this, task.arguments)
}
})
} }
/** /**
@ -112,13 +49,36 @@ class Executor {
* @param {Boolean} forceQueuing Optional (defaults to false) force executor to queue task even if it is not ready * @param {Boolean} forceQueuing Optional (defaults to false) force executor to queue task even if it is not ready
*/ */
push (task, forceQueuing) { push (task, forceQueuing) {
if (this.ready || forceQueuing) this.queue.push(task) const func = async () => {
else this.queue.addToBuffer(task) const lastArg = task.arguments[task.arguments.length - 1]
await new Promise(resolve => {
if (typeof lastArg === 'function') {
// We got a callback
task.arguments.pop() // remove original callback
task.fn.apply(task.this, [...task.arguments, function () {
resolve() // triggers next task after next tick // TODO: check if it's at next tick or not
lastArg.apply(null, arguments) // call original callback
}])
} else if (!lastArg && task.arguments.length !== 0) {
// We got a falsy callback
task.arguments.pop() // remove original callback
task.fn.apply(task.this, [...task.arguments, () => {
resolve()
}])
} else {
// We don't have a callback
task.fn.apply(task.this, [...task.arguments, () => {
resolve()
}])
}
})
}
this.pushAsync(func, forceQueuing)
} }
pushAsync (task, forceQueuing) { pushAsync (task, forceQueuing) {
if (this.ready || forceQueuing) return this.queue.pushAsync(task) if (this.ready || forceQueuing) return this._mainWaterfallObject.waterfall(task)()
else return this.queue.addToBufferAsync(task) else return this._bufferWaterfallObject.waterfall(task)()
} }
/** /**
@ -127,7 +87,8 @@ class Executor {
*/ */
processBuffer () { processBuffer () {
this.ready = true this.ready = true
this.queue.processBuffer() this._resolveBuffer()
this._mainWaterfallObject.waterfall(() => this._bufferWaterfallObject.guardian)
} }
} }

Loading…
Cancel
Save