Skip to content

Commit

Permalink
stream: support dispose in writable
Browse files Browse the repository at this point in the history
Add support to Symbol.asyncDispose in writable streams.
Additionally add a test for writable, transform and duplex streams
who inherit from readable/writable to avoid breakage.

Co-authored-by: Robert Nagy <[email protected]>
Co-authored-by: atlowChemi <[email protected]>
PR-URL: #48547
Reviewed-By: Chemi Atlow <[email protected]>
Reviewed-By: Moshe Atlow <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
3 people authored and marco-ippolito committed Jul 19, 2024
1 parent 91c05f3 commit 71af3e8
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 10 deletions.
11 changes: 11 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,17 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Writable` stream.

##### `writable[Symbol.asyncDispose]()`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
Calls [`writable.destroy()`][writable-destroy] with an `AbortError` and returns
a promise that fulfills when the stream is finished.

##### `writable.write(chunk[, encoding][, callback])`

<!-- YAML
Expand Down
37 changes: 27 additions & 10 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ const {
ObjectDefineProperty,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
StringPrototypeToLowerCase,
Symbol,
SymbolAsyncDispose,
SymbolHasInstance,
} = primordials;

Expand All @@ -44,6 +46,7 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');

const {
addAbortSignal,
Expand All @@ -54,16 +57,19 @@ const {
getDefaultHighWaterMark,
} = require('internal/streams/state');
const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
} = require('internal/errors').codes;
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
},
} = require('internal/errors');
const {
kState,
// bitfields
Expand Down Expand Up @@ -1142,3 +1148,14 @@ Writable.fromWeb = function(writableStream, options) {
Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};

Writable.prototype[SymbolAsyncDispose] = function() {
let error;
if (!this.destroyed) {
error = this.writableFinished ? null : new AbortError();
this.destroy(error);
}
return new Promise((resolve, reject) =>
eos(this, (err) => (err && err.name !== 'AbortError' ? reject(err) : resolve(null))),
);
};
15 changes: 15 additions & 0 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,18 @@ const assert = require('assert');
}));
duplex.destroy();
}

{
// Check Symbol.asyncDispose
const duplex = new Duplex({
write(chunk, enc, cb) { cb(); },
read() {},
});
let count = 0;
duplex.on('error', common.mustCall((e) => {
assert.strictEqual(count++, 0); // Ensure not called twice
assert.strictEqual(e.name, 'AbortError');
}));
duplex.on('close', common.mustCall());
duplex[Symbol.asyncDispose]().then(common.mustCall());
}
11 changes: 11 additions & 0 deletions test/parallel/test-stream-transform-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,14 @@ const assert = require('assert');

transform.destroy();
}

{
const transform = new Transform({
transform(chunk, enc, cb) {}
});
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
transform.on('close', common.mustCall());
transform[Symbol.asyncDispose]().then(common.mustCall());
}
12 changes: 12 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,15 @@ const assert = require('assert');
}));
s.destroy(_err);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); }
});

write.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
assert.strictEqual(write.destroyed, true);
}));
write[Symbol.asyncDispose]().then(common.mustCall());
}

0 comments on commit 71af3e8

Please sign in to comment.