mirror of https://github.com/seald/nedb
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
663 lines
24 KiB
663 lines
24 KiB
const { EventEmitter } = require('events')
|
|
const async = require('async')
|
|
const Cursor = require('./cursor.js')
|
|
const customUtils = require('./customUtils.js')
|
|
const Executor = require('./executor.js')
|
|
const Index = require('./indexes.js')
|
|
const model = require('./model.js')
|
|
const Persistence = require('./persistence.js')
|
|
const { isDate } = require('./utils.js')
|
|
|
|
class Datastore extends EventEmitter {
|
|
/**
|
|
* Create a new collection
|
|
* @param {String} [options.filename] Optional, datastore will be in-memory only if not provided
|
|
* @param {Boolean} [options.timestampData] Optional, defaults to false. If set to true, createdAt and updatedAt will be created and populated automatically (if not specified by user)
|
|
* @param {Boolean} [options.inMemoryOnly] Optional, defaults to false
|
|
* @param {String} [options.nodeWebkitAppName] Optional, specify the name of your NW app if you want options.filename to be relative to the directory where
|
|
* Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion)
|
|
* @param {Boolean} [options.autoload] Optional, defaults to false
|
|
* @param {Function} [options.onload] Optional, if autoload is used this will be called after the load database with the error object as parameter. If you don't pass it the error will be thrown
|
|
* @param {Function} [options.beforeDeserialization] Optional, serialization hooks
|
|
* @param {Function} [options.afterSerialization] Optional, serialization hooks
|
|
* @param {Number} [options.corruptAlertThreshold] Optional, threshold after which an alert is thrown if too much data is corrupt
|
|
* @param {Function} [options.compareStrings] Optional, string comparison function that overrides default for sorting
|
|
*
|
|
* Event Emitter - Events
|
|
* * compaction.done - Fired whenever a compaction operation was finished
|
|
*/
|
|
constructor (options) {
|
|
super()
|
|
let filename
|
|
|
|
// Retrocompatibility with v0.6 and before
|
|
if (typeof options === 'string') {
|
|
filename = options
|
|
this.inMemoryOnly = false // Default
|
|
} else {
|
|
options = options || {}
|
|
filename = options.filename
|
|
this.inMemoryOnly = options.inMemoryOnly || false
|
|
this.autoload = options.autoload || false
|
|
this.timestampData = options.timestampData || false
|
|
}
|
|
|
|
// Determine whether in memory or persistent
|
|
if (!filename || typeof filename !== 'string' || filename.length === 0) {
|
|
this.filename = null
|
|
this.inMemoryOnly = true
|
|
} else {
|
|
this.filename = filename
|
|
}
|
|
|
|
// String comparison function
|
|
this.compareStrings = options.compareStrings
|
|
|
|
// Persistence handling
|
|
this.persistence = new Persistence({
|
|
db: this,
|
|
nodeWebkitAppName: options.nodeWebkitAppName,
|
|
afterSerialization: options.afterSerialization,
|
|
beforeDeserialization: options.beforeDeserialization,
|
|
corruptAlertThreshold: options.corruptAlertThreshold
|
|
})
|
|
|
|
// This new executor is ready if we don't use persistence
|
|
// If we do, it will only be ready once loadDatabase is called
|
|
this.executor = new Executor()
|
|
if (this.inMemoryOnly) this.executor.ready = true
|
|
|
|
// Indexed by field name, dot notation can be used
|
|
// _id is always indexed and since _ids are generated randomly the underlying
|
|
// binary is always well-balanced
|
|
this.indexes = {}
|
|
this.indexes._id = new Index({ fieldName: '_id', unique: true })
|
|
this.ttlIndexes = {}
|
|
|
|
// Queue a load of the database right away and call the onload handler
|
|
// By default (no onload handler), if there is an error there, no operation will be possible so warn the user by throwing an exception
|
|
if (this.autoload) {
|
|
this.loadDatabase(options.onload || (err => {
|
|
if (err) throw err
|
|
}))
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Load the database from the datafile, and trigger the execution of buffered commands if any
|
|
*/
|
|
loadDatabase () {
|
|
this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true)
|
|
}
|
|
|
|
/**
|
|
* Get an array of all the data in the database
|
|
*/
|
|
getAllData () {
|
|
return this.indexes._id.getAll()
|
|
}
|
|
|
|
/**
|
|
* Reset all currently defined indexes
|
|
*/
|
|
resetIndexes (newData) {
|
|
for (const index of Object.values(this.indexes)) {
|
|
index.reset(newData)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Ensure an index is kept for this field. Same parameters as lib/indexes
|
|
* For now this function is synchronous, we need to test how much time it takes
|
|
* We use an async API for consistency with the rest of the code
|
|
* @param {Object} options
|
|
* @param {String} options.fieldName
|
|
* @param {Boolean} options.unique
|
|
* @param {Boolean} options.sparse
|
|
* @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index (only works on Date fields, not arrays of Date)
|
|
* @param {Function} callback Optional callback, signature: err
|
|
*/
|
|
ensureIndex (options = {}, callback = () => {}) {
|
|
if (!options.fieldName) {
|
|
const err = new Error('Cannot create an index without a fieldName')
|
|
err.missingFieldName = true
|
|
return callback(err)
|
|
}
|
|
if (this.indexes[options.fieldName]) return callback(null)
|
|
|
|
this.indexes[options.fieldName] = new Index(options)
|
|
if (options.expireAfterSeconds !== undefined) this.ttlIndexes[options.fieldName] = options.expireAfterSeconds // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here
|
|
|
|
try {
|
|
this.indexes[options.fieldName].insert(this.getAllData())
|
|
} catch (e) {
|
|
delete this.indexes[options.fieldName]
|
|
return callback(e)
|
|
}
|
|
|
|
// We may want to force all options to be persisted including defaults, not just the ones passed the index creation function
|
|
this.persistence.persistNewState([{ $$indexCreated: options }], err => {
|
|
if (err) return callback(err)
|
|
return callback(null)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Remove an index
|
|
* @param {String} fieldName
|
|
* @param {Function} callback Optional callback, signature: err
|
|
*/
|
|
removeIndex (fieldName, callback = () => {}) {
|
|
delete this.indexes[fieldName]
|
|
|
|
this.persistence.persistNewState([{ $$indexRemoved: fieldName }], err => {
|
|
if (err) return callback(err)
|
|
return callback(null)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Add one or several document(s) to all indexes
|
|
*/
|
|
addToIndexes (doc) {
|
|
let failingIndex
|
|
let error
|
|
const keys = Object.keys(this.indexes)
|
|
|
|
for (let i = 0; i < keys.length; i += 1) {
|
|
try {
|
|
this.indexes[keys[i]].insert(doc)
|
|
} catch (e) {
|
|
failingIndex = i
|
|
error = e
|
|
break
|
|
}
|
|
}
|
|
|
|
// If an error happened, we need to rollback the insert on all other indexes
|
|
if (error) {
|
|
for (let i = 0; i < failingIndex; i += 1) {
|
|
this.indexes[keys[i]].remove(doc)
|
|
}
|
|
|
|
throw error
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Remove one or several document(s) from all indexes
|
|
*/
|
|
removeFromIndexes (doc) {
|
|
for (const index of Object.values(this.indexes)) {
|
|
index.remove(doc)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Update one or several documents in all indexes
|
|
* To update multiple documents, oldDoc must be an array of { oldDoc, newDoc } pairs
|
|
* If one update violates a constraint, all changes are rolled back
|
|
*/
|
|
updateIndexes (oldDoc, newDoc) {
|
|
let failingIndex
|
|
let error
|
|
const keys = Object.keys(this.indexes)
|
|
|
|
for (let i = 0; i < keys.length; i += 1) {
|
|
try {
|
|
this.indexes[keys[i]].update(oldDoc, newDoc)
|
|
} catch (e) {
|
|
failingIndex = i
|
|
error = e
|
|
break
|
|
}
|
|
}
|
|
|
|
// If an error happened, we need to rollback the update on all other indexes
|
|
if (error) {
|
|
for (let i = 0; i < failingIndex; i += 1) {
|
|
this.indexes[keys[i]].revertUpdate(oldDoc, newDoc)
|
|
}
|
|
|
|
throw error
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Return the list of candidates for a given query
|
|
* Crude implementation for now, we return the candidates given by the first usable index if any
|
|
* We try the following query types, in this order: basic match, $in match, comparison match
|
|
* One way to make it better would be to enable the use of multiple indexes if the first usable index
|
|
* returns too much data. I may do it in the future.
|
|
*
|
|
* Returned candidates will be scanned to find and remove all expired documents
|
|
*
|
|
* @param {Query} query
|
|
* @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
|
|
* @param {Function} callback Signature err, candidates
|
|
*/
|
|
getCandidates (query, dontExpireStaleDocs, callback) {
|
|
const indexNames = Object.keys(this.indexes)
|
|
let usableQueryKeys
|
|
|
|
if (typeof dontExpireStaleDocs === 'function') {
|
|
callback = dontExpireStaleDocs
|
|
dontExpireStaleDocs = false
|
|
}
|
|
|
|
async.waterfall([
|
|
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
|
|
cb => {
|
|
// For a basic match
|
|
usableQueryKeys = []
|
|
Object.keys(query).forEach(k => {
|
|
if (typeof query[k] === 'string' || typeof query[k] === 'number' || typeof query[k] === 'boolean' || isDate(query[k]) || query[k] === null) {
|
|
usableQueryKeys.push(k)
|
|
}
|
|
})
|
|
usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k))
|
|
if (usableQueryKeys.length > 0) {
|
|
return cb(null, this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]]))
|
|
}
|
|
|
|
// For a $in match
|
|
usableQueryKeys = []
|
|
Object.keys(query).forEach(k => {
|
|
if (query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) {
|
|
usableQueryKeys.push(k)
|
|
}
|
|
})
|
|
usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k))
|
|
if (usableQueryKeys.length > 0) {
|
|
return cb(null, this.indexes[usableQueryKeys[0]].getMatching(query[usableQueryKeys[0]].$in))
|
|
}
|
|
|
|
// For a comparison match
|
|
usableQueryKeys = []
|
|
Object.keys(query).forEach(k => {
|
|
if (query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) {
|
|
usableQueryKeys.push(k)
|
|
}
|
|
})
|
|
usableQueryKeys = usableQueryKeys.filter(k => indexNames.includes(k))
|
|
if (usableQueryKeys.length > 0) {
|
|
return cb(null, this.indexes[usableQueryKeys[0]].getBetweenBounds(query[usableQueryKeys[0]]))
|
|
}
|
|
|
|
// By default, return all the DB data
|
|
return cb(null, this.getAllData())
|
|
},
|
|
// STEP 2: remove all expired documents
|
|
docs => {
|
|
if (dontExpireStaleDocs) return callback(null, docs)
|
|
|
|
const expiredDocsIds = []
|
|
const validDocs = []
|
|
const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)
|
|
|
|
docs.forEach(doc => {
|
|
let valid = true
|
|
ttlIndexesFieldNames.forEach(i => {
|
|
if (doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000) {
|
|
valid = false
|
|
}
|
|
})
|
|
if (valid) validDocs.push(doc)
|
|
else expiredDocsIds.push(doc._id)
|
|
})
|
|
|
|
async.eachSeries(expiredDocsIds, (_id, cb) => {
|
|
this._remove({ _id: _id }, {}, err => {
|
|
if (err) return callback(err)
|
|
return cb()
|
|
})
|
|
// eslint-disable-next-line node/handle-callback-err
|
|
}, err => {
|
|
// TODO: handle error
|
|
return callback(null, validDocs)
|
|
})
|
|
}])
|
|
}
|
|
|
|
/**
|
|
* Insert a new document
|
|
* @param {Document} newDoc
|
|
* @param {Function} callback Optional callback, signature: err, insertedDoc
|
|
*
|
|
* @api private Use Datastore.insert which has the same signature
|
|
*/
|
|
_insert (newDoc, callback = () => {}) {
|
|
let preparedDoc
|
|
|
|
try {
|
|
preparedDoc = this.prepareDocumentForInsertion(newDoc)
|
|
this._insertInCache(preparedDoc)
|
|
} catch (e) {
|
|
return callback(e)
|
|
}
|
|
|
|
this.persistence.persistNewState(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc], err => {
|
|
if (err) return callback(err)
|
|
return callback(null, model.deepCopy(preparedDoc))
|
|
})
|
|
}
|
|
|
|
/**
|
|
* Create a new _id that's not already in use
|
|
*/
|
|
createNewId () {
|
|
let attemptId = customUtils.uid(16)
|
|
// Try as many times as needed to get an unused _id. As explained in customUtils, the probability of this ever happening is extremely small, so this is O(1)
|
|
if (this.indexes._id.getMatching(attemptId).length > 0) attemptId = this.createNewId()
|
|
return attemptId
|
|
}
|
|
|
|
/**
|
|
* Prepare a document (or array of documents) to be inserted in a database
|
|
* Meaning adds _id and timestamps if necessary on a copy of newDoc to avoid any side effect on user input
|
|
* @api private
|
|
*/
|
|
prepareDocumentForInsertion (newDoc) {
|
|
let preparedDoc
|
|
|
|
if (Array.isArray(newDoc)) {
|
|
preparedDoc = []
|
|
newDoc.forEach(doc => { preparedDoc.push(this.prepareDocumentForInsertion(doc)) })
|
|
} else {
|
|
preparedDoc = model.deepCopy(newDoc)
|
|
if (preparedDoc._id === undefined) preparedDoc._id = this.createNewId()
|
|
const now = new Date()
|
|
if (this.timestampData && preparedDoc.createdAt === undefined) preparedDoc.createdAt = now
|
|
if (this.timestampData && preparedDoc.updatedAt === undefined) preparedDoc.updatedAt = now
|
|
model.checkObject(preparedDoc)
|
|
}
|
|
|
|
return preparedDoc
|
|
}
|
|
|
|
/**
|
|
* If newDoc is an array of documents, this will insert all documents in the cache
|
|
* @api private
|
|
*/
|
|
_insertInCache (preparedDoc) {
|
|
if (Array.isArray(preparedDoc)) this._insertMultipleDocsInCache(preparedDoc)
|
|
else this.addToIndexes(preparedDoc)
|
|
}
|
|
|
|
/**
|
|
* If one insertion fails (e.g. because of a unique constraint), roll back all previous
|
|
* inserts and throws the error
|
|
* @api private
|
|
*/
|
|
_insertMultipleDocsInCache (preparedDocs) {
|
|
let failingIndex
|
|
let error
|
|
|
|
for (let i = 0; i < preparedDocs.length; i += 1) {
|
|
try {
|
|
this.addToIndexes(preparedDocs[i])
|
|
} catch (e) {
|
|
error = e
|
|
failingIndex = i
|
|
break
|
|
}
|
|
}
|
|
|
|
if (error) {
|
|
for (let i = 0; i < failingIndex; i += 1) {
|
|
this.removeFromIndexes(preparedDocs[i])
|
|
}
|
|
|
|
throw error
|
|
}
|
|
}
|
|
|
|
insert () {
|
|
this.executor.push({ this: this, fn: this._insert, arguments: arguments })
|
|
}
|
|
|
|
/**
|
|
* Count all documents matching the query
|
|
* @param {Object} query MongoDB-style query
|
|
* @param {Function} callback Optional callback, signature: err, count
|
|
*/
|
|
count (query, callback) {
|
|
const cursor = new Cursor(this, query, function (err, docs, callback) {
|
|
if (err) { return callback(err) }
|
|
return callback(null, docs.length)
|
|
})
|
|
|
|
if (typeof callback === 'function') cursor.exec(callback)
|
|
else return cursor
|
|
}
|
|
|
|
/**
|
|
* Find all documents matching the query
|
|
* If no callback is passed, we return the cursor so that user can limit, skip and finally exec
|
|
* @param {Object} query MongoDB-style query
|
|
* @param {Object} projection MongoDB-style projection
|
|
* @param {Function} callback Optional callback, signature: err, docs
|
|
*/
|
|
find (query, projection, callback) {
|
|
if (arguments.length === 1) {
|
|
projection = {}
|
|
// callback is undefined, will return a cursor
|
|
} else if (arguments.length === 2) {
|
|
if (typeof projection === 'function') {
|
|
callback = projection
|
|
projection = {}
|
|
} // If not assume projection is an object and callback undefined
|
|
}
|
|
|
|
const cursor = new Cursor(this, query, function (err, docs, callback) {
|
|
if (err) { return callback(err) }
|
|
|
|
const res = docs.map(doc => model.deepCopy(doc))
|
|
|
|
return callback(null, res)
|
|
})
|
|
|
|
cursor.projection(projection)
|
|
if (typeof callback === 'function') cursor.exec(callback)
|
|
else return cursor
|
|
}
|
|
|
|
/**
|
|
* Find one document matching the query
|
|
* @param {Object} query MongoDB-style query
|
|
* @param {Object} projection MongoDB-style projection
|
|
* @param {Function} callback Optional callback, signature: err, doc
|
|
*/
|
|
findOne (query, projection, callback) {
|
|
if (arguments.length === 1) {
|
|
projection = {}
|
|
// callback is undefined, will return a cursor
|
|
} else if (arguments.length === 2) {
|
|
if (typeof projection === 'function') {
|
|
callback = projection
|
|
projection = {}
|
|
} // If not assume projection is an object and callback undefined
|
|
}
|
|
|
|
const cursor = new Cursor(this, query, (err, docs, callback) => {
|
|
if (err) return callback(err)
|
|
if (docs.length === 1) return callback(null, model.deepCopy(docs[0]))
|
|
else return callback(null, null)
|
|
})
|
|
|
|
cursor.projection(projection).limit(1)
|
|
if (typeof callback === 'function') cursor.exec(callback)
|
|
else return cursor
|
|
}
|
|
|
|
/**
|
|
* Update all docs matching query
|
|
* @param {Object} query
|
|
* @param {Object} updateQuery
|
|
* @param {Object} options Optional options
|
|
* options.multi If true, can update multiple documents (defaults to false)
|
|
* options.upsert If true, document is inserted if the query doesn't match anything
|
|
* options.returnUpdatedDocs Defaults to false, if true return as third argument the array of updated matched documents (even if no change actually took place)
|
|
* @param {Function} cb Optional callback, signature: (err, numAffected, affectedDocuments, upsert)
|
|
* If update was an upsert, upsert flag is set to true
|
|
* affectedDocuments can be one of the following:
|
|
* * For an upsert, the upserted document
|
|
* * For an update with returnUpdatedDocs option false, null
|
|
* * For an update with returnUpdatedDocs true and multi false, the updated document
|
|
* * For an update with returnUpdatedDocs true and multi true, the array of updated documents
|
|
*
|
|
* WARNING: The API was changed between v1.7.4 and v1.8, for consistency and readability reasons. Prior and including to v1.7.4,
|
|
* the callback signature was (err, numAffected, updated) where updated was the updated document in case of an upsert
|
|
* or the array of updated documents for an update if the returnUpdatedDocs option was true. That meant that the type of
|
|
* affectedDocuments in a non multi update depended on whether there was an upsert or not, leaving only two ways for the
|
|
* user to check whether an upsert had occured: checking the type of affectedDocuments or running another find query on
|
|
* the whole dataset to check its size. Both options being ugly, the breaking change was necessary.
|
|
*
|
|
* @api private Use Datastore.update which has the same signature
|
|
*/
|
|
_update (query, updateQuery, options, cb) {
|
|
if (typeof options === 'function') {
|
|
cb = options
|
|
options = {}
|
|
}
|
|
const callback = cb || (() => {})
|
|
const multi = options.multi !== undefined ? options.multi : false
|
|
const upsert = options.upsert !== undefined ? options.upsert : false
|
|
|
|
async.waterfall([
|
|
cb => { // If upsert option is set, check whether we need to insert the doc
|
|
if (!upsert) return cb()
|
|
|
|
// Need to use an internal function not tied to the executor to avoid deadlock
|
|
const cursor = new Cursor(this, query)
|
|
cursor.limit(1)._exec((err, docs) => {
|
|
if (err) return callback(err)
|
|
if (docs.length === 1) return cb()
|
|
else {
|
|
let toBeInserted
|
|
|
|
try {
|
|
model.checkObject(updateQuery)
|
|
// updateQuery is a simple object with no modifier, use it as the document to insert
|
|
toBeInserted = updateQuery
|
|
} catch (e) {
|
|
// updateQuery contains modifiers, use the find query as the base,
|
|
// strip it from all operators and update it according to updateQuery
|
|
try {
|
|
toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
|
|
} catch (err) {
|
|
return callback(err)
|
|
}
|
|
}
|
|
|
|
return this._insert(toBeInserted, (err, newDoc) => {
|
|
if (err) return callback(err)
|
|
return callback(null, 1, newDoc, true)
|
|
})
|
|
}
|
|
})
|
|
},
|
|
() => { // Perform the update
|
|
let numReplaced = 0
|
|
let modifiedDoc
|
|
const modifications = []
|
|
let createdAt
|
|
|
|
this.getCandidates(query, (err, candidates) => {
|
|
if (err) return callback(err)
|
|
|
|
// Preparing update (if an error is thrown here neither the datafile nor
|
|
// the in-memory indexes are affected)
|
|
try {
|
|
for (const candidate of candidates) {
|
|
if (model.match(candidate, query) && (multi || numReplaced === 0)) {
|
|
numReplaced += 1
|
|
if (this.timestampData) { createdAt = candidate.createdAt }
|
|
modifiedDoc = model.modify(candidate, updateQuery)
|
|
if (this.timestampData) {
|
|
modifiedDoc.createdAt = createdAt
|
|
modifiedDoc.updatedAt = new Date()
|
|
}
|
|
modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
|
|
}
|
|
}
|
|
} catch (err) {
|
|
return callback(err)
|
|
}
|
|
|
|
// Change the docs in memory
|
|
try {
|
|
this.updateIndexes(modifications)
|
|
} catch (err) {
|
|
return callback(err)
|
|
}
|
|
|
|
// Update the datafile
|
|
const updatedDocs = modifications.map(x => x.newDoc)
|
|
this.persistence.persistNewState(updatedDocs, err => {
|
|
if (err) return callback(err)
|
|
if (!options.returnUpdatedDocs) {
|
|
return callback(null, numReplaced)
|
|
} else {
|
|
let updatedDocsDC = []
|
|
updatedDocs.forEach(doc => { updatedDocsDC.push(model.deepCopy(doc)) })
|
|
if (!multi) updatedDocsDC = updatedDocsDC[0]
|
|
return callback(null, numReplaced, updatedDocsDC)
|
|
}
|
|
})
|
|
})
|
|
}])
|
|
}
|
|
|
|
update () {
|
|
this.executor.push({ this: this, fn: this._update, arguments: arguments })
|
|
}
|
|
|
|
/**
|
|
* Remove all docs matching the query
|
|
* For now very naive implementation (similar to update)
|
|
* @param {Object} query
|
|
* @param {Object} options Optional options
|
|
* options.multi If true, can update multiple documents (defaults to false)
|
|
* @param {Function} cb Optional callback, signature: err, numRemoved
|
|
*
|
|
* @api private Use Datastore.remove which has the same signature
|
|
*/
|
|
_remove (query, options, cb) {
|
|
if (typeof options === 'function') {
|
|
cb = options
|
|
options = {}
|
|
}
|
|
const callback = cb || (() => {})
|
|
const multi = options.multi !== undefined ? options.multi : false
|
|
|
|
this.getCandidates(query, true, (err, candidates) => {
|
|
if (err) return callback(err)
|
|
const removedDocs = []
|
|
let numRemoved = 0
|
|
|
|
try {
|
|
candidates.forEach(d => {
|
|
if (model.match(d, query) && (multi || numRemoved === 0)) {
|
|
numRemoved += 1
|
|
removedDocs.push({ $$deleted: true, _id: d._id })
|
|
this.removeFromIndexes(d)
|
|
}
|
|
})
|
|
} catch (err) {
|
|
return callback(err)
|
|
}
|
|
|
|
this.persistence.persistNewState(removedDocs, err => {
|
|
if (err) return callback(err)
|
|
return callback(null, numRemoved)
|
|
})
|
|
})
|
|
}
|
|
|
|
remove () {
|
|
this.executor.push({ this: this, fn: this._remove, arguments: arguments })
|
|
}
|
|
}
|
|
|
|
module.exports = Datastore
|
|
|