Stream files line by line when parsing to avoid memory limits

Based on https://github.com/louischatriot/nedb/pull/463
pull/5/head
eliot-akira 3 years ago
parent a8ef3487bd
commit a67622dcf7
  1. 51
      lib/persistence.js
  2. 1
      lib/storage.js
  3. 14
      package-lock.json
  4. 1
      package.json
  5. 83
      test/persistence.test.js

@ -6,6 +6,7 @@
*/
const path = require('path')
const async = require('async')
const byline = require('byline')
const customUtils = require('./customUtils.js')
const Index = require('./indexes.js')
const model = require('./model.js')
@ -149,19 +150,23 @@ class Persistence {
}
/**
* From a database's raw data, return the corresponding
* From a database's raw stream, return the corresponding
* machine understandable collection
*/
treatRawData (rawData) {
const data = rawData.split('\n')
treatRawStream (rawStream, cb) {
const dataById = {}
const tdata = []
const indexes = {}
let corruptItems = -1
let corruptItems = 0
const lineStream = byline(rawStream)
const that = this
let length = 0
lineStream.on('data', function(line) {
for (const datum of data) {
try {
const doc = model.deserialize(this.beforeDeserialization(datum))
const doc = model.deserialize(that.beforeDeserialization(line))
if (doc._id) {
if (doc.$$deleted === true) delete dataById[doc._id]
else dataById[doc._id] = doc
@ -170,17 +175,27 @@ class Persistence {
} catch (e) {
corruptItems += 1
}
}
length++
})
lineStream.on('end', function() {
// A bit lenient on corruption
if (
data.length > 0 &&
corruptItems / data.length > this.corruptAlertThreshold
) throw new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`)
let err
if (length > 0 && corruptItems / length > that.corruptAlertThreshold) {
err = new Error("More than " + Math.floor(100 * that.corruptAlertThreshold) + "% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss")
}
Object.keys(dataById).forEach(function (k) {
tdata.push(dataById[k])
})
tdata.push(...Object.values(dataById))
cb(err, { data: tdata, indexes: indexes })
})
return { data: tdata, indexes: indexes }
lineStream.on('error', function(err) {
cb(err)
})
}
/**
@ -207,14 +222,10 @@ class Persistence {
// eslint-disable-next-line node/handle-callback-err
storage.ensureDatafileIntegrity(this.filename, err => {
// TODO: handle error
storage.readFile(this.filename, 'utf8', (err, rawData) => {
const fileStream = storage.readFileStream(this.filename, { encoding : 'utf8' })
this.treatRawStream(fileStream, (err, treatedData) => {
if (err) return cb(err)
let treatedData
try {
treatedData = this.treatRawData(rawData)
} catch (e) {
return cb(e)
}
// Recreate all indexes in the datafile
Object.keys(treatedData.indexes).forEach(key => {

@ -18,6 +18,7 @@ storage.writeFile = fs.writeFile
storage.unlink = fs.unlink
storage.appendFile = fs.appendFile
storage.readFile = fs.readFile
storage.readFileStream = fs.createReadStream
storage.mkdir = fs.mkdir
/**

14
package-lock.json generated

@ -11,6 +11,7 @@
"dependencies": {
"@seald-io/binary-search-tree": "^1.0.2",
"async": "0.2.10",
"byline": "^5.0.0",
"localforage": "^1.9.0"
},
"devDependencies": {
@ -786,6 +787,14 @@
"integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==",
"dev": true
},
"node_modules/byline": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/byline/-/byline-5.0.0.tgz",
"integrity": "sha1-dBxSFkaOrcRXsDQQEYrXfejB3bE=",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/bytes": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz",
@ -6246,6 +6255,11 @@
"integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==",
"dev": true
},
"byline": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/byline/-/byline-5.0.0.tgz",
"integrity": "sha1-dBxSFkaOrcRXsDQQEYrXfejB3bE="
},
"bytes": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz",

@ -36,6 +36,7 @@
"dependencies": {
"@seald-io/binary-search-tree": "^1.0.2",
"async": "0.2.10",
"byline": "^5.0.0",
"localforage": "^1.9.0"
},
"devDependencies": {

@ -9,6 +9,7 @@ const Datastore = require('../lib/datastore')
const Persistence = require('../lib/persistence')
const storage = require('../lib/storage')
const { execFile, fork } = require('child_process')
const Readable = require('stream').Readable
const { assert } = chai
chai.should()
@ -41,101 +42,151 @@ describe('Persistence', function () {
], done)
})
// Each serialized line in the datafile yields exactly one document.
it('Every line represents a document', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    model.serialize({ _id: '2', hello: 'world' }) + '\n' +
    model.serialize({ _id: '3', nested: { today: now } })
  // Feed the raw contents through an in-memory stream, mirroring how
  // loadDatabase streams the datafile from disk.
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // All lines deserialize cleanly, so no corruption error is expected.
    if (err) return done(err)
    const treatedData = result.data
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(3)
    assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
    assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
    assert.deepStrictEqual(treatedData[2], { _id: '3', nested: { today: now } })
    done()
  })
})
// Lines that fail to deserialize are skipped; well-formed documents survive.
it('Badly formatted lines have no impact on the treated data', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    'garbage\n' +
    model.serialize({ _id: '3', nested: { today: now } })
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // NOTE(review): err may be set here — one corrupt line out of three can
    // exceed the datastore's corruptAlertThreshold. This test only verifies
    // that valid documents are kept, so err is deliberately not asserted.
    const treatedData = result.data
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(2)
    assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
    assert.deepStrictEqual(treatedData[1], { _id: '3', nested: { today: now } })
    done()
  })
})
// Documents that deserialize fine but lack an _id are excluded from the data.
it('Well formatted lines that have no _id are not included in the data', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    model.serialize({ _id: '2', hello: 'world' }) + '\n' +
    model.serialize({ nested: { today: now } })
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // No line is corrupt, so no error is expected.
    if (err) return done(err)
    const treatedData = result.data
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(2)
    assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
    assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
    done()
  })
})
// When the same _id appears twice, the later line overwrites the earlier one.
it('If two lines concern the same doc (= same _id), the last one is the good version', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    model.serialize({ _id: '2', hello: 'world' }) + '\n' +
    model.serialize({ _id: '1', nested: { today: now } })
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // No line is corrupt, so no error is expected.
    if (err) return done(err)
    const treatedData = result.data
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(2)
    assert.deepStrictEqual(treatedData[0], { _id: '1', nested: { today: now } })
    assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' })
    done()
  })
})
// A { $$deleted: true } line removes any previously-read doc with that _id.
it('If a doc contains $$deleted: true, that means we need to remove it from the data', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    model.serialize({ _id: '2', hello: 'world' }) + '\n' +
    model.serialize({ _id: '1', $$deleted: true }) + '\n' +
    model.serialize({ _id: '3', today: now })
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // No line is corrupt, so no error is expected.
    if (err) return done(err)
    const treatedData = result.data
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(2)
    assert.deepStrictEqual(treatedData[0], { _id: '2', hello: 'world' })
    assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
    done()
  })
})
// Deleting an _id that was never seen is a no-op, not an error.
it('If a doc contains $$deleted: true, no error is thrown if the doc wasnt in the list before', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    model.serialize({ _id: '2', $$deleted: true }) + '\n' +
    model.serialize({ _id: '3', today: now })
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // No line is corrupt, so no error is expected.
    if (err) return done(err)
    const treatedData = result.data
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(2)
    assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
    assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
    done()
  })
})
// A { $$indexCreated: ... } line is collected into result.indexes and
// kept out of result.data.
it('If a doc contains $$indexCreated, no error is thrown during treatRawData and we can get the index options', function (done) {
  const now = new Date()
  const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' +
    model.serialize({ $$indexCreated: { fieldName: 'test', unique: true } }) + '\n' +
    model.serialize({ _id: '3', today: now })
  const stream = new Readable()
  stream.push(rawData)
  stream.push(null)
  d.persistence.treatRawStream(stream, function (err, result) {
    // No line is corrupt, so no error is expected.
    if (err) return done(err)
    const treatedData = result.data
    const indexes = result.indexes
    Object.keys(indexes).length.should.equal(1)
    // Strict deep equality, consistent with the other assertions in this file.
    assert.deepStrictEqual(indexes.test, { fieldName: 'test', unique: true })
    treatedData.sort(function (a, b) { return a._id - b._id })
    treatedData.length.should.equal(2)
    assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] })
    assert.deepStrictEqual(treatedData[1], { _id: '3', today: now })
    done()
  })
})
it('Compact database on load', function (done) {

Loading…
Cancel
Save