WIP instead of having one function that can handle both async and callback signatures, let's implement a new Async suite of functions, and use callbackify to shim the old interface

pull/11/head
Timothée Rebours 3 years ago
parent 5a4859c3ba
commit f9e4c525e4
  1. 219
      lib/datastore.js

@ -1,4 +1,5 @@
const { EventEmitter } = require('events')
const { callbackify } = require('util')
const async = require('async')
const Cursor = require('./cursor.js')
const customUtils = require('./customUtils.js')
@ -267,26 +268,61 @@ class Datastore extends EventEmitter {
* @param {Boolean} dontExpireStaleDocs Optional, defaults to false, if true don't remove stale docs. Useful for the remove function which shouldn't be impacted by expirations
* @param {Function} callback Signature err, candidates
*/
async getCandidates (query, dontExpireStaleDocs, callback) {
const validDocs = []
getCandidates (query, dontExpireStaleDocs, callback) {
if (typeof dontExpireStaleDocs === 'function') {
callback = dontExpireStaleDocs
dontExpireStaleDocs = false
}
try {
callbackify(this.getCandidatesAsync.bind(this))(query, dontExpireStaleDocs, callback)
}
async getCandidatesAsync (query, dontExpireStaleDocs = false) {
const indexNames = Object.keys(this.indexes)
const validDocs = []
const _getCandidates = query => {
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
const docs = this._getCandidates(query)
// STEP 2: remove all expired documents
// For a basic match
let usableQuery
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean' || isDate(v) || v === null) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1])
// For a $in match
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(query[k] && Object.prototype.hasOwnProperty.call(query[k], '$in')) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getMatching(usableQuery[1].$in)
// For a comparison match
usableQuery = Object.entries(query)
.filter(([k, v]) =>
!!(query[k] && (Object.prototype.hasOwnProperty.call(query[k], '$lt') || Object.prototype.hasOwnProperty.call(query[k], '$lte') || Object.prototype.hasOwnProperty.call(query[k], '$gt') || Object.prototype.hasOwnProperty.call(query[k], '$gte'))) &&
indexNames.includes(k)
)
.pop()
if (usableQuery) return this.indexes[usableQuery[0]].getBetweenBounds(usableQuery[1])
// By default, return all the DB data
return this.getAllData()
}
// STEP 1: get candidates list by checking indexes from most to least frequent usecase
const docs = _getCandidates(query)
// STEP 2: remove all expired documents
if (!dontExpireStaleDocs) {
const expiredDocsIds = []
const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)
const expiredDocsIds = []
const ttlIndexesFieldNames = Object.keys(this.ttlIndexes)
docs.forEach(doc => {
docs.forEach(doc => {
if (ttlIndexesFieldNames.every(i => !(doc[i] !== undefined && isDate(doc[i]) && Date.now() > doc[i].getTime() + this.ttlIndexes[i] * 1000))) validDocs.push(doc)
else expiredDocsIds.push(doc._id)
})
else expiredDocsIds.push(doc._id)
})
for (const _id of expiredDocsIds) {
await new Promise((resolve, reject) => {
this._remove({ _id: _id }, {}, err => {
@ -294,14 +330,9 @@ class Datastore extends EventEmitter {
return resolve()
})
})
}
} else validDocs.push(...docs)
} catch (error) {
if (typeof callback === 'function') callback(error, null)
else throw error
}
if (typeof callback === 'function') callback(null, validDocs)
else return validDocs
}
} else validDocs.push(...docs)
return validDocs
}
/**
@ -509,92 +540,90 @@ class Datastore extends EventEmitter {
options = {}
}
const callback = cb || (() => {})
const _callback = (err, res = {}) => {
callback(err, res.numAffected, res.affectedDocuments, res.upsert)
}
callbackify(this._updateAsync.bind(this))(query, updateQuery, options, _callback)
}
async _updateAsync (query, updateQuery, options = {}) {
const multi = options.multi !== undefined ? options.multi : false
const upsert = options.upsert !== undefined ? options.upsert : false
async.waterfall([
cb => { // If upsert option is set, check whether we need to insert the doc
if (!upsert) return cb()
// If upsert option is set, check whether we need to insert the doc
if (upsert) {
// Need to use an internal function not tied to the executor to avoid deadlock
const cursor = new Cursor(this, query)
// Need to use an internal function not tied to the executor to avoid deadlock
const cursor = new Cursor(this, query)
const docs = await new Promise((resolve, reject) => {
cursor.limit(1)._exec((err, docs) => {
if (err) return callback(err)
if (docs.length === 1) return cb()
else {
let toBeInserted
try {
model.checkObject(updateQuery)
// updateQuery is a simple object with no modifier, use it as the document to insert
toBeInserted = updateQuery
} catch (e) {
// updateQuery contains modifiers, use the find query as the base,
// strip it from all operators and update it according to updateQuery
try {
toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
} catch (err) {
return callback(err)
}
}
return this._insert(toBeInserted, (err, newDoc) => {
if (err) return callback(err)
return callback(null, 1, newDoc, true)
})
}
if (err) reject(err)
else resolve(docs)
})
},
() => { // Perform the update
let numReplaced = 0
let modifiedDoc
const modifications = []
let createdAt
this.getCandidates(query, (err, candidates) => {
if (err) return callback(err)
// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
try {
for (const candidate of candidates) {
if (model.match(candidate, query) && (multi || numReplaced === 0)) {
numReplaced += 1
if (this.timestampData) { createdAt = candidate.createdAt }
modifiedDoc = model.modify(candidate, updateQuery)
if (this.timestampData) {
modifiedDoc.createdAt = createdAt
modifiedDoc.updatedAt = new Date()
}
modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
}
}
} catch (err) {
return callback(err)
}
})
// Change the docs in memory
try {
this.updateIndexes(modifications)
} catch (err) {
return callback(err)
}
if (docs.length !== 1) {
let toBeInserted
try {
model.checkObject(updateQuery)
// updateQuery is a simple object with no modifier, use it as the document to insert
toBeInserted = updateQuery
} catch (e) {
// updateQuery contains modifiers, use the find query as the base,
// strip it from all operators and update it according to updateQuery
toBeInserted = model.modify(model.deepCopy(query, true), updateQuery)
}
// Update the datafile
const updatedDocs = modifications.map(x => x.newDoc)
this.persistence.persistNewState(updatedDocs, err => {
if (err) return callback(err)
if (!options.returnUpdatedDocs) {
return callback(null, numReplaced)
} else {
let updatedDocsDC = []
updatedDocs.forEach(doc => { updatedDocsDC.push(model.deepCopy(doc)) })
if (!multi) updatedDocsDC = updatedDocsDC[0]
return callback(null, numReplaced, updatedDocsDC)
}
return new Promise((resolve, reject) => {
this._insert(toBeInserted, (err, newDoc) => {
if (err) return reject(err)
return resolve({ numAffected: 1, affectedDocuments: newDoc, upsert: true })
})
})
}])
}
}
// Perform the update
let numReplaced = 0
let modifiedDoc
const modifications = []
let createdAt
const candidates = await this.getCandidatesAsync(query)
// Preparing update (if an error is thrown here neither the datafile nor
// the in-memory indexes are affected)
for (const candidate of candidates) {
if (model.match(candidate, query) && (multi || numReplaced === 0)) {
numReplaced += 1
if (this.timestampData) { createdAt = candidate.createdAt }
modifiedDoc = model.modify(candidate, updateQuery)
if (this.timestampData) {
modifiedDoc.createdAt = createdAt
modifiedDoc.updatedAt = new Date()
}
modifications.push({ oldDoc: candidate, newDoc: modifiedDoc })
}
}
// Change the docs in memory
this.updateIndexes(modifications)
// Update the datafile
const updatedDocs = modifications.map(x => x.newDoc)
await new Promise((resolve, reject) => {
this.persistence.persistNewState(updatedDocs, err => {
if (err) return reject(err)
else resolve()
})
})
if (!options.returnUpdatedDocs) return { numAffected: numReplaced }
else {
let updatedDocsDC = []
updatedDocs.forEach(doc => { updatedDocsDC.push(model.deepCopy(doc)) })
if (!multi) updatedDocsDC = updatedDocsDC[0]
return { numAffected: numReplaced, affectedDocuments: updatedDocsDC }
}
}
update () {

Loading…
Cancel
Save