Skip to content

Commit

Permalink
[OpenAI] Fix streaming (Azure#28351)
Browse files Browse the repository at this point in the history
### Packages impacted by this PR
@azure/openai

### Issues associated with this PR
Azure#28342

### Describe the problem that is addressed by this PR
In `TransformStream`, the transform function shouldn't call
`controller.terminate()` unless it is expected that an error will be
thrown in the code that is consuming the writable stream. This PR
removes that call. Thanks @xirzec for the investigation!

UPDATE: Since the transformer stream is no longer ending, the idea of a
disposition operation is irrelevant. This is because the underlying
readable stream remains perpetually linked to the transformer stream,
making it impossible to cancel a stream that's locked. Now that I think
about it, the concept of disposition seems unnecessary in this case to
begin with. Once you're finished with the stream, there's no real need
for disposal so I deleted it. If a user wishes to cancel the stream,
they should use the standard method of calling the cancel API.

### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?
N/A

### Are there test cases added in this PR? _(If not, why?)_
We don't test on Bun but we have plenty chat completions tests.

### Provide a list of related PRs _(if any)_
N/A

### Command used to generate this PR:**_(Applicable only to SDK release
request PRs)_

### Checklists
- [x] Added impacted package name to the issue description
- [ ] Does this PR needs any fixes in the SDK Generator?** _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [x] Added a changelog (if necessary)
  • Loading branch information
deyaaeldeen authored Jan 24, 2024
1 parent 5d008b0 commit 0d606f3
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 39 deletions.
1 change: 1 addition & 0 deletions sdk/openai/openai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

- Fix a bug where `toolChoice` field in the input options to chat completion methods wasn't defined correctly.
- Fix a bug where the service returns undefined `choices` in chat completion methods.
- Fix a bug in chat completion methods where the returned stream was causing an error in Bun.

### Other Changes

Expand Down
2 changes: 1 addition & 1 deletion sdk/openai/openai/review/openai.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ export interface EmbeddingsUsage {
}

// @public
export interface EventStream<T> extends ReadableStream<T>, AsyncIterable<T>, AsyncDisposable {
export interface EventStream<T> extends ReadableStream<T>, AsyncIterable<T> {
}

// @public
Expand Down
5 changes: 2 additions & 3 deletions sdk/openai/openai/sources/customizations/api/oaiSse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ export async function getOaiSSEs<TEvent, O extends Record<string, any>>(
const jsonParser = new TransformStream<EventMessage, TEvent>({
transform: async (chunk, controller) => {
if (chunk.data === "[DONE]") {
controller.terminate();
return eventStream[Symbol.asyncDispose]();
return;
}
controller.enqueue(
toEvent(
Expand All @@ -31,5 +30,5 @@ export async function getOaiSSEs<TEvent, O extends Record<string, any>>(
},
});
/** TODO: remove these polyfills once all supported runtimes support them */
return polyfillStream(eventStream.pipeThrough(jsonParser), () => eventStream.cancel());
return polyfillStream(eventStream.pipeThrough(jsonParser));
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

export function polyfillStream<T>(
stream: ReadableStream<T>,
dispose: () => PromiseLike<void>,
): ReadableStream<T> & AsyncIterable<T> & AsyncDisposable {
export function polyfillStream<T>(stream: ReadableStream<T>): ReadableStream<T> & AsyncIterable<T> {
makeAsyncIterable<T>(stream);
makeAsyncDisposable(stream, dispose);
return stream;
}

function makeAsyncDisposable<T>(
webStream: any,
dispose: () => PromiseLike<void>,
): asserts webStream is ReadableStream<T> & AsyncDisposable {
(Symbol.asyncDispose as any) ??= Symbol("Symbol.asyncDispose");
if (!webStream[Symbol.asyncDispose]) {
webStream[Symbol.asyncDispose] = () => dispose();
}
}

function makeAsyncIterable<T>(
webStream: any,
): asserts webStream is ReadableStream<T> & AsyncIterable<T> {
Expand Down
2 changes: 1 addition & 1 deletion sdk/openai/openai/sources/customizations/models/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,4 +870,4 @@ export interface ChatMessageImageUrl {
}

/** A readable stream that is iterable and disposable. */
export interface EventStream<T> extends ReadableStream<T>, AsyncIterable<T>, AsyncDisposable {}
export interface EventStream<T> extends ReadableStream<T>, AsyncIterable<T> {}
5 changes: 2 additions & 3 deletions sdk/openai/openai/src/api/oaiSse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ export async function getOaiSSEs<TEvent, O extends Record<string, any>>(
const jsonParser = new TransformStream<EventMessage, TEvent>({
transform: async (chunk, controller) => {
if (chunk.data === "[DONE]") {
controller.terminate();
return eventStream[Symbol.asyncDispose]();
return;
}
controller.enqueue(
toEvent(
Expand All @@ -39,5 +38,5 @@ export async function getOaiSSEs<TEvent, O extends Record<string, any>>(
},
});
/** TODO: remove these polyfills once all supported runtimes support them */
return polyfillStream(eventStream.pipeThrough(jsonParser), () => eventStream.cancel());
return polyfillStream(eventStream.pipeThrough(jsonParser));
}
16 changes: 1 addition & 15 deletions sdk/openai/openai/src/api/readableStreamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,11 @@
* If you need to make changes, please do so in the original source file, \{project-root\}/sources/custom
*/

export function polyfillStream<T>(
stream: ReadableStream<T>,
dispose: () => PromiseLike<void>,
): ReadableStream<T> & AsyncIterable<T> & AsyncDisposable {
export function polyfillStream<T>(stream: ReadableStream<T>): ReadableStream<T> & AsyncIterable<T> {
makeAsyncIterable<T>(stream);
makeAsyncDisposable(stream, dispose);
return stream;
}

function makeAsyncDisposable<T>(
webStream: any,
dispose: () => PromiseLike<void>,
): asserts webStream is ReadableStream<T> & AsyncDisposable {
(Symbol.asyncDispose as any) ??= Symbol("Symbol.asyncDispose");
if (!webStream[Symbol.asyncDispose]) {
webStream[Symbol.asyncDispose] = () => dispose();
}
}

function makeAsyncIterable<T>(
webStream: any,
): asserts webStream is ReadableStream<T> & AsyncIterable<T> {
Expand Down
2 changes: 1 addition & 1 deletion sdk/openai/openai/src/models/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1566,4 +1566,4 @@ export type OnYourDataVectorizationSource =
| OnYourDataModelIdVectorizationSource;

/** A readable stream that is iterable and disposable. */
export interface EventStream<T> extends ReadableStream<T>, AsyncIterable<T>, AsyncDisposable {}
export interface EventStream<T> extends ReadableStream<T>, AsyncIterable<T> {}

0 comments on commit 0d606f3

Please sign in to comment.