From 7f83e8bdef6a219ca778e16b0a331538f0518191 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Mon, 13 May 2024 18:05:15 -0400 Subject: [PATCH] Add System.Net.ServerSentEvents --- .../System.Net.ServerSentEvents.sln | 48 + .../ref/System.Net.ServerSentEvents.cs | 33 + .../ref/System.Net.ServerSentEvents.csproj | 16 + .../src/PACKAGE.md | 52 + .../src/Resources/Strings.resx | 126 +++ .../src/System.Net.ServerSentEvents.csproj | 32 + .../System/Net/ServerSentEvents/SseItem.cs | 25 + .../Net/ServerSentEvents/SseItemParser.cs | 12 + .../System/Net/ServerSentEvents/SseParser.cs | 58 + .../Net/ServerSentEvents/SseParser_1.cs | 532 +++++++++ .../tests/SseParserTests.cs | 1001 +++++++++++++++++ .../System.Net.ServerSentEvents.Tests.csproj | 19 + 12 files changed, 1954 insertions(+) create mode 100644 src/libraries/System.Net.ServerSentEvents/System.Net.ServerSentEvents.sln create mode 100644 src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj create mode 100644 src/libraries/System.Net.ServerSentEvents/src/PACKAGE.md create mode 100644 src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItemParser.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj diff --git a/src/libraries/System.Net.ServerSentEvents/System.Net.ServerSentEvents.sln b/src/libraries/System.Net.ServerSentEvents/System.Net.ServerSentEvents.sln new file mode 100644 index 0000000000000..a827c8cdae4fe --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/System.Net.ServerSentEvents.sln @@ -0,0 +1,48 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.11.34910.147 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.ServerSentEvents", "ref\System.Net.ServerSentEvents.csproj", "{ACB7E0BF-015F-43DC-A2F5-85506173B223}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.ServerSentEvents", "src\System.Net.ServerSentEvents.csproj", "{ACDB56AF-7B9F-4762-9764-D6FF09118D09}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.ServerSentEvents.Tests", "tests\System.Net.ServerSentEvents.Tests.csproj", "{804B5D44-05A3-491E-A6AB-35C592E6703E}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{2BD73108-47D7-40E6-BFCB-169E6AD42A81}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{E6AF8CEE-6550-4190-97D4-D51C5B114919}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{D908DCBE-EFA4-4CCA-9A1C-AEB48D59C504}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ACB7E0BF-015F-43DC-A2F5-85506173B223}.Release|Any CPU.Build.0 = Release|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ACDB56AF-7B9F-4762-9764-D6FF09118D09}.Release|Any CPU.Build.0 = Release|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {804B5D44-05A3-491E-A6AB-35C592E6703E}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {ACB7E0BF-015F-43DC-A2F5-85506173B223} = {E6AF8CEE-6550-4190-97D4-D51C5B114919} + {ACDB56AF-7B9F-4762-9764-D6FF09118D09} = {D908DCBE-EFA4-4CCA-9A1C-AEB48D59C504} + {804B5D44-05A3-491E-A6AB-35C592E6703E} = {2BD73108-47D7-40E6-BFCB-169E6AD42A81} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {01DAF96B-AF8E-4576-A1BC-57D19BDB317E} + EndGlobalSection +EndGlobal diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs new file mode 100644 index 0000000000000..994108c08e6c3 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs @@ -0,0 +1,33 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// ------------------------------------------------------------------------------ +// Changes to this file must follow the https://aka.ms/api-review process. +// ------------------------------------------------------------------------------ + +namespace System.Net.ServerSentEvents +{ + public delegate T SseItemParser(string eventType, System.ReadOnlySpan data); + public readonly partial struct SseItem + { + private readonly T _Data_k__BackingField; + private readonly object _dummy; + private readonly int _dummyPrimitive; + public SseItem(T data, string eventType) { throw null; } + public T Data { get { throw null; } } + public string EventType { get { throw null; } } + } + public static partial class SseParser + { + public const string EventTypeDefault = "message"; + public static System.Net.ServerSentEvents.SseParser Create(System.IO.Stream sseStream) { throw null; } + public static System.Net.ServerSentEvents.SseParser Create(System.IO.Stream sseStream, System.Net.ServerSentEvents.SseItemParser itemParser) { throw null; } + } + public sealed partial class SseParser + { + internal SseParser() { } + public string LastEventId { get { throw null; } } + public System.TimeSpan ReconnectionInterval { get { throw null; } } + public System.Collections.Generic.IEnumerable> Enumerate() { throw null; } + public System.Collections.Generic.IAsyncEnumerable> EnumerateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj new file mode 100644 index 0000000000000..114d16ec0664b --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj @@ -0,0 +1,16 @@ + + + + $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) + + + + + + + + + + + + diff --git a/src/libraries/System.Net.ServerSentEvents/src/PACKAGE.md b/src/libraries/System.Net.ServerSentEvents/src/PACKAGE.md new file mode 100644 index 0000000000000..dabf094e76fe7 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/PACKAGE.md @@ -0,0 +1,52 @@ +## About + + + +System.Net.ServerSentEvents provides the `SseParser` type, which exposes factory methods for creating parsers for the events in a stream of server-sent events (SSE). + +## Key Features + + + +* Parser for server-sent events (SSE) + +## How to Use + + + +Asynchronously parsing event contents as strings + +```csharp +using HttpClient client = new(); +using Stream response = await client.GetStreamAsync("https://localhost:12345/sse"); +await foreach (SseItem item in SseParser.Create(response).EnumerateAsync()) +{ + Console.WriteLine(item.Data); +} +``` + +Synchronously parsing event contents as JSON + +```csharp +MemoryStream stream = new(data); +foreach (SseItem item in SseParser.Create(response, (eventType, bytes) => JsonSerializer.Deserialize(bytes)).Enumerate()) +{ + Console.WriteLine(item.Author); +} +``` + +## Main Types + + + +The main types provided by this library are: + +* `System.Net.ServerSentEvents.SseParser` +* `System.Net.ServerSentEvents.SseParser` +* `System.Net.ServerSentEvents.SseItem` + +## Feedback & Contributing + + + +System.Net.ServerSentEvents is released as open source under the [MIT license](https://licenses.nuget.org/MIT). Bug reports and contributions are welcome at [the GitHub repository](https://github.com/dotnet/runtime). diff --git a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx new file mode 100644 index 0000000000000..fd5898f283013 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx @@ -0,0 +1,126 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + The enumerable may be enumerated only once. + + + The Stream implementation is invalid, returning a negative number from a read operation. + + \ No newline at end of file diff --git a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj new file mode 100644 index 0000000000000..b57f9e5c50739 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj @@ -0,0 +1,32 @@ + + + + $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) + true + true + true + Provides a simple parser for server-sent events (SSE). + +Commonly Used Types: +System.Net.ServerSentEvents.SseParser + + + true + + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs new file mode 100644 index 0000000000000..061ed89e42ae8 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs @@ -0,0 +1,25 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Net.ServerSentEvents +{ + /// Represents a server-sent event. + /// Specifies the type of data payload in the event. + public readonly struct SseItem + { + /// Initializes the server-sent event. + /// The event's payload. + /// The event's type. + public SseItem(T data, string eventType) + { + Data = data; + EventType = eventType; + } + + /// Gets the event's payload. + public T Data { get; } + + /// Gets the event's type. + public string EventType { get; } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItemParser.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItemParser.cs new file mode 100644 index 0000000000000..67654dc5cd24f --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItemParser.cs @@ -0,0 +1,12 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Net.ServerSentEvents +{ + /// Encapsulates a method for parsing the bytes payload of a server-sent event. + /// Specifies the type of the return value of the parser. + /// The event's type. + /// The event's payload bytes. + /// The parsed . + public delegate T SseItemParser(string eventType, ReadOnlySpan data); +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs new file mode 100644 index 0000000000000..26543247aacff --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs @@ -0,0 +1,58 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; + +namespace System.Net.ServerSentEvents +{ + /// Provides a parser for parsing server-sent events. + public static class SseParser + { + /// The default ("message") for an event that did not explicitly specify a type. + public const string EventTypeDefault = "message"; + + /// Creates a parser for parsing a of server-sent events into a sequence of values. + /// The stream containing the data to parse. + /// + /// The enumerable of strings, which may be enumerated synchronously or asynchronously. The strings + /// are decoded from the UTF8-encoded bytes of the payload of each event. + /// + /// is null. + /// + /// This overload has behavior equivalent to calling with a delegate + /// that decodes the data of each event using 's GetString method. + /// + public static SseParser Create(Stream sseStream) => + Create(sseStream, static (_, bytes) => Utf8GetString(bytes)); + + /// Creates a parser for parsing a of server-sent events into a sequence of values. + /// Specifies the type of data in each event. + /// The stream containing the data to parse. + /// The parser to use to transform each payload of bytes into a data element. + /// The enumerable, which may be enumerated synchronously or asynchronously. + /// is null. + /// is null. + public static SseParser Create(Stream sseStream, SseItemParser itemParser) => + new SseParser( + sseStream ?? throw new ArgumentNullException(nameof(sseStream)), + itemParser ?? throw new ArgumentNullException(nameof(itemParser))); + + /// Encoding.UTF8.GetString(bytes) + internal static unsafe string Utf8GetString(ReadOnlySpan bytes) + { +#if NET + return Encoding.UTF8.GetString(bytes); +#else + fixed (byte* ptr = bytes) + { + return ptr is null ? + string.Empty : + Encoding.UTF8.GetString(ptr, bytes.Length); + } +#endif + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs new file mode 100644 index 0000000000000..0e342d017fcf3 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs @@ -0,0 +1,532 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using System.IO; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.ServerSentEvents +{ + /// Provides a parser for server-sent events information. + /// Specifies the type of data parsed from an event. + public sealed class SseParser + { + // For reference: + // Specification: https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events + + /// Carriage Return. + private const byte CR = (byte)'\r'; + /// Line Feed. + private const byte LF = (byte)'\n'; + /// Carriage Return Line Feed. + private static ReadOnlySpan CRLF => "\r\n"u8; + + /// The default size of an ArrayPool buffer to rent. + /// Larger size used by default to minimize number of reads. Smaller size used in debug to stress growth/shifting logic. + private const int DefaultArrayPoolRentSize = +#if DEBUG + 16; +#else + 1024; +#endif + + /// The stream to be parsed. + private readonly Stream _stream; + /// The parser delegate used to transform bytes into a . + private readonly SseItemParser _itemParser; + + /// Indicates whether the enumerable has already been used for enumeration. + private int _used; + + /// Buffer, either empty or rented, containing the data being read from the stream while looking for the next line. + private byte[] _lineBuffer = []; + /// The starting offset of valid data in . + private int _lineOffset; + /// The length of valid data in , starting from . + private int _lineLength; + /// The index in where a newline ('\r', '\n', or "\r\n") was found. + private int _newlineIndex; + /// The index in of characters already checked for newlines. + /// + /// This is to avoid O(LineLength^2) behavior in the rare case where we have long lines that are built-up over multiple reads. + /// We want to avoid re-checking the same characters we've already checked over and over again. + /// + private int _lastSearchedForNewline; + /// Set when eof has been reached in the stream. + private bool _eof; + + /// Rented buffer containing buffered data for the next event. + private byte[]? _dataBuffer; + /// The length of valid data in , starting from index 0. + private int _dataLength; + /// Whether data has been appended to . + /// This can be different than != 0 if empty data was appended. + private bool _dataAppended; + + /// The event type for the next event. + private string _eventType = SseParser.EventTypeDefault; + + /// Initialize the enumerable. + /// The stream to parse. + /// The function to use to parse payload bytes into a . + internal SseParser(Stream stream, SseItemParser itemParser) + { + _stream = stream; + _itemParser = itemParser; + } + + /// Gets an enumerable of the server-sent events from this parser. + /// The parser has already been enumerated. Such an exception may propagate out of a call to . + public IEnumerable> Enumerate() + { + // Validate that the parser is only used for one enumeration. + ThrowIfNotFirstEnumeration(); + + // Rent a line buffer. This will grow as needed. The line buffer is what's passed to the stream, + // so we want it to be large enough to reduce the number of reads we need to do when data is + // arriving quickly. (In debug, we use a smaller buffer to stress the growth and shifting logic.) + _lineBuffer = ArrayPool.Shared.Rent(DefaultArrayPoolRentSize); + try + { + // Spec: "Event streams in this format must always be encoded as UTF-8". + // Skip a UTF8 BOM if it exists at the beginning of the stream. (The BOM is defined as optional in the SSE grammar.) + while (FillLineBuffer() != 0 && _lineLength < Utf8Bom.Length) ; + SkipBomIfPresent(); + + // Process all events in the stream. + while (true) + { + // See if there's a complete line in data already read from the stream. Lines are permitted to + // end with CR, LF, or CRLF. Look for all of them and if we find one, process the line. However, + // if we only find a CR and it's at the end of the read data, don't process it now, as we want + // to process it together with an LF that might immediately follow, rather than treating them + // as two separate characters, in which case we'd incorrectly process the CR as a line by itself. + GetNextSearchOffsetAndLength(out int searchOffset, out int searchLength); + _newlineIndex = _lineBuffer.AsSpan(searchOffset, searchLength).IndexOfAny(CR, LF); + if (_newlineIndex >= 0) + { + _lastSearchedForNewline = -1; + _newlineIndex += searchOffset; + if (_lineBuffer[_newlineIndex] is LF || // the newline is LF + _newlineIndex - _lineOffset + 1 < _lineLength || // we must have CR and we have whatever comes after it + _eof) // if we get here, we know we have a CR at the end of the buffer, so it's definitely the whole newline if we've hit EOF + { + // Process the line. + if (ProcessLine(out SseItem sseItem, out int advance)) + { + yield return sseItem; + } + + // Move past the line. + _lineOffset += advance; + _lineLength -= advance; + continue; + } + } + else + { + // Record the last position searched for a newline. The next time we search, + // we'll search from here rather than from _lineOffset, in order to avoid searching + // the same characters again. + _lastSearchedForNewline = _lineOffset + _lineLength; + } + + // We've processed everything in the buffer we currently can, so if we've already read EOF, we're done. + if (_eof) + { + // Spec: "Once the end of the file is reached, any pending data must be discarded. (If the file ends in the middle of an + // event, before the final empty line, the incomplete event is not dispatched.)" + break; + } + + // Read more data into the buffer. + FillLineBuffer(); + } + } + finally + { + ArrayPool.Shared.Return(_lineBuffer); + if (_dataBuffer is not null) + { + ArrayPool.Shared.Return(_dataBuffer); + } + } + } + + /// Gets an asynchronous enumerable of the server-sent events from this parser. + /// The cancellation token to use to cancel the enumeration. + /// The parser has already been enumerated. Such an exception may propagate out of a call to . + /// The enumeration was canceled. Such an exception may propagate out of a call to . + public async IAsyncEnumerable> EnumerateAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + // Validate that the parser is only used for one enumeration. + ThrowIfNotFirstEnumeration(); + + // Rent a line buffer. This will grow as needed. The line buffer is what's passed to the stream, + // so we want it to be large enough to reduce the number of reads we need to do when data is + // arriving quickly. (In debug, we use a smaller buffer to stress the growth and shifting logic.) + _lineBuffer = ArrayPool.Shared.Rent(DefaultArrayPoolRentSize); + try + { + // Spec: "Event streams in this format must always be encoded as UTF-8". + // Skip a UTF8 BOM if it exists at the beginning of the stream. (The BOM is defined as optional in the SSE grammar.) + while (await FillLineBufferAsync(cancellationToken).ConfigureAwait(false) != 0 && _lineLength < Utf8Bom.Length) ; + SkipBomIfPresent(); + + // Process all events in the stream. + while (true) + { + // See if there's a complete line in data already read from the stream. Lines are permitted to + // end with CR, LF, or CRLF. Look for all of them and if we find one, process the line. However, + // if we only find a CR and it's at the end of the read data, don't process it now, as we want + // to process it together with an LF that might immediately follow, rather than treating them + // as two separate characters, in which case we'd incorrectly process the CR as a line by itself. + GetNextSearchOffsetAndLength(out int searchOffset, out int searchLength); + _newlineIndex = _lineBuffer.AsSpan(searchOffset, searchLength).IndexOfAny(CR, LF); + if (_newlineIndex >= 0) + { + _lastSearchedForNewline = -1; + _newlineIndex += searchOffset; + if (_lineBuffer[_newlineIndex] is LF || // newline is LF + _newlineIndex - _lineOffset + 1 < _lineLength || // newline is CR, and we have whatever comes after it + _eof) // if we get here, we know we have a CR at the end of the buffer, so it's definitely the whole newline if we've hit EOF + { + // Process the line. + if (ProcessLine(out SseItem sseItem, out int advance)) + { + yield return sseItem; + } + + // Move past the line. + _lineOffset += advance; + _lineLength -= advance; + continue; + } + } + else + { + // Record the last position searched for a newline. The next time we search, + // we'll search from here rather than from _lineOffset, in order to avoid searching + // the same characters again. + _lastSearchedForNewline = searchOffset + searchLength; + } + + // We've processed everything in the buffer we currently can, so if we've already read EOF, we're done. + if (_eof) + { + // Spec: "Once the end of the file is reached, any pending data must be discarded. (If the file ends in the middle of an + // event, before the final empty line, the incomplete event is not dispatched.)" + break; + } + + // Read more data into the buffer. + await FillLineBufferAsync(cancellationToken).ConfigureAwait(false); + } + } + finally + { + ArrayPool.Shared.Return(_lineBuffer); + if (_dataBuffer is not null) + { + ArrayPool.Shared.Return(_dataBuffer); + } + } + } + + /// Gets the next index and length with which to perform a newline search. + private void GetNextSearchOffsetAndLength(out int searchOffset, out int searchLength) + { + if (_lastSearchedForNewline > _lineOffset) + { + searchOffset = _lastSearchedForNewline; + searchLength = _lineLength - (_lastSearchedForNewline - _lineOffset); + } + else + { + searchOffset = _lineOffset; + searchLength = _lineLength; + } + + Debug.Assert(searchOffset >= _lineOffset, $"{searchOffset}, {_lineLength}"); + Debug.Assert(searchOffset <= _lineOffset + _lineLength, $"{searchOffset}, {_lineOffset}, {_lineLength}"); + Debug.Assert(searchOffset <= _lineBuffer.Length, $"{searchOffset}, {_lineBuffer.Length}"); + + Debug.Assert(searchLength >= 0, $"{searchLength}"); + Debug.Assert(searchLength <= _lineLength, $"{searchLength}, {_lineLength}"); + } + + private int GetNewLineLength() + { + Debug.Assert(_newlineIndex - _lineOffset < _lineLength, "Expected to be positioned at a non-empty newline"); + return _lineBuffer.AsSpan(_newlineIndex, _lineLength - (_newlineIndex - _lineOffset)).StartsWith(CRLF) ? 2 : 1; + } + + /// + /// If there's no room remaining in the line buffer, either shifts the contents + /// left or grows the buffer in order to make room for the next read. + /// + private void ShiftOrGrowLineBufferIfNecessary() + { + // If data we've read is butting up against the end of the buffer and + // it's not taking up the entire buffer, slide what's there down to + // the beginning, making room to read more data into the buffer (since + // there's no newline in the data that's there). Otherwise, if the whole + // buffer is full, grow the buffer to accommodate more data, since, again, + // what's there doesn't contain a newline and thus a line is longer than + // the current buffer accomodates. + if (_lineOffset + _lineLength == _lineBuffer.Length) + { + if (_lineOffset != 0) + { + _lineBuffer.AsSpan(_lineOffset, _lineLength).CopyTo(_lineBuffer); + if (_lastSearchedForNewline >= 0) + { + _lastSearchedForNewline -= _lineOffset; + } + _lineOffset = 0; + } + else if (_lineLength == _lineBuffer.Length) + { + GrowBuffer(ref _lineBuffer, _lineBuffer.Length * 2); + } + } + } + + /// Processes a complete line from the SSE stream. + /// The parsed item if the method returns true. + /// How many characters to advance in the line buffer. + /// true if an SSE item was successfully parsed; otherwise, false. + private bool ProcessLine(out SseItem sseItem, out int advance) + { + ReadOnlySpan line = _lineBuffer.AsSpan(_lineOffset, _newlineIndex - _lineOffset); + + // Spec: "If the line is empty (a blank line) Dispatch the event" + if (line.IsEmpty) + { + advance = GetNewLineLength(); + + if (_dataAppended) + { + sseItem = new SseItem(_itemParser(_eventType, _dataBuffer.AsSpan(0, _dataLength)), _eventType); + _eventType = SseParser.EventTypeDefault; + _dataLength = 0; + _dataAppended = false; + return true; + } + + sseItem = default; + return false; + } + + // Find the colon separating the field name and value. + int colonPos = line.IndexOf((byte)':'); + ReadOnlySpan fieldName; + ReadOnlySpan fieldValue; + if (colonPos >= 0) + { + // Spec: "Collect the characters on the line before the first U+003A COLON character (:), and let field be that string." + fieldName = line.Slice(0, colonPos); + + // Spec: "Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. + // If value starts with a U+0020 SPACE character, remove it from value." + fieldValue = line.Slice(colonPos + 1); + if (!fieldValue.IsEmpty && fieldValue[0] == (byte)' ') + { + fieldValue = fieldValue.Slice(1); + } + } + else + { + // Spec: "using the whole line as the field name, and the empty string as the field value." + fieldName = line; + fieldValue = []; + } + + if (fieldName.SequenceEqual("data"u8)) + { + // Spec: "Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer." + + // If there's nothing currently in the data buffer and we can easily detect that this line is immediately followed by + // an empty line, we can optimize it to just handle the data directly from the line buffer, rather than first copying + // into the data buffer and dispatching from there. + if (!_dataAppended) + { + int newlineLength = GetNewLineLength(); + ReadOnlySpan remainder = _lineBuffer.AsSpan(_newlineIndex + newlineLength, _lineLength - line.Length - newlineLength); + if (!remainder.IsEmpty && + (remainder[0] is LF || (remainder[0] is CR && remainder.Length > 1))) + { + advance = line.Length + newlineLength + (remainder.StartsWith(CRLF) ? 2 : 1); + sseItem = new SseItem(_itemParser(_eventType, fieldValue), _eventType); + _eventType = SseParser.EventTypeDefault; + return true; + } + } + + // We need to copy the data from the data buffer to the line buffer. Make sure there's enough room. + if (_dataBuffer is null || _dataLength + _lineLength + 1 > _dataBuffer.Length) + { + GrowBuffer(ref _dataBuffer, _dataLength + _lineLength + 1); + } + + // Append a newline if there's already content in the buffer. + // Then copy the field value to the data buffer + if (_dataAppended) + { + _dataBuffer[_dataLength++] = LF; + } + fieldValue.CopyTo(_dataBuffer.AsSpan(_dataLength)); + _dataLength += fieldValue.Length; + _dataAppended = true; + } + else if (fieldName.SequenceEqual("event"u8)) + { + // Spec: "Set the event type buffer to field value." + _eventType = SseParser.Utf8GetString(fieldValue); + } + else if (fieldName.SequenceEqual("id"u8)) + { + // Spec: "If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value. Otherwise, ignore the field." + if (fieldValue.IndexOf((byte)'\0') < 0) + { + // Note that fieldValue might be empty, in which case LastEventId will naturally be reset to the empty string. This is per spec. + LastEventId = SseParser.Utf8GetString(fieldValue); + } + } + else if (fieldName.SequenceEqual("retry"u8)) + { + // Spec: "If the field value consists of only ASCII digits, then interpret the field value as an integer in base ten, + // and set the event stream's reconnection time to that integer. Otherwise, ignore the field." + if (long.TryParse( +#if NET + fieldValue, +#else + SseParser.Utf8GetString(fieldValue), +#endif + NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds)) + { + ReconnectionInterval = TimeSpan.FromMilliseconds(milliseconds); + } + } + else + { + // We'll end up here if the line starts with a colon, producing an empty field name, or if the field name is otherwise unrecognized. + // Spec: "If the line starts with a U+003A COLON character (:) Ignore the line." + // Spec: "Otherwise, The field is ignored" + } + + advance = line.Length + GetNewLineLength(); + sseItem = default; + return false; + } + + /// Gets the last event ID. + /// This value is updated any time a new last event ID is parsed. It is not reset between SSE items. + public string LastEventId { get; private set; } = string.Empty; // Spec: "must be initialized to the empty string" + + /// Gets the reconnection interval. + /// + /// If no retry event was received, this defaults to , and it will only + /// ever be in that situation. If a client wishes to retry, the server-sent + /// events specification states that the interval may then be decided by the client implementation and should be a + /// few seconds. + /// + public TimeSpan ReconnectionInterval { get; private set; } = Timeout.InfiniteTimeSpan; + + /// Transitions the object to a used state, throwing if it's already been used. + private void ThrowIfNotFirstEnumeration() + { + if (Interlocked.Exchange(ref _used, 1) != 0) + { + throw new InvalidOperationException(SR.InvalidOperation_EnumerateOnlyOnce); + } + } + + /// Reads data from the stream into the line buffer. + private int FillLineBuffer() + { + ShiftOrGrowLineBufferIfNecessary(); + + int offset = _lineOffset + _lineLength; + int bytesRead = _stream.Read( +#if NET + _lineBuffer.AsSpan(offset)); +#else + _lineBuffer, offset, _lineBuffer.Length - offset); +#endif + + if (bytesRead <= 0) + { + _eof = true; + if (bytesRead < 0) + { + throw new InvalidOperationException(SR.InvalidOperation_InvalidStreamNegativeBytes); + } + } + + _lineLength += bytesRead; + return bytesRead; + } + + /// Reads data asynchronously from the stream into the line buffer. + private async ValueTask FillLineBufferAsync(CancellationToken cancellationToken) + { + ShiftOrGrowLineBufferIfNecessary(); + + int offset = _lineOffset + _lineLength; + int bytesRead = await +#if NET + _stream.ReadAsync(_lineBuffer.AsMemory(offset), cancellationToken) +#else + new ValueTask(_stream.ReadAsync(_lineBuffer, offset, _lineBuffer.Length - offset, cancellationToken)) +#endif + .ConfigureAwait(false); + + if (bytesRead <= 0) + { + _eof = true; + if (bytesRead < 0) + { + throw new InvalidOperationException(SR.InvalidOperation_InvalidStreamNegativeBytes); + } + } + + _lineLength += bytesRead; + return bytesRead; + } + + /// Gets the UTF8 BOM. + private static ReadOnlySpan Utf8Bom => [0xEF, 0xBB, 0xBF]; + + /// Called at the beginning of processing to skip over an optional UTF8 byte order mark. + private void SkipBomIfPresent() + { + Debug.Assert(_lineOffset == 0, $"Expected _lineOffset == 0, got {_lineOffset}"); + + if (_lineBuffer.AsSpan(0, _lineLength).StartsWith(Utf8Bom)) + { + _lineOffset += 3; + _lineLength -= 3; + } + } + + /// Grows the buffer, returning the existing one to the ArrayPool and renting an ArrayPool replacement. + private static void GrowBuffer([NotNull] ref byte[]? buffer, int minimumLength) + { + byte[]? toReturn = buffer; + buffer = ArrayPool.Shared.Rent(Math.Max(minimumLength, DefaultArrayPoolRentSize)); + if (toReturn is not null) + { + Array.Copy(toReturn, buffer, toReturn.Length); + ArrayPool.Shared.Return(toReturn); + } + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs new file mode 100644 index 0000000000000..9df8e71cb980e --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs @@ -0,0 +1,1001 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.ServerSentEvents.Tests +{ + public partial class SseParserTests + { + [Fact] + public void Parse_InvalidArguments_Throws() + { + AssertExtensions.Throws("sseStream", () => SseParser.Create(null)); + AssertExtensions.Throws("sseStream", () => SseParser.Create(null, delegate { return ""; })); + AssertExtensions.Throws("itemParser", () => SseParser.Create(Stream.Null, null)); + } + + [Fact] + public async Task Parse_Sync_SupportsOnlyOneEnumeration_Throws() + { + SseParser parser = SseParser.Create(Stream.Null); + parser.Enumerate().GetEnumerator().MoveNext(); + var e = parser.Enumerate().GetEnumerator(); + var ea = parser.EnumerateAsync().GetAsyncEnumerator(); + Assert.Throws(() => e.MoveNext()); + await Assert.ThrowsAsync(async () => await ea.MoveNextAsync()); + } + + [Fact] + public async Task Parse_Async_SupportsOnlyOneEnumeration_Throws() + { + SseParser parser = SseParser.Create(Stream.Null); + await parser.EnumerateAsync().GetAsyncEnumerator().MoveNextAsync(); + var ea = parser.EnumerateAsync().GetAsyncEnumerator(); + var e = parser.Enumerate().GetEnumerator(); + await Assert.ThrowsAsync(async () => await ea.MoveNextAsync()); + Assert.Throws(() => e.MoveNext()); + } + + [Fact] + public void SseItem_Roundtrips() + { + SseItem item; + + item = new SseItem(); + Assert.Null(item.EventType); + Assert.Null(item.Data); + + item = new SseItem("some data", "eventType"); + Assert.Equal("eventType", item.EventType); + Assert.Equal("some data", item.Data); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_Empty_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream("", trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_BlankLine_NoItems(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream(newline, trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_TwoBlankLines_NoItems(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream(newline + newline, trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_MultipleBlankLinesBetweenItems_AllItemsProduces(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"event:A{newline}" + + $"data:1{newline}" + + $"id:2{newline}" + + $"retry:300{newline}" + + $"{newline}{newline}{newline}{newline}{newline}" + + + $"event:B{newline}" + + $"data:4{newline}" + + $"id:5{newline}" + + $"retry:600{newline}" + + $"{newline}{newline}{newline}{newline}{newline}" + + + $"event:C{newline}" + + $"data:7{newline}" + + $"id:8{newline}" + + $"retry:900{newline}" + + $"{newline}{newline}{newline}{newline}{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + Assert.Equal(stream.Length, stream.Position); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("1", "A"), items[0]); + AssertSseItemEqual(new SseItem("4", "B"), items[1]); + AssertSseItemEqual(new SseItem("7", "C"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example1(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data: This is the first message.{newline}" + + $"{newline}" + + $"data: This is the second message, it{newline}" + + $"data: has two lines.{newline}" + + $"{newline}" + + $"data: This is the third message.{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("This is the first message.", "message"), items[0]); + AssertSseItemEqual(new SseItem("This is the second message, it\nhas two lines.", "message"), items[1]); + AssertSseItemEqual(new SseItem("This is the third message.", "message"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example2(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"event: add{newline}data: 73857293{newline}" + + $"{newline}" + + $"event: remove{newline}data: 2153{newline}" + + $"{newline}" + + $"event: add{newline}data: 113411{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("73857293", "add"), items[0]); + AssertSseItemEqual(new SseItem("2153", "remove"), items[1]); + AssertSseItemEqual(new SseItem("113411", "add"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example3(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data: YHOO{newline}" + + $"data: +2{newline}" + + $"data: 10{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(1, items.Count); + AssertSseItemEqual(new SseItem("YHOO\n+2\n10", "message"), items[0]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example4(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $": test stream{newline}" + + $"{newline}" + + $"data: first event{newline}" + + $"id: 1{newline}" + + $"{newline}" + + $"data:second event{newline}" + + $"id{newline}" + + $"{newline}" + + $"data: third event{newline}" + + $"{newline}", + trickle); + + SseParser parser = SseParser.Create(stream); + if (useAsync) + { + Assert.Equal(string.Empty, parser.LastEventId); + + using IEnumerator> e = parser.Enumerate().GetEnumerator(); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + Assert.Equal("1", parser.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + Assert.Equal(string.Empty, parser.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + Assert.Equal(string.Empty, parser.LastEventId); + } + else + { + Assert.Equal(string.Empty, parser.LastEventId); + + await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + Assert.Equal("1", parser.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + Assert.Equal(string.Empty, parser.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + Assert.Equal(string.Empty, parser.LastEventId); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $": test stream{newline}" + + $"{newline}" + + $"data: first event{newline}" + + $"id: 1{newline}" + + $"{newline}" + + $"data:second event{newline}" + + $"{newline}" + + $"data: third event{newline}" + + $"id: 42{newline}" + + $"{newline}", + trickle); + + SseParser parser = SseParser.Create(stream); + if (useAsync) + { + Assert.Equal(string.Empty, parser.LastEventId); + + using IEnumerator> e = parser.Enumerate().GetEnumerator(); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + Assert.Equal("1", parser.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + Assert.Equal("1", parser.LastEventId); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + Assert.Equal("42", parser.LastEventId); + } + else + { + Assert.Equal(string.Empty, parser.LastEventId); + + await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + Assert.Equal("1", parser.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + Assert.Equal("1", parser.LastEventId); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + Assert.Equal("42", parser.LastEventId); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example5(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data{newline}" + + $"{newline}" + + $"data{newline}" + + $"data{newline}" + + $"{newline}" + + $"data:", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("", "message"), items[0]); + AssertSseItemEqual(new SseItem("\n", "message"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example5_WithColon(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data{newline}" + + $"{newline}" + + $"data:{newline}" + + $"data{newline}" + + $"{newline}" + + $"data:", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("", "message"), items[0]); + AssertSseItemEqual(new SseItem("\n", "message"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Parse_HtmlSpec_Example6(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data:test{newline}" + + $"{newline}" + + $"data: test{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("test", "message"), items[0]); + AssertSseItemEqual(new SseItem("test", "message"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Delegate_EventTypeArgument_MatchesValueFromEvent(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"event: add{newline}data: 73857293{newline}" + + $"{newline}" + + $"event: remove{newline}data: 2153{newline}" + + $"{newline}" + + $"event: add{newline}data: 113411{newline}" + + $"{newline}", + trickle); + + SseItemParser itemParser = (eventType, bytes) => eventType; + + List> items = useAsync ? + await ReadAllEventsAsync(stream, itemParser) : + ReadAllEvents(stream, itemParser); + + Assert.Equal(3, items.Count); + AssertSseItemEqual(new SseItem("add", "add"), items[0]); + AssertSseItemEqual(new SseItem("remove", "remove"), items[1]); + AssertSseItemEqual(new SseItem("add", "add"), items[2]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $": test stream{newline}" + + $"{newline}" + + $"data: first event{newline}" + + $"{newline}" + + $"data:second event{newline}" + + $"retry: 42{newline}" + + $"{newline}" + + $"data: third event{newline}" + + $"retry: 12345678910{newline}" + + $"{newline}" + + $"data:fourth event{newline}" + + $"{newline}" + + $"data:fifth event{newline}" + + $"retry: invalidmilliseconds{newline}" + + $"{newline}", + trickle); + + SseParser parser = SseParser.Create(stream); + Assert.Equal(Timeout.InfiniteTimeSpan, parser.ReconnectionInterval); + + if (useAsync) + { + Assert.Equal(string.Empty, parser.LastEventId); + + using IEnumerator> e = parser.Enumerate().GetEnumerator(); + Assert.Equal(Timeout.InfiniteTimeSpan, parser.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + Assert.Equal(Timeout.InfiniteTimeSpan, parser.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(42), parser.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("fourth event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("fifth event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + } + else + { + Assert.Equal(string.Empty, parser.LastEventId); + + await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); + Assert.Equal(Timeout.InfiniteTimeSpan, parser.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(42), parser.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("fourth event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + + Assert.True(await e.MoveNextAsync()); + AssertSseItemEqual(new SseItem("fifth event", "message"), e.Current); + Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task JsonContent_DelegateInvoked(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream( + $"data: {{{newline}" + + $"data: \"title\": \"The Catcher in the Rye\",{newline}" + + $"data: \"author\": \"J.D. Salinger\",{newline}" + + $"data: \"published_year\": 1951,{newline}" + + $"data: \"genre\": \"Fiction\"{newline}" + + $"data: }}{newline}" + + $"{newline}" + + $"data: {{{newline}" + + $"data: \"title\": \"1984\",{newline}" + + $"data: \"author\": \"George Orwell\",{newline}" + + $"data: \"published_year\": 1949,{newline}" + + $"data: \"genre\": \"Fiction\"{newline}" + + $"data: }}{newline}" + + $"{newline}", + trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream, static (eventType, data) => JsonSerializer.Deserialize(data)) : + ReadAllEvents(stream, static (eventType, data) => JsonSerializer.Deserialize(data)); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem(new Book { title = "The Catcher in the Rye", author = "J.D. Salinger", published_year = 1951, genre = "Fiction" }, "message"), items[0]); + AssertSseItemEqual(new SseItem(new Book { title = "1984", author = "George Orwell", published_year = 1949, genre = "Fiction" }, "message"), items[1]); + } + + private struct Book + { + public string title { get; set; } + public string author { get; set; } + public int published_year { get; set; } + public string genre { get; set; } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Valid_Skipped(string newline, bool trickle, bool useAsync) + { + byte[] newlineBytes = Encoding.UTF8.GetBytes(newline); + using Stream stream = GetStream( + [ + 0xEF, 0xBB, 0xBF, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'h', (byte)'i', + ..newlineBytes, + ..newlineBytes, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'t', (byte)'h', (byte)'e', (byte)'r', (byte)'e', + ..newlineBytes, + ..newlineBytes, + ], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(2, items.Count); + AssertSseItemEqual(new SseItem("hi", "message"), items[0]); + AssertSseItemEqual(new SseItem("there", "message"), items[1]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Partial1_LineSkipped(string newline, bool trickle, bool useAsync) + { + byte[] newlineBytes = Encoding.UTF8.GetBytes(newline); + using Stream stream = GetStream( + [ + 0xEF, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'h', (byte)'i', + ..newlineBytes, + ..newlineBytes, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'t', (byte)'h', (byte)'e', (byte)'r', (byte)'e', + ..newlineBytes, + ..newlineBytes, + ], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(1, items.Count); + AssertSseItemEqual(new SseItem("there", "message"), items[0]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Partial2_LineSkipped(string newline, bool trickle, bool useAsync) + { + byte[] newlineBytes = Encoding.UTF8.GetBytes(newline); + using Stream stream = GetStream( + [ + 0xEF, 0xBB, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'h', (byte)'i', + ..newlineBytes, + ..newlineBytes, + (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', + (byte)'t', (byte)'h', (byte)'e', (byte)'r', (byte)'e', + ..newlineBytes, + ..newlineBytes, + ], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(1, items.Count); + AssertSseItemEqual(new SseItem("there", "message"), items[0]); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Interrupted1_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream([0xEF], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Interrupted2_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream([0xEF, 0xBB], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Bom_Interrupted3_NoItems(string newline, bool trickle, bool useAsync) + { + _ = newline; + + using Stream stream = GetStream([0xEF, 0xBB, 0xBF], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(0, items.Count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task LongLines_ItemsProducedCorrectly(string newline, bool trickle, bool useAsync) + { + string[] expected = Enumerable.Range(1, 100).Select(i => string.Concat(Enumerable.Repeat($"{i} ", i))).ToArray(); + + using Stream stream = GetStream([..expected.Select(s => $"data: {s}{newline}{newline}").SelectMany(Encoding.UTF8.GetBytes)], trickle); + + List> items = useAsync ? + await ReadAllEventsAsync(stream) : + ReadAllEvents(stream); + + Assert.Equal(expected.Length, items.Count); + for (int i = 0; i < expected.Length; i++) + { + AssertSseItemEqual(new SseItem(expected[i], "message"), items[i]); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task Delegate_ThrowsException_Propagates(string newline, bool trickle, bool useAsync) + { + using Stream stream = GetStream($"data: hello{newline}{newline}data:world{newline}{newline}", trickle); + + SseParser parser = SseParser.Create(stream, (eventType, bytes) => throw new FormatException(Encoding.UTF8.GetString(bytes.ToArray()))); + + FormatException fe; + if (useAsync) + { + await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); + fe = await Assert.ThrowsAsync(async () => await e.MoveNextAsync()); + } + else + { + using IEnumerator> e = parser.Enumerate().GetEnumerator(); + fe = Assert.Throws(() => e.MoveNext()); + } + + Assert.Equal("hello", fe.Message); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task Cancellation_Propagates(bool cancelEnumerator) + { + using Stream stream = GetStream($"data: hello\n\ndata:world\n\n", trickle: true); + + SseParser parser = SseParser.Create(stream); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + await using IAsyncEnumerator> e = cancelEnumerator ? + parser.EnumerateAsync().GetAsyncEnumerator(cts.Token) : + parser.EnumerateAsync(cts.Token).GetAsyncEnumerator(); + + await Assert.ThrowsAnyAsync(async () => await e.MoveNextAsync()); + } + + [Fact] + public void NonGenericEnumerator_ProducesExpectedItems() + { + using Stream stream = GetStream($"data: hello\n\ndata:world\n\n", trickle: false); + + IEnumerable sse = SseParser.Create(stream).Enumerate(); + IEnumerator e = sse.GetEnumerator(); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("hello", "message"), (SseItem)e.Current); + + Assert.True(e.MoveNext()); + AssertSseItemEqual(new SseItem("world", "message"), (SseItem)e.Current); + + Assert.False(e.MoveNext()); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task InvalidStream_ReturnsNegativeValueFromRead_Throws(bool useAsync) + { + using Stream stream = new InvalidReadStream(); + + SseParser parser = SseParser.Create(stream); + + if (useAsync) + { + await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); + await Assert.ThrowsAsync(async () => await e.MoveNextAsync()); + } + else + { + using IEnumerator> e = parser.Enumerate().GetEnumerator(); + Assert.Throws(() => e.MoveNext()); + } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task MultipleItemParsers_OpenAI_StreamingResponse(string newline, bool trickle, bool useAsync) + { + string exampleResponse = + $"data: {{\"id\":\"xxx\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{\"content\":\"!\"}},\"index\":0,\"finish_reason\":null}}]}}{newline}{newline}" + + $"data: {{\"id\":\"yyy\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{}},\"index\":0,\"finish_reason\":\"stop\"}}]}}{newline}{newline}" + + $"data: [DONE]{newline}{newline}"; + + using Stream stream = GetStream(exampleResponse, trickle); + + SseItemParser itemParser = (eventType, bytes) => + { + return bytes.SequenceEqual("[DONE]"u8) ? + new ChunkOrDone { Done = true } : + new ChunkOrDone { Json = JsonSerializer.Deserialize(bytes) }; + }; + + List> items = useAsync ? + await ReadAllEventsAsync(stream, itemParser) : + ReadAllEvents(stream, itemParser); + + Assert.Equal(3, items.Count); + Assert.False(items[0].Data.Done); + Assert.False(items[1].Data.Done); + Assert.True(items[2].Data.Done); + } + + private struct ChunkOrDone + { + public JsonElement Json { get; set; } + public bool Done { get; set; } + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task ArrayPoolRental_PerItem(string newline, bool trickle, bool useAsync) + { + string exampleResponse = + $"data: {{\"id\":\"xxx\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{\"content\":\"!\"}},\"index\":0,\"finish_reason\":null}}]}}{newline}{newline}" + + $"data: {{\"id\":\"yyy\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{}},\"index\":0,\"finish_reason\":\"stop\"}}]}}{newline}{newline}" + + $"data: [DONE]{newline}{newline}"; + + using Stream stream = GetStream(exampleResponse, trickle); + + SseItemParser> itemParser = (eventType, bytes) => + { + byte[] array = ArrayPool.Shared.Rent(bytes.Length); + bytes.CopyTo(array.AsSpan()); + return new ArraySegment(array, 0, bytes.Length); + }; + + int count = 0; + if (useAsync) + { + foreach (var e in SseParser.Create(stream, itemParser).Enumerate()) + { + try + { + if ("[DONE]"u8.SequenceEqual(e.Data)) + { + break; + } + } + finally + { + ArrayPool.Shared.Return(e.Data.Array); + } + + count++; + } + } + else + { + await foreach (var e in SseParser.Create(stream, itemParser).EnumerateAsync()) + { + try + { + if ("[DONE]"u8.SequenceEqual(e.Data)) + { + break; + } + } + finally + { + ArrayPool.Shared.Return(e.Data.Array); + } + + count++; + } + } + + Assert.Equal(2, count); + } + + [Theory] + [MemberData(nameof(NewlineTrickleAsyncData))] + public async Task ArrayPoolRental_Closure(string newline, bool trickle, bool useAsync) + { + string exampleResponse = + $"data: {{\"id\":\"xxx\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{\"content\":\"!\"}},\"index\":0,\"finish_reason\":null}}]}}{newline}{newline}" + + $"data: {{\"id\":\"yyy\",\"object\":\"chat.completion.chunk\",\"created\":1679168243,\"model\":\"mmm\",\"choices\":[{{\"delta\":{{}},\"index\":0,\"finish_reason\":\"stop\"}}]}}{newline}{newline}" + + $"data: [DONE]{newline}{newline}"; + + using Stream stream = GetStream(exampleResponse, trickle); + + byte[] arrayPoolArray = ArrayPool.Shared.Rent(1); + + SseItemParser> itemParser = (eventType, bytes) => + { + if (arrayPoolArray.Length < bytes.Length) + { + byte[] temp = arrayPoolArray; + arrayPoolArray = ArrayPool.Shared.Rent(bytes.Length); + ArrayPool.Shared.Return(temp); + } + bytes.CopyTo(arrayPoolArray.AsSpan()); + return new ReadOnlyMemory(arrayPoolArray, 0, bytes.Length); + }; + + int count = 0; + if (useAsync) + { + foreach (var e in SseParser.Create(stream, itemParser).Enumerate()) + { + if ("[DONE]"u8.SequenceEqual(e.Data.Span)) + { + break; + } + count++; + } + } + else + { + await foreach (var e in SseParser.Create(stream, itemParser).EnumerateAsync()) + { + if ("[DONE]"u8.SequenceEqual(e.Data.Span)) + { + break; + } + count++; + } + } + + ArrayPool.Shared.Return(arrayPoolArray); + + Assert.Equal(2, count); + } + + private static void AssertSseItemEqual(SseItem left, SseItem right) + { + Assert.Equal(left.EventType, right.EventType); + if (left.Data is string leftData && right.Data is string rightData) + { + Assert.Equal($"{leftData.Length} {leftData}", $"{rightData.Length} {rightData}"); + } + else + { + Assert.Equal(left.Data, right.Data); + } + } + + public static IEnumerable NewlineTrickleAsyncData() => + from newline in new[] { "\r", "\n", "\r\n" } + from trickle in new[] { false, true } + from async in new[] { false, true } + select new object[] { newline, trickle, async }; + + private static Stream GetStream(string data, bool trickle) => + GetStream(Encoding.UTF8.GetBytes(data), trickle); + + private static Stream GetStream(byte[] bytes, bool trickle) => + trickle ? new TrickleStream(bytes) : new MemoryStream(bytes); + + private static List> ReadAllEvents(Stream stream) + { + return new List>(SseParser.Create(stream).Enumerate()); + } + + private static List> ReadAllEvents(Stream stream, SseItemParser parser) + { + return new List>(SseParser.Create(stream, parser).Enumerate()); + } + + private static async Task>> ReadAllEventsAsync(Stream stream, SseItemParser parser) + { + var list = new List>(); + await foreach (SseItem item in SseParser.Create(stream, parser).EnumerateAsync()) + { + list.Add(item); + } + + return list; + } + + private static async Task>> ReadAllEventsAsync(Stream stream) + { + var list = new List>(); + await foreach (SseItem item in SseParser.Create(stream).EnumerateAsync()) + { + list.Add(item); + } + + return list; + } + + /// Stream where each read reads at most one byte and where every asynchronous operation yields. + private sealed class TrickleStream : MemoryStream + { + public TrickleStream(byte[] buffer) : base(buffer) { } + + public override int Read(byte[] buffer, int offset, int count) => + base.Read(buffer, offset, Math.Min(count, 1)); + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + return await base.ReadAsync(buffer, offset, Math.Min(count, 1), cancellationToken); + } + +#if NET + public override int Read(Span buffer) => + base.Read(buffer.Slice(0, Math.Min(buffer.Length, 1))); + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + return await base.ReadAsync(buffer.Slice(0, Math.Min(buffer.Length, 1)), cancellationToken); + } +#endif + } + + private sealed class InvalidReadStream : MemoryStream + { + public override int Read(byte[] buffer, int offset, int count) => + -1; + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + Task.FromResult(-1); + +#if NET + public override int Read(Span buffer) => + -1; + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + ValueTask.FromResult(-1); +#endif + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj b/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj new file mode 100644 index 0000000000000..e2b3d73610d48 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj @@ -0,0 +1,19 @@ + + + + $(NetCoreAppCurrent);$(NetFrameworkMinimum) + + + + + + + + + + + + + + +