diff --git a/app/scripts/background.js b/app/scripts/background.js index a4f80d7f6..f3dd8cbb6 100644 --- a/app/scripts/background.js +++ b/app/scripts/background.js @@ -1,5 +1,5 @@ const Dnode = require('dnode') -const Multiplex = require('multiplex') +const ObjectMultiplex = require('./lib/obj-multiplex') const eos = require('end-of-stream') const combineStreams = require('pumpify') const extend = require('xtend') @@ -89,7 +89,7 @@ function onRpcRequest(remoteStream, payload){ function handleInternalCommunication(portStream){ // setup multiplexing - var mx = Multiplex() + var mx = ObjectMultiplex() portStream.pipe(mx).pipe(portStream) mx.on('error', function(err) { console.error(err) @@ -99,15 +99,8 @@ function handleInternalCommunication(portStream){ console.error(err) mx.destroy() }) - var dnodeStream = mx.createSharedStream('dnode') - var providerStream = combineStreams.obj( - jsonStringifyStream(), - mx.createSharedStream('provider'), - jsonParseStream() - ) - - linkDnode(dnodeStream) - handleEthRpcRequestStream(providerStream) + linkDnode(mx.createStream('dnode')) + handleEthRpcRequestStream(mx.createStream('provider')) } function linkDnode(stream){ diff --git a/app/scripts/lib/obj-multiplex.js b/app/scripts/lib/obj-multiplex.js new file mode 100644 index 000000000..333b6061f --- /dev/null +++ b/app/scripts/lib/obj-multiplex.js @@ -0,0 +1,41 @@ +const through = require('through2') + +module.exports = ObjectMultiplex + + +function ObjectMultiplex(opts){ + opts = opts || {} + // create multiplexer + var mx = through.obj(function(chunk, enc, cb) { + var name = chunk.name + var data = chunk.data + var substream = mx.streams[name] + if (!substream) { + console.warn("orphaned data for stream " + name) + } else { + substream.push(data) + } + return cb() + }) + mx.streams = {} + // create substreams + mx.createStream = function(name) { + var substream = mx.streams[name] = through.obj(function(chunk, enc, cb) { + mx.push({ + name: name, + data: chunk, + }) + return cb() + }) + mx.on('end', function() { + return substream.emit('end') + }) + if (opts.error) { + mx.on('error', function() { + return substream.emit('error') + }) + } + return substream + } + return mx +} diff --git a/app/scripts/popup.js b/app/scripts/popup.js index 523ecbd8f..3049ff2c3 100644 --- a/app/scripts/popup.js +++ b/app/scripts/popup.js @@ -1,7 +1,7 @@ const url = require('url') const EventEmitter = require('events').EventEmitter const async = require('async') -const Multiplex = require('multiplex') +const ObjectMultiplex = require('./lib/obj-multiplex') const Dnode = require('dnode') const Web3 = require('web3') const MetaMaskUi = require('../../ui') @@ -9,8 +9,6 @@ const MetaMaskUiCss = require('../../ui/css') const injectCss = require('inject-css') const PortStream = require('./lib/port-stream.js') const StreamProvider = require('./lib/stream-provider.js') -const jsonParseStream = require('./lib/stream-utils.js').jsonParseStream -const jsonStringifyStream = require('./lib/stream-utils.js').jsonStringifyStream // setup app var css = MetaMaskUiCss() @@ -26,7 +24,7 @@ function connectToAccountManager(cb){ var pluginPort = chrome.runtime.connect({name: 'popup'}) var portStream = new PortStream(pluginPort) // setup multiplexing - var mx = Multiplex() + var mx = ObjectMultiplex() portStream.pipe(mx).pipe(portStream) mx.on('error', function(err) { console.error(err) @@ -36,19 +34,13 @@ function connectToAccountManager(cb){ console.error(err) mx.destroy() }) - var dnodeStream = mx.createSharedStream('dnode') - var providerStream = mx.createSharedStream('provider') - linkDnode(dnodeStream, cb) - linkWeb3(providerStream) + linkDnode(mx.createStream('dnode'), cb) + linkWeb3(mx.createStream('provider')) } function linkWeb3(stream){ var remoteProvider = new StreamProvider() - remoteProvider - .pipe(jsonStringifyStream()) - .pipe(stream) - .pipe(jsonParseStream()) - .pipe(remoteProvider) + remoteProvider.pipe(stream).pipe(remoteProvider) stream.on('error', console.error.bind(console)) remoteProvider.on('error', console.error.bind(console)) global.web3 = new Web3(remoteProvider)