Use executor for db operations

pull/2/head
Louis Chatriot 12 years ago
parent f28313f1a3
commit c57af28464
  1. 22
      lib/datastore.js
  2. 40
      lib/executor.js
  3. 47
      test/db.test.js

@ -1,8 +1,7 @@
/**
* The datastore itself
* TODO
* Queue operations
* Update and removes should only modify the corresponding part of the database
* Update and removes should only modify the corresponding part of the database (or use much faster append-only format)
*/
var fs = require('fs')
@ -10,6 +9,7 @@ var fs = require('fs')
, customUtils = require('./customUtils')
, model = require('./model')
, async = require('async')
, executor = require('./executor')
;
@ -76,7 +76,7 @@ Datastore.treatRawData = function (rawData) {
* Insert a new document
* @param {Function} cb Optional callback, signature: err, insertedDoc
*/
Datastore.prototype.insert = function (newDoc, cb) {
Datastore.prototype._insert = function (newDoc, cb) {
var callback = cb || function () {}
, self = this
, persistableNewDoc
@ -99,6 +99,10 @@ Datastore.prototype.insert = function (newDoc, cb) {
});
};
Datastore.prototype.insert = function () {
executor.push({ this: this, fn: this._insert, arguments: arguments });
};
/**
* Find all documents matching the query
@ -178,7 +182,7 @@ Datastore.prototype.persistWholeDatabase = function (data, cb) {
* options.upsert If true, document is inserted if the query doesn't match anything
* @param {Function} cb Optional callback, signature: err, numReplaced, upsert (set to true if the update was in fact an upsert)
*/
Datastore.prototype.update = function (query, updateQuery, options, cb) {
Datastore.prototype._update = function (query, updateQuery, options, cb) {
var callback
, self = this
, numReplaced = 0
@ -201,7 +205,7 @@ Datastore.prototype.update = function (query, updateQuery, options, cb) {
} else {
// The upserted document is the query (since for now queries have the same structure as
// documents), modified by the updateQuery
return self.insert(model.modify(query, updateQuery), function (err) {
return self._insert(model.modify(query, updateQuery), function (err) {
if (err) { return callback(err); }
return callback(null, 1, true);
});
@ -230,6 +234,9 @@ Datastore.prototype.update = function (query, updateQuery, options, cb) {
}
]);
};
Datastore.prototype.update = function () {
executor.push({ this: this, fn: this._update, arguments: arguments });
};
/**
@ -240,7 +247,7 @@ Datastore.prototype.update = function (query, updateQuery, options, cb) {
* options.multi If true, can update multiple documents (defaults to false)
* @param {Function} cb Optional callback, signature: err, numRemoved
*/
Datastore.prototype.remove = function (query, options, cb) {
Datastore.prototype._remove = function (query, options, cb) {
var callback
, self = this
, numRemoved = 0
@ -265,6 +272,9 @@ Datastore.prototype.remove = function (query, options, cb) {
return callback(null, numRemoved);
});
};
Datastore.prototype.remove = function () {
executor.push({ this: this, fn: this._remove, arguments: arguments });
};
module.exports = Datastore;

@ -1,5 +1,5 @@
/**
* Responsible for sequentially executing requests
* Responsible for sequentially executing actions on the database
*/
var async = require('async')
@ -31,33 +31,33 @@ executor = async.queue(function (task, cb) {
}, 1);
function test1 (msg, cb) {
var callback = cb || function () {};
//function test1 (msg, cb) {
//var callback = cb || function () {};
console.log("ooooo TEST1");
//console.log("ooooo TEST1");
setTimeout(function () {
console.log("Hello " + msg);
callback();
}, 1500);
}
//setTimeout(function () {
//console.log("Hello " + msg);
//callback();
//}, 1500);
//}
function test2 (msg, cb) {
var callback = cb || function () {};
//function test2 (msg, cb) {
//var callback = cb || function () {};
console.log("ooooo TEST2");
//console.log("ooooo TEST2");
setTimeout(function () {
console.log("Ola " + msg);
callback('YEAH');
}, 500);
}
//setTimeout(function () {
//console.log("Ola " + msg);
//callback('YEAH');
//}, 500);
//}
function bloup () { console.log("FINISHED"); console.log(arguments); }
//function bloup () { console.log("FINISHED"); console.log(arguments); }
executor.push({ this: null, fn: test1, arguments: [ 'world' ] });
executor.push({ this: null, fn: test2, arguments: [ 'world', bloup ] });
//executor.push({ this: null, fn: test1, arguments: [ 'world' ] });
//executor.push({ this: null, fn: test2, arguments: [ 'world' ] });
/*

@ -567,30 +567,29 @@ describe('Database', function () {
});
// This tests concurrency issues
// Right now, it doesn't pass, because I need to
//it.only('Remove can be called multiple times in parallel and everything that needs to be removed will be', function (done) {
//d.insert({ planet: 'Earth' }, function () {
//d.insert({ planet: 'Mars' }, function () {
//d.insert({ planet: 'Saturn' }, function () {
//d.find({}, function (err, docs) {
//docs.length.should.equal(3);
//// Remove two docs simultaneously
//var toRemove = ['Mars', 'Saturn'];
//async.each(toRemove, function(planet, cb) {
//d.remove({ planet: planet }, function (err) { return cb(err); });
//}, function (err) {
//d.find({}, function (err, docs) {
//docs.length.should.equal(1);
//done();
//});
//});
//});
//});
//});
//});
//});
it('Remove can be called multiple times in parallel and everything that needs to be removed will be', function (done) {
d.insert({ planet: 'Earth' }, function () {
d.insert({ planet: 'Mars' }, function () {
d.insert({ planet: 'Saturn' }, function () {
d.find({}, function (err, docs) {
docs.length.should.equal(3);
// Remove two docs simultaneously
var toRemove = ['Mars', 'Saturn'];
async.each(toRemove, function(planet, cb) {
d.remove({ planet: planet }, function (err) { return cb(err); });
}, function (err) {
d.find({}, function (err, docs) {
docs.length.should.equal(1);
done();
});
});
});
});
});
});
});
}); // ==== End of 'Remove' ==== //

Loading…
Cancel
Save