From 90da65eeaff58798201fe15c10859faddfb32adf Mon Sep 17 00:00:00 2001 From: Louis Chatriot Date: Mon, 8 Jul 2013 13:53:19 +0200 Subject: [PATCH] Buffer all operations before load and no more pipelining --- lib/datastore.js | 78 ++++++++++++++++----------------- lib/executor.js | 36 +++++++++++++--- test/db.test.js | 110 +---------------------------------------------- 3 files changed, 69 insertions(+), 155 deletions(-) diff --git a/lib/datastore.js b/lib/datastore.js index dee8ec0..9f647d1 100644 --- a/lib/datastore.js +++ b/lib/datastore.js @@ -14,7 +14,7 @@ var fs = require('fs') * Create a new collection * @param {String} options.filename Optional, datastore will be in-memory only if not provided * @param {Boolean} options.inMemoryOnly Optional, default to false - * @param {Boolean} options.pipeline Optional, defaults to false, pipeline appends to the data file (enable to return before writes are persisted to disk for greater speed) + * @param {Boolean} options.pipeline DEPRECATED, doesn't have any effect anymore * @param {Boolean} options.nodeWebkitAppName Optional, specify the name of your NW app if you want options.filename to be relative to the directory where * Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion) */ @@ -25,12 +25,10 @@ function Datastore (options) { if (typeof options === 'string') { filename = options; this.inMemoryOnly = false; // Default - this.pipeline = false; // Default } else { options = options || {}; filename = options.filename; this.inMemoryOnly = options.inMemoryOnly || false; - this.pipeline = options.pipeline || false; } // Determine whether in memory or persistent @@ -47,7 +45,6 @@ function Datastore (options) { } this.executor = new Executor(); - if (this.pipeline) { this.persistenceExecutor = new Executor(); } // We keep internally the number of lines in the datafile // This will be used when/if I implement autocompacting when the datafile grows too big @@ -255,35 +252,40 @@ Datastore.prototype._loadDatabase = function (cb) { // In-memory only datastore if (self.inMemoryOnly) { return callback(null); } - customUtils.ensureDirectoryExists(path.dirname(self.filename), function (err) { - fs.exists(self.filename, function (exists) { - if (!exists) { return fs.writeFile(self.filename, '', 'utf8', function (err) { callback(err); }); } - - fs.readFile(self.filename, 'utf8', function (err, rawData) { - if (err) { return callback(err); } - var treatedData = Datastore.treatRawData(rawData); - - try { - self.resetIndexes(treatedData); - } catch (e) { - self.resetIndexes(); // Rollback any index which didn't fail - self.datafileSize = 0; - return callback(e); - } - - self.datafileSize = treatedData.length; - self.persistCachedDatabase(callback); + async.waterfall([ + function (cb) { + customUtils.ensureDirectoryExists(path.dirname(self.filename), function (err) { + fs.exists(self.filename, function (exists) { + if (!exists) { return fs.writeFile(self.filename, '', 'utf8', function (err) { cb(err); }); } + + fs.readFile(self.filename, 'utf8', function (err, rawData) { + if (err) { return cb(err); } + var treatedData = Datastore.treatRawData(rawData); + + try { + self.resetIndexes(treatedData); + } catch (e) { + self.resetIndexes(); // Rollback any index which didn't fail + self.datafileSize = 0; + return cb(e); + } + + self.datafileSize = treatedData.length; + self.persistCachedDatabase(cb); + }); + }); }); - }); - }); + } + ], function (err) { + if (err) { return callback(err); } + + self.executor.processBuffer(); + return callback(null); + }); }; Datastore.prototype.loadDatabase = function () { - if (this.pipeline) { - this.persistenceExecutor.push({ this: this, fn: this._loadDatabase, arguments: arguments }); - } else { - this.executor.push({ this: this, fn: this._loadDatabase, arguments: arguments }); - } + this.executor.push({ this: this, fn: this._loadDatabase, arguments: arguments }, true); }; @@ -371,18 +373,10 @@ Datastore.prototype._persistNewState = function (newDocs, cb) { }; Datastore.prototype.persistNewState = function (newDocs, cb) { if (this.inMemoryOnly) { - cb(); // No persistence - return; - } - - if (this.pipeline) { - this.persistenceExecutor.push({ this: this, fn: this._persistNewState, arguments: [newDocs] }); - cb (); // Return right away with no error - return; + cb(); + } else { + this._persistNewState(newDocs, cb); } - - // Default behaviour - this._persistNewState(newDocs, cb); }; @@ -424,6 +418,8 @@ Datastore.prototype.insert = function () { /** * Find all documents matching the query * @param {Object} query MongoDB-style query + * + * @api private Use find */ Datastore.prototype._find = function (query, callback) { var res = [] @@ -453,6 +449,8 @@ Datastore.prototype.find = function () { /** * Find one document matching the query * @param {Object} query MongoDB-style query + * + * @api private Use findOne */ Datastore.prototype._findOne = function (query, callback) { var self = this diff --git a/lib/executor.js b/lib/executor.js index abb25c8..8be5a69 100644 --- a/lib/executor.js +++ b/lib/executor.js @@ -6,12 +6,17 @@ var async = require('async') ; function Executor () { + this.buffer = []; + this.ready = false; + + // This queue will execute all commands, one-by-one in order this.queue = async.queue(function (task, cb) { var callback , lastArg = task.arguments[task.arguments.length - 1] , i, newArguments = [] ; + // task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array for (i = 0; i < task.arguments.length; i += 1) { newArguments.push(task.arguments[i]); } // Always tell the queue task is complete. Execute callback if any was given. @@ -34,13 +39,32 @@ function Executor () { /** - * @param {Object} options - * options.this - Object to use as this - * options.fn - Function to execute - * options.arguments - Array of arguments + * If executor is ready, queue task (and process it immediately if executor was idle) + * If not, buffer task for later processing + * @param {Object} task + * task.this - Object to use as this + * task.fn - Function to execute + * task.arguments - Array of arguments + * @param {Boolean} forceQueuing Optional (defaults to false) force executor to queue task even if it is not ready + */ +Executor.prototype.push = function (task, forceQueuing) { + if (this.ready || forceQueuing) { + this.queue.push(task); + } else { + this.buffer.push(task); + } +}; + + +/** + * Queue all tasks in buffer (in the same order they came in) + * Automatically sets executor as ready */ -Executor.prototype.push = function () { - this.queue.push.apply(this, arguments); +Executor.prototype.processBuffer = function () { + var i; + this.ready = true; + for (i = 0; i < this.buffer.length; i += 1) { this.queue.push(this.buffer[i]); } + this.buffer = []; }; diff --git a/test/db.test.js b/test/db.test.js index 744af3b..6027ef6 100644 --- a/test/db.test.js +++ b/test/db.test.js @@ -17,7 +17,6 @@ describe('Database', function () { beforeEach(function (done) { d = new Datastore({ filename: testDb }); d.filename.should.equal(testDb); - d.pipeline.should.equal(false); d.inMemoryOnly.should.equal(false); async.waterfall([ @@ -45,17 +44,14 @@ describe('Database', function () { it('Constructor compatibility with v0.6-', function () { var dbef = new Datastore('somefile'); dbef.filename.should.equal('somefile'); - dbef.pipeline.should.equal(false); dbef.inMemoryOnly.should.equal(false); var dbef = new Datastore(''); assert.isNull(dbef.filename); - dbef.pipeline.should.equal(false); dbef.inMemoryOnly.should.equal(true); var dbef = new Datastore(); assert.isNull(dbef.filename); - dbef.pipeline.should.equal(false); dbef.inMemoryOnly.should.equal(true); }); @@ -304,7 +300,7 @@ describe('Database', function () { describe('Insert', function () { - it('Able to insert a document in the database, setting an _id if none provided, and retrieve it even after a reload', function (done) { + it.only('Able to insert a document in the database, setting an _id if none provided, and retrieve it even after a reload', function (done) { d.find({}, function (err, docs) { docs.length.should.equal(0); @@ -2029,108 +2025,4 @@ describe('Database', function () { }); // ==== End of 'Using indexes' ==== // - describe('Pipelining', function () { - - it('Can insert documents and persist them', function (done) { - d = new Datastore({ filename: testDb, pipeline: true }); - d.filename.should.equal(testDb); - d.pipeline.should.equal(true); - d.inMemoryOnly.should.equal(false); - assert.isDefined(d.persistenceExecutor); - - d.insert({ f: 12 }, function (err, doc12) { - assert.isNull(err); - d.insert({ f: 5 }, function (err, doc5) { - assert.isNull(err); - d.insert({ f: 31 }, function (err, doc31) { - assert.isNull(err); - - // Need to wait a bit for persistence pipeline to be taken care of - // 2ms is enough but let's use 50 to be really sure tests don't fail for a bad reason - setTimeout(function () { - var rawData = fs.readFileSync(testDb, 'utf8') - , treatedData = Datastore.treatRawData(rawData) - ; - - treatedData.sort(function (a, b) { return a.f - b.f; }); - treatedData.length.should.equal(3); - assert.deepEqual(treatedData[0], doc5); - assert.deepEqual(treatedData[1], doc12); - assert.deepEqual(treatedData[2], doc31); - - done(); - }, 50); - }); - }); - }); - }); - - it('Can remove documents in persistence file too', function (done) { - d = new Datastore({ filename: testDb, pipeline: true }); - d.filename.should.equal(testDb); - d.pipeline.should.equal(true); - d.inMemoryOnly.should.equal(false); - assert.isDefined(d.persistenceExecutor); - - d.insert({ f: 12 }, function (err, doc12) { - assert.isNull(err); - d.insert({ f: 5 }, function (err, doc5) { - assert.isNull(err); - d.remove({ f: 12 }, {}, function (err, doc31) { - assert.isNull(err); - - // Need to wait a bit for persistence pipeline to be taken care of - // 2ms is enough but let's use 50 to be really sure tests don't fail for a bad reason - setTimeout(function () { - var rawData = fs.readFileSync(testDb, 'utf8') - , treatedData = Datastore.treatRawData(rawData) - ; - - treatedData.sort(function (a, b) { return a.f - b.f; }); - treatedData.length.should.equal(1); - assert.deepEqual(treatedData[0], doc5); - - done(); - }, 50); - }); - }); - }); - }); - - it('Can update documents in persistence file too', function (done) { - d = new Datastore({ filename: testDb, pipeline: true }); - d.filename.should.equal(testDb); - d.pipeline.should.equal(true); - d.inMemoryOnly.should.equal(false); - assert.isDefined(d.persistenceExecutor); - - d.insert({ f: 12 }, function (err, doc12) { - assert.isNull(err); - d.insert({ f: 5 }, function (err, doc5) { - assert.isNull(err); - d.update({ f: 12 }, { $set: { f: 555 } }, {}, function (err) { - assert.isNull(err); - - // Need to wait a bit for persistence pipeline to be taken care of - // 2ms is enough but let's use 50 to be really sure tests don't fail for a bad reason - setTimeout(function () { - var rawData = fs.readFileSync(testDb, 'utf8') - , treatedData = Datastore.treatRawData(rawData) - ; - - treatedData.sort(function (a, b) { return a.f - b.f; }); - treatedData.length.should.equal(2); - assert.deepEqual(treatedData[0], doc5); - assert.deepEqual(treatedData[1], { f: 555, _id: doc12._id }); - - done(); - }, 50); - }); - }); - }); - }); - - }); // ==== End of 'Pipelining' ==== // - - });