Skip to content

Commit

Permalink
rewrite without promises
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan-Arrowood committed May 13, 2024
1 parent fb389c9 commit 9b1b1b3
Showing 1 changed file with 39 additions and 34 deletions.
73 changes: 39 additions & 34 deletions packages/next/src/server/stream-utils/stream-utils.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
pipeline,
} from 'node:stream'
import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node'
import { DetachedPromise } from '../../lib/detached-promise'
import isError from '../../lib/is-error'

export * from './stream-utils.edge'

Expand Down Expand Up @@ -107,47 +107,52 @@ export function streamFromString(string: string): Readable {
return Readable.from(string)
}

/**
* This utility function buffers all of the chunks it receives from the input
* during a single "macro-task". The transform function schedules a
* `setImmediate` callback that will push the buffered chunks to the readable.
* The transform also ensures not to execute the final callback too early. The
* overall timing of this utility is very specific and must match that of the
* edge based version.
*/
export function createBufferedTransformStream(): Transform {
let buffered: Uint8Array[] = []
let byteLength = 0
let pending: DetachedPromise<void> | undefined

const flush = (transform: Transform) => {
if (pending) return

const detached = new DetachedPromise<void>()
pending = detached

setImmediate(() => {
try {
const chunk = new Uint8Array(byteLength)
let copiedBytes = 0
for (let i = 0; i < buffered.length; i++) {
chunk.set(buffered[i], copiedBytes)
copiedBytes += buffered[i].byteLength
}
buffered.length = 0
byteLength = 0
transform.push(chunk)
} catch {
} finally {
pending = undefined
detached.resolve()
}
})
}
let bufferedChunks: Uint8Array[] = []
let bufferedChunksByteLength = 0
let pending = false

return new Transform({
transform(chunk, _, callback) {
buffered.push(chunk)
byteLength += chunk.byteLength
flush(this)
callback()
bufferedChunks.push(chunk)
bufferedChunksByteLength += chunk.byteLength

if (pending) callback()

pending = true

setImmediate(() => {
try {
const bufferedChunk = new Uint8Array(bufferedChunksByteLength)
let copiedBytes = 0
for (let i = 0; i < bufferedChunks.length; i++) {
bufferedChunk.set(bufferedChunks[i], copiedBytes)
copiedBytes += bufferedChunks[i].byteLength
}
bufferedChunks.length = 0
bufferedChunksByteLength = 0
callback(null, bufferedChunk)
} catch (err: unknown) {
if (isError(err)) callback(err)
} finally {
pending = false
}
})
},
final(callback) {
if (!pending) callback()

pending?.promise.then(() => callback())
process.nextTick(() => {
callback()
})
},
})
}

0 comments on commit 9b1b1b3

Please sign in to comment.