getCandidates is now async

pull/2/head
Louis Chatriot 9 years ago
parent 65024439c6
commit 645d87504d
  1. 111
      lib/cursor.js
  2. 188
      lib/datastore.js
  3. 6
      lib/indexes.js
  4. 8
      test/cursor.test.js
  5. 85
      test/db.test.js

@ -103,76 +103,83 @@ Cursor.prototype.project = function (candidates) {
*
* @param {Function} callback - Signature: err, results
*/
Cursor.prototype._exec = function(callback) {
var candidates = this.db.getCandidates(this.query)
, res = [], added = 0, skipped = 0, self = this
Cursor.prototype._exec = function(_callback) {
var res = [], added = 0, skipped = 0, self = this
, error = null
, i, keys, key
;
try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], this.query)) {
// If a sort is defined, wait for the results to be sorted before applying limit and skip
if (!this._sort) {
if (this._skip && this._skip > skipped) {
skipped += 1;
function callback (error, res) {
if (self.execFn) {
return self.execFn(error, res, _callback);
} else {
return _callback(error, res);
}
}
this.db.getCandidates(this.query, function (err, candidates) {
if (err) { return callback(err); }
try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], self.query)) {
// If a sort is defined, wait for the results to be sorted before applying limit and skip
if (!self._sort) {
if (self._skip && self._skip > skipped) {
skipped += 1;
} else {
res.push(candidates[i]);
added += 1;
if (self._limit && self._limit <= added) { break; }
}
} else {
res.push(candidates[i]);
added += 1;
if (this._limit && this._limit <= added) { break; }
}
} else {
res.push(candidates[i]);
}
}
} catch (err) {
return callback(err);
}
} catch (err) {
return callback(err);
}
// Apply all sorts
if (this._sort) {
keys = Object.keys(this._sort);
// Apply all sorts
if (self._sort) {
keys = Object.keys(self._sort);
// Sorting
var criteria = [];
for (i = 0; i < keys.length; i++) {
key = keys[i];
criteria.push({ key: key, direction: self._sort[key] });
}
res.sort(function(a, b) {
var criterion, compare, i;
for (i = 0; i < criteria.length; i++) {
criterion = criteria[i];
compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), self.db.compareStrings);
if (compare !== 0) {
return compare;
}
// Sorting
var criteria = [];
for (i = 0; i < keys.length; i++) {
key = keys[i];
criteria.push({ key: key, direction: self._sort[key] });
}
return 0;
});
res.sort(function(a, b) {
var criterion, compare, i;
for (i = 0; i < criteria.length; i++) {
criterion = criteria[i];
compare = criterion.direction * model.compareThings(model.getDotValue(a, criterion.key), model.getDotValue(b, criterion.key), self.db.compareStrings);
if (compare !== 0) {
return compare;
}
}
return 0;
});
// Applying limit and skip
var limit = this._limit || res.length
, skip = this._skip || 0;
// Applying limit and skip
var limit = self._limit || res.length
, skip = self._skip || 0;
res = res.slice(skip, skip + limit);
}
res = res.slice(skip, skip + limit);
}
// Apply projection
try {
res = this.project(res);
} catch (e) {
error = e;
res = undefined;
}
// Apply projection
try {
res = self.project(res);
} catch (e) {
error = e;
res = undefined;
}
if (this.execFn) {
return this.execFn(error, res, callback);
} else {
return callback(error, res);
}
});
};
Cursor.prototype.exec = function () {

@ -115,6 +115,7 @@ Datastore.prototype.resetIndexes = function (newData) {
* @param {String} options.fieldName
* @param {Boolean} options.unique
* @param {Boolean} options.sparse
* @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index
* @param {Function} cb Optional callback, signature: err
*/
Datastore.prototype.ensureIndex = function (options, cb) {
@ -243,50 +244,63 @@ Datastore.prototype.updateIndexes = function (oldDoc, newDoc) {
* One way to make it better would be to enable the use of multiple indexes if the first usable index
* returns too much data. I may do it in the future.
*
* TODO: needs to be moved to the Cursor module
* Returned candidates will be scanned to find and remove all expired documents
*
* @param {Query} query
* @param {Function} callback Signature err, docs
*/
Datastore.prototype.getCandidates = function (query) {
Datastore.prototype.getCandidates = function (query, callback) {
var indexNames = Object.keys(this.indexes)
, self = this
, usableQueryKeys;
// For a basic match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) {
usableQueryKeys.push(k);
async.waterfall([
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
function (cb) {
// For a basic match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) {
usableQueryKeys.push(k);
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]));
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]);
}
// For a $in match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && query[k].hasOwnProperty('$in')) {
usableQueryKeys.push(k);
// For a $in match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && query[k].hasOwnProperty('$in')) {
usableQueryKeys.push(k);
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in));
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in);
}
// For a comparison match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) {
usableQueryKeys.push(k);
// For a comparison match
usableQueryKeys = [];
Object.keys(query).forEach(function (k) {
if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) {
usableQueryKeys.push(k);
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return cb(null, self.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]));
}
});
usableQueryKeys = _.intersection(usableQueryKeys, indexNames);
if (usableQueryKeys.length > 0) {
return this.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]);
}
// By default, return all the DB data
return this.getAllData();
// By default, return all the DB data
return cb(null, self.getAllData());
}
// STEP 2: remove all expired documents
, function (docs) {
return callback(null, docs);
}]);
};
@ -548,48 +562,49 @@ Datastore.prototype._update = function (query, updateQuery, options, cb) {
});
}
, function () { // Perform the update
var modifiedDoc
, candidates = self.getCandidates(query)
, modifications = []
;
var modifiedDoc , modifications = [];
// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], query) && (multi || numReplaced === 0)) {
numReplaced += 1;
modifiedDoc = model.modify(candidates[i], updateQuery);
if (self.timestampData) { modifiedDoc.updatedAt = new Date(); }
modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc });
self.getCandidates(query, function (err, candidates) {
if (err) { return callback(err); }
// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
try {
for (i = 0; i < candidates.length; i += 1) {
if (model.match(candidates[i], query) && (multi || numReplaced === 0)) {
numReplaced += 1;
modifiedDoc = model.modify(candidates[i], updateQuery);
if (self.timestampData) { modifiedDoc.updatedAt = new Date(); }
modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc });
}
}
} catch (err) {
return callback(err);
}
} catch (err) {
return callback(err);
}
// Change the docs in memory
try {
self.updateIndexes(modifications);
} catch (err) {
return callback(err);
}
// Update the datafile
var updatedDocs = _.pluck(modifications, 'newDoc');
self.persistence.persistNewState(updatedDocs, function (err) {
if (err) { return callback(err); }
if (!options.returnUpdatedDocs) {
return callback(null, numReplaced);
} else {
var updatedDocsDC = [];
updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); });
return callback(null, numReplaced, updatedDocsDC);
// Change the docs in memory
try {
self.updateIndexes(modifications);
} catch (err) {
return callback(err);
}
// Update the datafile
var updatedDocs = _.pluck(modifications, 'newDoc');
self.persistence.persistNewState(updatedDocs, function (err) {
if (err) { return callback(err); }
if (!options.returnUpdatedDocs) {
return callback(null, numReplaced);
} else {
var updatedDocsDC = [];
updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); });
return callback(null, numReplaced, updatedDocsDC);
}
});
});
}
]);
}]);
};
Datastore.prototype.update = function () {
this.executor.push({ this: this, fn: this._update, arguments: arguments });
};
@ -607,32 +622,33 @@ Datastore.prototype.update = function () {
*/
Datastore.prototype._remove = function (query, options, cb) {
var callback
, self = this
, numRemoved = 0
, multi
, removedDocs = []
, candidates = this.getCandidates(query)
, self = this, numRemoved = 0, removedDocs = [], multi
;
if (typeof options === 'function') { cb = options; options = {}; }
callback = cb || function () {};
multi = options.multi !== undefined ? options.multi : false;
try {
candidates.forEach(function (d) {
if (model.match(d, query) && (multi || numRemoved === 0)) {
numRemoved += 1;
removedDocs.push({ $$deleted: true, _id: d._id });
self.removeFromIndexes(d);
}
});
} catch (err) { return callback(err); }
self.persistence.persistNewState(removedDocs, function (err) {
this.getCandidates(query, function (err, candidates) {
if (err) { return callback(err); }
return callback(null, numRemoved);
try {
candidates.forEach(function (d) {
if (model.match(d, query) && (multi || numRemoved === 0)) {
numRemoved += 1;
removedDocs.push({ $$deleted: true, _id: d._id });
self.removeFromIndexes(d);
}
});
} catch (err) { return callback(err); }
self.persistence.persistNewState(removedDocs, function (err) {
if (err) { return callback(err); }
return callback(null, numRemoved);
});
});
};
Datastore.prototype.remove = function () {
this.executor.push({ this: this, fn: this._remove, arguments: arguments });
};

@ -32,11 +32,13 @@ function projectForUnique (elt) {
* @param {String} options.fieldName On which field should the index apply (can use dot notation to index on sub fields)
* @param {Boolean} options.unique Optional, enforce a unique constraint (default: false)
* @param {Boolean} options.sparse Optional, allow a sparse index (we can have documents for which fieldName is undefined) (default: false)
* @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index
*/
function Index (options) {
this.fieldName = options.fieldName;
this.unique = options.unique || false;
this.sparse = options.sparse || false;
if (options.expireAfterSeconds !== undefined) { this.expireAfterSeconds = options.expireAfterSeconds; }
this.treeOptions = { unique: this.unique, compareKeys: model.compareThings, checkValueEquality: checkValueEquality };
@ -88,12 +90,12 @@ Index.prototype.insert = function (doc) {
break;
}
}
if (error) {
for (i = 0; i < failingI; i += 1) {
this.tree.delete(keys[i], doc);
}
throw error;
}
}

@ -97,7 +97,7 @@ describe('Cursor', function () {
}
], done);
});
it('With an empty collection', function (done) {
async.waterfall([
function (cb) {
@ -113,7 +113,7 @@ describe('Cursor', function () {
}
], done);
});
it('With a limit', function (done) {
var cursor = new Cursor(d);
cursor.limit(3);
@ -289,7 +289,7 @@ describe('Cursor', function () {
});
it('Using a limit higher than total number of docs shouldnt cause an error', function (done) {
var i;
var i;
async.waterfall([
function (cb) {
var cursor = new Cursor(d);
@ -308,7 +308,7 @@ describe('Cursor', function () {
});
it('Using limit and skip with sort', function (done) {
var i;
var i;
async.waterfall([
function (cb) {
var cursor = new Cursor(d);

@ -432,16 +432,17 @@ describe('Database', function () {
d.insert({ tf: 6 }, function () {
d.insert({ tf: 4, an: 'other' }, function (err, _doc2) {
d.insert({ tf: 9 }, function () {
var data = d.getCandidates({ r: 6, tf: 4 })
, doc1 = _.find(data, function (d) { return d._id === _doc1._id; })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
;
d.getCandidates({ r: 6, tf: 4 }, function (err, data) {
var doc1 = _.find(data, function (d) { return d._id === _doc1._id; })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
;
data.length.should.equal(2);
assert.deepEqual(doc1, { _id: doc1._id, tf: 4 });
assert.deepEqual(doc2, { _id: doc2._id, tf: 4, an: 'other' });
data.length.should.equal(2);
assert.deepEqual(doc1, { _id: doc1._id, tf: 4 });
assert.deepEqual(doc2, { _id: doc2._id, tf: 4, an: 'other' });
done();
done();
});
});
});
});
@ -455,16 +456,17 @@ describe('Database', function () {
d.insert({ tf: 6 }, function (err, _doc1) {
d.insert({ tf: 4, an: 'other' }, function (err) {
d.insert({ tf: 9 }, function (err, _doc2) {
var data = d.getCandidates({ r: 6, tf: { $in: [6, 9, 5] } })
, doc1 = _.find(data, function (d) { return d._id === _doc1._id; })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
;
d.getCandidates({ r: 6, tf: { $in: [6, 9, 5] } }, function (err, data) {
var doc1 = _.find(data, function (d) { return d._id === _doc1._id; })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
;
data.length.should.equal(2);
assert.deepEqual(doc1, { _id: doc1._id, tf: 6 });
assert.deepEqual(doc2, { _id: doc2._id, tf: 9 });
data.length.should.equal(2);
assert.deepEqual(doc1, { _id: doc1._id, tf: 6 });
assert.deepEqual(doc2, { _id: doc2._id, tf: 9 });
done();
done();
});
});
});
});
@ -478,20 +480,21 @@ describe('Database', function () {
d.insert({ tf: 6 }, function (err, _doc2) {
d.insert({ tf: 4, an: 'other' }, function (err, _doc3) {
d.insert({ tf: 9 }, function (err, _doc4) {
var data = d.getCandidates({ r: 6, notf: { $in: [6, 9, 5] } })
, doc1 = _.find(data, function (d) { return d._id === _doc1._id; })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
, doc3 = _.find(data, function (d) { return d._id === _doc3._id; })
, doc4 = _.find(data, function (d) { return d._id === _doc4._id; })
;
data.length.should.equal(4);
assert.deepEqual(doc1, { _id: doc1._id, tf: 4 });
assert.deepEqual(doc2, { _id: doc2._id, tf: 6 });
assert.deepEqual(doc3, { _id: doc3._id, tf: 4, an: 'other' });
assert.deepEqual(doc4, { _id: doc4._id, tf: 9 });
d.getCandidates({ r: 6, notf: { $in: [6, 9, 5] } }, function (err, data) {
var doc1 = _.find(data, function (d) { return d._id === _doc1._id; })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
, doc3 = _.find(data, function (d) { return d._id === _doc3._id; })
, doc4 = _.find(data, function (d) { return d._id === _doc4._id; })
;
data.length.should.equal(4);
assert.deepEqual(doc1, { _id: doc1._id, tf: 4 });
assert.deepEqual(doc2, { _id: doc2._id, tf: 6 });
assert.deepEqual(doc3, { _id: doc3._id, tf: 4, an: 'other' });
assert.deepEqual(doc4, { _id: doc4._id, tf: 9 });
done();
done();
});
});
});
});
@ -505,16 +508,17 @@ describe('Database', function () {
d.insert({ tf: 6 }, function (err, _doc2) {
d.insert({ tf: 4, an: 'other' }, function (err, _doc3) {
d.insert({ tf: 9 }, function (err, _doc4) {
var data = d.getCandidates({ r: 6, tf: { $lte: 9, $gte: 6 } })
, doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
, doc4 = _.find(data, function (d) { return d._id === _doc4._id; })
;
d.getCandidates({ r: 6, tf: { $lte: 9, $gte: 6 } }, function (err, data) {
var doc2 = _.find(data, function (d) { return d._id === _doc2._id; })
, doc4 = _.find(data, function (d) { return d._id === _doc4._id; })
;
data.length.should.equal(2);
assert.deepEqual(doc2, { _id: doc2._id, tf: 6 });
assert.deepEqual(doc4, { _id: doc4._id, tf: 9 });
data.length.should.equal(2);
assert.deepEqual(doc2, { _id: doc2._id, tf: 6 });
assert.deepEqual(doc4, { _id: doc4._id, tf: 9 });
done();
done();
});
});
});
});
@ -2649,9 +2653,10 @@ describe('Database', function () {
it('Results of getMatching should never contain duplicates', function (done) {
d.ensureIndex({ fieldName: 'bad' });
d.insert({ bad: ['a', 'b'] }, function () {
var res = d.getCandidates({ bad: { $in: ['a', 'b'] } });
res.length.should.equal(1);
done();
d.getCandidates({ bad: { $in: ['a', 'b'] } }, function (err, res) {
res.length.should.equal(1);
done();
});
});
});

Loading…
Cancel
Save