WIP: async Cursor and fix async executor

Timothée Rebours 3 years ago
parent 477316a470
commit 622c9f7a94
  1. 81
      lib/cursor.js
  2. 44
      lib/datastore.js
  3. 2
      lib/executor.js

@ -2,6 +2,7 @@
* Manage access to data, be it to find, update or remove it * Manage access to data, be it to find, update or remove it
*/ */
const model = require('./model.js') const model = require('./model.js')
const { callbackify, promisify } = require('util')
class Cursor { class Cursor {
/** /**
@ -10,10 +11,11 @@ class Cursor {
* @param {Query} query - The query this cursor will operate on * @param {Query} query - The query this cursor will operate on
* @param {Function} execFn - Handler to be executed after cursor has found the results and before the callback passed to find/findOne/update/remove * @param {Function} execFn - Handler to be executed after cursor has found the results and before the callback passed to find/findOne/update/remove
*/ */
constructor (db, query, execFn) { constructor (db, query, execFn, async = false) {
this.db = db this.db = db
this.query = query || {} this.query = query || {}
if (execFn) { this.execFn = execFn } if (execFn) { this.execFn = execFn }
if (async) { this.async = true }
} }
/** /**
@ -103,50 +105,33 @@ class Cursor {
* *
* @param {Function} callback - Signature: err, results * @param {Function} callback - Signature: err, results
*/ */
_exec (_callback) { async _execAsync () {
let res = [] let res = []
let added = 0 let added = 0
let skipped = 0 let skipped = 0
let error = null let error = null
let keys
let key
const callback = (error, res) => {
if (this.execFn) return this.execFn(error, res, _callback)
else return _callback(error, res)
}
this.db.getCandidates(this.query, (err, candidates) => { try {
if (err) return callback(err) const candidates = await this.db.getCandidatesAsync(this.query)
try { for (const candidate of candidates) {
for (const candidate of candidates) { if (model.match(candidate, this.query)) {
if (model.match(candidate, this.query)) { // If a sort is defined, wait for the results to be sorted before applying limit and skip
// If a sort is defined, wait for the results to be sorted before applying limit and skip if (!this._sort) {
if (!this._sort) { if (this._skip && this._skip > skipped) skipped += 1
if (this._skip && this._skip > skipped) skipped += 1 else {
else { res.push(candidate)
res.push(candidate) added += 1
added += 1 if (this._limit && this._limit <= added) break
if (this._limit && this._limit <= added) break }
} } else res.push(candidate)
} else res.push(candidate)
}
} }
} catch (err) {
return callback(err)
} }
// Apply all sorts // Apply all sorts
if (this._sort) { if (this._sort) {
keys = Object.keys(this._sort)
// Sorting // Sorting
const criteria = [] const criteria = Object.entries(this._sort).map(([key, direction]) => ({ key, direction }))
keys.forEach(item => {
key = item
criteria.push({ key: key, direction: this._sort[key] })
})
res.sort((a, b) => { res.sort((a, b) => {
for (const criterion of criteria) { for (const criterion of criteria) {
const compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), this.db.compareStrings) const compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), this.db.compareStrings)
@ -169,14 +154,38 @@ class Cursor {
error = e error = e
res = undefined res = undefined
} }
} catch (e) {
error = e
}
if (this.execFn && !this.async) return promisify(this.execFn)(error, res)
else if (error) throw error
else if (this.execFn) return this.execFn(res)
else return res
}
return callback(error, res) _exec (_callback) {
}) callbackify(this._execAsync.bind(this))(_callback)
} }
exec () { exec () {
this.db.executor.push({ this: this, fn: this._exec, arguments: arguments }) this.db.executor.push({ this: this, fn: this._exec, arguments: arguments })
} }
execAsync () {
return this.db.executor.pushAsync({ this: this, fn: this._execAsync, arguments: arguments })
}
then (onFulfilled, onRejected) {
return this.execAsync().then(onFulfilled, onRejected)
}
catch (onRejected) {
return this.execAsync().catch(onRejected)
}
finally (onFinally) {
return this.execAsync().finally(onFinally)
}
} }
// Interface // Interface

@ -129,7 +129,6 @@ class Datastore extends EventEmitter {
} }
async ensureIndexAsync (options = {}) { async ensureIndexAsync (options = {}) {
console.log('exec now')
if (!options.fieldName) { if (!options.fieldName) {
const err = new Error('Cannot create an index without a fieldName') const err = new Error('Cannot create an index without a fieldName')
err.missingFieldName = true err.missingFieldName = true
@ -451,15 +450,18 @@ class Datastore extends EventEmitter {
* @param {Function} callback Optional callback, signature: err, count * @param {Function} callback Optional callback, signature: err, count
*/ */
count (query, callback) { count (query, callback) {
const cursor = new Cursor(this, query, function (err, docs, callback) { const cursor = this.countAsync(query)
if (err) { return callback(err) }
return callback(null, docs.length)
})
if (typeof callback === 'function') cursor.exec(callback) if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
else return cursor else return cursor
} }
countAsync (query) {
const cursor = new Cursor(this, query, async docs => docs.length, true)
return cursor // this is a trick, Cursor itself is a thenable, which allows to await it
}
/** /**
* Find all documents matching the query * Find all documents matching the query
* If no callback is passed, we return the cursor so that user can limit, skip and finally exec * If no callback is passed, we return the cursor so that user can limit, skip and finally exec
@ -478,17 +480,17 @@ class Datastore extends EventEmitter {
} // If not assume projection is an object and callback undefined } // If not assume projection is an object and callback undefined
} }
const cursor = new Cursor(this, query, function (err, docs, callback) { const cursor = this.findAsync(query, projection)
if (err) { return callback(err) }
const res = docs.map(doc => model.deepCopy(doc)) if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
else return cursor
}
return callback(null, res) findAsync (query, projection = {}) {
}) const cursor = new Cursor(this, query, docs => docs.map(doc => model.deepCopy(doc)), true)
cursor.projection(projection) cursor.projection(projection)
if (typeof callback === 'function') cursor.exec(callback) return cursor
else return cursor
} }
/** /**
@ -508,17 +510,19 @@ class Datastore extends EventEmitter {
} // If not assume projection is an object and callback undefined } // If not assume projection is an object and callback undefined
} }
const cursor = new Cursor(this, query, (err, docs, callback) => { const cursor = this.findOneAsync(query, projection)
if (err) return callback(err)
if (docs.length === 1) return callback(null, model.deepCopy(docs[0]))
else return callback(null, null)
})
cursor.projection(projection).limit(1) if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
if (typeof callback === 'function') cursor.exec(callback)
else return cursor else return cursor
} }
findOneAsync (query, projection = {}) {
const cursor = new Cursor(this, query, docs => docs.length === 1 ? model.deepCopy(docs[0]) : null, true)
cursor.projection(projection).limit(1)
return cursor
}
/** /**
* Update all docs matching query. * Update all docs matching query.
* Use Datastore.update which has the same signature * Use Datastore.update which has the same signature

@ -99,7 +99,7 @@ class Executor {
} }
}) })
} else { } else {
await task.fn.apply(task.this, newArguments) return task.fn.apply(task.this, newArguments)
} }
}) })
} }

Loading…
Cancel
Save