From 622c9f7a94aed8813452a0c4964a660d8efd5dd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timoth=C3=A9e=20Rebours?= Date: Fri, 22 Oct 2021 08:54:35 +0200 Subject: [PATCH] WIP: async Cursor and fix async executor --- lib/cursor.js | 81 +++++++++++++++++++++++++++--------------------- lib/datastore.js | 44 ++++++++++++++------------ lib/executor.js | 2 +- 3 files changed, 70 insertions(+), 57 deletions(-) diff --git a/lib/cursor.js b/lib/cursor.js index ed37416..9521458 100755 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -2,6 +2,7 @@ * Manage access to data, be it to find, update or remove it */ const model = require('./model.js') +const { callbackify, promisify } = require('util') class Cursor { /** @@ -10,10 +11,11 @@ class Cursor { * @param {Query} query - The query this cursor will operate on * @param {Function} execFn - Handler to be executed after cursor has found the results and before the callback passed to find/findOne/update/remove */ - constructor (db, query, execFn) { + constructor (db, query, execFn, async = false) { this.db = db this.query = query || {} if (execFn) { this.execFn = execFn } + if (async) { this.async = true } } /** @@ -103,50 +105,33 @@ class Cursor { * * @param {Function} callback - Signature: err, results */ - _exec (_callback) { + async _execAsync () { let res = [] let added = 0 let skipped = 0 let error = null - let keys - let key - - const callback = (error, res) => { - if (this.execFn) return this.execFn(error, res, _callback) - else return _callback(error, res) - } - this.db.getCandidates(this.query, (err, candidates) => { - if (err) return callback(err) - - try { - for (const candidate of candidates) { - if (model.match(candidate, this.query)) { - // If a sort is defined, wait for the results to be sorted before applying limit and skip - if (!this._sort) { - if (this._skip && this._skip > skipped) skipped += 1 - else { - res.push(candidate) - added += 1 - if (this._limit && this._limit <= added) break - } - } else res.push(candidate) - } + try { + const candidates = await this.db.getCandidatesAsync(this.query) + + for (const candidate of candidates) { + if (model.match(candidate, this.query)) { + // If a sort is defined, wait for the results to be sorted before applying limit and skip + if (!this._sort) { + if (this._skip && this._skip > skipped) skipped += 1 + else { + res.push(candidate) + added += 1 + if (this._limit && this._limit <= added) break + } + } else res.push(candidate) } - } catch (err) { - return callback(err) } // Apply all sorts if (this._sort) { - keys = Object.keys(this._sort) - // Sorting - const criteria = [] - keys.forEach(item => { - key = item - criteria.push({ key: key, direction: this._sort[key] }) - }) + const criteria = Object.entries(this._sort).map(([key, direction]) => ({ key, direction })) res.sort((a, b) => { for (const criterion of criteria) { const compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), this.db.compareStrings) @@ -169,14 +154,38 @@ class Cursor { error = e res = undefined } + } catch (e) { + error = e + } + if (this.execFn && !this.async) return promisify(this.execFn)(error, res) + else if (error) throw error + else if (this.execFn) return this.execFn(res) + else return res + } - return callback(error, res) - }) + _exec (_callback) { + callbackify(this._execAsync.bind(this))(_callback) } exec () { this.db.executor.push({ this: this, fn: this._exec, arguments: arguments }) } + + execAsync () { + return this.db.executor.pushAsync({ this: this, fn: this._execAsync, arguments: arguments }) + } + + then (onFulfilled, onRejected) { + return this.execAsync().then(onFulfilled, onRejected) + } + + catch (onRejected) { + return this.execAsync().catch(onRejected) + } + + finally (onFinally) { + return this.execAsync().finally(onFinally) + } } // Interface diff --git a/lib/datastore.js b/lib/datastore.js index 362a774..74411f1 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -129,7 +129,6 @@ class Datastore extends EventEmitter { } async ensureIndexAsync (options = {}) { - console.log('exec now') if (!options.fieldName) { const err = new Error('Cannot create an index without a fieldName') err.missingFieldName = true @@ -451,15 +450,18 @@ class Datastore extends EventEmitter { * @param {Function} callback Optional callback, signature: err, count */ count (query, callback) { - const cursor = new Cursor(this, query, function (err, docs, callback) { - if (err) { return callback(err) } - return callback(null, docs.length) - }) + const cursor = this.countAsync(query) - if (typeof callback === 'function') cursor.exec(callback) + if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback) else return cursor } + countAsync (query) { + const cursor = new Cursor(this, query, async docs => docs.length, true) + + return cursor // this is a trick, Cursor itself is a thenable, which allows to await it + } + /** * Find all documents matching the query * If no callback is passed, we return the cursor so that user can limit, skip and finally exec @@ -478,17 +480,17 @@ class Datastore extends EventEmitter { } // If not assume projection is an object and callback undefined } - const cursor = new Cursor(this, query, function (err, docs, callback) { - if (err) { return callback(err) } + const cursor = this.findAsync(query, projection) - const res = docs.map(doc => model.deepCopy(doc)) + if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback) + else return cursor + } - return callback(null, res) - }) + findAsync (query, projection = {}) { + const cursor = new Cursor(this, query, docs => docs.map(doc => model.deepCopy(doc)), true) cursor.projection(projection) - if (typeof callback === 'function') cursor.exec(callback) - else return cursor + return cursor } /** @@ -508,17 +510,19 @@ class Datastore extends EventEmitter { } // If not assume projection is an object and callback undefined } - const cursor = new Cursor(this, query, (err, docs, callback) => { - if (err) return callback(err) - if (docs.length === 1) return callback(null, model.deepCopy(docs[0])) - else return callback(null, null) - }) + const cursor = this.findOneAsync(query, projection) - cursor.projection(projection).limit(1) - if (typeof callback === 'function') cursor.exec(callback) + if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback) else return cursor } + findOneAsync (query, projection = {}) { + const cursor = new Cursor(this, query, docs => docs.length === 1 ? model.deepCopy(docs[0]) : null, true) + + cursor.projection(projection).limit(1) + return cursor + } + /** * Update all docs matching query. * Use Datastore.update which has the same signature diff --git a/lib/executor.js b/lib/executor.js index 46b8c3e..8a76078 100755 --- a/lib/executor.js +++ b/lib/executor.js @@ -99,7 +99,7 @@ class Executor { } }) } else { - await task.fn.apply(task.this, newArguments) + return task.fn.apply(task.this, newArguments) } }) }