Skip to content

Commit

Permalink
Merge branch 'main' into consume-max-msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Sep 9, 2024
2 parents 46186be + 323124a commit 33dc6ed
Show file tree
Hide file tree
Showing 37 changed files with 647 additions and 126 deletions.
10 changes: 2 additions & 8 deletions .github/workflows/perf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@ jobs:
steps:
- name: Install nats
run: |
rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
rel=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//)
wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip
unzip nats-$rel-linux-amd64.zip
sudo mv nats-$rel-linux-amd64/nats /usr/local/bin
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down
16 changes: 2 additions & 14 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@ jobs:
steps:
- name: Install nats-server
run: |
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down Expand Up @@ -119,13 +113,7 @@ jobs:
shell: bash
run: |
mkdir tools-nats-server && cd tools-nats-server
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down
16 changes: 8 additions & 8 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Test
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Platform.Windows.Tests", "tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj", "{A37994CC-A23A-415E-8B61-9468C7178A55}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client", "src\NATS.Client\NATS.Client.csproj", "{48F1F736-3D87-4453-B497-BD9C203B2385}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Client", "sandbox\Example.Client\Example.Client.csproj", "{A15CCDD5-B707-4142-B99A-64F0AB62318A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Tests", "tests\NATS.Client.Tests\NATS.Client.Tests.csproj", "{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Simplified.Tests", "tests\NATS.Client.Simplified.Tests\NATS.Client.Simplified.Tests.csproj", "{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Simplified", "src\NATS.Client.Simplified\NATS.Client.Simplified.csproj", "{227C88B1-0510-4010-B142-C44725578FCD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -297,10 +297,6 @@ Global
{A37994CC-A23A-415E-8B61-9468C7178A55}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.Build.0 = Release|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.Build.0 = Debug|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.ActiveCfg = Release|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.Build.0 = Release|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -309,6 +305,10 @@ Global
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.Build.0 = Release|Any CPU
{227C88B1-0510-4010-B142-C44725578FCD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{227C88B1-0510-4010-B142-C44725578FCD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{227C88B1-0510-4010-B142-C44725578FCD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{227C88B1-0510-4010-B142-C44725578FCD}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -359,9 +359,9 @@ Global
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{A37994CC-A23A-415E-8B61-9468C7178A55} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{48F1F736-3D87-4453-B497-BD9C203B2385} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{A15CCDD5-B707-4142-B99A-64F0AB62318A} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{227C88B1-0510-4010-B142-C44725578FCD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Client/Example.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client\NATS.Client.csproj" />
<ProjectReference Include="..\..\src\NATS.Net\NATS.Net.csproj" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion sandbox/Example.Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// See https://aka.ms/new-console-template for more information

using System.Text;
using NATS.Client;
using NATS.Net;

CancellationTokenSource cts = new();

Expand Down
4 changes: 2 additions & 2 deletions sandbox/MicroBenchmark/NewInboxBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public class NewInboxBenchmarks
[GlobalSetup]
public void Setup()
{
NuidWriter.TryWriteNuid(new char[100]);
Nuid.TryWriteNuid(new char[100]);
}

[Benchmark(Baseline = true)]
[SkipLocalsInit]
public bool TryWriteNuid()
{
return NuidWriter.TryWriteNuid(_buf);
return Nuid.TryWriteNuid(_buf);
}

[Benchmark]
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void WriteConnect(IBufferWriter<byte> writer, ClientOpts opts)
BinaryPrimitives.WriteUInt64LittleEndian(span, ConnectSpace);
writer.Advance(ConnectSpaceLength);

var jsonWriter = new Utf8JsonWriter(writer);
using var jsonWriter = new Utf8JsonWriter(writer);
JsonSerializer.Serialize(jsonWriter, opts, JsonContext.Default.ClientOpts);

span = writer.GetSpan(UInt16Length);
Expand Down
7 changes: 7 additions & 0 deletions src/NATS.Client.Core/Internal/SslStreamConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public void SignalDisconnected(Exception exception)
public async Task AuthenticateAsClientAsync(NatsUri uri, TimeSpan timeout)
{
var options = await _tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(true);

#if NETSTANDARD2_0
if (_sslStream != null)
_sslStream.Dispose();

_sslStream = new SslStream(
innerStream: new NetworkStream(_socket, true),
leaveInnerStreamOpen: false,
Expand All @@ -134,6 +138,9 @@ await _sslStream.AuthenticateAsClientAsync(
throw new NatsException($"TLS authentication failed", ex);
}
#else
if (_sslStream != null)
await _sslStream.DisposeAsync().ConfigureAwait(false);

_sslStream = new SslStream(innerStream: new NetworkStream(_socket, true));
try
{
Expand Down
14 changes: 12 additions & 2 deletions src/NATS.Client.Core/Internal/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
/// <summary>
/// Connect with Timeout. When failed, Dispose this connection.
/// </summary>
public async ValueTask ConnectAsync(Uri uri, TimeSpan timeout)
public async ValueTask ConnectAsync(Uri uri, NatsOpts opts)
{
using var cts = new CancellationTokenSource(timeout);
using var cts = new CancellationTokenSource(opts.ConnectTimeout);
try
{
await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri, _socket.Options, cts.Token).ConfigureAwait(false);
await _socket.ConnectAsync(uri, cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down Expand Up @@ -130,4 +131,13 @@ public void SignalDisconnected(Exception exception)
{
_waitForClosedSource.TrySetResult(exception);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions options, CancellationToken token)
{
if (opts.ConfigureWebSocketOpts != null)
{
await opts.ConfigureWebSocketOpts(uri, options, token).ConfigureAwait(false);
}
}
}
3 changes: 2 additions & 1 deletion src/NATS.Client.Core/NKeyPair.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public byte[] Sign(byte[] src)
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
Expand Down Expand Up @@ -269,7 +270,7 @@ internal static string Encode(byte prefixbyte, bool seed, byte[] src)
if (src.Length != 32)
throw new NatsException("Invalid seed size");

var stream = new MemoryStream();
using var stream = new MemoryStream();

if (seed)
{
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ static void WriteBuffer(Span<char> buffer, (string prefix, int pfxLen) state)
state.prefix.AsSpan().CopyTo(buffer);
buffer[state.prefix.Length] = '.';
var remaining = buffer.Slice(state.pfxLen);
var didWrite = NuidWriter.TryWriteNuid(remaining);
var didWrite = Nuid.TryWriteNuid(remaining);
Debug.Assert(didWrite, "didWrite");
}

var separatorLength = prefix.Length > 0 ? 1 : 0;
var totalLength = prefix.Length + (int)NuidWriter.NuidLength + separatorLength;
var totalLength = prefix.Length + (int)Nuid.NuidLength + separatorLength;
var totalPrefixLength = prefix.Length + separatorLength;

#if NET6_0_OR_GREATER || NETSTANDARD2_1
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private async ValueTask InitialConnectAsync()
if (uri.IsWebSocket)
{
var conn = new WebSocketConnection();
await conn.ConnectAsync(uri.Uri, Opts.ConnectTimeout).ConfigureAwait(false);
await conn.ConnectAsync(uri.Uri, Opts).ConfigureAwait(false);
_socket = conn;
}
else
Expand Down Expand Up @@ -606,7 +606,7 @@ private async void ReconnectLoop()
{
_logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using WebSocket {Url} [{ReconnectCount}]", url, reconnectCount);
var conn = new WebSocketConnection();
await conn.ConnectAsync(url.Uri, Opts.ConnectTimeout).ConfigureAwait(false);
await conn.ConnectAsync(url.Uri, Opts).ConfigureAwait(false);
_socket = conn;
}
else
Expand Down Expand Up @@ -848,7 +848,7 @@ private async void StartPingTimer(CancellationToken cancellationToken)

_logger.LogDebug(NatsLogEvents.Connection, "Starting ping timer");

var periodicTimer = new PeriodicTimer(Opts.PingInterval);
using var periodicTimer = new PeriodicTimer(Opts.PingInterval);
ResetPongCount();
try
{
Expand Down
23 changes: 23 additions & 0 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Net.WebSockets;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -114,6 +115,28 @@ public sealed record NatsOpts
/// </remarks>
public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest;

/// <summary>
/// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections.
/// </summary>
/// <remarks>
/// This can be used to set authorization header and other HTTP header values.
/// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket.
/// The callback's execution time contributes to the connection establishment subject to the <see cref="ConnectTimeout"/>.
/// Implementors should use the passed CancellationToken for async operations called by this handler.
/// </remarks>
/// <example>
/// await using var nats = new NatsConnection(new NatsOpts
/// {
/// Url = "ws://localhost:8080",
/// ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) =>
/// {
/// clientWsOpts.SetRequestHeader("authorization", $"Bearer MY_TOKEN");
/// return ValueTask.CompletedTask;
/// },
/// });
/// </example>
public Func<Uri, ClientWebSocketOptions, CancellationToken, ValueTask>? ConfigureWebSocketOpts { get; init; } = null;

internal NatsUri[] GetSeedUris()
{
var urls = Url.Split(',');
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ protected void ResetIdleTimeout()
if (_startUpTimeoutTimer != null)
{
_startUpTimeoutTimer.Change(dueTime: Timeout.InfiniteTimeSpan, period: Timeout.InfiniteTimeSpan);
_startUpTimeoutTimer = null;
_startUpTimeoutTimer.Dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
#endif

namespace NATS.Client.Core.Internal;

namespace NATS.Client.Core;

/// <summary>
/// Represents a unique identifier generator.
/// </summary>
/// <remarks>
/// The <c>Nuid</c> class generates unique identifiers that can be used
/// to ensure uniqueness in distributed systems.
/// </remarks>
[SkipLocalsInit]
internal sealed class NuidWriter
public sealed class Nuid
{
// NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code
// however, they were changed to uint to fix the compilation error for IL2CPP Unity projects.
Expand All @@ -25,13 +32,13 @@ internal sealed class NuidWriter
private const int MaxIncrement = 333;

[ThreadStatic]
private static NuidWriter? _writer;
private static Nuid? _writer;

private char[] _prefix;
private ulong _increment;
private ulong _sequential;

private NuidWriter()
private Nuid()
{
Refresh(out _);
}
Expand All @@ -42,25 +49,30 @@ private NuidWriter()
private static ReadOnlySpan<char> Digits => "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#endif

public static bool TryWriteNuid(Span<char> nuidBuffer)
/// <summary>
/// Generates a new NATS unique identifier (NUID).
/// </summary>
/// <returns>A new NUID as a string.</returns>
/// <exception cref="InvalidOperationException">Thrown when unable to generate the NUID.</exception>
public static string NewNuid()
{
if (_writer is not null)
Span<char> buffer = stackalloc char[(int)NuidLength];
if (TryWriteNuid(buffer))
{
return _writer.TryWriteNuidCore(nuidBuffer);
return buffer.ToString();
}

return InitAndWrite(nuidBuffer);
throw new InvalidOperationException("Internal error: can't generate nuid");
}

public static string NewNuid()
internal static bool TryWriteNuid(Span<char> nuidBuffer)
{
Span<char> buffer = stackalloc char[22];
if (TryWriteNuid(buffer))
if (_writer is not null)
{
return buffer.ToString();
return _writer.TryWriteNuidCore(nuidBuffer);
}

throw new InvalidOperationException("Internal error: can't generate nuid");
return InitAndWrite(nuidBuffer);
}

private static bool TryWriteNuidCore(Span<char> buffer, Span<char> prefix, ulong sequential)
Expand Down Expand Up @@ -140,7 +152,7 @@ private static char[] GetPrefix(RandomNumberGenerator? rng = null)
[MethodImpl(MethodImplOptions.NoInlining)]
private static bool InitAndWrite(Span<char> span)
{
_writer = new NuidWriter();
_writer = new Nuid();
return _writer.TryWriteNuidCore(span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace NATS.Client.JetStream.Internal;

internal class NatsJSNotificationChannel : IAsyncDisposable
internal sealed class NatsJSNotificationChannel : IAsyncDisposable
{
private readonly Func<INatsJSNotification, CancellationToken, Task> _notificationHandler;
private readonly Action<Exception> _exceptionHandler;
Expand Down
Loading

0 comments on commit 33dc6ed

Please sign in to comment.