Buffer all operations before load and no more pipelining

pull/2/head
Louis Chatriot 12 years ago
parent 7469b0d442
commit 90da65eeaf
  1. 46
      lib/datastore.js
  2. 36
      lib/executor.js
  3. 110
      test/db.test.js

@ -14,7 +14,7 @@ var fs = require('fs')
* Create a new collection
* @param {String} options.filename Optional, datastore will be in-memory only if not provided
* @param {Boolean} options.inMemoryOnly Optional, default to false
* @param {Boolean} options.pipeline Optional, defaults to false, pipeline appends to the data file (enable to return before writes are persisted to disk for greater speed)
* @param {Boolean} options.pipeline DEPRECATED, doesn't have any effect anymore
* @param {Boolean} options.nodeWebkitAppName Optional, specify the name of your NW app if you want options.filename to be relative to the directory where
* Node Webkit stores application data such as cookies and local storage (the best place to store data in my opinion)
*/
@ -25,12 +25,10 @@ function Datastore (options) {
if (typeof options === 'string') {
filename = options;
this.inMemoryOnly = false; // Default
this.pipeline = false; // Default
} else {
options = options || {};
filename = options.filename;
this.inMemoryOnly = options.inMemoryOnly || false;
this.pipeline = options.pipeline || false;
}
// Determine whether in memory or persistent
@ -47,7 +45,6 @@ function Datastore (options) {
}
this.executor = new Executor();
if (this.pipeline) { this.persistenceExecutor = new Executor(); }
// We keep internally the number of lines in the datafile
// This will be used when/if I implement autocompacting when the datafile grows too big
@ -255,12 +252,14 @@ Datastore.prototype._loadDatabase = function (cb) {
// In-memory only datastore
if (self.inMemoryOnly) { return callback(null); }
async.waterfall([
function (cb) {
customUtils.ensureDirectoryExists(path.dirname(self.filename), function (err) {
fs.exists(self.filename, function (exists) {
if (!exists) { return fs.writeFile(self.filename, '', 'utf8', function (err) { callback(err); }); }
if (!exists) { return fs.writeFile(self.filename, '', 'utf8', function (err) { cb(err); }); }
fs.readFile(self.filename, 'utf8', function (err, rawData) {
if (err) { return callback(err); }
if (err) { return cb(err); }
var treatedData = Datastore.treatRawData(rawData);
try {
@ -268,22 +267,25 @@ Datastore.prototype._loadDatabase = function (cb) {
} catch (e) {
self.resetIndexes(); // Rollback any index which didn't fail
self.datafileSize = 0;
return callback(e);
return cb(e);
}
self.datafileSize = treatedData.length;
self.persistCachedDatabase(callback);
self.persistCachedDatabase(cb);
});
});
});
}
], function (err) {
if (err) { return callback(err); }
self.executor.processBuffer();
return callback(null);
});
};
Datastore.prototype.loadDatabase = function () {
if (this.pipeline) {
this.persistenceExecutor.push({ this: this, fn: this._loadDatabase, arguments: arguments });
} else {
this.executor.push({ this: this, fn: this._loadDatabase, arguments: arguments });
}
this.executor.push({ this: this, fn: this._loadDatabase, arguments: arguments }, true);
};
@ -371,18 +373,10 @@ Datastore.prototype._persistNewState = function (newDocs, cb) {
};
Datastore.prototype.persistNewState = function (newDocs, cb) {
if (this.inMemoryOnly) {
cb(); // No persistence
return;
}
if (this.pipeline) {
this.persistenceExecutor.push({ this: this, fn: this._persistNewState, arguments: [newDocs] });
cb (); // Return right away with no error
return;
}
// Default behaviour
cb();
} else {
this._persistNewState(newDocs, cb);
}
};
@ -424,6 +418,8 @@ Datastore.prototype.insert = function () {
/**
* Find all documents matching the query
* @param {Object} query MongoDB-style query
*
* @api private Use find
*/
Datastore.prototype._find = function (query, callback) {
var res = []
@ -453,6 +449,8 @@ Datastore.prototype.find = function () {
/**
* Find one document matching the query
* @param {Object} query MongoDB-style query
*
* @api private Use findOne
*/
Datastore.prototype._findOne = function (query, callback) {
var self = this

@ -6,12 +6,17 @@ var async = require('async')
;
function Executor () {
this.buffer = [];
this.ready = false;
// This queue will execute all commands, one-by-one in order
this.queue = async.queue(function (task, cb) {
var callback
, lastArg = task.arguments[task.arguments.length - 1]
, i, newArguments = []
;
// task.arguments is an array-like object on which adding a new field doesn't work, so we transform it into a real array
for (i = 0; i < task.arguments.length; i += 1) { newArguments.push(task.arguments[i]); }
// Always tell the queue task is complete. Execute callback if any was given.
@ -34,13 +39,32 @@ function Executor () {
/**
* @param {Object} options
* options.this - Object to use as this
* options.fn - Function to execute
* options.arguments - Array of arguments
* If executor is ready, queue task (and process it immediately if executor was idle)
* If not, buffer task for later processing
* @param {Object} task
* task.this - Object to use as this
* task.fn - Function to execute
* task.arguments - Array of arguments
* @param {Boolean} forceQueuing Optional (defaults to false) force executor to queue task even if it is not ready
*/
Executor.prototype.push = function (task, forceQueuing) {
if (this.ready || forceQueuing) {
this.queue.push(task);
} else {
this.buffer.push(task);
}
};
/**
* Queue all tasks in buffer (in the same order they came in)
* Automatically sets executor as ready
*/
Executor.prototype.push = function () {
this.queue.push.apply(this, arguments);
Executor.prototype.processBuffer = function () {
var i;
this.ready = true;
for (i = 0; i < this.buffer.length; i += 1) { this.queue.push(this.buffer[i]); }
this.buffer = [];
};

@ -17,7 +17,6 @@ describe('Database', function () {
beforeEach(function (done) {
d = new Datastore({ filename: testDb });
d.filename.should.equal(testDb);
d.pipeline.should.equal(false);
d.inMemoryOnly.should.equal(false);
async.waterfall([
@ -45,17 +44,14 @@ describe('Database', function () {
it('Constructor compatibility with v0.6-', function () {
var dbef = new Datastore('somefile');
dbef.filename.should.equal('somefile');
dbef.pipeline.should.equal(false);
dbef.inMemoryOnly.should.equal(false);
var dbef = new Datastore('');
assert.isNull(dbef.filename);
dbef.pipeline.should.equal(false);
dbef.inMemoryOnly.should.equal(true);
var dbef = new Datastore();
assert.isNull(dbef.filename);
dbef.pipeline.should.equal(false);
dbef.inMemoryOnly.should.equal(true);
});
@ -304,7 +300,7 @@ describe('Database', function () {
describe('Insert', function () {
it('Able to insert a document in the database, setting an _id if none provided, and retrieve it even after a reload', function (done) {
it.only('Able to insert a document in the database, setting an _id if none provided, and retrieve it even after a reload', function (done) {
d.find({}, function (err, docs) {
docs.length.should.equal(0);
@ -2029,108 +2025,4 @@ describe('Database', function () {
}); // ==== End of 'Using indexes' ==== //
describe('Pipelining', function () {
it('Can insert documents and persist them', function (done) {
d = new Datastore({ filename: testDb, pipeline: true });
d.filename.should.equal(testDb);
d.pipeline.should.equal(true);
d.inMemoryOnly.should.equal(false);
assert.isDefined(d.persistenceExecutor);
d.insert({ f: 12 }, function (err, doc12) {
assert.isNull(err);
d.insert({ f: 5 }, function (err, doc5) {
assert.isNull(err);
d.insert({ f: 31 }, function (err, doc31) {
assert.isNull(err);
// Need to wait a bit for persistence pipeline to be taken care of
// 2ms is enough but let's use 50 to be really sure tests don't fail for a bad reason
setTimeout(function () {
var rawData = fs.readFileSync(testDb, 'utf8')
, treatedData = Datastore.treatRawData(rawData)
;
treatedData.sort(function (a, b) { return a.f - b.f; });
treatedData.length.should.equal(3);
assert.deepEqual(treatedData[0], doc5);
assert.deepEqual(treatedData[1], doc12);
assert.deepEqual(treatedData[2], doc31);
done();
}, 50);
});
});
});
});
it('Can remove documents in persistence file too', function (done) {
d = new Datastore({ filename: testDb, pipeline: true });
d.filename.should.equal(testDb);
d.pipeline.should.equal(true);
d.inMemoryOnly.should.equal(false);
assert.isDefined(d.persistenceExecutor);
d.insert({ f: 12 }, function (err, doc12) {
assert.isNull(err);
d.insert({ f: 5 }, function (err, doc5) {
assert.isNull(err);
d.remove({ f: 12 }, {}, function (err, doc31) {
assert.isNull(err);
// Need to wait a bit for persistence pipeline to be taken care of
// 2ms is enough but let's use 50 to be really sure tests don't fail for a bad reason
setTimeout(function () {
var rawData = fs.readFileSync(testDb, 'utf8')
, treatedData = Datastore.treatRawData(rawData)
;
treatedData.sort(function (a, b) { return a.f - b.f; });
treatedData.length.should.equal(1);
assert.deepEqual(treatedData[0], doc5);
done();
}, 50);
});
});
});
});
it('Can update documents in persistence file too', function (done) {
d = new Datastore({ filename: testDb, pipeline: true });
d.filename.should.equal(testDb);
d.pipeline.should.equal(true);
d.inMemoryOnly.should.equal(false);
assert.isDefined(d.persistenceExecutor);
d.insert({ f: 12 }, function (err, doc12) {
assert.isNull(err);
d.insert({ f: 5 }, function (err, doc5) {
assert.isNull(err);
d.update({ f: 12 }, { $set: { f: 555 } }, {}, function (err) {
assert.isNull(err);
// Need to wait a bit for persistence pipeline to be taken care of
// 2ms is enough but let's use 50 to be really sure tests don't fail for a bad reason
setTimeout(function () {
var rawData = fs.readFileSync(testDb, 'utf8')
, treatedData = Datastore.treatRawData(rawData)
;
treatedData.sort(function (a, b) { return a.f - b.f; });
treatedData.length.should.equal(2);
assert.deepEqual(treatedData[0], doc5);
assert.deepEqual(treatedData[1], { f: 555, _id: doc12._id });
done();
}, 50);
});
});
});
});
}); // ==== End of 'Pipelining' ==== //
});

Loading…
Cancel
Save