WIP: Successfully replace a waterfall with native async/await

Timothée Rebours 3 years ago
parent 49574da5e2
commit 9b3b2a0afc
  1. 118
      lib/datastore.js
  2. 14
      test/db.test.js
  3. 10
      test/executor.test.js

@ -222,6 +222,38 @@ class Datastore extends EventEmitter {
}
}
_getCandidates (query) {
const indexNames = Object.keys(this.indexes)
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
// For a basic match
let usableQuery
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1])
// For a $in match
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1].$in)
// For a comparison match
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getBetweenBounds(usableQuery[1])
// By default, return all the DB data
return this.getAllData()
}
/**
* Return the list of candidates for a given query
* Crude implementation for now, we return the candidates given by the first usable index if any
@ -235,87 +267,41 @@ class Datastore extends EventEmitter {
* @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
* @param {Function} callback Signature err, candidates
*/
getCandidates (query, dontExpireStaleDocs, callback) {
const indexNames = Object.keys(this.indexes)
let usableQueryKeys
async getCandidates (query, dontExpireStaleDocs, callback) {
const validDocs = []
if (typeof dontExpireStaleDocs === 'function') {
callback = dontExpireStaleDocs
dontExpireStaleDocs = false
}
async.waterfall([
try {
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
cb => {
// For a basic match
usableQueryKeys = []
Object.keys(query).forEach(k => {
if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || isDate(query[k]) || query[k] === null) {
usableQueryKeys.push(k)
}
})
usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k))
if (usableQueryKeys.length > 0) {
return cb(null, this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]))
}
// For a $in match
usableQueryKeys = []
Object.keys(query).forEach(k => {
if (query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) {
usableQueryKeys.push(k)
}
})
usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k))
if (usableQueryKeys.length > 0) {
return cb(null, this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in))
}
// For a comparison match
usableQueryKeys = []
Object.keys(query).forEach(k => {
if (query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) {
usableQueryKeys.push(k)
}
})
usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k))
if (usableQueryKeys.length > 0) {
return cb(null, this.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]))
}
// By default, return all the DB data
return cb(null, this.getAllData())
},
const docs = this._getCandidates(query)
// STEP 2: remove all expired documents
docs => {
if (dontExpireStaleDocs) return callback(null, docs)
if (!dontExpireStaleDocs) {
const expiredDocsIds = []
const validDocs = []
const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)
docs.forEach(doc => {
let valid = true
ttlIndexesFieldNames.forEach(i => {
if (doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000) {
valid = false
}
})
if (valid) validDocs.push(doc)
if (ttlIndexesFieldNames.every(i => !(doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000))) validDocs.push(doc)
else expiredDocsIds.push(doc._id)
})
async.eachSeries(expiredDocsIds, (_id, cb) => {
this._remove({ _id: _id }, {}, err => {
if (err) return callback(err)
return cb()
for (const _id of expiredDocsIds) {
await new Promise((resolve, reject) => {
this._remove({ _id: _id }, {}, err => {
if (err) return reject(err)
return resolve()
})
})
// eslint-disable-next-line node/handle-callback-err
}, err => {
// TODO: handle error
return callback(null, validDocs)
})
}])
}
} else validDocs.push(...docs)
} catch (error) {
if (typeof callback === 'function') callback(error, null)
else throw error
}
if (typeof callback === 'function') callback(null, validDocs)
else return validDocs
}
/**

@ -431,9 +431,12 @@ describe('Database', function () {
it('If the callback throws an uncaught exception, do not catch it inside findOne, this is userspace concern', function (done) {
let tryCount = 0
const currentUncaughtExceptionHandlers = process.listeners('uncaughtException')
const currentUnhandledRejectionHandlers = process.listeners('unhandledRejection')
let i
process.removeAllListeners('uncaughtException')
process.removeAllListeners('unhandledRejection')
process.on('uncaughtException', function MINE (ex) {
process.removeAllListeners('uncaughtException')
@ -446,6 +449,17 @@ describe('Database', function () {
done()
})
process.on('unhandledRejection', function MINE (ex) {
process.removeAllListeners('unhandledRejection')
for (i = 0; i < currentUnhandledRejectionHandlers.length; i += 1) {
process.on('unhandledRejection', currentUnhandledRejectionHandlers[i])
}
ex.message.should.equal('SOME EXCEPTION')
done()
})
d.insert({ a: 5 }, function () {
// eslint-disable-next-line node/handle-callback-err
d.findOne({ a: 5 }, function (err, doc) {

@ -14,23 +14,33 @@ chai.should()
// We prevent Mocha from catching the exception we throw on purpose by remembering all current handlers, remove them and register them back after test ends
function testThrowInCallback (d, done) {
const currentUncaughtExceptionHandlers = process.listeners('uncaughtException')
const currentUnhandledRejectionHandlers = process.listeners('unhandledRejection')
process.removeAllListeners('uncaughtException')
process.removeAllListeners('unhandledRejection')
// eslint-disable-next-line node/handle-callback-err
process.on('uncaughtException', function (err) {
// Do nothing with the error which is only there to test we stay on track
})
process.on('unhandledRejection', function MINE (ex) {
// Do nothing with the error which is only there to test we stay on track
})
// eslint-disable-next-line node/handle-callback-err
d.find({}, function (err) {
process.nextTick(function () {
// eslint-disable-next-line node/handle-callback-err
d.insert({ bar: 1 }, function (err) {
process.removeAllListeners('uncaughtException')
process.removeAllListeners('unhandledRejection')
for (let i = 0; i < currentUncaughtExceptionHandlers.length; i += 1) {
process.on('uncaughtException', currentUncaughtExceptionHandlers[i])
}
for (let i = 0; i < currentUnhandledRejectionHandlers.length; i += 1) {
process.on('unhandledRejection', currentUnhandledRejectionHandlers[i])
}
done()
})

Loading…
Cancel
Save