diff --git a/doc/api/stream.md b/doc/api/stream.md index 6a22446743ec3c..529bf2fa0152bb 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js: is written and read (for example, [`zlib.createDeflate()`][]). Additionally, this module includes the utility functions -[`stream.pipeline()`][], [`stream.finished()`][] and -[`stream.Readable.from()`][]. +[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][] +and [`stream.addAbortSignal()`][]. ### Streams Promises API +* `signal` {AbortSignal} A signal representing possible cancellation +* `stream` {Stream} a stream to attach a signal to + +Attaches an AbortSignal to a readable or writeable stream. This lets code +control stream destruction using an `AbortController`. + +Calling `abort` on the `AbortController` corresponding to the passed +`AbortSignal` will behave the same way as calling `.destroy(new AbortError())` +on the stream. + +```js +const fs = require('fs'); + +const controller = new AbortController(); +const read = addAbortSignal( + controller.signal, + fs.createReadStream(('object.json')) +); +// Later, abort the operation closing the stream +controller.abort(); +``` + +Or using an `AbortSignal` with a readable stream as an async iterable: + +```js +const controller = new AbortController(); +setTimeout(() => controller.abort(), 10_000); // set a timeout +const stream = addAbortSignal( + controller.signal, + fs.createReadStream(('object.json')) +); +(async () => { + try { + for await (const chunk of stream) { + await process(chunk); + } + } catch (e) { + if (e.name === 'AbortError') { + // The operation was cancelled + } else { + throw e; + } + } +})(); +``` ## API for stream implementers @@ -3123,6 +3172,7 @@ contain multi-byte characters. [`stream.finished()`]: #stream_stream_finished_stream_options_callback [`stream.pipe()`]: #stream_readable_pipe_destination_options [`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback +[`stream.addAbortSignal()`]: #stream_stream_addabortsignal_signal_stream [`stream.uncork()`]: #stream_writable_uncork [`stream.unpipe()`]: #stream_readable_unpipe_destination [`stream.wrap()`]: #stream_readable_wrap_stream diff --git a/lib/_http_client.js b/lib/_http_client.js index 75d36f19e6c0d0..6fb5dd65cb368c 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -51,7 +51,7 @@ const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { URL, urlToOptions, searchParamsSymbol } = require('internal/url'); const { kOutHeaders, kNeedDrain } = require('internal/http'); -const { AbortError, connResetException, codes } = require('internal/errors'); +const { connResetException, codes } = require('internal/errors'); const { ERR_HTTP_HEADERS_SENT, ERR_INVALID_ARG_TYPE, @@ -61,7 +61,6 @@ const { } = codes; const { validateInteger, - validateAbortSignal, } = require('internal/validators'); const { getTimerDuration } = require('internal/timers'); const { @@ -69,6 +68,8 @@ const { DTRACE_HTTP_CLIENT_RESPONSE } = require('internal/dtrace'); +const { addAbortSignal } = require('stream'); + const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/; const kError = Symbol('kError'); @@ -174,12 +175,7 @@ function ClientRequest(input, options, cb) { const signal = options.signal; if (signal) { - validateAbortSignal(signal, 'options.signal'); - const listener = (e) => this.destroy(new AbortError()); - signal.addEventListener('abort', listener); - this.once('close', () => { - signal.removeEventListener('abort', listener); - }); + addAbortSignal(signal, this); } let method = options.method; const methodIsString = (typeof method === 'string'); diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js new file mode 100644 index 00000000000000..27fefe96d21cf9 --- /dev/null +++ b/lib/internal/streams/add-abort-signal.js @@ -0,0 +1,41 @@ +'use strict'; + +const { + AbortError, + codes, +} = require('internal/errors'); + +const eos = require('internal/streams/end-of-stream'); +const { ERR_INVALID_ARG_TYPE } = codes; + +// This method is inlined here for readable-stream +// https://github.com/nodejs/node/pull/36061#discussion_r533718029 +const validateAbortSignal = (signal, name) => { + if (signal !== undefined && + (signal === null || + typeof signal !== 'object' || + !('aborted' in signal))) { + throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal); + } +}; + +function isStream(obj) { + return !!(obj && typeof obj.pipe === 'function'); +} + +module.exports = function addAbortSignal(signal, stream) { + validateAbortSignal(signal, 'signal'); + if (!isStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream); + } + const onAbort = () => { + stream.destroy(new AbortError()); + }; + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener('abort', onAbort); + eos(stream, () => signal.removeEventListener('abort', onAbort)); + } + return stream; +}; diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 93153908fef4e1..105caa1e151486 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -50,6 +50,7 @@ const { getHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state'); + const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PUSH_AFTER_EOF, diff --git a/lib/stream.js b/lib/stream.js index 11f5ca997fef34..fb6cf416cde991 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -43,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); Stream.PassThrough = require('internal/streams/passthrough'); Stream.pipeline = pipeline; +Stream.addAbortSignal = require('internal/streams/add-abort-signal'); Stream.finished = eos; function lazyLoadPromises() { diff --git a/node.gyp b/node.gyp index 643c8a43c6ff9c..865a7de93176e5 100644 --- a/node.gyp +++ b/node.gyp @@ -245,6 +245,7 @@ 'lib/internal/worker/js_transferable.js', 'lib/internal/watchdog.js', 'lib/internal/streams/lazy_transform.js', + 'lib/internal/streams/add-abort-signal.js', 'lib/internal/streams/buffer_list.js', 'lib/internal/streams/duplexpair.js', 'lib/internal/streams/from.js', diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 2c14e1ac319883..5754fb567ada15 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -78,6 +78,7 @@ const expectedModules = new Set([ 'NativeModule internal/process/warning', 'NativeModule internal/querystring', 'NativeModule internal/source_map/source_map_cache', + 'NativeModule internal/streams/add-abort-signal', 'NativeModule internal/streams/buffer_list', 'NativeModule internal/streams/destroy', 'NativeModule internal/streams/duplex', diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 23887122fc78d8..5b12531f92e7a4 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -8,7 +8,8 @@ const { Transform, pipeline, PassThrough, - Duplex + Duplex, + addAbortSignal, } = require('stream'); const assert = require('assert'); const http = require('http'); @@ -1261,3 +1262,32 @@ const net = require('net'); () => common.mustNotCall(), ); } + + +{ + const ac = new AbortController(); + const r = Readable.from(async function* () { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + yield String(i); + if (i === 5) { + ac.abort(); + } + } + }()); + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + const cb = common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.strictEqual(res, '012345'); + assert.strictEqual(w.destroyed, true); + assert.strictEqual(r.destroyed, true); + assert.strictEqual(pipelined.destroyed, true); + }); + const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb)); +} diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 8ab78ec8ccec35..1e765f36977e5b 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -1,7 +1,7 @@ 'use strict'; const common = require('../common'); -const { Readable } = require('stream'); +const { Readable, addAbortSignal } = require('stream'); const assert = require('assert'); { @@ -268,3 +268,38 @@ const assert = require('assert'); })); read.resume(); } + +{ + const controller = new AbortController(); + const read = addAbortSignal(controller.signal, new Readable({ + read() { + this.push('asd'); + }, + })); + + read.on('error', common.mustCall((e) => { + assert.strictEqual(e.name, 'AbortError'); + })); + controller.abort(); + read.on('data', common.mustNotCall()); +} + +{ + const controller = new AbortController(); + const read = addAbortSignal(controller.signal, new Readable({ + objectMode: true, + read() { + return false; + } + })); + read.push('asd'); + + read.on('error', common.mustCall((e) => { + assert.strictEqual(e.name, 'AbortError'); + })); + assert.rejects((async () => { + /* eslint-disable-next-line no-unused-vars */ + for await (const chunk of read) {} + })(), /AbortError/); + setTimeout(() => controller.abort(), 0); +} diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index dca01724f73b3b..0fcaf892897633 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -1,7 +1,7 @@ 'use strict'; const common = require('../common'); -const { Writable } = require('stream'); +const { Writable, addAbortSignal } = require('stream'); const assert = require('assert'); { @@ -417,3 +417,17 @@ const assert = require('assert'); })); write.write('asd'); } + +{ + const ac = new AbortController(); + const write = addAbortSignal(ac.signal, new Writable({ + write(chunk, enc, cb) { cb(); } + })); + + write.on('error', common.mustCall((e) => { + assert.strictEqual(e.name, 'AbortError'); + assert.strictEqual(write.destroyed, true); + })); + write.write('asd'); + ac.abort(); +}