From 645d87504dadc453213898760fcabd61134e291c Mon Sep 17 00:00:00 2001 From: Louis Chatriot Date: Mon, 18 Jan 2016 17:53:00 -0800 Subject: [PATCH] getCandidates is now async --- lib/cursor.js | 111 ++++++++++++++------------ lib/datastore.js | 188 ++++++++++++++++++++++++-------------------- lib/indexes.js | 6 +- test/cursor.test.js | 8 +- test/db.test.js | 85 ++++++++++---------- 5 files changed, 214 insertions(+), 184 deletions(-) diff --git a/lib/cursor.js b/lib/cursor.js index bd1c4e3..2817475 100755 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -103,76 +103,83 @@ Cursor.prototype.project = function (candidates) { * * @param {Function} callback - Signature: err, results */ -Cursor.prototype._exec = function(callback) { - var candidates = this.db.getCandidates(this.query) - , res = [], added = 0, skipped = 0, self = this +Cursor.prototype._exec = function(_callback) { + var res = [], added = 0, skipped = 0, self = this , error = null , i, keys, key ; - try { - for (i = 0; i < candidates.length; i += 1) { - if (model.match(candidates[i], this.query)) { - // If a sort is defined, wait for the results to be sorted before applying limit and skip - if (!this._sort) { - if (this._skip && this._skip > skipped) { - skipped += 1; + function callback (error, res) { + if (self.execFn) { + return self.execFn(error, res, _callback); + } else { + return _callback(error, res); + } + } + + this.db.getCandidates(this.query, function (err, candidates) { + if (err) { return callback(err); } + + try { + for (i = 0; i < candidates.length; i += 1) { + if (model.match(candidates[i], self.query)) { + // If a sort is defined, wait for the results to be sorted before applying limit and skip + if (!self._sort) { + if (self._skip && self._skip > skipped) { + skipped += 1; + } else { + res.push(candidates[i]); + added += 1; + if (self._limit && self._limit <= added) { break; } + } } else { res.push(candidates[i]); - added += 1; - if (this._limit && this._limit <= added) { break; } } - } else { - res.push(candidates[i]); } } + } catch (err) { + return callback(err); } - } catch (err) { - return callback(err); - } - // Apply all sorts - if (this._sort) { - keys = Object.keys(this._sort); + // Apply all sorts + if (self._sort) { + keys = Object.keys(self._sort); - // Sorting - var criteria = []; - for (i = 0; i < keys.length; i++) { - key = keys[i]; - criteria.push({ key: key, direction: self._sort[key] }); - } - res.sort(function(a, b) { - var criterion, compare, i; - for (i = 0; i < criteria.length; i++) { - criterion = criteria[i]; - compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), self.db.compareStrings); - if (compare !== 0) { - return compare; - } + // Sorting + var criteria = []; + for (i = 0; i < keys.length; i++) { + key = keys[i]; + criteria.push({ key: key, direction: self._sort[key] }); } - return 0; - }); + res.sort(function(a, b) { + var criterion, compare, i; + for (i = 0; i < criteria.length; i++) { + criterion = criteria[i]; + compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), self.db.compareStrings); + if (compare !== 0) { + return compare; + } + } + return 0; + }); - // Applying limit and skip - var limit = this._limit || res.length - , skip = this._skip || 0; + // Applying limit and skip + var limit = self._limit || res.length + , skip = self._skip || 0; - res = res.slice(skip, skip + limit); - } + res = res.slice(skip, skip + limit); + } - // Apply projection - try { - res = this.project(res); - } catch (e) { - error = e; - res = undefined; - } + // Apply projection + try { + res = self.project(res); + } catch (e) { + error = e; + res = undefined; + } - if (this.execFn) { - return this.execFn(error, res, callback); - } else { return callback(error, res); - } + }); }; Cursor.prototype.exec = function () { diff --git a/lib/datastore.js b/lib/datastore.js index 132330c..6ff9c60 100755 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -115,6 +115,7 @@ Datastore.prototype.resetIndexes = function (newData) { * @param {String} options.fieldName * @param {Boolean} options.unique * @param {Boolean} options.sparse + * @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index * @param {Function} cb Optional callback, signature: err */ Datastore.prototype.ensureIndex = function (options, cb) { @@ -243,50 +244,63 @@ Datastore.prototype.updateIndexes = function (oldDoc, newDoc) { * One way to make it better would be to enable the use of multiple indexes if the first usable index * returns too much data. I may do it in the future. * - * TODO: needs to be moved to the Cursor module + * Returned candidates will be scanned to find and remove all expired documents + * + * @param {Query} query + * @param {Function} callback Signature err, docs */ -Datastore.prototype.getCandidates = function (query) { +Datastore.prototype.getCandidates = function (query, callback) { var indexNames = Object.keys(this.indexes) + , self = this , usableQueryKeys; - // For a basic match - usableQueryKeys = []; - Object.keys(query).forEach(function (k) { - if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) { - usableQueryKeys.push(k); + + async.waterfall([ + // STEP 1: get candidates list by checking indexes from most to least frequent usecase + function (cb) { + // For a basic match + usableQueryKeys = []; + Object.keys(query).forEach(function (k) { + if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) { + usableQueryKeys.push(k); + } + }); + usableQueryKeys = _.intersection(usableQueryKeys, indexNames); + if (usableQueryKeys.length > 0) { + return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]])); } - }); - usableQueryKeys = _.intersection(usableQueryKeys, indexNames); - if (usableQueryKeys.length > 0) { - return this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]); - } - // For a $in match - usableQueryKeys = []; - Object.keys(query).forEach(function (k) { - if (query[k] && query[k].hasOwnProperty('$in')) { - usableQueryKeys.push(k); + // For a $in match + usableQueryKeys = []; + Object.keys(query).forEach(function (k) { + if (query[k] && query[k].hasOwnProperty('$in')) { + usableQueryKeys.push(k); + } + }); + usableQueryKeys = _.intersection(usableQueryKeys, indexNames); + if (usableQueryKeys.length > 0) { + return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in)); } - }); - usableQueryKeys = _.intersection(usableQueryKeys, indexNames); - if (usableQueryKeys.length > 0) { - return this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in); - } - // For a comparison match - usableQueryKeys = []; - Object.keys(query).forEach(function (k) { - if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) { - usableQueryKeys.push(k); + // For a comparison match + usableQueryKeys = []; + Object.keys(query).forEach(function (k) { + if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) { + usableQueryKeys.push(k); + } + }); + usableQueryKeys = _.intersection(usableQueryKeys, indexNames); + if (usableQueryKeys.length > 0) { + return cb(null, self.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]])); } - }); - usableQueryKeys = _.intersection(usableQueryKeys, indexNames); - if (usableQueryKeys.length > 0) { - return this.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]); - } - // By default, return all the DB data - return this.getAllData(); + // By default, return all the DB data + return cb(null, self.getAllData()); + } + // STEP 2: remove all expired documents + , function (docs) { + return callback(null, docs); + }]); }; @@ -548,48 +562,49 @@ Datastore.prototype._update = function (query, updateQuery, options, cb) { }); } , function () { // Perform the update - var modifiedDoc - , candidates = self.getCandidates(query) - , modifications = [] - ; + var modifiedDoc , modifications = []; - // Preparing update (if an error is thrown here neither the datafile nor - // the in-memory indexes are affected) - try { - for (i = 0; i < candidates.length; i += 1) { - if (model.match(candidates[i], query) && (multi || numReplaced === 0)) { - numReplaced += 1; - modifiedDoc = model.modify(candidates[i], updateQuery); - if (self.timestampData) { modifiedDoc.updatedAt = new Date(); } - modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc }); + self.getCandidates(query, function (err, candidates) { + if (err) { return callback(err); } + + // Preparing update (if an error is thrown here neither the datafile nor + // the in-memory indexes are affected) + try { + for (i = 0; i < candidates.length; i += 1) { + if (model.match(candidates[i], query) && (multi || numReplaced === 0)) { + numReplaced += 1; + modifiedDoc = model.modify(candidates[i], updateQuery); + if (self.timestampData) { modifiedDoc.updatedAt = new Date(); } + modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc }); + } } + } catch (err) { + return callback(err); } - } catch (err) { - return callback(err); - } - // Change the docs in memory - try { - self.updateIndexes(modifications); - } catch (err) { - return callback(err); - } - - // Update the datafile - var updatedDocs = _.pluck(modifications, 'newDoc'); - self.persistence.persistNewState(updatedDocs, function (err) { - if (err) { return callback(err); } - if (!options.returnUpdatedDocs) { - return callback(null, numReplaced); - } else { - var updatedDocsDC = []; - updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); }); - return callback(null, numReplaced, updatedDocsDC); + // Change the docs in memory + try { + self.updateIndexes(modifications); + } catch (err) { + return callback(err); } + + // Update the datafile + var updatedDocs = _.pluck(modifications, 'newDoc'); + self.persistence.persistNewState(updatedDocs, function (err) { + if (err) { return callback(err); } + if (!options.returnUpdatedDocs) { + return callback(null, numReplaced); + } else { + var updatedDocsDC = []; + updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); }); + return callback(null, numReplaced, updatedDocsDC); + } + }); }); - } - ]); + }]); }; + Datastore.prototype.update = function () { this.executor.push({ this: this, fn: this._update, arguments: arguments }); }; @@ -607,32 +622,33 @@ Datastore.prototype.update = function () { */ Datastore.prototype._remove = function (query, options, cb) { var callback - , self = this - , numRemoved = 0 - , multi - , removedDocs = [] - , candidates = this.getCandidates(query) + , self = this, numRemoved = 0, removedDocs = [], multi ; if (typeof options === 'function') { cb = options; options = {}; } callback = cb || function () {}; multi = options.multi !== undefined ? options.multi : false; - try { - candidates.forEach(function (d) { - if (model.match(d, query) && (multi || numRemoved === 0)) { - numRemoved += 1; - removedDocs.push({ $$deleted: true, _id: d._id }); - self.removeFromIndexes(d); - } - }); - } catch (err) { return callback(err); } - - self.persistence.persistNewState(removedDocs, function (err) { + this.getCandidates(query, function (err, candidates) { if (err) { return callback(err); } - return callback(null, numRemoved); + + try { + candidates.forEach(function (d) { + if (model.match(d, query) && (multi || numRemoved === 0)) { + numRemoved += 1; + removedDocs.push({ $$deleted: true, _id: d._id }); + self.removeFromIndexes(d); + } + }); + } catch (err) { return callback(err); } + + self.persistence.persistNewState(removedDocs, function (err) { + if (err) { return callback(err); } + return callback(null, numRemoved); + }); }); }; + Datastore.prototype.remove = function () { this.executor.push({ this: this, fn: this._remove, arguments: arguments }); }; diff --git a/lib/indexes.js b/lib/indexes.js index 09952fb..2b43a6b 100755 --- a/lib/indexes.js +++ b/lib/indexes.js @@ -32,11 +32,13 @@ function projectForUnique (elt) { * @param {String} options.fieldName On which field should the index apply (can use dot notation to index on sub fields) * @param {Boolean} options.unique Optional, enforce a unique constraint (default: false) * @param {Boolean} options.sparse Optional, allow a sparse index (we can have documents for which fieldName is undefined) (default: false) + * @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index */ function Index (options) { this.fieldName = options.fieldName; this.unique = options.unique || false; this.sparse = options.sparse || false; + if (options.expireAfterSeconds !== undefined) { this.expireAfterSeconds = options.expireAfterSeconds; } this.treeOptions = { unique: this.unique, compareKeys: model.compareThings, checkValueEquality: checkValueEquality }; @@ -88,12 +90,12 @@ Index.prototype.insert = function (doc) { break; } } - + if (error) { for (i = 0; i < failingI; i += 1) { this.tree.delete(keys[i], doc); } - + throw error; } } diff --git a/test/cursor.test.js b/test/cursor.test.js index f5daca5..95d4588 100755 --- a/test/cursor.test.js +++ b/test/cursor.test.js @@ -97,7 +97,7 @@ describe('Cursor', function () { } ], done); }); - + it('With an empty collection', function (done) { async.waterfall([ function (cb) { @@ -113,7 +113,7 @@ describe('Cursor', function () { } ], done); }); - + it('With a limit', function (done) { var cursor = new Cursor(d); cursor.limit(3); @@ -289,7 +289,7 @@ describe('Cursor', function () { }); it('Using a limit higher than total number of docs shouldnt cause an error', function (done) { - var i; + var i; async.waterfall([ function (cb) { var cursor = new Cursor(d); @@ -308,7 +308,7 @@ describe('Cursor', function () { }); it('Using limit and skip with sort', function (done) { - var i; + var i; async.waterfall([ function (cb) { var cursor = new Cursor(d); diff --git a/test/db.test.js b/test/db.test.js index cb3a74c..12db34e 100755 --- a/test/db.test.js +++ b/test/db.test.js @@ -432,16 +432,17 @@ describe('Database', function () { d.insert({ tf: 6 }, function () { d.insert({ tf: 4, an: 'other' }, function (err, _doc2) { d.insert({ tf: 9 }, function () { - var data = d.getCandidates({ r: 6, tf: 4 }) - , doc1 = _.find(data, function (d) { return d._id === _doc1._id; }) - , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) - ; + d.getCandidates({ r: 6, tf: 4 }, function (err, data) { + var doc1 = _.find(data, function (d) { return d._id === _doc1._id; }) + , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) + ; - data.length.should.equal(2); - assert.deepEqual(doc1, { _id: doc1._id, tf: 4 }); - assert.deepEqual(doc2, { _id: doc2._id, tf: 4, an: 'other' }); + data.length.should.equal(2); + assert.deepEqual(doc1, { _id: doc1._id, tf: 4 }); + assert.deepEqual(doc2, { _id: doc2._id, tf: 4, an: 'other' }); - done(); + done(); + }); }); }); }); @@ -455,16 +456,17 @@ describe('Database', function () { d.insert({ tf: 6 }, function (err, _doc1) { d.insert({ tf: 4, an: 'other' }, function (err) { d.insert({ tf: 9 }, function (err, _doc2) { - var data = d.getCandidates({ r: 6, tf: { $in: [6, 9, 5] } }) - , doc1 = _.find(data, function (d) { return d._id === _doc1._id; }) - , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) - ; + d.getCandidates({ r: 6, tf: { $in: [6, 9, 5] } }, function (err, data) { + var doc1 = _.find(data, function (d) { return d._id === _doc1._id; }) + , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) + ; - data.length.should.equal(2); - assert.deepEqual(doc1, { _id: doc1._id, tf: 6 }); - assert.deepEqual(doc2, { _id: doc2._id, tf: 9 }); + data.length.should.equal(2); + assert.deepEqual(doc1, { _id: doc1._id, tf: 6 }); + assert.deepEqual(doc2, { _id: doc2._id, tf: 9 }); - done(); + done(); + }); }); }); }); @@ -478,20 +480,21 @@ describe('Database', function () { d.insert({ tf: 6 }, function (err, _doc2) { d.insert({ tf: 4, an: 'other' }, function (err, _doc3) { d.insert({ tf: 9 }, function (err, _doc4) { - var data = d.getCandidates({ r: 6, notf: { $in: [6, 9, 5] } }) - , doc1 = _.find(data, function (d) { return d._id === _doc1._id; }) - , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) - , doc3 = _.find(data, function (d) { return d._id === _doc3._id; }) - , doc4 = _.find(data, function (d) { return d._id === _doc4._id; }) - ; - - data.length.should.equal(4); - assert.deepEqual(doc1, { _id: doc1._id, tf: 4 }); - assert.deepEqual(doc2, { _id: doc2._id, tf: 6 }); - assert.deepEqual(doc3, { _id: doc3._id, tf: 4, an: 'other' }); - assert.deepEqual(doc4, { _id: doc4._id, tf: 9 }); + d.getCandidates({ r: 6, notf: { $in: [6, 9, 5] } }, function (err, data) { + var doc1 = _.find(data, function (d) { return d._id === _doc1._id; }) + , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) + , doc3 = _.find(data, function (d) { return d._id === _doc3._id; }) + , doc4 = _.find(data, function (d) { return d._id === _doc4._id; }) + ; + + data.length.should.equal(4); + assert.deepEqual(doc1, { _id: doc1._id, tf: 4 }); + assert.deepEqual(doc2, { _id: doc2._id, tf: 6 }); + assert.deepEqual(doc3, { _id: doc3._id, tf: 4, an: 'other' }); + assert.deepEqual(doc4, { _id: doc4._id, tf: 9 }); - done(); + done(); + }); }); }); }); @@ -505,16 +508,17 @@ describe('Database', function () { d.insert({ tf: 6 }, function (err, _doc2) { d.insert({ tf: 4, an: 'other' }, function (err, _doc3) { d.insert({ tf: 9 }, function (err, _doc4) { - var data = d.getCandidates({ r: 6, tf: { $lte: 9, $gte: 6 } }) - , doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) - , doc4 = _.find(data, function (d) { return d._id === _doc4._id; }) - ; + d.getCandidates({ r: 6, tf: { $lte: 9, $gte: 6 } }, function (err, data) { + var doc2 = _.find(data, function (d) { return d._id === _doc2._id; }) + , doc4 = _.find(data, function (d) { return d._id === _doc4._id; }) + ; - data.length.should.equal(2); - assert.deepEqual(doc2, { _id: doc2._id, tf: 6 }); - assert.deepEqual(doc4, { _id: doc4._id, tf: 9 }); + data.length.should.equal(2); + assert.deepEqual(doc2, { _id: doc2._id, tf: 6 }); + assert.deepEqual(doc4, { _id: doc4._id, tf: 9 }); - done(); + done(); + }); }); }); }); @@ -2649,9 +2653,10 @@ describe('Database', function () { it('Results of getMatching should never contain duplicates', function (done) { d.ensureIndex({ fieldName: 'bad' }); d.insert({ bad: ['a', 'b'] }, function () { - var res = d.getCandidates({ bad: { $in: ['a', 'b'] } }); - res.length.should.equal(1); - done(); + d.getCandidates({ bad: { $in: ['a', 'b'] } }, function (err, res) { + res.length.should.equal(1); + done(); + }); }); });