const { EventEmitter } = require ( 'events' )
const util = require ( 'util' )
const async = require ( 'async' )
const Cursor = require ( './cursor.js' )
const customUtils = require ( './customUtils.js' )
const Executor = require ( './executor.js' )
const Index = require ( './indexes.js' )
const model = require ( './model.js' )
const Persistence = require ( './persistence.js' )
/**
 * A document collection whose documents live in in-memory indexes and whose
 * state changes are forwarded to a Persistence instance.
 *
 * Mutating operations (insert, update, remove, loadDatabase) are pushed on an
 * Executor; the underscore-prefixed variants are the functions the executor runs.
 */
class Datastore extends EventEmitter {
  /**
   * Create a new collection
   * @param {String|Object} options Either an options object, or (legacy signature, v0.6 and before) the datafile name
   * @param {String} options.filename Optional, datastore will be in-memory only if not provided
   * @param {Boolean} options.timestampData Optional, defaults to false. If set to true, createdAt and updatedAt will be created and populated automatically (if not specified by user)
   * @param {Boolean} options.inMemoryOnly Optional, defaults to false
   * @param {String} options.nodeWebkitAppName Optional, specify the name of your NW app if you want options.filename to be relative to the directory where
   *                                           Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion)
   * @param {Boolean} options.autoload Optional, defaults to false
   * @param {Function} options.onload Optional, if autoload is used this will be called after the load database with the error object as parameter. If you don't pass it the error will be thrown
   * @param {Function} options.afterSerialization/options.beforeDeserialization Optional, serialization hooks
   * @param {Number} options.corruptAlertThreshold Optional, threshold after which an alert is thrown if too much data is corrupt
   * @param {Function} options.compareStrings Optional, string comparison function that overrides default for sorting
   *
   * Event Emitter - Events
   * * compaction.done - Fired whenever a compaction operation was finished
   */
  constructor (options) {
    super()
    let filename

    // Retrocompatibility with v0.6 and before
    if (typeof options === 'string') {
      filename = options
      this.inMemoryOnly = false // Default
      // Give the legacy signature the same explicit defaults as the options signature
      this.autoload = false
      this.timestampData = false
    } else {
      options = options || {}
      filename = options.filename
      this.inMemoryOnly = options.inMemoryOnly || false
      this.autoload = options.autoload || false
      this.timestampData = options.timestampData || false
    }

    // Determine whether in memory or persistent
    if (!filename || typeof filename !== 'string' || filename.length === 0) {
      this.filename = null
      this.inMemoryOnly = true
    } else {
      this.filename = filename
    }

    // String comparison function
    this.compareStrings = options.compareStrings

    // Persistence handling
    this.persistence = new Persistence({
      db: this,
      nodeWebkitAppName: options.nodeWebkitAppName,
      afterSerialization: options.afterSerialization,
      beforeDeserialization: options.beforeDeserialization,
      corruptAlertThreshold: options.corruptAlertThreshold
    })

    // This new executor is ready if we don't use persistence
    // If we do, it will only be ready once loadDatabase is called
    this.executor = new Executor()
    if (this.inMemoryOnly) this.executor.ready = true

    // Indexed by field name, dot notation can be used
    // _id is always indexed and since _ids are generated randomly the underlying
    // binary is always well-balanced
    this.indexes = {}
    this.indexes._id = new Index({ fieldName: '_id', unique: true })
    // Maps a field name to its expireAfterSeconds value
    this.ttlIndexes = {}

    // Queue a load of the database right away and call the onload handler
    // By default (no onload handler), if there is an error there, no operation will be possible so warn the user by throwing an exception
    if (this.autoload) {
      this.loadDatabase(options.onload || (err => {
        if (err) throw err
      }))
    }
  }

  /**
   * Load the database from the datafile, and trigger the execution of buffered commands if any
   * All arguments are forwarded to persistence.loadDatabase through the executor
   */
  loadDatabase () {
    this.executor.push({ this: this.persistence, fn: this.persistence.loadDatabase, arguments: arguments }, true)
  }

  /**
   * Get an array of all the data in the database
   */
  getAllData () {
    return this.indexes._id.getAll()
  }

  /**
   * Reset all currently defined indexes
   * @param {*} newData Passed as-is to every index's reset function
   */
  resetIndexes (newData) {
    for (const index of Object.values(this.indexes)) {
      index.reset(newData)
    }
  }

  /**
   * Ensure an index is kept for this field. Same parameters as lib/indexes
   * For now this function is synchronous, we need to test how much time it takes
   * We use an async API for consistency with the rest of the code
   * @param {Object} options
   * @param {String} options.fieldName
   * @param {Boolean} options.unique
   * @param {Boolean} options.sparse
   * @param {Number} options.expireAfterSeconds - Optional, if set this index becomes a TTL index (only works on Date fields, not arrays of Date)
   * @param {Function} callback Optional callback, signature: err
   */
  ensureIndex (options = {}, callback = () => {}) {
    if (!options.fieldName) {
      const err = new Error('Cannot create an index without a fieldName')
      err.missingFieldName = true
      return callback(err)
    }
    if (this.indexes[options.fieldName]) return callback(null)

    this.indexes[options.fieldName] = new Index(options)

    try {
      this.indexes[options.fieldName].insert(this.getAllData())
    } catch (e) {
      delete this.indexes[options.fieldName]
      return callback(e)
    }

    // Register the TTL only once the index was built, so that a failed creation
    // (e.g. unique constraint violated by existing data) leaves no expiry rule behind.
    // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here
    if (options.expireAfterSeconds !== undefined) this.ttlIndexes[options.fieldName] = options.expireAfterSeconds

    // We may want to force all options to be persisted including defaults, not just the ones passed the index creation function
    this.persistence.persistNewState([{ $$indexCreated: options }], err => {
      if (err) return callback(err)
      return callback(null)
    })
  }

  /**
   * Remove an index, along with the TTL rule registered for the same field if any
   * @param {String} fieldName
   * @param {Function} callback Optional callback, signature: err
   */
  removeIndex (fieldName, callback = () => {}) {
    delete this.indexes[fieldName]
    // Without this, documents would keep expiring according to an index that no longer exists
    delete this.ttlIndexes[fieldName]

    this.persistence.persistNewState([{ $$indexRemoved: fieldName }], err => {
      if (err) return callback(err)
      return callback(null)
    })
  }

  /**
   * Add one or several document(s) to all indexes
   * If one index rejects the insertion, the indexes already updated are rolled back and the error is rethrown
   */
  addToIndexes (doc) {
    let failingIndex
    let error
    const keys = Object.keys(this.indexes)

    for (let i = 0; i < keys.length; i += 1) {
      try {
        this.indexes[keys[i]].insert(doc)
      } catch (e) {
        failingIndex = i
        error = e
        break
      }
    }

    // If an error happened, we need to rollback the insert on all other indexes
    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.indexes[keys[i]].remove(doc)
      }
      throw error
    }
  }

  /**
   * Remove one or several document(s) from all indexes
   */
  removeFromIndexes (doc) {
    for (const index of Object.values(this.indexes)) {
      index.remove(doc)
    }
  }

  /**
   * Update one or several documents in all indexes
   * To update multiple documents, oldDoc must be an array of { oldDoc, newDoc } pairs
   * If one update violates a constraint, all changes are rolled back
   */
  updateIndexes (oldDoc, newDoc) {
    let failingIndex
    let error
    const keys = Object.keys(this.indexes)

    for (let i = 0; i < keys.length; i += 1) {
      try {
        this.indexes[keys[i]].update(oldDoc, newDoc)
      } catch (e) {
        failingIndex = i
        error = e
        break
      }
    }

    // If an error happened, we need to rollback the update on all other indexes
    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.indexes[keys[i]].revertUpdate(oldDoc, newDoc)
      }
      throw error
    }
  }

  /**
   * Return the list of candidates for a given query
   * Crude implementation for now, we return the candidates given by the first usable index if any
   * We try the following query types, in this order: basic match, $in match, comparison match
   * One way to make it better would be to enable the use of multiple indexes if the first usable index
   * returns too much data. I may do it in the future.
   *
   * Returned candidates will be scanned to find and remove all expired documents
   *
   * @param {Query} query
   * @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
   * @param {Function} callback Signature err, candidates
   */
  getCandidates (query, dontExpireStaleDocs, callback) {
    if (typeof dontExpireStaleDocs === 'function') {
      callback = dontExpireStaleDocs
      dontExpireStaleDocs = false
    }

    const indexNames = Object.keys(this.indexes)
    const hasOperator = (value, operator) => Object.prototype.hasOwnProperty.call(value, operator)
    // First query key (in query key order) whose value satisfies isUsable and which has an index
    const firstIndexedKey = isUsable => Object.keys(query).find(k => isUsable(query[k]) && indexNames.includes(k))

    async.waterfall([
      // STEP 1: get candidates list by checking indexes from most to least frequent usecase
      cb => {
        // For a basic match
        let key = firstIndexedKey(value =>
          typeof value === 'string' ||
          typeof value === 'number' ||
          typeof value === 'boolean' ||
          util.types.isDate(value) ||
          value === null
        )
        if (key !== undefined) return cb(null, this.indexes[key].getMatching(query[key]))

        // For a $in match
        key = firstIndexedKey(value => value && hasOperator(value, '$in'))
        if (key !== undefined) return cb(null, this.indexes[key].getMatching(query[key].$in))

        // For a comparison match
        key = firstIndexedKey(value => value && (
          hasOperator(value, '$lt') ||
          hasOperator(value, '$lte') ||
          hasOperator(value, '$gt') ||
          hasOperator(value, '$gte')
        ))
        if (key !== undefined) return cb(null, this.indexes[key].getBetweenBounds(query[key]))

        // By default, return all the DB data
        return cb(null, this.getAllData())
      },
      // STEP 2: remove all expired documents
      docs => {
        if (dontExpireStaleDocs) return callback(null, docs)

        const expiredDocsIds = []
        const validDocs = []
        const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)

        docs.forEach(doc => {
          // A document is stale as soon as one of its TTL fields holds a Date older than the allowed age
          const expired = ttlIndexesFieldNames.some(field =>
            doc[field] !== undefined &&
            util.types.isDate(doc[field]) &&
            Date.now() > doc[field].getTime() + this.ttlIndexes[field] * 1000
          )
          if (expired) expiredDocsIds.push(doc._id)
          else validDocs.push(doc)
        })

        async.eachSeries(expiredDocsIds, (_id, cb) => {
          this._remove({ _id: _id }, {}, err => {
            // A failed removal is reported straight to the caller and stops the series
            if (err) return callback(err)
            return cb()
          })
        // eslint-disable-next-line node/handle-callback-err
        }, err => {
          // TODO: handle error
          return callback(null, validDocs)
        })
      }])
  }

  /**
   * Insert a new document
   * @param {Document} newDoc
   * @param {Function} callback Optional callback, signature: err, insertedDoc
   *
   * @api private Use Datastore.insert which has the same signature
   */
  _insert (newDoc, callback = () => {}) {
    let preparedDoc

    try {
      preparedDoc = this.prepareDocumentForInsertion(newDoc)
      this._insertInCache(preparedDoc)
    } catch (e) {
      return callback(e)
    }

    this.persistence.persistNewState(Array.isArray(preparedDoc) ? preparedDoc : [preparedDoc], err => {
      if (err) return callback(err)
      return callback(null, model.deepCopy(preparedDoc))
    })
  }

  /**
   * Create a new _id that's not already in use
   */
  createNewId () {
    let attemptId = customUtils.uid(16)
    // Try as many times as needed to get an unused _id. As explained in customUtils, the probability of this ever happening is extremely small, so this is O(1)
    if (this.indexes._id.getMatching(attemptId).length > 0) attemptId = this.createNewId()
    return attemptId
  }

  /**
   * Prepare a document (or array of documents) to be inserted in a database
   * Meaning adds _id and timestamps if necessary on a copy of newDoc to avoid any side effect on user input
   * @api private
   */
  prepareDocumentForInsertion (newDoc) {
    let preparedDoc

    if (Array.isArray(newDoc)) {
      preparedDoc = []
      newDoc.forEach(doc => { preparedDoc.push(this.prepareDocumentForInsertion(doc)) })
    } else {
      preparedDoc = model.deepCopy(newDoc)
      if (preparedDoc._id === undefined) preparedDoc._id = this.createNewId()
      const now = new Date()
      if (this.timestampData && preparedDoc.createdAt === undefined) preparedDoc.createdAt = now
      if (this.timestampData && preparedDoc.updatedAt === undefined) preparedDoc.updatedAt = now
      model.checkObject(preparedDoc)
    }

    return preparedDoc
  }

  /**
   * If newDoc is an array of documents, this will insert all documents in the cache
   * @api private
   */
  _insertInCache (preparedDoc) {
    if (Array.isArray(preparedDoc)) this._insertMultipleDocsInCache(preparedDoc)
    else this.addToIndexes(preparedDoc)
  }

  /**
   * If one insertion fails (e.g. because of a unique constraint), roll back all previous
   * inserts and throws the error
   * @api private
   */
  _insertMultipleDocsInCache (preparedDocs) {
    let failingIndex
    let error

    for (let i = 0; i < preparedDocs.length; i += 1) {
      try {
        this.addToIndexes(preparedDocs[i])
      } catch (e) {
        error = e
        failingIndex = i
        break
      }
    }

    if (error) {
      for (let i = 0; i < failingIndex; i += 1) {
        this.removeFromIndexes(preparedDocs[i])
      }
      throw error
    }
  }

  /**
   * Queue an insertion on the executor. Same signature as _insert
   */
  insert () {
    this.executor.push({ this: this, fn: this._insert, arguments: arguments })
  }

  /**
   * Count all documents matching the query
   * If no callback is passed, the cursor is returned instead of being executed
   * @param {Object} query MongoDB-style query
   * @param {Function} callback Optional callback, signature: err, count
   */
  count (query, callback) {
    const cursor = new Cursor(this, query, function (err, docs, callback) {
      if (err) { return callback(err) }
      return callback(null, docs.length)
    })

    if (typeof callback === 'function') cursor.exec(callback)
    else return cursor
  }

  /**
   * Find all documents matching the query
   * If no callback is passed, we return the cursor so that user can limit, skip and finally exec
   * @param {Object} query MongoDB-style query
   * @param {Object} projection MongoDB-style projection
   * @param {Function} callback Optional callback, signature: err, docs
   */
  find (query, projection, callback) {
    if (arguments.length === 1) {
      projection = {}
      // callback is undefined, will return a cursor
    } else if (arguments.length === 2) {
      if (typeof projection === 'function') {
        callback = projection
        projection = {}
      } // If not assume projection is an object and callback undefined
    }

    const cursor = new Cursor(this, query, function (err, docs, callback) {
      if (err) { return callback(err) }
      // Hand out copies so that callers cannot alter the documents held by the indexes
      const res = docs.map(doc => model.deepCopy(doc))
      return callback(null, res)
    })

    cursor.projection(projection)
    if (typeof callback === 'function') cursor.exec(callback)
    else return cursor
  }

  /**
   * Find one document matching the query
   * If no callback is passed, the cursor is returned instead of being executed
   * @param {Object} query MongoDB-style query
   * @param {Object} projection MongoDB-style projection
   * @param {Function} callback Optional callback, signature: err, doc
   */
  findOne (query, projection, callback) {
    if (arguments.length === 1) {
      projection = {}
      // callback is undefined, will return a cursor
    } else if (arguments.length === 2) {
      if (typeof projection === 'function') {
        callback = projection
        projection = {}
      } // If not assume projection is an object and callback undefined
    }

    const cursor = new Cursor(this, query, (err, docs, callback) => {
      if (err) return callback(err)
      if (docs.length === 1) return callback(null, model.deepCopy(docs[0]))
      else return callback(null, null)
    })

    cursor.projection(projection).limit(1)
    if (typeof callback === 'function') cursor.exec(callback)
    else return cursor
  }

  /**
   * Update all docs matching query
   * @param {Object} query
   * @param {Object} updateQuery
   * @param {Object} options Optional options, may be omitted altogether
   *                 options.multi If true, can update multiple documents (defaults to false)
   *                 options.upsert If true, document is inserted if the query doesn't match anything
   *                 options.returnUpdatedDocs Defaults to false, if true return as third argument the array of updated matched documents (even if no change actually took place)
   * @param {Function} cb Optional callback, signature: (err, numAffected, affectedDocuments, upsert)
   *                      If update was an upsert, upsert flag is set to true
   *                      affectedDocuments can be one of the following:
   *                        * For an upsert, the upserted document
   *                        * For an update with returnUpdatedDocs option false, null
   *                        * For an update with returnUpdatedDocs true and multi false, the updated document
   *                        * For an update with returnUpdatedDocs true and multi true, the array of updated documents
   *
   * WARNING: The API was changed between v1.7.4 and v1.8, for consistency and readability reasons. Prior and including to v1.7.4,
   *          the callback signature was (err, numAffected, updated) where updated was the updated document in case of an upsert
   *          or the array of updated documents for an update if the returnUpdatedDocs option was true. That meant that the type of
   *          affectedDocuments in a non multi update depended on whether there was an upsert or not, leaving only two ways for the
   *          user to check whether an upsert had occurred: checking the type of affectedDocuments or running another find query on
   *          the whole dataset to check its size. Both options being ugly, the breaking change was necessary.
   *
   * @api private Use Datastore.update which has the same signature
   */
  _update (query, updateQuery, options, cb) {
    if (typeof options === 'function') {
      cb = options
      options = {}
    }
    // Options are optional: without this, omitting both options and callback threw a TypeError below
    options = options || {}
    const callback = cb || (() => {})
    const multi = options.multi !== undefined ? options.multi : false
    const upsert = options.upsert !== undefined ? options.upsert : false

    async.waterfall([
      cb => { // If upsert option is set, check whether we need to insert the doc
        if (!upsert) return cb()

        // Need to use an internal function not tied to the executor to avoid deadlock
        const cursor = new Cursor(this, query)
        cursor.limit(1)._exec((err, docs) => {
          if (err) return callback(err)
          if (docs.length === 1) return cb()
          else {
            let toBeInserted

            try {
              model.checkObject(updateQuery)
              // updateQuery is a simple object with no modifier, use it as the document to insert
              toBeInserted = updateQuery
            } catch (e) {
              // updateQuery contains modifiers, use the find query as the base,
              // strip it from all operators and update it according to updateQuery
              try {
                toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
              } catch (err) {
                return callback(err)
              }
            }

            return this._insert(toBeInserted, (err, newDoc) => {
              if (err) return callback(err)
              return callback(null, 1, newDoc, true)
            })
          }
        })
      },
      () => { // Perform the update
        let numReplaced = 0
        const modifications = []

        this.getCandidates(query, (err, candidates) => {
          if (err) return callback(err)

          // Preparing update (if an error is thrown here neither the datafile nor
          // the in-memory indexes are affected)
          try {
            for (const candidate of candidates) {
              if (model.match(candidate, query) && (multi || numReplaced === 0)) {
                numReplaced += 1
                const modifiedDoc = model.modify(candidate, updateQuery)
                if (this.timestampData) {
                  // The creation date survives the update, only updatedAt is refreshed
                  modifiedDoc.createdAt = candidate.createdAt
                  modifiedDoc.updatedAt = new Date()
                }
                modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
              }
            }
          } catch (err) {
            return callback(err)
          }

          // Change the docs in memory
          try {
            this.updateIndexes(modifications)
          } catch (err) {
            return callback(err)
          }

          // Update the datafile
          const updatedDocs = modifications.map(x => x.newDoc)
          this.persistence.persistNewState(updatedDocs, err => {
            if (err) return callback(err)
            if (!options.returnUpdatedDocs) {
              return callback(null, numReplaced)
            } else {
              let updatedDocsDC = updatedDocs.map(doc => model.deepCopy(doc))
              if (!multi) updatedDocsDC = updatedDocsDC[0]
              return callback(null, numReplaced, updatedDocsDC)
            }
          })
        })
      }])
  }

  /**
   * Queue an update on the executor. Same signature as _update
   */
  update () {
    this.executor.push({ this: this, fn: this._update, arguments: arguments })
  }

  /**
   * Remove all docs matching the query
   * For now very naive implementation (similar to update)
   * @param {Object} query
   * @param {Object} options Optional options, may be omitted altogether
   *                 options.multi If true, can update multiple documents (defaults to false)
   * @param {Function} cb Optional callback, signature: err, numRemoved
   *
   * @api private Use Datastore.remove which has the same signature
   */
  _remove (query, options, cb) {
    if (typeof options === 'function') {
      cb = options
      options = {}
    }
    // Options are optional: without this, omitting both options and callback threw a TypeError below
    options = options || {}
    const callback = cb || (() => {})
    const multi = options.multi !== undefined ? options.multi : false

    // Stale documents are not expired here (second argument), removal works on the raw candidates
    this.getCandidates(query, true, (err, candidates) => {
      if (err) return callback(err)
      const removedDocs = []
      let numRemoved = 0

      try {
        candidates.forEach(d => {
          if (model.match(d, query) && (multi || numRemoved === 0)) {
            numRemoved += 1
            removedDocs.push({ $$deleted: true, _id: d._id })
            this.removeFromIndexes(d)
          }
        })
      } catch (err) {
        return callback(err)
      }

      this.persistence.persistNewState(removedDocs, err => {
        if (err) return callback(err)
        return callback(null, numRemoved)
      })
    })
  }

  /**
   * Queue a removal on the executor. Same signature as _remove
   */
  remove () {
    this.executor.push({ this: this, fn: this._remove, arguments: arguments })
  }
}
// Expose the Datastore class as this module's sole export
module . exports = Datastore