diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index cef2bba4e..c814e556a 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -24,17 +24,11 @@ jobs: steps: - name: Install nats run: | - rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//) + rel=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//) wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip unzip nats-$rel-linux-amd64.zip sudo mv nats-$rel-linux-amd64/nats /usr/local/bin - gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases" - branch="${{ matrix.config.branch }}" - if [[ $branch == "v"* ]]; then - branch=$(curl -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1) - elif [[ $branch == "latest" ]]; then - branch=$(curl -s $gh_api_url/latest | jq -r .tag_name) - fi + branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6adbc0225..3636ec585 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,13 +24,7 @@ jobs: steps: - name: Install nats-server run: | - gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases" - branch="${{ matrix.config.branch }}" - if [[ $branch == "v"* ]]; then - branch=$(curl -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1) - elif [[ $branch == "latest" ]]; then - branch=$(curl -s $gh_api_url/latest | jq -r .tag_name) - fi + branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 @@ -119,13 +113,7 @@ jobs: shell: bash run: | mkdir tools-nats-server && cd tools-nats-server - gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases" - branch="${{ matrix.config.branch }}" - if [[ $branch == "v"* ]]; then - branch=$(curl -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1) - elif [[ $branch == "latest" ]]; then - branch=$(curl -s $gh_api_url/latest | jq -r .tag_name) - fi + branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 diff --git a/NATS.Client.sln b/NATS.Client.sln index d36b69e4b..27c245115 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -109,11 +109,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Platform.Windows.Tests", "tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj", "{A37994CC-A23A-415E-8B61-9468C7178A55}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client", "src\NATS.Client\NATS.Client.csproj", "{48F1F736-3D87-4453-B497-BD9C203B2385}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Client", "sandbox\Example.Client\Example.Client.csproj", "{A15CCDD5-B707-4142-B99A-64F0AB62318A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Tests", "tests\NATS.Client.Tests\NATS.Client.Tests.csproj", "{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Simplified.Tests", "tests\NATS.Client.Simplified.Tests\NATS.Client.Simplified.Tests.csproj", "{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Simplified", "src\NATS.Client.Simplified\NATS.Client.Simplified.csproj", "{227C88B1-0510-4010-B142-C44725578FCD}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -297,10 +297,6 @@ Global {A37994CC-A23A-415E-8B61-9468C7178A55}.Debug|Any CPU.Build.0 = Debug|Any CPU {A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.ActiveCfg = Release|Any CPU {A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.Build.0 = Release|Any CPU - {48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.Build.0 = Debug|Any CPU - {48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.ActiveCfg = Release|Any CPU - {48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.Build.0 = Release|Any CPU {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.Build.0 = Debug|Any CPU {A15CCDD5-B707-4142-B99A-64F0AB62318A}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -309,6 +305,10 @@ Global {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Debug|Any CPU.Build.0 = Debug|Any CPU {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.ActiveCfg = Release|Any CPU {6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.Build.0 = Release|Any CPU + {227C88B1-0510-4010-B142-C44725578FCD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {227C88B1-0510-4010-B142-C44725578FCD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {227C88B1-0510-4010-B142-C44725578FCD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {227C88B1-0510-4010-B142-C44725578FCD}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -359,9 +359,9 @@ Global {474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30} {B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {A37994CC-A23A-415E-8B61-9468C7178A55} = {C526E8AB-739A-48D7-8FC4-048978C9B650} - {48F1F736-3D87-4453-B497-BD9C203B2385} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {A15CCDD5-B707-4142-B99A-64F0AB62318A} = {95A69671-16CA-4133-981C-CC381B7AAA30} {6DAAAA87-8DDF-4E60-81CE-D8900327DE33} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {227C88B1-0510-4010-B142-C44725578FCD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/Example.Client/Example.Client.csproj b/sandbox/Example.Client/Example.Client.csproj index 607b57d7b..186ac3df7 100644 --- a/sandbox/Example.Client/Example.Client.csproj +++ b/sandbox/Example.Client/Example.Client.csproj @@ -8,7 +8,7 @@ - + diff --git a/sandbox/Example.Client/Program.cs b/sandbox/Example.Client/Program.cs index 835438fa9..0e78c0ba6 100644 --- a/sandbox/Example.Client/Program.cs +++ b/sandbox/Example.Client/Program.cs @@ -1,7 +1,7 @@ // See https://aka.ms/new-console-template for more information using System.Text; -using NATS.Client; +using NATS.Net; CancellationTokenSource cts = new(); diff --git a/sandbox/MicroBenchmark/NewInboxBenchmarks.cs b/sandbox/MicroBenchmark/NewInboxBenchmarks.cs index 9b65cca79..0170a10e7 100644 --- a/sandbox/MicroBenchmark/NewInboxBenchmarks.cs +++ b/sandbox/MicroBenchmark/NewInboxBenchmarks.cs @@ -25,14 +25,14 @@ public class NewInboxBenchmarks [GlobalSetup] public void Setup() { - NuidWriter.TryWriteNuid(new char[100]); + Nuid.TryWriteNuid(new char[100]); } [Benchmark(Baseline = true)] [SkipLocalsInit] public bool TryWriteNuid() { - return NuidWriter.TryWriteNuid(_buf); + return Nuid.TryWriteNuid(_buf); } [Benchmark] diff --git a/src/NATS.Client.Core/Commands/ProtocolWriter.cs b/src/NATS.Client.Core/Commands/ProtocolWriter.cs index 6b7e7f00f..d85b368c2 100644 --- a/src/NATS.Client.Core/Commands/ProtocolWriter.cs +++ b/src/NATS.Client.Core/Commands/ProtocolWriter.cs @@ -48,7 +48,7 @@ public void WriteConnect(IBufferWriter writer, ClientOpts opts) BinaryPrimitives.WriteUInt64LittleEndian(span, ConnectSpace); writer.Advance(ConnectSpaceLength); - var jsonWriter = new Utf8JsonWriter(writer); + using var jsonWriter = new Utf8JsonWriter(writer); JsonSerializer.Serialize(jsonWriter, opts, JsonContext.Default.ClientOpts); span = writer.GetSpan(UInt16Length); diff --git a/src/NATS.Client.Core/Internal/SslStreamConnection.cs b/src/NATS.Client.Core/Internal/SslStreamConnection.cs index 2eef0e1f9..adeda8871 100644 --- a/src/NATS.Client.Core/Internal/SslStreamConnection.cs +++ b/src/NATS.Client.Core/Internal/SslStreamConnection.cs @@ -115,7 +115,11 @@ public void SignalDisconnected(Exception exception) public async Task AuthenticateAsClientAsync(NatsUri uri, TimeSpan timeout) { var options = await _tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(true); + #if NETSTANDARD2_0 + if (_sslStream != null) + _sslStream.Dispose(); + _sslStream = new SslStream( innerStream: new NetworkStream(_socket, true), leaveInnerStreamOpen: false, @@ -134,6 +138,9 @@ await _sslStream.AuthenticateAsClientAsync( throw new NatsException($"TLS authentication failed", ex); } #else + if (_sslStream != null) + await _sslStream.DisposeAsync().ConfigureAwait(false); + _sslStream = new SslStream(innerStream: new NetworkStream(_socket, true)); try { diff --git a/src/NATS.Client.Core/Internal/WebSocketConnection.cs b/src/NATS.Client.Core/Internal/WebSocketConnection.cs index 588ad82f3..74085e21e 100644 --- a/src/NATS.Client.Core/Internal/WebSocketConnection.cs +++ b/src/NATS.Client.Core/Internal/WebSocketConnection.cs @@ -39,11 +39,12 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken) /// /// Connect with Timeout. When failed, Dispose this connection. /// - public async ValueTask ConnectAsync(Uri uri, TimeSpan timeout) + public async ValueTask ConnectAsync(Uri uri, NatsOpts opts) { - using var cts = new CancellationTokenSource(timeout); + using var cts = new CancellationTokenSource(opts.ConnectTimeout); try { + await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri, _socket.Options, cts.Token).ConfigureAwait(false); await _socket.ConnectAsync(uri, cts.Token).ConfigureAwait(false); } catch (Exception ex) @@ -130,4 +131,13 @@ public void SignalDisconnected(Exception exception) { _waitForClosedSource.TrySetResult(exception); } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions options, CancellationToken token) + { + if (opts.ConfigureWebSocketOpts != null) + { + await opts.ConfigureWebSocketOpts(uri, options, token).ConfigureAwait(false); + } + } } diff --git a/src/NATS.Client.Core/NKeyPair.cs b/src/NATS.Client.Core/NKeyPair.cs index e4c7f48c2..886b7e80d 100644 --- a/src/NATS.Client.Core/NKeyPair.cs +++ b/src/NATS.Client.Core/NKeyPair.cs @@ -72,6 +72,7 @@ public byte[] Sign(byte[] src) public void Dispose() { Dispose(true); + GC.SuppressFinalize(this); } /// @@ -269,7 +270,7 @@ internal static string Encode(byte prefixbyte, bool seed, byte[] src) if (src.Length != 32) throw new NatsException("Invalid seed size"); - var stream = new MemoryStream(); + using var stream = new MemoryStream(); if (seed) { diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index b9b058459..5c838116f 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -112,12 +112,12 @@ static void WriteBuffer(Span buffer, (string prefix, int pfxLen) state) state.prefix.AsSpan().CopyTo(buffer); buffer[state.prefix.Length] = '.'; var remaining = buffer.Slice(state.pfxLen); - var didWrite = NuidWriter.TryWriteNuid(remaining); + var didWrite = Nuid.TryWriteNuid(remaining); Debug.Assert(didWrite, "didWrite"); } var separatorLength = prefix.Length > 0 ? 1 : 0; - var totalLength = prefix.Length + (int)NuidWriter.NuidLength + separatorLength; + var totalLength = prefix.Length + (int)Nuid.NuidLength + separatorLength; var totalPrefixLength = prefix.Length + separatorLength; #if NET6_0_OR_GREATER || NETSTANDARD2_1 diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 93d2ab021..f08315b8f 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -318,7 +318,7 @@ private async ValueTask InitialConnectAsync() if (uri.IsWebSocket) { var conn = new WebSocketConnection(); - await conn.ConnectAsync(uri.Uri, Opts.ConnectTimeout).ConfigureAwait(false); + await conn.ConnectAsync(uri.Uri, Opts).ConfigureAwait(false); _socket = conn; } else @@ -606,7 +606,7 @@ private async void ReconnectLoop() { _logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using WebSocket {Url} [{ReconnectCount}]", url, reconnectCount); var conn = new WebSocketConnection(); - await conn.ConnectAsync(url.Uri, Opts.ConnectTimeout).ConfigureAwait(false); + await conn.ConnectAsync(url.Uri, Opts).ConfigureAwait(false); _socket = conn; } else @@ -848,7 +848,7 @@ private async void StartPingTimer(CancellationToken cancellationToken) _logger.LogDebug(NatsLogEvents.Connection, "Starting ping timer"); - var periodicTimer = new PeriodicTimer(Opts.PingInterval); + using var periodicTimer = new PeriodicTimer(Opts.PingInterval); ResetPongCount(); try { diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index c9e2f2f00..49b1ef1d9 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -1,3 +1,4 @@ +using System.Net.WebSockets; using System.Text; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -114,6 +115,28 @@ public sealed record NatsOpts /// public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest; + /// + /// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections. + /// + /// + /// This can be used to set authorization header and other HTTP header values. + /// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket. + /// The callback's execution time contributes to the connection establishment subject to the . + /// Implementors should use the passed CancellationToken for async operations called by this handler. + /// + /// + /// await using var nats = new NatsConnection(new NatsOpts + /// { + /// Url = "ws://localhost:8080", + /// ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + /// { + /// clientWsOpts.SetRequestHeader("authorization", $"Bearer MY_TOKEN"); + /// return ValueTask.CompletedTask; + /// }, + /// }); + /// + public Func? ConfigureWebSocketOpts { get; init; } = null; + internal NatsUri[] GetSeedUris() { var urls = Url.Split(','); diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index d0bdf318a..dbbcb7606 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -314,7 +314,7 @@ protected void ResetIdleTimeout() if (_startUpTimeoutTimer != null) { _startUpTimeoutTimer.Change(dueTime: Timeout.InfiniteTimeSpan, period: Timeout.InfiniteTimeSpan); - _startUpTimeoutTimer = null; + _startUpTimeoutTimer.Dispose(); } } diff --git a/src/NATS.Client.Core/Internal/NuidWriter.cs b/src/NATS.Client.Core/Nuid.cs similarity index 87% rename from src/NATS.Client.Core/Internal/NuidWriter.cs rename to src/NATS.Client.Core/Nuid.cs index e8a578543..8994f3f28 100644 --- a/src/NATS.Client.Core/Internal/NuidWriter.cs +++ b/src/NATS.Client.Core/Nuid.cs @@ -6,10 +6,17 @@ using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random; #endif -namespace NATS.Client.Core.Internal; - +namespace NATS.Client.Core; + +/// +/// Represents a unique identifier generator. +/// +/// +/// The Nuid class generates unique identifiers that can be used +/// to ensure uniqueness in distributed systems. +/// [SkipLocalsInit] -internal sealed class NuidWriter +public sealed class Nuid { // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code // however, they were changed to uint to fix the compilation error for IL2CPP Unity projects. @@ -25,13 +32,13 @@ internal sealed class NuidWriter private const int MaxIncrement = 333; [ThreadStatic] - private static NuidWriter? _writer; + private static Nuid? _writer; private char[] _prefix; private ulong _increment; private ulong _sequential; - private NuidWriter() + private Nuid() { Refresh(out _); } @@ -42,25 +49,30 @@ private NuidWriter() private static ReadOnlySpan Digits => "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; #endif - public static bool TryWriteNuid(Span nuidBuffer) + /// + /// Generates a new NATS unique identifier (NUID). + /// + /// A new NUID as a string. + /// Thrown when unable to generate the NUID. + public static string NewNuid() { - if (_writer is not null) + Span buffer = stackalloc char[(int)NuidLength]; + if (TryWriteNuid(buffer)) { - return _writer.TryWriteNuidCore(nuidBuffer); + return buffer.ToString(); } - return InitAndWrite(nuidBuffer); + throw new InvalidOperationException("Internal error: can't generate nuid"); } - public static string NewNuid() + internal static bool TryWriteNuid(Span nuidBuffer) { - Span buffer = stackalloc char[22]; - if (TryWriteNuid(buffer)) + if (_writer is not null) { - return buffer.ToString(); + return _writer.TryWriteNuidCore(nuidBuffer); } - throw new InvalidOperationException("Internal error: can't generate nuid"); + return InitAndWrite(nuidBuffer); } private static bool TryWriteNuidCore(Span buffer, Span prefix, ulong sequential) @@ -140,7 +152,7 @@ private static char[] GetPrefix(RandomNumberGenerator? rng = null) [MethodImpl(MethodImplOptions.NoInlining)] private static bool InitAndWrite(Span span) { - _writer = new NuidWriter(); + _writer = new Nuid(); return _writer.TryWriteNuidCore(span); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs index c7eaf1e7a..7c24f0e74 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs @@ -2,7 +2,7 @@ namespace NATS.Client.JetStream.Internal; -internal class NatsJSNotificationChannel : IAsyncDisposable +internal sealed class NatsJSNotificationChannel : IAsyncDisposable { private readonly Func _notificationHandler; private readonly Action _exceptionHandler; diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index e4b39a96d..14d1d9bff 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -141,6 +141,12 @@ public async ValueTask DisposeAsync() { _nats.ConnectionDisconnected -= OnDisconnected; +#if NETSTANDARD2_0 + _timer.Dispose(); +#else + await _timer.DisposeAsync().ConfigureAwait(false); +#endif + // For correctly Dispose, // first stop the consumer Creation operations and then the command execution operations. // It is necessary that all consumerCreation operations have time to complete before command CommandLoop stop @@ -390,7 +396,7 @@ await _context.CreateOrUpdateConsumerAsync( private string NewNuid() { Span buffer = stackalloc char[22]; - if (NuidWriter.TryWriteNuid(buffer)) + if (Nuid.TryWriteNuid(buffer)) { return buffer.ToString(); } diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 45a78642b..d638aff86 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -1,5 +1,5 @@ using System.Runtime.CompilerServices; -using NATS.Client.Core.Internal; +using NATS.Client.Core; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -228,7 +228,7 @@ internal ValueTask CreateOrderedConsumerInternalAsync( request.Config.FilterSubjects = opts.FilterSubjects; } - var name = NuidWriter.NewNuid(); + var name = Nuid.NewNuid(); var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}.{name}"; return JSRequestResponseAsync( diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 036d19965..b5599c310 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -24,7 +24,7 @@ public NatsKVWatchCommandMsg() public NatsJSMsg Msg { get; init; } = default; } -internal class NatsKVWatcher : IAsyncDisposable +internal sealed class NatsKVWatcher : IAsyncDisposable { private readonly ILogger _logger; private readonly bool _debug; @@ -144,6 +144,12 @@ public async ValueTask DisposeAsync() await _sub.DisposeAsync(); } +#if NETSTANDARD2_0 + _timer.Dispose(); +#else + await _timer.DisposeAsync().ConfigureAwait(false); +#endif + _consumerCreateChannel.Writer.TryComplete(); _commandChannel.Writer.TryComplete(); _entryChannel.Writer.TryComplete(); @@ -444,7 +450,7 @@ private async ValueTask CreatePushConsumer(string origin) private string NewNuid() { Span buffer = stackalloc char[22]; - if (NuidWriter.TryWriteNuid(buffer)) + if (Nuid.TryWriteNuid(buffer)) { return buffer.ToString(); } diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 4416dab2b..5e6c8006a 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -52,7 +52,7 @@ internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSCon /// Object value as a byte array. public async ValueTask GetBytesAsync(string key, CancellationToken cancellationToken = default) { - var memoryStream = new MemoryStream(); + using var memoryStream = new MemoryStream(); await GetAsync(key, memoryStream, cancellationToken: cancellationToken).ConfigureAwait(false); return memoryStream.ToArray(); } @@ -707,7 +707,7 @@ private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cance private string NewNuid() { Span buffer = stackalloc char[22]; - if (NuidWriter.TryWriteNuid(buffer)) + if (Nuid.TryWriteNuid(buffer)) { return buffer.ToString(); } diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index 417ad5348..08f4379b4 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -34,7 +34,7 @@ public class NatsSvcServer : INatsSvcServer public NatsSvcServer(NatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) { _logger = nats.Opts.LoggerFactory.CreateLogger(); - _id = NuidWriter.NewNuid(); + _id = Nuid.NewNuid(); _nats = nats; _config = config; _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); diff --git a/src/NATS.Client.Simplified/NATS.Client.Simplified.csproj b/src/NATS.Client.Simplified/NATS.Client.Simplified.csproj new file mode 100644 index 000000000..a013c71fd --- /dev/null +++ b/src/NATS.Client.Simplified/NATS.Client.Simplified.csproj @@ -0,0 +1,23 @@ + + + + netstandard2.0;netstandard2.1;net6.0;net8.0 + enable + enable + true + + + pubsub;messaging + NATS simplified client for .NET + + true + NATS.Net + + + + + + + + + \ No newline at end of file diff --git a/src/NATS.Client/NatsClient.cs b/src/NATS.Client.Simplified/NatsClient.cs similarity index 99% rename from src/NATS.Client/NatsClient.cs rename to src/NATS.Client.Simplified/NatsClient.cs index 0db53d9f5..eb489d2b3 100644 --- a/src/NATS.Client/NatsClient.cs +++ b/src/NATS.Client.Simplified/NatsClient.cs @@ -1,7 +1,7 @@ using System.Threading.Channels; using NATS.Client.Core; -namespace NATS.Client; +namespace NATS.Net; /// /// Represents a NATS client that provides methods for interacting with NATS server. diff --git a/src/NATS.Client/NatsClientDefaultSerializer.cs b/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs similarity index 97% rename from src/NATS.Client/NatsClientDefaultSerializer.cs rename to src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs index fecc12a9a..811c9571c 100644 --- a/src/NATS.Client/NatsClientDefaultSerializer.cs +++ b/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs @@ -1,7 +1,7 @@ using NATS.Client.Core; using NATS.Client.Serializers.Json; -namespace NATS.Client; +namespace NATS.Net; /// /// Default serializer interface for NATS messages. diff --git a/src/NATS.Client/NatsClientDefaultSerializerRegistry.cs b/src/NATS.Client.Simplified/NatsClientDefaultSerializerRegistry.cs similarity index 97% rename from src/NATS.Client/NatsClientDefaultSerializerRegistry.cs rename to src/NATS.Client.Simplified/NatsClientDefaultSerializerRegistry.cs index 0b4a780bc..5b4214514 100644 --- a/src/NATS.Client/NatsClientDefaultSerializerRegistry.cs +++ b/src/NATS.Client.Simplified/NatsClientDefaultSerializerRegistry.cs @@ -1,6 +1,6 @@ using NATS.Client.Core; -namespace NATS.Client; +namespace NATS.Net; /// /// Default implementation of the INatsSerializerRegistry interface. diff --git a/src/NATS.Client/NATS.Client.csproj b/src/NATS.Client/NATS.Client.csproj deleted file mode 100644 index 834a4f053..000000000 --- a/src/NATS.Client/NATS.Client.csproj +++ /dev/null @@ -1,25 +0,0 @@ - - - - netstandard2.0;netstandard2.1;net6.0;net8.0 - enable - enable - true - - - pubsub;messaging - NATS client for .NET - - - false - - - - - - - - - diff --git a/src/NATS.Net/NATS.Net.csproj b/src/NATS.Net/NATS.Net.csproj index 8e0a9b422..f2ec554c2 100644 --- a/src/NATS.Net/NATS.Net.csproj +++ b/src/NATS.Net/NATS.Net.csproj @@ -9,7 +9,7 @@ pubsub;messaging - NATS client for modern .NET. + NATS client for modern .NET true @@ -19,7 +19,9 @@ + + diff --git a/tests/NATS.Client.Core.Tests/NuidWriterTests.cs b/tests/NATS.Client.Core.Tests/NuidTests.cs similarity index 86% rename from tests/NATS.Client.Core.Tests/NuidWriterTests.cs rename to tests/NATS.Client.Core.Tests/NuidTests.cs index 77d0e4f50..4a7a84a03 100644 --- a/tests/NATS.Client.Core.Tests/NuidWriterTests.cs +++ b/tests/NATS.Client.Core.Tests/NuidTests.cs @@ -5,13 +5,13 @@ namespace NATS.Client.Core.Tests; -public class NuidWriterTests +public class NuidTests { private static readonly Regex NuidRegex = new("[A-z0-9]{22}"); private readonly ITestOutputHelper _outputHelper; - public NuidWriterTests(ITestOutputHelper outputHelper) + public NuidTests(ITestOutputHelper outputHelper) { _outputHelper = outputHelper; } @@ -42,7 +42,7 @@ public void GetNextNuid_ReturnsNuidOfLength22_Char() Span buffer = stackalloc char[44]; // Act - var result = NuidWriter.TryWriteNuid(buffer); + var result = Nuid.TryWriteNuid(buffer); // Assert ReadOnlySpan lower = buffer.Slice(0, 22); @@ -59,10 +59,10 @@ public void GetNextNuid_ReturnsNuidOfLength22_Char() public void GetNextNuid_BufferToShort_False_Char() { // Arrange - Span nuid = stackalloc char[(int)NuidWriter.NuidLength - 1]; + Span nuid = stackalloc char[(int)Nuid.NuidLength - 1]; // Act - var result = NuidWriter.TryWriteNuid(nuid); + var result = Nuid.TryWriteNuid(nuid); // Assert Assert.False(result); @@ -77,8 +77,8 @@ public void GetNextNuid_ReturnsDifferentNuidEachTime_Char() Span secondNuid = stackalloc char[22]; // Act - var result = NuidWriter.TryWriteNuid(firstNuid); - result &= NuidWriter.TryWriteNuid(secondNuid); + var result = Nuid.TryWriteNuid(firstNuid); + result &= Nuid.TryWriteNuid(secondNuid); // Assert Assert.False(firstNuid.SequenceEqual(secondNuid)); @@ -93,8 +93,8 @@ public void GetNextNuid_PrefixIsConstant_Char() Span secondNuid = stackalloc char[22]; // Act - var result = NuidWriter.TryWriteNuid(firstNuid); - result &= NuidWriter.TryWriteNuid(secondNuid); + var result = Nuid.TryWriteNuid(firstNuid); + result &= Nuid.TryWriteNuid(secondNuid); // Assert Assert.True(result); @@ -108,7 +108,7 @@ public void GetNextNuid_ContainsOnlyValidCharacters_Char() Span nuid = stackalloc char[22]; // Act - var result = NuidWriter.TryWriteNuid(nuid); + var result = Nuid.TryWriteNuid(nuid); // Assert Assert.True(result); @@ -129,8 +129,8 @@ public void GetNextNuid_PrefixRenewed_Char() var maxSequential = 839299365868340224ul - increment - 1; SetSequentialAndIncrement(maxSequential, increment); - result = NuidWriter.TryWriteNuid(firstNuid); - result &= NuidWriter.TryWriteNuid(secondNuid); + result = Nuid.TryWriteNuid(firstNuid); + result &= Nuid.TryWriteNuid(secondNuid); }); executionThread.Start(); @@ -148,7 +148,7 @@ public void GetPrefix_PrefixAsExpected() var rngBytes = new byte[12] { 0, 1, 2, 3, 4, 5, 6, 7, 11, 253, 254, 255 }; DeterministicRng rng = new(new Queue(new[] { rngBytes, rngBytes })); - var mi = typeof(NuidWriter).GetMethod("GetPrefix", BindingFlags.Static | BindingFlags.NonPublic); + var mi = typeof(Nuid).GetMethod("GetPrefix", BindingFlags.Static | BindingFlags.NonPublic); var mGetPrefix = mi!.CreateDelegate>(); // Act @@ -166,7 +166,7 @@ public void InitAndWrite_Char() Thread t = new(() => { var buffer = new char[22]; - var didWrite = NuidWriter.TryWriteNuid(buffer); + var didWrite = Nuid.TryWriteNuid(buffer); var isMatch = NuidRegex.IsMatch(new string(buffer)); Volatile.Write(ref completedSuccessfully, didWrite && isMatch); @@ -192,7 +192,7 @@ public void DifferentThreads_DifferentPrefixes() Thread t = new(() => { var buffer = new char[22]; - NuidWriter.TryWriteNuid(buffer); + Nuid.TryWriteNuid(buffer); nuids.Enqueue((buffer, Environment.CurrentManagedThreadId)); }); t.Start(); @@ -226,7 +226,7 @@ public void AllNuidsAreUnique() for (var i = 0; i < count; i++) { - var didWrite = NuidWriter.TryWriteNuid(buffer); + var didWrite = Nuid.TryWriteNuid(buffer); if (!didWrite) { @@ -259,7 +259,7 @@ public void AllNuidsAreUnique_SmallSequentials() for (var i = 0; i < 2048; i++) { - if (!NuidWriter.TryWriteNuid(buffer)) + if (!Nuid.TryWriteNuid(buffer)) { writeFailed = true; return; @@ -301,7 +301,7 @@ public void AllNuidsAreUnique_ZeroSequential() Span buffer = new char[22]; for (var i = 0; i < 100_000_000; i++) { - if (!NuidWriter.TryWriteNuid(buffer)) + if (!Nuid.TryWriteNuid(buffer)) { writeFailed = true; return; @@ -333,11 +333,11 @@ public void Only_last_few_digits_change() const int tail = 4; const int head = 22 - tail; - var nuid1 = NuidWriter.NewNuid(); + var nuid1 = Nuid.NewNuid(); var head1 = nuid1.Substring(0, head); var tail1 = nuid1.Substring(head, tail); - var nuid2 = NuidWriter.NewNuid(); + var nuid2 = Nuid.NewNuid(); var head2 = nuid2.Substring(0, head); var tail2 = nuid2.Substring(head, tail); @@ -350,17 +350,17 @@ public void Only_last_few_digits_change() // on separate threads (distinct NuidWriter instances) only. private static void SetSequentialAndIncrement(ulong sequential, ulong increment) { - var didWrite = NuidWriter.TryWriteNuid(new char[128]); + var didWrite = Nuid.TryWriteNuid(new char[128]); Assert.True(didWrite, "didWrite"); - var fInstance = typeof(NuidWriter).GetField("_writer", BindingFlags.Static | BindingFlags.NonPublic); + var fInstance = typeof(Nuid).GetField("_writer", BindingFlags.Static | BindingFlags.NonPublic); var instance = fInstance!.GetValue(null); - var fSequential = typeof(NuidWriter).GetField("_sequential", BindingFlags.Instance | BindingFlags.NonPublic); + var fSequential = typeof(Nuid).GetField("_sequential", BindingFlags.Instance | BindingFlags.NonPublic); fSequential!.SetValue(instance, sequential); - var fIncrement = typeof(NuidWriter).GetField("_increment", BindingFlags.Instance | BindingFlags.NonPublic); + var fIncrement = typeof(Nuid).GetField("_increment", BindingFlags.Instance | BindingFlags.NonPublic); fIncrement!.SetValue(instance, increment); } diff --git a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs new file mode 100644 index 000000000..fce13df15 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs @@ -0,0 +1,325 @@ +using System.Net; +using Microsoft.Extensions.Logging; +using NATS.Client.TestUtilities; + +namespace NATS.Client.Core.Tests; + +public class WebSocketOptionsTest +{ + private readonly List _logs = new(); + + // Modeled after similar test in SendBufferTest.cs which also uses the MockServer. + [Fact] + public async Task MockWebsocketServer_PubSubWithCancelAndReconnect_ShouldCallbackTwice() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + List pubs = new(); + await using var server = new MockServer( + handler: (client, cmd) => + { + if (cmd.Name == "PUB") + { + lock (pubs) + pubs.Add($"PUB {cmd.Subject}"); + } + + if (cmd is { Name: "PUB", Subject: "close" }) + { + client.Close(); + } + + return Task.CompletedTask; + }, + Log, + info: $"{{\"max_payload\":{1024 * 4}}}", + cancellationToken: cts.Token); + + await using var wsServer = new WebSocketMockServer( + server.Url, + connectHandler: (httpContext) => + { + return true; + }, + Log, + cts.Token); + + Log("__________________________________"); + + var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => + { + Log($"[NC] {m.Message}"); + }); + + var tokenCount = 0; + await using var nats = new NatsConnection(new NatsOpts + { + Url = wsServer.WebSocketUrl, + LoggerFactory = testLogger, + ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + { + tokenCount++; + Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); + clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); + return ValueTask.CompletedTask; + }, + }); + + Log($"[C] connect {server.Url}"); + await nats.ConnectAsync(); + + Log($"[C] ping"); + var rtt = await nats.PingAsync(cts.Token); + Log($"[C] ping rtt={rtt}"); + + Log($"[C] publishing x1..."); + await nats.PublishAsync("x1", "x", cancellationToken: cts.Token); + + // we will close the connection in mock server when we receive subject "close" + Log($"[C] publishing close (4KB)..."); + var pubTask = nats.PublishAsync("close", new byte[1024 * 4], cancellationToken: cts.Token).AsTask(); + + await pubTask.WaitAsync(cts.Token); + + for (var i = 1; i <= 10; i++) + { + try + { + await nats.PingAsync(cts.Token); + break; + } + catch (OperationCanceledException) + { + if (i == 10) + throw; + await Task.Delay(10 * i, cts.Token); + } + } + + Log($"[C] publishing x2..."); + await nats.PublishAsync("x2", "x", cancellationToken: cts.Token); + + Log($"[C] flush..."); + await nats.PingAsync(cts.Token); + + // Look for logs like the following: + // [WS] Received WebSocketRequest with authorization header: Bearer TOKEN_2 + var tokens = GetLogs().Where(l => l.Contains("Bearer")).ToList(); + Assert.Equal(2, tokens.Count); + var token = tokens.Where(t => t.Contains("TOKEN_1")); + Assert.Single(token); + token = tokens.Where(t => t.Contains("TOKEN_2")); + Assert.Single(token); + + lock (pubs) + { + Assert.Equal(3, pubs.Count); + Assert.Equal("PUB x1", pubs[0]); + Assert.Equal("PUB close", pubs[1]); + Assert.Equal("PUB x2", pubs[2]); + } + } + + [Fact] + public async Task WebSocketRespondsWithHttpError_ShouldThrowNatsException() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = new MockServer( + handler: (client, cmd) => + { + return Task.CompletedTask; + }, + Log, + info: $"{{\"max_payload\":{1024 * 4}}}", + cancellationToken: cts.Token); + + await using var wsServer = new WebSocketMockServer( + server.Url, + connectHandler: (httpContext) => + { + httpContext.Response.StatusCode = (int)HttpStatusCode.Forbidden; + return false; // reject connection + }, + Log, + cts.Token); + + Log("__________________________________"); + + var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => + { + Log($"[NC] {m.Message}"); + }); + + await using var nats = new NatsConnection(new NatsOpts + { + Url = wsServer.WebSocketUrl, + LoggerFactory = testLogger, + }); + + Log($"[C] connect {server.Url}"); + + // expect: NATS.Client.Core.NatsException : can not connect uris: ws://127.0.0.1:5004 + var exception = await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); + } + + [Fact] + public async Task HttpErrorDuringReconnect_ShouldContinueToReconnect() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = new MockServer( + handler: (client, cmd) => + { + if (cmd is { Name: "PUB", Subject: "close" }) + { + client.Close(); + } + + return Task.CompletedTask; + }, + Log, + info: $"{{\"max_payload\":{1024 * 4}}}", + cancellationToken: cts.Token); + + var tokenCount = 0; + + await using var wsServer = new WebSocketMockServer( + server.Url, + connectHandler: (httpContext) => + { + var token = httpContext.Request.Headers.Authorization; + if (token.Contains("Bearer TOKEN_1") || token.Contains("Bearer TOKEN_4")) + { + return true; + } + else + { + httpContext.Response.StatusCode = (int)HttpStatusCode.Forbidden; + return false; // reject connection + } + }, + Log, + cts.Token); + + Log("__________________________________"); + + var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => + { + Log($"[NC] {m.Message}"); + }); + + await using var nats = new NatsConnection(new NatsOpts + { + Url = wsServer.WebSocketUrl, + LoggerFactory = testLogger, + ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + { + tokenCount++; + Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); + clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); + return ValueTask.CompletedTask; + }, + }); + + Log($"[C] connect {server.Url}"); + + // close connection and trigger reconnect + Log($"[C] publishing close ..."); + await nats.PublishAsync("close", "x", cancellationToken: cts.Token); + + for (var i = 1; i <= 10; i++) + { + try + { + await nats.PingAsync(cts.Token); + break; + } + catch (OperationCanceledException) + { + if (i == 10) + throw; + await Task.Delay(100 * i, cts.Token); + } + } + + Log($"[C] publishing reconnected"); + await nats.PublishAsync("reconnected", "rc", cancellationToken: cts.Token); + await Task.Delay(100); // short delay to allow log to be collected for reconnect + + // Expect to see in logs: + // 1st callback and TOKEN_1 + // Initial Connect + // 2nd callback with rejected TOKEN_2 + // NC reconnect + // 3rd callback with rejected TOKEN_3 + // NC reconnect + // 4th callback with good TOKEN_4 + // Successful Publish after reconnect + + // 4 tokens + var logs = GetLogs(); + var tokens = logs.Where(l => l.Contains("Bearer")).ToList(); + Assert.Equal(4, tokens.Count); + Assert.Single(tokens.Where(t => t.Contains("TOKEN_1"))); + Assert.Single(tokens.Where(t => t.Contains("TOKEN_2"))); + Assert.Single(tokens.Where(t => t.Contains("TOKEN_3"))); + Assert.Single(tokens.Where(t => t.Contains("TOKEN_4"))); + + // 2 errors in NATS.Client triggering the reconnect + var failures = logs.Where(l => l.Contains("[NC] Failed to connect NATS")); + Assert.Equal(2, failures.Count()); + + // 2 connects in MockServer + var connects = logs.Where(l => l.Contains("RCV CONNECT")); + Assert.Equal(2, failures.Count()); + + // 1 reconnect in MockServer + var reconnectPublish = logs.Where(l => l.Contains("RCV PUB reconnected")); + Assert.Single(reconnectPublish); + } + + [Fact] + public async Task ExceptionThrownInCallback_ShouldThrowNatsException() + { + // void Log(string m) => TmpFileLogger.Log(m); + List logs = new(); + void Log(string m) + { + lock (logs) + logs.Add(m); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => + { + Log($"[NC] {m.Message}"); + }); + + await using var nats = new NatsConnection(new NatsOpts + { + Url = "ws://localhost:1234", + LoggerFactory = testLogger, + ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + { + throw new Exception("Error in callback"); + }, + }); + + // expect: NATS.Client.Core.NatsException : can not connect uris: ws://localhost:1234 + var exception = await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); + } + + private void Log(string m) + { + lock (_logs) + _logs.Add(m); + } + + private List GetLogs() + { + lock (_logs) + return _logs.ToList(); + } +} diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 0f608099d..01c973cf9 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -611,7 +611,7 @@ public async Task Validate_keys() [Fact] public async Task TestDirectMessageRepublishedSubject() { - var streamBucketName = "sb-" + NuidWriter.NewNuid(); + var streamBucketName = "sb-" + Nuid.NewNuid(); var subject = "test"; var streamSubject = subject + ".>"; var publishSubject1 = subject + ".one"; diff --git a/tests/NATS.Client.Tests/ClientTest.cs b/tests/NATS.Client.Simplified.Tests/ClientTest.cs similarity index 99% rename from tests/NATS.Client.Tests/ClientTest.cs rename to tests/NATS.Client.Simplified.Tests/ClientTest.cs index 226a30140..28b6875e5 100644 --- a/tests/NATS.Client.Tests/ClientTest.cs +++ b/tests/NATS.Client.Simplified.Tests/ClientTest.cs @@ -1,8 +1,9 @@ using System.Text; using NATS.Client.Core.Tests; +using NATS.Net; // ReSharper disable AccessToDisposedClosure -namespace NATS.Client.Tests; +namespace NATS.Client.Simplified.Tests; public class ClientTest { diff --git a/tests/NATS.Client.Tests/NATS.Client.Tests.csproj b/tests/NATS.Client.Simplified.Tests/NATS.Client.Simplified.Tests.csproj similarity index 94% rename from tests/NATS.Client.Tests/NATS.Client.Tests.csproj rename to tests/NATS.Client.Simplified.Tests/NATS.Client.Simplified.Tests.csproj index c64f26ef5..b505b1ca8 100644 --- a/tests/NATS.Client.Tests/NATS.Client.Tests.csproj +++ b/tests/NATS.Client.Simplified.Tests/NATS.Client.Simplified.Tests.csproj @@ -35,7 +35,7 @@ - + diff --git a/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs b/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs index c04100c24..46f39b375 100644 --- a/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs +++ b/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs @@ -47,7 +47,12 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except public bool IsEnabled(LogLevel logLevel) => logLevel >= level; +#if NET8_0_OR_GREATER + public IDisposable? BeginScope(TState state) + where TState : notnull => new NullDisposable(); +#else public IDisposable BeginScope(TState state) => new NullDisposable(); +#endif private class NullDisposable : IDisposable { diff --git a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj index 4bf81b8e4..b9421f412 100644 --- a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj +++ b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj @@ -1,4 +1,4 @@ - + net6.0;net8.0 diff --git a/tests/NATS.Client.TestUtilities/OutputHelperLogger.cs b/tests/NATS.Client.TestUtilities/OutputHelperLogger.cs index a487ab562..75f5285e4 100644 --- a/tests/NATS.Client.TestUtilities/OutputHelperLogger.cs +++ b/tests/NATS.Client.TestUtilities/OutputHelperLogger.cs @@ -41,7 +41,12 @@ public Logger(string categoryName, ITestOutputHelper testOutputHelper, NatsServe _natsServer = natsServer; } +#if NET8_0_OR_GREATER + public IDisposable? BeginScope(TState state) + where TState : notnull +#else public IDisposable BeginScope(TState state) +#endif { return NullDisposable.Instance; } diff --git a/tests/NATS.Client.TestUtilities/WebSocketMockServer.cs b/tests/NATS.Client.TestUtilities/WebSocketMockServer.cs new file mode 100644 index 000000000..33779d500 --- /dev/null +++ b/tests/NATS.Client.TestUtilities/WebSocketMockServer.cs @@ -0,0 +1,138 @@ +using System.Net; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Text; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; + +namespace NATS.Client.TestUtilities; + +public class WebSocketMockServer : IAsyncDisposable +{ + private readonly string _natsServerUrl; + private readonly Action _logger; + private readonly CancellationTokenSource _cts; + private readonly Task _wsServerTask; + + public WebSocketMockServer( + string natsServerUrl, + Func connectHandler, + Action logger, + CancellationToken cancellationToken = default) + { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cancellationToken = _cts.Token; + _natsServerUrl = natsServerUrl; + _logger = logger; + WebSocketPort = 5004; + + _wsServerTask = RunWsServer(connectHandler, cancellationToken); + } + + public int WebSocketPort { get; } + + public string WebSocketUrl => $"ws://127.0.0.1:{WebSocketPort}"; + + public async ValueTask DisposeAsync() + { + _cts.Cancel(); + + try + { + await _wsServerTask.WaitAsync(TimeSpan.FromSeconds(10)); + } + catch (TimeoutException) + { + } + catch (OperationCanceledException) + { + } + catch (SocketException) + { + } + catch (IOException) + { + } + } + + private Task RunWsServer(Func connectHandler, CancellationToken ct) + { + var wsServerTask = WebHost.CreateDefaultBuilder() + .SuppressStatusMessages(true) + .ConfigureLogging(logging => logging.ClearProviders()) + .ConfigureKestrel(options => options.ListenLocalhost(WebSocketPort)) // unfortunately need to hard-code WebSocket port because ListenLocalhost() doesn't support picking a dynamic port + .Configure(app => app.UseWebSockets().Run(async context => + { + _logger($"[WS] Received WebSocketRequest with authorization header: {context.Request.Headers.Authorization}"); + + if (!connectHandler(context)) + return; + + if (context.WebSockets.IsWebSocketRequest) + { + using var webSocket = await context.WebSockets.AcceptWebSocketAsync(); + await HandleRequestResponse(webSocket, ct); + } + })) + .Build().RunAsync(ct); + + return wsServerTask; + } + + private async Task HandleRequestResponse(WebSocket webSocket, CancellationToken ct) + { + var wsRequestBuffer = new byte[1024 * 4]; + using TcpClient tcpClient = new(); + var endpoint = IPEndPoint.Parse(_natsServerUrl); + await tcpClient.ConnectAsync(endpoint); + await using var stream = tcpClient.GetStream(); + + // send responses received from NATS mock server back to WebSocket client + var respondBackTask = Task.Run(async () => + { + try + { + var tcpResponseBuffer = new byte[1024 * 4]; + + while (!ct.IsCancellationRequested) + { + var received = await stream.ReadAsync(tcpResponseBuffer, ct); + + var message = Encoding.UTF8.GetString(tcpResponseBuffer, 0, received); + + await webSocket.SendAsync( + new ArraySegment(tcpResponseBuffer, 0, received), + WebSocketMessageType.Binary, + true, + ct); + } + } + catch (Exception e) + { + // if our TCP connection with the NATS mock server breaks then close the WebSocket too. + _logger($"[WS] Exception in response task: {e.Message}"); + webSocket.Abort(); + } + }); + + // forward received message via TCP to NATS mock server + var receiveResult = await webSocket.ReceiveAsync( + new ArraySegment(wsRequestBuffer), ct); + + while (!receiveResult.CloseStatus.HasValue) + { + await stream.WriteAsync(wsRequestBuffer, 0, receiveResult.Count, ct); + + receiveResult = await webSocket.ReceiveAsync( + new ArraySegment(wsRequestBuffer), ct); + } + + await webSocket.CloseAsync( + receiveResult.CloseStatus.Value, + receiveResult.CloseStatusDescription, + CancellationToken.None); + } +} diff --git a/version.txt b/version.txt index 169fcd8bd..05e817a2f 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -2.4.0-preview.1 +2.4.0-preview.2