Skip to content

Commit

Permalink
Enable stream data protocol and deprecate experimental_streamData: tr…
Browse files Browse the repository at this point in the history
…ue flag. (#1192)
  • Loading branch information
lgrammel authored Apr 9, 2024
1 parent f42bbb5 commit a6b2500
Show file tree
Hide file tree
Showing 57 changed files with 799 additions and 1,452 deletions.
5 changes: 5 additions & 0 deletions .changeset/two-bats-itch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

Deprecated the `experimental_streamData: true` setting from AIStreamCallbacksAndOptions. You can delete occurrences in your code. The stream data protocol is now used by default.
1 change: 0 additions & 1 deletion docs/pages/docs/api-reference/providers/inkeep-stream.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ export async function POST(req: Request) {
}
data.close();
},
experimental_streamData: true,
});

return new StreamingTextResponse(stream, {}, data);
Expand Down
1 change: 0 additions & 1 deletion docs/pages/docs/api-reference/providers/mistral-stream.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ export async function POST(req: Request) {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

// Respond with the stream
Expand Down
2 changes: 0 additions & 2 deletions docs/pages/docs/api-reference/stream-data.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ export async function POST(req: Request) {
// IMPORTANT! you must close StreamData manually or the response will never finish.
data.close();
},
// IMPORTANT! until this is stable, you must explicitly opt in to supporting streamData.
experimental_streamData: true,
});

data.append({
Expand Down
1 change: 0 additions & 1 deletion docs/pages/docs/guides/providers/inkeep.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ export async function POST(req: Request) {
}
data.close();
},
experimental_streamData: true,
});

return new StreamingTextResponse(stream, {}, data);
Expand Down
1 change: 0 additions & 1 deletion examples/next-inkeep/app/api/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ export async function POST(req: Request) {
}
data.close();
},
experimental_streamData: true,
});

return new StreamingTextResponse(stream, {}, data);
Expand Down
1 change: 0 additions & 1 deletion examples/next-langchain/app/api/stream-data-basic/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export async function POST(req: Request) {
data.append(JSON.stringify({ key: 'value' })); // example
data.close();
},
experimental_streamData: true,
});

const llm = new ChatOpenAI({
Expand Down
1 change: 0 additions & 1 deletion examples/next-langchain/app/api/stream-data-chain/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export async function POST(req: Request) {
data.append(JSON.stringify({ key: 'value' })); // example
data.close();
},
experimental_streamData: true,
});

await chain.stream({ product: value }, { callbacks: [handlers] });
Expand Down
1 change: 0 additions & 1 deletion examples/next-mistral/app/api/completion/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ export async function POST(req: Request) {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

// Respond with the stream
Expand Down
1 change: 0 additions & 1 deletion examples/next-openai/app/api/chat-with-functions/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ export async function POST(req: Request) {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

data.append({
Expand Down
1 change: 0 additions & 1 deletion examples/next-openai/app/api/chat-with-tools/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ export async function POST(req: Request) {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

data.append({
Expand Down
1 change: 0 additions & 1 deletion examples/next-openai/app/api/completion/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export async function POST(req: Request) {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

// Respond with the stream
Expand Down
11 changes: 5 additions & 6 deletions examples/next-openai/app/stream-react-response/action.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ export async function handler({ messages }: { messages: Message[] }) {

return undefined;
},
experimental_streamData: true,
});

return new experimental_StreamingReactResponse(stream, {
Expand All @@ -113,11 +112,11 @@ export async function handler({ messages }: { messages: Message[] }) {
switch (value.type) {
case 'weather': {
return (
<div className="bg-blue-500 text-white p-6 rounded-lg shadow-md">
<div className="flex justify-between items-center">
<div className="p-6 text-white bg-blue-500 rounded-lg shadow-md">
<div className="flex items-center justify-between">
<h2 className="text-2xl font-bold">{value.location}</h2>
<svg
className=" w-8 h-8"
className="w-8 h-8 "
fill="none"
height="24"
stroke="currentColor"
Expand All @@ -131,7 +130,7 @@ export async function handler({ messages }: { messages: Message[] }) {
<path d="M17.5 19H9a7 7 0 1 1 6.71-9h1.79a4.5 4.5 0 1 1 0 9Z" />
</svg>
</div>
<p className="text-4xl font-semibold mt-2">
<p className="mt-2 text-4xl font-semibold">
{value.temperature}° {value.format}
</p>
</div>
Expand All @@ -143,7 +142,7 @@ export async function handler({ messages }: { messages: Message[] }) {
<div className="border-8 border-[#8B4513] dark:border-[#5D2E1F] rounded-lg overflow-hidden">
<img
alt="Framed Image"
className="aspect-square object-cover w-full"
className="object-cover w-full aspect-square"
height="500"
src={value.url}
width="500"
Expand Down
1 change: 0 additions & 1 deletion examples/nuxt-openai/server/api/chat-with-functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export default defineLazyEventHandler(async () => {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

data.append({
Expand Down
1 change: 0 additions & 1 deletion examples/nuxt-openai/server/api/completion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export default defineLazyEventHandler(async () => {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

// Respond with the stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ export const POST = async (event: APIEvent) => {
onFinal() {
data.close();
},
experimental_streamData: true,
});

data.append({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export const POST = async (event: APIEvent) => {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

// Respond with the stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export async function POST({ request }) {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

data.append({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ export async function POST({ request }) {
onFinal() {
data.close();
},
experimental_streamData: true,
});

data.append({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export const POST = (async ({ request }) => {
onFinal(completion) {
data.close();
},
experimental_streamData: true,
});

// Respond with the stream
Expand Down
4 changes: 1 addition & 3 deletions packages/core/core/generate-text/stream-text.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,6 @@ Stream callbacks that will be called when the stream emits events.
// TODO add support for tool calls
return readableFromAsyncIterable(this.textStream)
.pipeThrough(createCallbacksTransformer(callbacks))
.pipeThrough(
createStreamDataTransformer(callbacks?.experimental_streamData),
);
.pipeThrough(createStreamDataTransformer());
}
}
3 changes: 0 additions & 3 deletions packages/core/react/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ const getStreamedResponse = async (
...chatRequest.options?.headers,
},
abortController: () => abortControllerRef.current,
appendMessage(message) {
mutate([...chatRequest.messages, message], false);
},
restoreMessagesOnFailure() {
mutate(previousMessages, false);
},
Expand Down
13 changes: 0 additions & 13 deletions packages/core/react/use-completion.ui.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
mockFetchDataStream,
mockFetchDataStreamWithGenerator,
mockFetchError,
mockFetchTextStream,
} from '../tests/utils/mock-fetch';
import { useCompletion } from './use-completion';

Expand Down Expand Up @@ -45,18 +44,6 @@ afterEach(() => {
cleanup();
});

it('should render normal streamed stream', async () => {
mockFetchTextStream({
url: 'https://example.com/api/completion',
chunks: ['Hello', ',', ' world', '.'],
});

await userEvent.type(screen.getByTestId('input'), 'hi{enter}');

await screen.findByTestId('completion');
expect(screen.getByTestId('completion')).toHaveTextContent('Hello, world.');
});

it('should render complex text stream', async () => {
mockFetchDataStream({
url: 'https://example.com/api/completion',
Expand Down
102 changes: 12 additions & 90 deletions packages/core/shared/call-chat-api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
import { parseComplexResponse } from './parse-complex-response';
import {
FunctionCall,
IdGenerator,
JSONValue,
Message,
ToolCall,
} from './types';
import { COMPLEX_HEADER, createChunkDecoder } from './utils';
import { IdGenerator, JSONValue, Message } from './types';

export async function callChatApi({
api,
Expand All @@ -15,7 +8,6 @@ export async function callChatApi({
credentials,
headers,
abortController,
appendMessage,
restoreMessagesOnFailure,
onResponse,
onUpdate,
Expand All @@ -29,7 +21,6 @@ export async function callChatApi({
headers?: HeadersInit;
abortController?: () => AbortController | null;
restoreMessagesOnFailure: () => void;
appendMessage: (message: Message) => void;
onResponse?: (response: Response) => void | Promise<void>;
onUpdate: (merged: Message[], data: JSONValue[] | undefined) => void;
onFinish?: (message: Message) => void;
Expand Down Expand Up @@ -72,86 +63,17 @@ export async function callChatApi({
}

const reader = response.body.getReader();
const isComplexMode = response.headers.get(COMPLEX_HEADER) === 'true';

if (isComplexMode) {
return await parseComplexResponse({
reader,
abortControllerRef:
abortController != null ? { current: abortController() } : undefined,
update: onUpdate,
onFinish(prefixMap) {
if (onFinish && prefixMap.text != null) {
onFinish(prefixMap.text);
}
},
generateId,
});
} else {
const createdAt = new Date();
const decode = createChunkDecoder(false);

// TODO-STREAMDATA: Remove this once Stream Data is not experimental
let streamedResponse = '';
const replyId = generateId();
let responseMessage: Message = {
id: replyId,
createdAt,
content: '',
role: 'assistant',
};

// TODO-STREAMDATA: Remove this once Stream Data is not experimental
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// Update the chat state with the new message tokens.
streamedResponse += decode(value);

if (streamedResponse.startsWith('{"function_call":')) {
// While the function call is streaming, it will be a string.
responseMessage['function_call'] = streamedResponse;
} else if (streamedResponse.startsWith('{"tool_calls":')) {
// While the tool calls are streaming, it will be a string.
responseMessage['tool_calls'] = streamedResponse;
} else {
responseMessage['content'] = streamedResponse;
return await parseComplexResponse({
reader,
abortControllerRef:
abortController != null ? { current: abortController() } : undefined,
update: onUpdate,
onFinish(prefixMap) {
if (onFinish && prefixMap.text != null) {
onFinish(prefixMap.text);
}

appendMessage({ ...responseMessage });

// The request has been aborted, stop reading the stream.
if (abortController?.() === null) {
reader.cancel();
break;
}
}

if (streamedResponse.startsWith('{"function_call":')) {
// Once the stream is complete, the function call is parsed into an object.
const parsedFunctionCall: FunctionCall =
JSON.parse(streamedResponse).function_call;

responseMessage['function_call'] = parsedFunctionCall;

appendMessage({ ...responseMessage });
}
if (streamedResponse.startsWith('{"tool_calls":')) {
// Once the stream is complete, the tool calls are parsed into an array.
const parsedToolCalls: ToolCall[] =
JSON.parse(streamedResponse).tool_calls;

responseMessage['tool_calls'] = parsedToolCalls;

appendMessage({ ...responseMessage });
}

if (onFinish) {
onFinish(responseMessage);
}

return responseMessage;
}
},
generateId,
});
}
Loading

0 comments on commit a6b2500

Please sign in to comment.