const { EventEmitter } = require('events')
const { callbackify } = require('util')
const Cursor = require('./cursor.js')
const customUtils = require('./customUtils.js')
const Executor = require('./executor.js')
const Index = require('./indexes.js')
const model = require('./model.js')
const Persistence = require('./persistence.js')
const { isDate } = require('./utils.js')

/**
 * A collection of documents kept in memory, indexed, and optionally persisted to a datafile.
 * Most public operations come in two flavours: a callback-based one and a Promise-returning
 * `...Async` one. Both are serialized through `this.executor`.
 *
 * Event Emitter - Events
 * * compaction.done - Fired whenever a compaction operation was finished
 */
class Datastore extends EventEmitter {
  /**
   * Create a new collection
   * @param {String|Object} [options] Either the datafile name (legacy v0.6 signature) or an options object
   * @param {String} [options.filename] Optional, datastore will be in-memory only if not provided
   * @param {Boolean} [options.timestampData] Optional, defaults to false. If set to true, createdAt and updatedAt will be created and populated automatically (if not specified by user)
   * @param {Boolean} [options.inMemoryOnly] Optional, defaults to false
   * @param {String} [options.nodeWebkitAppName] Optional, specify the name of your NW app if you want options.filename to be relative to the directory where Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion)
   * @param {Boolean} [options.autoload] Optional, defaults to false
   * @param {Function} [options.onload] Optional, if autoload is used this will be called after the database is loaded, with the error object as parameter. If you don't pass it the error will be thrown
   * @param {Function} [options.beforeDeserialization] Optional, serialization hooks
   * @param {Function} [options.afterSerialization] Optional, serialization hooks
   * @param {Number} [options.corruptAlertThreshold] Optional, threshold after which an alert is thrown if too much data is corrupt
   * @param {Function} [options.compareStrings] Optional, string comparison function that overrides default for sorting
   */
  constructor (options) {
    super()
    let filename

    // Retrocompatibility with v0.6 and before: a bare string is the filename
    if (typeof options === 'string') {
      filename = options
      this.inMemoryOnly = false // Default
      this.autoload = false // Default
      this.timestampData = false // Default
    } else {
      options = options || {}
      filename = options.filename
      this.inMemoryOnly = options.inMemoryOnly || false
      this.autoload = options.autoload || false
      this.timestampData = options.timestampData || false
    }

    // Determine whether in memory or persistent: a missing, non-string or empty filename means in-memory
    if (typeof filename !== 'string' || filename.length === 0) {
      this.filename = null
      this.inMemoryOnly = true
    } else {
      this.filename = filename
    }

    // String comparison function
    this.compareStrings = options.compareStrings

    // Persistence handling
    this.persistence = new Persistence({
      db: this,
      nodeWebkitAppName: options.nodeWebkitAppName,
      afterSerialization: options.afterSerialization,
      beforeDeserialization: options.beforeDeserialization,
      corruptAlertThreshold: options.corruptAlertThreshold
    })

    // This new executor is ready if we don't use persistence
    // If we do, it will only be ready once loadDatabase is called
    this.executor = new Executor()
    if (this.inMemoryOnly) this.executor.ready = true

    // Indexed by field name, dot notation can be used
    // _id is always indexed and since _ids are generated randomly the underlying
    // binary tree is always well-balanced
    this.indexes = {}
    this.indexes._id = new Index({ fieldName: '_id', unique: true })
    // Field name -> expireAfterSeconds, filled by ensureIndexAsync and read by getCandidatesAsync
    this.ttlIndexes = {}

    // Queue a load of the database right away and call the onload handler
    // By default (no onload handler), if there is an error there, no operation will be possible so warn the user by throwing an exception
    if (this.autoload) {
      this.loadDatabase(options.onload || (err => {
        if (err) throw err
      }))
    }
  }

  /**
   * Load the database from the datafile, and trigger the execution of buffered commands if any
   * @param {...*} args Forwarded to persistence.loadDatabase (the constructor passes a single callback)
   */
  loadDatabase (...args) {
    this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: args }, true)
  }

  /**
   * Promise-based variant of loadDatabase. The task is queued with the same `true` flag as
   * loadDatabase (presumably so it runs even though the executor is not ready yet — see constructor).
   * @param {...*} args Forwarded as individual arguments to persistence.loadDatabaseAsync
   * @return {Promise}
   */
  loadDatabaseAsync (...args) {
    // Spread the arguments: passing `args` itself would hand persistence a single array
    return this.executor.pushAsync(() => this.persistence.loadDatabaseAsync(...args), true)
  }

  /**
   * Get an array of all the data in the database
   * @return {Document[]} The documents held by the _id index
   */
  getAllData () {
    return this.indexes._id.getAll()
  }

  /**
   * Reset all currently defined indexes
   * @param {Document|Document[]} [newData] Forwarded to every index's reset
   */
  resetIndexes (newData) {
    for (const index of Object.values(this.indexes)) {
      index.reset(newData)
    }
  }

  /**
   * Ensure an index is kept for this field. Same parameters as lib/indexes
   * For now this function is synchronous, we need to test how much time it takes
   * We use an async API for consistency with the rest of the code
   * @param {Object} options
   * @param {String} options.fieldName
   * @param {Boolean} [options.unique]
   * @param {Boolean} [options.sparse]
   * @param {Number} [options.expireAfterSeconds] - Optional, if set this index becomes a TTL index (only works on Date fields, not arrays of Date)
   * @param {Function} callback Optional callback, signature: err
   */
  ensureIndex (options = {}, callback = () => {}) {
    callbackify(this.ensureIndexAsync.bind(this))(options, callback)
  }

  /**
   * Promise-based variant of ensureIndex. Does nothing if an index already exists on the field.
   * @param {Object} options See ensureIndex
   * @return {Promise<void>}
   * @throws {Error} With `missingFieldName = true` when options.fieldName is missing,
   *   or whatever the index throws while indexing the existing documents (e.g. a unique violation)
   */
  async ensureIndexAsync (options = {}) {
    if (!options.fieldName) {
      const err = new Error('Cannot create an index without a fieldName')
      err.missingFieldName = true
      throw err
    }
    if (this.indexes[options.fieldName]) return

    this.indexes[options.fieldName] = new Index(options)

    try {
      this.indexes[options.fieldName].insert(this.getAllData())
    } catch (e) {
      delete this.indexes[options.fieldName]
      throw e
    }

    // Register the TTL only once the index was built successfully, so that a failed creation
    // does not leave a TTL behind that would keep expiring documents.
    // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here
    if (options.expireAfterSeconds !== undefined) this.ttlIndexes[options.fieldName] = options.expireAfterSeconds

    // We may want to force all options to be persisted including defaults, not just the ones passed the index creation function
    await this.persistence.persistNewStateAsync([{ $$indexCreated: options }])
  }

  /**
   * Remove an index
   * @param {String} fieldName
   * @param {Function} callback Optional callback, signature: err
   */
  // TODO: this function should probably be called through the executor, since it persists a new state
  removeIndex (fieldName, callback = () => {}) {
    callbackify(this.removeIndexAsync.bind(this))(fieldName, callback)
  }

  /**
   * Promise-based variant of removeIndex
   * @param {String} fieldName
   * @return {Promise<void>}
   */
  async removeIndexAsync (fieldName) {
    delete this.indexes[fieldName]
    // The TTL belongs to the index: once the index is dropped, documents must stop expiring
    delete this.ttlIndexes[fieldName]
    await this.persistence.persistNewStateAsync([{ $$indexRemoved: fieldName }])
  }

  /**
   * Add one or several document(s) to all indexes.
   * If one index rejects the insert, it is rolled back on the indexes already updated and the error is rethrown.
   * @param {Document|Document[]} doc
   */
  addToIndexes (doc) {
    let failingIndex
    let error
    const keys = Object.keys(this.indexes)

    for (let i = 0; i < keys.length; i += 1) {
      try {
        this.indexes[keys[i]].insert(doc)
      } catch (e) {
        failingIndex = i
        error = e
        break
      }
    }

    // If an error happened, we need to rollback the insert on all other indexes
    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.indexes[keys[i]].remove(doc)
      }
      throw error
    }
  }

  /**
   * Remove one or several document(s) from all indexes
   * @param {Document|Document[]} doc
   */
  removeFromIndexes (doc) {
    for (const index of Object.values(this.indexes)) {
      index.remove(doc)
    }
  }

  /**
   * Update one or several documents in all indexes
   * To update multiple documents, oldDoc must be an array of { oldDoc, newDoc } pairs
   * If one update violates a constraint, all changes are rolled back
   * @param {Document|Array<{oldDoc: Document, newDoc: Document}>} oldDoc
   * @param {Document} [newDoc]
   */
  updateIndexes (oldDoc, newDoc) {
    let failingIndex
    let error
    const keys = Object.keys(this.indexes)

    for (let i = 0; i < keys.length; i += 1) {
      try {
        this.indexes[keys[i]].update(oldDoc, newDoc)
      } catch (e) {
        failingIndex = i
        error = e
        break
      }
    }

    // If an error happened, we need to rollback the update on all other indexes
    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.indexes[keys[i]].revertUpdate(oldDoc, newDoc)
      }
      throw error
    }
  }

  /**
   * Pick candidate documents for `query` using at most one index.
   * Tried in order, each restricted to indexed fields: a basic match (string, number, boolean,
   * Date or null value), then a $in match, then a comparison match ($lt/$lte/$gt/$gte).
   * When several fields qualify for the same kind of match, the last one in the query's key order wins.
   * Falls back to all documents. Callers in this file still filter the result with model.match.
   * @param {Object} query
   * @return {Document[]}
   * @private
   */
  _getCandidates (query) {
    const indexNames = Object.keys(this.indexes)
    const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key)
    // Only indexed fields can be used to narrow down the candidates
    const indexedEntries = Object.entries(query).filter(([k]) => indexNames.includes(k))

    // STEP 1: get candidates list by checking indexes from most to least frequent usecase
    // For a basic match
    let usableQuery = indexedEntries
      .filter(([, v]) => typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null)
      .pop()
    if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1])

    // For a $in match
    usableQuery = indexedEntries
      .filter(([, v]) => v && hasOwn(v, '$in'))
      .pop()
    if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1].$in)

    // For a comparison match
    usableQuery = indexedEntries
      .filter(([, v]) => v && ['$lt', '$lte', '$gt', '$gte'].some(op => hasOwn(v, op)))
      .pop()
    if (usableQuery) return this.indexes[usableQuery[0]].getBetweenBounds(usableQuery[1])

    // By default, return all the DB data
    return this.getAllData()
  }

  /**
   * Return the list of candidates for a given query
   * Crude implementation for now, we return the candidates given by the first usable index if any
   * We try the following query types, in this order: basic match, $in match, comparison match
   * One way to make it better would be to enable the use of multiple indexes if the first usable index
   * returns too much data. I may do it in the future.
   *
   * Returned candidates will be scanned to find and remove all expired documents
   *
   * @param {Query} query
   * @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
   * @param {Function} callback Signature err, candidates
   */
  getCandidates (query, dontExpireStaleDocs, callback) {
    if (typeof dontExpireStaleDocs === 'function') {
      callback = dontExpireStaleDocs
      dontExpireStaleDocs = false
    }

    callbackify(this.getCandidatesAsync.bind(this))(query, dontExpireStaleDocs, callback)
  }

  /**
   * Promise-based variant of getCandidates.
   * A document is expired when, for some TTL field, its value is a Date older than expireAfterSeconds;
   * expired documents are removed from the datastore (through _removeAsync) and left out of the result.
   * @param {Query} query
   * @param {Boolean} [dontExpireStaleDocs=false]
   * @return {Promise<Document[]>} A fresh array (safe to iterate while documents are removed from the indexes)
   */
  async getCandidatesAsync (query, dontExpireStaleDocs = false) {
    // STEP 1: get candidates list by checking indexes from most to least frequent usecase
    const docs = this._getCandidates(query)

    // Copy instead of push(...docs), which can exceed the engine's argument limit on large collections
    if (dontExpireStaleDocs) return docs.slice()

    // STEP 2: remove all expired documents
    const validDocs = []
    const expiredDocsIds = []
    const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)
    const now = Date.now()

    for (const doc of docs) {
      const isExpired = ttlIndexesFieldNames.some(field =>
        doc[field] !== undefined && isDate(doc[field]) && now > doc[field].getTime() + this.ttlIndexes[field] * 1000
      )
      if (isExpired) expiredDocsIds.push(doc._id)
      else validDocs.push(doc)
    }

    for (const _id of expiredDocsIds) {
      await this._removeAsync({ _id }, {})
    }

    return validDocs
  }

  /**
   * Insert a new document
   * Private Use Datastore.insert which has the same signature
   * @param {Document} newDoc
   * @param {Function} callback Optional callback, signature: err, insertedDoc
   *
   * @private
   */
  _insert (newDoc, callback = () => {}) {
    return callbackify(this._insertAsync.bind(this))(newDoc, callback)
  }

  /**
   * Promise-based variant of _insert (not queued on the executor).
   * @param {Document|Document[]} newDoc
   * @return {Promise<Document|Document[]>} A deep copy of what was inserted (including generated _id/timestamps)
   * @private
   */
  async _insertAsync (newDoc) {
    const preparedDoc = this._prepareDocumentForInsertion(newDoc)
    this._insertInCache(preparedDoc)

    await this.persistence.persistNewStateAsync(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc])

    return model.deepCopy(preparedDoc)
  }

  /**
   * Create a new _id that's not already in use
   * @return {String}
   * @private
   */
  _createNewId () {
    // Try as many times as needed to get an unused _id. As explained in customUtils, the probability
    // of a collision is extremely small, so this is O(1); a loop avoids growing the stack when it happens
    let attemptId
    do {
      attemptId = customUtils.uid(16)
    } while (this.indexes._id.getMatching(attemptId).length > 0)
    return attemptId
  }

  /**
   * Prepare a document (or array of documents) to be inserted in a database
   * Meaning adds _id and timestamps if necessary on a copy of newDoc to avoid any side effect on user input
   * @param {Document|Document[]} newDoc
   * @return {Document|Document[]}
   * @throws Whatever model.checkObject throws for an invalid document
   * @private
   */
  _prepareDocumentForInsertion (newDoc) {
    if (Array.isArray(newDoc)) {
      const preparedDocs = []
      newDoc.forEach(doc => { preparedDocs.push(this._prepareDocumentForInsertion(doc)) })
      return preparedDocs
    }

    const preparedDoc = model.deepCopy(newDoc)
    if (preparedDoc._id === undefined) preparedDoc._id = this._createNewId()
    if (this.timestampData) {
      const now = new Date()
      if (preparedDoc.createdAt === undefined) preparedDoc.createdAt = now
      if (preparedDoc.updatedAt === undefined) preparedDoc.updatedAt = now
    }
    model.checkObject(preparedDoc)
    return preparedDoc
  }

  /**
   * If newDoc is an array of documents, this will insert all documents in the cache
   * @param {Document|Document[]} preparedDoc
   * @private
   */
  _insertInCache (preparedDoc) {
    if (Array.isArray(preparedDoc)) this._insertMultipleDocsInCache(preparedDoc)
    else this.addToIndexes(preparedDoc)
  }

  /**
   * If one insertion fails (e.g. because of a unique constraint), roll back all previous
   * inserts and throws the error
   * @param {Document[]} preparedDocs
   * @private
   */
  _insertMultipleDocsInCache (preparedDocs) {
    let failingIndex
    let error

    for (let i = 0; i < preparedDocs.length; i += 1) {
      try {
        this.addToIndexes(preparedDocs[i])
      } catch (e) {
        error = e
        failingIndex = i
        break
      }
    }

    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.removeFromIndexes(preparedDocs[i])
      }
      throw error
    }
  }

  /**
   * Insert one or several documents, queued on the executor.
   * @param {Document|Document[]} newDoc
   * @param {Function} [callback] signature: err, insertedDoc
   */
  insert (...args) {
    this.executor.push({ this: this, fn: this._insert, arguments: args })
  }

  /**
   * Promise-based variant of insert, queued on the executor like the other *Async methods.
   * @param {Document|Document[]} newDoc
   * @return {Promise<Document|Document[]>}
   */
  insertAsync (...args) {
    return this.executor.pushAsync(() => this._insertAsync(...args))
  }

  /**
   * Count all documents matching the query
   * @param {Query} query MongoDB-style query
   * @param {Function} callback Optional callback, signature: err, count
   * @return {Cursor|undefined} The cursor when no callback is given
   */
  count (query, callback) {
    const cursor = this.countAsync(query)

    if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
    else return cursor
  }

  /**
   * @param {Query} query MongoDB-style query
   * @return {Cursor} Resolves to the number of matching documents
   */
  countAsync (query) {
    return new Cursor(this, query, async docs => docs.length, true) // this is a trick, Cursor itself is a thenable, which allows to await it
  }

  /**
   * Find all documents matching the query
   * If no callback is passed, we return the cursor so that user can limit, skip and finally exec
   * @param {Object} query MongoDB-style query
   * @param {Object} projection MongoDB-style projection
   * @param {Function} callback Optional callback, signature: err, docs
   * @return {Cursor|undefined} The cursor when no callback is given
   */
  find (query, projection, callback) {
    if (arguments.length === 1) {
      projection = {}
      // callback is undefined, will return a cursor
    } else if (arguments.length === 2) {
      if (typeof projection === 'function') {
        callback = projection
        projection = {}
      } // If not assume projection is an object and callback undefined
    }

    const cursor = this.findAsync(query, projection)

    if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
    else return cursor
  }

  /**
   * @param {Object} query MongoDB-style query
   * @param {Object} [projection] MongoDB-style projection
   * @return {Cursor} Resolves to deep copies of the matching documents
   */
  findAsync (query, projection = {}) {
    const cursor = new Cursor(this, query, docs => docs.map(doc => model.deepCopy(doc)), true)

    cursor.projection(projection)
    return cursor
  }

  /**
   * Find one document matching the query
   * @param {Object} query MongoDB-style query
   * @param {Object} projection MongoDB-style projection
   * @param {Function} callback Optional callback, signature: err, doc
   * @return {Cursor|undefined} The cursor when no callback is given
   */
  findOne (query, projection, callback) {
    if (arguments.length === 1) {
      projection = {}
      // callback is undefined, will return a cursor
    } else if (arguments.length === 2) {
      if (typeof projection === 'function') {
        callback = projection
        projection = {}
      } // If not assume projection is an object and callback undefined
    }

    const cursor = this.findOneAsync(query, projection)

    if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
    else return cursor
  }

  /**
   * @param {Object} query MongoDB-style query
   * @param {Object} [projection] MongoDB-style projection
   * @return {Cursor} Resolves to a deep copy of the matching document, or null
   */
  findOneAsync (query, projection = {}) {
    const cursor = new Cursor(this, query, docs => docs.length === 1 ? model.deepCopy(docs[0]) : null, true)

    cursor.projection(projection).limit(1)
    return cursor
  }

  /**
   * Update all docs matching query.
   * Use Datastore.update which has the same signature
   * @param {Object} query
   * @param {Object} updateQuery
   * @param {Object} options Optional options
   *                 options.multi If true, can update multiple documents (defaults to false)
   *                 options.upsert If true, document is inserted if the query doesn't match anything
   *                 options.returnUpdatedDocs Defaults to false, if true return as third argument the array of updated matched documents (even if no change actually took place)
   * @param {Function} cb Optional callback, signature: (err, numAffected, affectedDocuments, upsert)
   *                      If update was an upsert, upsert flag is set to true
   *                      affectedDocuments can be one of the following:
   *                        * For an upsert, the upserted document
   *                        * For an update with returnUpdatedDocs option false, null
   *                        * For an update with returnUpdatedDocs true and multi false, the updated document
   *                        * For an update with returnUpdatedDocs true and multi true, the array of updated documents
   *
   * WARNING: The API was changed between v1.7.4 and v1.8, for consistency and readability reasons. Up to and including v1.7.4,
   *          the callback signature was (err, numAffected, updated) where updated was the updated document in case of an upsert
   *          or the array of updated documents for an update if the returnUpdatedDocs option was true. That meant that the type of
   *          affectedDocuments in a non multi update depended on whether there was an upsert or not, leaving only two ways for the
   *          user to check whether an upsert had occurred: checking the type of affectedDocuments or running another find query on
   *          the whole dataset to check its size. Both options being ugly, the breaking change was necessary.
   *
   * @private
   */
  _update (query, updateQuery, options, cb) {
    if (typeof options === 'function') {
      cb = options
      options = {}
    }
    const callback = cb || (() => {})
    // Spread the result object into the positional callback signature documented above
    const _callback = (err, res = {}) => {
      callback(err, res.numAffected, res.affectedDocuments, res.upsert)
    }

    callbackify(this._updateAsync.bind(this))(query, updateQuery, options, _callback)
  }

  /**
   * Promise-based variant of _update (not queued on the executor).
   * @param {Object} query
   * @param {Object} updateQuery
   * @param {Object} [options] See _update
   * @return {Promise<{numAffected: Number, affectedDocuments?: Document|Document[], upsert?: Boolean}>}
   * @private
   */
  async _updateAsync (query, updateQuery, options = {}) {
    const multi = options.multi !== undefined ? options.multi : false
    const upsert = options.upsert !== undefined ? options.upsert : false

    // If upsert option is set, check whether we need to insert the doc
    if (upsert) {
      // Need to use an internal function not tied to the executor to avoid deadlock
      const cursor = new Cursor(this, query)
      const docs = await new Promise((resolve, reject) => {
        cursor.limit(1)._exec((err, docs) => {
          if (err) reject(err)
          else resolve(docs)
        })
      })

      if (docs.length !== 1) {
        let toBeInserted

        try {
          model.checkObject(updateQuery)
          // updateQuery is a simple object with no modifier, use it as the document to insert
          toBeInserted = updateQuery
        } catch (e) {
          // updateQuery contains modifiers, use the find query as the base,
          // strip it from all operators and update it according to updateQuery
          toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
        }

        const newDoc = await this._insertAsync(toBeInserted)
        return { numAffected: 1, affectedDocuments: newDoc, upsert: true }
      }
    }

    // Perform the update
    let numReplaced = 0
    const modifications = []
    const candidates = await this.getCandidatesAsync(query)

    // Preparing update (if an error is thrown here neither the datafile nor
    // the in-memory indexes are affected)
    for (const candidate of candidates) {
      if (model.match(candidate, query) && (multi || numReplaced === 0)) {
        numReplaced += 1
        // With timestampData, createdAt is read before modify and restored afterwards,
        // so an update never changes it; updatedAt is always refreshed
        const createdAt = this.timestampData ? candidate.createdAt : undefined
        const modifiedDoc = model.modify(candidate, updateQuery)
        if (this.timestampData) {
          modifiedDoc.createdAt = createdAt
          modifiedDoc.updatedAt = new Date()
        }
        modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
      }
    }

    // Change the docs in memory
    this.updateIndexes(modifications)

    // Update the datafile
    const updatedDocs = modifications.map(x => x.newDoc)
    await this.persistence.persistNewStateAsync(updatedDocs)

    if (!options.returnUpdatedDocs) return { numAffected: numReplaced }

    const updatedDocsDC = updatedDocs.map(doc => model.deepCopy(doc))
    return {
      numAffected: numReplaced,
      affectedDocuments: multi ? updatedDocsDC : updatedDocsDC[0]
    }
  }

  /**
   * Update documents, queued on the executor. Same signature as _update.
   */
  update (...args) {
    this.executor.push({ this: this, fn: this._update, arguments: args })
  }

  /**
   * Promise-based variant of update, queued on the executor.
   * @param {Object} query
   * @param {Object} updateQuery
   * @param {Object} [options] See _update
   * @return {Promise<{numAffected: Number, affectedDocuments?: Document|Document[], upsert?: Boolean}>}
   */
  updateAsync (...args) {
    // Spread the arguments: _updateAsync takes (query, updateQuery, options), not an array
    return this.executor.pushAsync(() => this._updateAsync(...args))
  }

  /**
   * Remove all docs matching the query.
   * Use Datastore.remove which has the same signature
   * For now very naive implementation (similar to update)
   * @param {Object} query
   * @param {Object} options Optional options
   *                 options.multi If true, can update multiple documents (defaults to false)
   * @param {Function} cb Optional callback, signature: err, numRemoved
   *
   * @private
   */
  _remove (query, options, cb) {
    if (typeof options === 'function') {
      cb = options
      options = {}
    }
    const callback = cb || (() => {})

    callbackify(this._removeAsync.bind(this))(query, options, callback)
  }

  /**
   * Promise-based variant of _remove (not queued on the executor).
   * @param {Object} query
   * @param {Object} [options] options.multi defaults to false
   * @return {Promise<Number>} The number of removed documents
   * @private
   */
  async _removeAsync (query, options = {}) {
    const multi = options.multi !== undefined ? options.multi : false

    // Stale docs are not expired here (dontExpireStaleDocs = true): expiration itself goes through this method
    const candidates = await this.getCandidatesAsync(query, true)
    const removedDocs = []
    let numRemoved = 0

    for (const d of candidates) {
      if (model.match(d, query) && (multi || numRemoved === 0)) {
        numRemoved += 1
        removedDocs.push({ $$deleted: true, _id: d._id })
        this.removeFromIndexes(d)
      }
    }

    await this.persistence.persistNewStateAsync(removedDocs)
    return numRemoved
  }

  /**
   * Remove documents, queued on the executor. Same signature as _remove.
   */
  remove (...args) {
    this.executor.push({ this: this, fn: this._remove, arguments: args })
  }

  /**
   * Promise-based variant of remove, queued on the executor.
   * @param {Object} query
   * @param {Object} [options] options.multi defaults to false
   * @return {Promise<Number>} The number of removed documents
   */
  removeAsync (...args) {
    // Spread the arguments: _removeAsync takes (query, options), not an array
    return this.executor.pushAsync(() => this._removeAsync(...args))
  }
}

module.exports = Datastore