From 088af2a0b88a1ff367a2b812fc83e4aea54940e2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 29 Oct 2021 12:48:16 +0200 Subject: [PATCH] stream: return readable stream on promisified pipeline --- lib/stream/promises.js | 8 ++++++-- test/parallel/test-stream-pipeline.js | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 0db01a8b208d60..1d5bbc25b4d894 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -23,11 +23,15 @@ function pipeline(...streams) { signal = options.signal; } - pl(streams, (err, value) => { + const stream = pl(streams, (err, value) => { if (err) { reject(err); - } else { + } else if (value !== undefined) { resolve(value); + } else if (stream.readable) { + resolve(stream); + } else { + resolve(); } }, { signal }); }); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 061ef923d03a59..892fbb2398e7ac 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1447,3 +1447,29 @@ const tsp = require('timers/promises'); assert.strictEqual(text, 'Hello World!'); })); } + +{ + const pipelinePromise = promisify(pipeline); + + async function run() { + const read = new Readable({ + read() {} + }); + + const duplex = new PassThrough(); + + read.push('data'); + read.push(null); + + const stream = await pipelinePromise(read, duplex); + + let ret = '' + for await (const x of stream) { + ret += x; + } + + assert.strictEqual(ret, 'data'); + } + + run(); +}