The JavaScript Database, for Node.js, nw.js, electron and the browser
nedb/lib/datastore.js


const { EventEmitter } = require('events')
const { callbackify } = require('util')
const Cursor = require('./cursor.js')
const customUtils = require('./customUtils.js')
const Executor = require('./executor.js')
const Index = require('./indexes.js')
const model = require('./model.js')
const Persistence = require('./persistence.js')
const { isDate } = require('./utils.js')
class Datastore extends EventEmitter {
/**
* Create a new collection
* @param {String} [options.filename] Optional, datastore will be in-memory only if not provided
* @param {Boolean} [options.timestampData] Optional, defaults to false. If set to true, createdAt and updatedAt will be created and populated automatically (if not specified by user)
* @param {Boolean} [options.inMemoryOnly] Optional, defaults to false
* @param {String} [options.nodeWebkitAppName] Optional, specify the name of your NW app if you want options.filename to be relative to the directory where Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion)
* @param {Boolean} [options.autoload] Optional, defaults to false
* @param {Function} [options.onload] Optional, if autoload is used this will be called after the database is loaded, with the error object as parameter. If you don't pass it the error will be thrown
* @param {Function} [options.beforeDeserialization] Optional, serialization hooks
* @param {Function} [options.afterSerialization] Optional, serialization hooks
* @param {Number} [options.corruptAlertThreshold] Optional, threshold after which an alert is thrown if too much data is corrupt
* @param {Function} [options.compareStrings] Optional, string comparison function that overrides default for sorting
*
* Event Emitter - Events
* * compaction.done - Fired whenever a compaction operation finishes
*/
constructor (options) {
super()
let filename
// Retrocompatibility with v0.6 and before
if (typeof options === 'string') {
filename = options
this.inMemoryOnly = false // Default
} else {
options = options || {}
filename = options.filename
this.inMemoryOnly = options.inMemoryOnly || false
this.autoload = options.autoload || false
this.timestampData = options.timestampData || false
}
// Determine whether in memory or persistent
if (!filename || typeof filename !== 'string' || filename.length === 0) {
this.filename = null
this.inMemoryOnly = true
} else {
this.filename = filename
}
// String comparison function
this.compareStrings = options.compareStrings
// Persistence handling
this.persistence = new Persistence({
db: this,
nodeWebkitAppName: options.nodeWebkitAppName,
afterSerialization: options.afterSerialization,
beforeDeserialization: options.beforeDeserialization,
corruptAlertThreshold: options.corruptAlertThreshold
})
// This new executor is ready if we don't use persistence
// If we do, it will only be ready once loadDatabase is called
this.executor = new Executor()
if (this.inMemoryOnly) this.executor.ready = true
// Indexed by field name, dot notation can be used
// _id is always indexed and since _ids are generated randomly the underlying
// binary search tree is always well-balanced
this.indexes = {}
this.indexes._id = new Index({ fieldName: '_id', unique: true })
this.ttlIndexes = {}
// Queue a load of the database right away and call the onload handler
// By default (no onload handler), if there is an error during load no operation will be possible, so warn the user by throwing an exception
if (this.autoload) {
this.loadDatabase(options.onload || (err => {
if (err) throw err
}))
}
}
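/**
* Usage sketch (illustrative, not part of the original source): the two common ways to
* construct a datastore. The filename 'example.db' is a hypothetical example.
*
*   const Datastore = require('./datastore.js')
*
*   // Persistent datastore, loaded automatically from disk
*   const db = new Datastore({ filename: 'example.db', autoload: true, timestampData: true })
*
*   // Pure in-memory datastore (no filename), ready immediately
*   const memDb = new Datastore({ inMemoryOnly: true })
*
*   // The datastore is an EventEmitter; 'compaction.done' fires when compaction finishes
*   db.on('compaction.done', () => { console.log('compaction finished') })
*/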
/**
* Load the database from the datafile, and trigger the execution of buffered commands if any
*/
loadDatabase () {
this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true)
}
loadDatabaseAsync () {
return this.executor.pushAsync({
this: this.persistence,
fn: this.persistence.loadDatabaseAsync,
arguments: arguments
}, true)
}
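/**
* Usage sketch (illustrative, not part of the original source): loading explicitly when
* autoload is not used. Both calls go through the executor, so commands issued before the
* load completes are buffered and executed afterwards. The filename is hypothetical.
*
*   const db = new Datastore({ filename: 'example.db' })
*   db.loadDatabase(err => {
*     if (err) throw err
*     // the datastore is now ready for queries
*   })
*
*   // or, with the promise-based API
*   await db.loadDatabaseAsync()
*/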
/**
* Get an array of all the data in the database
*/
getAllData () {
return this.indexes._id.getAll()
}
/**
* Reset all currently defined indexes
*/
resetIndexes (newData) {
for (const index of Object.values(this.indexes)) {
index.reset(newData)
}
}
/**
* Ensure an index is kept for this field. Same parameters as lib/indexes
* For now this function is synchronous; we need to test how much time it takes
* We use an async API for consistency with the rest of the code
* @param {Object} options
* @param {String} options.fieldName
* @param {Boolean} [options.unique]
* @param {Boolean} [options.sparse]
* @param {Number} [options.expireAfterSeconds] - Optional, if set this index becomes a TTL index (only works on Date fields, not arrays of Date)
* @param {Function} callback Optional callback, signature: err
*/
ensureIndex (options = {}, callback = () => {}) {
callbackify(this.ensureIndexAsync.bind(this))(options, callback)
}
async ensureIndexAsync (options = {}) {
if (!options.fieldName) {
const err = new Error('Cannot create an index without a fieldName')
err.missingFieldName = true
throw err
}
if (this.indexes[options.fieldName]) return
this.indexes[options.fieldName] = new Index(options)
if (options.expireAfterSeconds !== undefined) this.ttlIndexes[options.fieldName] = options.expireAfterSeconds // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here
try {
this.indexes[options.fieldName].insert(this.getAllData())
} catch (e) {
delete this.indexes[options.fieldName]
throw e
}
// We may want to force all options to be persisted including defaults, not just the ones passed to the index creation function
await this.persistence.persistNewStateAsync([{ $$indexCreated: options }])
}
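/**
* Usage sketch (illustrative, not part of the original source): creating a unique index and
* a TTL index. The field names 'email' and 'expiresAt' are hypothetical examples.
*
*   await db.ensureIndexAsync({ fieldName: 'email', unique: true, sparse: true })
*
*   // TTL index: documents whose 'expiresAt' Date is older than one hour are removed
*   // lazily, the next time a query scans them (see getCandidatesAsync below)
*   await db.ensureIndexAsync({ fieldName: 'expiresAt', expireAfterSeconds: 3600 })
*
*   // callback style
*   db.ensureIndex({ fieldName: 'email', unique: true }, err => { if (err) throw err })
*/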
/**
* Remove an index
* @param {String} fieldName
* @param {Function} callback Optional callback, signature: err
*/
// TODO: contrary to what is said in the JSDoc, this function should probably be called through the executor, it persists a new state
removeIndex (fieldName, callback = () => {}) {
callbackify(this.removeIndexAsync.bind(this))(fieldName, callback)
}
async removeIndexAsync (fieldName) {
delete this.indexes[fieldName]
await this.persistence.persistNewStateAsync([{ $$indexRemoved: fieldName }])
}
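/**
* Usage sketch (illustrative, not part of the original source); 'email' is a hypothetical field name.
*
*   await db.removeIndexAsync('email')
*   // or: db.removeIndex('email', err => { if (err) throw err })
*/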
/**
* Add one or several document(s) to all indexes
*/
addToIndexes (doc) {
let failingIndex
let error
const keys = Object.keys(this.indexes)
for (let i = 0; i < keys.length; i += 1) {
try {
this.indexes[keys[i]].insert(doc)
} catch (e) {
failingIndex = i
error = e
break
}
}
// If an error happened, we need to rollback the insert on all other indexes
if (error) {
for (let i = 0; i < failingIndex; i += 1) {
this.indexes[keys[i]].remove(doc)
}
throw error
}
}
/**
* Remove one or several document(s) from all indexes
*/
removeFromIndexes (doc) {
for (const index of Object.values(this.indexes)) {
index.remove(doc)
}
}
/**
* Update one or several documents in all indexes
* To update multiple documents, oldDoc must be an array of { oldDoc, newDoc } pairs
* If one update violates a constraint, all changes are rolled back
*/
updateIndexes (oldDoc, newDoc) {
let failingIndex
let error
const keys = Object.keys(this.indexes)
for (let i = 0; i < keys.length; i += 1) {
try {
this.indexes[keys[i]].update(oldDoc, newDoc)
} catch (e) {
failingIndex = i
error = e
break
}
}
// If an error happened, we need to rollback the update on all other indexes
if (error) {
for (let i = 0; i < failingIndex; i += 1) {
this.indexes[keys[i]].revertUpdate(oldDoc, newDoc)
}
throw error
}
}
_getCandidates (query) {
const indexNames = Object.keys(this.indexes)
// STEP 1: get candidates list by checking indexes from most to least frequent use case
// For a basic match
let usableQuery
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1])
// For a $in match
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1].$in)
// For a comparison match
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getBetweenBounds(usableQuery[1])
// By default, return all the DB data
return this.getAllData()
}
/**
* Return the list of candidates for a given query
* Crude implementation for now, we return the candidates given by the first usable index if any
* We try the following query types, in this order: basic match, $in match, comparison match
* One way to make it better would be to enable the use of multiple indexes if the first usable index
* returns too much data. I may do it in the future.
*
* Returned candidates will be scanned to find and remove all expired documents
*
* @param {Query} query
* @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
* @param {Function} callback Signature err, candidates
*/
getCandidates (query, dontExpireStaleDocs, callback) {
if (typeof dontExpireStaleDocs === 'function') {
callback = dontExpireStaleDocs
dontExpireStaleDocs = false
}
callbackify(this.getCandidatesAsync.bind(this))(query, dontExpireStaleDocs, callback)
}
async getCandidatesAsync (query, dontExpireStaleDocs = false) {
const validDocs = []
// STEP 1: get candidates list by checking indexes from most to least frequent use case
const docs = this._getCandidates(query)
// STEP 2: remove all expired documents
if (!dontExpireStaleDocs) {
const expiredDocsIds = []
const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)
docs.forEach(doc => {
if (ttlIndexesFieldNames.every(i => !(doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000))) validDocs.push(doc)
else expiredDocsIds.push(doc._id)
})
for (const _id of expiredDocsIds) {
await new Promise((resolve, reject) => {
this._remove({ _id: _id }, {}, err => {
if (err) return reject(err)
return resolve()
})
})
}
} else validDocs.push(...docs)
return validDocs
}
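/**
* Behaviour sketch (illustrative, not part of the original source): with a TTL index in place,
* expired documents are removed as a side effect of fetching candidates, so they never reach
* the caller. The field names 'expiresAt' and 'session' are hypothetical examples.
*
*   await db.ensureIndexAsync({ fieldName: 'expiresAt', expireAfterSeconds: 0 })
*   await db.insertAsync({ session: 'abc', expiresAt: new Date(Date.now() - 1000) })
*   const doc = await db.findOneAsync({ session: 'abc' })
*   // doc === null: the document was expired and deleted during candidate selection
*/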
/**
* Insert a new document
* Private. Use Datastore.insert, which has the same signature
* @param {Document} newDoc
* @param {Function} callback Optional callback, signature: err, insertedDoc
*
* @private
*/
_insert (newDoc, callback = () => {}) {
return callbackify(this._insertAsync.bind(this))(newDoc, callback)
}
async _insertAsync (newDoc) {
const preparedDoc = this._prepareDocumentForInsertion(newDoc)
this._insertInCache(preparedDoc)
await this.persistence.persistNewStateAsync(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc])
return model.deepCopy(preparedDoc)
}
/**
* Create a new _id that's not already in use
* @private
*/
_createNewId () {
let attemptId = customUtils.uid(16)
// Try as many times as needed to get an unused _id. As explained in customUtils, the probability of this ever happening is extremely small, so this is O(1)
if (this.indexes._id.getMatching(attemptId).length > 0) attemptId = this._createNewId()
return attemptId
}
/**
* Prepare a document (or array of documents) to be inserted in a database
* That is, adds _id and timestamps if necessary, working on a copy of newDoc to avoid any side effect on user input
* @private
*/
_prepareDocumentForInsertion (newDoc) {
let preparedDoc
if (Array.isArray(newDoc)) {
preparedDoc = []
newDoc.forEach(doc => { preparedDoc.push(this._prepareDocumentForInsertion(doc)) })
} else {
preparedDoc = model.deepCopy(newDoc)
if (preparedDoc._id === undefined) preparedDoc._id = this._createNewId()
const now = new Date()
if (this.timestampData && preparedDoc.createdAt === undefined) preparedDoc.createdAt = now
if (this.timestampData && preparedDoc.updatedAt === undefined) preparedDoc.updatedAt = now
model.checkObject(preparedDoc)
}
return preparedDoc
}
/**
* If preparedDoc is an array of documents, this will insert all of them into the cache (the in-memory indexes)
* @private
*/
_insertInCache (preparedDoc) {
if (Array.isArray(preparedDoc)) this._insertMultipleDocsInCache(preparedDoc)
else this.addToIndexes(preparedDoc)
}
/**
* If one insertion fails (e.g. because of a unique constraint), roll back all previous
* inserts and throw the error
* @private
*/
_insertMultipleDocsInCache (preparedDocs) {
let failingIndex
let error
for (let i = 0; i < preparedDocs.length; i += 1) {
try {
this.addToIndexes(preparedDocs[i])
} catch (e) {
error = e
failingIndex = i
break
}
}
if (error) {
for (let i = 0; i < failingIndex; i += 1) {
this.removeFromIndexes(preparedDocs[i])
}
throw error
}
}
insert () {
this.executor.push({ this: this, fn: this._insert, arguments: arguments })
}
insertAsync () {
return this.executor.push({ this: this, fn: this._insertAsync, arguments: arguments, async: true })
}
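/**
* Usage sketch (illustrative, not part of the original source): inserting a single document
* or an array of documents. Field names are hypothetical examples.
*
*   const inserted = await db.insertAsync({ planet: 'Earth', inhabited: true })
*   // inserted._id was generated, and createdAt/updatedAt were added if timestampData is set
*
*   await db.insertAsync([{ planet: 'Mars' }, { planet: 'Venus' }])
*
*   // callback style
*   db.insert({ planet: 'Jupiter' }, (err, newDoc) => { if (err) throw err })
*/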
/**
* Count all documents matching the query
* @param {Query} query MongoDB-style query
* @param {Function} callback Optional callback, signature: err, count
*/
count (query, callback) {
const cursor = this.countAsync(query)
if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
else return cursor
}
countAsync (query) {
const cursor = new Cursor(this, query, async docs => docs.length, true)
return cursor // this is a trick: Cursor itself is a thenable, which allows awaiting it
}
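/**
* Usage sketch (illustrative, not part of the original source); field names are hypothetical.
*
*   const n = await db.countAsync({ planet: { $in: ['Earth', 'Mars'] } })
*
*   // callback style
*   db.count({ inhabited: true }, (err, count) => { if (err) throw err })
*/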
/**
* Find all documents matching the query
* If no callback is passed, we return the cursor so that the user can limit, skip and finally exec
* @param {Object} query MongoDB-style query
* @param {Object} projection MongoDB-style projection
* @param {Function} callback Optional callback, signature: err, docs
*/
find (query, projection, callback) {
if (arguments.length === 1) {
projection = {}
// callback is undefined, will return a cursor
} else if (arguments.length === 2) {
if (typeof projection === 'function') {
callback = projection
projection = {}
} // If not, assume projection is an object and callback is undefined
}
const cursor = this.findAsync(query, projection)
if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
else return cursor
}
findAsync (query, projection = {}) {
const cursor = new Cursor(this, query, docs => docs.map(doc => model.deepCopy(doc)), true)
cursor.projection(projection)
return cursor
}
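/**
* Usage sketch (illustrative, not part of the original source): the returned Cursor can be
* chained before execution. Field names are hypothetical examples.
*
*   // Promise style: the cursor is thenable, so it can be awaited directly
*   const docs = await db.findAsync({ inhabited: true }).sort({ planet: 1 }).skip(1).limit(5)
*
*   // callback style, with a projection keeping only 'planet' and dropping '_id'
*   db.find({ inhabited: true }, { planet: 1, _id: 0 }, (err, docs) => { if (err) throw err })
*/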
/**
* Find one document matching the query
* @param {Object} query MongoDB-style query
* @param {Object} projection MongoDB-style projection
* @param {Function} callback Optional callback, signature: err, doc
*/
findOne (query, projection, callback) {
if (arguments.length === 1) {
projection = {}
// callback is undefined, will return a cursor
} else if (arguments.length === 2) {
if (typeof projection === 'function') {
callback = projection
projection = {}
} // If not, assume projection is an object and callback is undefined
}
const cursor = this.findOneAsync(query, projection)
if (typeof callback === 'function') callbackify(cursor.execAsync.bind(cursor))(callback)
else return cursor
}
findOneAsync (query, projection = {}) {
const cursor = new Cursor(this, query, docs => docs.length === 1 ? model.deepCopy(docs[0]) : null, true)
cursor.projection(projection).limit(1)
return cursor
}
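/**
* Usage sketch (illustrative, not part of the original source); field names are hypothetical.
*
*   const doc = await db.findOneAsync({ planet: 'Earth' }) // null if no match
*   db.findOne({ planet: 'Earth' }, { _id: 0 }, (err, doc) => { if (err) throw err })
*/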
/**
* Update all docs matching query.
* Use Datastore.update which has the same signature
* @param {Object} query
* @param {Object} updateQuery
* @param {Object} options Optional options
* options.multi If true, can update multiple documents (defaults to false)
* options.upsert If true, document is inserted if the query doesn't match anything
* options.returnUpdatedDocs Defaults to false, if true return as third argument the array of updated matched documents (even if no change actually took place)
* @param {Function} cb Optional callback, signature: (err, numAffected, affectedDocuments, upsert)
* If update was an upsert, upsert flag is set to true
* affectedDocuments can be one of the following:
* * For an upsert, the upserted document
* * For an update with returnUpdatedDocs option false, null
* * For an update with returnUpdatedDocs true and multi false, the updated document
* * For an update with returnUpdatedDocs true and multi true, the array of updated documents
*
* WARNING: The API was changed between v1.7.4 and v1.8, for consistency and readability reasons. Up to and including v1.7.4,
* the callback signature was (err, numAffected, updated) where updated was the updated document in case of an upsert
* or the array of updated documents for an update if the returnUpdatedDocs option was true. That meant that the type of
* affectedDocuments in a non-multi update depended on whether there was an upsert or not, leaving only two ways for the
* user to check whether an upsert had occurred: checking the type of affectedDocuments or running another find query on
* the whole dataset to check its size. Both options being ugly, the breaking change was necessary.
*
* @private
*/
_update (query, updateQuery, options, cb) {
if (typeof options === 'function') {
cb = options
options = {}
}
const callback = cb || (() => {})
const _callback = (err, res = {}) => {
callback(err, res.numAffected, res.affectedDocuments, res.upsert)
}
callbackify(this._updateAsync.bind(this))(query, updateQuery, options, _callback)
}
async _updateAsync (query, updateQuery, options = {}) {
const multi = options.multi !== undefined ? options.multi : false
const upsert = options.upsert !== undefined ? options.upsert : false
// If upsert option is set, check whether we need to insert the doc
if (upsert) {
// Need to use an internal function not tied to the executor to avoid deadlock
const cursor = new Cursor(this, query)
const docs = await new Promise((resolve, reject) => {
cursor.limit(1)._exec((err, docs) => {
if (err) reject(err)
else resolve(docs)
})
})
if (docs.length !== 1) {
let toBeInserted
try {
model.checkObject(updateQuery)
// updateQuery is a simple object with no modifier, use it as the document to insert
toBeInserted = updateQuery
} catch (e) {
// updateQuery contains modifiers, use the find query as the base,
// strip it from all operators and update it according to updateQuery
toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
}
return new Promise((resolve, reject) => {
this._insert(toBeInserted, (err, newDoc) => {
if (err) return reject(err)
return resolve({ numAffected: 1, affectedDocuments: newDoc, upsert: true })
})
})
}
}
// Perform the update
let numReplaced = 0
let modifiedDoc
const modifications = []
let createdAt
const candidates = await this.getCandidatesAsync(query)
// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
for (const candidate of candidates) {
if (model.match(candidate, query) && (multi || numReplaced === 0)) {
numReplaced += 1
if (this.timestampData) { createdAt = candidate.createdAt }
modifiedDoc = model.modify(candidate, updateQuery)
if (this.timestampData) {
modifiedDoc.createdAt = createdAt
modifiedDoc.updatedAt = new Date()
}
modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
}
}
// Change the docs in memory
this.updateIndexes(modifications)
// Update the datafile
const updatedDocs = modifications.map(x => x.newDoc)
await this.persistence.persistNewStateAsync(updatedDocs)
if (!options.returnUpdatedDocs) return { numAffected: numReplaced }
else {
let updatedDocsDC = []
updatedDocs.forEach(doc => { updatedDocsDC.push(model.deepCopy(doc)) })
if (!multi) updatedDocsDC = updatedDocsDC[0]
return { numAffected: numReplaced, affectedDocuments: updatedDocsDC }
}
}
update () {
this.executor.push({ this: this, fn: this._update, arguments: arguments })
}
updateAsync () {
return this.executor.pushAsync({ this: this, fn: this._updateAsync, arguments: arguments })
}
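/**
* Usage sketch (illustrative, not part of the original source): the common update shapes.
* Field names are hypothetical examples.
*
*   // Modify matching documents with update operators
*   await db.updateAsync({ planet: 'Mars' }, { $set: { inhabited: false } }, { multi: true })
*
*   // Replace a single matching document wholesale (no operators in the update query)
*   await db.updateAsync({ planet: 'Pluto' }, { planet: 'Pluto', dwarf: true }, {})
*
*   // Upsert, getting the affected document back
*   const { numAffected, affectedDocuments, upsert } =
*     await db.updateAsync({ planet: 'Ceres' }, { $set: { dwarf: true } }, { upsert: true, returnUpdatedDocs: true })
*
*   // callback style: (err, numAffected, affectedDocuments, upsert)
*   db.update({ planet: 'Earth' }, { $inc: { visits: 1 } }, {}, (err, numAffected) => { if (err) throw err })
*/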
/**
* Remove all docs matching the query.
* Use Datastore.remove which has the same signature
* For now very naive implementation (similar to update)
* @param {Object} query
* @param {Object} options Optional options
* options.multi If true, can remove multiple documents (defaults to false)
* @param {Function} cb Optional callback, signature: err, numRemoved
*
* @private
*/
_remove (query, options, cb) {
if (typeof options === 'function') {
cb = options
options = {}
}
const callback = cb || (() => {})
callbackify(this._removeAsync.bind(this))(query, options, callback)
}
async _removeAsync (query, options = {}) {
const multi = options.multi !== undefined ? options.multi : false
const candidates = await this.getCandidatesAsync(query, true)
const removedDocs = []
let numRemoved = 0
candidates.forEach(d => {
if (model.match(d, query) && (multi || numRemoved === 0)) {
numRemoved += 1
removedDocs.push({ $$deleted: true, _id: d._id })
this.removeFromIndexes(d)
}
})
await this.persistence.persistNewStateAsync(removedDocs)
return numRemoved
}
remove () {
this.executor.push({ this: this, fn: this._remove, arguments: arguments })
}
removeAsync () {
return this.executor.pushAsync({ this: this, fn: this._removeAsync, arguments: arguments })
}
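/**
* Usage sketch (illustrative, not part of the original source); field names are hypothetical.
*
*   const numRemoved = await db.removeAsync({ inhabited: false }, { multi: true })
*
*   // callback style; without { multi: true } at most one document is removed
*   db.remove({ planet: 'Pluto' }, {}, (err, numRemoved) => { if (err) throw err })
*/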
}
module.exports = Datastore