diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index aad4c594501ba6..96ad194285b0d1 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -41,6 +41,7 @@ Readable.ReadableState = ReadableState; const EE = require('events'); const { Stream, prependListener } = require('internal/streams/legacy'); const { Buffer } = require('buffer'); +const Duplex = require('internal/streams/duplex'); const { addAbortSignal, @@ -87,7 +88,7 @@ function ReadableState(options, stream, isDuplex) { // values for the readable and the writable sides of the duplex stream. // These options can be provided separately as readableXXX and writableXXX. if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; + isDuplex = stream instanceof Duplex; // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. @@ -189,7 +190,7 @@ function Readable(options) { // Checking for a Stream.Duplex instance is faster here instead of inside // the ReadableState constructor, at least with V8 6.5. - const isDuplex = this instanceof Stream.Duplex; + const isDuplex = this instanceof Duplex; this._readableState = new ReadableState(options, this, isDuplex); diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index c4d95df9ac7153..11e68b89fbc612 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -42,6 +42,7 @@ Writable.WritableState = WritableState; const EE = require('events'); const Stream = require('internal/streams/legacy').Stream; +const Duplex = require('internal/streams/duplex'); const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); @@ -81,7 +82,7 @@ function WritableState(options, stream, isDuplex) { // values for the readable and the writable sides of the duplex stream, // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; + isDuplex = stream instanceof Duplex; // Object stream flag to indicate whether or not this stream // contains buffers or objects. @@ -228,7 +229,7 @@ function Writable(options) { // Checking for a Stream.Duplex instance is faster here instead of inside // the WritableState constructor, at least with V8 6.5. - const isDuplex = (this instanceof Stream.Duplex); + const isDuplex = (this instanceof Duplex); if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options); diff --git a/test/parallel/test-stream3-pipeline-async-iterator.js b/test/parallel/test-stream3-pipeline-async-iterator.js new file mode 100644 index 00000000000000..f14031457bd46d --- /dev/null +++ b/test/parallel/test-stream3-pipeline-async-iterator.js @@ -0,0 +1,27 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { pipeline } = require('node:stream/promises'); + +{ + // Ensure that async iterators can act as readable and writable streams + async function* myCustomReadable() { + yield 'Hello'; + yield 'World'; + } + + // eslint-disable-next-line require-yield + async function* myCustomWritable(stream) { + const messages = []; + for await (const chunk of stream) { + messages.push(chunk); + } + assert.deepStrictEqual(messages, ['Hello', 'World']); + } + + pipeline( + myCustomReadable, + myCustomWritable, + ) + .then(common.mustCall()); +}