Skip to content

Commit

Permalink
Implement NUID (#147)
Browse files Browse the repository at this point in the history
* Add NuidWriter

* dotnet format

* Manually fix style analyzer warnings

* Fix tests

* Skip long running tests

* Fix CI build

* Remove unnecessary compare
  • Loading branch information
jasper-d authored Oct 10, 2023
1 parent 1220541 commit caada52
Show file tree
Hide file tree
Showing 9 changed files with 593 additions and 4 deletions.
1 change: 1 addition & 0 deletions sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
Expand Down
51 changes: 51 additions & 0 deletions sandbox/MicroBenchmark/NewInboxBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using NATS.Client.Core;
using NATS.Client.Core.Internal;

namespace MicroBenchmark;

[MemoryDiagnoser]
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net70, baseline: true)]
[SimpleJob(RuntimeMoniker.Net80)]
[SimpleJob(RuntimeMoniker.NativeAot80)]
public class NewInboxBenchmarks
{
private static readonly NatsOpts LongPrefixOpt = NatsOpts.Default
with
{
InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here",
};

private static readonly NatsConnection ConnectionDefaultPrefix = new();
private static readonly NatsConnection ConnectionLongPrefix = new(LongPrefixOpt);

private char[] _buf = new char[32];

[GlobalSetup]
public void Setup()
{
NuidWriter.TryWriteNuid(new char[100]);
}

[Benchmark(Baseline = true)]
[SkipLocalsInit]
public bool TryWriteNuid()
{
return NuidWriter.TryWriteNuid(_buf);
}

[Benchmark]
public string NewInbox_ShortPrefix()
{
return ConnectionDefaultPrefix.NewInbox();
}

[Benchmark]
public string NewInbox_LongPrefix()
{
return ConnectionLongPrefix.NewInbox();
}
}
135 changes: 135 additions & 0 deletions src/NATS.Client.Core/Internal/NuidWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Security.Cryptography;

namespace NATS.Client.Core.Internal;

[SkipLocalsInit]
internal sealed class NuidWriter
{
internal const nuint NuidLength = PrefixLength + SequentialLength;
private const nuint Base = 62;
private const ulong MaxSequential = 839299365868340224; // 62^10
private const uint PrefixLength = 12;
private const nuint SequentialLength = 10;
private const int MinIncrement = 33;
private const int MaxIncrement = 333;

[ThreadStatic]
private static NuidWriter? _writer;

private char[] _prefix;
private ulong _increment;
private ulong _sequential;

private NuidWriter()
{
Refresh(out _);
}

private static ReadOnlySpan<char> Digits => "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

public static bool TryWriteNuid(Span<char> nuidBuffer)
{
if (_writer is not null)
{
return _writer.TryWriteNuidCore(nuidBuffer);
}

return InitAndWrite(nuidBuffer);
}

private static bool TryWriteNuidCore(Span<char> buffer, Span<char> prefix, ulong sequential)
{
if ((uint)buffer.Length < NuidLength || prefix.Length != PrefixLength)
{
return false;
}

Unsafe.CopyBlockUnaligned(ref Unsafe.As<char, byte>(ref buffer[0]), ref Unsafe.As<char, byte>(ref prefix[0]), PrefixLength * sizeof(char));

// NOTE: We must never write to digitsPtr!
ref var digitsPtr = ref MemoryMarshal.GetReference(Digits);

for (nuint i = PrefixLength; i < NuidLength; i++)
{
var digitIndex = (nuint)(sequential % Base);
Unsafe.Add(ref buffer[0], i) = Unsafe.Add(ref digitsPtr, digitIndex);
sequential /= Base;
}

return true;
}

private static uint GetIncrement()
{
return (uint)Random.Shared.Next(MinIncrement, MaxIncrement + 1);
}

private static ulong GetSequential()
{
return (ulong)Random.Shared.NextInt64(0, (long)MaxSequential + 1);
}

private static char[] GetPrefix(RandomNumberGenerator? rng = null)
{
Span<byte> randomBytes = stackalloc byte[(int)PrefixLength];

// TODO: For .NET 8+, use GetItems for better distribution
if (rng == null)
{
RandomNumberGenerator.Fill(randomBytes);
}
else
{
rng.GetBytes(randomBytes);
}

var newPrefix = new char[PrefixLength];

for (var i = 0; i < randomBytes.Length; i++)
{
var digitIndex = (int)(randomBytes[i] % Base);
newPrefix[i] = Digits[digitIndex];
}

return newPrefix;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static bool InitAndWrite(Span<char> span)
{
_writer = new NuidWriter();
return _writer.TryWriteNuidCore(span);
}

private bool TryWriteNuidCore(Span<char> nuidBuffer)
{
var sequential = _sequential += _increment;

if (sequential < MaxSequential)
{
return TryWriteNuidCore(nuidBuffer, _prefix, sequential);
}

return RefreshAndWrite(nuidBuffer);

[MethodImpl(MethodImplOptions.NoInlining)]
bool RefreshAndWrite(Span<char> buffer)
{
var prefix = Refresh(out sequential);
return TryWriteNuidCore(buffer, prefix, sequential);
}
}

[MethodImpl(MethodImplOptions.NoInlining)]
[MemberNotNull(nameof(_prefix))]
private char[] Refresh(out ulong sequential)
{
var prefix = _prefix = GetPrefix();
_increment = GetIncrement();
sequential = _sequential = GetSequential();
return prefix;
}
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
var inboxSubject = $"{_inboxPrefix}*";
var inboxSubject = $"{_inboxPrefix}.*";
_inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.Core/NATS.Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="System.IO.Hashing" Version="6.0.1" />

<InternalsVisibleTo Include="MicroBenchmark, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="$(AssemblyName).Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="$(AssemblyName).MemoryTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NatsBenchmark, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
Expand Down
41 changes: 40 additions & 1 deletion src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

Expand All @@ -7,7 +10,7 @@ public partial class NatsConnection
private static readonly NatsSubOpts DefaultReplyOpts = new() { MaxMsgs = 1 };

/// <inheritdoc />
public string NewInbox() => $"{InboxPrefix}{Guid.NewGuid():n}";
public string NewInbox() => NewInbox(InboxPrefix);

/// <inheritdoc />
public async ValueTask<NatsMsg<TReply?>?> RequestAsync<TRequest, TReply>(
Expand Down Expand Up @@ -61,6 +64,42 @@ public partial class NatsConnection
}
}

[SkipLocalsInit]
private static string NewInbox(ReadOnlySpan<char> prefix)
{
Span<char> buffer = stackalloc char[64];
var separatorLength = prefix.Length > 0 ? 1u : 0u;
var totalLength = (uint)prefix.Length + (uint)NuidWriter.NuidLength + separatorLength;
if (totalLength <= buffer.Length)
{
buffer = buffer.Slice(0, (int)totalLength);
}
else
{
buffer = new char[totalLength];
}

var totalPrefixLength = (uint)prefix.Length + separatorLength;
if ((uint)buffer.Length > totalPrefixLength && (uint)buffer.Length > (uint)prefix.Length)
{
prefix.CopyTo(buffer);
buffer[prefix.Length] = '.';
var remaining = buffer.Slice((int)totalPrefixLength);
var didWrite = NuidWriter.TryWriteNuid(remaining);
Debug.Assert(didWrite, "didWrite");
return new string(buffer);
}

return Throw();

[DoesNotReturn]
string Throw()
{
Debug.Fail("Must not happen");
throw new InvalidOperationException("This should never be raised!");
}
}

private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts)
{
var opts = replyOpts ?? DefaultReplyOpts;
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.RequestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal async ValueTask<INatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default)
{
var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}";
var replyTo = NewInbox();

var replySerializer = replyOpts?.Serializer ?? Opts.Serializer;
var sub = new NatsSub<TReply>(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public NatsConnection(NatsOpts opts)
Counter = new ConnectionStatsCounter();
_writerState = new WriterState(opts);
_commandWriter = _writerState.CommandBuffer.Writer;
InboxPrefix = $"{opts.InboxPrefix}.{Guid.NewGuid():n}.";
InboxPrefix = NewInbox(opts.InboxPrefix);
SubscriptionManager = new SubscriptionManager(this, InboxPrefix);
_logger = opts.LoggerFactory.CreateLogger<NatsConnection>();
_clientOpts = ClientOpts.Create(Opts);
Expand Down
Loading

0 comments on commit caada52

Please sign in to comment.