var customUtils = require('./customUtils') , model = require('./model') , async = require('async') , Executor = require('./executor') , Index = require('./indexes') , util = require('util') , _ = require('underscore') , Persistence = require('./persistence') , Cursor = require('./cursor') ; /** * Create a new collection * @param {String} options.filename Optional, datastore will be in-memory only if not provided * @param {Boolean} options.timestampData Optional, defaults to false. If set to true, createdAt and updatedAt will be created and populated automatically (if not specified by user) * @param {Boolean} options.inMemoryOnly Optional, defaults to false * @param {String} options.nodeWebkitAppName Optional, specify the name of your NW app if you want options.filename to be relative to the directory where * Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion) * @param {Boolean} options.autoload Optional, defaults to false * @param {Function} options.onload Optional, if autoload is used this will be called after the load database with the error object as parameter. If you don't pass it the error will be thrown * @param {Function} options.afterSerialization/options.beforeDeserialization Optional, serialization hooks * @param {Number} options.corruptAlertThreshold Optional, threshold after which an alert is thrown if too much data is corrupt * @param {Function} options.compareStrings Optional, string comparison function that overrides default for sorting * * Event Emitter - Events * * compaction.done - Fired whenever a compaction operation was finished */ function Datastore (options) { var filename; // Retrocompatibility with v0.6 and before if (typeof options === 'string') { filename = options; this.inMemoryOnly = false; // Default } else { options = options || {}; filename = options.filename; this.inMemoryOnly = options.inMemoryOnly || false; this.autoload = options.autoload || false; this.timestampData = options.timestampData || false; } // Determine whether in memory or persistent if (!filename || typeof filename !== 'string' || filename.length === 0) { this.filename = null; this.inMemoryOnly = true; } else { this.filename = filename; } // String comparison function this.compareStrings = options.compareStrings; // Persistence handling this.persistence = new Persistence({ db: this, nodeWebkitAppName: options.nodeWebkitAppName , afterSerialization: options.afterSerialization , beforeDeserialization: options.beforeDeserialization , corruptAlertThreshold: options.corruptAlertThreshold }); // This new executor is ready if we don't use persistence // If we do, it will only be ready once loadDatabase is called this.executor = new Executor(); if (this.inMemoryOnly) { this.executor.ready = true; } // Indexed by field name, dot notation can be used // _id is always indexed and since _ids are generated randomly the underlying // binary is always well-balanced this.indexes = {}; this.indexes._id = new Index({ fieldName: '_id', unique: true }); this.ttlIndexes = {}; // Queue a load of the database right away and call the onload handler // By default (no onload handler), if there is an error there, no operation will be possible so warn the user by throwing an exception if (this.autoload) { this.loadDatabase(options.onload || function (err) { if (err) { throw err; } }); } } util.inherits(Datastore, require('events').EventEmitter); /** * Load the database from the datafile, and trigger the execution of buffered commands if any */ Datastore.prototype.loadDatabase = function () { this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true); }; /** * Get an array of all the data in the database */ Datastore.prototype.getAllData = function () { return this.indexes._id.getAll(); }; /** * Reset all currently defined indexes */ Datastore.prototype.resetIndexes = function (newData) { var self = this; Object.keys(this.indexes).forEach(function (i) { self.indexes[i].reset(newData); }); }; /** * Ensure an index is kept for this field. Same parameters as lib/indexes * For now this function is synchronous, we need to test how much time it takes * We use an async API for consistency with the rest of the code * @param {String} options.fieldName * @param {Boolean} options.unique * @param {Boolean} options.sparse * @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index (only works on Date fields, not arrays of Date) * @param {Function} cb Optional callback, signature: err */ Datastore.prototype.ensureIndex = function (options, cb) { var err , callback = cb || function () {}; options = options || {}; if (!options.fieldName) { err = new Error("Cannot create an index without a fieldName"); err.missingFieldName = true; return callback(err); } if (this.indexes[options.fieldName]) { return callback(null); } this.indexes[options.fieldName] = new Index(options); if (options.expireAfterSeconds !== undefined) { this.ttlIndexes[options.fieldName] = options.expireAfterSeconds; } // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here try { this.indexes[options.fieldName].insert(this.getAllData()); } catch (e) { delete this.indexes[options.fieldName]; return callback(e); } // We may want to force all options to be persisted including defaults, not just the ones passed the index creation function this.persistence.persistNewState([{ $$indexCreated: options }], function (err) { if (err) { return callback(err); } return callback(null); }); }; /** * Remove an index * @param {String} fieldName * @param {Function} cb Optional callback, signature: err */ Datastore.prototype.removeIndex = function (fieldName, cb) { var callback = cb || function () {}; delete this.indexes[fieldName]; this.persistence.persistNewState([{ $$indexRemoved: fieldName }], function (err) { if (err) { return callback(err); } return callback(null); }); }; /** * Add one or several document(s) to all indexes */ Datastore.prototype.addToIndexes = function (doc) { var i, failingIndex, error , keys = Object.keys(this.indexes) ; for (i = 0; i < keys.length; i += 1) { try { this.indexes[keys[i]].insert(doc); } catch (e) { failingIndex = i; error = e; break; } } // If an error happened, we need to rollback the insert on all other indexes if (error) { for (i = 0; i < failingIndex; i += 1) { this.indexes[keys[i]].remove(doc); } throw error; } }; /** * Remove one or several document(s) from all indexes */ Datastore.prototype.removeFromIndexes = function (doc) { var self = this; Object.keys(this.indexes).forEach(function (i) { self.indexes[i].remove(doc); }); }; /** * Update one or several documents in all indexes * To update multiple documents, oldDoc must be an array of { oldDoc, newDoc } pairs * If one update violates a constraint, all changes are rolled back */ Datastore.prototype.updateIndexes = function (oldDoc, newDoc) { var i, failingIndex, error , keys = Object.keys(this.indexes) ; for (i = 0; i < keys.length; i += 1) { try { this.indexes[keys[i]].update(oldDoc, newDoc); } catch (e) { failingIndex = i; error = e; break; } } // If an error happened, we need to rollback the update on all other indexes if (error) { for (i = 0; i < failingIndex; i += 1) { this.indexes[keys[i]].revertUpdate(oldDoc, newDoc); } throw error; } }; /** * Return the list of candidates for a given query * Crude implementation for now, we return the candidates given by the first usable index if any * We try the following query types, in this order: basic match, $in match, comparison match * One way to make it better would be to enable the use of multiple indexes if the first usable index * returns too much data. I may do it in the future. * * Returned candidates will be scanned to find and remove all expired documents * * @param {Query} query * @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations * @param {Function} callback Signature err, docs */ Datastore.prototype.getCandidates = function (query, dontExpireStaleDocs, callback) { var indexNames = Object.keys(this.indexes) , self = this , usableQueryKeys; if (typeof dontExpireStaleDocs === 'function') { callback = dontExpireStaleDocs; dontExpireStaleDocs = false; } async.waterfall([ // STEP 1: get candidates list by checking indexes from most to least frequent usecase function (cb) { // For a basic match usableQueryKeys = []; Object.keys(query).forEach(function (k) { if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || util.isDate(query[k]) || query[k] === null) { usableQueryKeys.push(k); } }); usableQueryKeys = _.intersection(usableQueryKeys, indexNames); if (usableQueryKeys.length > 0) { return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]])); } // For a $in match usableQueryKeys = []; Object.keys(query).forEach(function (k) { if (query[k] && query[k].hasOwnProperty('$in')) { usableQueryKeys.push(k); } }); usableQueryKeys = _.intersection(usableQueryKeys, indexNames); if (usableQueryKeys.length > 0) { return cb(null, self.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in)); } // For a comparison match usableQueryKeys = []; Object.keys(query).forEach(function (k) { if (query[k] && (query[k].hasOwnProperty('$lt') || query[k].hasOwnProperty('$lte') || query[k].hasOwnProperty('$gt') || query[k].hasOwnProperty('$gte'))) { usableQueryKeys.push(k); } }); usableQueryKeys = _.intersection(usableQueryKeys, indexNames); if (usableQueryKeys.length > 0) { return cb(null, self.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]])); } // By default, return all the DB data return cb(null, self.getAllData()); } // STEP 2: remove all expired documents , function (docs) { if (dontExpireStaleDocs) { return callback(null, docs); } var expiredDocsIds = [], validDocs = [], ttlIndexesFieldNames = Object.keys(self.ttlIndexes); docs.forEach(function (doc) { var valid = true; ttlIndexesFieldNames.forEach(function (i) { if (doc[i] !== undefined && util.isDate(doc[i]) && Date.now() > doc[i].getTime() + self.ttlIndexes[i] * 1000) { valid = false; } }); if (valid) { validDocs.push(doc); } else { expiredDocsIds.push(doc._id); } }); async.eachSeries(expiredDocsIds, function (_id, cb) { self._remove({ _id: _id }, {}, function (err) { if (err) { return callback(err); } return cb(); }); }, function (err) { return callback(null, validDocs); }); }]); }; /** * Insert a new document * @param {Function} cb Optional callback, signature: err, insertedDoc * * @api private Use Datastore.insert which has the same signature */ Datastore.prototype._insert = function (newDoc, cb) { var callback = cb || function () {} , preparedDoc ; try { preparedDoc = this.prepareDocumentForInsertion(newDoc) this._insertInCache(preparedDoc); } catch (e) { return callback(e); } this.persistence.persistNewState(util.isArray(preparedDoc) ? preparedDoc : [preparedDoc], function (err) { if (err) { return callback(err); } return callback(null, model.deepCopy(preparedDoc)); }); }; /** * Create a new _id that's not already in use */ Datastore.prototype.createNewId = function () { var tentativeId = customUtils.uid(16); // Try as many times as needed to get an unused _id. As explained in customUtils, the probability of this ever happening is extremely small, so this is O(1) if (this.indexes._id.getMatching(tentativeId).length > 0) { tentativeId = this.createNewId(); } return tentativeId; }; /** * Prepare a document (or array of documents) to be inserted in a database * Meaning adds _id and timestamps if necessary on a copy of newDoc to avoid any side effect on user input * @api private */ Datastore.prototype.prepareDocumentForInsertion = function (newDoc) { var preparedDoc, self = this; if (util.isArray(newDoc)) { preparedDoc = []; newDoc.forEach(function (doc) { preparedDoc.push(self.prepareDocumentForInsertion(doc)); }); } else { preparedDoc = model.deepCopy(newDoc); if (preparedDoc._id === undefined) { preparedDoc._id = this.createNewId(); } var now = new Date(); if (this.timestampData && preparedDoc.createdAt === undefined) { preparedDoc.createdAt = now; } if (this.timestampData && preparedDoc.updatedAt === undefined) { preparedDoc.updatedAt = now; } model.checkObject(preparedDoc); } return preparedDoc; }; /** * If newDoc is an array of documents, this will insert all documents in the cache * @api private */ Datastore.prototype._insertInCache = function (preparedDoc) { if (util.isArray(preparedDoc)) { this._insertMultipleDocsInCache(preparedDoc); } else { this.addToIndexes(preparedDoc); } }; /** * If one insertion fails (e.g. because of a unique constraint), roll back all previous * inserts and throws the error * @api private */ Datastore.prototype._insertMultipleDocsInCache = function (preparedDocs) { var i, failingI, error; for (i = 0; i < preparedDocs.length; i += 1) { try { this.addToIndexes(preparedDocs[i]); } catch (e) { error = e; failingI = i; break; } } if (error) { for (i = 0; i < failingI; i += 1) { this.removeFromIndexes(preparedDocs[i]); } throw error; } }; Datastore.prototype.insert = function () { this.executor.push({ this: this, fn: this._insert, arguments: arguments }); }; /** * Count all documents matching the query * @param {Object} query MongoDB-style query */ Datastore.prototype.count = function(query, callback) { var cursor = new Cursor(this, query, function(err, docs, callback) { if (err) { return callback(err); } return callback(null, docs.length); }); if (typeof callback === 'function') { cursor.exec(callback); } else { return cursor; } }; /** * Find all documents matching the query * If no callback is passed, we return the cursor so that user can limit, skip and finally exec * @param {Object} query MongoDB-style query * @param {Object} projection MongoDB-style projection */ Datastore.prototype.find = function (query, projection, callback) { switch (arguments.length) { case 1: projection = {}; // callback is undefined, will return a cursor break; case 2: if (typeof projection === 'function') { callback = projection; projection = {}; } // If not assume projection is an object and callback undefined break; } var cursor = new Cursor(this, query, function(err, docs, callback) { var res = [], i; if (err) { return callback(err); } for (i = 0; i < docs.length; i += 1) { res.push(model.deepCopy(docs[i])); } return callback(null, res); }); cursor.projection(projection); if (typeof callback === 'function') { cursor.exec(callback); } else { return cursor; } }; /** * Find one document matching the query * @param {Object} query MongoDB-style query * @param {Object} projection MongoDB-style projection */ Datastore.prototype.findOne = function (query, projection, callback) { switch (arguments.length) { case 1: projection = {}; // callback is undefined, will return a cursor break; case 2: if (typeof projection === 'function') { callback = projection; projection = {}; } // If not assume projection is an object and callback undefined break; } var cursor = new Cursor(this, query, function(err, docs, callback) { if (err) { return callback(err); } if (docs.length === 1) { return callback(null, model.deepCopy(docs[0])); } else { return callback(null, null); } }); cursor.projection(projection).limit(1); if (typeof callback === 'function') { cursor.exec(callback); } else { return cursor; } }; /** * Update all docs matching query * @param {Object} query * @param {Object} updateQuery * @param {Object} options Optional options * options.multi If true, can update multiple documents (defaults to false) * options.upsert If true, document is inserted if the query doesn't match anything * options.returnUpdatedDocs Defaults to false, if true return as third argument the array of updated matched documents (even if no change actually took place) * @param {Function} cb Optional callback, signature: err, numReplaced, upsert (set to true if the update was in fact an upsert) * * @api private Use Datastore.update which has the same signature */ Datastore.prototype._update = function (query, updateQuery, options, cb) { var callback , self = this , numReplaced = 0 , multi, upsert , i ; if (typeof options === 'function') { cb = options; options = {}; } callback = cb || function () {}; multi = options.multi !== undefined ? options.multi : false; upsert = options.upsert !== undefined ? options.upsert : false; async.waterfall([ function (cb) { // If upsert option is set, check whether we need to insert the doc if (!upsert) { return cb(); } // Need to use an internal function not tied to the executor to avoid deadlock var cursor = new Cursor(self, query); cursor.limit(1)._exec(function (err, docs) { if (err) { return callback(err); } if (docs.length === 1) { return cb(); } else { var toBeInserted; try { model.checkObject(updateQuery); // updateQuery is a simple object with no modifier, use it as the document to insert toBeInserted = updateQuery; } catch (e) { // updateQuery contains modifiers, use the find query as the base, // strip it from all operators and update it according to updateQuery try { toBeInserted = model.modify(model.deepCopy(query, true), updateQuery); } catch (err) { return callback(err); } } return self._insert(toBeInserted, function (err, newDoc) { if (err) { return callback(err); } return callback(null, 1, newDoc); }); } }); } , function () { // Perform the update var modifiedDoc , modifications = []; self.getCandidates(query, function (err, candidates) { if (err) { return callback(err); } // Preparing update (if an error is thrown here neither the datafile nor // the in-memory indexes are affected) try { for (i = 0; i < candidates.length; i += 1) { if (model.match(candidates[i], query) && (multi || numReplaced === 0)) { numReplaced += 1; modifiedDoc = model.modify(candidates[i], updateQuery); if (self.timestampData) { modifiedDoc.updatedAt = new Date(); } modifications.push({ oldDoc: candidates[i], newDoc: modifiedDoc }); } } } catch (err) { return callback(err); } // Change the docs in memory try { self.updateIndexes(modifications); } catch (err) { return callback(err); } // Update the datafile var updatedDocs = _.pluck(modifications, 'newDoc'); self.persistence.persistNewState(updatedDocs, function (err) { if (err) { return callback(err); } if (!options.returnUpdatedDocs) { return callback(null, numReplaced); } else { var updatedDocsDC = []; updatedDocs.forEach(function (doc) { updatedDocsDC.push(model.deepCopy(doc)); }); return callback(null, numReplaced, updatedDocsDC); } }); }); }]); }; Datastore.prototype.update = function () { this.executor.push({ this: this, fn: this._update, arguments: arguments }); }; /** * Remove all docs matching the query * For now very naive implementation (similar to update) * @param {Object} query * @param {Object} options Optional options * options.multi If true, can update multiple documents (defaults to false) * @param {Function} cb Optional callback, signature: err, numRemoved * * @api private Use Datastore.remove which has the same signature */ Datastore.prototype._remove = function (query, options, cb) { var callback , self = this, numRemoved = 0, removedDocs = [], multi ; if (typeof options === 'function') { cb = options; options = {}; } callback = cb || function () {}; multi = options.multi !== undefined ? options.multi : false; this.getCandidates(query, true, function (err, candidates) { if (err) { return callback(err); } try { candidates.forEach(function (d) { if (model.match(d, query) && (multi || numRemoved === 0)) { numRemoved += 1; removedDocs.push({ $$deleted: true, _id: d._id }); self.removeFromIndexes(d); } }); } catch (err) { return callback(err); } self.persistence.persistNewState(removedDocs, function (err) { if (err) { return callback(err); } return callback(null, numRemoved); }); }); }; Datastore.prototype.remove = function () { this.executor.push({ this: this, fn: this._remove, arguments: arguments }); }; module.exports = Datastore;