From 63b6d3b03799b1f1743f9c9af88048c484bdc29d Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Tue, 23 Apr 2024 13:41:53 -0600 Subject: [PATCH 1/8] implement chainStreams --- .../next/src/server/stream-utils/index.d.ts | 5 +++++ .../server/stream-utils/stream-utils.node.ts | 20 +++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/packages/next/src/server/stream-utils/index.d.ts b/packages/next/src/server/stream-utils/index.d.ts index 3b3b1b4d8da5a..e16cd54baa15f 100644 --- a/packages/next/src/server/stream-utils/index.d.ts +++ b/packages/next/src/server/stream-utils/index.d.ts @@ -18,3 +18,8 @@ export function renderToString(element: React.ReactElement): Promise export function streamToString( stream: Readable | ReadableStream ): Promise + +export function chainStreams( + ...streams: ReadableStream[] +): ReadableStream +export function chainStreams(...streams: Readable[]): Readable diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index d3ee4e9d0fc33..b98660001046e 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -2,8 +2,7 @@ * By default, this file exports the methods from streams-utils.edge since all of those are based on Node.js web streams. * This file will then be an incremental re-implementation of all of those methods into Node.js only versions (based on proper Node.js Streams). */ - -import { PassThrough, type Readable, Writable } from 'node:stream' +import { PassThrough, type Readable, Transform, Writable, pipeline } from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' export * from './stream-utils.edge' @@ -76,3 +75,20 @@ export async function streamToString(stream: Readable) { return string } + +export function chainStreams(...streams: Readable[]): Readable { + if (streams.length === 0) { + throw new Error('Invariant: chainStreams requires at least one stream') + } + if (streams.length === 1) { + return streams[0] + } + + const transform = new Transform() + + pipeline(streams, transform, () => { + /* do nothing */ + }) + + return transform +} From 23432721ba670f2382721d643c5b316efe10934e Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 08:51:16 -0600 Subject: [PATCH 2/8] clean up implementation --- packages/next/src/server/stream-utils/index.d.ts | 6 +++--- .../src/server/stream-utils/stream-utils.node.ts | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/packages/next/src/server/stream-utils/index.d.ts b/packages/next/src/server/stream-utils/index.d.ts index e16cd54baa15f..1ded12e019455 100644 --- a/packages/next/src/server/stream-utils/index.d.ts +++ b/packages/next/src/server/stream-utils/index.d.ts @@ -19,7 +19,7 @@ export function streamToString( stream: Readable | ReadableStream ): Promise -export function chainStreams( - ...streams: ReadableStream[] -): ReadableStream +export function chainStreams( + ...streams: ReadableStream[] +): ReadableStream export function chainStreams(...streams: Readable[]): Readable diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index b98660001046e..52b0bfbc0ce44 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -2,7 +2,13 @@ * By default, this file exports the methods from streams-utils.edge since all of those are based on Node.js web streams. * This file will then be an incremental re-implementation of all of those methods into Node.js only versions (based on proper Node.js Streams). */ -import { PassThrough, type Readable, Transform, Writable, pipeline } from 'node:stream' +import { + PassThrough, + type Readable, + Transform, + Writable, + pipeline, +} from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' export * from './stream-utils.edge' @@ -86,8 +92,11 @@ export function chainStreams(...streams: Readable[]): Readable { const transform = new Transform() - pipeline(streams, transform, () => { - /* do nothing */ + pipeline(streams, transform, (err) => { + // to match `stream-utils.edge.ts`, this error is just ignored. + // but maybe we at least log it? + console.log(`Invariant: error when pipelining streams`) + console.error(err) }) return transform From c21378b5d4edaf3c8ca0d6cf8af01a1ce9455d79 Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 12:04:20 -0600 Subject: [PATCH 3/8] Implement createBufferedTransformStream --- .../stream-utils/stream-utils.node.test.tsx | 78 +++++++++++++++++++ .../server/stream-utils/stream-utils.node.ts | 47 ++++++++++- 2 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 packages/next/src/server/stream-utils/stream-utils.node.test.tsx diff --git a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx new file mode 100644 index 0000000000000..9dca617b6bcb1 --- /dev/null +++ b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx @@ -0,0 +1,78 @@ +import { createBufferedTransformStream } from './stream-utils.node' +import { PassThrough, type Readable } from 'node:stream' +import { renderToPipeableStream } from 'react-dom/server.node' +import { Suspense } from 'react' +import { streamToString } from '.' +import { StringDecoder } from 'node:string_decoder' + +function App() { + const Data = async () => { + const data = await Promise.resolve('1') + return

{data}

+ } + return ( + + + My App + + +

Hello, World!

+ Fallback}> + + + + + ) +} + +function createInput(app = ): Promise { + return new Promise((resolve) => { + const { pipe } = renderToPipeableStream(app, { + onShellReady() { + const pt = new PassThrough() + pipe(pt) + resolve(pt) + }, + }) + }) +} + +function getExpectedOutput(input: Readable) { + return streamToString(input.pipe(new PassThrough())) +} + +describe('createBufferedTransformStream', () => { + it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => { + const stream = createBufferedTransformStream() + const input = await createInput() + const output = input.pipe(stream) + const expectedCall = getExpectedOutput(input) + + const actualChunks = await new Promise((resolve) => { + const chunks: Buffer[] = [] + output.on('readable', () => { + let chunk + while (null !== (chunk = output.read())) { + chunks.push(chunk) + } + }) + output.on('end', () => { + resolve(chunks) + }) + }) + + const expected = await expectedCall + + // React will send the suspense boundary piece second + expect(actualChunks.length).toBe(2) + + let actual = '' + const decoder = new StringDecoder() + for (const chunk of actualChunks) { + actual += decoder.write(chunk) + } + actual += decoder.end() + + expect(actual).toStrictEqual(expected) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index 52b0bfbc0ce44..ee17889d50296 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -4,7 +4,7 @@ */ import { PassThrough, - type Readable, + Readable, Transform, Writable, pipeline, @@ -101,3 +101,48 @@ export function chainStreams(...streams: Readable[]): Readable { return transform } + +export function streamFromString(string: string): Readable { + return Readable.from(string) +} + +export function createBufferedTransformStream(): Transform { + let buffered: Buffer[] = [] + let byteLength = 0 + let pending = false + + const flush = (transform: Transform) => { + if (pending) return + + pending = true + + process.nextTick(() => { + try { + const chunk = Buffer.alloc(byteLength) + let copiedBytes = 0 + for (let i = 0; i < buffered.length; i++) { + chunk.set(buffered[i], copiedBytes) + copiedBytes += buffered[i].byteLength + } + buffered = [] + byteLength = 0 + transform.push(chunk) + } catch { + } finally { + pending = false + } + }) + } + + return new Transform({ + transform(chunk, _, callback) { + buffered.push(chunk) + byteLength += chunk.byteLength + flush(this) + callback() + }, + final(callback) { + callback() + }, + }) +} From 7f0c55b3c614a426ca2f4e1212d75515c16186a5 Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 13:09:36 -0600 Subject: [PATCH 4/8] fix up test --- .../stream-utils/stream-utils.node.test.tsx | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx index 9dca617b6bcb1..06429d5048fd2 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx +++ b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx @@ -1,8 +1,10 @@ -import { createBufferedTransformStream } from './stream-utils.node' -import { PassThrough, type Readable } from 'node:stream' +import { + createBufferedTransformStream, + streamToString, +} from './stream-utils.node' +import { PassThrough } from 'node:stream' import { renderToPipeableStream } from 'react-dom/server.node' import { Suspense } from 'react' -import { streamToString } from '.' import { StringDecoder } from 'node:string_decoder' function App() { @@ -37,16 +39,14 @@ function createInput(app = ): Promise { }) } -function getExpectedOutput(input: Readable) { - return streamToString(input.pipe(new PassThrough())) -} - describe('createBufferedTransformStream', () => { it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => { const stream = createBufferedTransformStream() const input = await createInput() + // This is essentially equivalent to a ReadableStream.tee() + // The important part is that both `pipe` calls happen before any read operation do. const output = input.pipe(stream) - const expectedCall = getExpectedOutput(input) + const expectedOutput = input.pipe(new PassThrough()) const actualChunks = await new Promise((resolve) => { const chunks: Buffer[] = [] @@ -61,7 +61,7 @@ describe('createBufferedTransformStream', () => { }) }) - const expected = await expectedCall + const expected = await streamToString(expectedOutput) // React will send the suspense boundary piece second expect(actualChunks.length).toBe(2) From 5b518f6e93866b7afb3f94f89bac6615957ae46d Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 14:28:07 -0600 Subject: [PATCH 5/8] use uint8array over buffer --- packages/next/src/server/stream-utils/stream-utils.node.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index ee17889d50296..4d58b66f7a703 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -107,7 +107,7 @@ export function streamFromString(string: string): Readable { } export function createBufferedTransformStream(): Transform { - let buffered: Buffer[] = [] + let buffered: Uint8Array[] = [] let byteLength = 0 let pending = false @@ -118,7 +118,7 @@ export function createBufferedTransformStream(): Transform { process.nextTick(() => { try { - const chunk = Buffer.alloc(byteLength) + const chunk = new Uint8Array(byteLength) let copiedBytes = 0 for (let i = 0; i < buffered.length; i++) { chunk.set(buffered[i], copiedBytes) From f14f705d6cd3252edbb05aaefd437aa2577e411d Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 18:29:45 -0600 Subject: [PATCH 6/8] fix createBufferedTransformStream. add equivalent test for edge --- .../stream-utils/stream-utils.edge.test.tsx | 45 ++++++++++++++ .../stream-utils/stream-utils.node.test.tsx | 61 ++++++------------- .../server/stream-utils/stream-utils.node.ts | 17 ++++-- 3 files changed, 76 insertions(+), 47 deletions(-) create mode 100644 packages/next/src/server/stream-utils/stream-utils.edge.test.tsx diff --git a/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx b/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx new file mode 100644 index 0000000000000..3a88567e411db --- /dev/null +++ b/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx @@ -0,0 +1,45 @@ +import { createBufferedTransformStream } from './stream-utils.edge' +import { renderToReadableStream } from 'react-dom/server.edge' +import { Suspense } from 'react' + +function App() { + const Data = async () => { + const data = await Promise.resolve('1') + return

{data}

+ } + + return ( + + + My App + + +

Hello, World!

+ Fallback}> + + + + + ) +} + +async function createInput(app = ): Promise> { + const stream = await renderToReadableStream(app) + await stream.allReady + return stream +} + +describe('createBufferedTransformStream', () => { + it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => { + const input = await createInput() + const actualStream = input.pipeThrough(createBufferedTransformStream()) + + const actualChunks = [] + // @ts-expect-error + for await (const chunks of actualStream) { + actualChunks.push(chunks) + } + + expect(actualChunks.length).toBe(1) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx index 06429d5048fd2..edb644adc378e 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx +++ b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx @@ -1,17 +1,14 @@ -import { - createBufferedTransformStream, - streamToString, -} from './stream-utils.node' +import { createBufferedTransformStream } from './stream-utils.node' import { PassThrough } from 'node:stream' import { renderToPipeableStream } from 'react-dom/server.node' import { Suspense } from 'react' -import { StringDecoder } from 'node:string_decoder' function App() { const Data = async () => { const data = await Promise.resolve('1') return

{data}

} + return ( @@ -28,51 +25,33 @@ function App() { } function createInput(app = ): Promise { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const { pipe } = renderToPipeableStream(app, { - onShellReady() { - const pt = new PassThrough() - pipe(pt) - resolve(pt) + onAllReady() { + resolve(pipe(new PassThrough())) + }, + onShellError(error) { + reject(error) }, }) }) } describe('createBufferedTransformStream', () => { - it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => { - const stream = createBufferedTransformStream() - const input = await createInput() - // This is essentially equivalent to a ReadableStream.tee() - // The important part is that both `pipe` calls happen before any read operation do. - const output = input.pipe(stream) - const expectedOutput = input.pipe(new PassThrough()) - - const actualChunks = await new Promise((resolve) => { - const chunks: Buffer[] = [] - output.on('readable', () => { - let chunk - while (null !== (chunk = output.read())) { - chunks.push(chunk) - } + it('should return a TransformStream that buffers input chunks across rendering boundaries', (done) => { + createInput().then((input) => { + const stream = input.pipe(createBufferedTransformStream()) + const actualChunks = [] + stream.on('data', (chunk) => { + actualChunks.push(chunk) }) - output.on('end', () => { - resolve(chunks) - }) - }) - const expected = await streamToString(expectedOutput) + stream.resume() - // React will send the suspense boundary piece second - expect(actualChunks.length).toBe(2) - - let actual = '' - const decoder = new StringDecoder() - for (const chunk of actualChunks) { - actual += decoder.write(chunk) - } - actual += decoder.end() - - expect(actual).toStrictEqual(expected) + stream.on('finish', () => { + expect(actualChunks.length).toBe(1) + done() + }) + }) }) }) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index 4d58b66f7a703..36b55087a2c10 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -10,6 +10,7 @@ import { pipeline, } from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' +import { DetachedPromise } from '../../lib/detached-promise' export * from './stream-utils.edge' @@ -109,14 +110,15 @@ export function streamFromString(string: string): Readable { export function createBufferedTransformStream(): Transform { let buffered: Uint8Array[] = [] let byteLength = 0 - let pending = false + let pending: DetachedPromise | undefined const flush = (transform: Transform) => { if (pending) return - pending = true + const detached = new DetachedPromise() + pending = detached - process.nextTick(() => { + setImmediate(() => { try { const chunk = new Uint8Array(byteLength) let copiedBytes = 0 @@ -124,12 +126,13 @@ export function createBufferedTransformStream(): Transform { chunk.set(buffered[i], copiedBytes) copiedBytes += buffered[i].byteLength } - buffered = [] + buffered.length = 0 byteLength = 0 transform.push(chunk) } catch { } finally { - pending = false + pending = undefined + detached.resolve() } }) } @@ -142,7 +145,9 @@ export function createBufferedTransformStream(): Transform { callback() }, final(callback) { - callback() + if (!pending) callback() + + pending?.promise.then(() => callback()) }, }) } From 4a090bb44bc534bb078bce026ba18a6707151538 Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 14:06:37 -0600 Subject: [PATCH 7/8] Implement insertion stream utilities for Node.js Streams --- .../server/stream-utils/stream-utils.node.ts | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index 36b55087a2c10..000e60ea76c99 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -11,6 +11,9 @@ import { } from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' import { DetachedPromise } from '../../lib/detached-promise' +import { indexOfUint8Array } from './uint8array-helpers' +import { ENCODED_TAGS } from './encodedTags' + export * from './stream-utils.edge' @@ -151,3 +154,93 @@ export function createBufferedTransformStream(): Transform { }, }) } + +export function createInsertedHTMLStream( + getServerInsertedHTML: () => Promise +): Transform { + return new Transform({ + transform(chunk, _, callback) { + getServerInsertedHTML().then((html) => { + if (html) { + this.push(Buffer.from(html)) + } + + return callback(null, chunk) + }) + }, + }) +} + +export function createHeadInsertionTransformStream( + insert: () => Promise +): Transform { + let inserted = false + let freezing = false + let hasBytes = false + return new Transform({ + transform(chunk, _, callback) { + hasBytes = true + if (freezing) { + return callback(null, chunk) + } + insert() + .then((insertion) => { + if (inserted) { + if (insertion) { + this.push(Buffer.from(insertion)) + } + this.push(chunk) + freezing = true + } else { + const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD) + if (index !== -1) { + if (insertion) { + const encodedInsertion = Buffer.from(insertion) + const insertedHeadContent = Buffer.alloc( + chunk.length + encodedInsertion.length + ) + insertedHeadContent.set(chunk.slice(0, index)) + insertedHeadContent.set(encodedInsertion, index) + insertedHeadContent.set( + chunk.slice(index), + index + encodedInsertion.length + ) + this.push(insertedHeadContent) + } else { + this.push(chunk) + } + freezing = true + inserted = true + } + } + + if (!inserted) { + this.push(chunk) + } else { + process.nextTick(() => { + freezing = false + }) + } + + callback() + }) + .catch((err) => { + callback(err) + }) + }, + final(callback) { + if (hasBytes) { + insert() + .then((insertion) => { + if (insertion) { + this.push(Buffer.from(insertion)) + callback() + } + }) + .catch((err) => { + callback(err) + }) + } + }, + }) +} From afdb71084502055014566d7f7067a7b7ccf14c99 Mon Sep 17 00:00:00 2001 From: Ethan Arrowood Date: Fri, 10 May 2024 14:52:04 -0600 Subject: [PATCH 8/8] temp --- .../stream-utils/stream-utils.node.test.tsx | 37 ++++++++++++++++++- .../server/stream-utils/stream-utils.node.ts | 26 ++++++++----- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx index edb644adc378e..833064aea3ef4 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx +++ b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx @@ -1,4 +1,7 @@ -import { createBufferedTransformStream } from './stream-utils.node' +import { + createBufferedTransformStream, + createInsertedHTMLStream, +} from './stream-utils.node' import { PassThrough } from 'node:stream' import { renderToPipeableStream } from 'react-dom/server.node' import { Suspense } from 'react' @@ -55,3 +58,35 @@ describe('createBufferedTransformStream', () => { }) }) }) + +describe('createInsertedHTMLStream', () => { + it('should insert html to the beginning of the stream', async () => { + const insertedHTML = '' + const stream = createInsertedHTMLStream(() => Promise.resolve(insertedHTML)) + const input = await createInput() + const output = input.pipe(stream) + + const actualChunks = await new Promise((resolve) => { + const chunks: Buffer[] = [] + output.on('readable', () => { + let chunk + while (null !== (chunk = output.read())) { + chunks.push(chunk) + } + }) + output.on('end', () => { + resolve(chunks) + }) + }) + + console.log(actualChunks) + + expect(actualChunks.length).toBe(2) + const encoder = new TextEncoder() + const expected = encoder.encode(insertedHTML) + expect(actualChunks[0].indexOf(expected)).toBe(0) + expect( + new Uint8Array(actualChunks[0].subarray(expected.length)) + ).toStrictEqual(expected) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index 000e60ea76c99..4bda164c692e9 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -155,18 +155,24 @@ export function createBufferedTransformStream(): Transform { }) } +const encoder = new TextEncoder() + export function createInsertedHTMLStream( getServerInsertedHTML: () => Promise ): Transform { return new Transform({ transform(chunk, _, callback) { - getServerInsertedHTML().then((html) => { - if (html) { - this.push(Buffer.from(html)) - } + getServerInsertedHTML() + .then((html) => { + if (html) { + this.push(encoder.encode(html)) + } - return callback(null, chunk) - }) + return callback(null, chunk) + }) + .catch((err) => { + return callback(err) + }) }, }) } @@ -187,7 +193,7 @@ export function createHeadInsertionTransformStream( .then((insertion) => { if (inserted) { if (insertion) { - this.push(Buffer.from(insertion)) + this.push(encoder.encode(insertion)) } this.push(chunk) freezing = true @@ -195,8 +201,8 @@ export function createHeadInsertionTransformStream( const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD) if (index !== -1) { if (insertion) { - const encodedInsertion = Buffer.from(insertion) - const insertedHeadContent = Buffer.alloc( + const encodedInsertion = encoder.encode(insertion) + const insertedHeadContent = new Uint8Array( chunk.length + encodedInsertion.length ) insertedHeadContent.set(chunk.slice(0, index)) @@ -233,7 +239,7 @@ export function createHeadInsertionTransformStream( insert() .then((insertion) => { if (insertion) { - this.push(Buffer.from(insertion)) + this.push(encoder.encode(insertion)) callback() } })