From a67622dcf78a1130b055998bd40c8bef91740482 Mon Sep 17 00:00:00 2001 From: eliot-akira Date: Wed, 11 Aug 2021 15:10:29 +0200 Subject: [PATCH] Stream files line by line when parsing to avoid memory limits Based on https://github.com/louischatriot/nedb/pull/463 --- lib/persistence.js | 53 ++++++++------ lib/storage.js | 1 + package-lock.json | 14 ++++ package.json | 1 + test/persistence.test.js | 147 ++++++++++++++++++++++++++------------- 5 files changed, 147 insertions(+), 69 deletions(-) diff --git a/lib/persistence.js b/lib/persistence.js index 3bc3458..1788972 100755 --- a/lib/persistence.js +++ b/lib/persistence.js @@ -6,6 +6,7 @@ */ const path = require('path') const async = require('async') +const byline = require('byline') const customUtils = require('./customUtils.js') const Index = require('./indexes.js') const model = require('./model.js') @@ -149,19 +150,23 @@ class Persistence { } /** - * From a database's raw data, return the corresponding + * From a database's raw stream, return the corresponding * machine understandable collection */ - treatRawData (rawData) { - const data = rawData.split('\n') + treatRawStream (rawStream, cb) { const dataById = {} const tdata = [] const indexes = {} - let corruptItems = -1 + let corruptItems = 0 + + const lineStream = byline(rawStream) + const that = this + let length = 0 + + lineStream.on('data', function(line) { - for (const datum of data) { try { - const doc = model.deserialize(this.beforeDeserialization(datum)) + const doc = model.deserialize(that.beforeDeserialization(line)) if (doc._id) { if (doc.$$deleted === true) delete dataById[doc._id] else dataById[doc._id] = doc @@ -170,17 +175,27 @@ class Persistence { } catch (e) { corruptItems += 1 } - } - // A bit lenient on corruption - if ( - data.length > 0 && - corruptItems / data.length > this.corruptAlertThreshold - ) throw new Error(`More than ${Math.floor(100 * this.corruptAlertThreshold)}% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss`) + length++ + }) + + lineStream.on('end', function() { + // A bit lenient on corruption + let err + if (length > 0 && corruptItems / length > that.corruptAlertThreshold) { + err = new Error("More than " + Math.floor(100 * that.corruptAlertThreshold) + "% of the data file is corrupt, the wrong beforeDeserialization hook may be used. Cautiously refusing to start NeDB to prevent dataloss") + } - tdata.push(...Object.values(dataById)) + Object.keys(dataById).forEach(function (k) { + tdata.push(dataById[k]) + }) - return { data: tdata, indexes: indexes } + cb(err, { data: tdata, indexes: indexes }) + }) + + lineStream.on('error', function(err) { + cb(err) + }) } /** @@ -207,14 +222,10 @@ class Persistence { // eslint-disable-next-line node/handle-callback-err storage.ensureDatafileIntegrity(this.filename, err => { // TODO: handle error - storage.readFile(this.filename, 'utf8', (err, rawData) => { + const fileStream = storage.readFileStream(this.filename, { encoding : 'utf8' }) + this.treatRawStream(fileStream, (err, treatedData) => { + if (err) return cb(err) - let treatedData - try { - treatedData = this.treatRawData(rawData) - } catch (e) { - return cb(e) - } // Recreate all indexes in the datafile Object.keys(treatedData.indexes).forEach(key => { diff --git a/lib/storage.js b/lib/storage.js index 7903241..56ad8e4 100755 --- a/lib/storage.js +++ b/lib/storage.js @@ -18,6 +18,7 @@ storage.writeFile = fs.writeFile storage.unlink = fs.unlink storage.appendFile = fs.appendFile storage.readFile = fs.readFile +storage.readFileStream = fs.createReadStream storage.mkdir = fs.mkdir /** diff --git a/package-lock.json b/package-lock.json index b808559..aede386 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "@seald-io/binary-search-tree": "^1.0.2", "async": "0.2.10", + "byline": "^5.0.0", "localforage": "^1.9.0" }, "devDependencies": { @@ -786,6 +787,14 @@ "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==", "dev": true }, + "node_modules/byline": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/byline/-/byline-5.0.0.tgz", + "integrity": "sha1-dBxSFkaOrcRXsDQQEYrXfejB3bE=", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/bytes": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", @@ -6246,6 +6255,11 @@ "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==", "dev": true }, + "byline": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/byline/-/byline-5.0.0.tgz", + "integrity": "sha1-dBxSFkaOrcRXsDQQEYrXfejB3bE=" + }, "bytes": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", diff --git a/package.json b/package.json index 1f34f28..5670303 100755 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "dependencies": { "@seald-io/binary-search-tree": "^1.0.2", "async": "0.2.10", + "byline": "^5.0.0", "localforage": "^1.9.0" }, "devDependencies": { diff --git a/test/persistence.test.js b/test/persistence.test.js index e62b134..5131f14 100755 --- a/test/persistence.test.js +++ b/test/persistence.test.js @@ -9,6 +9,7 @@ const Datastore = require('../lib/datastore') const Persistence = require('../lib/persistence') const storage = require('../lib/storage') const { execFile, fork } = require('child_process') +const Readable = require('stream').Readable const { assert } = chai chai.should() @@ -41,101 +42,151 @@ describe('Persistence', function () { ], done) }) - it('Every line represents a document', function () { + it('Every line represents a document', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + model.serialize({ _id: '2', hello: 'world' }) + '\n' + model.serialize({ _id: '3', nested: { today: now } }) - const treatedData = d.persistence.treatRawData(rawData).data - - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(3) - assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) - assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' }) - assert.deepStrictEqual(treatedData[2], { _id: '3', nested: { today: now } }) + const stream = new Readable() + + stream.push(rawData) + stream.push(null) + + d.persistence.treatRawStream(stream, function (err, result) { + const treatedData = result.data + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(3) + assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) + assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' }) + assert.deepStrictEqual(treatedData[2], { _id: '3', nested: { today: now } }) + done() + }) }) - it('Badly formatted lines have no impact on the treated data', function () { + it('Badly formatted lines have no impact on the treated data', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + 'garbage\n' + model.serialize({ _id: '3', nested: { today: now } }) - const treatedData = d.persistence.treatRawData(rawData).data - - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(2) - assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) - assert.deepStrictEqual(treatedData[1], { _id: '3', nested: { today: now } }) + const stream = new Readable() + + stream.push(rawData) + stream.push(null) + + d.persistence.treatRawStream(stream, function (err, result) { + console.log(err) + var treatedData = result.data + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(2) + assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) + assert.deepStrictEqual(treatedData[1], { _id: '3', nested: { today: now } }) + done() + }) }) - it('Well formatted lines that have no _id are not included in the data', function () { + it('Well formatted lines that have no _id are not included in the data', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + model.serialize({ _id: '2', hello: 'world' }) + '\n' + model.serialize({ nested: { today: now } }) - const treatedData = d.persistence.treatRawData(rawData).data + const stream = new Readable() - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(2) - assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) - assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' }) + stream.push(rawData) + stream.push(null) + + d.persistence.treatRawStream(stream, function (err, result) { + var treatedData = result.data + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(2) + assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) + assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' }) + done() + }) }) - it('If two lines concern the same doc (= same _id), the last one is the good version', function () { + it('If two lines concern the same doc (= same _id), the last one is the good version', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + model.serialize({ _id: '2', hello: 'world' }) + '\n' + model.serialize({ _id: '1', nested: { today: now } }) - const treatedData = d.persistence.treatRawData(rawData).data + const stream = new Readable() - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(2) - assert.deepStrictEqual(treatedData[0], { _id: '1', nested: { today: now } }) - assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' }) + stream.push(rawData) + stream.push(null) + + d.persistence.treatRawStream(stream, function (err, result) { + var treatedData = result.data + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(2) + assert.deepStrictEqual(treatedData[0], { _id: '1', nested: { today: now } }) + assert.deepStrictEqual(treatedData[1], { _id: '2', hello: 'world' }) + done() + }) }) - it('If a doc contains $$deleted: true, that means we need to remove it from the data', function () { + it('If a doc contains $$deleted: true, that means we need to remove it from the data', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + model.serialize({ _id: '2', hello: 'world' }) + '\n' + model.serialize({ _id: '1', $$deleted: true }) + '\n' + model.serialize({ _id: '3', today: now }) - const treatedData = d.persistence.treatRawData(rawData).data + const stream = new Readable() + + stream.push(rawData) + stream.push(null) - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(2) - assert.deepStrictEqual(treatedData[0], { _id: '2', hello: 'world' }) - assert.deepStrictEqual(treatedData[1], { _id: '3', today: now }) + d.persistence.treatRawStream(stream, function (err, result) { + var treatedData = result.data + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(2) + assert.deepStrictEqual(treatedData[0], { _id: '2', hello: 'world' }) + assert.deepStrictEqual(treatedData[1], { _id: '3', today: now }) + done() + }) }) - it('If a doc contains $$deleted: true, no error is thrown if the doc wasnt in the list before', function () { + it('If a doc contains $$deleted: true, no error is thrown if the doc wasnt in the list before', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + model.serialize({ _id: '2', $$deleted: true }) + '\n' + model.serialize({ _id: '3', today: now }) - const treatedData = d.persistence.treatRawData(rawData).data + const stream = new Readable() + + stream.push(rawData) + stream.push(null) - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(2) - assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) - assert.deepStrictEqual(treatedData[1], { _id: '3', today: now }) + d.persistence.treatRawStream(stream, function (err, result) { + var treatedData = result.data + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(2) + assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) + assert.deepStrictEqual(treatedData[1], { _id: '3', today: now }) + done() + }) }) - it('If a doc contains $$indexCreated, no error is thrown during treatRawData and we can get the index options', function () { + it('If a doc contains $$indexCreated, no error is thrown during treatRawData and we can get the index options', function (done) { const now = new Date() const rawData = model.serialize({ _id: '1', a: 2, ages: [1, 5, 12] }) + '\n' + model.serialize({ $$indexCreated: { fieldName: 'test', unique: true } }) + '\n' + model.serialize({ _id: '3', today: now }) - const treatedData = d.persistence.treatRawData(rawData).data - const indexes = d.persistence.treatRawData(rawData).indexes + const stream = new Readable() + + stream.push(rawData) + stream.push(null) - Object.keys(indexes).length.should.equal(1) - assert.deepStrictEqual(indexes.test, { fieldName: 'test', unique: true }) + d.persistence.treatRawStream(stream, function (err, result) { + var treatedData = result.data + var indexes = result.indexes + Object.keys(indexes).length.should.equal(1) + assert.deepEqual(indexes.test, { fieldName: "test", unique: true }) - treatedData.sort(function (a, b) { return a._id - b._id }) - treatedData.length.should.equal(2) - assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) - assert.deepStrictEqual(treatedData[1], { _id: '3', today: now }) + treatedData.sort(function (a, b) { return a._id - b._id }) + treatedData.length.should.equal(2) + assert.deepStrictEqual(treatedData[0], { _id: '1', a: 2, ages: [1, 5, 12] }) + assert.deepStrictEqual(treatedData[1], { _id: '3', today: now }) + done() + }) }) it('Compact database on load', function (done) {