From 5a4859c3baa31dd08a68ab07511cdb0029ef8317 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timoth=C3=A9e=20Rebours?= Date: Tue, 19 Oct 2021 17:37:36 +0200 Subject: [PATCH] WIP: Successfully replace a waterfall with native async/await --- lib/datastore.js | 118 +++++++++++++++++++----------------------- test/db.test.js | 14 +++++ test/executor.test.js | 10 ++++ 3 files changed, 76 insertions(+), 66 deletions(-) diff --git a/lib/datastore.js b/lib/datastore.js index 16c4d19..491a3f2 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -222,6 +222,38 @@ class Datastore extends EventEmitter { } } + _getCandidates (query) { + const indexNames = Object.keys(this.indexes) + // STEP 1: get candidates list by checking indexes from most to least frequent usecase + // For a basic match + let usableQuery + usableQuery = Object.entries(query) + .filter(([k, v]) => + !!(typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null) && + indexNames.includes(k) + ) + .pop() + if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1]) + // For a $in match + usableQuery = Object.entries(query) + .filter(([k, v]) => + !!(query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) && + indexNames.includes(k) + ) + .pop() + if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1].$in) + // For a comparison match + usableQuery = Object.entries(query) + .filter(([k, v]) => + !!(query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) && + indexNames.includes(k) + ) + .pop() + if (usableQuery) return this.indexes[usableQuery[0]].getBetweenBounds(usableQuery[1]) + // By default, return all the DB data + return this.getAllData() + } + /** * Return the list of candidates for a given query * Crude implementation for now, we return the candidates given by the first usable index if any @@ -235,87 +267,41 @@ class Datastore extends EventEmitter { * @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations * @param {Function} callback Signature err, candidates */ - getCandidates (query, dontExpireStaleDocs, callback) { - const indexNames = Object.keys(this.indexes) - let usableQueryKeys + async getCandidates (query, dontExpireStaleDocs, callback) { + const validDocs = [] if (typeof dontExpireStaleDocs === 'function') { callback = dontExpireStaleDocs dontExpireStaleDocs = false } - async.waterfall([ + try { // STEP 1: get candidates list by checking indexes from most to least frequent usecase - cb => { - // For a basic match - usableQueryKeys = [] - Object.keys(query).forEach(k => { - if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || isDate(query[k]) || query[k] === null) { - usableQueryKeys.push(k) - } - }) - usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k)) - if (usableQueryKeys.length > 0) { - return cb(null, this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]])) - } - - // For a $in match - usableQueryKeys = [] - Object.keys(query).forEach(k => { - if (query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) { - usableQueryKeys.push(k) - } - }) - usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k)) - if (usableQueryKeys.length > 0) { - return cb(null, this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in)) - } - - // For a comparison match - usableQueryKeys = [] - Object.keys(query).forEach(k => { - if (query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) { - usableQueryKeys.push(k) - } - }) - usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k)) - if (usableQueryKeys.length > 0) { - return cb(null, this.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]])) - } - - // By default, return all the DB data - return cb(null, this.getAllData()) - }, + const docs = this._getCandidates(query) // STEP 2: remove all expired documents - docs => { - if (dontExpireStaleDocs) return callback(null, docs) - + if (!dontExpireStaleDocs) { const expiredDocsIds = [] - const validDocs = [] const ttlIndexesFieldNames = Object.keys(this.ttlIndexes) docs.forEach(doc => { - let valid = true - ttlIndexesFieldNames.forEach(i => { - if (doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000) { - valid = false - } - }) - if (valid) validDocs.push(doc) + if (ttlIndexesFieldNames.every(i => !(doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000))) validDocs.push(doc) else expiredDocsIds.push(doc._id) }) - - async.eachSeries(expiredDocsIds, (_id, cb) => { - this._remove({ _id: _id }, {}, err => { - if (err) return callback(err) - return cb() + for (const _id of expiredDocsIds) { + await new Promise((resolve, reject) => { + this._remove({ _id: _id }, {}, err => { + if (err) return reject(err) + return resolve() + }) }) - // eslint-disable-next-line node/handle-callback-err - }, err => { - // TODO: handle error - return callback(null, validDocs) - }) - }]) + } + } else validDocs.push(...docs) + } catch (error) { + if (typeof callback === 'function') callback(error, null) + else throw error + } + if (typeof callback === 'function') callback(null, validDocs) + else return validDocs } /** diff --git a/test/db.test.js b/test/db.test.js index 8620adf..aff8e50 100755 --- a/test/db.test.js +++ b/test/db.test.js @@ -431,9 +431,12 @@ describe('Database', function () { it('If the callback throws an uncaught exception, do not catch it inside findOne, this is userspace concern', function (done) { let tryCount = 0 const currentUncaughtExceptionHandlers = process.listeners('uncaughtException') + const currentUnhandledRejectionHandlers = process.listeners('unhandledRejection') + let i process.removeAllListeners('uncaughtException') + process.removeAllListeners('unhandledRejection') process.on('uncaughtException', function MINE (ex) { process.removeAllListeners('uncaughtException') @@ -446,6 +449,17 @@ describe('Database', function () { done() }) + process.on('unhandledRejection', function MINE (ex) { + process.removeAllListeners('unhandledRejection') + + for (i = 0; i < currentUnhandledRejectionHandlers.length; i += 1) { + process.on('unhandledRejection', currentUnhandledRejectionHandlers[i]) + } + + ex.message.should.equal('SOME EXCEPTION') + done() + }) + d.insert({ a: 5 }, function () { // eslint-disable-next-line node/handle-callback-err d.findOne({ a: 5 }, function (err, doc) { diff --git a/test/executor.test.js b/test/executor.test.js index 13c6606..fda65ff 100755 --- a/test/executor.test.js +++ b/test/executor.test.js @@ -14,23 +14,33 @@ chai.should() // We prevent Mocha from catching the exception we throw on purpose by remembering all current handlers, remove them and register them back after test ends function testThrowInCallback (d, done) { const currentUncaughtExceptionHandlers = process.listeners('uncaughtException') + const currentUnhandledRejectionHandlers = process.listeners('unhandledRejection') process.removeAllListeners('uncaughtException') + process.removeAllListeners('unhandledRejection') // eslint-disable-next-line node/handle-callback-err process.on('uncaughtException', function (err) { // Do nothing with the error which is only there to test we stay on track }) + process.on('unhandledRejection', function MINE (ex) { + // Do nothing with the error which is only there to test we stay on track + }) + // eslint-disable-next-line node/handle-callback-err d.find({}, function (err) { process.nextTick(function () { // eslint-disable-next-line node/handle-callback-err d.insert({ bar: 1 }, function (err) { process.removeAllListeners('uncaughtException') + process.removeAllListeners('unhandledRejection') for (let i = 0; i < currentUncaughtExceptionHandlers.length; i += 1) { process.on('uncaughtException', currentUncaughtExceptionHandlers[i]) } + for (let i = 0; i < currentUnhandledRejectionHandlers.length; i += 1) { + process.on('unhandledRejection', currentUnhandledRejectionHandlers[i]) + } done() })