Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: convert string to Buffer when calling unshift(<string>) #27194

Merged
merged 1 commit into from
Jun 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2349,7 +2349,7 @@ such as `process.stdout.on('data')`.
[`sign.sign()`]: crypto.html#crypto_sign_sign_privatekey_outputencoding
[`stream.pipe()`]: stream.html#stream_readable_pipe_destination_options
[`stream.push()`]: stream.html#stream_readable_push_chunk_encoding
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk_encoding
[`stream.write()`]: stream.html#stream_writable_write_chunk_encoding_callback
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
[`subprocess.send()`]: child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
Expand Down
4 changes: 3 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ setTimeout(() => {
}, 1000);
```

##### readable.unshift(chunk)
##### readable.unshift(chunk[, encoding])
<!-- YAML
added: v0.9.11
changes:
Expand All @@ -1208,6 +1208,8 @@ changes:
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.
* `encoding` {string} Encoding of string chunks. Must be a valid
`Buffer` encoding, such as `'utf8'` or `'ascii'`.

The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
Expand Down
32 changes: 18 additions & 14 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,28 @@ Readable.prototype._destroy = function(err, cb) {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
const state = this._readableState;
var skipChunkCheck;
return readableAddChunk(this, chunk, encoding, false);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, true);
};

function readableAddChunk(stream, chunk, encoding, addToFront) {
debug('readableAddChunk', chunk);
const state = stream._readableState;

let skipChunkCheck;

if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
if (addToFront && state.encoding && state.encoding !== encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
Expand All @@ -223,17 +238,6 @@ Readable.prototype.push = function(chunk, encoding) {
skipChunkCheck = true;
}

return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
return readableAddChunk(this, chunk, null, true, false);
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
debug('readableAddChunk', chunk);
const state = stream._readableState;
if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
Expand Down
187 changes: 187 additions & 0 deletions test/parallel/test-stream-readable-unshift.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

{
// Check that strings are saved as Buffer
const readable = new Readable({ read() {} });

const string = 'abc';

readable.on('data', common.mustCall((chunk) => {
assert(Buffer.isBuffer(chunk));
assert.strictEqual(chunk.toString('utf8'), string);
}, 1));

readable.unshift(string);

}

{
// Check that data goes at the beginning
const readable = new Readable({ read() {} });
const unshift = 'front';
const push = 'back';

const expected = [unshift, push];
readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk.toString('utf8'), expected.shift());
}, 2));


readable.push(push);
readable.unshift(unshift);
}

{
// Check that buffer is saved with correct encoding
const readable = new Readable({ read() {} });

const encoding = 'base64';
const string = Buffer.from('abc').toString(encoding);

readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk.toString(encoding), string);
}, 1));

readable.unshift(string, encoding);

}

{

const streamEncoding = 'base64';

function checkEncoding(readable) {

// chunk encodings
const encodings = ['utf8', 'binary', 'hex', 'base64'];
const expected = [];

readable.on('data', common.mustCall((chunk) => {
const { encoding, string } = expected.pop();
assert.strictEqual(chunk.toString(encoding), string);
}, encodings.length));

for (const encoding of encodings) {
const string = 'abc';

// If encoding is the same as the state.encoding the string is
// saved as is
const expect = encoding !== streamEncoding ?
Buffer.from(string, encoding).toString(streamEncoding) : string;

expected.push({ encoding, string: expect });

readable.unshift(string, encoding);
}
}

const r1 = new Readable({ read() {} });
r1.setEncoding(streamEncoding);
checkEncoding(r1);

const r2 = new Readable({ read() {}, encoding: streamEncoding });
checkEncoding(r2);

}

{
// Both .push & .unshift should have the same behaviour
// When setting an encoding, each chunk should be emitted with that encoding
const encoding = 'base64';

function checkEncoding(readable) {
const string = 'abc';
readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, Buffer.from(string).toString(encoding));
}, 2));

readable.push(string);
readable.unshift(string);
}

const r1 = new Readable({ read() {} });
r1.setEncoding(encoding);
checkEncoding(r1);

const r2 = new Readable({ read() {}, encoding });
checkEncoding(r2);

}

{
// Check that error is thrown for invalid chunks

const readable = new Readable({ read() {} });
function checkError(fn) {
common.expectsError(fn, {
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}

checkError(() => readable.unshift([]));
checkError(() => readable.unshift({}));
checkError(() => readable.unshift(0));

}

{
// Check that ObjectMode works
const readable = new Readable({ objectMode: true, read() {} });

const chunks = ['a', 1, {}, []];

readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, chunks.pop());
}, chunks.length));

for (const chunk of chunks) {
readable.unshift(chunk);
}
}

{

// Should not throw: https://github.com/nodejs/node/issues/27192
const highWaterMark = 50;
class ArrayReader extends Readable {
constructor(opt) {
super({ highWaterMark });
// The error happened only when pushing above hwm
this.buffer = new Array(highWaterMark * 2).fill(0).map(String);
}
_read(size) {
while (this.buffer.length) {
const chunk = this.buffer.shift();
if (!this.buffer.length) {
this.push(chunk);
this.push(null);
return true;
}
if (!this.push(chunk))
return;
}
}
}

function onRead() {
while (null !== (stream.read())) {
// Remove the 'readable' listener before unshifting
stream.removeListener('readable', onRead);
stream.unshift('a');
stream.on('data', (chunk) => {
console.log(chunk.length);
});
break;
}
}

const stream = new ArrayReader();
stream.once('readable', common.mustCall(onRead));
stream.on('end', common.mustCall(() => {}));

}