Skip to content

Commit

Permalink
KV direct get and msg get
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Sep 26, 2023
1 parent c6a1395 commit 9272cff
Show file tree
Hide file tree
Showing 9 changed files with 509 additions and 147 deletions.
58 changes: 33 additions & 25 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,41 +132,49 @@ internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TRe
Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await Connection.RequestSubAsync<TRequest, TResponse>(
subject: subject,
data: request,
headers: default,
requestOpts: default,
replyOpts: new NatsSubOpts { Serializer = NatsJSErrorAwareJsonSerializer.Default },
cancellationToken)
.ConfigureAwait(false);

if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
var cancellationTimer = Connection.GetCancellationTimer(cancellationToken);
try
{
if (sub.Msgs.TryRead(out var msg))
await using var sub = await Connection.RequestSubAsync<TRequest, TResponse>(
subject: subject,
data: request,
headers: default,
requestOpts: default,
replyOpts: new NatsSubOpts { Serializer = NatsJSErrorAwareJsonSerializer.Default },
cancellationTimer.Token)
.ConfigureAwait(false);

if (await sub.Msgs.WaitToReadAsync(cancellationTimer.Token).ConfigureAwait(false))
{
if (msg.Data == null)
if (sub.Msgs.TryRead(out var msg))
{
throw new NatsJSException("No response data received");
}
if (msg.Data == null)
{
throw new NatsJSException("No response data received");
}

return new NatsJSResponse<TResponse>(msg.Data, default);
return new NatsJSResponse<TResponse>(msg.Data, default);
}
}
}

if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
{
if (sb.Exception is NatsSubException { Exception.SourceException: NatsJSApiErrorException jsError })
if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
{
// Clear exception here so that subscription disposal won't throw it.
sb.ClearException();
if (sb.Exception is NatsSubException { Exception.SourceException: NatsJSApiErrorException jsError })
{
// Clear exception here so that subscription disposal won't throw it.
sb.ClearException();

return new NatsJSResponse<TResponse>(default, jsError.Error);
}

return new NatsJSResponse<TResponse>(default, jsError.Error);
throw sb.Exception;
}

throw sb.Exception;
throw new NatsJSException("No response received");
}
finally
{
cancellationTimer.TryReturn();
}

throw new NatsJSException("No response received");
}
}
28 changes: 23 additions & 5 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,35 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul
request: null,
cancellationToken).ConfigureAwait(false);

public async ValueTask<T?> GetAsync<T>(string subject, CancellationToken cancellationToken = default)
public ValueTask<NatsMsg<T?>?> GetDirectAsync<T>(string subject, INatsSerializer? serializer = default, CancellationToken cancellationToken = default)
{
var msg = await _context.Connection.RequestAsync<object, T>(
NatsSubOpts? subOpts;
NatsPubOpts? pubOpts;
if (serializer != null)
{
subOpts = new NatsSubOpts { Serializer = serializer };
pubOpts = new NatsPubOpts { Serializer = serializer };
}
else
{
subOpts = default;
pubOpts = default;
}

return _context.Connection.RequestAsync<object, T>(
subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}.{subject}",
data: default,
requestOpts: pubOpts,
replyOpts: subOpts,
cancellationToken: cancellationToken);
if (msg == default)
return default;
return msg.Value.Data;
}

public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) =>
_context.JSRequestResponseAsync<StreamMsgGetRequest, StreamMsgGetResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}",
request: request,
cancellationToken);

private void ThrowIfDeleted()
{
if (_deleted)
Expand Down
76 changes: 76 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
namespace NATS.Client.KeyValueStore;

public record NatsKVConfig
{
public NatsKVConfig(string bucket) => Bucket = bucket;

/// <summary>
/// Name of the bucket
/// </summary>
public string Bucket { get; init; }

/// <summary>
/// Human readable description.
/// </summary>
public string? Description { get; init; }

/// <summary>
/// Maximum size of a single value.
/// </summary>
public int MaxValueSize { get; init; }

/// <summary>
/// Maximum historical entries.
/// </summary>
public long History { get; init; }

/// <summary>
/// Maximum age of any entry in the bucket, expressed in nanoseconds
/// </summary>
public TimeSpan MaxAge { get; init; }

/// <summary>
/// How large the bucket may become in total bytes before the configured discard policy kicks in
/// </summary>
public long MaxBytes { get; init; }

/// <summary>
/// The type of storage backend, `File` (default) and `Memory`
/// </summary>
public NatsKVStorageType Storage { get; init; }

/// <summary>
/// How many replicas to keep for each entry in a cluster.
/// </summary>
public int NumberOfReplicas { get; init; }

/// <summary>
/// Republish is for republishing messages once persistent in the Key Value Bucket.
/// </summary>
public NatsKVRepublish? Republish { get; init; }

// Bucket mirror configuration.
// pub mirror: Option<Source>,
// Bucket sources configuration.
// pub sources: Option<Vec<Source>>,
// Allow mirrors using direct API.
// pub mirror_direct: bool,
}

public record NatsKVRepublish
{
/// <summary>
/// Subject that should be republished.
/// </summary>
public string? Src { get; init; }

/// <summary>
/// Subject where messages will be republished.
/// </summary>
public string? Dest { get; init; }

/// <summary>
/// If true, only headers should be republished.
/// </summary>
public bool HeadersOnly { get; init; }
}
Loading

0 comments on commit 9272cff

Please sign in to comment.