Skip to content

Commit

Permalink
remove SseReader's EndOfStream use (Azure#41844)
Browse files Browse the repository at this point in the history
  • Loading branch information
trrwilson authored Feb 8, 2024
1 parent d1a48b5 commit c7c2f4c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
8 changes: 4 additions & 4 deletions sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public SseReader(Stream stream)

public SseLine? TryReadLine()
{
if (_reader.EndOfStream)
return null;
string lineText = _reader.ReadLine();
if (lineText == null)
return null;
if (lineText.Length == 0)
return SseLine.Empty;
if (TryParseLine(lineText, out SseLine line))
Expand All @@ -70,9 +70,9 @@ public SseReader(Stream stream)
// TODO: we should support cancellation tokens, but StreamReader does not in NS2
public async Task<SseLine?> TryReadLineAsync()
{
if (_reader.EndOfStream)
return null;
string lineText = await _reader.ReadLineAsync().ConfigureAwait(false);
if (lineText == null)
return null;
if (lineText.Length == 0)
return SseLine.Empty;
if (TryParseLine(lineText, out SseLine line))
Expand Down
52 changes: 52 additions & 0 deletions sdk/openai/Azure.AI.OpenAI/tests/ChatCompletionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,57 @@ StreamingResponse<StreamingChatCompletionsUpdate> response
Assert.IsTrue(gotResponseContentFilterResults);
}
}

[RecordedTest]
[LiveOnly] // pending timed recording playback integration, this must be live
[TestCase(Service.NonAzure)] // Azure OpenAI's default RAI behavior introduces timing confounds
public async Task StreamingChatDoesNotBlockEnumerator(Service serviceTarget)
{
OpenAIClient client = GetTestClient(serviceTarget);
string deploymentOrModelName = GetDeploymentOrModelName(serviceTarget);

var requestOptions = new ChatCompletionsOptions()
{
DeploymentName = deploymentOrModelName,
Messages =
{
new ChatRequestSystemMessage("You are a helpful assistant."),
new ChatRequestUserMessage("Can you help me?"),
new ChatRequestAssistantMessage("Of course! What do you need help with?"),
new ChatRequestUserMessage("What temperature should I bake pizza at?"),
},
};

StreamingResponse<StreamingChatCompletionsUpdate> response
= await client.GetChatCompletionsStreamingAsync(requestOptions);
Assert.That(response, Is.Not.Null);

IAsyncEnumerable<StreamingChatCompletionsUpdate> updateEnumerable = response.EnumerateValues();
IAsyncEnumerator<StreamingChatCompletionsUpdate> updateEnumerator = updateEnumerable.GetAsyncEnumerator();

int tasksAlreadyComplete = 0;
int tasksNotYetComplete = 0;

while (true)
{
ValueTask<bool> hasNextTask = updateEnumerator.MoveNextAsync();
if (hasNextTask.IsCompleted)
{
tasksAlreadyComplete++;
}
else
{
tasksNotYetComplete++;
}
if (!await hasNextTask)
{
break;
}
}
Assert.That(
tasksNotYetComplete,
Is.GreaterThan(tasksAlreadyComplete / 5),
"Live streaming is expected to encounter a significant proportion of not yet buffered reads");
}
}
}

0 comments on commit c7c2f4c

Please sign in to comment.