From 2ddb5efd5a31b04ee1142a8c29902e9f4d1eb62f Mon Sep 17 00:00:00 2001 From: Krzysztof Kasprowicz <60486987+Krzysztof318@users.noreply.github.com> Date: Wed, 3 Apr 2024 12:42:26 +0200 Subject: [PATCH] .Net: Specialized SSE parser as Utility (#5710) ### Motivation and Context Closes #5610 ### Description Specialized SSE parser implementation as internal utilities. Code is partially borrowed from Azure sdk. cc: @stephentoub @RogerBarreto @markwallace-microsoft ### Contribution Checklist - [x] The code builds clean without any errors or warnings - [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [x] All unit tests pass, and I have added new tests where possible - [x] I didn't break anyone :smile: --------- Co-authored-by: Roger Barreto <19890735+RogerBarreto@users.noreply.github.com> --- dotnet/SK-dotnet.sln | 4 + .../Core/Gemini/GeminiStreamResponseTests.cs | 4 + .../Core/StreamJsonParserTests.cs | 244 ------------------ .../Clients/GeminiChatCompletionClient.cs | 3 +- .../Core/StreamJsonParser.cs | 219 ---------------- .../src/InternalUtilities/src/Text/SseData.cs | 44 ++++ .../src/Text/SseJsonParser.cs | 71 +++++ .../src/InternalUtilities/src/Text/SseLine.cs | 93 +++++++ .../InternalUtilities/src/Text/SseReader.cs | 174 +++++++++++++ .../src/Text/StreamJsonParser.cs | 2 + .../Utilities/SseJsonParserTests.cs | 211 +++++++++++++++ 11 files changed, 605 insertions(+), 464 deletions(-) delete mode 100644 dotnet/src/Connectors/Connectors.Google.UnitTests/Core/StreamJsonParserTests.cs delete mode 100644 dotnet/src/Connectors/Connectors.Google/Core/StreamJsonParser.cs create mode 100644 dotnet/src/InternalUtilities/src/Text/SseData.cs create mode 100644 dotnet/src/InternalUtilities/src/Text/SseJsonParser.cs create mode 100644 dotnet/src/InternalUtilities/src/Text/SseLine.cs create mode 100644 dotnet/src/InternalUtilities/src/Text/SseReader.cs create mode 100644 dotnet/src/SemanticKernel.UnitTests/Utilities/SseJsonParserTests.cs diff --git a/dotnet/SK-dotnet.sln b/dotnet/SK-dotnet.sln index a5547914b6a0..6c5f23643dd5 100644 --- a/dotnet/SK-dotnet.sln +++ b/dotnet/SK-dotnet.sln @@ -217,6 +217,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Text", "Text", "{EB2C141A-A ProjectSection(SolutionItems) = preProject src\InternalUtilities\src\Text\JsonOptionsCache.cs = src\InternalUtilities\src\Text\JsonOptionsCache.cs src\InternalUtilities\src\Text\ReadOnlyMemoryConverter.cs = src\InternalUtilities\src\Text\ReadOnlyMemoryConverter.cs + src\InternalUtilities\src\Text\SseJsonParser.cs = src\InternalUtilities\src\Text\SseJsonParser.cs + src\InternalUtilities\src\Text\SseLine.cs = src\InternalUtilities\src\Text\SseLine.cs + src\InternalUtilities\src\Text\SseReader.cs = src\InternalUtilities\src\Text\SseReader.cs + src\InternalUtilities\src\Text\SseData.cs = src\InternalUtilities\src\Text\SseData.cs EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Linq", "Linq", "{607DD6FA-FA0D-45E6-80BA-22A373609E89}" diff --git a/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/Gemini/GeminiStreamResponseTests.cs b/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/Gemini/GeminiStreamResponseTests.cs index 6485084a1219..52310c29139a 100644 --- a/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/Gemini/GeminiStreamResponseTests.cs +++ b/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/Gemini/GeminiStreamResponseTests.cs @@ -6,10 +6,14 @@ using System.Text.Json; using System.Threading.Tasks; using Microsoft.SemanticKernel.Connectors.Google.Core; +using Microsoft.SemanticKernel.Text; using Xunit; namespace SemanticKernel.Connectors.Google.UnitTests.Core.Gemini; +#pragma warning disable CS0419 // Ambiguous StreamJsonParser reference in cref attribute (InternalUtilities) +#pragma warning disable CS1574 // XML comment has cref StreamJsonParser that could not be resolved (InternalUtilities) + /// /// Tests for parsing with . /// diff --git a/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/StreamJsonParserTests.cs b/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/StreamJsonParserTests.cs deleted file mode 100644 index 623f097d8873..000000000000 --- a/dotnet/src/Connectors/Connectors.Google.UnitTests/Core/StreamJsonParserTests.cs +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System; -using System.IO; -using System.Linq; -using System.Text.Json; -using System.Threading.Tasks; -using Microsoft.SemanticKernel.Connectors.Google.Core; -using Xunit; - -namespace SemanticKernel.Connectors.Google.UnitTests.Core; - -public sealed class StreamJsonParserTests -{ - private const string SeeTestData = - """ - data: {"candidates": [{"content": {"parts": [{"text": "lorem ipsum"}],"role": "model"},"finishReason": "STOP","index": 0,"safetyRatings": [{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HATE_SPEECH","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HARASSMENT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT","probability": "NEGLIGIBLE"}]}],"promptFeedback": {"safetyRatings": [{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HATE_SPEECH","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HARASSMENT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT","probability": "NEGLIGIBLE"}]}} - - data: {"candidates": [{"content": {"parts": [{"text": "lorem ipsum"}],"role": "model"},"finishReason": "STOP","index": 0,"safetyRatings": [{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HATE_SPEECH","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HARASSMENT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT","probability": "NEGLIGIBLE"}]}]} - - data: {"candidates": [{"content": {"parts": [{"text": " lorem ipsum"}],"role": "model"},"finishReason": "STOP","index": 0,"safetyRatings": [{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HATE_SPEECH","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HARASSMENT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT","probability": "NEGLIGIBLE"}]}]} - - data: {"candidates": [{"finishReason": "SAFETY","index": 0,"safetyRatings": [{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT","probability": "HIGH"},{"category": "HARM_CATEGORY_HATE_SPEECH","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_HARASSMENT","probability": "NEGLIGIBLE"},{"category": "HARM_CATEGORY_DANGEROUS_CONTENT","probability": "NEGLIGIBLE"}]}]} - - """; - - [Fact] - public async Task ParseSseStreamReturnsEnumerableWithFourObjectsAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - WriteToStream(stream, SeeTestData); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Equal(4, result.Count); - } - - [Fact] - public async Task ParseSseStreamReturnsEnumerableWhereEachLineStartsAndEndsWithBracketAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - WriteToStream(stream, SeeTestData); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.All(result, json => Assert.StartsWith("{", json, StringComparison.Ordinal)); - Assert.All(result, json => Assert.EndsWith("}", json, StringComparison.Ordinal)); - } - - [Fact] - public async Task ParseWhenStreamStartsWithClosedBracketThrowsInvalidOperationAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = "}{}"; - WriteToStream(stream, input); - - // Act - // ReSharper disable once ReturnValueOfPureMethodIsNotUsed - async Task Act() => await parser.ParseAsync(stream).ToListAsync(); - - // Assert - await Assert.ThrowsAnyAsync(Act); - } - - [Fact] - public async Task ParseWhenStreamIsEmptyReturnsEmptyEnumerableAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Empty(result); - } - - [Fact] - public async Task ParseWhenStreamContainsOneObjectReturnsEnumerableWithOneObjectAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":"bar"}"""; - WriteToStream(stream, input); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Single(result, json => input.Equals(json, StringComparison.Ordinal)); - } - - [Fact] - public async Task ParseWhenStreamContainsArrayWithOnlyOneObjectReturnsEnumerableWithOneObjectAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":"bar"}"""; - WriteToStream(stream, $"[{input}]"); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Single(result, json => input.Equals(json, StringComparison.Ordinal)); - } - - [Fact] - public async Task ParseWhenStreamContainsArrayOfTwoObjectsReturnsEnumerableWithTwoObjectsAsync() - { - // Arrange - var parser = new StreamJsonParser(); - using var stream = new MemoryStream(); - string firstInput = """{"foo":"bar"}"""; - string secondInput = """{"foods":"base"}"""; - WriteToStream(stream, $"[{firstInput},{secondInput}]"); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Collection(result, - json => Assert.Equal(firstInput, json), - json => Assert.Equal(secondInput, json)); - } - - [Fact] - public async Task ParseWhenStreamContainsArrayOfTwoObjectsWithNestedObjectsReturnsEnumerableWithTwoObjectsAsync() - { - // Arrange - var parser = new StreamJsonParser(); - using var stream = new MemoryStream(); - string firstInput = """{"foo":"bar","nested":{"foo":"bar"}}"""; - string secondInput = """{"foods":"base","nested":{"foo":"bar"}}"""; - WriteToStream(stream, $"[{firstInput},{secondInput}]"); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Collection(result, - json => Assert.Equal(firstInput, json), - json => Assert.Equal(secondInput, json)); - } - - [Fact] - public async Task ParseWhenStreamContainsOneObjectReturnsEnumerableWithOneObjectWithEscapedQuotesAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":"be\"r"}"""; - WriteToStream(stream, input); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Single(result, json => input.Equals(json, StringComparison.Ordinal)); - } - - [Fact] - public async Task ParseWhenStreamContainsOneObjectReturnsEnumerableWithOneObjectWithEscapedBackslashAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":"be\\r"}"""; - WriteToStream(stream, input); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Single(result, json => input.Equals(json, StringComparison.Ordinal)); - } - - [Fact] - public async Task ParseWhenStreamContainsOneObjectReturnsEnumerableWithOneObjectWithEscapedBackslashAndQuotesAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":"be\\\"r"}"""; - WriteToStream(stream, input); - - // Act - var result = await parser.ParseAsync(stream).ToListAsync(); - - // Assert - Assert.Single(result, json => input.Equals(json, StringComparison.Ordinal)); - } - - [Fact] - public async Task ParseWithJsonValidationWhenStreamContainsInvalidJsonThrowsJsonExceptionAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":,"bar"}"""; - WriteToStream(stream, input); - - // Act - async Task Act() => await parser.ParseAsync(stream, validateJson: true).ToListAsync(); - - // Assert - await Assert.ThrowsAnyAsync(Act); - } - - [Fact] - public async Task ParseWithoutJsonValidationWhenStreamContainsInvalidJsonDoesntThrowAsync() - { - // Arrange - var parser = new StreamJsonParser(); - var stream = new MemoryStream(); - string input = """{"foo":,"bar"}"""; - WriteToStream(stream, input); - - // Act & Assert - await parser.ParseAsync(stream, validateJson: false).ToListAsync(); - // We don't need to use Assert here, because we are testing that the method doesn't throw - } - - private static void WriteToStream(Stream stream, string input) - { - using var writer = new StreamWriter(stream, leaveOpen: true); - writer.Write(input); - writer.Flush(); - stream.Position = 0; - } -} diff --git a/dotnet/src/Connectors/Connectors.Google/Core/Gemini/Clients/GeminiChatCompletionClient.cs b/dotnet/src/Connectors/Connectors.Google/Core/Gemini/Clients/GeminiChatCompletionClient.cs index 1112dbed878f..49ad460d1e81 100644 --- a/dotnet/src/Connectors/Connectors.Google/Core/Gemini/Clients/GeminiChatCompletionClient.cs +++ b/dotnet/src/Connectors/Connectors.Google/Core/Gemini/Clients/GeminiChatCompletionClient.cs @@ -12,6 +12,7 @@ using Microsoft.Extensions.Logging; using Microsoft.SemanticKernel.ChatCompletion; using Microsoft.SemanticKernel.Http; +using Microsoft.SemanticKernel.Text; namespace Microsoft.SemanticKernel.Connectors.Google.Core; @@ -501,7 +502,7 @@ private async IAsyncEnumerable ParseResponseStreamAsync( Stream responseStream, [EnumeratorCancellation] CancellationToken ct) { - await foreach (var json in this._streamJsonParser.ParseAsync(responseStream, ct: ct)) + await foreach (var json in this._streamJsonParser.ParseAsync(responseStream, cancellationToken: ct)) { yield return DeserializeResponse(json); } diff --git a/dotnet/src/Connectors/Connectors.Google/Core/StreamJsonParser.cs b/dotnet/src/Connectors/Connectors.Google/Core/StreamJsonParser.cs deleted file mode 100644 index b7bf35a139c4..000000000000 --- a/dotnet/src/Connectors/Connectors.Google/Core/StreamJsonParser.cs +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System; -using System.Collections.Generic; -using System.IO; -using System.Runtime.CompilerServices; -using System.Text; -using System.Text.Json.Nodes; -using System.Threading; -using System.Threading.Tasks; - -namespace Microsoft.SemanticKernel.Connectors.Google.Core; - -/// -/// Internal class for parsing a stream of text which contains a series of discrete JSON strings into en enumerable containing each separate JSON string. -/// -/// -/// This class is thread-safe. -/// -internal sealed class StreamJsonParser -{ - /// - /// Parses a Stream containing JSON data and yields the individual JSON objects. - /// - /// The Stream containing the JSON data. - /// Set to true to enable checking json chunks are well-formed. Default is false. - /// The cancellation token. - /// An enumerable collection of string representing the individual JSON objects. - /// Stream will be disposed after parsing. - public async IAsyncEnumerable ParseAsync( - Stream stream, - bool validateJson = false, - [EnumeratorCancellation] CancellationToken ct = default) - { - using var reader = new StreamReader(stream, Encoding.UTF8); - ChunkParser chunkParser = new(reader); - while (await chunkParser.ExtractNextChunkAsync(validateJson, ct).ConfigureAwait(false) is { } json) - { - yield return json; - } - } - - private sealed class ChunkParser - { - private readonly StringBuilder _jsonBuilder = new(); - private readonly StreamReader _reader; - - private int _bracketsCount; - private int _startBracketIndex = -1; - private bool _insideQuotes; - private bool _isEscaping; - private bool _isCompleteJson; - private char _currentCharacter; - private string? _lastLine; - - internal ChunkParser(StreamReader reader) - { - this._reader = reader; - } - - internal async Task ExtractNextChunkAsync( - bool validateJson, - CancellationToken ct) - { - this.ResetState(); - string? line; - while (!ct.IsCancellationRequested && ((line = await this._reader.ReadLineAsync().ConfigureAwait(false)) != null || this._lastLine != null)) - { - if (this._lastLine != null) - { - line = this._lastLine + line; - this._lastLine = null; - } - - if (this.ProcessLineUntilCompleteJson(line!)) - { - return this.GetJsonString(validateJson); - } - - this.AppendLine(line!); - } - - return null; - } - - private bool ProcessLineUntilCompleteJson(string line) - { - for (int i = 0; i < line!.Length; i++) - { - this._currentCharacter = line[i]; - - if (this.IsEscapedCharacterInsideQuotes()) - { - continue; - } - - this.DetermineIfQuoteStartOrEnd(); - this.HandleCurrentCharacterOutsideQuotes(i); - - if (this._isCompleteJson) - { - int nextIndex = i + 1; - if (nextIndex < line.Length) - { - this._lastLine = line.Substring(nextIndex); - this.AppendLine(line.Substring(0, nextIndex)); - } - else - { - this.AppendLine(line); - } - - return true; - } - - this.ResetEscapeFlag(); - } - - return false; - } - - private void ResetState() - { - this._jsonBuilder.Clear(); - this._bracketsCount = 0; - this._startBracketIndex = -1; - this._insideQuotes = false; - this._isEscaping = false; - this._isCompleteJson = false; - this._currentCharacter = default; - } - - private void AppendLine(string line) - { - switch (this._jsonBuilder) - { - case { Length: 0 } when this._startBracketIndex >= 0: - this._jsonBuilder.Append(line.Substring(this._startBracketIndex)); - break; - case { Length: > 0 }: - this._jsonBuilder.Append(line); - break; - } - } - - private string GetJsonString(bool validateJson) - { - if (!this._isCompleteJson) - { - throw new InvalidOperationException("Cannot get JSON string when JSON is not complete."); - } - - var json = this._jsonBuilder.ToString(); - if (validateJson) - { - _ = JsonNode.Parse(json); - } - - return json; - } - - private void MarkJsonAsComplete() - { - this._isCompleteJson = true; - } - - private void ResetEscapeFlag() => this._isEscaping = false; - - private void HandleCurrentCharacterOutsideQuotes(int index) - { - if (this._insideQuotes) - { - return; - } - - switch (this._currentCharacter) - { - case '{': - if (++this._bracketsCount == 1) - { - this._startBracketIndex = index; - } - - break; - case '}': - if (--this._bracketsCount < 0) - { - throw new InvalidOperationException("Invalid JSON in stream."); - } - - if (this._bracketsCount == 0) - { - this.MarkJsonAsComplete(); - } - - break; - } - } - - private void DetermineIfQuoteStartOrEnd() - { - if (this is { _currentCharacter: '\"', _isEscaping: false }) - { - this._insideQuotes = !this._insideQuotes; - } - } - - private bool IsEscapedCharacterInsideQuotes() - { - if (this is { _currentCharacter: '\\', _isEscaping: false, _insideQuotes: true }) - { - this._isEscaping = true; - return true; - } - - return false; - } - } -} diff --git a/dotnet/src/InternalUtilities/src/Text/SseData.cs b/dotnet/src/InternalUtilities/src/Text/SseData.cs new file mode 100644 index 000000000000..4b67f2d90eb0 --- /dev/null +++ b/dotnet/src/InternalUtilities/src/Text/SseData.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; + +namespace Microsoft.SemanticKernel.Text; + +#pragma warning disable CA1812 // Avoid uninstantiated internal classes + +/// +/// Represents a single Server-Sent Events (SSE) data object. +/// +[ExcludeFromCodeCoverage] +internal sealed class SseData +{ + /// + /// The name of the sse event. + /// + public string? EventName { get; } + + /// + /// Represents the type of data parsed from SSE message. + /// + public Type DataType { get; } + + /// + /// Represents the data parsed from SSE message. + /// + public object Data { get; } + + /// + /// Represents a single Server-Sent Events (SSE) data object. + /// + /// The name of the sse event. + /// The data parsed from SSE message. + public SseData(string? eventName, object data) + { + Verify.NotNull(data); + + this.EventName = eventName; + this.DataType = data.GetType(); + this.Data = data; + } +} diff --git a/dotnet/src/InternalUtilities/src/Text/SseJsonParser.cs b/dotnet/src/InternalUtilities/src/Text/SseJsonParser.cs new file mode 100644 index 000000000000..626e5eeea784 --- /dev/null +++ b/dotnet/src/InternalUtilities/src/Text/SseJsonParser.cs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Microsoft.SemanticKernel.Text; + +/// +/// Internal class for parsing Server-Sent Events (SSE) data from a stream. +/// +/// +/// This is specialized parser for Server-Sent Events (SSE) data that is formatted as JSON.
+/// If you need to parse non-structured json streaming data, use instead.
+/// SSE specification
+/// This class is thread-safe. +///
+[ExcludeFromCodeCoverage] +internal static class SseJsonParser +{ + /// + /// Parses Server-Sent Events (SSE) data asynchronously from a stream. + /// + /// The stream containing the SSE data. + /// The function to parse each into an object. + /// A cancellation token to stop the parsing process. + /// will be disposed immediately once enumeration is complete. + /// An asynchronous enumerable sequence of objects. + public static async IAsyncEnumerable ParseAsync( + Stream stream, + Func parser, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + try + { + using SseReader sseReader = new(stream); + while (!cancellationToken.IsCancellationRequested) + { + SseLine? sseLine = await sseReader.ReadSingleDataEventAsync(cancellationToken).ConfigureAwait(false); + if (sseLine == null) + { + break; // end of stream + } + + ReadOnlyMemory value = sseLine.Value.FieldValue; + if (value.Span.SequenceEqual("[DONE]".AsSpan())) + { + break; + } + + var sseData = parser(sseLine.Value); + if (sseData != null) + { + yield return sseData; + } + } + } + finally + { + // Always dispose the stream immediately once enumeration is complete for any reason +#if NETCOREAPP3_0_OR_GREATER + await stream.DisposeAsync().ConfigureAwait(false); +#else + stream.Dispose(); +#endif + } + } +} diff --git a/dotnet/src/InternalUtilities/src/Text/SseLine.cs b/dotnet/src/InternalUtilities/src/Text/SseLine.cs new file mode 100644 index 000000000000..e1a2d47c2e64 --- /dev/null +++ b/dotnet/src/InternalUtilities/src/Text/SseLine.cs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; + +namespace Microsoft.SemanticKernel.Text; + +/// +/// Represents a line of a Server-Sent Events (SSE) stream. +/// +/// +/// SSE specification +/// +[ExcludeFromCodeCoverage] +internal readonly struct SseLine : IEquatable +{ + private readonly string _original; + private readonly int _colonIndex; + private readonly int _valueIndex; + + /// + /// Represents an empty SSE line. + /// + /// + /// The property is a static instance of the struct. + /// + internal static SseLine Empty { get; } = new(string.Empty, 0, false, null); + + internal SseLine(string original, int colonIndex, bool hasSpaceAfterColon, string? lastEventName) + { + this._original = original; + this._colonIndex = colonIndex; + this._valueIndex = colonIndex >= 0 ? colonIndex + (hasSpaceAfterColon ? 2 : 1) : -1; + if (this._valueIndex >= this._original.Length) + { + this._valueIndex = -1; + } + + this.EventName = lastEventName; + } + + /// + /// The name of the last event for the Server-Sent Events (SSE) line. + /// + public string? EventName { get; } + + /// + /// Determines whether the SseLine is empty. + /// + public bool IsEmpty => this._original.Length == 0; + + /// + /// Gets a value indicating whether the value of the SseLine is empty. + /// + public bool IsValueEmpty => this._valueIndex < 0; + + /// + /// Determines whether the SseLine is comment line. + /// + public bool IsComment => !this.IsEmpty && this._original[0] == ':'; + + /// + /// Represents a field name in a Server-Sent Events (SSE) line. + /// + public ReadOnlyMemory FieldName => this._colonIndex >= 0 ? this._original.AsMemory(0, this._colonIndex) : this._original.AsMemory(); + + /// + /// Represents a field value in Server-Sent Events (SSE) format. + /// + public ReadOnlyMemory FieldValue => this._valueIndex >= 0 ? this._original.AsMemory(this._valueIndex) : string.Empty.AsMemory(); + + /// + public override string ToString() => this._original; + + /// + public bool Equals(SseLine other) => this._original.Equals(other._original, StringComparison.Ordinal); + + /// + public override bool Equals(object? obj) => obj is SseLine other && this.Equals(other); + + /// + public override int GetHashCode() => StringComparer.Ordinal.GetHashCode(this._original); + + /// + /// Defines the equality operator for comparing two instances of the SseLine class. + /// + public static bool operator ==(SseLine left, SseLine right) => left.Equals(right); + + /// + /// Represents the inequality operator for comparing two SseLine objects. + /// + public static bool operator !=(SseLine left, SseLine right) => !left.Equals(right); +} diff --git a/dotnet/src/InternalUtilities/src/Text/SseReader.cs b/dotnet/src/InternalUtilities/src/Text/SseReader.cs new file mode 100644 index 000000000000..c8506e597812 --- /dev/null +++ b/dotnet/src/InternalUtilities/src/Text/SseReader.cs @@ -0,0 +1,174 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.SemanticKernel.Text; + +/// +/// Provides a reader for Server-Sent Events (SSE) data. +/// +/// +/// SSE specification +/// +[ExcludeFromCodeCoverage] +internal sealed class SseReader : IDisposable +{ + private readonly Stream _stream; + private readonly StreamReader _reader; + private string? _lastEventName; + + public SseReader(Stream stream) + { + this._stream = stream; + this._reader = new StreamReader(stream); + } + + public SseLine? ReadSingleDataEvent() + { + while (this.ReadLine() is { } line) + { + if (line.IsEmpty) + { + this._lastEventName = null; + continue; + } + + if (line.IsComment) + { + continue; + } + + if (line.FieldName.Span.SequenceEqual("event".AsSpan())) + { + // Save the last event name + this._lastEventName = line.FieldValue.ToString(); + continue; + } + + if (!line.FieldName.Span.SequenceEqual("data".AsSpan())) + { + // Skip non-data fields + continue; + } + + if (!line.IsValueEmpty) + { + // Return data field + return line; + } + } + + return null; + } + + public async Task ReadSingleDataEventAsync(CancellationToken cancellationToken) + { + while (await this.ReadLineAsync(cancellationToken).ConfigureAwait(false) is { } line) + { + if (line.IsEmpty) + { + this._lastEventName = null; + continue; + } + + if (line.IsComment) + { + continue; + } + + if (line.FieldName.Span.SequenceEqual("event".AsSpan())) + { + // Save the last event name + this._lastEventName = line.FieldValue.ToString(); + continue; + } + + if (!line.FieldName.Span.SequenceEqual("data".AsSpan())) + { + // Skip non-data fields + continue; + } + + if (!line.IsValueEmpty) + { + // Return data field + return line; + } + } + + return null; + } + + private SseLine? ReadLine() + { + string? lineText = this._reader.ReadLine(); + if (lineText == null) + { + return null; + } + + if (lineText.Length == 0) + { + return SseLine.Empty; + } + + if (this.TryParseLine(lineText, out SseLine line)) + { + return line; + } + + return null; + } + + private async Task ReadLineAsync(CancellationToken cancellationToken) + { +#if NET7_0_OR_GREATER + string lineText = await this._reader.ReadLineAsync(cancellationToken).ConfigureAwait(false); +#else + string? lineText = await this._reader.ReadLineAsync().ConfigureAwait(false); +#endif + if (lineText == null) + { + return null; + } + + if (lineText.Length == 0) + { + return SseLine.Empty; + } + + if (this.TryParseLine(lineText, out SseLine line)) + { + return line; + } + + return null; + } + + private bool TryParseLine(string lineText, out SseLine line) + { + if (lineText.Length == 0) + { + line = default; + return false; + } + + ReadOnlySpan lineSpan = lineText.AsSpan(); + int colonIndex = lineSpan.IndexOf(':'); + ReadOnlySpan fieldValue = colonIndex >= 0 ? lineSpan.Slice(colonIndex + 1) : string.Empty.AsSpan(); + + bool hasSpace = fieldValue.Length > 0 && fieldValue[0] == ' '; + line = new SseLine(lineText, colonIndex, hasSpace, this._lastEventName); + return true; + } + + public void Dispose() + { + this._reader.Dispose(); + this._stream.Dispose(); + } +} diff --git a/dotnet/src/InternalUtilities/src/Text/StreamJsonParser.cs b/dotnet/src/InternalUtilities/src/Text/StreamJsonParser.cs index e3518b3e543d..0753cb059b47 100644 --- a/dotnet/src/InternalUtilities/src/Text/StreamJsonParser.cs +++ b/dotnet/src/InternalUtilities/src/Text/StreamJsonParser.cs @@ -19,6 +19,8 @@ namespace Microsoft.SemanticKernel.Text; /// Internal class for parsing a stream of text which contains a series of discrete JSON strings into en enumerable containing each separate JSON string. /// /// +/// This is universal parser for parsing stream of text which contains a series of discrete JSON.
+/// If you need a specialized SSE parser, use instead.
/// This class is thread-safe. ///
[ExcludeFromCodeCoverage] diff --git a/dotnet/src/SemanticKernel.UnitTests/Utilities/SseJsonParserTests.cs b/dotnet/src/SemanticKernel.UnitTests/Utilities/SseJsonParserTests.cs new file mode 100644 index 000000000000..4c96c887ca0b --- /dev/null +++ b/dotnet/src/SemanticKernel.UnitTests/Utilities/SseJsonParserTests.cs @@ -0,0 +1,211 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Microsoft.SemanticKernel.Text; +using Xunit; + +#pragma warning disable CA1812 // Avoid uninstantiated internal classes + +namespace SemanticKernel.UnitTests.Utilities; + +public sealed class SseJsonParserTests +{ + public const string SampleSseData1 = + """ + event: message_start + data: {"type": "message_start", "message": {"id": "msg_1nZdL29xx5MUA1yADyHTEsnR8uuvGzszyY", "type": "message", "role": "assistant", "content": [], "model": "claude-3-opus-20240229", "stop_reason": null, "stop_sequence": null, "usage": {"input_tokens": 25, "output_tokens": 1}}} + + event: content_block_start + data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}} + + event: ping + data: {"type": "ping"} + + event: content_block_delta + data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}} + + event: content_block_delta + data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "!"}} + + event: content_block_stop + data: {"type": "content_block_stop", "index": 0} + + event: message_delta + data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence":null, "usage":{"output_tokens": 15}}} + + event: message_stop + data: {"type": "message_stop"} + + """; + + public const string SampleSseData2 = + """ + event: userconnect + data: {"username": "bobby", "time": "02:33:48"} + + event: usermessage + data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."} + + event: userdisconnect + data: {"username": "bobby", "time": "02:34:23"} + + event: usermessage + data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."} + """; + + public const string SampleSseData3 = + """ + event: userconnect + data: {"username": "bobby", "time": "02:33:48"} + + data: Here's a system message of some kind that will get used + data: to accomplish some task. + + event: usermessage + data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."} + """; + + public const string SampleSseData4 = + """ + event: userconnect + data: {"username": "bobby", "time": "02:33:48"} + + data: none + + event: usermessage + data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."} + + event: userdisconnect + data: {"username": "bobby", "time": "02:34:23"} + data: + data + id: 3 + + data: [DONE] + + event: usermessage + data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."} + + """; + + [Theory] + [InlineData(SampleSseData1)] + [InlineData(SampleSseData2)] + [InlineData(SampleSseData3)] + [InlineData(SampleSseData4)] + public async Task ItReturnsAnyDataAsync(string data) + { + // Arrange + using var stream = new MemoryStream(); + WriteToStream(stream, data); + + // Act + var result = await SseJsonParser.ParseAsync(stream, + line => new SseData(line.EventName, line.FieldValue)) + .ToListAsync(); + + // Assert + Assert.NotEmpty(result); + } + + [Fact] + public async Task ItReturnsValidEventNamesAsync() + { + // Arrange + using var stream = new MemoryStream(); + WriteToStream(stream, SampleSseData2); + + // Act + var result = await SseJsonParser.ParseAsync(stream, + line => new SseData(line.EventName, line.FieldValue)) + .ToListAsync(); + + // Assert + Assert.Collection(result, + item => Assert.Equal("userconnect", item.EventName), + item => Assert.Equal("usermessage", item.EventName), + item => Assert.Equal("userdisconnect", item.EventName), + item => Assert.Equal("usermessage", item.EventName)); + } + + [Fact] + public async Task ItReturnsAllParsedJsonsAsync() + { + // Arrange + using var stream = new MemoryStream(); + WriteToStream(stream, SampleSseData1); + + // Act + var result = await SseJsonParser.ParseAsync(stream, + line => + { + var obj = JsonSerializer.Deserialize(line.FieldValue.Span, JsonOptionsCache.ReadPermissive); + return new SseData(line.EventName, obj!); + }) + .ToListAsync(); + + // Assert + Assert.True(result.Count == 8); + } + + [Fact] + public async Task ItReturnsValidParsedDataAsync() + { + // Arrange + using var stream = new MemoryStream(); + WriteToStream(stream, SampleSseData3); + + // Act + var result = await SseJsonParser.ParseAsync(stream, + line => + { + if (line.EventName == null) + { + return null; + } + + var userObject = JsonSerializer.Deserialize(line.FieldValue.Span, JsonOptionsCache.ReadPermissive); + return new SseData(line.EventName, userObject!); + }) + .ToListAsync(); + + // Assert + Assert.Collection(result, + item => + { + Assert.Equal("userconnect", item.EventName); + var userObject = Assert.IsType(item.Data); + Assert.Equal("bobby", userObject.Username); + Assert.Equal(TimeSpan.Parse("02:33:48", formatProvider: new DateTimeFormatInfo()), userObject.Time); + Assert.Null(userObject.Text); + }, + item => + { + Assert.Equal("usermessage", item.EventName); + var userObject = Assert.IsType(item.Data); + Assert.Equal("bobby", userObject.Username); + Assert.Equal(TimeSpan.Parse("02:34:11", formatProvider: new DateTimeFormatInfo()), userObject.Time); + Assert.Equal("Hi everyone.", userObject.Text); + }); + } + + private static void WriteToStream(Stream stream, string input) + { + using var writer = new StreamWriter(stream, leaveOpen: true); + writer.Write(input); + writer.Flush(); + stream.Position = 0; + } + + private sealed class UserObject + { + public string? Username { get; set; } + public TimeSpan Time { get; set; } + public string? Text { get; set; } + } +}