Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flight Reply] Encode ReadableStream and AsyncIterables #28893

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 141 additions & 5 deletions packages/react-client/src/ReactFlightReplyClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';
import {
enableRenderableContext,
enableBinaryFlight,
enableFlightReadableStream,
} from 'shared/ReactFeatureFlags';

import {
Expand All @@ -28,6 +29,7 @@ import {
REACT_CONTEXT_TYPE,
REACT_PROVIDER_TYPE,
getIteratorFn,
ASYNC_ITERATOR,
} from 'shared/ReactSymbols';

import {
Expand Down Expand Up @@ -206,6 +208,123 @@ export function processReply(
return '$' + tag + blobId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. It also implies that different chunks can be split up or merged as opposed
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
// received in the same slices.
// $FlowFixMe: This is a Node.js extension.
let supportsBYOB: void | boolean = stream.supportsBYOB;
if (supportsBYOB === undefined) {
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
stream.getReader({mode: 'byob'}).releaseLock();
supportsBYOB = true;
} catch (x) {
supportsBYOB = false;
}
}

const reader = stream.getReader();

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, partJSON);
reader.read().then(progress, reject);
} catch (x) {
reject(x);
}
}
}
reader.read().then(progress, reject);

return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
}

function serializeAsyncIterable(
iterable: $AsyncIterable<ReactServerValue, ReactServerValue, void>,
iterator: $AsyncIterator<ReactServerValue, ReactServerValue, void>,
): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

// Generators/Iterators are Iterables but they're also their own iterator
// functions. If that's the case, we treat them as single-shot. Otherwise,
// we assume that this iterable might be a multi-shot and allow it to be
// iterated more than once on the receiving server.
const isIterator = iterable === iterator;

// There's a race condition between when the stream is aborted and when the promise
// resolves so we track whether we already aborted it to avoid writing twice.
function progress(
entry:
| {done: false, +value: ReactServerValue, ...}
| {done: true, +value: ReactServerValue, ...},
) {
if (entry.done) {
if (entry.value === undefined) {
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
} else {
// Unlike streams, the last value may not be undefined. If it's not
// we outline it and encode a reference to it in the closing instruction.
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal
} catch (x) {
reject(x);
return;
}
}
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, partJSON);
iterator.next().then(progress, reject);
} catch (x) {
reject(x);
return;
}
}
}

iterator.next().then(progress, reject);
return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16);
}

function resolveToJSON(
this:
| {+[key: string | number]: ReactServerValue}
Expand Down Expand Up @@ -349,11 +468,9 @@ export function processReply(
reject(reason);
}
},
reason => {
// In the future we could consider serializing this as an error
// that throws on the server instead.
reject(reason);
},
// In the future we could consider serializing this as an error
// that throws on the server instead.
reject,
);
return serializePromiseID(promiseId);
}
Expand Down Expand Up @@ -486,6 +603,25 @@ export function processReply(
return Array.from((iterator: any));
}

if (enableFlightReadableStream) {
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}
}

// Verify that this is a simple plain object.
const proto = getPrototypeOf(value);
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => {
// This should've been the same reference that we already saw.
expect(response.children).toBe(children);
});

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
let controller1;
let controller2;
const s1 = new ReadableStream({
start(c) {
controller1 = c;
},
});
const s2 = new ReadableStream({
start(c) {
controller2 = c;
},
});

const promise = ReactServerDOMClient.encodeReply({s1, s2});

controller1.enqueue({hello: 'world'});
controller2.enqueue({hi: 'there'});

controller1.enqueue('text1');
controller2.enqueue('text2');

controller1.close();
controller2.close();

const body = await promise;

const result = await ReactServerDOMServer.decodeReply(
body,
webpackServerMap,
);
const reader1 = result.s1.getReader();
const reader2 = result.s2.getReader();

expect(await reader1.read()).toEqual({
value: {hello: 'world'},
done: false,
});
expect(await reader2.read()).toEqual({
value: {hi: 'there'},
done: false,
});

expect(await reader1.read()).toEqual({
value: 'text1',
done: false,
});
expect(await reader1.read()).toEqual({
value: undefined,
done: true,
});
expect(await reader2.read()).toEqual({
value: 'text2',
done: false,
});
expect(await reader2.read()).toEqual({
value: undefined,
done: true,
});
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
const multiShotIterable = {
async *[Symbol.asyncIterator]() {
const next = yield {hello: 'A'};
expect(next).toBe(undefined);
await wait;
yield {hi: 'B'};
return 'C';
},
};
const singleShotIterator = (async function* () {
const next = yield {hello: 'D'};
expect(next).toBe(undefined);
await wait;
yield {hi: 'E'};
return 'F';
})();

await resolve();

const body = await ReactServerDOMClient.encodeReply({
multiShotIterable,
singleShotIterator,
});
const result = await ReactServerDOMServer.decodeReply(
body,
webpackServerMap,
);

const iterator1 = result.multiShotIterable[Symbol.asyncIterator]();
const iterator2 = result.singleShotIterator[Symbol.asyncIterator]();

expect(iterator1).not.toBe(result.multiShotIterable);
expect(iterator2).toBe(result.singleShotIterator);

expect(await iterator1.next()).toEqual({
value: {hello: 'A'},
done: false,
});
expect(await iterator2.next()).toEqual({
value: {hello: 'D'},
done: false,
});

expect(await iterator1.next()).toEqual({
value: {hi: 'B'},
done: false,
});
expect(await iterator2.next()).toEqual({
value: {hi: 'E'},
done: false,
});
expect(await iterator1.next()).toEqual({
value: 'C', // Return value
done: true,
});
expect(await iterator1.next()).toEqual({
value: undefined,
done: true,
});

expect(await iterator2.next()).toEqual({
value: 'F', // Return value
done: true,
});

// Multi-shot iterables should be able to do the same thing again
const iterator3 = result.multiShotIterable[Symbol.asyncIterator]();

expect(iterator3).not.toBe(iterator1);

// We should be able to iterate over the iterable again and it should be
// synchronously available using instrumented promises so that React can
// rerender it synchronously.
expect(iterator3.next().value).toEqual({
value: {hello: 'A'},
done: false,
});
expect(iterator3.next().value).toEqual({
value: {hi: 'B'},
done: false,
});
expect(iterator3.next().value).toEqual({
value: 'C', // Return value
done: true,
});
expect(iterator3.next().value).toEqual({
value: undefined,
done: true,
});

expect(() => iterator3.next('this is not allowed')).toThrow(
'Values cannot be passed to next() of AsyncIterables passed to Client Components.',
);
});
});
Loading