diff --git a/lib/datastore.js b/lib/datastore.js index 491a3f2..9dd255c 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -1,4 +1,5 @@ const { EventEmitter } = require('events') +const { callbackify } = require('util') const async = require('async') const Cursor = require('./cursor.js') const customUtils = require('./customUtils.js') @@ -267,26 +268,61 @@ class Datastore extends EventEmitter { * @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations * @param {Function} callback Signature err, candidates */ - async getCandidates (query, dontExpireStaleDocs, callback) { - const validDocs = [] - + getCandidates (query, dontExpireStaleDocs, callback) { if (typeof dontExpireStaleDocs === 'function') { callback = dontExpireStaleDocs dontExpireStaleDocs = false } - try { + callbackify(this.getCandidatesAsync.bind(this))(query, dontExpireStaleDocs, callback) + } + + async getCandidatesAsync (query, dontExpireStaleDocs = false) { + const indexNames = Object.keys(this.indexes) + const validDocs = [] + + const _getCandidates = query => { // STEP 1: get candidates list by checking indexes from most to least frequent usecase - const docs = this._getCandidates(query) - // STEP 2: remove all expired documents + // For a basic match + let usableQuery + usableQuery = Object.entries(query) + .filter(([k, v]) => + !!(typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null) && + indexNames.includes(k) + ) + .pop() + if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1]) + // For a $in match + usableQuery = Object.entries(query) + .filter(([k, v]) => + !!(query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) && + indexNames.includes(k) + ) + .pop() + if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1].$in) + // For a comparison match + usableQuery = Object.entries(query) + .filter(([k, v]) => + !!(query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) && + indexNames.includes(k) + ) + .pop() + if (usableQuery) return this.indexes[usableQuery[0]].getBetweenBounds(usableQuery[1]) + // By default, return all the DB data + return this.getAllData() + } + + // STEP 1: get candidates list by checking indexes from most to least frequent usecase + const docs = _getCandidates(query) + // STEP 2: remove all expired documents if (!dontExpireStaleDocs) { - const expiredDocsIds = [] - const ttlIndexesFieldNames = Object.keys(this.ttlIndexes) + const expiredDocsIds = [] + const ttlIndexesFieldNames = Object.keys(this.ttlIndexes) - docs.forEach(doc => { + docs.forEach(doc => { if (ttlIndexesFieldNames.every(i => !(doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000))) validDocs.push(doc) - else expiredDocsIds.push(doc._id) - }) + else expiredDocsIds.push(doc._id) + }) for (const _id of expiredDocsIds) { await new Promise((resolve, reject) => { this._remove({ _id: _id }, {}, err => { @@ -294,14 +330,9 @@ class Datastore extends EventEmitter { return resolve() }) }) - } - } else validDocs.push(...docs) - } catch (error) { - if (typeof callback === 'function') callback(error, null) - else throw error - } - if (typeof callback === 'function') callback(null, validDocs) - else return validDocs + } + } else validDocs.push(...docs) + return validDocs } /** @@ -509,92 +540,90 @@ class Datastore extends EventEmitter { options = {} } const callback = cb || (() => {}) + + const _callback = (err, res = {}) => { + callback(err, res.numAffected, res.affectedDocuments, res.upsert) + } + callbackify(this._updateAsync.bind(this))(query, updateQuery, options, _callback) + } + + async _updateAsync (query, updateQuery, options = {}) { const multi = options.multi !== undefined ? options.multi : false const upsert = options.upsert !== undefined ? options.upsert : false - async.waterfall([ - cb => { // If upsert option is set, check whether we need to insert the doc - if (!upsert) return cb() + // If upsert option is set, check whether we need to insert the doc + if (upsert) { + // Need to use an internal function not tied to the executor to avoid deadlock + const cursor = new Cursor(this, query) - // Need to use an internal function not tied to the executor to avoid deadlock - const cursor = new Cursor(this, query) + const docs = await new Promise((resolve, reject) => { cursor.limit(1)._exec((err, docs) => { - if (err) return callback(err) - if (docs.length === 1) return cb() - else { - let toBeInserted - - try { - model.checkObject(updateQuery) - // updateQuery is a simple object with no modifier, use it as the document to insert - toBeInserted = updateQuery - } catch (e) { - // updateQuery contains modifiers, use the find query as the base, - // strip it from all operators and update it according to updateQuery - try { - toBeInserted = model.modify(model.deepCopy(query, true), updateQuery) - } catch (err) { - return callback(err) - } - } - - return this._insert(toBeInserted, (err, newDoc) => { - if (err) return callback(err) - return callback(null, 1, newDoc, true) - }) - } + if (err) reject(err) + else resolve(docs) }) - }, - () => { // Perform the update - let numReplaced = 0 - let modifiedDoc - const modifications = [] - let createdAt - - this.getCandidates(query, (err, candidates) => { - if (err) return callback(err) - - // Preparing update (if an error is thrown here neither the datafile nor - // the in-memory indexes are affected) - try { - for (const candidate of candidates) { - if (model.match(candidate, query) && (multi || numReplaced === 0)) { - numReplaced += 1 - if (this.timestampData) { createdAt = candidate.createdAt } - modifiedDoc = model.modify(candidate, updateQuery) - if (this.timestampData) { - modifiedDoc.createdAt = createdAt - modifiedDoc.updatedAt = new Date() - } - modifications.push({ oldDoc: candidate, newDoc: modifiedDoc }) - } - } - } catch (err) { - return callback(err) - } + }) - // Change the docs in memory - try { - this.updateIndexes(modifications) - } catch (err) { - return callback(err) - } + if (docs.length !== 1) { + let toBeInserted + + try { + model.checkObject(updateQuery) + // updateQuery is a simple object with no modifier, use it as the document to insert + toBeInserted = updateQuery + } catch (e) { + // updateQuery contains modifiers, use the find query as the base, + // strip it from all operators and update it according to updateQuery + toBeInserted = model.modify(model.deepCopy(query, true), updateQuery) + } - // Update the datafile - const updatedDocs = modifications.map(x => x.newDoc) - this.persistence.persistNewState(updatedDocs, err => { - if (err) return callback(err) - if (!options.returnUpdatedDocs) { - return callback(null, numReplaced) - } else { - let updatedDocsDC = [] - updatedDocs.forEach(doc => { updatedDocsDC.push(model.deepCopy(doc)) }) - if (!multi) updatedDocsDC = updatedDocsDC[0] - return callback(null, numReplaced, updatedDocsDC) - } + return new Promise((resolve, reject) => { + this._insert(toBeInserted, (err, newDoc) => { + if (err) return reject(err) + return resolve({ numAffected: 1, affectedDocuments: newDoc, upsert: true }) }) }) - }]) + } + } + // Perform the update + let numReplaced = 0 + let modifiedDoc + const modifications = [] + let createdAt + + const candidates = await this.getCandidatesAsync(query) + // Preparing update (if an error is thrown here neither the datafile nor + // the in-memory indexes are affected) + for (const candidate of candidates) { + if (model.match(candidate, query) && (multi || numReplaced === 0)) { + numReplaced += 1 + if (this.timestampData) { createdAt = candidate.createdAt } + modifiedDoc = model.modify(candidate, updateQuery) + if (this.timestampData) { + modifiedDoc.createdAt = createdAt + modifiedDoc.updatedAt = new Date() + } + modifications.push({ oldDoc: candidate, newDoc: modifiedDoc }) + } + } + + // Change the docs in memory + this.updateIndexes(modifications) + + // Update the datafile + const updatedDocs = modifications.map(x => x.newDoc) + await new Promise((resolve, reject) => { + this.persistence.persistNewState(updatedDocs, err => { + if (err) return reject(err) + else resolve() + }) + }) + if (!options.returnUpdatedDocs) return { numAffected: numReplaced } + else { + let updatedDocsDC = [] + updatedDocs.forEach(doc => { updatedDocsDC.push(model.deepCopy(doc)) }) + if (!multi) updatedDocsDC = updatedDocsDC[0] + return { numAffected: numReplaced, affectedDocuments: updatedDocsDC } + } } update () {