Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: need to cleanup event listeners if last stream is readable #41954

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,9 @@ run().catch(console.error);

`stream.pipeline()` leaves dangling event listeners on the streams
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.
failure, this can cause event listener leaks and swallowed errors. If the last
stream is readable, dangling event listeners will be removed so that the last
stream can be consumed later.

`stream.pipeline()` closes all the streams when an error is raised.
The `IncomingRequest` usage with `pipeline` could lead to an unexpected behavior
Expand Down
52 changes: 41 additions & 11 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {

const {
isIterable,
isReadable,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');
Expand All @@ -45,14 +46,17 @@ function destroyer(stream, reading, writing) {
finished = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;
});

return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
return {
destroy: (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
},
cleanup
};
}

Expand Down Expand Up @@ -159,6 +163,10 @@ function pipelineImpl(streams, callback, opts) {
const signal = ac.signal;
const outerSignal = opts?.signal;

// Need to cleanup event listeners if last stream is readable
// https://github.com/nodejs/node/issues/35452
const lastStreamCleanup = [];

validateAbortSignal(outerSignal, 'options.signal');

function abort() {
Expand Down Expand Up @@ -194,6 +202,9 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();

if (final) {
if (!error) {
lastStreamCleanup.forEach((fn) => fn());
}
process.nextTick(callback, error, value);
}
}
Expand All @@ -204,22 +215,34 @@ function pipelineImpl(streams, callback, opts) {
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;

if (isNodeStream(stream)) {
if (end) {
destroys.push(destroyer(stream, reading, writing));
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);

if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
}

// Catch stream errors that occur after pipe/pump has completed.
stream.on('error', (err) => {
function onError(err) {
if (
err &&
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
}
});
}
stream.on('error', onError);
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener('error', onError);
});
}
}

if (i === 0) {
Expand Down Expand Up @@ -285,12 +308,19 @@ function pipelineImpl(streams, callback, opts) {

ret = pt;

destroys.push(destroyer(ret, false, true));
const { destroy, cleanup } = destroyer(ret, false, true);
destroys.push(destroy);
if (isLastStream) {
lastStreamCleanup.push(cleanup);
}
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
pipe(ret, stream, finish, { end });
const cleanup = pipe(ret, stream, finish, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
Expand Down Expand Up @@ -345,7 +375,7 @@ function pipe(src, dst, finish, { end }) {
finish(err);
}
});
eos(dst, { readable: false, writable: true }, finish);
return eos(dst, { readable: false, writable: true }, finish);
}

module.exports = { pipelineImpl, pipeline };
76 changes: 76 additions & 0 deletions test/parallel/test-stream-pipeline-listeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
'use strict';

const common = require('../common');
const { pipeline, Duplex, PassThrough, Writable } = require('stream');
const assert = require('assert');

process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'no way');
}, 2));

// Ensure that listeners is removed if last stream is readble
// And other stream's listeners unchanged
const a = new PassThrough();
a.end('foobar');
const b = new Duplex({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(a, b, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(a.listenerCount('error') > 0);
assert.strictEqual(b.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
b.destroy(new Error('no way'));
}, 100);
}));

// Async generators
const c = new PassThrough();
c.end('foobar');
const d = pipeline(
c,
async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
},
common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(c.listenerCount('error') > 0);
assert.strictEqual(d.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
d.destroy(new Error('no way'));
}, 100);
})
);

// If last stream is not readable, will not throw and remove listeners
const e = new PassThrough();
e.end('foobar');
const f = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(e, f, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(e.listenerCount('error') > 0);
assert(f.listenerCount('error') > 0);
setTimeout(() => {
assert(f.listenerCount('error') > 0);
f.destroy(new Error('no way'));
}, 100);
}));