Skip to content

Commit

Permalink
Add KV try get (#688)
Browse files Browse the repository at this point in the history
* Add KV try get

* Try-get implementation

* Added perf tweaks

* Format

* Add bench

* Add TryGetEntryAsync method to INatsKVStore interface

* Remove obsolete GetAsyncNew method from KVBench.cs

* Bench string

* Tidy up
  • Loading branch information
mtmk authored Dec 10, 2024
1 parent ff8cb03 commit 150d5a8
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 17 deletions.
103 changes: 103 additions & 0 deletions sandbox/MicroBenchmark/KVBench.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using BenchmarkDotNet.Attributes;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

#pragma warning disable CS8618

namespace MicroBenchmark;

[MemoryDiagnoser]
[PlainExporter]
public class KvBench
{
private NatsConnection _nats;
private NatsJSContext _js;
private NatsKVContext _kv;
private NatsKVStore _store;

[GlobalSetup]
public async Task SetupAsync()
{
_nats = new NatsConnection();
_js = new NatsJSContext(_nats);
_kv = new NatsKVContext(_js);
_store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark"));
}

[Benchmark]
public async ValueTask<int> TryGetAsync()
{
var result = await _store.TryGetEntryAsync<int>("does.not.exist");
if (result is { Success: false, Error: NatsKVKeyNotFoundException })
{
return 1;
}

return 0;
}

[Benchmark]
public async ValueTask<int> GetAsync()
{
try
{
await _store.GetEntryAsync<int>("does.not.exist");
}
catch (NatsKVKeyNotFoundException)
{
return 1;
}

return 0;
}

[Benchmark]
public async ValueTask<int> TryGetMultiAsync()
{
List<Task> tasks = new();
for (var i = 0; i < 100; i++)
{
tasks.Add(Task.Run(async () =>
{
var result = await _store.TryGetEntryAsync<int>("does.not.exist");
if (result is { Success: false, Error: NatsKVKeyNotFoundException })
{
return 1;
}

return 0;
}));
}

await Task.WhenAll(tasks);

return 0;
}

[Benchmark]
public async ValueTask<int> GetMultiAsync()
{
List<Task> tasks = new();
for (var i = 0; i < 100; i++)
{
tasks.Add(Task.Run(async () =>
{
try
{
await _store.GetEntryAsync<int>("does.not.exist");
}
catch (NatsKVKeyNotFoundException)
{
return 1;
}

return 0;
}));
}

await Task.WhenAll(tasks);

return 0;
}
}
3 changes: 2 additions & 1 deletion sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.12" />
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="StackExchange.Redis" Version="2.5.43" />
<PackageReference Include="ZLogger" Version="1.6.1" />
<PackageReference Include="NATS.Client" Version="0.14.5" />
Expand All @@ -20,6 +20,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.Core\NATS.Client.Core.csproj" />
<ProjectReference Include="..\..\src\NATS.Client.JetStream\NATS.Client.JetStream.csproj" />
<ProjectReference Include="..\..\src\NATS.Client.KeyValueStore\NATS.Client.KeyValueStore.csproj" />
</ItemGroup>

</Project>
14 changes: 14 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public interface INatsKVStore
/// <exception cref="NatsKVException">There was an error with metadata</exception>
ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Try to get an entry from the bucket using the key.
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="revision">Revision to retrieve</param>
/// <param name="serializer">Optional serialized to override the default</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>A NatsResult object representing the value or an error.</returns>
/// <remarks>
/// Use this method to avoid exceptions when, for example, the key is not found.
/// </remarks>
ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Start a watcher for specific keys
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public NatsKVCreateException()

public class NatsKVKeyNotFoundException : NatsKVException
{
public static readonly NatsKVKeyNotFoundException Default = new();

public NatsKVKeyNotFoundException()
: base("Key not found")
{
Expand Down
59 changes: 43 additions & 16 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,23 @@ public class NatsKVStore : INatsKVStore
private const string NatsSequence = "Nats-Sequence";
private const string NatsTimeStamp = "Nats-Time-Stamp";
private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
private static readonly NatsKVException MissingSequenceHeaderException = new("Missing sequence header");
private static readonly NatsKVException MissingTimestampHeaderException = new("Missing timestamp header");
private static readonly NatsKVException MissingHeadersException = new("Missing headers");
private static readonly NatsKVException UnexpectedSubjectException = new("Unexpected subject");
private static readonly NatsKVException UnexpectedNumberOfOperationHeadersException = new("Unexpected number of operation headers");
private static readonly NatsKVException InvalidSequenceException = new("Can't parse sequence header");
private static readonly NatsKVException InvalidTimestampException = new("Can't parse timestamp header");
private static readonly NatsKVException InvalidOperationException = new("Can't parse operation header");
private readonly INatsJSStream _stream;
private readonly string _kvBucket;

internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream)
{
Bucket = bucket;
JetStreamContext = context;
_stream = stream;
_kvBucket = $"$KV.{Bucket}.";
}

/// <inheritdoc />
Expand Down Expand Up @@ -166,13 +176,27 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel

/// <inheritdoc />
public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken);
if (!result.Success)
{
ThrowException(result.Error);
}

return result.Value;
}

/// <inheritdoc />
#if !NETSTANDARD
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
#endif
public async ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var keySubject = _kvBucket + key;

var request = new StreamMsgGetRequest();
var keySubject = $"$KV.{Bucket}.{key}";

if (revision == default)
{
request.LastBySubj = keySubject;
Expand All @@ -190,44 +214,44 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
if (direct is { Headers: { } headers } msg)
{
if (headers.Code == 404)
throw new NatsKVKeyNotFoundException();
return NatsKVKeyNotFoundException.Default;

if (!headers.TryGetLastValue(NatsSubject, out var subject))
throw new NatsKVException("Missing sequence header");
return MissingSequenceHeaderException;

if (revision != default)
{
if (!string.Equals(subject, keySubject, StringComparison.Ordinal))
{
throw new NatsKVException("Unexpected subject");
return UnexpectedSubjectException;
}
}

if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue))
throw new NatsKVException("Missing sequence header");
return MissingSequenceHeaderException;

if (!ulong.TryParse(sequenceValue, out var sequence))
throw new NatsKVException("Can't parse sequence header");
return InvalidSequenceException;

if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue))
throw new NatsKVException("Missing timestamp header");
return MissingTimestampHeaderException;

if (!DateTimeOffset.TryParse(timestampValue, out var timestamp))
throw new NatsKVException("Can't parse timestamp header");
return InvalidTimestampException;

var operation = NatsKVOperation.Put;
if (headers.TryGetValue(KVOperation, out var operationValues))
{
if (operationValues.Count != 1)
throw new NatsKVException("Unexpected number of operation headers");
return UnexpectedNumberOfOperationHeadersException;

if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation))
throw new NatsKVException("Can't parse operation header");
return InvalidOperationException;
}

if (operation is NatsKVOperation.Del or NatsKVOperation.Purge)
{
throw new NatsKVKeyDeletedException(sequence);
return new NatsKVKeyDeletedException(sequence);
}

return new NatsKVEntry<T>(Bucket, key)
Expand All @@ -245,7 +269,7 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
}
else
{
throw new NatsKVException("Missing headers");
return MissingHeadersException;
}
}
else
Expand All @@ -256,7 +280,7 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
{
if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal))
{
throw new NatsKVException("Unexpected subject");
return UnexpectedSubjectException;
}
}

Expand Down Expand Up @@ -452,12 +476,12 @@ internal async ValueTask<NatsKVWatcher<T>> WatchInternalAsync<T>(IEnumerable<str
/// </summary>
private static void ValidateKey(string key)
{
if (string.IsNullOrWhiteSpace(key))
if (string.IsNullOrWhiteSpace(key) || key.Length == 0)
{
ThrowNatsKVException("Key cannot be empty");
}

if (key.StartsWith(".") || key.EndsWith("."))
if (key[0] == '.' || key[^1] == '.')
{
ThrowNatsKVException("Key cannot start or end with a period");
}
Expand All @@ -470,6 +494,9 @@ private static void ValidateKey(string key)

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowNatsKVException(string message) => throw new NatsKVException(message);

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowException(Exception exception) => throw exception;
}

public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);

0 comments on commit 150d5a8

Please sign in to comment.