From 8ac16e2f5206fbaecb460ff0975eb9a3390025b7 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Mon, 13 May 2024 18:05:15 -0400 Subject: [PATCH] Add System.Formats.Sse --- .../System.Formats.Sse/System.Formats.Sse.sln | 48 + .../ref/System.Formats.Sse.cs | 33 + .../ref/System.Formats.Sse.csproj | 16 + .../System.Formats.Sse/src/PACKAGE.md | 50 + .../src/Resources/Strings.resx | 126 +++ .../src/System.Formats.Sse.csproj | 32 + .../src/System/Formats/Sse/SseEnumerable.cs | 546 +++++++++++ .../src/System/Formats/Sse/SseItem.cs | 25 + .../src/System/Formats/Sse/SseItemParser.cs | 12 + .../src/System/Formats/Sse/SseParser.cs | 49 + .../tests/SseParserTests.cs | 860 ++++++++++++++++++ .../tests/System.Formats.Sse.Tests.csproj | 19 + 12 files changed, 1816 insertions(+) create mode 100644 src/libraries/System.Formats.Sse/System.Formats.Sse.sln create mode 100644 src/libraries/System.Formats.Sse/ref/System.Formats.Sse.cs create mode 100644 src/libraries/System.Formats.Sse/ref/System.Formats.Sse.csproj create mode 100644 src/libraries/System.Formats.Sse/src/PACKAGE.md create mode 100644 src/libraries/System.Formats.Sse/src/Resources/Strings.resx create mode 100644 src/libraries/System.Formats.Sse/src/System.Formats.Sse.csproj create mode 100644 src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseEnumerable.cs create mode 100644 src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItem.cs create mode 100644 src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItemParser.cs create mode 100644 src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseParser.cs create mode 100644 src/libraries/System.Formats.Sse/tests/SseParserTests.cs create mode 100644 src/libraries/System.Formats.Sse/tests/System.Formats.Sse.Tests.csproj diff --git a/src/libraries/System.Formats.Sse/System.Formats.Sse.sln b/src/libraries/System.Formats.Sse/System.Formats.Sse.sln new file mode 100644 index 00000000000000..a87c6e91cf6b83 --- /dev/null +++ b/src/libraries/System.Formats.Sse/System.Formats.Sse.sln @@ -0,0 +1,48 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.11.34910.147 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Formats.Sse", "ref\System.Formats.Sse.csproj", "{ACB7E0BF-015F-43DC-A2F5-85506173B223}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Formats.Sse", "src\System.Formats.Sse.csproj", "{ACDB56AF-7B9F-4762-9764-D6FF09118D09}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Formats.Sse.Tests", "tests\System.Formats.Sse.Tests.csproj", "{804B5D44-05A3-491E-A6AB-35C592E6703E}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{2BD73108-47D7-40E6-BFCB-169E6AD42A81}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{E6AF8CEE-6550-4190-97D4-D51C5B114919}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{D908DCBE-EFA4-4CCA-9A1C-AEB48D59C504}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Release|Any CPU.Build.0 = Release|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Release|Any CPU.Build.0 = Release|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {ACB7E0BF-015F-43DC-A2F5-85506173B223} = {E6AF8CEE-6550-4190-97D4-D51C5B114919} + {ACDB56AF-7B9F-4762-9764-D6FF09118D09} = {D908DCBE-EFA4-4CCA-9A1C-AEB48D59C504} + {804B5D44-05A3-491E-A6AB-35C592E6703E} = {2BD73108-47D7-40E6-BFCB-169E6AD42A81} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {01DAF96B-AF8E-4576-A1BC-57D19BDB317E} + EndGlobalSection +EndGlobal diff --git a/src/libraries/System.Formats.Sse/ref/System.Formats.Sse.cs b/src/libraries/System.Formats.Sse/ref/System.Formats.Sse.cs new file mode 100644 index 00000000000000..cd5ac695da6dbd --- /dev/null +++ b/src/libraries/System.Formats.Sse/ref/System.Formats.Sse.cs @@ -0,0 +1,33 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// ------------------------------------------------------------------------------ +// Changes to this file must follow the https://aka.ms/api-review process. +// ------------------------------------------------------------------------------ + +namespace System.Formats.Sse +{ + public sealed partial class SseEnumerable : System.Collections.Generic.IAsyncEnumerable>, System.Collections.Generic.IEnumerable>, System.Collections.IEnumerable + { + internal SseEnumerable() { } + public string LastEventId { get { throw null; } } + public System.TimeSpan ReconnectionInterval { get { throw null; } } + public System.Collections.Generic.IAsyncEnumerator> GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Collections.Generic.IEnumerator> GetEnumerator() { throw null; } + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { throw null; } + } + public delegate T SseItemParser(string eventType, System.ReadOnlySpan data); + public readonly partial struct SseItem + { + private readonly T _Data_k__BackingField; + private readonly object _dummy; + private readonly int _dummyPrimitive; + public SseItem(string eventType, T data) { throw null; } + public T Data { get { throw null; } } + public string EventType { get { throw null; } } + } + public static partial class SseParser + { + public static System.Formats.Sse.SseEnumerable Parse(System.IO.Stream sseStream) { throw null; } + public static System.Formats.Sse.SseEnumerable Parse(System.IO.Stream sseStream, System.Formats.Sse.SseItemParser itemParser) { throw null; } + } +} diff --git a/src/libraries/System.Formats.Sse/ref/System.Formats.Sse.csproj b/src/libraries/System.Formats.Sse/ref/System.Formats.Sse.csproj new file mode 100644 index 00000000000000..a7bb6bf15c78d6 --- /dev/null +++ b/src/libraries/System.Formats.Sse/ref/System.Formats.Sse.csproj @@ -0,0 +1,16 @@ + + + + $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) + + + + + + + + + + + + diff --git a/src/libraries/System.Formats.Sse/src/PACKAGE.md b/src/libraries/System.Formats.Sse/src/PACKAGE.md new file mode 100644 index 00000000000000..f8a9f447658ccc --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/PACKAGE.md @@ -0,0 +1,50 @@ +## About + + + +System.Formats.Sse introduces the `SseParser` type, which provides factory methods for creating enumerables of the events in a stream of server-sent events (SSE). + +## Key Features + + + +* Parser for server-sent events (SSE) + +## How to Use + + + +Asynchronously parsing event contents as strings + +```csharp +using HttpClient client = new(); +using Stream response = await client.GetStreamAsync("https://localhost:12345/sse"); +await foreach (SseItem item in SseParser.Parse(response)) +{ + Console.WriteLine(item.Data); +} +``` + +Synchronously parsing event contents as JSON + +```csharp +MemoryStream stream = new MemoryStream(data); +foreach (SseItem item in SseParser.Parse(response, (eventType, bytes) => JsonSerializer.Deserialize(bytes))) +{ + Console.WriteLine(item.Author); +} +``` + +## Main Types + + + +The main types provided by this library are: + +* `System.Formats.Sse.SseParser` + +## Feedback & Contributing + + + +System.Formats.Sse is released as open source under the [MIT license](https://licenses.nuget.org/MIT). Bug reports and contributions are welcome at [the GitHub repository](https://github.com/dotnet/runtime). diff --git a/src/libraries/System.Formats.Sse/src/Resources/Strings.resx b/src/libraries/System.Formats.Sse/src/Resources/Strings.resx new file mode 100644 index 00000000000000..fd5898f2830134 --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/Resources/Strings.resx @@ -0,0 +1,126 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + The enumerable may be enumerated only once. + + + The Stream implementation is invalid, returning a negative number from a read operation. + + \ No newline at end of file diff --git a/src/libraries/System.Formats.Sse/src/System.Formats.Sse.csproj b/src/libraries/System.Formats.Sse/src/System.Formats.Sse.csproj new file mode 100644 index 00000000000000..fff5ef5fff1750 --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/System.Formats.Sse.csproj @@ -0,0 +1,32 @@ + + + + $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) + true + true + true + Provides a simple parser for server-sent events (SSE). + +Commonly Used Types: +System.Formats.Sse.SseParser + + + true + + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseEnumerable.cs b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseEnumerable.cs new file mode 100644 index 00000000000000..64752d25cfc4ed --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseEnumerable.cs @@ -0,0 +1,546 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Formats.Sse +{ + /// Provides an enumerable of server-sent events information. + /// Specifies the type of data parsed from an event. + public sealed class SseEnumerable : IAsyncEnumerable>, IEnumerable> + { + // For reference: + // Specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events + + /// Spec: "Initialize event's type attribute to "message"" + private const string DefaultEventType = "message"; + /// Carriage Return. + private const byte CR = (byte)'\r'; + /// Line Feed. + private const byte LF = (byte)'\n'; + /// Carriage Return Line Feed. + private static ReadOnlySpan CRLF => "\r\n"u8; + + /// The default size of an ArrayPool buffer to rent. + /// Larger size used by default to minimize number of reads. Smaller size used in debug to stress growth/shifting logic. + private const int DefaultArrayPoolRentSize = +#if DEBUG + 16; +#else + 1024; +#endif + + /// The stream to be parsed. + private readonly Stream _stream; + /// The parser delegate used to transform bytes into a . + private readonly SseItemParser _itemParser; + + /// Indicates whether the enumerable has already been used for enumeration. + private int _used; + + /// Buffer, either empty or rented, containing the data being read from the stream while looking for the next line. + private byte[] _lineBuffer = []; + /// The starting offset of valid data in . + private int _lineOffset; + /// The length of valid data in , starting from . + private int _lineLength; + /// The index in where a newline ('\r', '\n', or "\r\n") was found. + private int _newlineIndex; + /// The index in of characters already checked for newlines. + /// + /// This is to avoid O(LineLength^2) behavior in the rare case where we have long lines that are built-up over multiple reads. + /// We want to avoid re-checking the same characters we've already checked over and over again. + /// + private int _lastSearchedForNewline; + /// Set when eof has been reached in the stream. + private bool _eof; + + /// Rented buffer containing buffered data for the next event. + private byte[]? _dataBuffer; + /// The length of valid data in , starting from index 0. + private int _dataLength; + /// Whether data has been appended to . + /// This can be different than != 0 if empty data was appended. + private bool _dataAppended; + + /// The event type for the next event. + private string _eventType = DefaultEventType; + + /// Initialize the enumerable. + /// The stream to parse. + /// The function to use to parse payload bytes into a . + internal SseEnumerable(Stream stream, SseItemParser itemParser) + { + _stream = stream; + _itemParser = itemParser; + } + + /// Gets an enumerable of the server-sent events. + public IEnumerator> GetEnumerator() + { + // Validate that the enumerable is only used for one enumeration. + ThrowIfNotFirstEnumeration(); + + return IterateEvents(); + + IEnumerator> IterateEvents() + { + // Rent a line buffer. This will grow as needed. The line buffer is what's passed to the stream, + // so we want it to be large enough to reduce the number of reads we need to do when data is + // arriving quickly. (In debug, we use a smaller buffer to stress the growth and shifting logic.) + _lineBuffer = ArrayPool.Shared.Rent(DefaultArrayPoolRentSize); + try + { + // Spec: "Event streams in this format must always be encoded as UTF-8". + // Skip a UTF8 BOM if it exists at the beginning of the stream. (The BOM is defined as optional in the SSE grammar.) + while (FillLineBuffer() != 0 && _lineLength < Utf8Bom.Length) ; + SkipStartingBomIfPresent(); + + // Process all events in the stream. + while (true) + { + // See if there's a complete line in data already read from the stream. Lines are permitted to + // end with CR, LF, or CRLF. Look for all of them and if we find one, process the line. However, + // if we only find a CR and it's at the end of the read data, don't process it now, as we want + // to process it together with an LF that might immediately follow, rather than treating them + // as two separate characters, in which case we'd incorrectly process the CR as a line by itself. + + int searchOffset, searchLength; + if (_lastSearchedForNewline > _lineOffset) + { + searchOffset = _lastSearchedForNewline; + searchLength = _lineLength - (_lastSearchedForNewline - _lineOffset); + } + else + { + searchOffset = _lineOffset; + searchLength = _lineLength; + } + + _newlineIndex = _lineBuffer.AsSpan(searchOffset, searchLength).IndexOfAny(CR, LF); + if (_newlineIndex >= 0) + { + _lastSearchedForNewline = -1; + _newlineIndex += searchOffset; + if (_lineBuffer[_newlineIndex] is LF || // the newline is LF + _newlineIndex - _lineOffset + 1 < _lineLength || // we must have CR and we have whatever comes after it + _eof) // if we get here, we know we have a CR at the end of the buffer, so it's definitely the whole newline if we've hit EOF + { + // Process the line. + if (ProcessLine(out SseItem sseItem, out int advance)) + { + yield return sseItem; + } + + // Move past the line. + _lineOffset += advance; + _lineLength -= advance; + continue; + } + } + else + { + // Record the last position searched for a newline. The next time we search, + // we'll search from here rather than from _lineOffset, in order to avoid searching + // the same characters again. + _lastSearchedForNewline = _lineOffset + _lineLength; + } + + // We've processed everything in the buffer we currently can, so if we've already read EOF, we're done. + if (_eof) + { + // Spec: "Once the end of the file is reached, any pending data must be discarded. (If the file ends in the middle of an + // event, before the final empty line, the incomplete event is not dispatched.)" + break; + } + + // Read more data into the buffer. + FillLineBuffer(); + } + } + finally + { + ArrayPool.Shared.Return(_lineBuffer); + if (_dataBuffer is not null) + { + ArrayPool.Shared.Return(_dataBuffer); + } + } + } + } + + /// Gets an enumerable of the server-sent events. + public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + // Validate that the enumerable is only used for one enumeration. + ThrowIfNotFirstEnumeration(); + + return IterateEvents(cancellationToken); + + async IAsyncEnumerator> IterateEvents(CancellationToken cancellationToken = default) + { + // Rent a line buffer. This will grow as needed. The line buffer is what's passed to the stream, + // so we want it to be large enough to reduce the number of reads we need to do when data is + // arriving quickly. (In debug, we use a smaller buffer to stress the growth and shifting logic.) + _lineBuffer = ArrayPool.Shared.Rent(DefaultArrayPoolRentSize); + try + { + // Spec: "Event streams in this format must always be encoded as UTF-8". + // Skip a UTF8 BOM if it exists at the beginning of the stream. (The BOM is defined as optional in the SSE grammar.) + while (await FillLineBufferAsync(cancellationToken).ConfigureAwait(false) != 0 && _lineLength < Utf8Bom.Length) ; + SkipStartingBomIfPresent(); + + // Process all events in the stream. + while (true) + { + // See if there's a complete line in data already read from the stream. Lines are permitted to + // end with CR, LF, or CRLF. Look for all of them and if we find one, process the line. However, + // if we only find a CR and it's at the end of the read data, don't process it now, as we want + // to process it together with an LF that might immediately follow, rather than treating them + // as two separate characters, in which case we'd incorrectly process the CR as a line by itself. + + int searchOffset, searchLength; + if (_lastSearchedForNewline > _lineOffset) + { + searchOffset = _lastSearchedForNewline; + searchLength = _lineLength - (_lastSearchedForNewline - _lineOffset); + } + else + { + searchOffset = _lineOffset; + searchLength = _lineLength; + } + + _newlineIndex = _lineBuffer.AsSpan(searchOffset, searchLength).IndexOfAny(CR, LF); + if (_newlineIndex >= 0) + { + _lastSearchedForNewline = -1; + _newlineIndex += searchOffset; + if (_lineBuffer[_newlineIndex] is LF || // the newline is LF + _newlineIndex - _lineOffset + 1 < _lineLength || // we must have CR and we have whatever comes after it + _eof) // if we get here, we know we have a CR at the end of the buffer, so it's definitely the whole newline if we've hit EOF + { + // Process the line. + if (ProcessLine(out SseItem sseItem, out int advance)) + { + yield return sseItem; + } + + // Move past the line. + _lineOffset += advance; + _lineLength -= advance; + continue; + } + } + else + { + // Record the last position searched for a newline. The next time we search, + // we'll search from here rather than from _lineOffset, in order to avoid searching + // the same characters again. + _lastSearchedForNewline = _lineOffset + _lineLength; + } + + // We've processed everything in the buffer we currently can, so if we've already read EOF, we're done. + if (_eof) + { + // Spec: "Once the end of the file is reached, any pending data must be discarded. (If the file ends in the middle of an + // event, before the final empty line, the incomplete event is not dispatched.)" + break; + } + + // Read more data into the buffer. + await FillLineBufferAsync(cancellationToken).ConfigureAwait(false); + } + } + finally + { + ArrayPool.Shared.Return(_lineBuffer); + if (_dataBuffer is not null) + { + ArrayPool.Shared.Return(_dataBuffer); + } + } + } + } + + private int GetNewLineLength() + { + Debug.Assert(_newlineIndex - _lineOffset < _lineLength, "Expected to be positioned at a non-empty newline"); + return _lineBuffer.AsSpan(_newlineIndex).StartsWith(CRLF) ? 2 : 1; + } + + /// + /// If there's no room remaining in the line buffer, either shifts the contents + /// left or grows the buffer in order to make room for the next read. + /// + private void ShiftOrGrowLineBufferIfNecessary() + { + // If data we've read is butting up against the end of the buffer and + // it's not taking up the entire buffer, slide what's there down to + // the beginning, making room to read more data into the buffer (since + // there's no newline in the data that's there). Otherwise, if the whole + // buffer is full, grow the buffer to accommodate more data, since, again, + // what's there doesn't contain a newline and thus a line is longer than + // the current buffer accomodates. + if (_lineOffset + _lineLength == _lineBuffer.Length) + { + if (_lineOffset != 0) + { + _lineBuffer.AsSpan(_lineOffset, _lineLength).CopyTo(_lineBuffer); + if (_lastSearchedForNewline >= 0) + { + _lastSearchedForNewline -= _lineOffset; + } + _lineOffset = 0; + } + else if (_lineLength == _lineBuffer.Length) + { + GrowBuffer(ref _lineBuffer, _lineBuffer.Length * 2); + } + } + } + + /// Processes a complete line from the SSE stream. + /// The parsed item if the method returns true. + /// How many characters to advance in the line buffer. + /// true if an SSE item was successfully parsed; otherwise, false. + private bool ProcessLine(out SseItem sseItem, out int advance) + { + ReadOnlySpan line = _lineBuffer.AsSpan(_lineOffset, _newlineIndex - _lineOffset); + + // Spec: "If the line is empty (a blank line) Dispatch the event" + if (line.IsEmpty) + { + advance = GetNewLineLength(); + + if (_dataAppended) + { + sseItem = new SseItem(_eventType, _itemParser(_eventType, _dataBuffer.AsSpan(0, _dataLength))); + _eventType = DefaultEventType; + _dataLength = 0; + _dataAppended = false; + return true; + } + + sseItem = default; + return false; + } + + // Find the colon separating the field name and value. + int colonPos = line.IndexOf((byte)':'); + ReadOnlySpan fieldName; + ReadOnlySpan fieldValue; + if (colonPos >= 0) + { + // Spec: "Collect the characters on the line before the first U+003A COLON character (:), and let field be that string." + fieldName = line.Slice(0, colonPos); + + // Spec: "Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. + // If value starts with a U+0020 SPACE character, remove it from value." + fieldValue = line.Slice(colonPos + 1); + if (!fieldValue.IsEmpty && fieldValue[0] == (byte)' ') + { + fieldValue = fieldValue.Slice(1); + } + } + else + { + // Spec: "using the whole line as the field name, and the empty string as the field value." + fieldName = line; + fieldValue = []; + } + + if (fieldName.SequenceEqual("data"u8)) + { + // Spec: "Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer." + + // If there's nothing currently in the data buffer and we can easily detect that this line is immediately followed by + // an empty line, we can optimize it to just handle the data directly from the line buffer, rather than first copying + // into the data buffer and dispatching from there. + if (!_dataAppended) + { + int newlineLength = GetNewLineLength(); + ReadOnlySpan remainder = _lineBuffer.AsSpan(_newlineIndex + newlineLength, _lineLength - line.Length - newlineLength); + if (!remainder.IsEmpty && + (remainder[0] is LF || (remainder[0] is CR && remainder.Length > 1))) + { + advance = line.Length + newlineLength + (remainder.StartsWith(CRLF) ? 2 : 1); + sseItem = new SseItem(_eventType, _itemParser(_eventType, fieldValue)); + _eventType = DefaultEventType; + return true; + } + } + + // We need to copy the data from the data buffer to the line buffer. Make sure there's enough room. + if (_dataBuffer is null || _dataLength + _lineLength + 1 > _dataBuffer.Length) + { + GrowBuffer(ref _dataBuffer, _dataLength + _lineLength + 1); + } + + // Append a newline if there's already content in the buffer. + // Then copy the field value to the data buffer + if (_dataAppended) + { + _dataBuffer[_dataLength++] = LF; + } + fieldValue.CopyTo(_dataBuffer.AsSpan(_dataLength)); + _dataLength += fieldValue.Length; + _dataAppended = true; + } + else if (fieldName.SequenceEqual("event"u8)) + { + // Spec: "Set the event type buffer to field value." + _eventType = SseParser.Utf8GetString(fieldValue); + } + else if (fieldName.SequenceEqual("id"u8)) + { + // Spec: "If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value. Otherwise, ignore the field." + if (fieldValue.IndexOf((byte)'\0') < 0) + { + // Note that fieldValue might be empty, in which case LastEventId will naturally be reset to the empty string. This is per spec. + LastEventId = SseParser.Utf8GetString(fieldValue); + } + } + else if (fieldName.SequenceEqual("retry"u8)) + { + // Spec: "If the field value consists of only ASCII digits, then interpret the field value as an integer in base ten, + // and set the event stream's reconnection time to that integer. Otherwise, ignore the field." + if (long.TryParse( +#if NET + fieldValue, +#else + SseParser.Utf8GetString(fieldValue), +#endif + NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds)) + { + ReconnectionInterval = TimeSpan.FromMilliseconds(milliseconds); + } + } + else + { + // We'll end up here if the line starts with a colon, producing an empty field name, or if the field name is otherwise unrecognized. + // Spec: "If the line starts with a U+003A COLON character (:) Ignore the line." + // Spec: "Otherwise, The field is ignored" + } + + advance = line.Length + GetNewLineLength(); + sseItem = default; + return false; + } + + /// Gets the last event ID. + /// This value is updated any time a new last event ID is parsed. It is not reset between SSE items. + public string LastEventId { get; private set; } = string.Empty; // Spec: "must be initialized to the empty string" + + /// Gets the reconnection interval. + /// + /// If no retry event was received, this defaults to , and it will only + /// ever be in that situation. If a client wishes to retry, the server-sent + /// events specification states that the interval may then be decided by the client implementation and should be a + /// few seconds. + /// + public TimeSpan ReconnectionInterval { get; private set; } = Timeout.InfiniteTimeSpan; + + /// Transitions the object to a used state, throwing if it's already been used. + private void ThrowIfNotFirstEnumeration() + { + if (Interlocked.Exchange(ref _used, 1) != 0) + { + throw new InvalidOperationException(SR.InvalidOperation_EnumerateOnlyOnce); + } + } + + /// + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// Reads data from the stream into the line buffer. + private int FillLineBuffer() + { + ShiftOrGrowLineBufferIfNecessary(); + + int offset = _lineOffset + _lineLength; + int bytesRead = _stream.Read( +#if NET + _lineBuffer.AsSpan(offset)); +#else + _lineBuffer, offset, _lineBuffer.Length - offset); +#endif + + if (bytesRead <= 0) + { + if (bytesRead < 0) + { + throw new InvalidOperationException(SR.InvalidOperation_InvalidStreamNegativeBytes); + } + + _eof = true; + } + + _lineLength += bytesRead; + + return bytesRead; + } + + /// Reads data asynchronously from the stream into the line buffer. + private async ValueTask FillLineBufferAsync(CancellationToken cancellationToken) + { + ShiftOrGrowLineBufferIfNecessary(); + + int offset = _lineOffset + _lineLength; + int bytesRead = await +#if NET + _stream.ReadAsync(_lineBuffer.AsMemory(offset), cancellationToken) +#else + new ValueTask(_stream.ReadAsync(_lineBuffer, offset, _lineBuffer.Length - offset, cancellationToken)) +#endif + .ConfigureAwait(false); + + if (bytesRead <= 0) + { + if (bytesRead < 0) + { + throw new InvalidOperationException(SR.InvalidOperation_InvalidStreamNegativeBytes); + } + + _eof = true; + } + + _lineLength += bytesRead; + + return bytesRead; + } + + /// Called at the beginning of processing to skip over an optional UTF8 byte order mark. + private void SkipStartingBomIfPresent() + { + if (_lineLength >= 3 && _lineBuffer.AsSpan().StartsWith(Utf8Bom)) + { + _lineOffset += 3; + _lineLength -= 3; + } + } + + /// Gets the UTF8 BOM. + private static ReadOnlySpan Utf8Bom => [0xEF, 0xBB, 0xBF]; + + /// Grows the buffer, returning the existing one to the ArrayPool and renting an ArrayPool replacement. + private static void GrowBuffer([NotNull] ref byte[]? buffer, int minimumLength) + { + byte[]? toReturn = buffer; + buffer = ArrayPool.Shared.Rent(Math.Max(minimumLength, DefaultArrayPoolRentSize)); + if (toReturn is not null) + { + Array.Copy(toReturn, buffer, toReturn.Length); + ArrayPool.Shared.Return(toReturn); + } + } + } +} diff --git a/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItem.cs b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItem.cs new file mode 100644 index 00000000000000..f7746acc96f6bc --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItem.cs @@ -0,0 +1,25 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Formats.Sse +{ + /// Represents a server-sent event. + /// Specifies the type of data payload in the event. + public readonly struct SseItem + { + /// Initializes the server-sent event. + /// The event's type. + /// The event's payload. + public SseItem(string eventType, T data) + { + EventType = eventType; + Data = data; + } + + /// Gets the event's type. + public string EventType { get; } + + /// Gets the event's payload. + public T Data { get; } + } +} diff --git a/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItemParser.cs b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItemParser.cs new file mode 100644 index 00000000000000..0b9345619543fc --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseItemParser.cs @@ -0,0 +1,12 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Formats.Sse +{ + /// Encapsulates a method for parsing the bytes payload of a server-sent event. + /// Specifies the type of the return value of the parser. + /// The event's type. + /// The event's payload bytes. + /// The parsed . + public delegate T SseItemParser(string eventType, ReadOnlySpan data); +} diff --git a/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseParser.cs b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseParser.cs new file mode 100644 index 00000000000000..809baafc5a3440 --- /dev/null +++ b/src/libraries/System.Formats.Sse/src/System/Formats/Sse/SseParser.cs @@ -0,0 +1,49 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO; +using System.Text; + +namespace System.Formats.Sse +{ + /// Provides a parser for parsing server-sent events. + public static class SseParser + { + /// Parses a of server-sent events into a sequence of values. + /// The stream containing the data to parse. + /// + /// The enumerable of strings, which may be enumerated synchronously or asynchronously. The strings + /// are decoded from the UTF8-encoded bytes of the payload of each event. + /// + /// is null. + public static SseEnumerable Parse(Stream sseStream) => + Parse(sseStream, static (_, bytes) => Utf8GetString(bytes)); + + /// Parses a of server-sent events into a sequence of values. + /// Specifies the type of data in each event. + /// The stream containing the data to parse. + /// The parser to use to transform each payload of bytes into a data element. + /// The enumerable, which may be enumerated synchronously or asynchronously. + /// is null. + /// is null. + public static SseEnumerable Parse(Stream sseStream, SseItemParser itemParser) => + new SseEnumerable( + sseStream ?? throw new ArgumentNullException(nameof(sseStream)), + itemParser ?? throw new ArgumentNullException(nameof(itemParser))); + + /// Encoding.UTF8.GetString(bytes) + internal static unsafe string Utf8GetString(ReadOnlySpan bytes) + { +#if NET + return Encoding.UTF8.GetString(bytes); +#else + fixed (byte* ptr = bytes) + { + return ptr is null ? + string.Empty : + Encoding.UTF8.GetString(ptr, bytes.Length); + } +#endif + } + } +} diff --git a/src/libraries/System.Formats.Sse/tests/SseParserTests.cs b/src/libraries/System.Formats.Sse/tests/SseParserTests.cs new file mode 100644 index 00000000000000..6257e0b154ffbc --- /dev/null +++ b/src/libraries/System.Formats.Sse/tests/SseParserTests.cs @@ -0,0 +1,860 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Formats.Sse.Tests +{ + public partial class SseParserTests + { + [Fact] + public void Parse_InvalidArguments_Throws() + { + AssertExtensions.Throws("sseStream", () => SseParser.Parse(null)); + AssertExtensions.Throws("sseStream", () => SseParser.Parse(null, delegate { return ""; })); + AssertExtensions.Throws("itemParser", () => SseParser.Parse(Stream.Null, null)); + } + + [Fact] + public void Parse_Sync_SupportsOnlyOneEnumeration_Throws() + { + SseEnumerable e = SseParser.Parse(Stream.Null); + e.GetEnumerator(); + Assert.Throws(() => e.GetEnumerator()); + Assert.Throws(() => e.GetAsyncEnumerator()); + } + + [Fact] + public void Parse_Async_SupportsOnlyOneEnumeration_Throws() + { + SseEnumerable e = SseParser.Parse(Stream.Null); + e.GetAsyncEnumerator(); + Assert.Throws(() => e.GetAsyncEnumerator()); + Assert.Throws(() => e.GetEnumerator()); + } + + [Fact] + public void SseItem_Roundtrips() + { + SseItem item; + + item = new SseItem(); + Assert.Null(item.EventType); + Assert.Null(item.Data); + + item = new SseItem("eventType", "some data"); + Assert.Equal("eventType", item.EventType); + Assert.Equal("some data", item.Data); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_Empty_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream("", trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_BlankLine_NoItems(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream(newline, trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_TwoBlankLines_NoItems(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream(newline + newline, trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_MultipleBlankLinesBetweenItems_AllItemsProduces(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"event:A{newline}" + + $"data:1{newline}" + + $"id:2{newline}" + + $"retry:300{newline}" + + $"{newline}{newline}{newline}{newline}{newline}" + + + $"event:B{newline}" + + $"data:4{newline}" + + $"id:5{newline}" + + $"retry:600{newline}" + + $"{newline}{newline}{newline}{newline}{newline}" + + + $"event:C{newline}" + + $"data:7{newline}" + + $"id:8{newline}" + + $"retry:900{newline}" + + $"{newline}{newline}{newline}{newline}{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("A", "1"), items[0]); + AssertSseItemEqual(new SseItem("B", "4"), items[1]); + AssertSseItemEqual(new SseItem("C", "7"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example1(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data: This is the first message.{newline}" + + $"{newline}" + + $"data: This is the second message, it{newline}" + + $"data: has two lines.{newline}" + + $"{newline}" + + $"data: This is the third message.{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("message", "This is the first message."), items[0]); + AssertSseItemEqual(new SseItem("message", "This is the second message, it\nhas two lines."), items[1]); + AssertSseItemEqual(new SseItem("message", "This is the third message."), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example2(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"event: add{newline}data: 73857293{newline}" + + $"{newline}" + + $"event: remove{newline}data: 2153{newline}" + + $"{newline}" + + $"event: add{newline}data: 113411{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("add", "73857293"), items[0]); + AssertSseItemEqual(new SseItem("remove", "2153"), items[1]); + AssertSseItemEqual(new SseItem("add", "113411"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example3(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data: YHOO{newline}" + + $"data: +2{newline}" + + $"data: 10{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(1, items.Count); + AssertSseItemEqual(new SseItem("message", "YHOO\n+2\n10"), items[0]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example4(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $": test stream{newline}" + + $"{newline}" + + $"data: first event{newline}" + + $"id: 1{newline}" + + $"{newline}" + + $"data:second event{newline}" + + $"id{newline}" + + $"{newline}" + + $"data: third event{newline}" + + $"{newline}", + trickle); + + SseEnumerable items = SseParser.Parse(stream); + if (useAsync) + { + Assert.Equal(string.Empty, items.LastEventId); + + using IEnumerator> e = items.GetEnumerator(); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "first event"), e.Current); + Assert.Equal("1", items.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "second event"), e.Current); + Assert.Equal(string.Empty, items.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", " third event"), e.Current); + Assert.Equal(string.Empty, items.LastEventId); + } + else + { + Assert.Equal(string.Empty, items.LastEventId); + + await using IAsyncEnumerator> e = items.GetAsyncEnumerator(); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "first event"), e.Current); + Assert.Equal("1", items.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "second event"), e.Current); + Assert.Equal(string.Empty, items.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", " third event"), e.Current); + Assert.Equal(string.Empty, items.LastEventId); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $": test stream{newline}" + + $"{newline}" + + $"data: first event{newline}" + + $"id: 1{newline}" + + $"{newline}" + + $"data:second event{newline}" + + $"{newline}" + + $"data: third event{newline}" + + $"id: 42{newline}" + + $"{newline}", + trickle); + + SseEnumerable items = SseParser.Parse(stream); + if (useAsync) + { + Assert.Equal(string.Empty, items.LastEventId); + + using IEnumerator> e = items.GetEnumerator(); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "first event"), e.Current); + Assert.Equal("1", items.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "second event"), e.Current); + Assert.Equal("1", items.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", " third event"), e.Current); + Assert.Equal("42", items.LastEventId); + } + else + { + Assert.Equal(string.Empty, items.LastEventId); + + await using IAsyncEnumerator> e = items.GetAsyncEnumerator(); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "first event"), e.Current); + Assert.Equal("1", items.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "second event"), e.Current); + Assert.Equal("1", items.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", " third event"), e.Current); + Assert.Equal("42", items.LastEventId); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example5(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data{newline}" + + $"{newline}" + + $"data{newline}" + + $"data{newline}" + + $"{newline}" + + $"data:", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("message", ""), items[0]); + AssertSseItemEqual(new SseItem("message", "\n"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example5_WithColon(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data{newline}" + + $"{newline}" + + $"data:{newline}" + + $"data{newline}" + + $"{newline}" + + $"data:", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("message", ""), items[0]); + AssertSseItemEqual(new SseItem("message", "\n"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example6(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data:test{newline}" + + $"{newline}" + + $"data: test{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("message", "test"), items[0]); + AssertSseItemEqual(new SseItem("message", "test"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Delegate_EventTypeArgument_MatchesValueFromEvent(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"event: add{newline}data: 73857293{newline}" + + $"{newline}" + + $"event: remove{newline}data: 2153{newline}" + + $"{newline}" + + $"event: add{newline}data: 113411{newline}" + + $"{newline}", + trickle); + + SseItemParser itemParser = (eventType, bytes) => eventType; + + List> items = useAsync ? + await ReadAllEventsAsync(stream, itemParser) : + ReadAllEvents(stream, itemParser); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("add", "add"), items[0]); + AssertSseItemEqual(new SseItem("remove", "remove"), items[1]); + AssertSseItemEqual(new SseItem("add", "add"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $": test stream{newline}" + + $"{newline}" + + $"data: first event{newline}" + + $"{newline}" + + $"data:second event{newline}" + + $"retry: 42{newline}" + + $"{newline}" + + $"data: third event{newline}" + + $"retry: 12345678910{newline}" + + $"{newline}" + + $"data:fourth event{newline}" + + $"{newline}" + + $"data:fifth event{newline}" + + $"retry: invalidmilliseconds{newline}" + + $"{newline}", + trickle); + + SseEnumerable items = SseParser.Parse(stream); + Assert.Equal(Timeout.InfiniteTimeSpan, items.ReconnectionInterval); + + if (useAsync) + { + Assert.Equal(string.Empty, items.LastEventId); + + using IEnumerator> e = items.GetEnumerator(); + Assert.Equal(Timeout.InfiniteTimeSpan, items.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "first event"), e.Current); + Assert.Equal(Timeout.InfiniteTimeSpan, items.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "second event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(42), items.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", " third event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), items.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "fourth event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), items.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "fifth event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), items.ReconnectionInterval); + } + else + { + Assert.Equal(string.Empty, items.LastEventId); + + await using IAsyncEnumerator> e = items.GetAsyncEnumerator(); + Assert.Equal(Timeout.InfiniteTimeSpan, items.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "first event"), e.Current); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "second event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(42), items.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", " third event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), items.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "fourth event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), items.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("message", "fifth event"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), items.ReconnectionInterval); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task JsonContent_DelegateInvoked(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data: {{{newline}" + + $"data: \"title\": \"The Catcher in the Rye\",{newline}" + + $"data: \"author\": \"J.D. Salinger\",{newline}" + + $"data: \"published_year\": 1951,{newline}" + + $"data: \"genre\": \"Fiction\"{newline}" + + $"data: }}{newline}" + + $"{newline}" + + $"data: {{{newline}" + + $"data: \"title\": \"1984\",{newline}" + + $"data: \"author\": \"George Orwell\",{newline}" + + $"data: \"published_year\": 1949,{newline}" + + $"data: \"genre\": \"Fiction\"{newline}" + + $"data: }}{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream, static (eventType, data) => JsonSerializer.Deserialize(data)) : + ReadAllEvents(stream, static (eventType, data) => JsonSerializer.Deserialize(data)); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("message", new Book { title = "The Catcher in the Rye", author = "J.D. Salinger", published_year = 1951, genre = "Fiction" }), items[0]); + AssertSseItemEqual(new SseItem("message", new Book { title = "1984", author = "George Orwell", published_year = 1949, genre = "Fiction" }), items[1]); + } + + private struct Book + { + public string title { get; set; } + public string author { get; set; } + public int published_year { get; set; } + public string genre { get; set; } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Valid_Skipped(string newline, bool trickle, bool useAsync) + { + byte[] newlineBytes = Encoding.UTF8.GetBytes(newline); + using Stream stream = GetStream( + [ + 0xEF, 0xBB, 0xBF, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'h', (byte)'i', + ..newlineBytes, + ..newlineBytes, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'t', (byte)'h', (byte)'e', (byte)'r', (byte)'e', + ..newlineBytes, + ..newlineBytes, + ], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("message", "hi"), items[0]); + AssertSseItemEqual(new SseItem("message", "there"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Partial1_LineSkipped(string newline, bool trickle, bool useAsync) + { + byte[] newlineBytes = Encoding.UTF8.GetBytes(newline); + using Stream stream = GetStream( + [ + 0xEF, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'h', (byte)'i', + ..newlineBytes, + ..newlineBytes, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'t', (byte)'h', (byte)'e', (byte)'r', (byte)'e', + ..newlineBytes, + ..newlineBytes, + ], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(1, items.Count); + AssertSseItemEqual(new SseItem("message", "there"), items[0]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Partial2_LineSkipped(string newline, bool trickle, bool useAsync) + { + byte[] newlineBytes = Encoding.UTF8.GetBytes(newline); + using Stream stream = GetStream( + [ + 0xEF, 0xBB, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'h', (byte)'i', + ..newlineBytes, + ..newlineBytes, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'t', (byte)'h', (byte)'e', (byte)'r', (byte)'e', + ..newlineBytes, + ..newlineBytes, + ], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(1, items.Count); + AssertSseItemEqual(new SseItem("message", "there"), items[0]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Interrupted1_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream([0xEF], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Interrupted2_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream([0xEF, 0xBB], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Interrupted3_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream([0xEF, 0xBB, 0xBF], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task LongLines_ItemsProducedCorrectly(string newline, bool trickle, bool useAsync) + { + string[] expected = Enumerable.Range(1, 100).Select(i => string.Concat(Enumerable.Repeat($"{i} ", i))).ToArray(); + + using Stream stream = GetStream([..expected.Select(s => $"data: {s}{newline}{newline}").SelectMany(Encoding.UTF8.GetBytes)], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(expected.Length, items.Count); + for (int i = 0; i < expected.Length; i++) + { + AssertSseItemEqual(new SseItem("message", expected[i]), items[i]); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Delegate_ThrowsException_Propagates(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream($"data: hello{newline}{newline}data:world{newline}{newline}", trickle); + + SseEnumerable sse = SseParser.Parse(stream, (eventType, bytes) => throw new FormatException(Encoding.UTF8.GetString(bytes.ToArray()))); + + FormatException fe; + if (useAsync) + { + await using IAsyncEnumerator> e = sse.GetAsyncEnumerator(); + fe = await Assert.ThrowsAsync(async () => await e.MoveNextAsync()); + } + else + { + using IEnumerator> e = sse.GetEnumerator(); + fe = Assert.Throws(() => e.MoveNext()); + } + + Assert.Equal("hello", fe.Message); + } + + [Fact] + public void NonGenericEnumerator_ProducesExpectedItems() + { + using Stream stream = GetStream($"data: hello\n\ndata:world\n\n", trickle: false); + + IEnumerable sse = SseParser.Parse(stream); + IEnumerator e = sse.GetEnumerator(); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "hello"), (SseItem)e.Current); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("message", "world"), (SseItem)e.Current); + + Assert.False(e.MoveNext()); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task InvalidStream_ReturnsNegativeValueFromRead_Throws(bool useAsync) + { + using Stream stream = new InvalidReadStream(); + + SseEnumerable sse = SseParser.Parse(stream); + + if (useAsync) + { + await using IAsyncEnumerator> e = sse.GetAsyncEnumerator(); + await Assert.ThrowsAsync(async () => await e.MoveNextAsync()); + } + else + { + using IEnumerator> e = sse.GetEnumerator(); + Assert.Throws(() => e.MoveNext()); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task MultipleItemParsers_OpenAI_StreamingResponse(string newline, bool trickle, bool useAsync) + { + string exampleResponse = + $"data: {{\"id\":\"xxx\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{\"content\":\"!\"}},\"index\":0,\"finish_reason\":null}}]}}{newline}{newline}" + + $"data: {{\"id\":\"yyy\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{}},\"index\":0,\"finish_reason\":\"stop\"}}]}}{newline}{newline}" + + $"data: [DONE]{newline}{newline}"; + + using Stream stream = GetStream(exampleResponse, trickle); + + SseItemParser itemParser = (eventType, bytes) => + { + return bytes.SequenceEqual("[DONE]"u8) ? + new ChunkOrDone { Done = true } : + new ChunkOrDone { Json = JsonSerializer.Deserialize(bytes) }; + }; + + List> items = useAsync ? + await ReadAllEventsAsync(stream, itemParser) : + ReadAllEvents(stream, itemParser); + + Assert.Equal(3, items.Count); + Assert.False(items[0].Data.Done); + Assert.False(items[1].Data.Done); + Assert.True(items[2].Data.Done); + } + + private struct ChunkOrDone + { + public JsonElement Json { get; set; } + public bool Done { get; set; } + } + + private static void AssertSseItemEqual(SseItem left, SseItem right) + { + Assert.Equal(left.EventType, right.EventType); + if (left.Data is string leftData && right.Data is string rightData) + { + Assert.Equal($"{leftData.Length} {leftData}", $"{rightData.Length} {rightData}"); + } + else + { + Assert.Equal(left.Data, right.Data); + } + } + + public static IEnumerable NewlineTrickleAsyncData() => + from newline in new[] { "\r", "\n", "\r\n" } + from trickle in new[] { false, true } + from async in new[] { false, true } + select new object[] { newline, trickle, async }; + + private static Stream GetStream(string data, bool trickle) => + GetStream(Encoding.UTF8.GetBytes(data), trickle); + + private static Stream GetStream(byte[] bytes, bool trickle) => + trickle ? new TrickleStream(bytes) : new MemoryStream(bytes); + + private static List> ReadAllEvents(Stream stream) + { + return new List>(SseParser.Parse(stream)); + } + + private static List> ReadAllEvents(Stream stream, SseItemParser parser) + { + return new List>(SseParser.Parse(stream, parser)); + } + + private static async Task>> ReadAllEventsAsync(Stream stream, SseItemParser parser) + { + var list = new List>(); + await foreach (SseItem item in SseParser.Parse(stream, parser)) + { + list.Add(item); + } + + return list; + } + + private static async Task>> ReadAllEventsAsync(Stream stream) + { + var list = new List>(); + await foreach (SseItem item in SseParser.Parse(stream)) + { + list.Add(item); + } + + return list; + } + + /// Stream where each read reads at most one byte and where every asynchronous operation yields. + private sealed class TrickleStream : MemoryStream + { + public TrickleStream(byte[] buffer) : base(buffer) { } + + public override int Read(byte[] buffer, int offset, int count) => + base.Read(buffer, offset, Math.Min(count, 1)); + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await Task.Yield(); + return await base.ReadAsync(buffer, offset, Math.Min(count, 1), cancellationToken); + } + +#if NET + public override int Read(Span buffer) => + base.Read(buffer.Slice(0, Math.Min(buffer.Length, 1))); + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + await Task.Yield(); + return await base.ReadAsync(buffer.Slice(0, Math.Min(buffer.Length, 1)), cancellationToken); + } +#endif + } + + private sealed class InvalidReadStream : MemoryStream + { + public override int Read(byte[] buffer, int offset, int count) => + -1; + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + Task.FromResult(-1); + +#if NET + public override int Read(Span buffer) => + -1; + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + ValueTask.FromResult(-1); +#endif + } + } +} diff --git a/src/libraries/System.Formats.Sse/tests/System.Formats.Sse.Tests.csproj b/src/libraries/System.Formats.Sse/tests/System.Formats.Sse.Tests.csproj new file mode 100644 index 00000000000000..0bcf28cf463329 --- /dev/null +++ b/src/libraries/System.Formats.Sse/tests/System.Formats.Sse.Tests.csproj @@ -0,0 +1,19 @@ + + + + $(NetCoreAppCurrent);$(NetFrameworkMinimum) + + + + + + + + + + + + + + +