From c7c2f4cee485ccaaa32d1961918f05cdb7904d76 Mon Sep 17 00:00:00 2001 From: Travis Wilson <35748617+trrwilson@users.noreply.github.com> Date: Thu, 8 Feb 2024 08:53:21 -0800 Subject: [PATCH] remove SseReader's EndOfStream use (#41844) --- ...cs => OpenAIInferenceModelFactoryTests.cs} | 0 .../Azure.AI.OpenAI/src/Helpers/SseReader.cs | 8 +-- .../tests/ChatCompletionsTests.cs | 52 +++++++++++++++++++ 3 files changed, 56 insertions(+), 4 deletions(-) rename sdk/openai/Azure.AI.OpenAI/{tests/OpenAIInferenceModelFactoryTests.cs.cs => OpenAIInferenceModelFactoryTests.cs} (100%) diff --git a/sdk/openai/Azure.AI.OpenAI/tests/OpenAIInferenceModelFactoryTests.cs.cs b/sdk/openai/Azure.AI.OpenAI/OpenAIInferenceModelFactoryTests.cs similarity index 100% rename from sdk/openai/Azure.AI.OpenAI/tests/OpenAIInferenceModelFactoryTests.cs.cs rename to sdk/openai/Azure.AI.OpenAI/OpenAIInferenceModelFactoryTests.cs diff --git a/sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs b/sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs index 80636253e3107..c823ca180e4a1 100644 --- a/sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs +++ b/sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs @@ -57,9 +57,9 @@ public SseReader(Stream stream) public SseLine? TryReadLine() { - if (_reader.EndOfStream) - return null; string lineText = _reader.ReadLine(); + if (lineText == null) + return null; if (lineText.Length == 0) return SseLine.Empty; if (TryParseLine(lineText, out SseLine line)) @@ -70,9 +70,9 @@ public SseReader(Stream stream) // TODO: we should support cancellation tokens, but StreamReader does not in NS2 public async Task TryReadLineAsync() { - if (_reader.EndOfStream) - return null; string lineText = await _reader.ReadLineAsync().ConfigureAwait(false); + if (lineText == null) + return null; if (lineText.Length == 0) return SseLine.Empty; if (TryParseLine(lineText, out SseLine line)) diff --git a/sdk/openai/Azure.AI.OpenAI/tests/ChatCompletionsTests.cs b/sdk/openai/Azure.AI.OpenAI/tests/ChatCompletionsTests.cs index d4f227cd40369..4e78bfd7e0139 100644 --- a/sdk/openai/Azure.AI.OpenAI/tests/ChatCompletionsTests.cs +++ b/sdk/openai/Azure.AI.OpenAI/tests/ChatCompletionsTests.cs @@ -157,5 +157,57 @@ StreamingResponse response Assert.IsTrue(gotResponseContentFilterResults); } } + + [RecordedTest] + [LiveOnly] // pending timed recording playback integration, this must be live + [TestCase(Service.NonAzure)] // Azure OpenAI's default RAI behavior introduces timing confounds + public async Task StreamingChatDoesNotBlockEnumerator(Service serviceTarget) + { + OpenAIClient client = GetTestClient(serviceTarget); + string deploymentOrModelName = GetDeploymentOrModelName(serviceTarget); + + var requestOptions = new ChatCompletionsOptions() + { + DeploymentName = deploymentOrModelName, + Messages = + { + new ChatRequestSystemMessage("You are a helpful assistant."), + new ChatRequestUserMessage("Can you help me?"), + new ChatRequestAssistantMessage("Of course! What do you need help with?"), + new ChatRequestUserMessage("What temperature should I bake pizza at?"), + }, + }; + + StreamingResponse response + = await client.GetChatCompletionsStreamingAsync(requestOptions); + Assert.That(response, Is.Not.Null); + + IAsyncEnumerable updateEnumerable = response.EnumerateValues(); + IAsyncEnumerator updateEnumerator = updateEnumerable.GetAsyncEnumerator(); + + int tasksAlreadyComplete = 0; + int tasksNotYetComplete = 0; + + while (true) + { + ValueTask hasNextTask = updateEnumerator.MoveNextAsync(); + if (hasNextTask.IsCompleted) + { + tasksAlreadyComplete++; + } + else + { + tasksNotYetComplete++; + } + if (!await hasNextTask) + { + break; + } + } + Assert.That( + tasksNotYetComplete, + Is.GreaterThan(tasksAlreadyComplete / 5), + "Live streaming is expected to encounter a significant proportion of not yet buffered reads"); + } } }