diff --git a/packages/next/src/server/app-render/app-render.tsx b/packages/next/src/server/app-render/app-render.tsx index 920c87af28cb9..15fd28344a7f6 100644 --- a/packages/next/src/server/app-render/app-render.tsx +++ b/packages/next/src/server/app-render/app-render.tsx @@ -33,6 +33,7 @@ import { continueStaticPrerender, continueDynamicHTMLResume, continueDynamicDataResume, + convertReadable, } from '../stream-utils' import { canSegmentBeOverridden } from '../../client/components/match-segments' import { stripInternalQueries } from '../internal-utils' @@ -1020,21 +1021,22 @@ async function renderToHTMLOrFlightImpl( try { let { stream, postponed, resumed } = await renderer.render(children) - if ( - process.env.NEXT_RUNTIME === 'nodejs' && - !(stream instanceof ReadableStream) - ) { - const { Readable } = require('node:stream') - stream = Readable.toWeb(stream) as ReadableStream - } + // if ( + // process.env.NEXT_RUNTIME === 'nodejs' && + // !(stream instanceof ReadableStream) + // ) { + // const { Readable } = require('node:stream') + // stream = Readable.toWeb(stream) as ReadableStream + // } - // TODO (@Ethan-Arrowood): Remove this when stream utilities support both stream types. - if (!(stream instanceof ReadableStream)) { - throw new Error("Invariant: stream isn't a ReadableStream") - } + // // TODO (@Ethan-Arrowood): Remove this when stream utilities support both stream types. + // if (!(stream instanceof ReadableStream)) { + // throw new Error("Invariant: stream isn't a ReadableStream") + // } const prerenderState = staticGenerationStore.prerenderState if (prerenderState) { + stream = convertReadable(stream) /** * When prerendering there are three outcomes to consider * @@ -1175,6 +1177,7 @@ async function renderToHTMLOrFlightImpl( } } } else if (renderOpts.postponed) { + stream = convertReadable(stream) // This is a continuation of either an Incomplete or Dynamic Data Prerender. const inlinedDataStream = createInlinedDataReadableStream( dataStream, @@ -1198,21 +1201,22 @@ async function renderToHTMLOrFlightImpl( } } } else { + const resultStream = await continueFizzStream(stream, { + inlinedDataStream: createInlinedDataReadableStream( + dataStream, + nonce, + formState + ), + isStaticGeneration: isStaticGeneration || generateStaticHTML, + getServerInsertedHTML, + serverInsertedHTMLToHead: true, + validateRootLayout, + }) // This may be a static render or a dynamic render // @TODO factor this further to make the render types more clearly defined and remove // the deluge of optional params that passed to configure the various behaviors return { - stream: await continueFizzStream(stream, { - inlinedDataStream: createInlinedDataReadableStream( - dataStream, - nonce, - formState - ), - isStaticGeneration: isStaticGeneration || generateStaticHTML, - getServerInsertedHTML, - serverInsertedHTMLToHead: true, - validateRootLayout, - }), + stream: convertReadable(resultStream), } } } catch (err) { @@ -1318,45 +1322,46 @@ async function renderToHTMLOrFlightImpl( }, }) - if ( - process.env.NEXT_RUNTIME === 'nodejs' && - !(fizzStream instanceof ReadableStream) - ) { - const { Readable } = require('node:stream') - - fizzStream = Readable.toWeb( - fizzStream - ) as ReadableStream - } - - // TODO (@Ethan-Arrowood): Remove this when stream utilities support both stream types. - if (!(fizzStream instanceof ReadableStream)) { - throw new Error("Invariant: stream isn't a ReadableStream") - } - + // if ( + // process.env.NEXT_RUNTIME === 'nodejs' && + // !(fizzStream instanceof ReadableStream) + // ) { + // const { Readable } = require('node:stream') + + // fizzStream = Readable.toWeb( + // fizzStream + // ) as ReadableStream + // } + + // // TODO (@Ethan-Arrowood): Remove this when stream utilities support both stream types. + // if (!(fizzStream instanceof ReadableStream)) { + // throw new Error("Invariant: stream isn't a ReadableStream") + // } + + const resultStream = await continueFizzStream(fizzStream, { + inlinedDataStream: createInlinedDataReadableStream( + // This is intentionally using the readable datastream from the + // main render rather than the flight data from the error page + // render + dataStream, + nonce, + formState + ), + isStaticGeneration, + getServerInsertedHTML: makeGetServerInsertedHTML({ + polyfills, + renderServerInsertedHTML, + serverCapturedErrors: [], + basePath: renderOpts.basePath, + }), + serverInsertedHTMLToHead: true, + validateRootLayout, + }) return { // Returning the error that was thrown so it can be used to handle // the response in the caller. err, - stream: await continueFizzStream(fizzStream, { - inlinedDataStream: createInlinedDataReadableStream( - // This is intentionally using the readable datastream from the - // main render rather than the flight data from the error page - // render - dataStream, - nonce, - formState - ), - isStaticGeneration, - getServerInsertedHTML: makeGetServerInsertedHTML({ - polyfills, - renderServerInsertedHTML, - serverCapturedErrors: [], - basePath: renderOpts.basePath, - }), - serverInsertedHTMLToHead: true, - validateRootLayout, - }), + stream: convertReadable(resultStream), } } catch (finalErr: any) { if ( diff --git a/packages/next/src/server/stream-utils/index.d.ts b/packages/next/src/server/stream-utils/index.d.ts index 3b3b1b4d8da5a..fcfc525141e57 100644 --- a/packages/next/src/server/stream-utils/index.d.ts +++ b/packages/next/src/server/stream-utils/index.d.ts @@ -16,5 +16,48 @@ export function renderToInitialFizzStream( export function renderToString(element: React.ReactElement): Promise export function streamToString( - stream: Readable | ReadableStream + stream: Readable | ReadableStream ): Promise + +export function chainStreams( + ...streams: ReadableStream[] +): ReadableStream +export function chainStreams(...streams: Readable[]): Readable + +export function convertReadable( + stream: Readable | ReadableStream +): ReadableStream + +export function continueFizzStream( + stream: Readable, + options: { + inlinedDataStream?: Readable + isStaticGeneration: boolean + getServerInsertedHTML?: () => Promise + serverInsertedHTMLToHead: boolean + validateRootLayout?: boolean + suffix?: string + } +): Promise +export function continueFizzStream( + stream: ReadableStream, + options: { + inlinedDataStream?: ReadableStream + isStaticGeneration: boolean + getServerInsertedHTML?: () => Promise + serverInsertedHTMLToHead: boolean + validateRootLayout?: boolean + suffix?: string + } +): Promise> +export function continueFizzStream( + stream: Readable | ReadableStream, + options: { + inlinedDataStream?: Readable | ReadableStream + isStaticGeneration: boolean + getServerInsertedHTML?: () => Promise + serverInsertedHTMLToHead: boolean + validateRootLayout?: boolean + suffix?: string + } +): Promise> diff --git a/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx b/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx new file mode 100644 index 0000000000000..3a88567e411db --- /dev/null +++ b/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx @@ -0,0 +1,45 @@ +import { createBufferedTransformStream } from './stream-utils.edge' +import { renderToReadableStream } from 'react-dom/server.edge' +import { Suspense } from 'react' + +function App() { + const Data = async () => { + const data = await Promise.resolve('1') + return

{data}

+ } + + return ( + + + My App + + +

Hello, World!

+ Fallback}> + + + + + ) +} + +async function createInput(app = ): Promise> { + const stream = await renderToReadableStream(app) + await stream.allReady + return stream +} + +describe('createBufferedTransformStream', () => { + it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => { + const input = await createInput() + const actualStream = input.pipeThrough(createBufferedTransformStream()) + + const actualChunks = [] + // @ts-expect-error + for await (const chunks of actualStream) { + actualChunks.push(chunks) + } + + expect(actualChunks.length).toBe(1) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.edge.ts b/packages/next/src/server/stream-utils/stream-utils.edge.ts index e32b8a04683b6..577af0addbb46 100644 --- a/packages/next/src/server/stream-utils/stream-utils.edge.ts +++ b/packages/next/src/server/stream-utils/stream-utils.edge.ts @@ -657,3 +657,9 @@ export async function continueDynamicDataResume( .pipeThrough(createMoveSuffixStream(closeTag)) ) } + +export function convertReadable( + stream: ReadableStream +): ReadableStream { + return stream +} diff --git a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx new file mode 100644 index 0000000000000..3c0e4106806ed --- /dev/null +++ b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx @@ -0,0 +1,85 @@ +import { + continueFizzStream, + createBufferedTransformStream, +} from './stream-utils.node' +import { PassThrough } from 'node:stream' +import { renderToPipeableStream } from 'react-dom/server.node' +import { Suspense } from 'react' + +function App() { + const Data = async () => { + const data = await Promise.resolve('1') + return

{data}

+ } + + return ( + + + My App + + +

Hello, World!

+ Fallback}> + + + + + ) +} + +function createInput(app = ): Promise { + return new Promise((resolve, reject) => { + const { pipe } = renderToPipeableStream(app, { + onAllReady() { + resolve(pipe(new PassThrough())) + }, + onShellError(error) { + reject(error) + }, + }) + }) +} + +describe('createBufferedTransformStream', () => { + it('should return a TransformStream that buffers input chunks across rendering boundaries', (done) => { + createInput().then((input) => { + const stream = input.pipe(createBufferedTransformStream()) + const actualChunks = [] + stream.on('data', (chunk) => { + actualChunks.push(chunk) + }) + + stream.resume() + + stream.on('end', () => { + expect(actualChunks.length).toBe(1) + done() + }) + }) + }) +}) + +describe('continueFizzStream', () => { + it.only('should passthrough render stream and buffered transform stream', (done) => { + createInput().then((input) => { + continueFizzStream(input, { + isStaticGeneration: false, + serverInsertedHTMLToHead: false, + }).then((stream) => { + const actualChunks: Uint8Array[] = [] + stream.on('data', (chunk) => { + actualChunks.push(chunk) + }) + + stream.resume() + + stream.on('end', () => { + console.log('ended') + expect(actualChunks.length).toBe(2) + console.log(actualChunks[0].toString()) + done() + }) + }) + }) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index d3ee4e9d0fc33..aec75edc6a7da 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -2,9 +2,21 @@ * By default, this file exports the methods from streams-utils.edge since all of those are based on Node.js web streams. * This file will then be an incremental re-implementation of all of those methods into Node.js only versions (based on proper Node.js Streams). */ - -import { PassThrough, type Readable, Writable } from 'node:stream' +import { + PassThrough, + Readable, + Transform, + Writable, + pipeline, +} from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' +import isError from '../../lib/is-error' +import { + indexOfUint8Array, + isEquivalentUint8Arrays, + removeFromUint8Array, +} from './uint8array-helpers' +import { ENCODED_TAGS } from './encodedTags' export * from './stream-utils.edge' @@ -76,3 +88,416 @@ export async function streamToString(stream: Readable) { return string } + +export function chainStreams(...streams: Readable[]): Readable { + if (streams.length === 0) { + throw new Error('Invariant: chainStreams requires at least one stream') + } + if (streams.length === 1) { + return streams[0] + } + + const pt = new PassThrough() + + pipeline(streams, pt, (err) => { + // to match `stream-utils.edge.ts`, this error is just ignored. + // but maybe we at least log it? + console.log(`Invariant: error when pipelining streams`) + console.error(err) + }) + + return pt +} + +export function streamFromString(string: string): Readable { + return Readable.from(string) +} + +/** + * This utility function buffers all of the chunks it receives from the input + * during a single "macro-task". The transform function schedules a + * `setImmediate` callback that will push the buffered chunks to the readable. + * The transform also ensures not to execute the final callback too early. The + * overall timing of this utility is very specific and must match that of the + * edge based version. + */ +export function createBufferedTransformStream(): Transform { + let bufferedChunks: Uint8Array[] = [] + let bufferedChunksByteLength = 0 + let pending = false + + return new Transform({ + transform(chunk, _, callback) { + bufferedChunks.push(chunk) + bufferedChunksByteLength += chunk.byteLength + + if (pending) return callback() + + pending = true + + setImmediate(() => { + try { + const bufferedChunk = new Uint8Array(bufferedChunksByteLength) + let copiedBytes = 0 + for (let i = 0; i < bufferedChunks.length; i++) { + bufferedChunk.set(bufferedChunks[i], copiedBytes) + copiedBytes += bufferedChunks[i].byteLength + } + bufferedChunks.length = 0 + bufferedChunksByteLength = 0 + callback(null, bufferedChunk) + } catch (err: unknown) { + if (isError(err)) callback(err) + } finally { + pending = false + } + }) + }, + flush(callback) { + if (!pending) return callback() + + process.nextTick(() => { + callback() + }) + }, + }) +} + +const encoder = new TextEncoder() + +function createInsertedHTMLStream( + getServerInsertedHTML: () => Promise +): Transform { + return new Transform({ + transform(chunk, _, callback) { + getServerInsertedHTML() + .then((html) => { + if (html) { + this.push(encoder.encode(html)) + } + + return callback(null, chunk) + }) + .catch((err) => { + return callback(err) + }) + }, + }) +} + +function createHeadInsertionTransformStream( + insert: () => Promise +): Transform { + let inserted = false + let freezing = false + let hasBytes = false + return new Transform({ + transform(chunk, _, callback) { + hasBytes = true + if (freezing) { + return callback(null, chunk) + } + insert() + .then((insertion) => { + if (inserted) { + if (insertion) { + this.push(encoder.encode(insertion)) + } + this.push(chunk) + freezing = true + } else { + const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD) + if (index !== -1) { + if (insertion) { + const encodedInsertion = encoder.encode(insertion) + const insertedHeadContent = new Uint8Array( + chunk.length + encodedInsertion.length + ) + insertedHeadContent.set(chunk.slice(0, index)) + insertedHeadContent.set(encodedInsertion, index) + insertedHeadContent.set( + chunk.slice(index), + index + encodedInsertion.length + ) + this.push(insertedHeadContent) + } else { + this.push(chunk) + } + freezing = true + inserted = true + } + } + + if (!inserted) { + this.push(chunk) + } else { + setImmediate(() => { + freezing = false + }) + } + + return callback() + }) + .catch((err) => { + return callback(err) + }) + }, + flush(callback) { + if (hasBytes) { + insert() + .then((insertion) => { + return callback(null, insertion && encoder.encode(insertion)) + }) + .catch((err) => { + return callback(err) + }) + } + + return callback() + }, + }) +} + +function createDeferredSuffixStream(suffix: string): Transform { + let flushed = false + let pending = false + + return new Transform({ + transform(chunk, _, callback) { + this.push(chunk) + + if (flushed) return callback() + + flushed = true + pending = true + setImmediate(() => { + try { + this.push(encoder.encode(suffix)) + } catch { + } finally { + pending = false + return callback() + } + }) + }, + flush(callback) { + if (pending || flushed) return callback() + return callback(null, encoder.encode(suffix)) + }, + }) +} + +function createMoveSuffixStream(suffix: string): Transform { + let found = false + const encodedSuffix = encoder.encode(suffix) + return new Transform({ + transform(chunk, _, callback) { + if (found) { + return callback(null, chunk) + } + + const index = indexOfUint8Array(chunk, encodedSuffix) + if (index > -1) { + found = true + + if (chunk.length === suffix.length) { + return callback() + } + + const before = chunk.slice(0, index) + this.push(before) + + if (chunk.length > suffix.length + index) { + return callback(null, chunk.slice(index + suffix.length)) + } + } else { + return callback(null, chunk) + } + }, + flush(callback) { + return callback(null, encodedSuffix) + }, + }) +} + +// eslint-disable-next-line +function createStripDocumentClosingTagsTransform(): Transform { + return new Transform({ + transform(chunk, _, callback) { + if ( + isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.BODY_AND_HTML) || + isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.BODY) || + isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.HTML) + ) { + return callback() + } + + chunk = removeFromUint8Array(chunk, ENCODED_TAGS.CLOSED.BODY) + chunk = removeFromUint8Array(chunk, ENCODED_TAGS.CLOSED.HTML) + + return callback(null, chunk) + }, + }) +} + +function createRootLayoutValidatorStream(): Transform { + let foundHtml = false + let foundBody = false + return new Transform({ + transform(chunk, _, callback) { + if ( + !foundHtml && + indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.HTML) > -1 + ) { + foundHtml = true + } + + if ( + !foundBody && + indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.BODY) > -1 + ) { + foundBody = true + } + + return callback(null, chunk) + }, + flush(callback) { + const missingTags: typeof window.__next_root_layout_missing_tags = [] + if (!foundHtml) missingTags.push('html') + if (!foundBody) missingTags.push('body') + + if (!missingTags.length) return + + return callback( + null, + encoder.encode( + `` + ) + ) + }, + }) +} + +function createPassThroughFromReadable(readable: Readable) { + return readable.pipe(new PassThrough()) +} + +export function continueFizzStream( + renderStream: Readable, + { + suffix, + inlinedDataStream, + // eslint-disable-next-line + isStaticGeneration, + getServerInsertedHTML, + serverInsertedHTMLToHead, + validateRootLayout, + }: { + inlinedDataStream?: Readable + isStaticGeneration: boolean + getServerInsertedHTML?: () => Promise + serverInsertedHTMLToHead: boolean + validateRootLayout?: boolean + suffix?: string + } +): Promise { + // @ts-ignore + if (inlinedDataStream instanceof ReadableStream) { + // @ts-ignore + inlinedDataStream = Readable.fromWeb(inlinedDataStream) + } + + const closeTag = '' + const suffixUnclosed = suffix ? suffix.split(closeTag, 1)[0] : null + + // this doesn't make sense anymore, but keep it in mind if there are issues rendering static stuff. the renderToInitialFizzStream may be calling `pipe` either in `onShellReady` or `onAllReady` + // if (isStaticGeneration && 'allReady' in renderStream) { + // await renderStream.allReady + // } + + const pt = new PassThrough() + + const streams: Readable[] = [renderStream, createBufferedTransformStream()] + + if (getServerInsertedHTML && !serverInsertedHTMLToHead) { + streams.push(createInsertedHTMLStream(getServerInsertedHTML)) + } + + if (suffixUnclosed != null && suffixUnclosed.length > 0) { + streams.push(createDeferredSuffixStream(suffixUnclosed)) + } + + if (inlinedDataStream) { + streams.push(createPassThroughFromReadable(inlinedDataStream)) + } + + if (validateRootLayout) { + streams.push(createRootLayoutValidatorStream()) + } + + streams.push(createMoveSuffixStream(closeTag)) + + if (getServerInsertedHTML && serverInsertedHTMLToHead) { + streams.push(createHeadInsertionTransformStream(getServerInsertedHTML)) + } + + streams.push(pt) + + return new Promise((resolve, reject) => { + // @ts-expect-error + pipeline(streams, (error) => { + if (error) return reject(error) + else return resolve(pt) + }) + }) +} + +export function convertReadable( + stream: Readable | ReadableStream +): ReadableStream { + return !(stream instanceof ReadableStream) + ? (Readable.toWeb(stream) as ReadableStream) + : stream +} + +// export function continueDynamicPrerender(prerenderStream: Readable, { getServerInsertedHTML }: { getServerInsertedHTML: ()=> Promise}) { +// const pt = new PassThrough(); + +// return new Promise((resolve, reject) => { +// // @ts-expect-error +// pipeline([ +// prerenderStream, +// createBufferedTransformStream(), +// createStripDocumentClosingTagsTransform(), +// createHeadInsertionTransformStream(getServerInsertedHTML), +// pt +// ], (error) => { +// if (error) return reject(error) +// else return resolve(pt) +// }) +// }) +// } + +// export function continueStaticPrerender( +// prerenderStream: Readable, +// { +// inlinedDataStream, +// getServerInsertedHTML +// }: { +// inlinedDataStream: Readable, +// getServerInsertedHTML: () => Promise +// } +// ) { +// const pt = new PassThrough(); +// return new Promise((resolve, reject) => { +// pipeline( +// prerenderStream, +// createBufferedTransformStream(), +// createHeadInsertionTransformStream(getServerInsertedHTML), +// inlinedDataStream, +// createMoveSuffixStream('') +// ) +// }) +// } diff --git a/test/.stats-app/stats-config.js b/test/.stats-app/stats-config.js index 6e280627bfb7f..7b7b71d9a545c 100644 --- a/test/.stats-app/stats-config.js +++ b/test/.stats-app/stats-config.js @@ -153,16 +153,15 @@ module.exports = { 'http://localhost:$PORT/link', 'http://localhost:$PORT/withRouter', ], - // TODO: investigate replacing "ab" for this - // pagesToBench: [ - // 'http://localhost:$PORT/', - // 'http://localhost:$PORT/error-in-render', - // ], - // benchOptions: { - // reqTimeout: 60, - // concurrency: 50, - // numRequests: 2500, - // }, + pagesToBench: [ + 'http://localhost:$PORT/', + 'http://localhost:$PORT/error-in-render', + ], + benchOptions: { + reqTimeout: 60, + concurrency: 50, + numRequests: 2500, + }, }, ], }