diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index 36b55087a2c10..50a1abf00faeb 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -10,7 +10,7 @@ import { pipeline, } from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' -import { DetachedPromise } from '../../lib/detached-promise' +import isError from '../../lib/is-error' export * from './stream-utils.edge' @@ -107,47 +107,52 @@ export function streamFromString(string: string): Readable { return Readable.from(string) } +/** + * This utility function buffers all of the chunks it receives from the input + * during a single "macro-task". The transform function schedules a + * `setImmediate` callback that will push the buffered chunks to the readable. + * The transform also ensures not to execute the final callback too early. The + * overall timing of this utility is very specific and must match that of the + * edge based version. + */ export function createBufferedTransformStream(): Transform { - let buffered: Uint8Array[] = [] - let byteLength = 0 - let pending: DetachedPromise | undefined - - const flush = (transform: Transform) => { - if (pending) return - - const detached = new DetachedPromise() - pending = detached - - setImmediate(() => { - try { - const chunk = new Uint8Array(byteLength) - let copiedBytes = 0 - for (let i = 0; i < buffered.length; i++) { - chunk.set(buffered[i], copiedBytes) - copiedBytes += buffered[i].byteLength - } - buffered.length = 0 - byteLength = 0 - transform.push(chunk) - } catch { - } finally { - pending = undefined - detached.resolve() - } - }) - } + let bufferedChunks: Uint8Array[] = [] + let bufferedChunksByteLength = 0 + let pending = false return new Transform({ transform(chunk, _, callback) { - buffered.push(chunk) - byteLength += chunk.byteLength - flush(this) - callback() + bufferedChunks.push(chunk) + bufferedChunksByteLength += chunk.byteLength + + if (pending) callback() + + pending = true + + setImmediate(() => { + try { + const bufferedChunk = new Uint8Array(bufferedChunksByteLength) + let copiedBytes = 0 + for (let i = 0; i < bufferedChunks.length; i++) { + bufferedChunk.set(bufferedChunks[i], copiedBytes) + copiedBytes += bufferedChunks[i].byteLength + } + bufferedChunks.length = 0 + bufferedChunksByteLength = 0 + callback(null, bufferedChunk) + } catch (err: unknown) { + if (isError(err)) callback(err) + } finally { + pending = false + } + }) }, final(callback) { if (!pending) callback() - pending?.promise.then(() => callback()) + process.nextTick(() => { + callback() + }) }, }) }