diff --git a/extension/utils/map-stream.d.ts b/extension/utils/map-stream.d.ts index fd0d906..5d4555f 100644 --- a/extension/utils/map-stream.d.ts +++ b/extension/utils/map-stream.d.ts @@ -5,138 +5,3 @@ import { Duplex } from 'stream'; * @param mapper */ export declare function mapStream(mapper: any): Duplex; -/** - var Stream = require('stream').Stream - - - //create an event stream and apply function to each .write - //emitting each response as data - //unless it's an empty callback - - module.exports = function (mapper, opts) { - - var stream = new Stream() - , inputs = 0 - , outputs = 0 - , ended = false - , paused = false - , destroyed = false - , lastWritten = 0 - , inNext = false - - opts = opts || {}; - var errorEventName = opts.failures ? 'failure' : 'error'; - - // Items that are not ready to be written yet (because they would come out of - // order) get stuck in a queue for later. - var writeQueue = {} - - stream.writable = true - stream.readable = true - - function queueData (data, number) { - var nextToWrite = lastWritten + 1 - - if (number === nextToWrite) { - // If it's next, and its not undefined write it - if (data !== undefined) { - stream.emit.apply(stream, ['data', data]) - } - lastWritten ++ - nextToWrite ++ - } else { - // Otherwise queue it for later. - writeQueue[number] = data - } - - // If the next value is in the queue, write it - if (writeQueue.hasOwnProperty(nextToWrite)) { - var dataToWrite = writeQueue[nextToWrite] - delete writeQueue[nextToWrite] - return queueData(dataToWrite, nextToWrite) - } - - outputs ++ - if(inputs === outputs) { - if(paused) paused = false, stream.emit('drain') //written all the incoming events - if(ended) end() - } - } - - function next (err, data, number) { - if(destroyed) return - inNext = true - - if (!err || opts.failures) { - queueData(data, number) - } - - if (err) { - stream.emit.apply(stream, [ errorEventName, err ]); - } - - inNext = false; - } - - // Wrap the mapper function by calling its callback with the order number of - // the item in the stream. - function wrappedMapper (input, number, callback) { - return mapper.call(null, input, function(err, data){ - callback(err, data, number) - }) - } - - stream.write = function (data) { - if(ended) throw new Error('map stream is not writable') - inNext = false - inputs ++ - - try { - //catch sync errors and handle them like async errors - var written = wrappedMapper(data, inputs, next) - paused = (written === false) - return !paused - } catch (err) { - //if the callback has been called syncronously, and the error - //has occured in an listener, throw it again. - if(inNext) - throw err - next(err) - return !paused - } - } - - function end (data) { - //if end was called with args, write it, - ended = true //write will emit 'end' if ended is true - stream.writable = false - if(data !== undefined) { - return queueData(data, inputs) - } else if (inputs == outputs) { //wait for processing - stream.readable = false, stream.emit('end'), stream.destroy() - } - } - - stream.end = function (data) { - if(ended) return - end(data) - } - - stream.destroy = function () { - ended = destroyed = true - stream.writable = stream.readable = paused = false - process.nextTick(function () { - stream.emit('close') - }) - } - stream.pause = function () { - paused = true - } - - stream.resume = function () { - paused = false - } - - return stream -} - */ diff --git a/extension/utils/map-stream.js b/extension/utils/map-stream.js index 886985f..2b2040f 100644 --- a/extension/utils/map-stream.js +++ b/extension/utils/map-stream.js @@ -117,138 +117,3 @@ function mapStream(mapper) { return stream; } exports.mapStream = mapStream; -/** - var Stream = require('stream').Stream - - - //create an event stream and apply function to each .write - //emitting each response as data - //unless it's an empty callback - - module.exports = function (mapper, opts) { - - var stream = new Stream() - , inputs = 0 - , outputs = 0 - , ended = false - , paused = false - , destroyed = false - , lastWritten = 0 - , inNext = false - - opts = opts || {}; - var errorEventName = opts.failures ? 'failure' : 'error'; - - // Items that are not ready to be written yet (because they would come out of - // order) get stuck in a queue for later. - var writeQueue = {} - - stream.writable = true - stream.readable = true - - function queueData (data, number) { - var nextToWrite = lastWritten + 1 - - if (number === nextToWrite) { - // If it's next, and its not undefined write it - if (data !== undefined) { - stream.emit.apply(stream, ['data', data]) - } - lastWritten ++ - nextToWrite ++ - } else { - // Otherwise queue it for later. - writeQueue[number] = data - } - - // If the next value is in the queue, write it - if (writeQueue.hasOwnProperty(nextToWrite)) { - var dataToWrite = writeQueue[nextToWrite] - delete writeQueue[nextToWrite] - return queueData(dataToWrite, nextToWrite) - } - - outputs ++ - if(inputs === outputs) { - if(paused) paused = false, stream.emit('drain') //written all the incoming events - if(ended) end() - } - } - - function next (err, data, number) { - if(destroyed) return - inNext = true - - if (!err || opts.failures) { - queueData(data, number) - } - - if (err) { - stream.emit.apply(stream, [ errorEventName, err ]); - } - - inNext = false; - } - - // Wrap the mapper function by calling its callback with the order number of - // the item in the stream. - function wrappedMapper (input, number, callback) { - return mapper.call(null, input, function(err, data){ - callback(err, data, number) - }) - } - - stream.write = function (data) { - if(ended) throw new Error('map stream is not writable') - inNext = false - inputs ++ - - try { - //catch sync errors and handle them like async errors - var written = wrappedMapper(data, inputs, next) - paused = (written === false) - return !paused - } catch (err) { - //if the callback has been called syncronously, and the error - //has occured in an listener, throw it again. - if(inNext) - throw err - next(err) - return !paused - } - } - - function end (data) { - //if end was called with args, write it, - ended = true //write will emit 'end' if ended is true - stream.writable = false - if(data !== undefined) { - return queueData(data, inputs) - } else if (inputs == outputs) { //wait for processing - stream.readable = false, stream.emit('end'), stream.destroy() - } - } - - stream.end = function (data) { - if(ended) return - end(data) - } - - stream.destroy = function () { - ended = destroyed = true - stream.writable = stream.readable = paused = false - process.nextTick(function () { - stream.emit('close') - }) - } - stream.pause = function () { - paused = true - } - - stream.resume = function () { - paused = false - } - - return stream -} - */ diff --git a/extension/utils/map-stream.ts b/extension/utils/map-stream.ts index a2502c4..931e509 100644 --- a/extension/utils/map-stream.ts +++ b/extension/utils/map-stream.ts @@ -130,139 +130,3 @@ export function mapStream(mapper): Duplex{ return stream; } - -/** - var Stream = require('stream').Stream - - - //create an event stream and apply function to each .write - //emitting each response as data - //unless it's an empty callback - - module.exports = function (mapper, opts) { - - var stream = new Stream() - , inputs = 0 - , outputs = 0 - , ended = false - , paused = false - , destroyed = false - , lastWritten = 0 - , inNext = false - - opts = opts || {}; - var errorEventName = opts.failures ? 'failure' : 'error'; - - // Items that are not ready to be written yet (because they would come out of - // order) get stuck in a queue for later. - var writeQueue = {} - - stream.writable = true - stream.readable = true - - function queueData (data, number) { - var nextToWrite = lastWritten + 1 - - if (number === nextToWrite) { - // If it's next, and its not undefined write it - if (data !== undefined) { - stream.emit.apply(stream, ['data', data]) - } - lastWritten ++ - nextToWrite ++ - } else { - // Otherwise queue it for later. - writeQueue[number] = data - } - - // If the next value is in the queue, write it - if (writeQueue.hasOwnProperty(nextToWrite)) { - var dataToWrite = writeQueue[nextToWrite] - delete writeQueue[nextToWrite] - return queueData(dataToWrite, nextToWrite) - } - - outputs ++ - if(inputs === outputs) { - if(paused) paused = false, stream.emit('drain') //written all the incoming events - if(ended) end() - } - } - - function next (err, data, number) { - if(destroyed) return - inNext = true - - if (!err || opts.failures) { - queueData(data, number) - } - - if (err) { - stream.emit.apply(stream, [ errorEventName, err ]); - } - - inNext = false; - } - - // Wrap the mapper function by calling its callback with the order number of - // the item in the stream. - function wrappedMapper (input, number, callback) { - return mapper.call(null, input, function(err, data){ - callback(err, data, number) - }) - } - - stream.write = function (data) { - if(ended) throw new Error('map stream is not writable') - inNext = false - inputs ++ - - try { - //catch sync errors and handle them like async errors - var written = wrappedMapper(data, inputs, next) - paused = (written === false) - return !paused - } catch (err) { - //if the callback has been called syncronously, and the error - //has occured in an listener, throw it again. - if(inNext) - throw err - next(err) - return !paused - } - } - - function end (data) { - //if end was called with args, write it, - ended = true //write will emit 'end' if ended is true - stream.writable = false - if(data !== undefined) { - return queueData(data, inputs) - } else if (inputs == outputs) { //wait for processing - stream.readable = false, stream.emit('end'), stream.destroy() - } - } - - stream.end = function (data) { - if(ended) return - end(data) - } - - stream.destroy = function () { - ended = destroyed = true - stream.writable = stream.readable = paused = false - process.nextTick(function () { - stream.emit('close') - }) - } - stream.pause = function () { - paused = true - } - - stream.resume = function () { - paused = false - } - - return stream -} - */ diff --git a/extension/utils/through.js b/extension/utils/through.js index 372bd58..fb413c7 100644 --- a/extension/utils/through.js +++ b/extension/utils/through.js @@ -33,7 +33,6 @@ function through(write, end) { stream.destroy(); }; stream.write = function (data) { - console.log('data', data); write(_this, data); return !paused; }; @@ -104,104 +103,3 @@ function through(write, end) { return stream; } exports.through = through; -// exports = module.exports = through -// through.through = through -// -// //create a readable writable stream. -// -// function through (write, end, opts) { -// write = write || function (data) { this.queue(data) } -// end = end || function () { this.queue(null) } -// -// var ended = false, destroyed = false, buffer = [], _ended = false -// var stream = new Stream() -// stream.readable = stream.writable = true -// stream.paused = false -// -// // stream.autoPause = !(opts && opts.autoPause === false) -// stream.autoDestroy = !(opts && opts.autoDestroy === false) -// -// stream.write = function (data) { -// write.call(this, data) -// return !stream.paused -// } -// -// function drain() { -// while(buffer.length && !stream.paused) { -// var data = buffer.shift() -// if(null === data) -// return stream.emit('end') -// else -// stream.emit('data', data) -// } -// } -// -// stream.queue = stream.push = function (data) { -// // console.error(ended) -// if(_ended) return stream -// if(data === null) _ended = true -// buffer.push(data) -// drain() -// return stream -// } -// -// //this will be registered as the first 'end' listener -// //must call destroy next tick, to make sure we're after any -// //stream piped from here. -// //this is only a problem if end is not emitted synchronously. -// //a nicer way to do this is to make sure this is the last listener for 'end' -// -// stream.on('end', function () { -// stream.readable = false -// if(!stream.writable && stream.autoDestroy) -// process.nextTick(function () { -// stream.destroy() -// }) -// }) -// -// function _end () { -// stream.writable = false -// end.call(stream) -// if(!stream.readable && stream.autoDestroy) -// stream.destroy() -// } -// -// stream.end = function (data) { -// if(ended) return -// ended = true -// if(arguments.length) stream.write(data) -// _end() // will emit or queue -// return stream -// } -// -// stream.destroy = function () { -// if(destroyed) return -// destroyed = true -// ended = true -// buffer.length = 0 -// stream.writable = stream.readable = false -// stream.emit('close') -// return stream -// } -// -// stream.pause = function () { -// if(stream.paused) return -// stream.paused = true -// return stream -// } -// -// stream.resume = function () { -// if(stream.paused) { -// stream.paused = false -// stream.emit('resume') -// } -// drain() -// //may have become paused again, -// //as drain emits 'data'. -// if(!stream.paused) -// stream.emit('drain') -// return stream -// } -// return stream -// } -// diff --git a/extension/utils/through.ts b/extension/utils/through.ts index 4750531..7959355 100644 --- a/extension/utils/through.ts +++ b/extension/utils/through.ts @@ -37,7 +37,6 @@ export function through(write: (stream:Duplex, data?:any) => void, end: (Duplex) }; stream.write = (data) => { - console.log('data', data) write(this, data); return !paused; }; @@ -114,105 +113,3 @@ export function through(write: (stream:Duplex, data?:any) => void, end: (Duplex) return stream; } - -// exports = module.exports = through -// through.through = through -// -// //create a readable writable stream. -// -// function through (write, end, opts) { -// write = write || function (data) { this.queue(data) } -// end = end || function () { this.queue(null) } -// -// var ended = false, destroyed = false, buffer = [], _ended = false -// var stream = new Stream() -// stream.readable = stream.writable = true -// stream.paused = false -// -// // stream.autoPause = !(opts && opts.autoPause === false) -// stream.autoDestroy = !(opts && opts.autoDestroy === false) -// -// stream.write = function (data) { -// write.call(this, data) -// return !stream.paused -// } -// -// function drain() { -// while(buffer.length && !stream.paused) { -// var data = buffer.shift() -// if(null === data) -// return stream.emit('end') -// else -// stream.emit('data', data) -// } -// } -// -// stream.queue = stream.push = function (data) { -// // console.error(ended) -// if(_ended) return stream -// if(data === null) _ended = true -// buffer.push(data) -// drain() -// return stream -// } -// -// //this will be registered as the first 'end' listener -// //must call destroy next tick, to make sure we're after any -// //stream piped from here. -// //this is only a problem if end is not emitted synchronously. -// //a nicer way to do this is to make sure this is the last listener for 'end' -// -// stream.on('end', function () { -// stream.readable = false -// if(!stream.writable && stream.autoDestroy) -// process.nextTick(function () { -// stream.destroy() -// }) -// }) -// -// function _end () { -// stream.writable = false -// end.call(stream) -// if(!stream.readable && stream.autoDestroy) -// stream.destroy() -// } -// -// stream.end = function (data) { -// if(ended) return -// ended = true -// if(arguments.length) stream.write(data) -// _end() // will emit or queue -// return stream -// } -// -// stream.destroy = function () { -// if(destroyed) return -// destroyed = true -// ended = true -// buffer.length = 0 -// stream.writable = stream.readable = false -// stream.emit('close') -// return stream -// } -// -// stream.pause = function () { -// if(stream.paused) return -// stream.paused = true -// return stream -// } -// -// stream.resume = function () { -// if(stream.paused) { -// stream.paused = false -// stream.emit('resume') -// } -// drain() -// //may have become paused again, -// //as drain emits 'data'. -// if(!stream.paused) -// stream.emit('drain') -// return stream -// } -// return stream -// } -//