const { EventEmitter } = require ( 'events' )
const { callbackify } = require ( 'util' )
const Cursor = require ( './cursor.js' )
const customUtils = require ( './customUtils.js' )
const Executor = require ( './executor.js' )
const Index = require ( './indexes.js' )
const model = require ( './model.js' )
const Persistence = require ( './persistence.js' )
const { isDate } = require ( './utils.js' )
class Datastore extends EventEmitter {
/ * *
* Create a new collection
* @ param { String } [ options . filename ] Optional , datastore will be in - memory only if not provided
* @ param { Boolean } [ options . timestampData ] Optional , defaults to false . If set to true , createdAt and updatedAt will be created and populated automatically ( if not specified by user )
* @ param { Boolean } [ options . inMemoryOnly ] Optional , defaults to false
* @ param { String } [ options . nodeWebkitAppName ] Optional , specify the name of your NW app if you want options . filename to be relative to the directory where Node Webkit stores application data such as cookies and local storage ( the best place to store data in my opinion )
* @ param { Boolean } [ options . autoload ] Optional , defaults to false
* @ param { Function } [ options . onload ] Optional , if autoload is used this will be called after the load database with the error object as parameter . If you don ' t pass it the error will be thrown
* @ param { Function } [ options . beforeDeserialization ] Optional , serialization hooks
* @ param { Function } [ options . afterSerialization ] Optional , serialization hooks
* @ param { Number } [ options . corruptAlertThreshold ] Optional , threshold after which an alert is thrown if too much data is corrupt
* @ param { Function } [ options . compareStrings ] Optional , string comparison function that overrides default for sorting
*
* Event Emitter - Events
* * compaction . done - Fired whenever a compaction operation was finished
* /
constructor ( options ) {
super ( )
let filename
// Retrocompatibility with v0.6 and before
if ( typeof options === 'string' ) {
filename = options
this . inMemoryOnly = false // Default
} else {
options = options || { }
filename = options . filename
this . inMemoryOnly = options . inMemoryOnly || false
this . autoload = options . autoload || false
this . timestampData = options . timestampData || false
}
// Determine whether in memory or persistent
if ( ! filename || typeof filename !== 'string' || filename . length === 0 ) {
this . filename = null
this . inMemoryOnly = true
} else {
this . filename = filename
}
// String comparison function
this . compareStrings = options . compareStrings
// Persistence handling
this . persistence = new Persistence ( {
db : this ,
nodeWebkitAppName : options . nodeWebkitAppName ,
afterSerialization : options . afterSerialization ,
beforeDeserialization : options . beforeDeserialization ,
corruptAlertThreshold : options . corruptAlertThreshold
} )
// This new executor is ready if we don't use persistence
// If we do, it will only be ready once loadDatabase is called
this . executor = new Executor ( )
if ( this . inMemoryOnly ) this . executor . ready = true
// Indexed by field name, dot notation can be used
// _id is always indexed and since _ids are generated randomly the underlying
// binary is always well-balanced
this . indexes = { }
this . indexes . _id = new Index ( { fieldName : '_id' , unique : true } )
this . ttlIndexes = { }
// Queue a load of the database right away and call the onload handler
// By default (no onload handler), if there is an error there, no operation will be possible so warn the user by throwing an exception
if ( this . autoload ) {
this . loadDatabase ( options . onload || ( err => {
if ( err ) throw err
} ) )
}
}
/ * *
* Load the database from the datafile , and trigger the execution of buffered commands if any
* /
loadDatabase ( ) {
this . executor . push ( { this : this . persistence , fn : this . persistence . loadDatabase , arguments : arguments } , true )
}
loadDatabaseAsync ( ) {
return this . executor . pushAsync ( {
this : this . persistence ,
fn : this . persistence . loadDatabaseAsync ,
arguments : arguments
} , true )
}
/ * *
* Get an array of all the data in the database
* /
getAllData ( ) {
return this . indexes . _id . getAll ( )
}
/ * *
* Reset all currently defined indexes
* /
resetIndexes ( newData ) {
for ( const index of Object . values ( this . indexes ) ) {
index . reset ( newData )
}
}
/ * *
* Ensure an index is kept for this field . Same parameters as lib / indexes
* For now this function is synchronous , we need to test how much time it takes
* We use an async API for consistency with the rest of the code
* @ param { Object } options
* @ param { String } options . fieldName
* @ param { Boolean } [ options . unique ]
* @ param { Boolean } [ options . sparse ]
* @ param { Number } [ options . expireAfterSeconds ] - Optional , if set this index becomes a TTL index ( only works on Date fields , not arrays of Date )
* @ param { Function } callback Optional callback , signature : err
* /
ensureIndex ( options = { } , callback = ( ) => { } ) {
if ( ! options . fieldName ) {
const err = new Error ( 'Cannot create an index without a fieldName' )
err . missingFieldName = true
return callback ( err )
}
if ( this . indexes [ options . fieldName ] ) return callback ( null )
this . indexes [ options . fieldName ] = new Index ( options )
if ( options . expireAfterSeconds !== undefined ) this . ttlIndexes [ options . fieldName ] = options . expireAfterSeconds // With this implementation index creation is not necessary to ensure TTL but we stick with MongoDB's API here
try {
this . indexes [ options . fieldName ] . insert ( this . getAllData ( ) )
} catch ( e ) {
delete this . indexes [ options . fieldName ]
return callback ( e )
}
// We may want to force all options to be persisted including defaults, not just the ones passed the index creation function
this . persistence . persistNewState ( [ { $$indexCreated : options } ] , err => {
if ( err ) return callback ( err )
return callback ( null )
} )
}
/ * *
* Remove an index
* @ param { String } fieldName
* @ param { Function } callback Optional callback , signature : err
* /
// TODO: contrary to what is said in the JSDoc, this function should probably be called through the executor, it persists a new state
removeIndex ( fieldName , callback = ( ) => { } ) {
callbackify ( this . removeIndexAsync . bind ( this ) ) ( fieldName , callback )
}
async removeIndexAsync ( fieldName ) {
delete this . indexes [ fieldName ]
await this . persistence . persistNewStateAsync ( [ { $$indexRemoved : fieldName } ] )
}
/ * *
* Add one or several document ( s ) to all indexes
* /
addToIndexes ( doc ) {
let failingIndex
let error
const keys = Object . keys ( this . indexes )
for ( let i = 0 ; i < keys . length ; i += 1 ) {
try {
this . indexes [ keys [ i ] ] . insert ( doc )
} catch ( e ) {
failingIndex = i
error = e
break
}
}
// If an error happened, we need to rollback the insert on all other indexes
if ( error ) {
for ( let i = 0 ; i < failingIndex ; i += 1 ) {
this . indexes [ keys [ i ] ] . remove ( doc )
}
throw error
}
}
/ * *
* Remove one or several document ( s ) from all indexes
* /
removeFromIndexes ( doc ) {
for ( const index of Object . values ( this . indexes ) ) {
index . remove ( doc )
}
}
/ * *
* Update one or several documents in all indexes
* To update multiple documents , oldDoc must be an array of { oldDoc , newDoc } pairs
* If one update violates a constraint , all changes are rolled back
* /
updateIndexes ( oldDoc , newDoc ) {
let failingIndex
let error
const keys = Object . keys ( this . indexes )
for ( let i = 0 ; i < keys . length ; i += 1 ) {
try {
this . indexes [ keys [ i ] ] . update ( oldDoc , newDoc )
} catch ( e ) {
failingIndex = i
error = e
break
}
}
// If an error happened, we need to rollback the update on all other indexes
if ( error ) {
for ( let i = 0 ; i < failingIndex ; i += 1 ) {
this . indexes [ keys [ i ] ] . revertUpdate ( oldDoc , newDoc )
}
throw error
}
}
_getCandidates ( query ) {
const indexNames = Object . keys ( this . indexes )
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
// For a basic match
let usableQuery
usableQuery = Object . entries ( query )
. filter ( ( [ k , v ] ) =>
! ! ( typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate ( v ) || v === null ) &&
indexNames . includes ( k )
)
. pop ( )
if ( usableQuery ) return this . indexes [ usableQuery [ 0 ] ] . getMatching ( usableQuery [ 1 ] )
// For a $in match
usableQuery = Object . entries ( query )
. filter ( ( [ k , v ] ) =>
! ! ( query [ k ] && Object . prototype . hasOwnProperty . call ( query [ k ] , '$in' ) ) &&
indexNames . includes ( k )
)
. pop ( )
if ( usableQuery ) return this . indexes [ usableQuery [ 0 ] ] . getMatching ( usableQuery [ 1 ] . $in )
// For a comparison match
usableQuery = Object . entries ( query )
. filter ( ( [ k , v ] ) =>
! ! ( query [ k ] && ( Object . prototype . hasOwnProperty . call ( query [ k ] , '$lt' ) || Object . prototype . hasOwnProperty . call ( query [ k ] , '$lte' ) || Object . prototype . hasOwnProperty . call ( query [ k ] , '$gt' ) || Object . prototype . hasOwnProperty . call ( query [ k ] , '$gte' ) ) ) &&
indexNames . includes ( k )
)
. pop ( )
if ( usableQuery ) return this . indexes [ usableQuery [ 0 ] ] . getBetweenBounds ( usableQuery [ 1 ] )
// By default, return all the DB data
return this . getAllData ( )
}
/ * *
* Return the list of candidates for a given query
* Crude implementation for now , we return the candidates given by the first usable index if any
* We try the following query types , in this order : basic match , $in match , comparison match
* One way to make it better would be to enable the use of multiple indexes if the first usable index
* returns too much data . I may do it in the future .
*
* Returned candidates will be scanned to find and remove all expired documents
*
* @ param { Query } query
* @ param { Boolean } dontExpireStaleDocs Optional , defaults to false , if true don 't remove stale docs. Useful for the remove function which shouldn' t be impacted by expirations
* @ param { Function } callback Signature err , candidates
* /
getCandidates ( query , dontExpireStaleDocs , callback ) {
if ( typeof dontExpireStaleDocs === 'function' ) {
callback = dontExpireStaleDocs
dontExpireStaleDocs = false
}
callbackify ( this . getCandidatesAsync . bind ( this ) ) ( query , dontExpireStaleDocs , callback )
}
async getCandidatesAsync ( query , dontExpireStaleDocs = false ) {
const indexNames = Object . keys ( this . indexes )
const validDocs = [ ]
const _getCandidates = query => {
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
// For a basic match
let usableQuery
usableQuery = Object . entries ( query )
. filter ( ( [ k , v ] ) =>
! ! ( typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate ( v ) || v === null ) &&
indexNames . includes ( k )
)
. pop ( )
if ( usableQuery ) return this . indexes [ usableQuery [ 0 ] ] . getMatching ( usableQuery [ 1 ] )
// For a $in match
usableQuery = Object . entries ( query )
. filter ( ( [ k , v ] ) =>
! ! ( query [ k ] && Object . prototype . hasOwnProperty . call ( query [ k ] , '$in' ) ) &&
indexNames . includes ( k )
)
. pop ( )
if ( usableQuery ) return this . indexes [ usableQuery [ 0 ] ] . getMatching ( usableQuery [ 1 ] . $in )
// For a comparison match
usableQuery = Object . entries ( query )
. filter ( ( [ k , v ] ) =>
! ! ( query [ k ] && ( Object . prototype . hasOwnProperty . call ( query [ k ] , '$lt' ) || Object . prototype . hasOwnProperty . call ( query [ k ] , '$lte' ) || Object . prototype . hasOwnProperty . call ( query [ k ] , '$gt' ) || Object . prototype . hasOwnProperty . call ( query [ k ] , '$gte' ) ) ) &&
indexNames . includes ( k )
)
. pop ( )
if ( usableQuery ) return this . indexes [ usableQuery [ 0 ] ] . getBetweenBounds ( usableQuery [ 1 ] )
// By default, return all the DB data
return this . getAllData ( )
}
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
const docs = _getCandidates ( query )
// STEP 2: remove all expired documents
if ( ! dontExpireStaleDocs ) {
const expiredDocsIds = [ ]
const ttlIndexesFieldNames = Object . keys ( this . ttlIndexes )
docs . forEach ( doc => {
if ( ttlIndexesFieldNames . every ( i => ! ( doc [ i ] !== undefined && isDate ( doc [ i ] ) && Date . now ( ) > doc [ i ] . getTime ( ) + this . ttlIndexes [ i ] * 1000 ) ) ) validDocs . push ( doc )
else expiredDocsIds . push ( doc . _id )
} )
for ( const _id of expiredDocsIds ) {
await new Promise ( ( resolve , reject ) => {
this . _remove ( { _id : _id } , { } , err => {
if ( err ) return reject ( err )
return resolve ( )
} )
} )
}
} else validDocs . push ( ... docs )
return validDocs
}
/ * *
* Insert a new document
* Private Use Datastore . insert which has the same signature
* @ param { Document } newDoc
* @ param { Function } callback Optional callback , signature : err , insertedDoc
*
* @ private
* /
_insert ( newDoc , callback = ( ) => { } ) {
return callbackify ( this . _insertAsync . bind ( this ) ) ( newDoc , callback )
}
async _insertAsync ( newDoc ) {
const preparedDoc = this . _prepareDocumentForInsertion ( newDoc )
this . _insertInCache ( preparedDoc )
await this . persistence . persistNewStateAsync ( Array . isArray ( preparedDoc ) ? preparedDoc : [ preparedDoc ] )
return model . deepCopy ( preparedDoc )
}
/ * *
* Create a new _id that ' s not already in use
* @ private
* /
_createNewId ( ) {
let attemptId = customUtils . uid ( 16 )
// Try as many times as needed to get an unused _id. As explained in customUtils, the probability of this ever happening is extremely small, so this is O(1)
if ( this . indexes . _id . getMatching ( attemptId ) . length > 0 ) attemptId = this . _createNewId ( )
return attemptId
}
/ * *
* Prepare a document ( or array of documents ) to be inserted in a database
* Meaning adds _id and timestamps if necessary on a copy of newDoc to avoid any side effect on user input
* @ private
* /
_prepareDocumentForInsertion ( newDoc ) {
let preparedDoc
if ( Array . isArray ( newDoc ) ) {
preparedDoc = [ ]
newDoc . forEach ( doc => { preparedDoc . push ( this . _prepareDocumentForInsertion ( doc ) ) } )
} else {
preparedDoc = model . deepCopy ( newDoc )
if ( preparedDoc . _id === undefined ) preparedDoc . _id = this . _createNewId ( )
const now = new Date ( )
if ( this . timestampData && preparedDoc . createdAt === undefined ) preparedDoc . createdAt = now
if ( this . timestampData && preparedDoc . updatedAt === undefined ) preparedDoc . updatedAt = now
model . checkObject ( preparedDoc )
}
return preparedDoc
}
/ * *
* If newDoc is an array of documents , this will insert all documents in the cache
* @ private
* /
_insertInCache ( preparedDoc ) {
if ( Array . isArray ( preparedDoc ) ) this . _insertMultipleDocsInCache ( preparedDoc )
else this . addToIndexes ( preparedDoc )
}
/ * *
* If one insertion fails ( e . g . because of a unique constraint ) , roll back all previous
* inserts and throws the error
* @ private
* /
_insertMultipleDocsInCache ( preparedDocs ) {
let failingIndex
let error
for ( let i = 0 ; i < preparedDocs . length ; i += 1 ) {
try {
this . addToIndexes ( preparedDocs [ i ] )
} catch ( e ) {
error = e
failingIndex = i
break
}
}
if ( error ) {
for ( let i = 0 ; i < failingIndex ; i += 1 ) {
this . removeFromIndexes ( preparedDocs [ i ] )
}
throw error
}
}
insert ( ) {
this . executor . push ( { this : this , fn : this . _insert , arguments : arguments } )
}
insertAsync ( ) {
return this . executor . push ( { this : this , fn : this . _insertAsync , arguments : arguments , async : true } )
}
/ * *
* Count all documents matching the query
* @ param { Query } query MongoDB - style query
* @ param { Function } callback Optional callback , signature : err , count
* /
count ( query , callback ) {
const cursor = new Cursor ( this , query , function ( err , docs , callback ) {
if ( err ) { return callback ( err ) }
return callback ( null , docs . length )
} )
if ( typeof callback === 'function' ) cursor . exec ( callback )
else return cursor
}
/ * *
* Find all documents matching the query
* If no callback is passed , we return the cursor so that user can limit , skip and finally exec
* @ param { Object } query MongoDB - style query
* @ param { Object } projection MongoDB - style projection
* @ param { Function } callback Optional callback , signature : err , docs
* /
find ( query , projection , callback ) {
if ( arguments . length === 1 ) {
projection = { }
// callback is undefined, will return a cursor
} else if ( arguments . length === 2 ) {
if ( typeof projection === 'function' ) {
callback = projection
projection = { }
} // If not assume projection is an object and callback undefined
}
const cursor = new Cursor ( this , query , function ( err , docs , callback ) {
if ( err ) { return callback ( err ) }
const res = docs . map ( doc => model . deepCopy ( doc ) )
return callback ( null , res )
} )
cursor . projection ( projection )
if ( typeof callback === 'function' ) cursor . exec ( callback )
else return cursor
}
/ * *
* Find one document matching the query
* @ param { Object } query MongoDB - style query
* @ param { Object } projection MongoDB - style projection
* @ param { Function } callback Optional callback , signature : err , doc
* /
findOne ( query , projection , callback ) {
if ( arguments . length === 1 ) {
projection = { }
// callback is undefined, will return a cursor
} else if ( arguments . length === 2 ) {
if ( typeof projection === 'function' ) {
callback = projection
projection = { }
} // If not assume projection is an object and callback undefined
}
const cursor = new Cursor ( this , query , ( err , docs , callback ) => {
if ( err ) return callback ( err )
if ( docs . length === 1 ) return callback ( null , model . deepCopy ( docs [ 0 ] ) )
else return callback ( null , null )
} )
cursor . projection ( projection ) . limit ( 1 )
if ( typeof callback === 'function' ) cursor . exec ( callback )
else return cursor
}
/ * *
* Update all docs matching query .
* Use Datastore . update which has the same signature
* @ param { Object } query
* @ param { Object } updateQuery
* @ param { Object } options Optional options
* options . multi If true , can update multiple documents ( defaults to false )
* options . upsert If true , document is inserted if the query doesn ' t match anything
* options . returnUpdatedDocs Defaults to false , if true return as third argument the array of updated matched documents ( even if no change actually took place )
* @ param { Function } cb Optional callback , signature : ( err , numAffected , affectedDocuments , upsert )
* If update was an upsert , upsert flag is set to true
* affectedDocuments can be one of the following :
* * For an upsert , the upserted document
* * For an update with returnUpdatedDocs option false , null
* * For an update with returnUpdatedDocs true and multi false , the updated document
* * For an update with returnUpdatedDocs true and multi true , the array of updated documents
*
* WARNING : The API was changed between v1 . 7.4 and v1 . 8 , for consistency and readability reasons . Prior and including to v1 . 7.4 ,
* the callback signature was ( err , numAffected , updated ) where updated was the updated document in case of an upsert
* or the array of updated documents for an update if the returnUpdatedDocs option was true . That meant that the type of
* affectedDocuments in a non multi update depended on whether there was an upsert or not , leaving only two ways for the
* user to check whether an upsert had occured : checking the type of affectedDocuments or running another find query on
* the whole dataset to check its size . Both options being ugly , the breaking change was necessary .
*
* @ private
* /
_update ( query , updateQuery , options , cb ) {
if ( typeof options === 'function' ) {
cb = options
options = { }
}
const callback = cb || ( ( ) => { } )
const _callback = ( err , res = { } ) => {
callback ( err , res . numAffected , res . affectedDocuments , res . upsert )
}
callbackify ( this . _updateAsync . bind ( this ) ) ( query , updateQuery , options , _callback )
}
async _updateAsync ( query , updateQuery , options = { } ) {
const multi = options . multi !== undefined ? options . multi : false
const upsert = options . upsert !== undefined ? options . upsert : false
// If upsert option is set, check whether we need to insert the doc
if ( upsert ) {
// Need to use an internal function not tied to the executor to avoid deadlock
const cursor = new Cursor ( this , query )
const docs = await new Promise ( ( resolve , reject ) => {
cursor . limit ( 1 ) . _exec ( ( err , docs ) => {
if ( err ) reject ( err )
else resolve ( docs )
} )
} )
if ( docs . length !== 1 ) {
let toBeInserted
try {
model . checkObject ( updateQuery )
// updateQuery is a simple object with no modifier, use it as the document to insert
toBeInserted = updateQuery
} catch ( e ) {
// updateQuery contains modifiers, use the find query as the base,
// strip it from all operators and update it according to updateQuery
toBeInserted = model . modify ( model . deepCopy ( query , true ) , updateQuery )
}
return new Promise ( ( resolve , reject ) => {
this . _insert ( toBeInserted , ( err , newDoc ) => {
if ( err ) return reject ( err )
return resolve ( { numAffected : 1 , affectedDocuments : newDoc , upsert : true } )
} )
} )
}
}
// Perform the update
let numReplaced = 0
let modifiedDoc
const modifications = [ ]
let createdAt
const candidates = await this . getCandidatesAsync ( query )
// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
for ( const candidate of candidates ) {
if ( model . match ( candidate , query ) && ( multi || numReplaced === 0 ) ) {
numReplaced += 1
if ( this . timestampData ) { createdAt = candidate . createdAt }
modifiedDoc = model . modify ( candidate , updateQuery )
if ( this . timestampData ) {
modifiedDoc . createdAt = createdAt
modifiedDoc . updatedAt = new Date ( )
}
modifications . push ( { oldDoc : candidate , newDoc : modifiedDoc } )
}
}
// Change the docs in memory
this . updateIndexes ( modifications )
// Update the datafile
const updatedDocs = modifications . map ( x => x . newDoc )
await this . persistence . persistNewStateAsync ( updatedDocs )
if ( ! options . returnUpdatedDocs ) return { numAffected : numReplaced }
else {
let updatedDocsDC = [ ]
updatedDocs . forEach ( doc => { updatedDocsDC . push ( model . deepCopy ( doc ) ) } )
if ( ! multi ) updatedDocsDC = updatedDocsDC [ 0 ]
return { numAffected : numReplaced , affectedDocuments : updatedDocsDC }
}
}
update ( ) {
this . executor . push ( { this : this , fn : this . _update , arguments : arguments } )
}
updateAsync ( ) {
return this . executor . pushAsync ( { this : this , fn : this . _updateAsync , arguments : arguments } )
}
/ * *
* Remove all docs matching the query .
* Use Datastore . remove which has the same signature
* For now very naive implementation ( similar to update )
* @ param { Object } query
* @ param { Object } options Optional options
* options . multi If true , can update multiple documents ( defaults to false )
* @ param { Function } cb Optional callback , signature : err , numRemoved
*
* @ private
* /
_remove ( query , options , cb ) {
if ( typeof options === 'function' ) {
cb = options
options = { }
}
const callback = cb || ( ( ) => { } )
callbackify ( this . _removeAsync . bind ( this ) ) ( query , options , callback )
}
async _removeAsync ( query , options = { } ) {
const multi = options . multi !== undefined ? options . multi : false
const candidates = await this . getCandidatesAsync ( query , true )
const removedDocs = [ ]
let numRemoved = 0
candidates . forEach ( d => {
if ( model . match ( d , query ) && ( multi || numRemoved === 0 ) ) {
numRemoved += 1
removedDocs . push ( { $$deleted : true , _id : d . _id } )
this . removeFromIndexes ( d )
}
} )
await this . persistence . persistNewStateAsync ( removedDocs )
return numRemoved
}
remove ( ) {
this . executor . push ( { this : this , fn : this . _remove , arguments : arguments } )
}
removeAsync ( ) {
return this . executor . pushAsync ( { this : this , fn : this . _removeAsync , arguments : arguments } )
}
}
module . exports = Datastore