From a2eb31dd947192b18b0a2bdc037ea449e80c33a9 Mon Sep 17 00:00:00 2001 From: Manish Vachharajani Date: Tue, 3 Apr 2018 10:28:11 -0600 Subject: [PATCH] Fix Issue #30 where ReadableStreamBuffer stalls in certain cases. --- lib/readable_streambuffer.js | 15 ++++++++++++--- test/readable_streambuffer.test.js | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/lib/readable_streambuffer.js b/lib/readable_streambuffer.js index 0316c20..79f63a9 100644 --- a/lib/readable_streambuffer.js +++ b/lib/readable_streambuffer.js @@ -19,6 +19,7 @@ var ReadableStreamBuffer = module.exports = function(opts) { var size = 0; var buffer = new Buffer(initialSize); + var allowPush = false; var sendData = function() { var amount = Math.min(chunkSize, size); @@ -30,6 +31,7 @@ var ReadableStreamBuffer = module.exports = function(opts) { buffer.copy(chunk, 0, 0, amount); sendMore = that.push(chunk) !== false; + allowPush = sendMore; buffer.copy(buffer, 0, amount, size); size -= amount; @@ -76,6 +78,12 @@ var ReadableStreamBuffer = module.exports = function(opts) { } }; + var kickSendDataTask = function () { + if (!sendData.timeout && allowPush) { + sendData.timeout = setTimeout(sendData, frequency); + } + } + this.put = function(data, encoding) { if (that.stopped) { throw new Error('Tried to write data to a stopped ReadableStreamBuffer'); @@ -93,12 +101,13 @@ var ReadableStreamBuffer = module.exports = function(opts) { buffer.write(data, size, encoding || 'utf8'); size += dataSizeInBytes; } + + kickSendDataTask(); }; this._read = function() { - if (!sendData.timeout) { - sendData.timeout = setTimeout(sendData, frequency); - } + allowPush = true; + kickSendDataTask(); }; }; diff --git a/test/readable_streambuffer.test.js b/test/readable_streambuffer.test.js index 470b15d..a75efb3 100644 --- a/test/readable_streambuffer.test.js +++ b/test/readable_streambuffer.test.js @@ -55,6 +55,23 @@ describe('A default ReadableStreamBuffer', function() { this.buffer.stop(); }); + it('pushes new data even if read when empty', function(done) { + var that = this; + var str = ''; + this.buffer.on('readable', function() { + str += (that.buffer.read() || new Buffer(0)).toString('utf8'); + }); + this.buffer.on('end', function() { + expect(str).to.equal(fixtures.unicodeString); + done(); + }); + + setTimeout(function() { + that.buffer.put(fixtures.unicodeString); + that.buffer.stop(); + }, streamBuffer.DEFAULT_FREQUENCY + 1); + }); + describe('when writing binary data', function() { beforeEach(function(done) { var that = this;