WIP: remove async from persistence module

pull/11/head
Timothée Rebours 3 years ago
parent 64bdd37c4f
commit 4d900da6fe
  1. 120
      lib/persistence.js
  2. 6
      lib/storage.js

@ -5,7 +5,7 @@
* * Persistence.persistNewState(newDocs, callback) where newDocs is an array of documents and callback has signature err * * Persistence.persistNewState(newDocs, callback) where newDocs is an array of documents and callback has signature err
*/ */
const path = require('path') const path = require('path')
const async = require('async') const { callbackify, promisify } = require('util')
const byline = require('./byline') const byline = require('./byline')
const customUtils = require('./customUtils.js') const customUtils = require('./customUtils.js')
const Index = require('./indexes.js') const Index = require('./indexes.js')
@ -73,9 +73,13 @@ class Persistence {
* @param {Function} callback Optional callback, signature: err * @param {Function} callback Optional callback, signature: err
*/ */
persistCachedDatabase (callback = () => {}) { persistCachedDatabase (callback = () => {}) {
return callbackify(this.persistCachedDatabaseAsync.bind(this))(callback)
}
async persistCachedDatabaseAsync () {
const lines = [] const lines = []
if (this.inMemoryOnly) return callback(null) if (this.inMemoryOnly) return
this.db.getAllData().forEach(doc => { this.db.getAllData().forEach(doc => {
lines.push(this.afterSerialization(model.serialize(doc))) lines.push(this.afterSerialization(model.serialize(doc)))
@ -92,11 +96,8 @@ class Persistence {
} }
}) })
storage.crashSafeWriteFileLines(this.filename, lines, err => { await storage.crashSafeWriteFileLinesAsync(this.filename, lines)
if (err) return callback(err) this.db.emit('compaction.done')
this.db.emit('compaction.done')
return callback(null)
})
} }
/** /**
@ -135,18 +136,22 @@ class Persistence {
* @param {Function} callback Optional, signature: err * @param {Function} callback Optional, signature: err
*/ */
persistNewState (newDocs, callback = () => {}) { persistNewState (newDocs, callback = () => {}) {
callbackify(this.persistNewStateAsync.bind(this))(newDocs, err => callback(err))
}
async persistNewStateAsync (newDocs) {
let toPersist = '' let toPersist = ''
// In-memory only datastore // In-memory only datastore
if (this.inMemoryOnly) return callback(null) if (this.inMemoryOnly) return
newDocs.forEach(doc => { newDocs.forEach(doc => {
toPersist += this.afterSerialization(model.serialize(doc)) + '\n' toPersist += this.afterSerialization(model.serialize(doc)) + '\n'
}) })
if (toPersist.length === 0) return callback(null) if (toPersist.length === 0) return
storage.appendFile(this.filename, toPersist, 'utf8', err => callback(err)) await storage.appendFileAsync(this.filename, toPersist, 'utf8')
} }
/** /**
@ -232,6 +237,10 @@ class Persistence {
}) })
} }
async treatRawStreamAsync (rawStream) {
return promisify(this.treatRawStream.bind(this))(rawStream)
}
/** /**
* Load the database * Load the database
* 1) Create all indexes * 1) Create all indexes
@ -243,65 +252,42 @@ class Persistence {
* @param {Function} callback Optional callback, signature: err * @param {Function} callback Optional callback, signature: err
*/ */
loadDatabase (callback = () => {}) { loadDatabase (callback = () => {}) {
callbackify(this.loadDatabaseAsync.bind(this))(err => callback(err))
}
async loadDatabaseAsync () {
this.db.resetIndexes() this.db.resetIndexes()
// In-memory only datastore // In-memory only datastore
if (this.inMemoryOnly) return callback(null) if (this.inMemoryOnly) return
await Persistence.ensureDirectoryExistsAsync(path.dirname(this.filename)) // TODO: maybe ignore error
async.waterfall([ await storage.ensureDatafileIntegrityAsync(this.filename) // TODO: maybe ignore error
cb => {
// eslint-disable-next-line node/handle-callback-err let treatedData
Persistence.ensureDirectoryExists(path.dirname(this.filename), err => { if (storage.readFileStream) {
// TODO: handle error // Server side
// eslint-disable-next-line node/handle-callback-err const fileStream = storage.readFileStream(this.filename, { encoding: 'utf8' })
storage.ensureDatafileIntegrity(this.filename, err => { treatedData = await this.treatRawStreamAsync(fileStream)
// TODO: handle error } else {
const treatedDataCallback = (err, treatedData) => { // Browser
if (err) return cb(err) const rawData = await storage.readFileAsync(this.filename, 'utf8')
treatedData = this.treatRawData(rawData)
// Recreate all indexes in the datafile }
Object.keys(treatedData.indexes).forEach(key => { // Recreate all indexes in the datafile
this.db.indexes[key] = new Index(treatedData.indexes[key]) Object.keys(treatedData.indexes).forEach(key => {
}) this.db.indexes[key] = new Index(treatedData.indexes[key])
// Fill cached database (i.e. all indexes) with data
try {
this.db.resetIndexes(treatedData.data)
} catch (e) {
this.db.resetIndexes() // Rollback any index which didn't fail
return cb(e)
}
this.db.persistence.persistCachedDatabase(cb)
}
if (storage.readFileStream) {
// Server side
const fileStream = storage.readFileStream(this.filename, { encoding: 'utf8' })
this.treatRawStream(fileStream, treatedDataCallback)
return
}
// Browser
storage.readFile(this.filename, 'utf8', (err, rawData) => {
if (err) return cb(err)
try {
const treatedData = this.treatRawData(rawData)
treatedDataCallback(null, treatedData)
} catch (e) {
return cb(e)
}
})
})
})
}
], err => {
if (err) return callback(err)
this.db.executor.processBuffer()
return callback(null)
}) })
// Fill cached database (i.e. all indexes) with data
try {
this.db.resetIndexes(treatedData.data)
} catch (e) {
this.db.resetIndexes() // Rollback any index which didn't fail
throw e
}
await this.db.persistence.persistCachedDatabaseAsync()
this.db.executor.processBuffer()
} }
/** /**
@ -312,6 +298,10 @@ class Persistence {
storage.mkdir(dir, { recursive: true }, err => { callback(err) }) storage.mkdir(dir, { recursive: true }, err => { callback(err) })
} }
static async ensureDirectoryExistsAsync (dir) {
await storage.mkdirAsync(dir, { recursive: true })
}
/** /**
* Return the path the datafile if the given filename is relative to the directory where Node Webkit stores * Return the path the datafile if the given filename is relative to the directory where Node Webkit stores
* data for this application. Probably the best place to store data * data for this application. Probably the best place to store data

@ -20,12 +20,16 @@ storage.rename = fs.rename
storage.renameAsync = fsPromises.rename storage.renameAsync = fsPromises.rename
storage.writeFile = fs.writeFile storage.writeFile = fs.writeFile
storage.writeFileAsync = fsPromises.writeFile storage.writeFileAsync = fsPromises.writeFile
storage.writeFileStream = fs.createWriteStream
storage.unlink = fs.unlink storage.unlink = fs.unlink
storage.unlinkAsync = fsPromises.unlink storage.unlinkAsync = fsPromises.unlink
storage.appendFile = fs.appendFile storage.appendFile = fs.appendFile
storage.appendFileAsync = fsPromises.appendFile
storage.readFile = fs.readFile storage.readFile = fs.readFile
storage.readFileAsync = fsPromises.readFile
storage.readFileStream = fs.createReadStream storage.readFileStream = fs.createReadStream
storage.mkdir = fs.mkdir storage.mkdir = fs.mkdir
storage.mkdirAsync = fsPromises.mkdir
/** /**
* Explicit name ... * Explicit name ...
@ -100,7 +104,7 @@ storage.flushToStorageAsync = async (options) => {
*/ */
storage.writeFileLines = (filename, lines, callback = () => {}) => { storage.writeFileLines = (filename, lines, callback = () => {}) => {
try { try {
const stream = fs.createWriteStream(filename) const stream = storage.writeFileStream(filename)
const readable = Readable.from(lines) const readable = Readable.from(lines)
readable.on('data', (line) => { readable.on('data', (line) => {
try { try {

Loading…
Cancel
Save