const { EventEmitter } = require('events')
const async = require('async')
const Cursor = require('./cursor.js')
const customUtils = require('./customUtils.js')
const Executor = require('./executor.js')
const Index = require('./indexes.js')
const model = require('./model.js')
const Persistence = require('./persistence.js')
const { isDate } = require('./utils.js')

class Datastore extends EventEmitter {
  /**
   * Create a new collection
   * @param {Object|String} [options] Options object, or (retrocompatibility with v0.6 and before) the filename as a string
   * @param {String} [options.filename] Optional, datastore will be in-memory only if not provided
   * @param {Boolean} [options.timestampData] Optional, defaults to false. If set to true, createdAt and updatedAt will be created and populated automatically (if not specified by user)
   * @param {Boolean} [options.inMemoryOnly] Optional, defaults to false
   * @param {String} [options.nodeWebkitAppName] Optional, specify the name of your NW app if you want options.filename to be relative to the directory where
   *                                             Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion)
   * @param {Boolean} [options.autoload] Optional, defaults to false
   * @param {Function} [options.onload] Optional, if autoload is used this will be called after the load database with the error object as parameter. If you don't pass it the error will be thrown
   * @param {Function} [options.beforeDeserialization] Optional, serialization hooks
   * @param {Function} [options.afterSerialization] Optional, serialization hooks
   * @param {Number} [options.corruptAlertThreshold] Optional, threshold after which an alert is thrown if too much data is corrupt
   * @param {Function} [options.compareStrings] Optional, string comparison function that overrides default for sorting
   *
   * Event Emitter - Events
   * * compaction.done - Fired whenever a compaction operation was finished
   */
  constructor (options) {
    super()

    // Retrocompatibility with v0.6 and before: a bare string is the filename.
    // Normalizing to an object means every flag below gets a real default on both paths.
    const opts = typeof options === 'string' ? { filename: options } : (options || {})
    const filename = opts.filename

    this.inMemoryOnly = opts.inMemoryOnly || false
    this.autoload = opts.autoload || false
    this.timestampData = opts.timestampData || false

    // Determine whether in memory or persistent
    if (!filename || typeof filename !== 'string' || filename.length === 0) {
      this.filename = null
      this.inMemoryOnly = true
    } else {
      this.filename = filename
    }

    // String comparison function
    this.compareStrings = opts.compareStrings

    // Persistence handling
    this.persistence = new Persistence({
      db: this,
      nodeWebkitAppName: opts.nodeWebkitAppName,
      afterSerialization: opts.afterSerialization,
      beforeDeserialization: opts.beforeDeserialization,
      corruptAlertThreshold: opts.corruptAlertThreshold
    })

    // This new executor is ready if we don't use persistence
    // If we do, it will only be ready once loadDatabase is called
    this.executor = new Executor()
    if (this.inMemoryOnly) this.executor.ready = true

    // Indexed by field name, dot notation can be used
    // _id is always indexed and since _ids are generated randomly the underlying
    // binary is always well-balanced
    this.indexes = {}
    this.indexes._id = new Index({ fieldName: '_id', unique: true })
    // Maps a field name to its expireAfterSeconds value (see ensureIndex / getCandidates)
    this.ttlIndexes = {}

    // Queue a load of the database right away and call the onload handler
    // By default (no onload handler), if there is an error there, no operation will be possible so warn the user by throwing an exception
    if (this.autoload) {
      this.loadDatabase(opts.onload || (err => {
        if (err) throw err
      }))
    }
  }

  /**
   * Load the database from the datafile, and trigger the execution of buffered commands if any
   * All arguments are forwarded as-is to persistence.loadDatabase through the executor.
   */
  loadDatabase () {
    // NOTE(review): the second argument (`true`) presumably forces queuing even while the
    // executor is not ready yet - confirm against executor.js
    this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true)
  }

  /**
   * Get an array of all the data in the database
   * (i.e. everything held by the always-present _id index)
   */
  getAllData () {
    return this.indexes._id.getAll()
  }

  /**
   * Reset all currently defined indexes
   * @param {*} newData Passed unchanged to every index's reset()
   */
  resetIndexes (newData) {
    for (const index of Object.values(this.indexes)) {
      index.reset(newData)
    }
  }

  /**
   * Ensure an index is kept for this field. Same parameters as lib/indexes
   * For now this function is synchronous, we need to test how much time it takes
   * We use an async API for consistency with the rest of the code
   * @param {Object} options
   * @param {String} options.fieldName
   * @param {Boolean} options.unique
   * @param {Boolean} options.sparse
   * @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index (only works on Date fields, not arrays of Date)
   * @param {Function} callback Optional callback, signature: err
   */
  ensureIndex (options = {}, callback = () => {}) {
    if (!options.fieldName) {
      const err = new Error('Cannot create an index without a fieldName')
      err.missingFieldName = true
      return callback(err)
    }
    if (this.indexes[options.fieldName]) return callback(null)

    const index = new Index(options)
    this.indexes[options.fieldName] = index

    try {
      index.insert(this.getAllData())
    } catch (e) {
      delete this.indexes[options.fieldName]
      return callback(e)
    }

    // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here.
    // The TTL is registered only once the index has been built, so that a failed creation
    // (e.g. unique constraint violated by existing data) does not leave an orphan TTL behind
    // that would silently expire documents.
    if (options.expireAfterSeconds !== undefined) this.ttlIndexes[options.fieldName] = options.expireAfterSeconds

    // We may want to force all options to be persisted including defaults, not just the ones passed the index creation function
    this.persistence.persistNewState([{ $$indexCreated: options }], err => {
      if (err) return callback(err)
      return callback(null)
    })
  }

  /**
   * Remove an index
   * @param {String} fieldName
   * @param {Function} callback Optional callback, signature: err
   */
  removeIndex (fieldName, callback = () => {}) {
    delete this.indexes[fieldName]
    // Dropping the index must also stop TTL expiration on that field, otherwise
    // getCandidates would keep removing documents based on a no-longer-existing index
    delete this.ttlIndexes[fieldName]

    this.persistence.persistNewState([{ $$indexRemoved: fieldName }], err => {
      if (err) return callback(err)
      return callback(null)
    })
  }

  /**
   * Add one or several document(s) to all indexes
   * If one index rejects the insertion, the document(s) are removed from the indexes
   * that had already accepted them and the error is rethrown.
   */
  addToIndexes (doc) {
    let failingIndex
    let error
    const keys = Object.keys(this.indexes)

    for (let i = 0; i < keys.length; i += 1) {
      try {
        this.indexes[keys[i]].insert(doc)
      } catch (e) {
        failingIndex = i
        error = e
        break
      }
    }

    // If an error happened, we need to rollback the insert on all other indexes
    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.indexes[keys[i]].remove(doc)
      }

      throw error
    }
  }

  /**
   * Remove one or several document(s) from all indexes
   */
  removeFromIndexes (doc) {
    for (const index of Object.values(this.indexes)) {
      index.remove(doc)
    }
  }

  /**
   * Update one or several documents in all indexes
   * To update multiple documents, oldDoc must be an array of { oldDoc, newDoc } pairs
   * If one update violates a constraint, all changes are rolled back
   */
  updateIndexes (oldDoc, newDoc) {
    let failingIndex
    let error
    const keys = Object.keys(this.indexes)

    for (let i = 0; i < keys.length; i += 1) {
      try {
        this.indexes[keys[i]].update(oldDoc, newDoc)
      } catch (e) {
        failingIndex = i
        error = e
        break
      }
    }

    // If an error happened, we need to rollback the update on all other indexes
    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.indexes[keys[i]].revertUpdate(oldDoc, newDoc)
      }

      throw error
    }
  }

  /**
   * Return the list of candidates for a given query
   * Crude implementation for now, we return the candidates given by the first usable index if any
   * We try the following query types, in this order: basic match, $in match, comparison match
   * One way to make it better would be to enable the use of multiple indexes if the first usable index
   * returns too much data. I may do it in the future.
   *
   * Returned candidates will be scanned to find and remove all expired documents
   *
   * @param {Query} query
   * @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
   * @param {Function} callback Signature err, candidates
   */
  getCandidates (query, dontExpireStaleDocs, callback) {
    const indexNames = Object.keys(this.indexes)

    if (typeof dontExpireStaleDocs === 'function') {
      callback = dontExpireStaleDocs
      dontExpireStaleDocs = false
    }

    const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key)

    // First query key (in query key order) whose value satisfies `predicate` and for which an index exists
    const firstIndexedKey = predicate =>
      Object.keys(query).find(k => predicate(query[k]) && indexNames.includes(k))

    async.waterfall([
      // STEP 1: get candidates list by checking indexes from most to least frequent usecase
      cb => {
        // For a basic match
        let key = firstIndexedKey(v =>
          typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null
        )
        if (key !== undefined) return cb(null, this.indexes[key].getMatching(query[key]))

        // For a $in match
        key = firstIndexedKey(v => Boolean(v) && hasOwn(v, '$in'))
        if (key !== undefined) return cb(null, this.indexes[key].getMatching(query[key].$in))

        // For a comparison match
        key = firstIndexedKey(v =>
          Boolean(v) && (hasOwn(v, '$lt') || hasOwn(v, '$lte') || hasOwn(v, '$gt') || hasOwn(v, '$gte'))
        )
        if (key !== undefined) return cb(null, this.indexes[key].getBetweenBounds(query[key]))

        // By default, return all the DB data
        return cb(null, this.getAllData())
      },
      // STEP 2: remove all expired documents
      // This last step reports through `callback` directly rather than through the waterfall
      docs => {
        if (dontExpireStaleDocs) return callback(null, docs)

        const expiredDocsIds = []
        const validDocs = []
        const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)

        docs.forEach(doc => {
          // A doc is expired as soon as one of its TTL-indexed Date fields is older than the TTL (in seconds)
          const expired = ttlIndexesFieldNames.some(field =>
            doc[field] !== undefined &&
            isDate(doc[field]) &&
            Date.now() > doc[field].getTime() + this.ttlIndexes[field] * 1000
          )
          if (expired) expiredDocsIds.push(doc._id)
          else validDocs.push(doc)
        })

        async.eachSeries(expiredDocsIds, (_id, cb) => {
          this._remove({ _id: _id }, {}, err => {
            if (err) return callback(err)
            return cb()
          })
          // eslint-disable-next-line node/handle-callback-err
        }, err => {
          // TODO: handle error
          return callback(null, validDocs)
        })
      }
    ])
  }

  /**
   * Insert a new document
   * @param {Document} newDoc
   * @param {Function} callback Optional callback, signature: err, insertedDoc
   *
   * @api private Use Datastore.insert which has the same signature
   */
  _insert (newDoc, callback = () => {}) {
    let preparedDoc

    try {
      preparedDoc = this.prepareDocumentForInsertion(newDoc)
      this._insertInCache(preparedDoc)
    } catch (e) {
      return callback(e)
    }

    this.persistence.persistNewState(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc], err => {
      if (err) return callback(err)
      // Hand back a copy so the caller cannot mutate the cached document
      return callback(null, model.deepCopy(preparedDoc))
    })
  }

  /**
   * Create a new _id that's not already in use
   */
  createNewId () {
    let attemptId = customUtils.uid(16)
    // Try as many times as needed to get an unused _id. As explained in customUtils, the probability of this ever happening is extremely small, so this is O(1)
    if (this.indexes._id.getMatching(attemptId).length > 0) attemptId = this.createNewId()
    return attemptId
  }

  /**
   * Prepare a document (or array of documents) to be inserted in a database
   * Meaning adds _id and timestamps if necessary on a copy of newDoc to avoid any side effect on user input
   * @api private
   */
  prepareDocumentForInsertion (newDoc) {
    if (Array.isArray(newDoc)) {
      return newDoc.map(doc => this.prepareDocumentForInsertion(doc))
    }

    const preparedDoc = model.deepCopy(newDoc)
    if (preparedDoc._id === undefined) preparedDoc._id = this.createNewId()
    const now = new Date()
    if (this.timestampData && preparedDoc.createdAt === undefined) preparedDoc.createdAt = now
    if (this.timestampData && preparedDoc.updatedAt === undefined) preparedDoc.updatedAt = now
    model.checkObject(preparedDoc)

    return preparedDoc
  }

  /**
   * If newDoc is an array of documents, this will insert all documents in the cache
   * @api private
   */
  _insertInCache (preparedDoc) {
    if (Array.isArray(preparedDoc)) this._insertMultipleDocsInCache(preparedDoc)
    else this.addToIndexes(preparedDoc)
  }

  /**
   * If one insertion fails (e.g. because of a unique constraint), roll back all previous
   * inserts and throws the error
   * @api private
   */
  _insertMultipleDocsInCache (preparedDocs) {
    let failingIndex
    let error

    for (let i = 0; i < preparedDocs.length; i += 1) {
      try {
        this.addToIndexes(preparedDocs[i])
      } catch (e) {
        error = e
        failingIndex = i
        break
      }
    }

    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.removeFromIndexes(preparedDocs[i])
      }

      throw error
    }
  }

  /**
   * Insert a new document (or array of documents)
   * Queues _insert on the executor; same signature as _insert.
   */
  insert () {
    this.executor.push({ this: this, fn: this._insert, arguments: arguments })
  }

  /**
   * Count all documents matching the query
   * If no callback is passed, the cursor is returned
   * @param {Object} query MongoDB-style query
   * @param {Function} callback Optional callback, signature: err, count
   */
  count (query, callback) {
    const cursor = new Cursor(this, query, function (err, docs, callback) {
      if (err) { return callback(err) }
      return callback(null, docs.length)
    })

    if (typeof callback === 'function') cursor.exec(callback)
    else return cursor
  }

  /**
   * Find all documents matching the query
   * If no callback is passed, we return the cursor so that user can limit, skip and finally exec
   * @param {Object} query MongoDB-style query
   * @param {Object} projection MongoDB-style projection
   * @param {Function} callback Optional callback, signature: err, docs
   */
  find (query, projection, callback) {
    if (arguments.length === 1) {
      projection = {}
      // callback is undefined, will return a cursor
    } else if (arguments.length === 2) {
      if (typeof projection === 'function') {
        callback = projection
        projection = {}
      } // If not assume projection is an object and callback undefined
    }

    const cursor = new Cursor(this, query, function (err, docs, callback) {
      if (err) { return callback(err) }

      const res = docs.map(doc => model.deepCopy(doc))

      return callback(null, res)
    })

    cursor.projection(projection)
    if (typeof callback === 'function') cursor.exec(callback)
    else return cursor
  }

  /**
   * Find one document matching the query
   * If no callback is passed, the cursor is returned
   * @param {Object} query MongoDB-style query
   * @param {Object} projection MongoDB-style projection
   * @param {Function} callback Optional callback, signature: err, doc
   */
  findOne (query, projection, callback) {
    if (arguments.length === 1) {
      projection = {}
      // callback is undefined, will return a cursor
    } else if (arguments.length === 2) {
      if (typeof projection === 'function') {
        callback = projection
        projection = {}
      } // If not assume projection is an object and callback undefined
    }

    const cursor = new Cursor(this, query, (err, docs, callback) => {
      if (err) return callback(err)
      if (docs.length === 1) return callback(null, model.deepCopy(docs[0]))
      else return callback(null, null)
    })

    cursor.projection(projection).limit(1)
    if (typeof callback === 'function') cursor.exec(callback)
    else return cursor
  }

  /**
   * Update all docs matching query
   * @param {Object} query
   * @param {Object} updateQuery
   * @param {Object} options Optional options
   *                 options.multi If true, can update multiple documents (defaults to false)
   *                 options.upsert If true, document is inserted if the query doesn't match anything
   *                 options.returnUpdatedDocs Defaults to false, if true return as third argument the array of updated matched documents (even if no change actually took place)
   * @param {Function} cb Optional callback, signature: (err, numAffected, affectedDocuments, upsert)
   *                      If update was an upsert, upsert flag is set to true
   *                      affectedDocuments can be one of the following:
   *                        * For an upsert, the upserted document
   *                        * For an update with returnUpdatedDocs option false, null
   *                        * For an update with returnUpdatedDocs true and multi false, the updated document
   *                        * For an update with returnUpdatedDocs true and multi true, the array of updated documents
   *
   * WARNING: The API was changed between v1.7.4 and v1.8, for consistency and readability reasons. Up to and including v1.7.4,
   *          the callback signature was (err, numAffected, updated) where updated was the updated document in case of an upsert
   *          or the array of updated documents for an update if the returnUpdatedDocs option was true. That meant that the type of
   *          affectedDocuments in a non multi update depended on whether there was an upsert or not, leaving only two ways for the
   *          user to check whether an upsert had occurred: checking the type of affectedDocuments or running another find query on
   *          the whole dataset to check its size. Both options being ugly, the breaking change was necessary.
   *
   * @api private Use Datastore.update which has the same signature
   */
  _update (query, updateQuery, options, cb) {
    if (typeof options === 'function') {
      cb = options
      options = {}
    }
    // Tolerate a missing / null options argument instead of throwing on `options.multi`
    options = options || {}
    const callback = cb || (() => {})
    const multi = options.multi !== undefined ? options.multi : false
    const upsert = options.upsert !== undefined ? options.upsert : false

    async.waterfall([
      cb => { // If upsert option is set, check whether we need to insert the doc
        if (!upsert) return cb()

        // Need to use an internal function not tied to the executor to avoid deadlock
        const cursor = new Cursor(this, query)
        cursor.limit(1)._exec((err, docs) => {
          if (err) return callback(err)
          if (docs.length === 1) return cb()

          let toBeInserted

          try {
            model.checkObject(updateQuery)
            // updateQuery is a simple object with no modifier, use it as the document to insert
            toBeInserted = updateQuery
          } catch (e) {
            // updateQuery contains modifiers, use the find query as the base,
            // strip it from all operators and update it according to updateQuery
            try {
              toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
            } catch (err) {
              return callback(err)
            }
          }

          return this._insert(toBeInserted, (err, newDoc) => {
            if (err) return callback(err)
            return callback(null, 1, newDoc, true)
          })
        })
      },
      () => { // Perform the update
        let numReplaced = 0
        let modifiedDoc
        const modifications = []
        let createdAt

        this.getCandidates(query, (err, candidates) => {
          if (err) return callback(err)

          // Preparing update (if an error is thrown here neither the datafile nor
          // the in-memory indexes are affected)
          try {
            for (const candidate of candidates) {
              if (model.match(candidate, query) && (multi || numReplaced === 0)) {
                numReplaced += 1
                if (this.timestampData) { createdAt = candidate.createdAt }
                modifiedDoc = model.modify(candidate, updateQuery)
                if (this.timestampData) {
                  // createdAt is preserved from the stored document, updatedAt is refreshed
                  modifiedDoc.createdAt = createdAt
                  modifiedDoc.updatedAt = new Date()
                }
                modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
              }
            }
          } catch (err) {
            return callback(err)
          }

          // Change the docs in memory
          try {
            this.updateIndexes(modifications)
          } catch (err) {
            return callback(err)
          }

          // Update the datafile
          const updatedDocs = modifications.map(x => x.newDoc)
          this.persistence.persistNewState(updatedDocs, err => {
            if (err) return callback(err)
            if (!options.returnUpdatedDocs) {
              return callback(null, numReplaced)
            } else {
              let updatedDocsDC = updatedDocs.map(doc => model.deepCopy(doc))
              if (!multi) updatedDocsDC = updatedDocsDC[0]
              return callback(null, numReplaced, updatedDocsDC)
            }
          })
        })
      }
    ])
  }

  /**
   * Update all docs matching query
   * Queues _update on the executor; same signature as _update.
   */
  update () {
    this.executor.push({ this: this, fn: this._update, arguments: arguments })
  }

  /**
   * Remove all docs matching the query
   * For now very naive implementation (similar to update)
   * @param {Object} query
   * @param {Object} options Optional options
   *                 options.multi If true, can remove multiple documents (defaults to false)
   * @param {Function} cb Optional callback, signature: err, numRemoved
   *
   * @api private Use Datastore.remove which has the same signature
   */
  _remove (query, options, cb) {
    if (typeof options === 'function') {
      cb = options
      options = {}
    }
    // Tolerate a missing / null options argument instead of throwing on `options.multi`
    options = options || {}
    const callback = cb || (() => {})
    const multi = options.multi !== undefined ? options.multi : false

    // `true`: do not expire stale docs while collecting candidates for a removal
    this.getCandidates(query, true, (err, candidates) => {
      if (err) return callback(err)
      const removedDocs = []
      let numRemoved = 0

      try {
        candidates.forEach(d => {
          if (model.match(d, query) && (multi || numRemoved === 0)) {
            numRemoved += 1
            removedDocs.push({ $$deleted: true, _id: d._id })
            this.removeFromIndexes(d)
          }
        })
      } catch (err) {
        return callback(err)
      }

      this.persistence.persistNewState(removedDocs, err => {
        if (err) return callback(err)
        return callback(null, numRemoved)
      })
    })
  }

  /**
   * Remove all docs matching the query
   * Queues _remove on the executor; same signature as _remove.
   */
  remove () {
    this.executor.push({ this: this, fn: this._remove, arguments: arguments })
  }
}

module.exports = Datastore