From a6b2500baf174ecfefd9e3d5993c781dbdc70a41 Mon Sep 17 00:00:00 2001 From: Lars Grammel Date: Tue, 9 Apr 2024 16:17:36 +0200 Subject: [PATCH] Enable stream data protocol and deprecate experimental_streamData: true flag. (#1192) --- .changeset/two-bats-itch.md | 5 + .../api-reference/providers/inkeep-stream.mdx | 1 - .../providers/mistral-stream.mdx | 1 - docs/pages/docs/api-reference/stream-data.mdx | 2 - docs/pages/docs/guides/providers/inkeep.mdx | 1 - examples/next-inkeep/app/api/chat/route.ts | 1 - .../app/api/stream-data-basic/route.ts | 1 - .../app/api/stream-data-chain/route.ts | 1 - .../next-mistral/app/api/completion/route.ts | 1 - .../app/api/chat-with-functions/route.ts | 1 - .../app/api/chat-with-tools/route.ts | 1 - .../next-openai/app/api/completion/route.ts | 1 - .../app/stream-react-response/action.tsx | 11 +- .../server/api/chat-with-functions.ts | 1 - examples/nuxt-openai/server/api/completion.ts | 1 - .../routes/api/chat-with-functions/index.ts | 1 - .../src/routes/api/completion/index.ts | 1 - .../routes/api/chat-with-functions/+server.ts | 1 - .../src/routes/api/chat-with-tools/+server.ts | 1 - .../src/routes/api/completion/+server.ts | 1 - .../core/core/generate-text/stream-text.ts | 4 +- packages/core/react/use-chat.ts | 3 - .../core/react/use-completion.ui.test.tsx | 13 - packages/core/shared/call-chat-api.ts | 102 +---- packages/core/shared/call-completion-api.ts | 42 +- packages/core/shared/utils.ts | 5 - packages/core/solid/use-chat.ts | 3 - .../core/solid/use-completion.ui.test.tsx | 13 - packages/core/streams/ai-stream.ts | 6 +- .../core/streams/anthropic-stream.test.ts | 226 ++++----- packages/core/streams/anthropic-stream.ts | 4 +- .../core/streams/aws-bedrock-stream.test.ts | 274 +++++------ packages/core/streams/aws-bedrock-stream.ts | 4 +- packages/core/streams/cohere-stream.test.ts | 95 ++-- packages/core/streams/cohere-stream.ts | 8 +- .../google-generative-ai-stream.test.ts | 76 ++-- .../streams/google-generative-ai-stream.ts | 2 +- .../core/streams/huggingface-stream.test.ts | 90 ++-- packages/core/streams/huggingface-stream.ts | 4 +- packages/core/streams/inkeep-stream.test.ts | 113 ++--- packages/core/streams/inkeep-stream.ts | 2 +- .../core/streams/langchain-stream.test.ts | 301 +++++------- packages/core/streams/langchain-stream.ts | 4 +- packages/core/streams/mistral-stream.test.ts | 100 ++-- packages/core/streams/mistral-stream.ts | 4 +- packages/core/streams/openai-stream.test.ts | 428 +++++++----------- packages/core/streams/openai-stream.ts | 25 +- .../core/streams/replicate-stream.test.ts | 101 ++--- packages/core/streams/replicate-stream.ts | 2 +- packages/core/streams/stream-data.ts | 11 +- .../streams/streaming-react-response.test.tsx | 6 +- .../core/streams/streaming-react-response.ts | 119 ++--- .../core/streams/streaming-text-response.ts | 2 - packages/core/svelte/use-chat.ts | 3 - packages/core/tests/utils/mock-fetch.ts | 4 - packages/core/vue/use-chat.ts | 3 - packages/core/vue/use-completion.ui.test.ts | 14 - 57 files changed, 799 insertions(+), 1452 deletions(-) create mode 100644 .changeset/two-bats-itch.md diff --git a/.changeset/two-bats-itch.md b/.changeset/two-bats-itch.md new file mode 100644 index 000000000000..31d232534804 --- /dev/null +++ b/.changeset/two-bats-itch.md @@ -0,0 +1,5 @@ +--- +'ai': patch +--- + +Deprecated the `experimental_streamData: true` setting from AIStreamCallbacksAndOptions. You can delete occurrences in your code. The stream data protocol is now used by default. 
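For reference, migrating application code only requires deleting the deprecated flag, since the stream data protocol is now enabled by default. Below is a minimal sketch of an updated route, modeled on the example routes touched in this patch (a Next.js App Router route with the OpenAI provider is assumed; the model name and request shape are illustrative):

// app/api/completion/route.ts (hypothetical example route)
import OpenAI from 'openai';
import {
  OpenAIStream,
  StreamingTextResponse,
  experimental_StreamData,
} from 'ai';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function POST(req: Request) {
  const { prompt } = await req.json();

  const response = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo',
    stream: true,
    messages: [{ role: 'user', content: prompt }],
  });

  const data = new experimental_StreamData();

  const stream = OpenAIStream(response, {
    onFinal() {
      // StreamData must still be closed manually, or the response never finishes.
      data.close();
    },
    // experimental_streamData: true,  <- deprecated no-op; delete this line.
  });

  return new StreamingTextResponse(stream, {}, data);
}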
diff --git a/docs/pages/docs/api-reference/providers/inkeep-stream.mdx b/docs/pages/docs/api-reference/providers/inkeep-stream.mdx index debccde5b360..cac12a555557 100644 --- a/docs/pages/docs/api-reference/providers/inkeep-stream.mdx +++ b/docs/pages/docs/api-reference/providers/inkeep-stream.mdx @@ -142,7 +142,6 @@ export async function POST(req: Request) { } data.close(); }, - experimental_streamData: true, }); return new StreamingTextResponse(stream, {}, data); diff --git a/docs/pages/docs/api-reference/providers/mistral-stream.mdx b/docs/pages/docs/api-reference/providers/mistral-stream.mdx index d77513b8bf32..c15fabdc4037 100644 --- a/docs/pages/docs/api-reference/providers/mistral-stream.mdx +++ b/docs/pages/docs/api-reference/providers/mistral-stream.mdx @@ -84,7 +84,6 @@ export async function POST(req: Request) { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); // Respond with the stream diff --git a/docs/pages/docs/api-reference/stream-data.mdx b/docs/pages/docs/api-reference/stream-data.mdx index 687bf6bacd50..1e28f24df0cb 100644 --- a/docs/pages/docs/api-reference/stream-data.mdx +++ b/docs/pages/docs/api-reference/stream-data.mdx @@ -79,8 +79,6 @@ export async function POST(req: Request) { // IMPORTANT! you must close StreamData manually or the response will never finish. data.close(); }, - // IMPORTANT! until this is stable, you must explicitly opt in to supporting streamData. - experimental_streamData: true, }); data.append({ diff --git a/docs/pages/docs/guides/providers/inkeep.mdx b/docs/pages/docs/guides/providers/inkeep.mdx index 268c4d7f694c..d8327047c177 100644 --- a/docs/pages/docs/guides/providers/inkeep.mdx +++ b/docs/pages/docs/guides/providers/inkeep.mdx @@ -123,7 +123,6 @@ export async function POST(req: Request) { } data.close(); }, - experimental_streamData: true, }); return new StreamingTextResponse(stream, {}, data); diff --git a/examples/next-inkeep/app/api/chat/route.ts b/examples/next-inkeep/app/api/chat/route.ts index 9571f9cee899..7bcf44909c0c 100644 --- a/examples/next-inkeep/app/api/chat/route.ts +++ b/examples/next-inkeep/app/api/chat/route.ts @@ -68,7 +68,6 @@ export async function POST(req: Request) { } data.close(); }, - experimental_streamData: true, }); return new StreamingTextResponse(stream, {}, data); diff --git a/examples/next-langchain/app/api/stream-data-basic/route.ts b/examples/next-langchain/app/api/stream-data-basic/route.ts index 9250cfa85948..eb1a94eae549 100644 --- a/examples/next-langchain/app/api/stream-data-basic/route.ts +++ b/examples/next-langchain/app/api/stream-data-basic/route.ts @@ -20,7 +20,6 @@ export async function POST(req: Request) { data.append(JSON.stringify({ key: 'value' })); // example data.close(); }, - experimental_streamData: true, }); const llm = new ChatOpenAI({ diff --git a/examples/next-langchain/app/api/stream-data-chain/route.ts b/examples/next-langchain/app/api/stream-data-chain/route.ts index 05245fe8d122..9c947a1a207b 100644 --- a/examples/next-langchain/app/api/stream-data-chain/route.ts +++ b/examples/next-langchain/app/api/stream-data-chain/route.ts @@ -26,7 +26,6 @@ export async function POST(req: Request) { data.append(JSON.stringify({ key: 'value' })); // example data.close(); }, - experimental_streamData: true, }); await chain.stream({ product: value }, { callbacks: [handlers] }); diff --git a/examples/next-mistral/app/api/completion/route.ts b/examples/next-mistral/app/api/completion/route.ts index 6ef7a3ab244c..167ef523c1a9 100644 --- 
a/examples/next-mistral/app/api/completion/route.ts +++ b/examples/next-mistral/app/api/completion/route.ts @@ -30,7 +30,6 @@ export async function POST(req: Request) { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); // Respond with the stream diff --git a/examples/next-openai/app/api/chat-with-functions/route.ts b/examples/next-openai/app/api/chat-with-functions/route.ts index be2d3ba37565..03fafdeb2692 100644 --- a/examples/next-openai/app/api/chat-with-functions/route.ts +++ b/examples/next-openai/app/api/chat-with-functions/route.ts @@ -91,7 +91,6 @@ export async function POST(req: Request) { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); data.append({ diff --git a/examples/next-openai/app/api/chat-with-tools/route.ts b/examples/next-openai/app/api/chat-with-tools/route.ts index f4448e1ca59a..4b8bd988ed52 100644 --- a/examples/next-openai/app/api/chat-with-tools/route.ts +++ b/examples/next-openai/app/api/chat-with-tools/route.ts @@ -111,7 +111,6 @@ export async function POST(req: Request) { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); data.append({ diff --git a/examples/next-openai/app/api/completion/route.ts b/examples/next-openai/app/api/completion/route.ts index de1e5ccd571c..6616ac1fc989 100644 --- a/examples/next-openai/app/api/completion/route.ts +++ b/examples/next-openai/app/api/completion/route.ts @@ -27,7 +27,6 @@ export async function POST(req: Request) { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); // Respond with the stream diff --git a/examples/next-openai/app/stream-react-response/action.tsx b/examples/next-openai/app/stream-react-response/action.tsx index df5b07aae92c..a17e407aa148 100644 --- a/examples/next-openai/app/stream-react-response/action.tsx +++ b/examples/next-openai/app/stream-react-response/action.tsx @@ -101,7 +101,6 @@ export async function handler({ messages }: { messages: Message[] }) { return undefined; }, - experimental_streamData: true, }); return new experimental_StreamingReactResponse(stream, { @@ -113,11 +112,11 @@ export async function handler({ messages }: { messages: Message[] }) { switch (value.type) { case 'weather': { return ( -
[JSX markup for the weather card in this hunk was stripped during extraction; only the rendered expressions {value.location} and {value.temperature}° {value.format} survive.]
@@ -143,7 +142,7 @@ export async function handler({ messages }: { messages: Message[] }) {
Framed Image { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); data.append({ diff --git a/examples/nuxt-openai/server/api/completion.ts b/examples/nuxt-openai/server/api/completion.ts index 1403d6b20da3..487f8c700d9f 100644 --- a/examples/nuxt-openai/server/api/completion.ts +++ b/examples/nuxt-openai/server/api/completion.ts @@ -33,7 +33,6 @@ export default defineLazyEventHandler(async () => { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); // Respond with the stream diff --git a/examples/solidstart-openai/src/routes/api/chat-with-functions/index.ts b/examples/solidstart-openai/src/routes/api/chat-with-functions/index.ts index 7ebb4620492c..9c677f6012ba 100644 --- a/examples/solidstart-openai/src/routes/api/chat-with-functions/index.ts +++ b/examples/solidstart-openai/src/routes/api/chat-with-functions/index.ts @@ -85,7 +85,6 @@ export const POST = async (event: APIEvent) => { onFinal() { data.close(); }, - experimental_streamData: true, }); data.append({ diff --git a/examples/solidstart-openai/src/routes/api/completion/index.ts b/examples/solidstart-openai/src/routes/api/completion/index.ts index ebf207fd66dc..dc66b00d9e30 100644 --- a/examples/solidstart-openai/src/routes/api/completion/index.ts +++ b/examples/solidstart-openai/src/routes/api/completion/index.ts @@ -33,7 +33,6 @@ export const POST = async (event: APIEvent) => { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); // Respond with the stream diff --git a/examples/sveltekit-openai/src/routes/api/chat-with-functions/+server.ts b/examples/sveltekit-openai/src/routes/api/chat-with-functions/+server.ts index 35c8449fc704..0509a3bdd51d 100644 --- a/examples/sveltekit-openai/src/routes/api/chat-with-functions/+server.ts +++ b/examples/sveltekit-openai/src/routes/api/chat-with-functions/+server.ts @@ -88,7 +88,6 @@ export async function POST({ request }) { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); data.append({ diff --git a/examples/sveltekit-openai/src/routes/api/chat-with-tools/+server.ts b/examples/sveltekit-openai/src/routes/api/chat-with-tools/+server.ts index 5564c9bd5ec9..794834c070e9 100644 --- a/examples/sveltekit-openai/src/routes/api/chat-with-tools/+server.ts +++ b/examples/sveltekit-openai/src/routes/api/chat-with-tools/+server.ts @@ -109,7 +109,6 @@ export async function POST({ request }) { onFinal() { data.close(); }, - experimental_streamData: true, }); data.append({ diff --git a/examples/sveltekit-openai/src/routes/api/completion/+server.ts b/examples/sveltekit-openai/src/routes/api/completion/+server.ts index d1535b2f31e8..379d92f0fc9f 100644 --- a/examples/sveltekit-openai/src/routes/api/completion/+server.ts +++ b/examples/sveltekit-openai/src/routes/api/completion/+server.ts @@ -38,7 +38,6 @@ export const POST = (async ({ request }) => { onFinal(completion) { data.close(); }, - experimental_streamData: true, }); // Respond with the stream diff --git a/packages/core/core/generate-text/stream-text.ts b/packages/core/core/generate-text/stream-text.ts index 5a4e5a4a3825..1b33562ec6fa 100644 --- a/packages/core/core/generate-text/stream-text.ts +++ b/packages/core/core/generate-text/stream-text.ts @@ -214,8 +214,6 @@ Stream callbacks that will be called when the stream emits events. 
// TODO add support for tool calls return readableFromAsyncIterable(this.textStream) .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ); + .pipeThrough(createStreamDataTransformer()); } } diff --git a/packages/core/react/use-chat.ts b/packages/core/react/use-chat.ts index 28c3a7de4332..869cc58433c1 100644 --- a/packages/core/react/use-chat.ts +++ b/packages/core/react/use-chat.ts @@ -185,9 +185,6 @@ const getStreamedResponse = async ( ...chatRequest.options?.headers, }, abortController: () => abortControllerRef.current, - appendMessage(message) { - mutate([...chatRequest.messages, message], false); - }, restoreMessagesOnFailure() { mutate(previousMessages, false); }, diff --git a/packages/core/react/use-completion.ui.test.tsx b/packages/core/react/use-completion.ui.test.tsx index c51289dd728f..ff7a1df39d05 100644 --- a/packages/core/react/use-completion.ui.test.tsx +++ b/packages/core/react/use-completion.ui.test.tsx @@ -5,7 +5,6 @@ import { mockFetchDataStream, mockFetchDataStreamWithGenerator, mockFetchError, - mockFetchTextStream, } from '../tests/utils/mock-fetch'; import { useCompletion } from './use-completion'; @@ -45,18 +44,6 @@ afterEach(() => { cleanup(); }); -it('should render normal streamed stream', async () => { - mockFetchTextStream({ - url: 'https://example.com/api/completion', - chunks: ['Hello', ',', ' world', '.'], - }); - - await userEvent.type(screen.getByTestId('input'), 'hi{enter}'); - - await screen.findByTestId('completion'); - expect(screen.getByTestId('completion')).toHaveTextContent('Hello, world.'); -}); - it('should render complex text stream', async () => { mockFetchDataStream({ url: 'https://example.com/api/completion', diff --git a/packages/core/shared/call-chat-api.ts b/packages/core/shared/call-chat-api.ts index b6ea889635b4..8b30888ecb3d 100644 --- a/packages/core/shared/call-chat-api.ts +++ b/packages/core/shared/call-chat-api.ts @@ -1,12 +1,5 @@ import { parseComplexResponse } from './parse-complex-response'; -import { - FunctionCall, - IdGenerator, - JSONValue, - Message, - ToolCall, -} from './types'; -import { COMPLEX_HEADER, createChunkDecoder } from './utils'; +import { IdGenerator, JSONValue, Message } from './types'; export async function callChatApi({ api, @@ -15,7 +8,6 @@ export async function callChatApi({ credentials, headers, abortController, - appendMessage, restoreMessagesOnFailure, onResponse, onUpdate, @@ -29,7 +21,6 @@ export async function callChatApi({ headers?: HeadersInit; abortController?: () => AbortController | null; restoreMessagesOnFailure: () => void; - appendMessage: (message: Message) => void; onResponse?: (response: Response) => void | Promise; onUpdate: (merged: Message[], data: JSONValue[] | undefined) => void; onFinish?: (message: Message) => void; @@ -72,86 +63,17 @@ export async function callChatApi({ } const reader = response.body.getReader(); - const isComplexMode = response.headers.get(COMPLEX_HEADER) === 'true'; - if (isComplexMode) { - return await parseComplexResponse({ - reader, - abortControllerRef: - abortController != null ? 
{ current: abortController() } : undefined, - update: onUpdate, - onFinish(prefixMap) { - if (onFinish && prefixMap.text != null) { - onFinish(prefixMap.text); - } - }, - generateId, - }); - } else { - const createdAt = new Date(); - const decode = createChunkDecoder(false); - - // TODO-STREAMDATA: Remove this once Stream Data is not experimental - let streamedResponse = ''; - const replyId = generateId(); - let responseMessage: Message = { - id: replyId, - createdAt, - content: '', - role: 'assistant', - }; - - // TODO-STREAMDATA: Remove this once Stream Data is not experimental - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - // Update the chat state with the new message tokens. - streamedResponse += decode(value); - - if (streamedResponse.startsWith('{"function_call":')) { - // While the function call is streaming, it will be a string. - responseMessage['function_call'] = streamedResponse; - } else if (streamedResponse.startsWith('{"tool_calls":')) { - // While the tool calls are streaming, it will be a string. - responseMessage['tool_calls'] = streamedResponse; - } else { - responseMessage['content'] = streamedResponse; + return await parseComplexResponse({ + reader, + abortControllerRef: + abortController != null ? { current: abortController() } : undefined, + update: onUpdate, + onFinish(prefixMap) { + if (onFinish && prefixMap.text != null) { + onFinish(prefixMap.text); } - - appendMessage({ ...responseMessage }); - - // The request has been aborted, stop reading the stream. - if (abortController?.() === null) { - reader.cancel(); - break; - } - } - - if (streamedResponse.startsWith('{"function_call":')) { - // Once the stream is complete, the function call is parsed into an object. - const parsedFunctionCall: FunctionCall = - JSON.parse(streamedResponse).function_call; - - responseMessage['function_call'] = parsedFunctionCall; - - appendMessage({ ...responseMessage }); - } - if (streamedResponse.startsWith('{"tool_calls":')) { - // Once the stream is complete, the tool calls are parsed into an array. 
- const parsedToolCalls: ToolCall[] = - JSON.parse(streamedResponse).tool_calls; - - responseMessage['tool_calls'] = parsedToolCalls; - - appendMessage({ ...responseMessage }); - } - - if (onFinish) { - onFinish(responseMessage); - } - - return responseMessage; - } + }, + generateId, + }); } diff --git a/packages/core/shared/call-completion-api.ts b/packages/core/shared/call-completion-api.ts index d8ba124b6c31..db1bc52ff403 100644 --- a/packages/core/shared/call-completion-api.ts +++ b/packages/core/shared/call-completion-api.ts @@ -1,6 +1,5 @@ import { readDataStream } from './read-data-stream'; import { JSONValue } from './types'; -import { COMPLEX_HEADER, createChunkDecoder } from './utils'; export async function callCompletionApi({ api, @@ -78,40 +77,17 @@ export async function callCompletionApi({ let result = ''; const reader = res.body.getReader(); - const isComplexMode = res.headers.get(COMPLEX_HEADER) === 'true'; - - if (isComplexMode) { - for await (const { type, value } of readDataStream(reader, { - isAborted: () => abortController === null, - })) { - switch (type) { - case 'text': { - result += value; - setCompletion(result); - break; - } - case 'data': { - onData?.(value); - break; - } - } - } - } else { - const decoder = createChunkDecoder(); - - while (true) { - const { done, value } = await reader.read(); - if (done) { + for await (const { type, value } of readDataStream(reader, { + isAborted: () => abortController === null, + })) { + switch (type) { + case 'text': { + result += value; + setCompletion(result); break; } - - // Update the completion state with the new message tokens. - result += decoder(value); - setCompletion(result); - - // The request has been aborted, stop reading the stream. - if (abortController === null) { - reader.cancel(); + case 'data': { + onData?.(value); break; } } diff --git a/packages/core/shared/utils.ts b/packages/core/shared/utils.ts index c74e5915872c..2c99cbfb4cf8 100644 --- a/packages/core/shared/utils.ts +++ b/packages/core/shared/utils.ts @@ -52,8 +52,3 @@ export const isStreamStringEqualToType = ( export type StreamString = `${(typeof StreamStringPrefixes)[keyof typeof StreamStringPrefixes]}:${string}\n`; - -/** - * A header sent to the client so it knows how to handle parsing the stream (as a deprecated text response or using the new prefixed protocol) - */ -export const COMPLEX_HEADER = 'X-Experimental-Stream-Data'; diff --git a/packages/core/solid/use-chat.ts b/packages/core/solid/use-chat.ts index d99a2bc139b7..02b9a0d771ce 100644 --- a/packages/core/solid/use-chat.ts +++ b/packages/core/solid/use-chat.ts @@ -170,9 +170,6 @@ export function useChat({ setStreamData([...existingData, ...(data ?? [])]); }, onFinish, - appendMessage(message) { - mutate([...chatRequest.messages, message]); - }, restoreMessagesOnFailure() { // Restore the previous messages if the request fails. 
if (previousMessages.status === 'success') { diff --git a/packages/core/solid/use-completion.ui.test.tsx b/packages/core/solid/use-completion.ui.test.tsx index 4959e5b469a1..319a5353bc8b 100644 --- a/packages/core/solid/use-completion.ui.test.tsx +++ b/packages/core/solid/use-completion.ui.test.tsx @@ -6,7 +6,6 @@ import { mockFetchDataStream, mockFetchDataStreamWithGenerator, mockFetchError, - mockFetchTextStream, } from '../tests/utils/mock-fetch'; import { useCompletion } from './use-completion'; @@ -39,18 +38,6 @@ afterEach(() => { cleanup(); }); -it('should render normal streamed stream', async () => { - mockFetchTextStream({ - url: 'https://example.com/api/completion', - chunks: ['Hello', ',', ' world', '.'], - }); - - await userEvent.click(screen.getByTestId('button')); - - await screen.findByTestId('completion'); - expect(screen.getByTestId('completion')).toHaveTextContent('Hello, world.'); -}); - it('should render complex text stream', async () => { mockFetchDataStream({ url: 'https://example.com/api/completion', diff --git a/packages/core/streams/ai-stream.ts b/packages/core/streams/ai-stream.ts index 9931b449eeb1..1bd0aa2bb2b4 100644 --- a/packages/core/streams/ai-stream.ts +++ b/packages/core/streams/ai-stream.ts @@ -37,10 +37,8 @@ export interface AIStreamCallbacksAndOptions { /** `onText`: Called for each text chunk. */ onText?: (text: string) => Promise | void; /** - * A flag for enabling the experimental_StreamData class and the new protocol. - * @see https://github.com/vercel-labs/ai/pull/425 - * - * When StreamData is rolled out, this will be removed and the new protocol will be used by default. + * @deprecated This flag is no longer used and only retained for backwards compatibility. + * You can remove it from your code. */ experimental_streamData?: boolean; } diff --git a/packages/core/streams/anthropic-stream.test.ts b/packages/core/streams/anthropic-stream.test.ts index 716e1fe4af89..dc3dfdf27569 100644 --- a/packages/core/streams/anthropic-stream.test.ts +++ b/packages/core/streams/anthropic-stream.test.ts @@ -40,12 +40,14 @@ afterAll(() => { }); describe('Anthropic completion', () => { - it('should be able to parse SSE and receive the streamed response', async () => { + it('should send text', async () => { const anthropic = new Anthropic({ fetch: () => fetch(DEFAULT_TEST_URL), apiKey: 'sk-doesnt-matter', }); + const data = new experimental_StreamData(); + const anthropicResponse = await anthropic.completions.create({ prompt: '', model: 'claude-2', @@ -53,95 +55,66 @@ describe('Anthropic completion', () => { max_tokens_to_sample: 300, }); - const stream = AnthropicStream(anthropicResponse); + const stream = AnthropicStream(anthropicResponse, { + onFinal() { + data.close(); + }, + }); - const response = new StreamingTextResponse(stream); + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - ' Hello', - ',', - ' world', - '.', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const anthropic = new Anthropic({ - fetch: () => fetch(DEFAULT_TEST_URL), - apiKey: 'sk-doesnt-matter', - }); - - const data = new experimental_StreamData(); - - const anthropicResponse = await anthropic.completions.create({ - prompt: '', - model: 'claude-2', - stream: true, - max_tokens_to_sample: 300, - }); - - const stream = AnthropicStream(anthropicResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); 
- - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + it('should send text and data', async () => { + const anthropic = new Anthropic({ + fetch: () => fetch(DEFAULT_TEST_URL), + apiKey: 'sk-doesnt-matter', + }); + + const data = new experimental_StreamData(); + + data.append({ t1: 'v1' }); + + const anthropicResponse = await anthropic.completions.create({ + prompt: '', + model: 'claude-2', + stream: true, + max_tokens_to_sample: 300, }); - it('should send text and data', async () => { - const anthropic = new Anthropic({ - fetch: () => fetch(DEFAULT_TEST_URL), - apiKey: 'sk-doesnt-matter', - }); - - const data = new experimental_StreamData(); - - data.append({ t1: 'v1' }); - - const anthropicResponse = await anthropic.completions.create({ - prompt: '', - model: 'claude-2', - stream: true, - max_tokens_to_sample: 300, - }); - - const stream = AnthropicStream(anthropicResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + const stream = AnthropicStream(anthropicResponse, { + onFinal() { + data.close(); + }, }); + + const response = new StreamingTextResponse(stream, {}, data); + + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); }); describe('Anthropic message', () => { - it('should be able to parse SSE and receive the streamed response', async () => { + it('should send text', async () => { const anthropic = new Anthropic({ fetch: () => fetch(MESSAGE_URL), apiKey: 'sk-doesnt-matter', }); + const data = new experimental_StreamData(); + const anthropicResponse = await anthropic.messages.create({ messages: [{ role: 'user', content: 'Hello' }], model: 'claude-2.1', @@ -149,84 +122,53 @@ describe('Anthropic message', () => { max_tokens: 300, }); - const stream = AnthropicStream(anthropicResponse); + const stream = AnthropicStream(anthropicResponse, { + onFinal() { + data.close(); + }, + }); - const response = new StreamingTextResponse(stream); + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - 'Hello', - ',', - ' world', - '.', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const anthropic = new Anthropic({ - fetch: () => fetch(MESSAGE_URL), - apiKey: 'sk-doesnt-matter', - }); - - const data = new experimental_StreamData(); - - const anthropicResponse = await anthropic.messages.create({ - messages: [{ role: 'user', content: 'Hello' }], - model: 'claude-2.1', - stream: true, - max_tokens: 300, - }); - - const stream = AnthropicStream(anthropicResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + it('should send text and data', async () => { + const anthropic = new Anthropic({ + fetch: () => fetch(MESSAGE_URL), + apiKey: 'sk-doesnt-matter', + }); + + const data = new experimental_StreamData(); + + data.append({ t1: 'v1' }); + + const anthropicResponse = await 
anthropic.messages.create({ + messages: [{ role: 'user', content: 'Hello' }], + model: 'claude-2.1', + stream: true, + max_tokens: 300, }); - it('should send text and data', async () => { - const anthropic = new Anthropic({ - fetch: () => fetch(MESSAGE_URL), - apiKey: 'sk-doesnt-matter', - }); - - const data = new experimental_StreamData(); - - data.append({ t1: 'v1' }); - - const anthropicResponse = await anthropic.messages.create({ - messages: [{ role: 'user', content: 'Hello' }], - model: 'claude-2.1', - stream: true, - max_tokens: 300, - }); - - const stream = AnthropicStream(anthropicResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + const stream = AnthropicStream(anthropicResponse, { + onFinal() { + data.close(); + }, }); + + const response = new StreamingTextResponse(stream, {}, data); + + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); }); diff --git a/packages/core/streams/anthropic-stream.ts b/packages/core/streams/anthropic-stream.ts index 6484bfee7c5e..0c748a58eea6 100644 --- a/packages/core/streams/anthropic-stream.ts +++ b/packages/core/streams/anthropic-stream.ts @@ -188,10 +188,10 @@ export function AnthropicStream( if (Symbol.asyncIterator in res) { return readableFromAsyncIterable(streamable(res)) .pipeThrough(createCallbacksTransformer(cb)) - .pipeThrough(createStreamDataTransformer(cb?.experimental_streamData)); + .pipeThrough(createStreamDataTransformer()); } else { return AIStream(res, parseAnthropicStream(), cb).pipeThrough( - createStreamDataTransformer(cb?.experimental_streamData), + createStreamDataTransformer(), ); } } diff --git a/packages/core/streams/aws-bedrock-stream.test.ts b/packages/core/streams/aws-bedrock-stream.test.ts index 22ceca1bfbe4..06cebc9a26cb 100644 --- a/packages/core/streams/aws-bedrock-stream.test.ts +++ b/packages/core/streams/aws-bedrock-stream.test.ts @@ -39,64 +39,47 @@ function simulateBedrockResponse(chunks: any[]) { describe('AWS Bedrock', () => { describe('Anthropic', () => { - it('should be able to parse SSE and receive the streamed response', async () => { + it('should send text', async () => { + const data = new experimental_StreamData(); + const bedrockResponse = simulateBedrockResponse(bedrockAnthropicChunks); - const stream = AWSBedrockAnthropicStream(bedrockResponse); - const response = new StreamingTextResponse(stream); + const stream = AWSBedrockAnthropicStream(bedrockResponse, { + onFinal() { + data.close(); + }, + }); + + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - ' Hello', - ',', - ' world', - '.', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const bedrockResponse = simulateBedrockResponse(bedrockAnthropicChunks); - const stream = AWSBedrockAnthropicStream(bedrockResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - const response = new StreamingTextResponse(stream, {}, data); + data.append({ t1: 'v1' }); - expect(await 
readAllChunks(response)).toEqual([ - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + const bedrockResponse = simulateBedrockResponse(bedrockAnthropicChunks); + const stream = AWSBedrockAnthropicStream(bedrockResponse, { + onFinal() { + data.close(); + }, }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); - - data.append({ t1: 'v1' }); - - const bedrockResponse = simulateBedrockResponse(bedrockAnthropicChunks); - const stream = AWSBedrockAnthropicStream(bedrockResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); + const response = new StreamingTextResponse(stream, {}, data); - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); }); @@ -107,156 +90,115 @@ describe('AWS Bedrock', () => { const response = new StreamingTextResponse(stream); expect(await readAllChunks(response)).toEqual([ - ' Hello', - ',', - ' world', - '.', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', ]); }); }); describe('Cohere', () => { - it('should be able to parse SSE and receive the streamed response', async () => { + it('should send text', async () => { + const data = new experimental_StreamData(); + const bedrockResponse = simulateBedrockResponse(bedrockCohereChunks); - const stream = AWSBedrockCohereStream(bedrockResponse); - const response = new StreamingTextResponse(stream); + const stream = AWSBedrockCohereStream(bedrockResponse, { + onFinal() { + data.close(); + }, + }); + + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - ' Hello', - '!', - ' How', - ' can', - ' I', - ' help', - ' you', - ' today', - '?', + '0:" Hello"\n', + '0:"!"\n', + '0:" How"\n', + '0:" can"\n', + '0:" I"\n', + '0:" help"\n', + '0:" you"\n', + '0:" today"\n', + '0:"?"\n', ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - const bedrockResponse = simulateBedrockResponse(bedrockCohereChunks); - const stream = AWSBedrockCohereStream(bedrockResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + data.append({ t1: 'v1' }); - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '0:" Hello"\n', - '0:"!"\n', - '0:" How"\n', - '0:" can"\n', - '0:" I"\n', - '0:" help"\n', - '0:" you"\n', - '0:" today"\n', - '0:"?"\n', - ]); + const bedrockResponse = simulateBedrockResponse(bedrockCohereChunks); + const stream = AWSBedrockCohereStream(bedrockResponse, { + onFinal() { + data.close(); + }, }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); + const response = new StreamingTextResponse(stream, {}, data); - data.append({ t1: 'v1' }); - - const bedrockResponse = simulateBedrockResponse(bedrockCohereChunks); - const stream = AWSBedrockCohereStream(bedrockResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - 
'2:[{"t1":"v1"}]\n', - '0:" Hello"\n', - '0:"!"\n', - '0:" How"\n', - '0:" can"\n', - '0:" I"\n', - '0:" help"\n', - '0:" you"\n', - '0:" today"\n', - '0:"?"\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:" Hello"\n', + '0:"!"\n', + '0:" How"\n', + '0:" can"\n', + '0:" I"\n', + '0:" help"\n', + '0:" you"\n', + '0:" today"\n', + '0:"?"\n', + ]); }); }); describe('Llama2', () => { - it('should be able to parse SSE and receive the streamed response', async () => { + it('should send text', async () => { + const data = new experimental_StreamData(); + const bedrockResponse = simulateBedrockResponse(bedrockLlama2Chunks); - const stream = AWSBedrockLlama2Stream(bedrockResponse); - const response = new StreamingTextResponse(stream); + const stream = AWSBedrockLlama2Stream(bedrockResponse, { + onFinal() { + data.close(); + }, + }); + + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - '', - ' Hello', - ',', - ' world', - '.', - '', + '0:""\n', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:""\n', ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const bedrockResponse = simulateBedrockResponse(bedrockLlama2Chunks); - const stream = AWSBedrockLlama2Stream(bedrockResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - const response = new StreamingTextResponse(stream, {}, data); + data.append({ t1: 'v1' }); - expect(await readAllChunks(response)).toEqual([ - '0:""\n', - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:""\n', - ]); + const bedrockResponse = simulateBedrockResponse(bedrockLlama2Chunks); + const stream = AWSBedrockLlama2Stream(bedrockResponse, { + onFinal() { + data.close(); + }, }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); - - data.append({ t1: 'v1' }); + const response = new StreamingTextResponse(stream, {}, data); - const bedrockResponse = simulateBedrockResponse(bedrockLlama2Chunks); - const stream = AWSBedrockLlama2Stream(bedrockResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:""\n', - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:""\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:""\n', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:""\n', + ]); }); }); }); diff --git a/packages/core/streams/aws-bedrock-stream.ts b/packages/core/streams/aws-bedrock-stream.ts index faaa62974b0b..3f53adf7a525 100644 --- a/packages/core/streams/aws-bedrock-stream.ts +++ b/packages/core/streams/aws-bedrock-stream.ts @@ -68,7 +68,5 @@ export function AWSBedrockStream( asDeltaIterable(response, extractTextDeltaFromChunk), ) .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ); + .pipeThrough(createStreamDataTransformer()); } diff --git a/packages/core/streams/cohere-stream.test.ts b/packages/core/streams/cohere-stream.test.ts index 26594f65364c..af563d6a7dd8 100644 --- a/packages/core/streams/cohere-stream.test.ts +++ 
b/packages/core/streams/cohere-stream.test.ts @@ -1,4 +1,3 @@ -import { CohereClient } from 'cohere-ai'; import { CohereStream, StreamingTextResponse, @@ -34,82 +33,46 @@ describe('CohereStream', () => { server.close(); }); - it('should be able to parse Chat Streaming API and receive the streamed response', async () => { - const co = new CohereClient({ - token: 'cohere-token', - }); - const cohereResponse = await co.chatStream({ - message: 'hi there!', - }); + it('should send text', async () => { + const data = new experimental_StreamData(); - const stream = CohereStream(cohereResponse); - const response = new StreamingTextResponse(stream); - expect(await readAllChunks(response)).toEqual([ - ' Hello', - ',', - ' world', - '.', - ' ', - ]); - }); + const stream = CohereStream(await fetch(DEFAULT_TEST_URL), { + onFinal() { + data.close(); + }, + }); - it('should be able to parse SSE and receive the streamed response', async () => { - const stream = CohereStream(await fetch(DEFAULT_TEST_URL)); - const response = new StreamingTextResponse(stream); + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - ' Hello', - ',', - ' world', - '.', - ' ', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:" "\n', ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const stream = CohereStream(await fetch(DEFAULT_TEST_URL), { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - const response = new StreamingTextResponse(stream, {}, data); + data.append({ t1: 'v1' }); - expect(await readAllChunks(response)).toEqual([ - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:" "\n', - ]); + const stream = CohereStream(await fetch(DEFAULT_TEST_URL), { + onFinal() { + data.close(); + }, }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); + const response = new StreamingTextResponse(stream, {}, data); - data.append({ t1: 'v1' }); - - const stream = CohereStream(await fetch(DEFAULT_TEST_URL), { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:" "\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:" "\n', + ]); }); }); diff --git a/packages/core/streams/cohere-stream.ts b/packages/core/streams/cohere-stream.ts index 262b3650f8d8..5188d2ed6a45 100644 --- a/packages/core/streams/cohere-stream.ts +++ b/packages/core/streams/cohere-stream.ts @@ -93,14 +93,10 @@ export function CohereStream( if (Symbol.asyncIterator in reader) { return readableFromAsyncIterable(streamable(reader)) .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ); + .pipeThrough(createStreamDataTransformer()); } else { return createParser(reader) .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ); + .pipeThrough(createStreamDataTransformer()); } } diff --git a/packages/core/streams/google-generative-ai-stream.test.ts 
b/packages/core/streams/google-generative-ai-stream.test.ts index 6cd2ab04e32c..a5a0f5f9f9f0 100644 --- a/packages/core/streams/google-generative-ai-stream.test.ts +++ b/packages/core/streams/google-generative-ai-stream.test.ts @@ -67,57 +67,45 @@ export const googleGenerativeAIChunks = [ }, ]; -it('should be able to parse SSE and receive the streamed response', async () => { +it('should send text', async () => { + const data = new experimental_StreamData(); + const aiResponse = simulateGenerativeAIResponse(googleGenerativeAIChunks); - const stream = GoogleGenerativeAIStream(aiResponse); - const response = new StreamingTextResponse(stream); + const stream = GoogleGenerativeAIStream(aiResponse, { + onFinal() { + data.close(); + }, + }); - expect(await readAllChunks(response)).toEqual(['Hello', ',', ' world', '.']); -}); + const response = new StreamingTextResponse(stream, {}, data); -describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); + expect(await readAllChunks(response)).toEqual([ + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); +}); - const aiResponse = simulateGenerativeAIResponse(googleGenerativeAIChunks); - const stream = GoogleGenerativeAIStream(aiResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); +it('should send text and data', async () => { + const data = new experimental_StreamData(); - const response = new StreamingTextResponse(stream, {}, data); + data.append({ t1: 'v1' }); - expect(await readAllChunks(response)).toEqual([ - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + const aiResponse = simulateGenerativeAIResponse(googleGenerativeAIChunks); + const stream = GoogleGenerativeAIStream(aiResponse, { + onFinal() { + data.close(); + }, }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); + const response = new StreamingTextResponse(stream, {}, data); - data.append({ t1: 'v1' }); - - const aiResponse = simulateGenerativeAIResponse(googleGenerativeAIChunks); - const stream = GoogleGenerativeAIStream(aiResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); diff --git a/packages/core/streams/google-generative-ai-stream.ts b/packages/core/streams/google-generative-ai-stream.ts index ce72765d7653..8fe811df2383 100644 --- a/packages/core/streams/google-generative-ai-stream.ts +++ b/packages/core/streams/google-generative-ai-stream.ts @@ -56,5 +56,5 @@ export function GoogleGenerativeAIStream( ): ReadableStream { return readableFromAsyncIterable(streamable(response)) .pipeThrough(createCallbacksTransformer(cb)) - .pipeThrough(createStreamDataTransformer(cb?.experimental_streamData)); + .pipeThrough(createStreamDataTransformer()); } diff --git a/packages/core/streams/huggingface-stream.test.ts b/packages/core/streams/huggingface-stream.test.ts index 026357062203..1f0fda941aa4 100644 --- a/packages/core/streams/huggingface-stream.test.ts +++ b/packages/core/streams/huggingface-stream.test.ts @@ -35,78 +35,56 @@ describe('HuggingFace stream', () => { return createClient(response).readAll(); } - it('should be able to 
parse HuggingFace response and receive the streamed response', async () => { + it('should send text', async () => { + const data = new experimental_StreamData(); + const stream = HuggingFaceStream( Hf.textGenerationStream( { model: 'model', inputs: '' }, { fetch: () => fetch(DEFAULT_TEST_URL) }, ), + { + onFinal() { + data.close(); + }, + }, ); - const response = new StreamingTextResponse(stream); + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - 'Hello', - ',', - ' world', - '.', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', ]); }); - describe('StreamData prototcol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const stream = HuggingFaceStream( - Hf.textGenerationStream( - { model: 'model', inputs: '' }, - { fetch: () => fetch(DEFAULT_TEST_URL) }, - ), - { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }, - ); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - it('should send text and data', async () => { - const data = new experimental_StreamData(); + data.append({ t1: 'v1' }); - data.append({ t1: 'v1' }); - - const stream = HuggingFaceStream( - Hf.textGenerationStream( - { model: 'model', inputs: '' }, - { fetch: () => fetch(DEFAULT_TEST_URL) }, - ), - { - onFinal() { - data.close(); - }, - experimental_streamData: true, + const stream = HuggingFaceStream( + Hf.textGenerationStream( + { model: 'model', inputs: '' }, + { fetch: () => fetch(DEFAULT_TEST_URL) }, + ), + { + onFinal() { + data.close(); }, - ); + }, + ); - const response = new StreamingTextResponse(stream, {}, data); + const response = new StreamingTextResponse(stream, {}, data); - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); }); diff --git a/packages/core/streams/huggingface-stream.ts b/packages/core/streams/huggingface-stream.ts index 464d09813410..cf776427574c 100644 --- a/packages/core/streams/huggingface-stream.ts +++ b/packages/core/streams/huggingface-stream.ts @@ -42,7 +42,5 @@ export function HuggingFaceStream( ): ReadableStream { return createParser(res) .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ); + .pipeThrough(createStreamDataTransformer()); } diff --git a/packages/core/streams/inkeep-stream.test.ts b/packages/core/streams/inkeep-stream.test.ts index 67f43fbcaa7a..d9ef0a6e9f21 100644 --- a/packages/core/streams/inkeep-stream.test.ts +++ b/packages/core/streams/inkeep-stream.test.ts @@ -30,84 +30,65 @@ describe('InkeepStream', () => { server.close(); }); - it('should be able to parse SSE and receive the streamed response', async () => { + const recordsCitedSerialized = + '"records_cited":{"citations":[{"number":1,"record":{"url":"https://inkeep.com","title":"Inkeep","breadcrumbs":["Home","About"]}}]}'; + + it('should receive and send Inkeep onFinal metadata with chat_session_id', async () => { + const data = new experimental_StreamData(); + const response = await fetch(DEFAULT_TEST_URL); - const 
stream = InkeepStream(response); + const stream = InkeepStream(response, { + onFinal: async (complete: string, metadata?: InkeepOnFinalMetadata) => { + // return the chat_session_id to the client + if (metadata) { + data.append({ onFinalMetadata: metadata }); + } + data.close(); + }, + }); - const responseStream = new StreamingTextResponse(stream); + const responseStream = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(responseStream)).toEqual([ - ' Hello', - ',', - ' world', - '.', + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + `2:[{"onFinalMetadata":{"chat_session_id":"12345",${recordsCitedSerialized}}}]\n`, ]); }); - const recordsCitedSerialized = - '"records_cited":{"citations":[{"number":1,"record":{"url":"https://inkeep.com","title":"Inkeep","breadcrumbs":["Home","About"]}}]}'; - - describe('StreamData protocol', () => { - it('should receive and send Inkeep onFinal metadata with chat_session_id', async () => { - const data = new experimental_StreamData(); - - const response = await fetch(DEFAULT_TEST_URL); + it('should receive and send Inkeep records_cited data as message annotation', async () => { + const data = new experimental_StreamData(); - const stream = InkeepStream(response, { - onFinal: async (complete: string, metadata?: InkeepOnFinalMetadata) => { - // return the chat_session_id to the client - if (metadata) { - data.append({ onFinalMetadata: metadata }); - } - data.close(); - }, - experimental_streamData: true, - }); - - const responseStream = new StreamingTextResponse(stream, {}, data); + const response = await fetch(DEFAULT_TEST_URL); - expect(await readAllChunks(responseStream)).toEqual([ - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - `2:[{"onFinalMetadata":{"chat_session_id":"12345",${recordsCitedSerialized}}}]\n`, - ]); + const stream = InkeepStream(response, { + onRecordsCited: async records_cited => { + // append the citations to the message annotations + data.appendMessageAnnotation({ + records_cited, + }); + }, + onFinal: async (complete: string, metadata?: InkeepOnFinalMetadata) => { + // return the chat_session_id to the client + if (metadata) { + data.append({ onFinalMetadata: metadata }); + } + data.close(); + }, }); - it('should receive and send Inkeep records_cited data as message annotation', async () => { - const data = new experimental_StreamData(); - - const response = await fetch(DEFAULT_TEST_URL); - - const stream = InkeepStream(response, { - onRecordsCited: async records_cited => { - // append the citations to the message annotations - data.appendMessageAnnotation({ - records_cited, - }); - }, - onFinal: async (complete: string, metadata?: InkeepOnFinalMetadata) => { - // return the chat_session_id to the client - if (metadata) { - data.append({ onFinalMetadata: metadata }); - } - data.close(); - }, - experimental_streamData: true, - }); + const responseStream = new StreamingTextResponse(stream, {}, data); - const responseStream = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(responseStream)).toEqual([ - '0:" Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - `2:[{"onFinalMetadata":{"chat_session_id":"12345",${recordsCitedSerialized}}}]\n`, - `8:[{${recordsCitedSerialized}}]\n`, - ]); - }); + expect(await readAllChunks(responseStream)).toEqual([ + '0:" Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + `2:[{"onFinalMetadata":{"chat_session_id":"12345",${recordsCitedSerialized}}}]\n`, + `8:[{${recordsCitedSerialized}}]\n`, + ]); }); }); diff --git 
a/packages/core/streams/inkeep-stream.ts b/packages/core/streams/inkeep-stream.ts index 6b20f33a4f36..d5a214ce70ec 100644 --- a/packages/core/streams/inkeep-stream.ts +++ b/packages/core/streams/inkeep-stream.ts @@ -66,6 +66,6 @@ export function InkeepStream( }; return AIStream(res, inkeepEventParser, passThroughCallbacks).pipeThrough( - createStreamDataTransformer(passThroughCallbacks?.experimental_streamData), + createStreamDataTransformer(), ); } diff --git a/packages/core/streams/langchain-stream.test.ts b/packages/core/streams/langchain-stream.test.ts index 0d36e25d94e9..354c50d72cee 100644 --- a/packages/core/streams/langchain-stream.test.ts +++ b/packages/core/streams/langchain-stream.test.ts @@ -38,128 +38,106 @@ describe('LangchainStream', () => { server.close(); }); - describe('LangChain Expression Language call', () => { - it('should be able to parse SSE and receive the streamed response', async () => { - const model = new ChatOpenAI({ - streaming: true, - openAIApiKey: 'fake', - configuration: { baseURL: DEFAULT_TEST_URL }, - }); + it('should send text', async () => { + const data = new experimental_StreamData(); - const stream = await PromptTemplate.fromTemplate('{input}') - .pipe(model) - .pipe(new BytesOutputParser()) - .stream({ input: 'Hello' }); - - const response = new StreamingTextResponse(stream); - - expect(await readAllChunks(response)).toEqual([ - '', - 'Hello', - ',', - ' world', - '.', - '', - ]); + const model = new ChatOpenAI({ + streaming: true, + openAIApiKey: 'fake', + configuration: { baseURL: DEFAULT_TEST_URL }, }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const model = new ChatOpenAI({ - streaming: true, - openAIApiKey: 'fake', - configuration: { baseURL: DEFAULT_TEST_URL }, - }); - - const stream = await PromptTemplate.fromTemplate('{input}') - .pipe(model) - .pipe(new BytesOutputParser()) - .stream( - { input: 'Hello' }, + const stream = await PromptTemplate.fromTemplate('{input}') + .pipe(model) + .pipe(new BytesOutputParser()) + .stream( + { input: 'Hello' }, + { + callbacks: [ { - callbacks: [ - { - handleChainEnd(outputs, runId, parentRunId) { - // check that main chain (without parent) is finished: - if (parentRunId == null) { - data.close(); - } - }, - }, - ], + handleChainEnd(outputs, runId, parentRunId) { + // check that main chain (without parent) is finished: + if (parentRunId == null) { + data.close(); + } + }, }, - ); - - const response = new StreamingTextResponse( - stream.pipeThrough(createStreamDataTransformer(true)), - {}, - data, - ); - - expect(await readAllChunks(response)).toEqual([ - '0:""\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:""\n', - ]); - }); + ], + }, + ); + + const response = new StreamingTextResponse( + stream.pipeThrough(createStreamDataTransformer()), + {}, + data, + ); + + expect(await readAllChunks(response)).toEqual([ + '0:""\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:""\n', + ]); + }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - data.append({ t1: 'v1' }); + data.append({ t1: 'v1' }); - const model = new ChatOpenAI({ - streaming: true, - openAIApiKey: 'fake', - configuration: { baseURL: DEFAULT_TEST_URL }, - }); + const model = new ChatOpenAI({ + streaming: true, + openAIApiKey: 'fake', + configuration: { baseURL: DEFAULT_TEST_URL }, 
+ }); - const stream = await PromptTemplate.fromTemplate('{input}') - .pipe(model) - .pipe(new BytesOutputParser()) - .stream( - { input: 'Hello' }, + const stream = await PromptTemplate.fromTemplate('{input}') + .pipe(model) + .pipe(new BytesOutputParser()) + .stream( + { input: 'Hello' }, + { + callbacks: [ { - callbacks: [ - { - handleChainEnd(outputs, runId, parentRunId) { - // check that main chain (without parent) is finished: - if (parentRunId == null) { - data.close(); - } - }, - }, - ], + handleChainEnd(outputs, runId, parentRunId) { + // check that main chain (without parent) is finished: + if (parentRunId == null) { + data.close(); + } + }, }, - ); - - const response = new StreamingTextResponse( - stream.pipeThrough(createStreamDataTransformer(true)), - {}, - data, - ); - - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:""\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:""\n', - ]); - }); - }); + ], + }, + ); + + const response = new StreamingTextResponse( + stream.pipeThrough(createStreamDataTransformer()), + {}, + data, + ); + + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:""\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:""\n', + ]); }); describe('LangChain LLM call', () => { - it('should be able to parse SSE and receive the streamed response', async () => { - const { stream, handlers } = LangChainStream(); + it('should send text', async () => { + const data = new experimental_StreamData(); + + const { stream, handlers } = LangChainStream({ + onFinal() { + data.close(); + }, + }); const llm = new ChatOpenAI({ streaming: true, @@ -171,85 +149,50 @@ describe('LangchainStream', () => { .call([new HumanMessage('hello')], {}, [handlers]) .catch(console.error); - const response = new StreamingTextResponse(stream); + const response = new StreamingTextResponse(stream, {}, data); expect(await readAllChunks(response)).toEqual([ - '', - 'Hello', - ',', - ' world', - '.', - '', + '0:""\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:""\n', ]); }); - describe('StreamData prototcol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const { stream, handlers } = LangChainStream({ - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const llm = new ChatOpenAI({ - streaming: true, - openAIApiKey: 'fake', - configuration: { baseURL: DEFAULT_TEST_URL }, - }); - - llm - .call([new HumanMessage('hello')], {}, [handlers]) - .catch(console.error); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '0:""\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:""\n', - ]); + it('should send text and data', async () => { + const data = new experimental_StreamData(); + + data.append({ t1: 'v1' }); + + const { stream, handlers } = LangChainStream({ + onFinal() { + data.close(); + }, }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); - - data.append({ t1: 'v1' }); - - const { stream, handlers } = LangChainStream({ - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const llm = new ChatOpenAI({ - streaming: true, - openAIApiKey: 'fake', - configuration: { baseURL: DEFAULT_TEST_URL }, - }); - - llm - .call([new HumanMessage('hello')], {}, [handlers]) - .catch(console.error); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await 
readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:""\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - '0:""\n', - ]); + const llm = new ChatOpenAI({ + streaming: true, + openAIApiKey: 'fake', + configuration: { baseURL: DEFAULT_TEST_URL }, }); + + llm + .call([new HumanMessage('hello')], {}, [handlers]) + .catch(console.error); + + const response = new StreamingTextResponse(stream, {}, data); + + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:""\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + '0:""\n', + ]); }); }); }); diff --git a/packages/core/streams/langchain-stream.ts b/packages/core/streams/langchain-stream.ts index 2256dc7f97f1..1fec69374579 100644 --- a/packages/core/streams/langchain-stream.ts +++ b/packages/core/streams/langchain-stream.ts @@ -32,9 +32,7 @@ export function LangChainStream(callbacks?: AIStreamCallbacksAndOptions) { return { stream: stream.readable .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ), + .pipeThrough(createStreamDataTransformer()), writer, handlers: { handleLLMNewToken: async (token: string) => { diff --git a/packages/core/streams/mistral-stream.test.ts b/packages/core/streams/mistral-stream.test.ts index e898c4423172..540b77cd5205 100644 --- a/packages/core/streams/mistral-stream.test.ts +++ b/packages/core/streams/mistral-stream.test.ts @@ -27,7 +27,9 @@ describe('MistralStream', () => { server.close(); }); - it('should be able to parse SSE and receive the streamed response', async () => { + it('should send text', async () => { + const data = new experimental_StreamData(); + const client = new MistralClient('api-key', 'http://localhost:3030'); const mistralResponse = client.chatStream({ @@ -35,80 +37,52 @@ describe('MistralStream', () => { messages: [{ role: 'user', content: 'What is the best French cheese?' }], }); - const stream = MistralStream(mistralResponse); - const response = new StreamingTextResponse(stream); + const stream = MistralStream(mistralResponse, { + onFinal() { + data.close(); + }, + }); + + const response = new StreamingTextResponse(stream, {}, data); const chunks = await readAllChunks(response); - expect(JSON.stringify(chunks)).toMatchInlineSnapshot( - `"[\\"Hello\\",\\",\\",\\" world\\",\\".\\"]"`, - ); + expect(chunks).toEqual([ + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const client = new MistralClient('api-key', 'http://localhost:3030'); - - const mistralResponse = client.chatStream({ - model: 'mistral-small', - messages: [ - { role: 'user', content: 'What is the best French cheese?' }, - ], - }); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - const stream = MistralStream(mistralResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + data.append({ t1: 'v1' }); - const response = new StreamingTextResponse(stream, {}, data); - - const chunks = await readAllChunks(response); + const client = new MistralClient('api-key', 'http://localhost:3030'); - expect(chunks).toEqual([ - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + const mistralResponse = client.chatStream({ + model: 'mistral-small', + messages: [{ role: 'user', content: 'What is the best French cheese?' 
}], }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); - - data.append({ t1: 'v1' }); - - const client = new MistralClient('api-key', 'http://localhost:3030'); - - const mistralResponse = client.chatStream({ - model: 'mistral-small', - messages: [ - { role: 'user', content: 'What is the best French cheese?' }, - ], - }); - - const stream = MistralStream(mistralResponse, { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + const stream = MistralStream(mistralResponse, { + onFinal() { + data.close(); + }, + }); - const response = new StreamingTextResponse(stream, {}, data); + const response = new StreamingTextResponse(stream, {}, data); - const chunks = await readAllChunks(response); + const chunks = await readAllChunks(response); - expect(chunks).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + expect(chunks).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); }); diff --git a/packages/core/streams/mistral-stream.ts b/packages/core/streams/mistral-stream.ts index 8004a16a54dc..4f288618acc5 100644 --- a/packages/core/streams/mistral-stream.ts +++ b/packages/core/streams/mistral-stream.ts @@ -25,7 +25,5 @@ export function MistralStream( const stream = readableFromAsyncIterable(streamable(response)); return stream .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData), - ); + .pipeThrough(createStreamDataTransformer()); } diff --git a/packages/core/streams/openai-stream.test.ts b/packages/core/streams/openai-stream.test.ts index c790bb411908..08065ae8db56 100644 --- a/packages/core/streams/openai-stream.test.ts +++ b/packages/core/streams/openai-stream.test.ts @@ -69,276 +69,203 @@ describe('OpenAIStream', () => { const stream = OpenAIStream(response); }); - it('should be able to parse SSE and receive the streamed response', async () => { - const stream = OpenAIStream(await fetch(DEFAULT_TEST_URL)); - const response = new StreamingTextResponse(stream); + it('should send text', async () => { + const data = new experimental_StreamData(); - const chunks = await readAllChunks(response); + const stream = OpenAIStream(await fetch(DEFAULT_TEST_URL), { + onFinal() { + data.close(); + }, + }); - expect(JSON.stringify(chunks)).toMatchInlineSnapshot( - `"[\\"Hello\\",\\",\\",\\" world\\",\\".\\"]"`, - ); - }); + const response = new StreamingTextResponse(stream, {}, data); - it('should correctly parse and escape function call JSON chunks', async () => { - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL)); - const response = new StreamingTextResponse(stream); const client = createClient(response); const chunks = await client.readAll(); - const expectedChunks = [ - '{"function_call": {"name": "get_current_weather", "arguments": "', - '{\\n', - '\\"', - 'location', - '\\":', - ' \\"', - 'Char', - 'l', - 'ottesville', - ',', - ' Virginia', - '\\",\\n', - '\\"', - 'format', - '\\":', - ' \\"', - 'c', - 'elsius', - '\\"\\n', - '}', - '"}}', - ]; - - expect(chunks).toEqual(expectedChunks); - expect(chunks.join('')).toEqual( - `{"function_call": {"name": "get_current_weather", "arguments": "{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}`, - ); - }); - - it('should handle backpressure on the server', async () => { - const controller = new AbortController(); - const stream = OpenAIStream( - await 
fetch(DEFAULT_TEST_URL, { - signal: controller.signal, - }), - ); - const response = new StreamingTextResponse(stream); - const client = createClient(response); - const chunks = await client.readAndAbort(controller); - - expect(JSON.stringify(chunks)).toMatchInlineSnapshot(`"[\\"Hello\\"]"`); + expect(chunks).toEqual([ + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const stream = OpenAIStream(await fetch(DEFAULT_TEST_URL), { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, data); - - const client = createClient(response); - const chunks = await client.readAll(); + it('should send function response as text stream when onFunctionCall is not defined', async () => { + const data = new experimental_StreamData(); - expect(chunks).toEqual([ - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); + const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { + onFinal() { + data.close(); + }, }); - it('should send function response as text stream when onFunctionCall is not defined', async () => { - const data = new experimental_StreamData(); + const response = new StreamingTextResponse(stream, {}, data); - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + const client = createClient(response); + const chunks = await client.readAll(); - const response = new StreamingTextResponse(stream, {}, data); + expect(chunks).toEqual([ + '0:"{\\"function_call\\": {\\"name\\": \\"get_current_weather\\", \\"arguments\\": \\""\n', + '0:"{\\\\n"\n', + '0:"\\\\\\""\n', + '0:"location"\n', + '0:"\\\\\\":"\n', + '0:" \\\\\\""\n', + '0:"Char"\n', + '0:"l"\n', + '0:"ottesville"\n', + '0:","\n', + '0:" Virginia"\n', + '0:"\\\\\\",\\\\n"\n', + '0:"\\\\\\""\n', + '0:"format"\n', + '0:"\\\\\\":"\n', + '0:" \\\\\\""\n', + '0:"c"\n', + '0:"elsius"\n', + '0:"\\\\\\"\\\\n"\n', + '0:"}"\n', + '0:"\\"}}"\n', + ]); + }); - const client = createClient(response); - const chunks = await client.readAll(); + it('should send function response when onFunctionCall is defined and returns undefined', async () => { + const data = new experimental_StreamData(); - expect(chunks).toEqual([ - '0:"{\\"function_call\\": {\\"name\\": \\"get_current_weather\\", \\"arguments\\": \\""\n', - '0:"{\\\\n"\n', - '0:"\\\\\\""\n', - '0:"location"\n', - '0:"\\\\\\":"\n', - '0:" \\\\\\""\n', - '0:"Char"\n', - '0:"l"\n', - '0:"ottesville"\n', - '0:","\n', - '0:" Virginia"\n', - '0:"\\\\\\",\\\\n"\n', - '0:"\\\\\\""\n', - '0:"format"\n', - '0:"\\\\\\":"\n', - '0:" \\\\\\""\n', - '0:"c"\n', - '0:"elsius"\n', - '0:"\\\\\\"\\\\n"\n', - '0:"}"\n', - '0:"\\"}}"\n', - ]); + const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { + onFinal() { + data.close(); + }, + async experimental_onFunctionCall({ name }) { + // no response + }, }); - it('should send function response when onFunctionCall is defined and returns undefined', async () => { - const data = new experimental_StreamData(); + const response = new StreamingTextResponse(stream, {}, data); - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { - onFinal() { - data.close(); - }, - async experimental_onFunctionCall({ name }) { - // no response - }, - experimental_streamData: true, - }); - - const response = new StreamingTextResponse(stream, {}, 
data); + const client = createClient(response); + const chunks = await client.readAll(); - const client = createClient(response); - const chunks = await client.readAll(); + expect(chunks).toEqual([ + '1:{"function_call":{"name":"get_current_weather","arguments":"{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}\n', + ]); + }); - expect(chunks).toEqual([ - '1:{"function_call":{"name":"get_current_weather","arguments":"{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}\n', - ]); + it('should not call onText for function calls', async () => { + const data = new experimental_StreamData(); + + const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { + onFinal() { + data.close(); + }, + async experimental_onFunctionCall({ name }) { + // no response + }, + onText(token) { + assert.fail(`onText should not be called (token: ${token})`); + }, }); - it('should not call onText for function calls', async () => { - const data = new experimental_StreamData(); + const response = new StreamingTextResponse(stream, {}, data); - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { - onFinal() { - data.close(); - }, - async experimental_onFunctionCall({ name }) { - // no response - }, - onText(token) { - assert.fail(`onText should not be called (token: ${token})`); - }, - experimental_streamData: true, - }); + const client = createClient(response); - const response = new StreamingTextResponse(stream, {}, data); + await client.readAll(); // consume stream + }); - const client = createClient(response); + it('should send function response and data when onFunctionCall is defined, returns undefined, and data is added', async () => { + const data = new experimental_StreamData(); - await client.readAll(); // consume stream - }); + const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { + onFinal() { + data.close(); + }, + async experimental_onFunctionCall({ name }) { + data.append({ fn: name }); - it('should send function response and data when onFunctionCall is defined, returns undefined, and data is added', async () => { - const data = new experimental_StreamData(); + // no response + }, + }); - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { - onFinal() { - data.close(); - }, - async experimental_onFunctionCall({ name }) { - data.append({ fn: name }); + const response = new StreamingTextResponse(stream, {}, data); - // no response - }, - experimental_streamData: true, - }); + const client = createClient(response); + const chunks = await client.readAll(); - const response = new StreamingTextResponse(stream, {}, data); + expect(chunks).toEqual([ + '2:[{"fn":"get_current_weather"}]\n', + '1:{"function_call":{"name":"get_current_weather","arguments":"{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}\n', + ]); + }); - const client = createClient(response); - const chunks = await client.readAll(); + it('should send return value when onFunctionCall is defined and returns value', async () => { + const data = new experimental_StreamData(); - expect(chunks).toEqual([ - '2:[{"fn":"get_current_weather"}]\n', - '1:{"function_call":{"name":"get_current_weather","arguments":"{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}\n', - ]); + const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { + onFinal() { + data.close(); + }, + async experimental_onFunctionCall({ name }) { + return 
'experimental_onFunctionCall-return-value'; + }, }); - it('should send return value when onFunctionCall is defined and returns value', async () => { - const data = new experimental_StreamData(); - - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { - onFinal() { - data.close(); - }, - async experimental_onFunctionCall({ name }) { - return 'experimental_onFunctionCall-return-value'; - }, - experimental_streamData: true, - }); + const response = new StreamingTextResponse(stream, {}, data); - const response = new StreamingTextResponse(stream, {}, data); + const client = createClient(response); + const chunks = await client.readAll(); - const client = createClient(response); - const chunks = await client.readAll(); + expect(chunks).toEqual(['0:"experimental_onFunctionCall-return-value"\n']); + }); - expect(chunks).toEqual([ - '0:"experimental_onFunctionCall-return-value"\n', - ]); + it('should send return value and data when onFunctionCall is defined, returns value and data is added', async () => { + const data = new experimental_StreamData(); + + const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { + onFinal() { + data.close(); + }, + async experimental_onFunctionCall({ name }) { + data.append({ fn: name }); + return 'experimental_onFunctionCall-return-value'; + }, }); - it('should send return value and data when onFunctionCall is defined, returns value and data is added', async () => { - const data = new experimental_StreamData(); - - const stream = OpenAIStream(await fetch(FUNCTION_CALL_TEST_URL), { - onFinal() { - data.close(); - }, - async experimental_onFunctionCall({ name }) { - data.append({ fn: name }); - return 'experimental_onFunctionCall-return-value'; - }, - experimental_streamData: true, - }); + const response = new StreamingTextResponse(stream, {}, data); - const response = new StreamingTextResponse(stream, {}, data); - - const client = createClient(response); - const chunks = await client.readAll(); + const client = createClient(response); + const chunks = await client.readAll(); - expect(chunks).toEqual([ - '2:[{"fn":"get_current_weather"}]\n', - '0:"experimental_onFunctionCall-return-value"\n', - ]); - }); + expect(chunks).toEqual([ + '2:[{"fn":"get_current_weather"}]\n', + '0:"experimental_onFunctionCall-return-value"\n', + ]); + }); - it('should send text and data', async () => { - const data = new experimental_StreamData(); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - data.append({ t1: 'v1' }); + data.append({ t1: 'v1' }); - const stream = OpenAIStream(await fetch(DEFAULT_TEST_URL), { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }); + const stream = OpenAIStream(await fetch(DEFAULT_TEST_URL), { + onFinal() { + data.close(); + }, + }); - const response = new StreamingTextResponse(stream, {}, data); + const response = new StreamingTextResponse(stream, {}, data); - const client = createClient(response); - const chunks = await client.readAll(); + const client = createClient(response); + const chunks = await client.readAll(); - expect(chunks).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:"Hello"\n', - '0:","\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + expect(chunks).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:"Hello"\n', + '0:","\n', + '0:" world"\n', + '0:"."\n', + ]); }); describe('tool calls', () => { @@ -407,37 +334,34 @@ describe('OpenAIStream', () => { } } - describe('StreamData prototcol', () => { - it('should send text', async () => { - const data = new 
experimental_StreamData(); + it('should send text', async () => { + const data = new experimental_StreamData(); - const stream = OpenAIStream( - asyncIterableFromArray(azureOpenaiChatCompletionChunks), - { - onFinal() { - data.close(); - }, - experimental_streamData: true, + const stream = OpenAIStream( + asyncIterableFromArray(azureOpenaiChatCompletionChunks), + { + onFinal() { + data.close(); }, - ); - - const response = new StreamingTextResponse(stream, {}, data); - - const client = createClient(response); - const chunks = await client.readAll(); - - expect(chunks).toEqual([ - '0:"Hello"\n', - '0:"!"\n', - '0:" How"\n', - '0:" can"\n', - '0:" I"\n', - '0:" assist"\n', - '0:" you"\n', - '0:" today"\n', - '0:"?"\n', - ]); - }); + }, + ); + + const response = new StreamingTextResponse(stream, {}, data); + + const client = createClient(response); + const chunks = await client.readAll(); + + expect(chunks).toEqual([ + '0:"Hello"\n', + '0:"!"\n', + '0:" How"\n', + '0:" can"\n', + '0:" I"\n', + '0:" assist"\n', + '0:" you"\n', + '0:" today"\n', + '0:"?"\n', + ]); }); }); }); diff --git a/packages/core/streams/openai-stream.ts b/packages/core/streams/openai-stream.ts index fe44723ab026..ddacf183637b 100644 --- a/packages/core/streams/openai-stream.ts +++ b/packages/core/streams/openai-stream.ts @@ -481,9 +481,7 @@ export function OpenAIStream( const functionCallTransformer = createFunctionCallTransformer(cb); return stream.pipeThrough(functionCallTransformer); } else { - return stream.pipeThrough( - createStreamDataTransformer(cb?.experimental_streamData), - ); + return stream.pipeThrough(createStreamDataTransformer()); } } @@ -501,7 +499,6 @@ function createFunctionCallTransformer( let functionCallMessages: CreateMessage[] = callbacks[__internal__OpenAIFnMessagesSymbol] || []; - const isComplexMode = callbacks?.experimental_streamData; const decode = createChunkDecoder(); return new TransformStream({ @@ -524,9 +521,7 @@ function createFunctionCallTransformer( // Stream as normal if (!isFunctionStreamingIn) { controller.enqueue( - isComplexMode - ? textEncoder.encode(formatStreamPart('text', message)) - : chunk, + textEncoder.encode(formatStreamPart('text', message)), ); return; } else { @@ -667,22 +662,18 @@ function createFunctionCallTransformer( // so we just return the function call as a message controller.enqueue( textEncoder.encode( - isComplexMode - ? formatStreamPart( - payload.function_call ? 'function_call' : 'tool_calls', - // parse to prevent double-encoding: - JSON.parse(aggregatedResponse), - ) - : aggregatedResponse, + formatStreamPart( + payload.function_call ? 'function_call' : 'tool_calls', + // parse to prevent double-encoding: + JSON.parse(aggregatedResponse), + ), ), ); return; } else if (typeof functionResponse === 'string') { // The user returned a string, so we just return it as a message controller.enqueue( - isComplexMode - ? 
textEncoder.encode(formatStreamPart('text', functionResponse)) - : textEncoder.encode(functionResponse), + textEncoder.encode(formatStreamPart('text', functionResponse)), ); aggregatedFinalCompletionResponse = functionResponse; return; diff --git a/packages/core/streams/replicate-stream.test.ts b/packages/core/streams/replicate-stream.test.ts index 0ec58daa7c8f..a011a1471a75 100644 --- a/packages/core/streams/replicate-stream.test.ts +++ b/packages/core/streams/replicate-stream.test.ts @@ -28,8 +28,8 @@ describe('ReplicateStream', () => { server.close(); }); - it('should be able to parse SSE and receive the streamed response', async () => { - // Note: this only tests the streaming response from Replicate, not the framework invocation. + it('should send text', async () => { + const data = new experimental_StreamData(); const stream = await ReplicateStream( { @@ -41,76 +41,51 @@ describe('ReplicateStream', () => { created_at: 'fake', urls: { get: '', cancel: '', stream: DEFAULT_TEST_URL }, }, - undefined, + { + onFinal() { + data.close(); + }, + }, ); - const response = new StreamingTextResponse(stream); + const response = new StreamingTextResponse(stream, {}, data); - expect(await readAllChunks(response)).toEqual([' Hello,', ' world', '.']); + expect(await readAllChunks(response)).toEqual([ + '0:" Hello,"\n', + '0:" world"\n', + '0:"."\n', + ]); }); - describe('StreamData protocol', () => { - it('should send text', async () => { - const data = new experimental_StreamData(); - - const stream = await ReplicateStream( - { - id: 'fake', - status: 'processing', - version: 'fake', - input: {}, - source: 'api', - created_at: 'fake', - urls: { get: '', cancel: '', stream: DEFAULT_TEST_URL }, - }, - { - onFinal() { - data.close(); - }, - experimental_streamData: true, - }, - ); - - const response = new StreamingTextResponse(stream, {}, data); - - expect(await readAllChunks(response)).toEqual([ - '0:" Hello,"\n', - '0:" world"\n', - '0:"."\n', - ]); - }); - - it('should send text and data', async () => { - const data = new experimental_StreamData(); + it('should send text and data', async () => { + const data = new experimental_StreamData(); - data.append({ t1: 'v1' }); + data.append({ t1: 'v1' }); - const stream = await ReplicateStream( - { - id: 'fake', - status: 'processing', - version: 'fake', - input: {}, - source: 'api', - created_at: 'fake', - urls: { get: '', cancel: '', stream: DEFAULT_TEST_URL }, - }, - { - onFinal() { - data.close(); - }, - experimental_streamData: true, + const stream = await ReplicateStream( + { + id: 'fake', + status: 'processing', + version: 'fake', + input: {}, + source: 'api', + created_at: 'fake', + urls: { get: '', cancel: '', stream: DEFAULT_TEST_URL }, + }, + { + onFinal() { + data.close(); }, - ); + }, + ); - const response = new StreamingTextResponse(stream, {}, data); + const response = new StreamingTextResponse(stream, {}, data); - expect(await readAllChunks(response)).toEqual([ - '2:[{"t1":"v1"}]\n', - '0:" Hello,"\n', - '0:" world"\n', - '0:"."\n', - ]); - }); + expect(await readAllChunks(response)).toEqual([ + '2:[{"t1":"v1"}]\n', + '0:" Hello,"\n', + '0:" world"\n', + '0:"."\n', + ]); }); }); diff --git a/packages/core/streams/replicate-stream.ts b/packages/core/streams/replicate-stream.ts index 4242e2660cc1..dd7e00c961c5 100644 --- a/packages/core/streams/replicate-stream.ts +++ b/packages/core/streams/replicate-stream.ts @@ -68,6 +68,6 @@ export async function ReplicateStream( }); return AIStream(eventStream, undefined, cb).pipeThrough( - 
createStreamDataTransformer(cb?.experimental_streamData), + createStreamDataTransformer(), ); } diff --git a/packages/core/streams/stream-data.ts b/packages/core/streams/stream-data.ts index 785db1772f6a..f32aad8e0e92 100644 --- a/packages/core/streams/stream-data.ts +++ b/packages/core/streams/stream-data.ts @@ -119,16 +119,7 @@ export class experimental_StreamData { * A TransformStream for LLMs that do not have their own transform stream handlers managing encoding (e.g. OpenAIStream has one for function call handling). * This assumes every chunk is a 'text' chunk. */ -export function createStreamDataTransformer( - experimental_streamData: boolean | undefined, -) { - if (!experimental_streamData) { - return new TransformStream({ - transform: async (chunk, controller) => { - controller.enqueue(chunk); - }, - }); - } +export function createStreamDataTransformer() { const encoder = new TextEncoder(); const decoder = new TextDecoder(); return new TransformStream({ diff --git a/packages/core/streams/streaming-react-response.test.tsx b/packages/core/streams/streaming-react-response.test.tsx index 272fc7a2a199..0c530591e529 100644 --- a/packages/core/streams/streaming-react-response.test.tsx +++ b/packages/core/streams/streaming-react-response.test.tsx @@ -67,6 +67,7 @@ async function extractReactRowContents(response: Promise) { describe('without ui', () => { it('should stream text response as React rows', async () => { const stream = OpenAIStream(await fetch(DEFAULT_TEST_URL)); + const response = new experimental_StreamingReactResponse( stream, {}, @@ -90,7 +91,6 @@ describe('without ui', () => { onFinal() { data.close(); }, - experimental_streamData: true, }); const response = new experimental_StreamingReactResponse(stream, { @@ -134,7 +134,6 @@ describe('with ui: sync jsx for content', () => { onFinal() { data.close(); }, - experimental_streamData: true, }); const response = new experimental_StreamingReactResponse(stream, { @@ -179,7 +178,6 @@ describe('with ui: async sync jsx for content', () => { onFinal() { data.close(); }, - experimental_streamData: true, }); const response = new experimental_StreamingReactResponse(stream, { @@ -211,7 +209,6 @@ describe('with ui: sync jsx for content and data', () => { data.append({ fn: name }); return undefined; }, - experimental_streamData: true, }); const response = new experimental_StreamingReactResponse(stream, { @@ -256,7 +253,6 @@ describe('with ui: async jsx for content and data', () => { data.append({ fn: name }); return undefined; }, - experimental_streamData: true, }); const response = new experimental_StreamingReactResponse(stream, { diff --git a/packages/core/streams/streaming-react-response.ts b/packages/core/streams/streaming-react-response.ts index 489462bb53b5..a49fc058598f 100644 --- a/packages/core/streams/streaming-react-response.ts +++ b/packages/core/streams/streaming-react-response.ts @@ -8,10 +8,9 @@ * between the rows, but flushing the full payload on each row. 
*/ -import { generateId } from '../shared/generate-id'; import { parseComplexResponse } from '../shared/parse-complex-response'; import { IdGenerator, JSONValue } from '../shared/types'; -import { createChunkDecoder } from '../shared/utils'; +import { nanoid } from '../shared/utils'; import { experimental_StreamData } from './stream-data'; type UINode = string | JSX.Element | JSX.Element[] | null | undefined; @@ -45,87 +44,43 @@ export class experimental_StreamingReactResponse { resolveFunc = resolve; }); - if (options?.data) { - const processedStream: ReadableStream = res.pipeThrough( - options.data.stream, - ); - - let lastPayload: Payload | undefined = undefined; - - // runs asynchronously (no await on purpose) - parseComplexResponse({ - reader: processedStream.getReader(), - update: (merged, data) => { - const content = merged[0]?.content ?? ''; - const ui = options?.ui?.({ content, data }) || content; - const payload: Payload = { ui, content }; - - const resolvePrevious = resolveFunc; - const nextRow = new Promise(resolve => { - resolveFunc = resolve; - }); - - resolvePrevious({ - next: nextRow, - ...payload, - }); - - lastPayload = payload; - }, - generateId: options.generateId ?? generateId, - onFinish: () => { - // The last payload is resolved twice. This is necessary because we immediately - // push out a payload, but we also need to forward the finish event with a payload. - if (lastPayload !== undefined) { - resolveFunc({ - next: null, - ...lastPayload, - }); - } - }, - }); - - return next; - } - - let content = ''; - - const decode = createChunkDecoder(); - const reader = res.getReader(); - async function readChunk() { - const { done, value } = await reader.read(); - if (!done) { - content += decode(value); - } - - // TODO: Handle generators. With this current implementation we can support - // synchronous and asynchronous UIs. - // TODO: Handle function calls. - const ui = options?.ui?.({ content }) || content; - - const payload: Payload = { - ui, - content, - }; - - const resolvePrevious = resolveFunc; - const nextRow = done - ? null - : new Promise(resolve => { - resolveFunc = resolve; + const processedStream: ReadableStream = + options?.data != null ? res.pipeThrough(options?.data?.stream) : res; + + let lastPayload: Payload | undefined = undefined; + + // runs asynchronously (no await on purpose) + parseComplexResponse({ + reader: processedStream.getReader(), + update: (merged, data) => { + const content = merged[0]?.content ?? ''; + const ui = options?.ui?.({ content, data }) || content; + const payload: Payload = { ui, content }; + + const resolvePrevious = resolveFunc; + const nextRow = new Promise(resolve => { + resolveFunc = resolve; + }); + + resolvePrevious({ + next: nextRow, + ...payload, + }); + + lastPayload = payload; + }, + generateId: options?.generateId ?? nanoid, + onFinish: () => { + // The last payload is resolved twice. This is necessary because we immediately + // push out a payload, but we also need to forward the finish event with a payload. 
+ if (lastPayload !== undefined) { + resolveFunc({ + next: null, + ...lastPayload, }); - resolvePrevious({ - next: nextRow, - ...payload, - }); - - if (done) { - return; - } - - await readChunk(); - } - readChunk(); + } + }, + }); return next; } diff --git a/packages/core/streams/streaming-text-response.ts b/packages/core/streams/streaming-text-response.ts index 9b06df6f9fe2..5c9425e5d3f3 100644 --- a/packages/core/streams/streaming-text-response.ts +++ b/packages/core/streams/streaming-text-response.ts @@ -1,6 +1,5 @@ import type { ServerResponse } from 'node:http'; import { experimental_StreamData } from './stream-data'; -import { COMPLEX_HEADER } from '../shared/utils'; /** * A utility class for streaming text responses. @@ -22,7 +21,6 @@ export class StreamingTextResponse extends Response { status: 200, headers: { 'Content-Type': 'text/plain; charset=utf-8', - [COMPLEX_HEADER]: data ? 'true' : 'false', ...init?.headers, }, }); diff --git a/packages/core/svelte/use-chat.ts b/packages/core/svelte/use-chat.ts index 261e8a14ff38..2e69c16f2498 100644 --- a/packages/core/svelte/use-chat.ts +++ b/packages/core/svelte/use-chat.ts @@ -122,9 +122,6 @@ const getStreamedResponse = async ( ...chatRequest.options?.headers, }, abortController: () => abortControllerRef, - appendMessage(message) { - mutate([...chatRequest.messages, message]); - }, restoreMessagesOnFailure() { mutate(previousMessages); }, diff --git a/packages/core/tests/utils/mock-fetch.ts b/packages/core/tests/utils/mock-fetch.ts index c00fbd87b990..ed15f2c7c33d 100644 --- a/packages/core/tests/utils/mock-fetch.ts +++ b/packages/core/tests/utils/mock-fetch.ts @@ -1,5 +1,4 @@ import { vi } from 'vitest'; -import { COMPLEX_HEADER } from '../../shared/utils'; export function mockFetchTextStream({ url, @@ -70,9 +69,6 @@ export function mockFetchDataStreamWithGenerator({ url, ok: true, status: 200, - headers: new Map([ - [COMPLEX_HEADER, 'true'], - ]) as any as Headers, bodyUsed: false, body: { getReader() { diff --git a/packages/core/vue/use-chat.ts b/packages/core/vue/use-chat.ts index d0655d8b267d..c8376c156a3e 100644 --- a/packages/core/vue/use-chat.ts +++ b/packages/core/vue/use-chat.ts @@ -171,9 +171,6 @@ export function useChat({ mutate([...chatRequest.messages, message]); onFinish?.(message); }, - appendMessage(message) { - mutate([...chatRequest.messages, message]); - }, restoreMessagesOnFailure() { // Restore the previous messages if the request fails. mutate(previousMessages); diff --git a/packages/core/vue/use-completion.ui.test.ts b/packages/core/vue/use-completion.ui.test.ts index 1d0d7efc30b9..2e86522cfeda 100644 --- a/packages/core/vue/use-completion.ui.test.ts +++ b/packages/core/vue/use-completion.ui.test.ts @@ -5,7 +5,6 @@ import { mockFetchDataStream, mockFetchDataStreamWithGenerator, mockFetchError, - mockFetchTextStream, } from '../tests/utils/mock-fetch'; import TestCompletionComponent from './TestCompletionComponent.vue'; @@ -18,19 +17,6 @@ afterEach(() => { cleanup(); }); -it('should render normal streamed stream', async () => { - mockFetchTextStream({ - url: 'https://example.com/api/completion', - chunks: ['Hello', ',', ' world', '.'], - }); - - await userEvent.type(screen.getByTestId('input'), 'hi{enter}'); - - findByText(await screen.findByTestId('completion'), 'Hello, world.'); - - expect(screen.getByTestId('completion')).toHaveTextContent('Hello, world.'); -}); - it('should render complex text stream', async () => { mockFetchDataStream({ url: 'https://example.com/api/completion',