mirror of https://github.com/seald/nedb
Merge pull request #5 from eliot-akira/stream-files-by-line
Stream database file line by line to avoid string length limit in Node.jspull/10/head
commit
1d8c88842b
@ -0,0 +1 @@ |
||||
module.exports = {} |
@ -0,0 +1,153 @@ |
||||
// Forked from https://github.com/jahewson/node-byline
|
||||
|
||||
// Copyright (C) 2011-2015 John Hewson
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to
|
||||
// deal in the Software without restriction, including without limitation the
|
||||
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
// sell copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
// IN THE SOFTWARE.
|
||||
|
||||
const stream = require('stream') |
||||
const util = require('util') |
||||
const timers = require('timers') |
||||
|
||||
// convinience API
|
||||
module.exports = function (readStream, options) { |
||||
return module.exports.createStream(readStream, options) |
||||
} |
||||
|
||||
// basic API
|
||||
module.exports.createStream = function (readStream, options) { |
||||
if (readStream) { |
||||
return createLineStream(readStream, options) |
||||
} else { |
||||
return new LineStream(options) |
||||
} |
||||
} |
||||
|
||||
// deprecated API
|
||||
module.exports.createLineStream = function (readStream) { |
||||
console.log('WARNING: byline#createLineStream is deprecated and will be removed soon') |
||||
return createLineStream(readStream) |
||||
} |
||||
|
||||
function createLineStream (readStream, options) { |
||||
if (!readStream) { |
||||
throw new Error('expected readStream') |
||||
} |
||||
if (!readStream.readable) { |
||||
throw new Error('readStream must be readable') |
||||
} |
||||
const ls = new LineStream(options) |
||||
readStream.pipe(ls) |
||||
return ls |
||||
} |
||||
|
||||
//
|
||||
// using the new node v0.10 "streams2" API
|
||||
//
|
||||
|
||||
module.exports.LineStream = LineStream |
||||
|
||||
function LineStream (options) { |
||||
stream.Transform.call(this, options) |
||||
options = options || {} |
||||
|
||||
// use objectMode to stop the output from being buffered
|
||||
// which re-concatanates the lines, just without newlines.
|
||||
this._readableState.objectMode = true |
||||
this._lineBuffer = [] |
||||
this._keepEmptyLines = options.keepEmptyLines || false |
||||
this._lastChunkEndedWithCR = false |
||||
|
||||
// take the source's encoding if we don't have one
|
||||
const self = this |
||||
this.on('pipe', function (src) { |
||||
if (!self.encoding) { |
||||
// but we can't do this for old-style streams
|
||||
if (src instanceof stream.Readable) { |
||||
self.encoding = src._readableState.encoding |
||||
} |
||||
} |
||||
}) |
||||
} |
||||
util.inherits(LineStream, stream.Transform) |
||||
|
||||
LineStream.prototype._transform = function (chunk, encoding, done) { |
||||
// decode binary chunks as UTF-8
|
||||
encoding = encoding || 'utf8' |
||||
|
||||
if (Buffer.isBuffer(chunk)) { |
||||
if (encoding === 'buffer') { |
||||
chunk = chunk.toString() // utf8
|
||||
encoding = 'utf8' |
||||
} else { |
||||
chunk = chunk.toString(encoding) |
||||
} |
||||
} |
||||
this._chunkEncoding = encoding |
||||
|
||||
// see: http://www.unicode.org/reports/tr18/#Line_Boundaries
|
||||
const lines = chunk.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g) |
||||
|
||||
// don't split CRLF which spans chunks
|
||||
if (this._lastChunkEndedWithCR && chunk[0] === '\n') { |
||||
lines.shift() |
||||
} |
||||
|
||||
if (this._lineBuffer.length > 0) { |
||||
this._lineBuffer[this._lineBuffer.length - 1] += lines[0] |
||||
lines.shift() |
||||
} |
||||
|
||||
this._lastChunkEndedWithCR = chunk[chunk.length - 1] === '\r' |
||||
this._lineBuffer = this._lineBuffer.concat(lines) |
||||
this._pushBuffer(encoding, 1, done) |
||||
} |
||||
|
||||
LineStream.prototype._pushBuffer = function (encoding, keep, done) { |
||||
// always buffer the last (possibly partial) line
|
||||
while (this._lineBuffer.length > keep) { |
||||
const line = this._lineBuffer.shift() |
||||
// skip empty lines
|
||||
if (this._keepEmptyLines || line.length > 0) { |
||||
if (!this.push(this._reencode(line, encoding))) { |
||||
// when the high-water mark is reached, defer pushes until the next tick
|
||||
timers.setImmediate(() => { |
||||
this._pushBuffer(encoding, keep, done) |
||||
}) |
||||
return |
||||
} |
||||
} |
||||
} |
||||
done() |
||||
} |
||||
|
||||
LineStream.prototype._flush = function (done) { |
||||
this._pushBuffer(this._chunkEncoding, 0, done) |
||||
} |
||||
|
||||
// see Readable::push
|
||||
LineStream.prototype._reencode = function (line, chunkEncoding) { |
||||
if (this.encoding && this.encoding !== chunkEncoding) { |
||||
return Buffer.from(line, chunkEncoding).toString(this.encoding) |
||||
} else if (this.encoding) { |
||||
// this should be the most common case, i.e. we're using an encoded source stream
|
||||
return line |
||||
} else { |
||||
return Buffer.from(line, chunkEncoding) |
||||
} |
||||
} |
@ -0,0 +1,207 @@ |
||||
/* eslint-env mocha */ |
||||
// Copyright (C) 2013-2015 John Hewson
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to
|
||||
// deal in the Software without restriction, including without limitation the
|
||||
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
// sell copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
// IN THE SOFTWARE.
|
||||
|
||||
const chai = require('chai') |
||||
const fs = require('fs') |
||||
const path = require('path') |
||||
const byline = require('../lib/byline') |
||||
|
||||
const { assert } = chai |
||||
|
||||
const regEx = /\r\n|[\n\v\f\r\x85\u2028\u2029]/g |
||||
const localPath = file => path.join(__dirname, 'byline', file) |
||||
|
||||
describe('byline', function () { |
||||
it('should pipe a small file', function (done) { |
||||
const input = fs.createReadStream(localPath('empty.txt')) |
||||
const lineStream = byline(input) // convinience API
|
||||
const output = fs.createWriteStream(localPath('test.txt')) |
||||
lineStream.pipe(output) |
||||
output.on('close', function () { |
||||
const out = fs.readFileSync(localPath('test.txt'), 'utf8') |
||||
const in_ = fs.readFileSync(localPath('empty.txt'), 'utf8').replace(/\n/g, '') |
||||
assert.equal(in_, out) |
||||
fs.unlinkSync(localPath('test.txt')) |
||||
done() |
||||
}) |
||||
}) |
||||
|
||||
it('should work with streams2 API', function (done) { |
||||
let stream = fs.createReadStream(localPath('empty.txt')) |
||||
stream = byline.createStream(stream) |
||||
|
||||
stream.on('readable', function () { |
||||
while (stream.read() !== null) { |
||||
// eslint-ignore-line no-empty
|
||||
} |
||||
}) |
||||
|
||||
stream.on('end', function () { |
||||
done() |
||||
}) |
||||
}) |
||||
|
||||
it('should ignore empty lines by default', function (done) { |
||||
const input = fs.createReadStream(localPath('empty.txt')) |
||||
const lineStream = byline(input) |
||||
lineStream.setEncoding('utf8') |
||||
|
||||
const lines1 = [] |
||||
lineStream.on('data', function (line) { |
||||
lines1.push(line) |
||||
}) |
||||
|
||||
lineStream.on('end', function () { |
||||
let lines2 = fs.readFileSync(localPath('empty.txt'), 'utf8').split(regEx) |
||||
lines2 = lines2.filter(function (line) { |
||||
return line.length > 0 |
||||
}) |
||||
assert.deepEqual(lines2, lines1) |
||||
done() |
||||
}) |
||||
}) |
||||
|
||||
it('should keep empty lines when keepEmptyLines is true', function (done) { |
||||
const input = fs.createReadStream(localPath('empty.txt')) |
||||
const lineStream = byline(input, { keepEmptyLines: true }) |
||||
lineStream.setEncoding('utf8') |
||||
|
||||
const lines = [] |
||||
lineStream.on('data', function (line) { |
||||
lines.push(line) |
||||
}) |
||||
|
||||
lineStream.on('end', function () { |
||||
assert.deepEqual(['', '', '', '', '', 'Line 6'], lines) |
||||
done() |
||||
}) |
||||
}) |
||||
|
||||
it('should not split a CRLF which spans two chunks', function (done) { |
||||
const input = fs.createReadStream(localPath('CRLF.txt')) |
||||
const lineStream = byline(input, { keepEmptyLines: true }) |
||||
lineStream.setEncoding('utf8') |
||||
|
||||
const lines = [] |
||||
lineStream.on('data', function (line) { |
||||
lines.push(line) |
||||
}) |
||||
|
||||
lineStream.on('end', function () { |
||||
assert.equal(2, lines.length) |
||||
done() |
||||
}) |
||||
}) |
||||
|
||||
it('should read a large file', function (done) { |
||||
readFile(localPath('rfc.txt'), done) |
||||
}) |
||||
|
||||
it('should read a huge file', function (done) { |
||||
// Readable highWaterMark is 16384, so we test a file with more lines than this
|
||||
readFile(localPath('rfc_huge.txt'), done) |
||||
}) |
||||
|
||||
function readFile (filename, done) { |
||||
const input = fs.createReadStream(filename) |
||||
const lineStream = byline(input) |
||||
lineStream.setEncoding('utf8') |
||||
|
||||
let lines2 = fs.readFileSync(filename, 'utf8').split(regEx) |
||||
lines2 = lines2.filter(function (line) { |
||||
return line.length > 0 |
||||
}) |
||||
|
||||
const lines1 = [] |
||||
let i = 0 |
||||
lineStream.on('data', function (line) { |
||||
lines1.push(line) |
||||
if (line !== lines2[i]) { |
||||
console.log('EXPECTED:', lines2[i]) |
||||
console.log(' GOT:', line) |
||||
assert.fail(null, null, 'difference at line ' + (i + 1)) |
||||
} |
||||
i++ |
||||
}) |
||||
|
||||
lineStream.on('end', function () { |
||||
assert.equal(lines2.length, lines1.length) |
||||
assert.deepEqual(lines2, lines1) |
||||
done() |
||||
}) |
||||
} |
||||
|
||||
it('should handle encodings like fs', function (done) { |
||||
areStreamsEqualTypes(undefined, function () { |
||||
areStreamsEqualTypes({ encoding: 'utf8' }, function () { |
||||
done() |
||||
}) |
||||
}) |
||||
}) |
||||
|
||||
it('should pause() and resume() with a huge file', function (done) { |
||||
const input = fs.createReadStream(localPath('rfc_huge.txt')) |
||||
const lineStream = byline(input) |
||||
lineStream.setEncoding('utf8') |
||||
|
||||
let lines2 = fs.readFileSync(localPath('rfc_huge.txt'), 'utf8').split(regEx) |
||||
lines2 = lines2.filter(function (line) { |
||||
return line.length > 0 |
||||
}) |
||||
|
||||
const lines1 = [] |
||||
let i = 0 |
||||
lineStream.on('data', function (line) { |
||||
lines1.push(line) |
||||
if (line !== lines2[i]) { |
||||
console.log('EXPECTED:', lines2[i]) |
||||
console.log(' GOT:', line) |
||||
assert.fail(null, null, 'difference at line ' + (i + 1)) |
||||
} |
||||
i++ |
||||
|
||||
// pause/resume
|
||||
lineStream.pause() |
||||
setImmediate(function () { |
||||
lineStream.resume() |
||||
}) |
||||
}) |
||||
|
||||
lineStream.on('end', function () { |
||||
assert.equal(lines2.length, lines1.length) |
||||
assert.deepEqual(lines2, lines1) |
||||
done() |
||||
}) |
||||
}) |
||||
|
||||
function areStreamsEqualTypes (options, callback) { |
||||
const fsStream = fs.createReadStream(localPath('empty.txt'), options) |
||||
const lineStream = byline(fs.createReadStream(localPath('empty.txt'), options)) |
||||
fsStream.on('data', function (data1) { |
||||
lineStream.on('data', function (data2) { |
||||
assert.equal(Buffer.isBuffer(data1), Buffer.isBuffer(data2)) |
||||
}) |
||||
lineStream.on('end', function () { |
||||
callback() |
||||
}) |
||||
}) |
||||
} |
||||
}) |
File diff suppressed because one or more lines are too long
@ -0,0 +1,6 @@ |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Line 6 |
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue