Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement NUID #147

Merged
merged 7 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
Expand Down
51 changes: 51 additions & 0 deletions sandbox/MicroBenchmark/NewInboxBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using NATS.Client.Core;
using NATS.Client.Core.Internal;

namespace MicroBenchmark;

[MemoryDiagnoser]
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net70, baseline: true)]
[SimpleJob(RuntimeMoniker.Net80)]
[SimpleJob(RuntimeMoniker.NativeAot80)]
public class NewInboxBenchmarks
{
private static readonly NatsOpts LongPrefixOpt = NatsOpts.Default
with
{
InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here",
};

private static readonly NatsConnection ConnectionDefaultPrefix = new();
private static readonly NatsConnection ConnectionLongPrefix = new(LongPrefixOpt);

private char[] _buf = new char[32];

[GlobalSetup]
public void Setup()
{
NuidWriter.TryWriteNuid(new char[100]);
}

[Benchmark(Baseline = true)]
[SkipLocalsInit]
public bool TryWriteNuid()
{
return NuidWriter.TryWriteNuid(_buf);
}

[Benchmark]
public string NewInbox_ShortPrefix()
{
return ConnectionDefaultPrefix.NewInbox();
}

[Benchmark]
public string NewInbox_LongPrefix()
{
return ConnectionLongPrefix.NewInbox();
}
}
135 changes: 135 additions & 0 deletions src/NATS.Client.Core/Internal/NuidWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Security.Cryptography;

namespace NATS.Client.Core.Internal;

[SkipLocalsInit]
internal sealed class NuidWriter
{
internal const nuint NuidLength = PrefixLength + SequentialLength;
private const nuint Base = 62;
private const ulong MaxSequential = 839299365868340224; // 62^10
private const uint PrefixLength = 12;
private const nuint SequentialLength = 10;
private const int MinIncrement = 33;
private const int MaxIncrement = 333;

[ThreadStatic]
private static NuidWriter? _writer;

private char[] _prefix;
private ulong _increment;
private ulong _sequential;

private NuidWriter()
{
Refresh(out _);
}

private static ReadOnlySpan<char> Digits => "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

public static bool TryWriteNuid(Span<char> nuidBuffer)
{
if (_writer is not null)
{
return _writer.TryWriteNuidCore(nuidBuffer);
}

return InitAndWrite(nuidBuffer);
}

private static bool TryWriteNuidCore(Span<char> buffer, Span<char> prefix, ulong sequential)
{
if ((uint)buffer.Length < NuidLength || prefix.Length != PrefixLength)
{
return false;
}

Unsafe.CopyBlockUnaligned(ref Unsafe.As<char, byte>(ref buffer[0]), ref Unsafe.As<char, byte>(ref prefix[0]), PrefixLength * sizeof(char));

// NOTE: We must never write to digitsPtr!
ref var digitsPtr = ref MemoryMarshal.GetReference(Digits);

for (nuint i = PrefixLength; i < NuidLength; i++)
{
var digitIndex = (nuint)(sequential % Base);
Unsafe.Add(ref buffer[0], i) = Unsafe.Add(ref digitsPtr, digitIndex);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of Unsafe is necessary here to avoid range checks, both when accessing buffer and Digits.

sequential /= Base;
}

return true;
}

private static uint GetIncrement()
{
return (uint)Random.Shared.Next(MinIncrement, MaxIncrement + 1);
}

private static ulong GetSequential()
{
return (ulong)Random.Shared.NextInt64(0, (long)MaxSequential + 1);
}

private static char[] GetPrefix(RandomNumberGenerator? rng = null)
{
Span<byte> randomBytes = stackalloc byte[(int)PrefixLength];

// TODO: For .NET 8+, use GetItems for better distribution
if (rng == null)
{
RandomNumberGenerator.Fill(randomBytes);
}
else
{
rng.GetBytes(randomBytes);
}

var newPrefix = new char[PrefixLength];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious. Is there a technical reason why _prefix shouldn't be preallocated since it'd be on TLS? I assume it'd be over optimization since this isn't on hot path. Only a question really.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is no reason. I implemented NUID a long time ago using locks and never changed this.


for (var i = 0; i < randomBytes.Length; i++)
{
var digitIndex = (int)(randomBytes[i] % Base);
newPrefix[i] = Digits[digitIndex];
}

return newPrefix;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static bool InitAndWrite(Span<char> span)
{
_writer = new NuidWriter();
return _writer.TryWriteNuidCore(span);
}

private bool TryWriteNuidCore(Span<char> nuidBuffer)
{
var sequential = _sequential += _increment;

if (sequential < MaxSequential)
{
return TryWriteNuidCore(nuidBuffer, _prefix, sequential);
}

return RefreshAndWrite(nuidBuffer);

[MethodImpl(MethodImplOptions.NoInlining)]
bool RefreshAndWrite(Span<char> buffer)
{
var prefix = Refresh(out sequential);
return TryWriteNuidCore(buffer, prefix, sequential);
}
}

[MethodImpl(MethodImplOptions.NoInlining)]
[MemberNotNull(nameof(_prefix))]
private char[] Refresh(out ulong sequential)
{
var prefix = _prefix = GetPrefix();
_increment = GetIncrement();
sequential = _sequential = GetSequential();
return prefix;
}
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
var inboxSubject = $"{_inboxPrefix}*";
var inboxSubject = $"{_inboxPrefix}.*";
_inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.Core/NATS.Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="System.IO.Hashing" Version="6.0.1" />

<InternalsVisibleTo Include="MicroBenchmark, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="$(AssemblyName).Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="$(AssemblyName).MemoryTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NatsBenchmark, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
Expand Down
41 changes: 40 additions & 1 deletion src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

Expand All @@ -7,7 +10,7 @@ public partial class NatsConnection
private static readonly NatsSubOpts DefaultReplyOpts = new() { MaxMsgs = 1 };

/// <inheritdoc />
public string NewInbox() => $"{InboxPrefix}{Guid.NewGuid():n}";
public string NewInbox() => NewInbox(InboxPrefix);

/// <inheritdoc />
public async ValueTask<NatsMsg<TReply?>?> RequestAsync<TRequest, TReply>(
Expand Down Expand Up @@ -61,6 +64,42 @@ public partial class NatsConnection
}
}

[SkipLocalsInit]
private static string NewInbox(ReadOnlySpan<char> prefix)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is a bit of a mess. It is rather complicated and still slow and allocates quite heavily. Allocations could be reduced by renting buffers instead of new'ing them, but that reduces throughput.

Maybe it would be possible to not operate with strings (apart from public interfaces) for inbox prefix/subjects and instead use only byte[]. But that would be a more invasive change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. this alternative implementation which looks much more reasonable

    [SkipLocalsInit]
    private static string NewInbox(ReadOnlySpan<char> prefix)
    {
        Span<char> buffer = stackalloc char[22];
        NuidWriter.TryWriteNuid(buffer);

        if (prefix.IsEmpty)
        {
            return new string(buffer);
        }
        else
        {
            return $"{prefix}.{buffer}";
        }
    }

is slower but also reduces the allocations for long prefixes:

| Method               | Job           | Runtime       | Mean     | Error    | StdDev   | Ratio | RatioSD | Gen0   | Allocated | Alloc Ratio |
|--------------------- |-------------- |-------------- |---------:|---------:|---------:|------:|--------:|-------:|----------:|------------:|
| NewInbox_ShortPrefix | .NET 6.0      | .NET 6.0      | 79.85 ns | 1.617 ns | 2.370 ns |  0.91 |    0.05 | 0.0612 |     128 B |        1.00 |
| NewInbox_ShortPrefix | .NET 7.0      | .NET 7.0      | 87.93 ns | 1.789 ns | 2.732 ns |  1.00 |    0.00 | 0.0612 |     128 B |        1.00 |
| NewInbox_ShortPrefix | .NET 8.0      | .NET 8.0      | 72.40 ns | 1.353 ns | 1.266 ns |  0.82 |    0.03 | 0.0612 |     128 B |        1.00 |
| NewInbox_ShortPrefix | NativeAOT 8.0 | NativeAOT 8.0 | 66.20 ns | 1.358 ns | 1.270 ns |  0.75 |    0.03 | 0.0612 |     128 B |        1.00 |
|                      |               |               |          |          |          |       |         |        |           |             |
| NewInbox_LongPrefix  | .NET 6.0      | .NET 6.0      | 82.68 ns | 0.958 ns | 1.025 ns |  0.97 |    0.02 | 0.0994 |     208 B |        1.00 |
| NewInbox_LongPrefix  | .NET 7.0      | .NET 7.0      | 85.93 ns | 0.881 ns | 0.688 ns |  1.00 |    0.00 | 0.0994 |     208 B |        1.00 |
| NewInbox_LongPrefix  | .NET 8.0      | .NET 8.0      | 78.37 ns | 1.593 ns | 1.705 ns |  0.91 |    0.02 | 0.0994 |     208 B |        1.00 |
| NewInbox_LongPrefix  | NativeAOT 8.0 | NativeAOT 8.0 | 72.60 ns | 1.492 ns | 1.532 ns |  0.85 |    0.01 | 0.0994 |     208 B |        1.00 |

{
Span<char> buffer = stackalloc char[64];
var separatorLength = prefix.Length > 0 ? 1u : 0u;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if support for empty prefixes is really needed, but right now nothing is preventing users from doing it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, practice is to have a prefix so the accounts can be permissined accordingly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, Actually just realized an empty prefix might create interesting behaviours in subscription manager i.e. SubscriptionManager.IsInboxSubject() would always return true causing all subscriptions to go through an internal muxed 'inbox' subscription.

var totalLength = (uint)prefix.Length + (uint)NuidWriter.NuidLength + separatorLength;
if (totalLength <= buffer.Length)
{
buffer = buffer.Slice(0, (int)totalLength);
}
else
{
buffer = new char[totalLength];
}

var totalPrefixLength = (uint)prefix.Length + separatorLength;
if ((uint)buffer.Length > totalPrefixLength && (uint)buffer.Length > (uint)prefix.Length)
{
prefix.CopyTo(buffer);
buffer[prefix.Length] = '.';
var remaining = buffer.Slice((int)totalPrefixLength);
var didWrite = NuidWriter.TryWriteNuid(remaining);
Debug.Assert(didWrite, "didWrite");
return new string(buffer);
}

return Throw();

[DoesNotReturn]
string Throw()
{
Debug.Fail("Must not happen");
throw new InvalidOperationException("This should never be raised!");
}
}

private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts)
{
var opts = replyOpts ?? DefaultReplyOpts;
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.RequestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal async ValueTask<INatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default)
{
var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}";
var replyTo = NewInbox();

var replySerializer = replyOpts?.Serializer ?? Opts.Serializer;
var sub = new NatsSub<TReply>(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public NatsConnection(NatsOpts opts)
Counter = new ConnectionStatsCounter();
_writerState = new WriterState(opts);
_commandWriter = _writerState.CommandBuffer.Writer;
InboxPrefix = $"{opts.InboxPrefix}.{Guid.NewGuid():n}.";
InboxPrefix = NewInbox(opts.InboxPrefix);
SubscriptionManager = new SubscriptionManager(this, InboxPrefix);
_logger = opts.LoggerFactory.CreateLogger<NatsConnection>();
_clientOpts = ClientOpts.Create(Opts);
Expand Down
Loading