Skip to content

Commit

Permalink
Fix Issue samcday#30 where ReadableStreamBuffer stalls in certain cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
mvachhar committed Apr 3, 2018
1 parent 78d2e1a commit aabdc3c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
15 changes: 12 additions & 3 deletions lib/readable_streambuffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var ReadableStreamBuffer = module.exports = function(opts) {

var size = 0;
var buffer = new Buffer(initialSize);
var allowPush = false;

var sendData = function() {
var amount = Math.min(chunkSize, size);
Expand All @@ -30,6 +31,7 @@ var ReadableStreamBuffer = module.exports = function(opts) {
buffer.copy(chunk, 0, 0, amount);

sendMore = that.push(chunk) !== false;
allowPush = sendMore;

buffer.copy(buffer, 0, amount, size);
size -= amount;
Expand Down Expand Up @@ -76,6 +78,12 @@ var ReadableStreamBuffer = module.exports = function(opts) {
}
};

var kickSendDataTask = function () {
if (!sendData.timeout && allowPush) {
sendData.timeout = setTimeout(sendData, frequency);
}
}

this.put = function(data, encoding) {
if (that.stopped) {
throw new Error('Tried to write data to a stopped ReadableStreamBuffer');
Expand All @@ -93,12 +101,13 @@ var ReadableStreamBuffer = module.exports = function(opts) {
buffer.write(data, size, encoding || 'utf8');
size += dataSizeInBytes;
}

kickSendDataTask();
};

this._read = function() {
if (!sendData.timeout) {
sendData.timeout = setTimeout(sendData, frequency);
}
allowPush = true;
kickSendDataTask();
};
};

Expand Down
19 changes: 18 additions & 1 deletion test/readable_streambuffer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ describe('A default ReadableStreamBuffer', function() {
this.buffer.stop();
});

it('pushes new data even if read when empty', function(done) {
var that = this;
var str = '';
this.buffer.on('readable', function() {
str += (that.buffer.read() || new Buffer(0)).toString('utf8');
});
this.buffer.on('end', function() {
expect(str).to.equal(fixtures.unicodeString);
done();
});

setTimeout(function() {
that.buffer.put(fixtures.unicodeString);
that.buffer.stop();
}, streamBuffer.DEFAULT_FREQUENCY + 1);
});

describe('when writing binary data', function() {
beforeEach(function(done) {
var that = this;
Expand Down Expand Up @@ -87,7 +104,7 @@ describe('A default ReadableStreamBuffer', function() {
});
})

describe('when writing binary data larger than initial backing buffer size', function() {
describe('when writing binary data larger than initial backing buffer size', function () {
beforeEach(function() {
this.buffer.pause();
this.buffer.put(fixtures.largeBinaryData);
Expand Down

0 comments on commit aabdc3c

Please sign in to comment.