WIP: rewrite executor without async

pull/11/head
Timothée Rebours 3 years ago
parent 7f78b403ae
commit fbf2bd8d68
  1. 31
      lib/datastore.js
  2. 87
      lib/executor.js
  3. 10
      test/cursor.test.js

@ -89,6 +89,15 @@ class Datastore extends EventEmitter {
this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true)
}
loadDatabaseAsync () {
return this.executor.push({
this: this.persistence,
fn: this.persistence.loadDatabaseAsync,
arguments: arguments,
async: true
}, true)
}
/**
* Get an array of all the data in the database
*/
@ -343,19 +352,15 @@ class Datastore extends EventEmitter {
* @private
*/
_insert (newDoc, callback = () => {}) {
let preparedDoc
return callbackify(this._insertAsync.bind(this))(newDoc, callback)
}
try {
preparedDoc = this._prepareDocumentForInsertion(newDoc)
this._insertInCache(preparedDoc)
} catch (e) {
return callback(e)
}
async _insertAsync (newDoc) {
const preparedDoc = this._prepareDocumentForInsertion(newDoc)
this._insertInCache(preparedDoc)
this.persistence.persistNewState(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc], err => {
if (err) return callback(err)
return callback(null, model.deepCopy(preparedDoc))
})
await this.persistence.persistNewStateAsync(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc])
return model.deepCopy(preparedDoc)
}
/**
@ -433,6 +438,10 @@ class Datastore extends EventEmitter {
this.executor.push({ this: this, fn: this._insert, arguments: arguments })
}
insertAsync () {
this.executor.push({ this: this, fn: this._insertAsync, arguments: arguments, async: true })
}
/**
* Count all documents matching the query
* @param {Query} query MongoDB-style query

@ -1,41 +1,84 @@
/**
* Responsible for sequentially executing actions on the database
*/
const async = require('async')
const makeQueue = execute => {
const tasks = new Map()
let running = false
let drainPromise = Promise.resolve()
const executeNextTask = async (self = false) => {
if (!tasks.size) {
running = false
return
} else if (running && !self) {
return
}
running = true
const [task, { resolve, reject }] = tasks[Symbol.iterator]().next().value
tasks.delete(task)
try {
resolve(await execute(task))
} catch (err) {
reject(err)
}
drainPromise = executeNextTask(true)
}
return {
push (task) {
let _resolve, _reject
const promise = new Promise((resolve, reject) => {
_reject = reject
_resolve = resolve
})
tasks.set(task, { resolve: _resolve, reject: _reject })
if (!running) drainPromise = executeNextTask()
return promise
},
async drain () {
return drainPromise
}
}
}
class Executor {
constructor () {
this.buffer = []
this.ready = false
// This queue will execute all commands, one-by-one in order
this.queue = async.queue((task, cb) => {
this.queue = makeQueue(async task => {
// task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array
const newArguments = Array.from(task.arguments)
const lastArg = newArguments[newArguments.length - 1]
// Always tell the queue task is complete. Execute callback if any was given.
if (typeof lastArg === 'function') {
// Callback was supplied
newArguments[newArguments.length - 1] = function () {
if (typeof setImmediate === 'function') {
setImmediate(cb)
// If the task isn't async, let's proceed with the old handler
if (!task.async) {
const lastArg = newArguments[newArguments.length - 1]
await new Promise(resolve => {
if (typeof lastArg === 'function') {
// We got a callback
newArguments.pop() // remove original callback
task.fn.apply(task.this, [...newArguments, function () {
resolve() // triggers next task after next tick
lastArg.apply(null, arguments) // call original callback
}])
} else if (!lastArg && task.arguments.length !== 0) {
// We got a falsy callback
newArguments.pop() // remove original callback
task.fn.apply(task.this, [...newArguments, () => {
resolve()
}])
} else {
process.nextTick(cb)
// We don't have a callback
task.fn.apply(task.this, [...newArguments, () => {
resolve()
}])
}
lastArg.apply(null, arguments)
}
} else if (!lastArg && task.arguments.length !== 0) {
// false/undefined/null supplied as callback
newArguments[newArguments.length - 1] = () => { cb() }
})
} else {
// Nothing supplied as callback
newArguments.push(() => { cb() })
await task.fn.apply(task.this, newArguments)
}
task.fn.apply(task.this, newArguments)
}, 1)
})
}
/**

@ -29,12 +29,10 @@ describe('Cursor', function () {
})
})
},
function (cb) {
d.loadDatabase(function (err) {
assert.isNull(err)
d.getAllData().length.should.equal(0)
return cb()
})
async function (cb) {
await d.loadDatabaseAsync()
d.getAllData().length.should.equal(0)
cb()
}
], done)
})

Loading…
Cancel
Save