From eb8635cdb0a72da188bbebb099145f4b48431519 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 7 Sep 2023 20:46:16 +0100 Subject: [PATCH 01/10] Options tidy-up * Removed variable PubOpts Headers and ReplyTo * Added headers and replyTo params to Publish methods * Make all Opts classes (not structs) * Added default options for JS context --- docs/documentation/queue.md | 3 +- .../Example.Core.PublishHeaders/Program.cs | 2 +- .../Program.cs | 4 +- src/NATS.Client.Core/INatsConnection.cs | 40 +++++++++++++-- src/NATS.Client.Core/Internal/InboxSub.cs | 2 +- .../Internal/SubscriptionManager.cs | 14 ++++-- .../NatsConnection.LowLevelApi.cs | 9 ++-- .../NatsConnection.Publish.cs | 16 +++--- .../NatsConnection.RequestReply.cs | 22 +++++--- .../NatsConnection.RequestSub.cs | 42 ++++++++++------ .../NatsConnection.Subscribe.cs | 15 +++--- src/NATS.Client.Core/NatsMsg.cs | 12 ++--- src/NATS.Client.Core/NatsPubOpts.cs | 6 +-- src/NATS.Client.Core/NatsRequestExtensions.cs | 50 ++++++++++++++----- .../NatsRequestManyExtensions.cs | 38 +++++++++----- src/NATS.Client.Core/NatsSub.cs | 10 ++-- src/NATS.Client.Core/NatsSubBase.cs | 5 +- src/NATS.Client.Core/NatsSubChannelOpts.cs | 2 +- src/NATS.Client.Core/NatsSubOpts.cs | 12 +---- src/NATS.Client.JetStream/NatsJSConsume.cs | 3 +- src/NATS.Client.JetStream/NatsJSConsumer.cs | 23 +++++++-- src/NATS.Client.JetStream/NatsJSContext.cs | 7 ++- src/NATS.Client.JetStream/NatsJSFetch.cs | 3 +- src/NATS.Client.JetStream/NatsJSOpts.cs | 21 ++++++++ .../CancellationTest.cs | 2 +- .../NATS.Client.Core.Tests/LowLevelApiTest.cs | 4 +- .../NatsConnectionTest.Headers.cs | 4 +- .../NatsConnectionTest.QueueGroups.cs | 4 +- .../NatsConnectionTest.cs | 2 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 10 ++-- .../SubscriptionTest.cs | 10 ++-- .../JetStreamTest.cs | 8 +-- .../PublishTest.cs | 4 +- 33 files changed, 267 insertions(+), 142 deletions(-) diff --git a/docs/documentation/queue.md b/docs/documentation/queue.md index 175a0b20b..9ed3d56fa 100644 --- a/docs/documentation/queue.md +++ b/docs/documentation/queue.md @@ -21,8 +21,7 @@ var replyTasks = new List(); for (int i = 0; i < 3; i++) { // Create three subscriptions all on the same queue group - var opts = new NatsSubOpts { QueueGroup = "maths-service" }; - var sub = await nats.SubscribeAsync("math.double", opts); + var sub = await nats.SubscribeAsync("math.double", queueGroup: "maths-service"); subs.Add(sub); diff --git a/sandbox/Example.Core.PublishHeaders/Program.cs b/sandbox/Example.Core.PublishHeaders/Program.cs index 2227d45c9..696f22e0b 100644 --- a/sandbox/Example.Core.PublishHeaders/Program.cs +++ b/sandbox/Example.Core.PublishHeaders/Program.cs @@ -15,7 +15,7 @@ await connection.PublishAsync( subject, new Bar { Id = i, Name = "Baz" }, - new NatsPubOpts { Headers = new NatsHeaders { ["XFoo"] = $"bar{i}" } }); + headers: new NatsHeaders { ["XFoo"] = $"bar{i}" }); } void Print(string message) diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs index 2f91b5a8e..5f17ea87e 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs +++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs @@ -12,7 +12,7 @@ await using var connection1 = new NatsConnection(options); Print($"[1][SUB] Subscribing to subject '{subject}'...\n"); -var sub1 = await connection1.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" }); +var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers"); var task1 = Task.Run(async () => { await foreach (var msg in sub1.Msgs.ReadAllAsync()) @@ -28,7 +28,7 @@ await using var connection2 = new NatsConnection(options); Print($"[2][SUB] Subscribing to subject '{subject}'...\n"); -var sub2 = await connection2.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" }); +var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers"); var task2 = Task.Run(async () => { await foreach (var msg in sub2.Msgs.ReadAllAsync()) diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index cce160449..ca63e8336 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -17,10 +17,12 @@ public interface INatsConnection /// /// The destination subject to publish to. /// The message payload data. + /// Optional reply-to subject. + /// Optional message headers. /// A for publishing options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes the message payload to the given subject name, optionally supplying a reply subject. @@ -36,11 +38,13 @@ public interface INatsConnection /// /// The destination subject to publish to. /// Serializable data object + /// Optional reply-to subject. + /// Optional message headers. /// A for publishing options. /// A used to cancel the command. /// Specifies the type of data that may be send to the NATS Server. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, T data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, T data, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. @@ -56,20 +60,32 @@ public interface INatsConnection /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// /// The subject name to subscribe to. + /// If specified, the subscriber will join this queue group. /// A for subscription options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + /// + /// Subscribers with the same queue group name, become a queue group, + /// and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// /// The subject name to subscribe to. + /// If specified, the subscriber will join this queue group. /// A for subscription options. /// A used to cancel the command. /// Specifies the type of data that may be received from the NATS Server. /// A that represents the asynchronous send operation. - ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + /// + /// Subscribers with the same queue group name, become a queue group, + /// and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + ValueTask> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID} @@ -82,6 +98,8 @@ public interface INatsConnection /// /// Subject of the responder /// Data to send to responder + /// If specified, the reply handler (subscriber) will join this queue group. + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -97,6 +115,8 @@ public interface INatsConnection ValueTask?> RequestAsync( string subject, TRequest? data, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); @@ -106,6 +126,8 @@ public interface INatsConnection /// /// Subject of the responder /// Payload to send to responder + /// If specified, the reply handler (subscriber) will join this queue group. + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -119,6 +141,8 @@ public interface INatsConnection ValueTask RequestAsync( string subject, ReadOnlySequence payload = default, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); @@ -128,6 +152,8 @@ public interface INatsConnection /// /// Subject of the responder /// Data to send to responder + /// If specified, the reply handler (subscriber) will join this queue group. + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -141,6 +167,8 @@ public interface INatsConnection IAsyncEnumerable> RequestManyAsync( string subject, TRequest? data, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); @@ -150,6 +178,8 @@ public interface INatsConnection /// /// Subject of the responder /// Payload to send to responder + /// If specified, the reply handler (subscriber) will join this queue group. + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -161,6 +191,8 @@ public interface INatsConnection IAsyncEnumerable RequestManyAsync( string subject, ReadOnlySequence payload = default, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); diff --git a/src/NATS.Client.Core/Internal/InboxSub.cs b/src/NATS.Client.Core/Internal/InboxSub.cs index e9373307c..bda5fa3e5 100644 --- a/src/NATS.Client.Core/Internal/InboxSub.cs +++ b/src/NATS.Client.Core/Internal/InboxSub.cs @@ -16,7 +16,7 @@ public InboxSub( NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) - : base(connection, manager, subject, opts) + : base(connection, manager, subject, queueGroup: default, opts) { _inbox = inbox; _connection = connection; diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index ed09a7544..561e987eb 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -47,15 +47,20 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) internal InboxSubBuilder InboxSubBuilder { get; } - public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) + public async ValueTask SubscribeAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { if (IsInboxSubject(subject)) { + if (queueGroup != null) + { + throw new NatsException("Inbox subscriptions don't support queue groups"); + } + await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); } else { - await SubscribeInternalAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + await SubscribeInternalAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); } } @@ -179,6 +184,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N _inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this); await SubscribeInternalAsync( inboxSubject, + queueGroup: default, opts: default, _inboxSub, cancellationToken).ConfigureAwait(false); @@ -193,7 +199,7 @@ await SubscribeInternalAsync( await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false); } - private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) + private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { var sid = GetNextSid(); lock (_gate) @@ -204,7 +210,7 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts try { - await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken) + await _connection.SubscribeCoreAsync(sid, subject, queueGroup, opts?.MaxMsgs, cancellationToken) .ConfigureAwait(false); await sub.ReadyAsync().ConfigureAwait(false); } diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index e3b0f3e30..c010d43d1 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -1,6 +1,5 @@ using System.Buffers; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -100,17 +99,17 @@ internal ValueTask PubModelAsync(string subject, T? data, INatsSerializer ser } } - internal ValueTask SubAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default) + internal ValueTask SubAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default) { if (ConnectionState == NatsConnectionState.Open) { - return SubscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken); + return SubscriptionManager.SubscribeAsync(subject, queueGroup, opts, sub, cancellationToken); } else { - return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, b, token) => + return WithConnectAsync(subject, queueGroup, opts, sub, cancellationToken, static (self, s, q, o, b, token) => { - return self.SubscriptionManager.SubscribeAsync(s, o, b, token); + return self.SubscriptionManager.SubscribeAsync(s, q, o, b, token); }); } } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index b12e032c0..b714e2e13 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -5,41 +5,41 @@ namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { - return PubAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken); + return PubAsync(subject, replyTo, payload, headers, cancellationToken); } else { - return PubPostAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken); + return PubPostAsync(subject, replyTo, payload, headers, cancellationToken); } } /// public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken); + return PublishAsync(msg.Subject, msg.Data, msg.ReplyTo, msg.Headers, opts, cancellationToken); } /// - public ValueTask PublishAsync(string subject, T? data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, T? data, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Opts.Serializer; if (opts?.WaitUntilSent ?? false) { - return PubModelAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); + return PubModelAsync(subject, data, serializer, replyTo, headers, cancellationToken); } else { - return PubModelPostAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); + return PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken); } } /// public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken); + return PublishAsync(msg.Subject, msg.Data, msg.ReplyTo, msg.Headers, opts, cancellationToken); } } diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 12ac3b7bc..f5dc115a7 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -5,6 +5,8 @@ namespace NATS.Client.Core; public partial class NatsConnection { + private static readonly NatsSubOpts DefaultReplyOpts = new() { MaxMsgs = 1 }; + /// public string NewInbox() => $"{InboxPrefix}{Guid.NewGuid():n}"; @@ -12,13 +14,15 @@ public partial class NatsConnection public async ValueTask?> RequestAsync( string subject, TRequest? data, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { var opts = SetReplyOptsDefaults(replyOpts); - await using var sub = await RequestSubAsync(subject, data, requestOpts, opts, cancellationToken) + await using var sub = await RequestSubAsync(subject, data, queueGroup, headers, requestOpts, opts, cancellationToken) .ConfigureAwait(false); if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) @@ -36,13 +40,15 @@ public partial class NatsConnection public async ValueTask RequestAsync( string subject, ReadOnlySequence payload = default, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { var opts = SetReplyOptsDefaults(replyOpts); - await using var sub = await RequestSubAsync(subject, payload, requestOpts, opts, cancellationToken).ConfigureAwait(false); + await using var sub = await RequestSubAsync(subject, payload, queueGroup, headers, requestOpts, opts, cancellationToken).ConfigureAwait(false); if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { @@ -59,11 +65,13 @@ public partial class NatsConnection public async IAsyncEnumerable> RequestManyAsync( string subject, TRequest? data, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await using var sub = await RequestSubAsync(subject, data, requestOpts, replyOpts, cancellationToken) + await using var sub = await RequestSubAsync(subject, data, queueGroup, headers, requestOpts, replyOpts, cancellationToken) .ConfigureAwait(false); while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) @@ -85,11 +93,13 @@ public partial class NatsConnection public async IAsyncEnumerable RequestManyAsync( string subject, ReadOnlySequence payload = default, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await using var sub = await RequestSubAsync(subject, payload, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false); + await using var sub = await RequestSubAsync(subject, payload, queueGroup, headers, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false); while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { @@ -106,9 +116,9 @@ public async IAsyncEnumerable RequestManyAsync( } } - private NatsSubOpts SetReplyOptsDefaults(in NatsSubOpts? replyOpts) + private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts) { - var opts = (replyOpts ?? default) with { MaxMsgs = 1, }; + var opts = replyOpts ?? DefaultReplyOpts; if ((opts.Timeout ?? default) == default) { diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index d1ac0860e..b5607a0d8 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -1,5 +1,4 @@ using System.Buffers; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -8,20 +7,32 @@ public partial class NatsConnection internal async ValueTask RequestSubAsync( string subject, ReadOnlySequence payload = default, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts); - await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); - await PubAsync(subject, replyTo, payload, requestOpts?.Headers, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup, replyOpts); + await SubAsync(replyTo, queueGroup, replyOpts, sub, cancellationToken).ConfigureAwait(false); + if (requestOpts?.WaitUntilSent == true) + { + await PubAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); + } + else + { + await PubPostAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); + } + return sub; } internal async ValueTask> RequestSubAsync( string subject, TRequest? data, + string? queueGroup = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) @@ -29,16 +40,19 @@ internal async ValueTask> RequestSubAsync( var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; var replySerializer = replyOpts?.Serializer ?? Opts.Serializer; - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts, replySerializer); - await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); - - await PubModelAsync( - subject, - data, - requestOpts?.Serializer ?? Opts.Serializer, - replyTo, - requestOpts?.Headers, - cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup, replyOpts, replySerializer); + await SubAsync(replyTo, queueGroup, replyOpts, sub, cancellationToken).ConfigureAwait(false); + + var serializer = requestOpts?.Serializer ?? Opts.Serializer; + + if (requestOpts?.WaitUntilSent == true) + { + await PubModelAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false); + } + else + { + await PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false); + } return sub; } diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index 77deb3475..f849cb331 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -1,24 +1,21 @@ -using System.Collections.Concurrent; -using NATS.Client.Core.Internal; - namespace NATS.Client.Core; public partial class NatsConnection { /// - public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + public async ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { - var sub = new NatsSub(this, SubscriptionManager, subject, opts); - await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager, subject, queueGroup, opts); + await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); return sub; } /// - public async ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + public async ValueTask> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Opts.Serializer; - var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, opts, serializer); - await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer); + await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); return sub; } } diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 4c4cd7a8c..fe59a11b0 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -40,10 +40,10 @@ internal static NatsMsg Build( return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); } - public ValueTask ReplyAsync(ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask ReplyAsync(ReadOnlySequence payload = default, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload, opts, cancellationToken); + return Connection.PublishAsync(ReplyTo!, payload, replyTo, headers, opts, cancellationToken); } public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) @@ -112,10 +112,10 @@ internal static NatsMsg Build( return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); } - public ValueTask ReplyAsync(TReply data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask ReplyAsync(TReply data, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, data, opts, cancellationToken); + return Connection.PublishAsync(ReplyTo!, data, replyTo, headers, opts, cancellationToken); } public ValueTask ReplyAsync(NatsMsg msg) @@ -124,10 +124,10 @@ public ValueTask ReplyAsync(NatsMsg msg) return Connection.PublishAsync(msg with { Subject = ReplyTo! }); } - public ValueTask ReplyAsync(ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask ReplyAsync(ReadOnlySequence payload = default, string? replyTo = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload: payload, opts, cancellationToken); + return Connection.PublishAsync(ReplyTo!, payload: payload, replyTo, headers, opts, cancellationToken); } public ValueTask ReplyAsync(NatsMsg msg) diff --git a/src/NATS.Client.Core/NatsPubOpts.cs b/src/NATS.Client.Core/NatsPubOpts.cs index 576758292..dc1d14539 100644 --- a/src/NATS.Client.Core/NatsPubOpts.cs +++ b/src/NATS.Client.Core/NatsPubOpts.cs @@ -1,11 +1,7 @@ namespace NATS.Client.Core; -public readonly record struct NatsPubOpts +public record NatsPubOpts { - public string? ReplyTo { get; init; } - - public NatsHeaders? Headers { get; init; } - public INatsSerializer? Serializer { get; init; } /// diff --git a/src/NATS.Client.Core/NatsRequestExtensions.cs b/src/NATS.Client.Core/NatsRequestExtensions.cs index 64d6387bf..63f179ec9 100644 --- a/src/NATS.Client.Core/NatsRequestExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestExtensions.cs @@ -9,6 +9,8 @@ public static class NatsRequestExtensions /// /// NATS connection /// Message to be sent as request + /// If specified, the reply handler (subscriber) will join this queue group. + /// Request publish options /// Reply handler subscription options /// Cancel this request /// Request type @@ -23,23 +25,30 @@ public static class NatsRequestExtensions public static ValueTask?> RequestAsync( this INatsConnection nats, in NatsMsg msg, + string? queueGroup = default, + NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestAsync( + CancellationToken cancellationToken = default) + { + CheckMsgForRequestReply(msg); + + return nats.RequestAsync( msg.Subject, msg.Data, - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + queueGroup, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } /// /// Request and receive a single reply from a responder. /// /// NATS connection /// Message to be sent as request + /// If specified, the reply handler (subscriber) will join this queue group. + /// Request publish options /// Reply handler subscription options /// Cancel this request /// Returns the received from the responder as reply. @@ -52,15 +61,32 @@ public static class NatsRequestExtensions public static ValueTask RequestAsync( this INatsConnection nats, in NatsMsg msg, + string? queueGroup = default, + in NatsPubOpts? requestOpts = default, in NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestAsync( + CancellationToken cancellationToken = default) + { + CheckMsgForRequestReply(msg); + + return nats.RequestAsync( msg.Subject, payload: new ReadOnlySequence(msg.Data), - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + queueGroup, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } + + internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); + + internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); + + private static void CheckForRequestReply(string? replyTo) + { + if (!string.IsNullOrWhiteSpace(replyTo)) + { + throw new NatsException($"Can't set {nameof(NatsMsg.ReplyTo)} for a request"); + } + } } diff --git a/src/NATS.Client.Core/NatsRequestManyExtensions.cs b/src/NATS.Client.Core/NatsRequestManyExtensions.cs index bee9b6aba..0aa6a4747 100644 --- a/src/NATS.Client.Core/NatsRequestManyExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestManyExtensions.cs @@ -9,6 +9,8 @@ public static class NatsRequestManyExtensions /// /// NATS connection /// Message to be sent as request + /// If specified, the reply handler (subscriber) will join this queue group. + /// Request publish options /// Reply handler subscription options /// Cancel this request /// An asynchronous enumerable of objects @@ -19,23 +21,30 @@ public static class NatsRequestManyExtensions public static IAsyncEnumerable RequestManyAsync( this INatsConnection nats, NatsMsg msg, + string? queueGroup = default, + NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestManyAsync( + CancellationToken cancellationToken = default) + { + NatsRequestExtensions.CheckMsgForRequestReply(msg); + + return nats.RequestManyAsync( msg.Subject, payload: new ReadOnlySequence(msg.Data), - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + queueGroup, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } /// /// Request and receive zero or more replies from a responder. /// /// NATS connection /// Message to be sent as request + /// If specified, the reply handler (subscriber) will join this queue group. + /// Request publish options /// Reply handler subscription options /// Cancel this request /// Request type @@ -48,15 +57,20 @@ public static IAsyncEnumerable RequestManyAsync( public static IAsyncEnumerable> RequestManyAsync( this INatsConnection nats, NatsMsg msg, + string? queueGroup = default, + NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestManyAsync( + CancellationToken cancellationToken = default) + { + NatsRequestExtensions.CheckMsgForRequestReply(msg); + + return nats.RequestManyAsync( msg.Subject, msg.Data, - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + queueGroup, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } } diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index f25067367..f7ef07f65 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -18,8 +18,8 @@ public sealed class NatsSub : NatsSubBase, INatsSub private readonly Channel _msgs; - internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, NatsSubOpts? opts) - : base(connection, manager, subject, opts) => + internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, NatsSubOpts? opts) + : base(connection, manager, subject, queueGroup, opts) => _msgs = Channel.CreateBounded( GetChannelOpts(opts?.ChannelOpts)); @@ -28,9 +28,8 @@ internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string internal static BoundedChannelOptions GetChannelOpts( NatsSubChannelOpts? subChannelOpts) { - if (subChannelOpts != null) + if (subChannelOpts is { } overrideOpts) { - var overrideOpts = subChannelOpts.Value; return new BoundedChannelOptions(overrideOpts.Capacity ?? DefaultChannelOpts.Capacity) { @@ -74,9 +73,10 @@ internal NatsSub( NatsConnection connection, ISubscriptionManager manager, string subject, + string? queueGroup, NatsSubOpts? opts, INatsSerializer serializer) - : base(connection, manager, subject, opts) + : base(connection, manager, subject, queueGroup, opts) { _msgs = Channel.CreateBounded>( NatsSub.GetChannelOpts(opts?.ChannelOpts)); diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index b99957a7c..1eb6164c8 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -43,12 +43,13 @@ internal NatsSubBase( NatsConnection connection, ISubscriptionManager manager, string subject, + string? queueGroup, NatsSubOpts? opts) { _logger = connection.Opts.LoggerFactory.CreateLogger(); _debug = _logger.IsEnabled(LogLevel.Debug); _manager = manager; - _pendingMsgs = opts is { MaxMsgs: > 0 } ? opts.Value.MaxMsgs ?? -1 : -1; + _pendingMsgs = opts is { MaxMsgs: > 0 } ? opts.MaxMsgs ?? -1 : -1; _countPendingMsgs = _pendingMsgs > 0; _idleTimeout = opts?.IdleTimeout ?? default; _startUpTimeout = opts?.StartUpTimeout ?? default; @@ -56,7 +57,7 @@ internal NatsSubBase( Connection = connection; Subject = subject; - QueueGroup = opts?.QueueGroup; + QueueGroup = queueGroup; // Only allocate timers if necessary to reduce GC pressure if (_idleTimeout != default) diff --git a/src/NATS.Client.Core/NatsSubChannelOpts.cs b/src/NATS.Client.Core/NatsSubChannelOpts.cs index fc63aa973..fcc9197f9 100644 --- a/src/NATS.Client.Core/NatsSubChannelOpts.cs +++ b/src/NATS.Client.Core/NatsSubChannelOpts.cs @@ -5,7 +5,7 @@ namespace NATS.Client.Core; /// /// Options For setting the FullMode and Capacity for a the created for Subscriptions /// -public readonly record struct NatsSubChannelOpts +public record NatsSubChannelOpts { /// /// The Behavior of the Subscription's Channel when the Capacity has been reached. diff --git a/src/NATS.Client.Core/NatsSubOpts.cs b/src/NATS.Client.Core/NatsSubOpts.cs index 2cae34fd2..c46cb1771 100644 --- a/src/NATS.Client.Core/NatsSubOpts.cs +++ b/src/NATS.Client.Core/NatsSubOpts.cs @@ -2,18 +2,8 @@ namespace NATS.Client.Core; -public readonly record struct NatsSubOpts +public record NatsSubOpts { - /// - /// If specified, the subscriber will join this queue group. - /// - /// - /// Subscribers with the same queue group name, become a queue group, - /// and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - public string? QueueGroup { get; init; } - /// /// Serializer to use to deserialize the message if a model is being used. /// diff --git a/src/NATS.Client.JetStream/NatsJSConsume.cs b/src/NATS.Client.JetStream/NatsJSConsume.cs index becc9e582..2cc560eae 100644 --- a/src/NATS.Client.JetStream/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/NatsJSConsume.cs @@ -44,8 +44,9 @@ public NatsJSConsume( string stream, string consumer, string subject, + string? queueGroup, NatsSubOpts? opts) - : base(context.Connection, context.Connection.SubscriptionManager, subject, opts) + : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { _logger = Connection.Opts.LoggerFactory.CreateLogger>(); _debug = _logger.IsEnabled(LogLevel.Debug); diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 66b085af4..e89d15b8f 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -30,9 +30,10 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d } public async IAsyncEnumerable> ConsumeAllAsync( - NatsJSConsumeOpts opts, + NatsJSConsumeOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { + opts ??= _context.Opts.DefaultConsumeOpts; await using var cc = await ConsumeAsync(opts, cancellationToken); await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken)) { @@ -40,9 +41,10 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d } } - public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts, CancellationToken cancellationToken = default) + public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); + opts ??= _context.Opts.DefaultConsumeOpts; var inbox = _context.NewInbox(); @@ -67,6 +69,7 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts consumer: _consumer, context: _context, subject: inbox, + queueGroup: default, opts: requestOpts, maxMsgs: max.MaxMsgs, maxBytes: max.MaxBytes, @@ -77,6 +80,7 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts await _context.Connection.SubAsync( subject: inbox, + queueGroup: default, opts: requestOpts, sub: sub, cancellationToken); @@ -97,8 +101,11 @@ await sub.CallMsgNextAsync( return sub; } - public async ValueTask?> NextAsync(NatsJSNextOpts opts, CancellationToken cancellationToken = default) + public async ValueTask?> NextAsync(NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default) { + ThrowIfDeleted(); + opts ??= _context.Opts.DefaultNextOpts; + await using var f = await FetchAsync( new NatsJSFetchOpts { @@ -118,9 +125,12 @@ await sub.CallMsgNextAsync( } public async IAsyncEnumerable> FetchAllAsync( - NatsJSFetchOpts opts, + NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { + ThrowIfDeleted(); + opts ??= _context.Opts.DefaultFetchOpts; + await using var fc = await FetchAsync(opts, cancellationToken); await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) { @@ -129,10 +139,11 @@ await sub.CallMsgNextAsync( } public async ValueTask> FetchAsync( - NatsJSFetchOpts opts, + NatsJSFetchOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); + opts ??= _context.Opts.DefaultFetchOpts; var inbox = _context.NewInbox(); @@ -157,6 +168,7 @@ public async ValueTask> FetchAsync( consumer: _consumer, context: _context, subject: inbox, + queueGroup: default, opts: requestOpts, maxMsgs: max.MaxMsgs, maxBytes: max.MaxBytes, @@ -165,6 +177,7 @@ public async ValueTask> FetchAsync( await _context.Connection.SubAsync( subject: inbox, + queueGroup: default, opts: requestOpts, sub: sub, cancellationToken); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index d2d3bbc71..09cc46732 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -31,12 +31,15 @@ public ValueTask GetAccountInfoAsync(CancellationToken canc public async ValueTask PublishAsync( string subject, T? data, - NatsPubOpts opts = default, + NatsHeaders? headers = default, + NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { await using var sub = await Connection.RequestSubAsync( subject: subject, data: data, + queueGroup: default, + headers: headers, requestOpts: opts, replyOpts: default, cancellationToken) @@ -87,6 +90,8 @@ internal async ValueTask> JSRequestAsync( subject: subject, data: request, + queueGroup: default, + headers: default, requestOpts: default, replyOpts: new NatsSubOpts { Serializer = NatsJSErrorAwareJsonSerializer.Default }, cancellationToken) diff --git a/src/NATS.Client.JetStream/NatsJSFetch.cs b/src/NATS.Client.JetStream/NatsJSFetch.cs index d17c278c1..fe2c0d750 100644 --- a/src/NATS.Client.JetStream/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/NatsJSFetch.cs @@ -39,8 +39,9 @@ public NatsJSFetch( string stream, string consumer, string subject, + string? queueGroup, NatsSubOpts? opts) - : base(context.Connection, context.Connection.SubscriptionManager, subject, opts) + : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { _logger = Connection.Opts.LoggerFactory.CreateLogger>(); _debug = _logger.IsEnabled(LogLevel.Debug); diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index 1a9342b91..af87b587d 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -33,6 +33,27 @@ public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = d /// These options are used as the defaults when acknowledging messages received from a stream using a consumer. /// public AckOpts AckOpts { get; init; } + + /// + /// Default consume options to be used in consume calls in this context. + /// + /// + /// Defaults to MaxMsgs = 1,000. + /// + public NatsJSConsumeOpts DefaultConsumeOpts { get; init; } = new() { MaxMsgs = 1_000 }; + + /// + /// Default fetch options to be used in fetch calls in this context. + /// + /// + /// Defaults to MaxMsgs = 1,000. + /// + public NatsJSFetchOpts DefaultFetchOpts { get; init; } = new() { MaxMsgs = 1_000 }; + + /// + /// Default next options to be used in next calls in this context. + /// + public NatsJSNextOpts DefaultNextOpts { get; init; } = new(); } public record NatsJSConsumeOpts diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index 96e8b7149..e6e63ce11 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -28,7 +28,7 @@ public async Task CommandTimeoutTest() var timeoutException = await Assert.ThrowsAsync(async () => { - await pubConnection.PublishAsync("foo", "aiueo", new NatsPubOpts { WaitUntilSent = true }); + await pubConnection.PublishAsync("foo", "aiueo", opts: new NatsPubOpts { WaitUntilSent = true }); }); timeoutException.Message.Should().Contain("1 seconds elapsing"); diff --git a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs index 357b1ec41..71365a21f 100644 --- a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs +++ b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs @@ -18,7 +18,7 @@ public async Task Sub_custom_builder_test() var subject = "foo.*"; var builder = new NatsSubCustomTestBuilder(_output); var sub = builder.Build(subject, default, nats, nats.SubscriptionManager); - await nats.SubAsync(subject, opts: default, sub); + await nats.SubAsync(subject, queueGroup: default, opts: default, sub: sub); await Retry.Until( "subscription is ready", @@ -45,7 +45,7 @@ private class NatsSubTest : NatsSubBase private readonly ITestOutputHelper _output; public NatsSubTest(string subject, NatsConnection connection, NatsSubCustomTestBuilder builder, ITestOutputHelper output, ISubscriptionManager manager) - : base(connection, manager, subject, default) + : base(connection, manager, subject, default, default) { _builder = builder; _output = output; diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs index 12a12fb18..77023f0ef 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs @@ -40,7 +40,7 @@ await Retry.Until( Assert.False(headers.IsReadOnly); // Send with headers - await nats.PublishAsync("foo", 100, new NatsPubOpts { Headers = headers }); + await nats.PublishAsync("foo", 100, headers: headers); Assert.True(headers.IsReadOnly); Assert.Throws(() => @@ -63,7 +63,7 @@ await Retry.Until( Assert.Equal("multi-value-1", msg1.Headers["Multi"][1]); // Send empty headers - await nats.PublishAsync("foo", 200, new NatsPubOpts { Headers = new NatsHeaders() }); + await nats.PublishAsync("foo", 200, headers: new NatsHeaders()); var msg2 = await signal2; Assert.Equal(200, msg2.Data); diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs index 28c9e1f83..811d06776 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs @@ -14,8 +14,8 @@ public async Task QueueGroupsTest() await using var conn2 = server.CreateClientConnection(); await using var conn3 = server.CreateClientConnection(); - var sub1 = await conn1.SubscribeAsync("foo.*", new NatsSubOpts { QueueGroup = "my-group" }); - var sub2 = await conn2.SubscribeAsync("foo.*", new NatsSubOpts { QueueGroup = "my-group" }); + var sub1 = await conn1.SubscribeAsync("foo.*", queueGroup: "my-group"); + var sub2 = await conn2.SubscribeAsync("foo.*", queueGroup: "my-group"); var signal = new WaitSignal(); var cts = new CancellationTokenSource(); diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs index 06a82d067..98c1407d7 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs @@ -129,7 +129,7 @@ public async Task RequestTest(int minSize) await Retry.Until( "reply handle is ready", () => Volatile.Read(ref sync) == 1, - async () => await pubConnection.PublishAsync(subject, 1, new NatsPubOpts { ReplyTo = "ignore" }), + async () => await pubConnection.PublishAsync(subject, 1, replyTo: "ignore"), retryDelay: TimeSpan.FromSeconds(1)); var v = await pubConnection.RequestAsync(subject, 9999); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 60b6f41b9..b045b31cd 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -148,7 +148,7 @@ await Retry.Until( Assert.Matches(@"^MSG foo.signal1 \w+ 0␍␊$", msgFrame1.Message); Log("HPUB notifications"); - await nats.PublishAsync("foo.signal2", opts: new NatsPubOpts { Headers = new NatsHeaders() }); + await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); var msg2 = await signal2; Assert.Equal(0, msg2.Data.Length); Assert.NotNull(msg2.Headers); @@ -182,7 +182,7 @@ void Log(string text) Log("### Auto-unsubscribe after consuming max-msgs"); { var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - await using var sub = await nats.SubscribeAsync("foo", opts); + await using var sub = await nats.SubscribeAsync("foo", opts: opts); sid++; await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2); @@ -246,7 +246,7 @@ void Log(string text) proxy.Reset(); var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - var sub = await nats.SubscribeAsync("foo3", opts); + var sub = await nats.SubscribeAsync("foo3", opts: opts); sid++; var count = 0; var reg = sub.Register(_ => Interlocked.Increment(ref count)); @@ -303,7 +303,7 @@ public async Task Reconnect_with_sub_and_additional_commands() var sync = 0; await using var sub = new NatsSubReconnectTest(nats, subject, i => Interlocked.Exchange(ref sync, i)); - await nats.SubAsync(sub.Subject, opts: default, sub); + await nats.SubAsync(sub.Subject, queueGroup: default, opts: default, sub: sub); await Retry.Until( "subscribed", @@ -341,7 +341,7 @@ private sealed class NatsSubReconnectTest : NatsSubBase private readonly Action _callback; internal NatsSubReconnectTest(NatsConnection connection, string subject, Action callback) - : base(connection, connection.SubscriptionManager, subject, default) => + : base(connection, connection.SubscriptionManager, subject, queueGroup: default, opts: default) => _callback = callback; internal override IEnumerable GetReconnectCommands(int sid) diff --git a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs index 9e40e5ce8..9b70d56c8 100644 --- a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs +++ b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs @@ -90,8 +90,8 @@ public async Task Auto_unsubscribe_on_max_messages_with_inbox_subscription_test( await using var nats = server.CreateClientConnection(); var subject = nats.NewInbox(); - await using var sub1 = await nats.SubscribeAsync(subject, new NatsSubOpts { MaxMsgs = 1 }); - await using var sub2 = await nats.SubscribeAsync(subject, new NatsSubOpts { MaxMsgs = 2 }); + await using var sub1 = await nats.SubscribeAsync(subject, opts: new NatsSubOpts { MaxMsgs = 1 }); + await using var sub2 = await nats.SubscribeAsync(subject, opts: new NatsSubOpts { MaxMsgs = 2 }); for (var i = 0; i < 3; i++) { @@ -131,7 +131,7 @@ public async Task Auto_unsubscribe_on_max_messages_test() const int maxMsgs = 99; var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - await using var sub = await nats.SubscribeAsync(subject, opts); + await using var sub = await nats.SubscribeAsync(subject, opts: opts); // send more messages than max to check we only get max for (var i = 0; i < maxMsgs + 10; i++) @@ -161,7 +161,7 @@ public async Task Auto_unsubscribe_on_timeout_test() const string subject = "foo2"; var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(1) }; - await using var sub = await nats.SubscribeAsync(subject, opts); + await using var sub = await nats.SubscribeAsync(subject, opts: opts); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; @@ -183,7 +183,7 @@ public async Task Auto_unsubscribe_on_idle_timeout_test() const string subject = "foo3"; var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(3) }; - await using var sub = await nats.SubscribeAsync(subject, opts); + await using var sub = await nats.SubscribeAsync(subject, opts: opts); await nats.PublishAsync(subject, 0); await nats.PublishAsync(subject, 1); diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index f53558fe6..14c73fde7 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -62,8 +62,8 @@ public async Task Create_stream_test() ack = await js.PublishAsync( "events.foo", new TestData { Test = 2 }, - new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, }, - cts1.Token); + headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + cancellationToken: cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(2, ack.Seq); @@ -73,8 +73,8 @@ public async Task Create_stream_test() ack = await js.PublishAsync( "events.foo", new TestData { Test = 2 }, - new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, }, - cts1.Token); + headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + cancellationToken: cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(2, ack.Seq); diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs index ffdc2aba3..dffa40110 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -33,7 +33,7 @@ public async Task Publish_test() var ack1 = await js.PublishAsync( subject: "s1.foo", data: new TestData { Test = 2 }, - opts: new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "2" } } }, + headers: new NatsHeaders { { "Nats-Msg-Id", "2" } }, cancellationToken: cts.Token); Assert.Null(ack1.Error); Assert.Equal(2, ack1.Seq); @@ -42,7 +42,7 @@ public async Task Publish_test() var ack2 = await js.PublishAsync( subject: "s1.foo", data: new TestData { Test = 2 }, - opts: new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "2" } } }, + headers: new NatsHeaders { { "Nats-Msg-Id", "2" } }, cancellationToken: cts.Token); Assert.Null(ack2.Error); Assert.True(ack2.Duplicate); From ad004bd44d191997821a57ee75014a6185d575e3 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 11 Sep 2023 16:44:21 +0100 Subject: [PATCH 02/10] JetStream docs update * API docs update --- src/NATS.Client.JetStream/INatsJSConsume.cs | 6 ++ src/NATS.Client.JetStream/INatsJSFetch.cs | 6 ++ .../{ => Internal}/NatsJSConsume.cs | 5 +- .../Internal/NatsJSExtensionsInternal.cs | 2 +- .../{ => Internal}/NatsJSFetch.cs | 5 +- .../{ => Internal}/NatsJSResponse.cs | 13 +-- .../NatsJSApiException.cs | 12 +++ src/NATS.Client.JetStream/NatsJSConsumer.cs | 83 ++++++++++++++++- .../NatsJSContext.Consumers.cs | 47 ++++++++++ .../NatsJSContext.Streams.cs | 52 +++++++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 52 +++++++++++ src/NATS.Client.JetStream/NatsJSExtensions.cs | 7 ++ src/NATS.Client.JetStream/NatsJSMsg.cs | 58 ++++++++++++ .../NatsJSNotification.cs | 6 -- src/NATS.Client.JetStream/NatsJSOpts.cs | 12 +++ src/NATS.Client.JetStream/NatsJSStream.cs | 89 ++++++++++++++++++- 16 files changed, 426 insertions(+), 29 deletions(-) rename src/NATS.Client.JetStream/{ => Internal}/NatsJSConsume.cs (98%) rename src/NATS.Client.JetStream/{ => Internal}/NatsJSFetch.cs (98%) rename src/NATS.Client.JetStream/{ => Internal}/NatsJSResponse.cs (71%) create mode 100644 src/NATS.Client.JetStream/NatsJSApiException.cs delete mode 100644 src/NATS.Client.JetStream/NatsJSNotification.cs diff --git a/src/NATS.Client.JetStream/INatsJSConsume.cs b/src/NATS.Client.JetStream/INatsJSConsume.cs index 4ab2f7135..bae608922 100644 --- a/src/NATS.Client.JetStream/INatsJSConsume.cs +++ b/src/NATS.Client.JetStream/INatsJSConsume.cs @@ -2,11 +2,17 @@ namespace NATS.Client.JetStream; +/// +/// Interface to manage a consume() operation on a consumer. +/// public interface INatsJSConsume : IAsyncDisposable { void Stop(); } +/// +/// Interface to extract messages from a consume() operation on a consumer. +/// public interface INatsJSConsume : INatsJSConsume { ChannelReader> Msgs { get; } diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs index 29b23eddf..e835678e2 100644 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ b/src/NATS.Client.JetStream/INatsJSFetch.cs @@ -2,11 +2,17 @@ namespace NATS.Client.JetStream; +/// +/// Interface to manage a fetch() operation on a consumer. +/// public interface INatsJSFetch : IAsyncDisposable { void Stop(); } +/// +/// Interface to extract messages from a fetch() operation on a consumer. +/// public interface INatsJSFetch : INatsJSFetch { ChannelReader> Msgs { get; } diff --git a/src/NATS.Client.JetStream/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs similarity index 98% rename from src/NATS.Client.JetStream/NatsJSConsume.cs rename to src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 2cc560eae..1a9a59e0d 100644 --- a/src/NATS.Client.JetStream/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -4,12 +4,11 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; -namespace NATS.Client.JetStream; +namespace NATS.Client.JetStream.Internal; -public class NatsJSConsume : NatsSubBase, INatsJSConsume +internal class NatsJSConsume : NatsSubBase, INatsJSConsume { private readonly ILogger _logger; private readonly bool _debug; diff --git a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs index 60d786a13..1673d95ee 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs @@ -9,5 +9,5 @@ public static class NatsJSExtensionsInternal public static bool HasTerminalJSError(this NatsHeaders headers) => headers is { Code: 400 } or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted } - or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted }; + or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased }; } diff --git a/src/NATS.Client.JetStream/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs similarity index 98% rename from src/NATS.Client.JetStream/NatsJSFetch.cs rename to src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index fe2c0d750..c04049b15 100644 --- a/src/NATS.Client.JetStream/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -4,12 +4,11 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; -namespace NATS.Client.JetStream; +namespace NATS.Client.JetStream.Internal; -public class NatsJSFetch : NatsSubBase, INatsJSFetch +internal class NatsJSFetch : NatsSubBase, INatsJSFetch { private readonly ILogger _logger; private readonly bool _debug; diff --git a/src/NATS.Client.JetStream/NatsJSResponse.cs b/src/NATS.Client.JetStream/Internal/NatsJSResponse.cs similarity index 71% rename from src/NATS.Client.JetStream/NatsJSResponse.cs rename to src/NATS.Client.JetStream/Internal/NatsJSResponse.cs index 1237e42c8..46fe44b61 100644 --- a/src/NATS.Client.JetStream/NatsJSResponse.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSResponse.cs @@ -1,12 +1,12 @@ using NATS.Client.JetStream.Models; -namespace NATS.Client.JetStream; +namespace NATS.Client.JetStream.Internal; /// /// JetStream response including an optional error property encapsulating both successful and failed calls. /// /// JetStream response type -public readonly struct NatsJSResponse +internal readonly struct NatsJSResponse { internal NatsJSResponse(T? response, ApiError? error) { @@ -28,12 +28,3 @@ public void EnsureSuccess() } } } - -public class NatsJSApiException : NatsJSException -{ - public NatsJSApiException(ApiError error) - : base(error.Description) => - Error = error; - - public ApiError Error { get; } -} diff --git a/src/NATS.Client.JetStream/NatsJSApiException.cs b/src/NATS.Client.JetStream/NatsJSApiException.cs new file mode 100644 index 000000000..171c9ace6 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSApiException.cs @@ -0,0 +1,12 @@ +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public class NatsJSApiException : NatsJSException +{ + public NatsJSApiException(ApiError error) + : base(error.Description) => + Error = error; + + public ApiError Error { get; } +} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index e89d15b8f..f3e341d7c 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -6,6 +6,9 @@ namespace NATS.Client.JetStream; +/// +/// Represents a NATS JetStream consumer. +/// public class NatsJSConsumer { private readonly NatsJSContext _context; @@ -13,7 +16,7 @@ public class NatsJSConsumer private readonly string _consumer; private volatile bool _deleted; - public NatsJSConsumer(NatsJSContext context, ConsumerInfo info) + internal NatsJSConsumer(NatsJSContext context, ConsumerInfo info) { _context = context; Info = info; @@ -21,14 +24,33 @@ public NatsJSConsumer(NatsJSContext context, ConsumerInfo info) _consumer = Info.Name; } + /// + /// Consumer info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed. + /// public ConsumerInfo Info { get; private set; } + /// + /// Delete this consumer. + /// + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// After deletion this object can't be used anymore. public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken); } + /// + /// Starts an enumerator consuming messages from the stream using this consumer. + /// + /// Consume options. (default: MaxMsgs 1,000) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. public async IAsyncEnumerable> ConsumeAllAsync( NatsJSConsumeOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -41,6 +63,14 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d } } + /// + /// Starts consuming messages from the stream using this consumer. + /// + /// Consume options. (default: MaxMsgs 1,000) + /// A used to cancel the call. + /// Message type to deserialize. + /// A consume object to manage the operation and retrieve messages. + /// Consumer is deleted, it's push based or request sent to server is invalid. public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); @@ -101,6 +131,35 @@ await sub.CallMsgNextAsync( return sub; } + /// + /// Consume a single message from the stream using this consumer. + /// + /// Next message options. (default: 30 seconds timeout) + /// A used to cancel the call. + /// Message type to deserialize. + /// Message retrieved from the stream or NULL + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// + /// + /// If the request to server expires (in 30 seconds by default) this call returns NULL. + /// + /// + /// This method is implemented as a fetch with MaxMsgs=1 which means every request will create a new subscription + /// on the NATS server. This would be inefficient if you're consuming a lot of messages and you should consider using + /// fetch or consume methods. + /// + /// + /// + /// The following example shows how you might process messages: + /// + /// var next = await consumer.NextAsync<Data>(); + /// if (next is { } msg) + /// { + /// // process the message + /// await msg.AckAsync(); + /// } + /// + /// public async ValueTask?> NextAsync(NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); @@ -124,6 +183,14 @@ await sub.CallMsgNextAsync( return default; } + /// + /// Consume a set number of messages from the stream using this consumer. + /// + /// Fetch options. (default: MaxMsgs 1,000 and timeout in 30 seconds) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. public async IAsyncEnumerable> FetchAllAsync( NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -138,6 +205,14 @@ await sub.CallMsgNextAsync( } } + /// + /// Consume a set number of messages from the stream using this consumer. + /// + /// Fetch options. (default: MaxMsgs 1,000 and timeout in 30 seconds) + /// A used to cancel the call. + /// Message type to deserialize. + /// A fetch object to manage the operation and retrieve messages. + /// Consumer is deleted, it's push based or request sent to server is invalid. public async ValueTask> FetchAsync( NatsJSFetchOpts? opts = default, CancellationToken cancellationToken = default) @@ -197,6 +272,12 @@ await sub.CallMsgNextAsync( return sub; } + /// + /// Retrieve the consumer info from the server and update this consumer. + /// + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => Info = await _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}", diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 3b7bd8019..389bb5287 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -5,6 +5,16 @@ namespace NATS.Client.JetStream; public partial class NatsJSContext { + /// + /// Creates new consumer if it doesn't exists or returns an existing one with the same name. + /// + /// Stream name to create the consumer under. + /// Name of the consumer. + /// Ack policy to use. Must not be set to none. Default is explicit. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. public ValueTask CreateConsumerAsync( string stream, string consumer, @@ -23,6 +33,14 @@ public ValueTask CreateConsumerAsync( }, cancellationToken); + /// + /// Creates new consumer if it doesn't exists or returns an existing one with the same name. + /// + /// Consumer creation request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. public async ValueTask CreateConsumerAsync( ConsumerCreateRequest request, CancellationToken cancellationToken = default) @@ -46,6 +64,15 @@ public async ValueTask CreateConsumerAsync( return new NatsJSConsumer(this, response); } + /// + /// Gets consumer information from the server and creates a NATS JetStream consumer . + /// + /// Stream name where consumer is associated to. + /// Consumer name. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) { var response = await JSRequestResponseAsync( @@ -55,6 +82,17 @@ public async ValueTask GetConsumerAsync(string stream, string co return new NatsJSConsumer(this, response); } + /// + /// Enumerates through consumers belonging to a stream. + /// + /// Stream name the consumers belong to. + /// A used to cancel the API call. + /// Async enumerable of consumer objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. + /// public async IAsyncEnumerable ListConsumersAsync( string stream, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -67,6 +105,15 @@ public async IAsyncEnumerable ListConsumersAsync( yield return new NatsJSConsumer(this, consumer); } + /// + /// Delete a consumer from a stream. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be deleted. + /// A used to cancel the API call. + /// Whether the deletion was successful. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) { var response = await JSRequestResponseAsync( diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index dc8ae6a8b..3bd5d8a8a 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -5,9 +5,26 @@ namespace NATS.Client.JetStream; public partial class NatsJSContext { + /// + /// Creates a new stream if it doesn't exist or returns an existing stream with the same name. + /// + /// Name of the stream to create. (e.g. my_events) + /// List of subjects stream will persist messages from. (e.g. events.*) + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public ValueTask CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default) => CreateStreamAsync(new StreamCreateRequest { Name = stream, Subjects = subjects }, cancellationToken); + /// + /// Creates a new stream if it doesn't exist or returns an existing stream with the same name. + /// + /// Stream configuration request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask CreateStreamAsync( StreamConfiguration request, CancellationToken cancellationToken = default) @@ -19,6 +36,14 @@ public async ValueTask CreateStreamAsync( return new NatsJSStream(this, response); } + /// + /// Deletes a stream. + /// + /// Stream name to be deleted. + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask DeleteStreamAsync( string stream, CancellationToken cancellationToken = default) @@ -30,6 +55,14 @@ public async ValueTask DeleteStreamAsync( return response.Success; } + /// + /// Get stream information from the server and creates a NATS JetStream stream object . + /// + /// Name of the stream to retrieve. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask GetStreamAsync( string stream, CancellationToken cancellationToken = default) @@ -41,6 +74,14 @@ public async ValueTask GetStreamAsync( return new NatsJSStream(this, response); } + /// + /// Update a NATS JetStream stream's properties. + /// + /// Stream update request object to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The updated NATS JetStream stream object. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask UpdateStreamAsync( StreamUpdateRequest request, CancellationToken cancellationToken = default) @@ -52,6 +93,17 @@ public async ValueTask UpdateStreamAsync( return new NatsJSStream(this, response); } + /// + /// Enumerates through the streams exists on the NATS JetStream server. + /// + /// Limit the list to streams matching this subject filter. + /// A used to cancel the API call. + /// Async enumerable of stream objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of streams if there are a lot of them. + /// public async IAsyncEnumerable ListStreamsAsync( string? subject = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 09cc46732..4a5298bbe 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -5,13 +5,20 @@ namespace NATS.Client.JetStream; +/// Provides management and access to NATS JetStream streams and consumers. public partial class NatsJSContext { + /// > public NatsJSContext(NatsConnection connection) : this(connection, new NatsJSOpts(connection.Opts)) { } + /// + /// Creates a NATS JetStream context used to manage and access streams and consumers. + /// + /// A NATS server connection to access the JetStream APIs, publishers and consumers. + /// Context wide JetStream options. public NatsJSContext(NatsConnection connection, NatsJSOpts opts) { Connection = connection; @@ -22,19 +29,58 @@ public NatsJSContext(NatsConnection connection, NatsJSOpts opts) internal NatsJSOpts Opts { get; } + /// + /// Calls JetStream Account Info API. + /// + /// A used to cancel the API call. + /// The account information based on the NATS connection credentials. public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => JSRequestResponseAsync( subject: $"{Opts.Prefix}.INFO", request: null, cancellationToken); + /// + /// Sends data to a stream associated with the subject. + /// + /// Subject to publish the data to. + /// Data to publish. + /// Sets Nats-Msg-Id header for idempotent message writes. + /// Optional message headers. + /// Options to be used by publishing command. + /// A used to cancel the publishing call or the wait for response. + /// Type of the data being sent. + /// + /// The ACK response to indicate if stream accepted the message as well as additional + /// information like the sequence number of the message stored by the stream. + /// + /// There was a problem receiving the response. + /// + /// + /// Note that if the subject isn't backed by a stream or the connected NATS server + /// isn't running with JetStream enabled, this call will hang waiting for an ACK + /// until the request times out. + /// + /// + /// By setting msgId you can ensure messages written to a stream only once. JetStream support idempotent + /// message writes by ignoring duplicate messages as indicated by the Nats-Msg-Id header. If both msgId + /// and the Nats-Msg-Id header value was set, msgId parameter value will be used. + /// + /// public async ValueTask PublishAsync( string subject, T? data, + string? msgId = default, NatsHeaders? headers = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { + if (msgId != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Msg-Id"] = msgId; + } + await using var sub = await Connection.RequestSubAsync( subject: subject, data: data, @@ -127,11 +173,17 @@ internal async ValueTask> JSRequestAsync +/// The exception that is thrown when JetStream publish acknowledgment indicates a duplicate sequence error. +/// public class NatsJSDuplicateMessageException : NatsJSException { public NatsJSDuplicateMessageException(long sequence) : base($"Duplicate of {sequence}") => Sequence = sequence; + /// + /// The duplicate sequence number. + /// public long Sequence { get; } } diff --git a/src/NATS.Client.JetStream/NatsJSExtensions.cs b/src/NATS.Client.JetStream/NatsJSExtensions.cs index 8fcea7484..1d35a0403 100644 --- a/src/NATS.Client.JetStream/NatsJSExtensions.cs +++ b/src/NATS.Client.JetStream/NatsJSExtensions.cs @@ -4,6 +4,13 @@ namespace NATS.Client.JetStream; public static class NatsJSExtensions { + /// + /// Make sure acknowledgment was successful and throw an exception otherwise. + /// + /// ACK response. + /// is NULL. + /// Server responded with an error. + /// A message with the same Nats-Msg-Id was received before. public static void EnsureSuccess(this PubAckResponse ack) { if (ack == null) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index babd95d11..b3d27f77a 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -19,22 +19,80 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) _context = context; } + /// + /// Subject of the user message. + /// public string Subject => _msg.Subject; + /// + /// Message size in bytes. + /// + /// + /// Message size is calculated using the same method NATS server uses: + /// + /// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; + /// + /// public int Size => _msg.Size; + /// + /// Headers of the user message if set. + /// public NatsHeaders? Headers => _msg.Headers; + /// + /// Deserialized user data. + /// public T? Data => _msg.Data; + /// + /// The connection messages was delivered on. + /// public INatsConnection? Connection => _msg.Connection; + /// + /// Acknowledges the message was completely handled. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); + /// + /// Signals that the message will not be processed now and processing can move onto the next message. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + /// + /// Messages rejected using NACK will be resent by the NATS JetStream server after the configured timeout. + /// public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + /// + /// Indicates that work is ongoing and the wait period should be extended. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. + /// + /// + /// Time period is defined by the consumer's ack_wait configuration on the server which is + /// defined as how long to allow messages to remain un-acknowledged before attempting redelivery. + /// + /// + /// This message must be sent before the ack_wait period times-out. The period should be extended + /// by another amount of time equal to ack_wait by the NATS JetStream server. + /// + /// public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + /// + /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. + /// + /// Ack options. + /// A used to cancel the call. + /// A representing the async call. public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.JetStream/NatsJSNotification.cs b/src/NATS.Client.JetStream/NatsJSNotification.cs deleted file mode 100644 index 34232e218..000000000 --- a/src/NATS.Client.JetStream/NatsJSNotification.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace NATS.Client.JetStream; - -public record NatsJSNotification(int Code, string Description) -{ - public static readonly NatsJSNotification HeartbeatTimeout = new NatsJSNotification(1001, "Heartbeat Timeout"); -} diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index af87b587d..c2bd7d6b4 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -2,6 +2,9 @@ namespace NATS.Client.JetStream; +/// +/// JetStream options to be used within a JetStream context. +/// public record NatsJSOpts { public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = default, AckOpts? ackOpts = default) @@ -56,6 +59,9 @@ public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = d public NatsJSNextOpts DefaultNextOpts { get; init; } = new(); } +/// +/// Consumer consume method options. +/// public record NatsJSConsumeOpts { /// @@ -98,6 +104,9 @@ public record NatsJSConsumeOpts public INatsSerializer? Serializer { get; init; } } +/// +/// Consumer next method options. +/// public record NatsJSNextOpts { /// @@ -120,6 +129,9 @@ public record NatsJSNextOpts public INatsSerializer? Serializer { get; init; } } +/// +/// Consumer fetch method options. +/// public record NatsJSFetchOpts { /// diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 970f63b62..5e1a9a410 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -1,29 +1,49 @@ -using System.Runtime.CompilerServices; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; +/// +/// Represents a NATS JetStream stream. +/// public class NatsJSStream { private readonly NatsJSContext _context; private readonly string _name; private bool _deleted; - public NatsJSStream(NatsJSContext context, StreamInfo info) + internal NatsJSStream(NatsJSContext context, StreamInfo info) { _context = context; Info = info; _name = info.Config.Name; } + /// + /// Stream info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed. + /// public StreamInfo Info { get; private set; } + /// + /// Delete this stream. + /// + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// After deletion this object can't be used anymore. public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _deleted = await _context.DeleteStreamAsync(_name, cancellationToken); } + /// + /// Update stream properties on the server. + /// + /// Stream update request to be sent to the server. + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. public async ValueTask UpdateAsync( StreamUpdateRequest request, CancellationToken cancellationToken = default) @@ -33,30 +53,91 @@ public async ValueTask UpdateAsync( Info = response.Info; } - public IAsyncEnumerable ListConsumersAsync(CancellationToken cancellationToken = default) + /// + /// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name. + /// + /// Name of the consumer. + /// Ack policy to use. Must not be set to none. Default is explicit. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. + public ValueTask CreateConsumerAsync(string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, CancellationToken cancellationToken = default) { ThrowIfDeleted(); - return _context.ListConsumersAsync(_name, cancellationToken); + return _context.CreateConsumerAsync(_name, consumer, ackPolicy, cancellationToken); } + /// + /// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name. + /// + /// Consumer creation request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. public ValueTask CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.CreateConsumerAsync(request, cancellationToken); } + /// + /// Gets consumer information from the server and creates a NATS JetStream consumer . + /// + /// Consumer name. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. public ValueTask GetConsumerAsync(string consumer, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.GetConsumerAsync(_name, consumer, cancellationToken); } + /// + /// Enumerates through consumers belonging to this stream. + /// + /// A used to cancel the API call. + /// Async enumerable of consumer objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. + /// + public IAsyncEnumerable ListConsumersAsync(CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _context.ListConsumersAsync(_name, cancellationToken); + } + + /// + /// Delete a consumer from this stream. + /// + /// Consumer name to be deleted. + /// A used to cancel the API call. + /// Whether the deletion was successful. + /// There was an issue retrieving the response. + /// Server responded with an error. public ValueTask DeleteConsumerAsync(string consumer, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.DeleteConsumerAsync(_name, consumer, cancellationToken); } + /// + /// Retrieve the stream info from the server and update this stream. + /// + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. + public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => + Info = await _context.JSRequestResponseAsync( + subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_name}", + request: null, + cancellationToken).ConfigureAwait(false); + private void ThrowIfDeleted() { if (_deleted) From e8dad230544b30a7a8f6b3d92d928dccf49adca0 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 11 Sep 2023 16:56:32 +0100 Subject: [PATCH 03/10] JetStream exception docs --- .../NatsJSApiException.cs | 12 ---- src/NATS.Client.JetStream/NatsJSContext.cs | 15 ----- src/NATS.Client.JetStream/NatsJSException.cs | 63 +++++++++++++++++++ 3 files changed, 63 insertions(+), 27 deletions(-) delete mode 100644 src/NATS.Client.JetStream/NatsJSApiException.cs diff --git a/src/NATS.Client.JetStream/NatsJSApiException.cs b/src/NATS.Client.JetStream/NatsJSApiException.cs deleted file mode 100644 index 171c9ace6..000000000 --- a/src/NATS.Client.JetStream/NatsJSApiException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using NATS.Client.JetStream.Models; - -namespace NATS.Client.JetStream; - -public class NatsJSApiException : NatsJSException -{ - public NatsJSApiException(ApiError error) - : base(error.Description) => - Error = error; - - public ApiError Error { get; } -} diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 4a5298bbe..bd149652b 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -172,18 +172,3 @@ internal async ValueTask> JSRequestAsync -/// The exception that is thrown when JetStream publish acknowledgment indicates a duplicate sequence error. -/// -public class NatsJSDuplicateMessageException : NatsJSException -{ - public NatsJSDuplicateMessageException(long sequence) - : base($"Duplicate of {sequence}") => - Sequence = sequence; - - /// - /// The duplicate sequence number. - /// - public long Sequence { get; } -} diff --git a/src/NATS.Client.JetStream/NatsJSException.cs b/src/NATS.Client.JetStream/NatsJSException.cs index 83e0a3f32..24b8cdb3c 100644 --- a/src/NATS.Client.JetStream/NatsJSException.cs +++ b/src/NATS.Client.JetStream/NatsJSException.cs @@ -1,29 +1,92 @@ using NATS.Client.Core; +using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; +/// +/// Generic JetStream exception. +/// public class NatsJSException : NatsException { + /// + /// Create JetStream generic exception. + /// + /// Error message. public NatsJSException(string message) : base(message) { } + /// + /// Create JetStream generic exception. + /// + /// Error message. + /// Inner exception. public NatsJSException(string message, Exception exception) : base(message, exception) { } } +/// +/// JetStream protocol errors received during message consumption. +/// public class NatsJSProtocolException : NatsJSException { + /// + /// Create JetStream protocol exception. + /// + /// Error message. public NatsJSProtocolException(string message) : base(message) { } + /// + /// Create JetStream protocol exception. + /// + /// Error message. + /// Inner exception. public NatsJSProtocolException(string message, Exception exception) : base(message, exception) { } } + +/// +/// The exception that is thrown when JetStream publish acknowledgment indicates a duplicate sequence error. +/// +public class NatsJSDuplicateMessageException : NatsJSException +{ + /// + /// Create JetStream duplicate message exception. + /// + /// The duplicate sequence number. + public NatsJSDuplicateMessageException(long sequence) + : base($"Duplicate of {sequence}") => + Sequence = sequence; + + /// + /// The duplicate sequence number. + /// + public long Sequence { get; } +} + +/// +/// JetStream API call errors. +/// +public class NatsJSApiException : NatsJSException +{ + /// + /// Create JetStream API exception. + /// + /// Error response received from the server. + public NatsJSApiException(ApiError error) + : base(error.Description) => + Error = error; + + /// + /// API error response received from the server. + /// + public ApiError Error { get; } +} From 9466a983fc74be8bfe5fb8c05571a7c637d64b2d Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 11 Sep 2023 21:20:06 +0100 Subject: [PATCH 04/10] JetStream docs structure --- docs/documentation/core/intro.md | 2 ++ docs/documentation/{ => core}/pub-sub.md | 0 docs/documentation/{ => core}/queue.md | 0 docs/documentation/{ => core}/req-rep.md | 0 docs/documentation/jetstream/consume.md | 1 + docs/documentation/jetstream/intro.md | 1 + docs/documentation/jetstream/manage.md | 5 +++++ docs/documentation/jetstream/publish.md | 1 + docs/documentation/toc.yml | 25 +++++++++++++++++------- 9 files changed, 28 insertions(+), 7 deletions(-) create mode 100644 docs/documentation/core/intro.md rename docs/documentation/{ => core}/pub-sub.md (100%) rename docs/documentation/{ => core}/queue.md (100%) rename docs/documentation/{ => core}/req-rep.md (100%) create mode 100644 docs/documentation/jetstream/consume.md create mode 100644 docs/documentation/jetstream/intro.md create mode 100644 docs/documentation/jetstream/manage.md create mode 100644 docs/documentation/jetstream/publish.md diff --git a/docs/documentation/core/intro.md b/docs/documentation/core/intro.md new file mode 100644 index 000000000..98a0e8af6 --- /dev/null +++ b/docs/documentation/core/intro.md @@ -0,0 +1,2 @@ +# Core NATS + diff --git a/docs/documentation/pub-sub.md b/docs/documentation/core/pub-sub.md similarity index 100% rename from docs/documentation/pub-sub.md rename to docs/documentation/core/pub-sub.md diff --git a/docs/documentation/queue.md b/docs/documentation/core/queue.md similarity index 100% rename from docs/documentation/queue.md rename to docs/documentation/core/queue.md diff --git a/docs/documentation/req-rep.md b/docs/documentation/core/req-rep.md similarity index 100% rename from docs/documentation/req-rep.md rename to docs/documentation/core/req-rep.md diff --git a/docs/documentation/jetstream/consume.md b/docs/documentation/jetstream/consume.md new file mode 100644 index 000000000..c63dbeb92 --- /dev/null +++ b/docs/documentation/jetstream/consume.md @@ -0,0 +1 @@ +# Consuming Messages from Streams diff --git a/docs/documentation/jetstream/intro.md b/docs/documentation/jetstream/intro.md new file mode 100644 index 000000000..8544d8ed8 --- /dev/null +++ b/docs/documentation/jetstream/intro.md @@ -0,0 +1 @@ +# JetStream \ No newline at end of file diff --git a/docs/documentation/jetstream/manage.md b/docs/documentation/jetstream/manage.md new file mode 100644 index 000000000..2617caf64 --- /dev/null +++ b/docs/documentation/jetstream/manage.md @@ -0,0 +1,5 @@ +# Managing JetStream + +## Streams + +## Consumers diff --git a/docs/documentation/jetstream/publish.md b/docs/documentation/jetstream/publish.md new file mode 100644 index 000000000..f463c9b6c --- /dev/null +++ b/docs/documentation/jetstream/publish.md @@ -0,0 +1 @@ +# Publishing Messages to Streams diff --git a/docs/documentation/toc.yml b/docs/documentation/toc.yml index 8871b7202..dbf3026db 100644 --- a/docs/documentation/toc.yml +++ b/docs/documentation/toc.yml @@ -1,14 +1,25 @@ - name: Introduction href: intro.md -- name: Publish-Subscribe - href: pub-sub.md +- name: Core + href: core/intro.md + items: + - name: Publish-Subscribe + href: core/pub-sub.md + - name: Request-Reply + href: core/req-rep.md + - name: Queue Groups + href: core/queue.md -- name: Request-Reply - href: req-rep.md - -- name: Queue Groups - href: queue.md +- name: JetStream + href: jetstream/intro.md + items: + - name: Managing + href: jetstream/manage.md + - name: Publishing + href: jetstream/publish.md + - name: Consuming + href: jetstream/consume.md - name: Updating Documentation href: update-docs.md From d2aab31974e5a7cad2243837fc1d026163ee5570 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 11 Sep 2023 21:30:49 +0100 Subject: [PATCH 05/10] JetStream in-line docs typo fix --- src/NATS.Client.JetStream/NatsJSMsg.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index b3d27f77a..c0a4dfc54 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -81,7 +81,7 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// defined as how long to allow messages to remain un-acknowledged before attempting redelivery. /// /// - /// This message must be sent before the ack_wait period times-out. The period should be extended + /// This message must be sent before the ack_wait period elapses. The period should be extended /// by another amount of time equal to ack_wait by the NATS JetStream server. /// /// From c7ad8ff4a677487fc6d3e2fefa92209fbaba61d6 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 12 Sep 2023 01:21:39 +0100 Subject: [PATCH 06/10] JetStream documentation --- docs/documentation/core/intro.md | 13 +++ docs/documentation/intro.md | 4 + docs/documentation/jetstream/consume.md | 107 ++++++++++++++++++++++++ docs/documentation/jetstream/intro.md | 83 +++++++++++++++++- docs/documentation/jetstream/manage.md | 54 +++++++++++- docs/documentation/jetstream/publish.md | 49 +++++++++++ 6 files changed, 308 insertions(+), 2 deletions(-) diff --git a/docs/documentation/core/intro.md b/docs/documentation/core/intro.md index 98a0e8af6..f06e11687 100644 --- a/docs/documentation/core/intro.md +++ b/docs/documentation/core/intro.md @@ -1,2 +1,15 @@ # Core NATS +[Core NATS](https://docs.nats.io/nats-concepts/core-nats) is the base set of functionalities and qualities of service +offered by a NATS service infrastructure. Core NATS is the foundation for JetStream and other services. For the sake +of explanation, in a simplified sense you can think of Core NATS as the +[wire protocol](https://docs.nats.io/reference/reference-protocols/nats-protocol) defining a simple but powerful +pub/sub functionality and the concept of [Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects). + +[Publish-Subscribe](pub-sub.md) is the message distribution model for one-to-many communication. + +[Request-Reply](req-rep.md) is a common pattern in modern distributed systems. A request is sent, and the application +either waits on the response with a certain timeout, or receives a response asynchronously. + +[Queue Groups](queue.md) enables the 1:N fan-out pattern of messaging ensuring that any message sent by a publisher, +reaches all subscribers that have registered. diff --git a/docs/documentation/intro.md b/docs/documentation/intro.md index 9e8ed980c..2776a1894 100644 --- a/docs/documentation/intro.md +++ b/docs/documentation/intro.md @@ -7,3 +7,7 @@ asynchronous programming model. NATS.NET V2 Client, just like NATS, is Open Source as is this documentation. Please [let us know](https://natsio.slack.com/channels/dotnet) if you have updates and/or suggestions for these docs. You can also create a Pull Request using the Edit on GitHub link on each page. + +[Core NATS](core/intro.md) is the base set of functionalities and qualities of service offered by a NATS service infrastructure. + +[JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary. diff --git a/docs/documentation/jetstream/consume.md b/docs/documentation/jetstream/consume.md index c63dbeb92..fa505068e 100644 --- a/docs/documentation/jetstream/consume.md +++ b/docs/documentation/jetstream/consume.md @@ -1 +1,108 @@ # Consuming Messages from Streams + +Consuming messages from a stream can be done using one of three different methods depending on you application needs. +You can access these methods from the consumer object you can create using JetStream context: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); + +var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_processor"); +``` + +## Next Method + +Next method is the simplest was of retrieving messages from a stream. Every time you call the next method you get +a single message or nothing based on the expiry time to wait for a message. Once a message is received you can +process it and call next again for another. + +```csharp +while (!cancellationToken.IsCancellationRequested) +{ + var next = await consumer.NextAsync(); + + if (next is { } msg) + { + Console.WriteLine($"Processing {msg.Subject}: {msg.Data.OrderId}..."); + await msg.AckAsync(); + } +} +``` + +Next is the simplest and most conservative way of consuming messages since you request a single message from JetStream +server then acknowledge it before requesting more messages. However, next method is also the least performant since +there is not message batching. + +## Fetch Method + +Fetch method requests messages in batches to improve the performance while giving the application the control over how +fast it can process the messages without overwhelming the application process. + +```csharp +while (!cancellationToken.IsCancellationRequested) +{ + // Consume a batch of messages (1000 by default) + await foreach (var msg in consumer.FetchAllAsync()) + { + // Process message + await msg.AckAsync(); + } +} +``` + +## Consume Method + +Consume method is the most performant method of consuming messages. Request for messages (a.k.a. pull requests) are +interleaved so that there is a constant flow of messages from the JetStream server. Flow is controlled by `MaxMsgs` +or `MaxBytes` and respective thresholds not to overwhelm the application and not to waste server resources. + +```csharp +await foreach (var msg in consumer.ConsumeAllAsync()) +{ + // Process message + await msg.AckAsync(); + + // loop never exits unless there is an error or a break +} +``` + +## Handling Exceptions + +While consuming messages (using next, fetch or consume methods) there are several scenarios where exception might be +thrown by the client library, for example: + +* Consumer is deleted by another application or operator +* Connection to NATS server is interrupted (mainly for next and fetch methods, consume method can recover) +* Client request for the next batch is invalid +* Account permissions have changed +* Cluster leader changed + +A naive implementation might try to recover from errors assuming they are temporary e.g. the stream or the consumer +will be created eventually: + +```csharp +while (!cancellationToken.IsCancellationRequested) +{ + try + { + await consumer.RefreshAsync(); // or try to recreate consumer + await foreach (var msg in consumer.ConsumeAllAsync()) + { + // Process message + await msg.AckAsync(); + } + } + catch (NatsJSProtocolException e) + { + // log exception + } + catch (NatsJSException e) + { + // log exception + await Task.Delay(1000); // or back off + } +} +``` + +Depending on your application you should configure streams and consumers with appropriate settings so that the +messages are processed and stored based on your requirements. diff --git a/docs/documentation/jetstream/intro.md b/docs/documentation/jetstream/intro.md index 8544d8ed8..d2ce3f2ad 100644 --- a/docs/documentation/jetstream/intro.md +++ b/docs/documentation/jetstream/intro.md @@ -1 +1,82 @@ -# JetStream \ No newline at end of file +# JetStream + +[JetStream](https://docs.nats.io/nats-concepts/jetstream) is the built-in distributed persistence system which enables +new functionalities and higher qualities of service on top of the base _Core NATS_ functionalities and qualities of service. + +JetStream is built-in to nats-server and you only need 1 (or 3 or 5 if you want fault-tolerance against 1 or 2 +simultaneous NATS server failures) of your NATS server(s) to be JetStream enabled for it to be available to all the +client applications. + +JetStream can be enabled by running the server with `-js` flag e.g. `nats-server -js`. + +## Streaming: temporal decoupling between the publishers and subscribers + +One of the tenets of basic publish/subscribe messaging is that there is a required temporal coupling between the +publishers and the subscribers: subscribers only receive the messages that are published when they are actively +connected to the messaging system (i.e. they do not receive messages that are published while they are not subscribing +or not running or disconnected). + +Streams capture and store messages published on one (or more) subject and allow client applications to create +consumers at any time to 'replay' (or consume) all or some of the messages stored in the stream. + +Streams are message stores, each stream defines how messages are stored and what the limits (duration, size, interest) +of the retention are. Streams consume normal NATS subjects, any message published on those subjects will be captured +in the defined storage system. + +A consumer is a stateful view of a stream. It acts as interface for clients to consume a subset of messages stored in a +stream and will keep track of which messages were delivered and acknowledged by clients. + +## Quick Start + +[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it with JetStream enabled: + +```shell +$ nats-server -js +``` + +NATS server will listen on its default TCP port 4222. + +Then create a .NET 6 (or above) console project, add `NATS.Client.JetStream` preview NuGet package and paste the below +code snippet into `Program.cs`: + +```csharp +using System; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.JetStream; + +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; + +await using var nc = new NatsConnection(opts); +var js = new NatsJSContext(nc); + +await js.CreateStreamAsync("orders", subjects: new []{"orders.>"}); + +for (var i = 0; i < 10; i++) +{ + var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); + ack.EnsureSuccess(); +} + +var consumer = await js.CreateConsumerAsync("orders", "order_processor"); + +Console.WriteLine($"Consume..."); +await foreach (var msg in consumer.ConsumeAllAsync()) +{ + var order = msg.Data; + Console.WriteLine($"Processing {msg.Subject} {order}..."); + await msg.AckAsync(); + if (order.OrderId == 5) + break; +} +Console.WriteLine($"Done consuming."); + +public record Order(int OrderId); +``` + +[Managing JetStream](manage.md) covers how to create, update, get, list and delete streams and consumers. + +[Publishing messages to streams](publish.md) is achieved by simply publishing to a subject where a stream is configured +to be interested in that subject. + +[Consuming messages from streams](consume.md) explains different ways of retrieving persisted messages. diff --git a/docs/documentation/jetstream/manage.md b/docs/documentation/jetstream/manage.md index 2617caf64..82d94d2be 100644 --- a/docs/documentation/jetstream/manage.md +++ b/docs/documentation/jetstream/manage.md @@ -1,5 +1,57 @@ -# Managing JetStream +# Managing JetStream and the JetStream Context + +_JetStream Context_ is a NATS JetStream Client concept which is mainly responsible for managing streams. It serves as +the entry point for creating, configuring, and controlling the streams. JetStream Context also exposes methods to +manage consumers directly, bypassing the need to get or create a stream first. + +You can create a context using an existing NATS connection: + +```csharp +await using var nats = new NatsConnection(); + +var js = new NatsJSContext(nats); +``` ## Streams +Streams are _message stores_, each stream defines how messages are stored and what the limits (duration, size, interest) +of the retention are. Streams consume normal NATS subjects, any message published on those subjects will be captured in +the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, though it's better +to use the JetStream publish calls instead as the JetStream server will reply with an acknowledgement that it was +successfully stored. + +An example of creating a stream: + +```csharp +await js.CreateStreamAsync("orders", subjects: new []{"orders.>"}); +``` + +However, in practice streams are usually managed separately from the applications, for example using the [NATS command +line client](https://github.com/nats-io/natscli) you can create a stream interactively: + +```shell +$ nats stream create my_events --subjects 'events.*' +? Storage [Use arrows to move, type to filter, ? for more help] +> file + memory +# you can safely choose defaults for testing and development +``` + +Refer to [NATS JetStream documentation](https://docs.nats.io/nats-concepts/jetstream#functionalities-enabled-by-jetstream) +for stream concepts and more information. + ## Consumers + +A [consumer](https://docs.nats.io/nats-concepts/jetstream/consumers) is a stateful view of a stream. It acts as +interface for clients to consume a subset of messages stored in a stream and will keep track of which messages were +delivered and acknowledged by clients. + +Unlike streams, consumers are accessed by NATS client libraries as part of message consumption: + +```csharp +// Create or get a consumer +var consumer = await js.CreateConsumerAsync(stream: "orders", consumer: "order_processor"); + +// Get an existing consumer +var consumer = await js.GetConsumerAsync(stream: "orders", consumer: "order_processor"); +``` diff --git a/docs/documentation/jetstream/publish.md b/docs/documentation/jetstream/publish.md index f463c9b6c..05028e65c 100644 --- a/docs/documentation/jetstream/publish.md +++ b/docs/documentation/jetstream/publish.md @@ -1 +1,50 @@ # Publishing Messages to Streams + +If you want to persist your messages you can do a normal publish to the subject for unacknowledged delivery, though +it's better to use the JetStream context publish calls instead as the JetStream server will reply with an acknowledgement +that it was successfully stored. + +The subject must be configured on a stream to be persisted: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); + +await js.CreateStreamAsync("orders", subjects: new []{"orders.>"}); +``` + +or using the nats cli: + +```shell +$ nats stream create orders --subjects 'orders.>' +``` + +Then you can publish to subjects captured by the stream: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); + +var order = new Order { OrderId = 1 }; + +var ack = await js.PublishAsync("orders.new.1", order); + +ack.EnsureSuccess(); + +public record Order(int OrderId); +``` + +## Message Deduplication + +JetStream support +[idempotent message writes](https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication) +by ignoring duplicate messages as indicated by the message ID. Message ID is not pert of the message but rather passed +as meta data, part of the message headers. + +```csharp +var ack = await js.PublishAsync("orders.new.1", order, msgId: "1"); +if (ack.Duplicate) +{ + // A message with the same ID was published before +} +``` From fbff85c010ee989b07f6c098878dc4f30afaec27 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 12 Sep 2023 11:39:00 +0100 Subject: [PATCH 07/10] JetStream index and readme updates --- README.md | 138 +++++++++++++++++++++++++++++++++++++++++++++---- docs/index.md | 140 ++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 256 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index f063e863f..09812b8df 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,41 @@ # NATS.NET V2 +NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://dot.net/). + ## Preview -The NATS.NET V2 client is in preview and not recommended for production use. -Codebase is still under heavy development and currently we only have implementations for [core NATS](https://docs.nats.io/nats-concepts/core-nats) features. +The NATS.NET V2 client is in preview and not recommended for production use yet. +Codebase is still under heavy development and we currently implemented [Core NATS](https://docs.nats.io/nats-concepts/core-nats) +and basic [JetStream](https://docs.nats.io/nats-concepts/jetstream) features. + +Please test and provide feedback: + +* on [slack.nats.io dotnet channel](https://natsio.slack.com/channels/dotnet) +* or use GitHub discussions, issues and PRs -Please test and provide feedback by visiting our [Slack channel](https://natsio.slack.com/channels/dotnet). +Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help +supporting and developing NATS .NET V2 project. ## NATS.NET V2 Goals -- Only support Async I/O -- Target latest .NET LTS Release (currently `net6.0`) +- Only support Async I/O (async/await) +- Target latest .NET LTS Release (currently .NET 6.0) ## Packages -- **NATS.Client.Core**: [core NATS](https://docs.nats.io/nats-concepts/core-nats) +- **NATS.Client.Core**: [Core NATS](https://docs.nats.io/nats-concepts/core-nats) - **NATS.Client.Hosting**: extension to configure DI container -- **NATS.Client.JetStream**: JetStream *not yet implemented* +- **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream) -## Basic Usage +## Core NATS Quick Start [Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen on its default TCP port 4222. +```shell +$ nats-server +``` + Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending and receiving `Bar` objects: @@ -36,7 +49,7 @@ public record Bar Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): ```csharp -await using var nats = new NatsConnection(options); +await using var nats = new NatsConnection(); await using sub = await nats.SubscribeAsync("bar.>"); await foreach (var msg in sub.Msgs.ReadAllAsync()) @@ -56,17 +69,120 @@ for (int i = 0; i < 10; i++) } ``` +## JetStream Quick Start + +This time run `nats-server` with JetStream enabled: + +```shell +$ nats-server -js +``` + +Before we can so anything, we need a JetStream context: + +```csharp +await using var nc = new NatsConnection(); +var js = new NatsJSContext(nc); +``` + +Let's create our stream first. In JetStream, a stream is simply a storage for messages: + +```csharp +await js.CreateStreamAsync(stream: "shop_orders", subjects: new []{"orders.>"}); +``` + +We can save messages in a stream by publishing them to the subjects the stream is interested in, which is `orders.>` in +our case, meaning any subject prefixed with `orders.` e.g. `orders.new.123`. Have a look at NATS documentation about +[wildcards in Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects#wildcards) for more information. + +Given that we have a record `Order`, we can publish and consume stream of `Order` objects: + +```csharp +public record Order(int OrderId); +``` + +We can publish to the `shop_orders` stream and receive a confirmation that our message is persisted: + +```csharp +for (var i = 0; i < 10; i++) +{ + // Notice we're using JetStream context to publish and receive ACKs + var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); + ack.EnsureSuccess(); +} +``` + +Now that we have a few messages in our stream, let's see its status using the [NATS command +line client](https://github.com/nats-io/natscli): + +```shell +$ nats stream ls +╭───────────────────────────────────────────────────────────────────────────────────╮ +│ Streams │ +├─────────────┬─────────────┬─────────────────────┬──────────┬───────┬──────────────┤ +│ Name │ Description │ Created │ Messages │ Size │ Last Message │ +├─────────────┼─────────────┼─────────────────────┼──────────┼───────┼──────────────┤ +│ shop_orders │ │ 2023-09-12 10:25:52 │ 10 │ 600 B │ 10.41s │ +╰─────────────┴─────────────┴─────────────────────┴──────────┴───────┴──────────────╯ +``` + +We need one more JetStream construct before we can start consuming our messages: a *consumer*: + +```csharp +var consumer = await js.CreateConsumerAsync(stream: "shop_orders", consumer: "order_processor"); +``` + +In JetStream, consumers are stored on the server. Clients don't need to worry about maintaining state separately. +You can think of JetStream consumers as pointers to messages in streams stored on the NATS JetStream server. Let's +see what our consumer's state is: + +```shell +$ nats consumer report shop_orders +╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ Consumer report for shop_orders with 1 consumers │ +├─────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤ +│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ +├─────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤ +│ order_processor │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 10 / 100% │ 0 │ │ +╰─────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯ +``` + +Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for more information on streams and consumers. + +Finally, we're ready to consume the messages we persisted in `shop_orders` stream: + +```csharp +await foreach (var msg in consumer.ConsumeAllAsync()) +{ + var order = msg.Data; + Console.WriteLine($"Processing {msg.Subject} {order}..."); + await msg.AckAsync(); + // this loop never ends unless there is an error +} +``` + +## Logging + You should also hook your logger to `NatsConnection` to make sure all is working as expected or to get help diagnosing any issues you might have: + ```csharp -var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; -await using var nats = new NatsConnection(options); +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +await using var nats = new NatsConnection(otps); ``` ## Contributing - Run `dotnet format` at root directory of project in order to clear warnings that can be auto-formatted +## Roadmap + +- [x] Core NATS +- [x] JetStream initial support +- [ ] KV initial support +- [ ] Object Store initial support +- [ ] .NET 8.0 support (e.g. Native AOT) +- [ ] Beta phase + ## Attribution This library is based on the excellent work in [Cysharp/AlterNats](https://github.com/Cysharp/AlterNats) diff --git a/docs/index.md b/docs/index.md index f51567bfa..09812b8df 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,26 +1,41 @@ -# NATS.NET V2 (Preview) +# NATS.NET V2 -The NATS.NET V2 client is in preview and not recommended for production use. -Codebase is still under heavy development and currently we only have implementations for [core NATS](https://docs.nats.io/nats-concepts/core-nats) features. +NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://dot.net/). -Please test and provide feedback by visiting our [Slack channel](https://natsio.slack.com/channels/dotnet). +## Preview + +The NATS.NET V2 client is in preview and not recommended for production use yet. +Codebase is still under heavy development and we currently implemented [Core NATS](https://docs.nats.io/nats-concepts/core-nats) +and basic [JetStream](https://docs.nats.io/nats-concepts/jetstream) features. + +Please test and provide feedback: + +* on [slack.nats.io dotnet channel](https://natsio.slack.com/channels/dotnet) +* or use GitHub discussions, issues and PRs + +Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help +supporting and developing NATS .NET V2 project. ## NATS.NET V2 Goals -- Only support Async I/O -- Target latest .NET LTS Release (currently `net6.0`) +- Only support Async I/O (async/await) +- Target latest .NET LTS Release (currently .NET 6.0) ## Packages -- **NATS.Client.Core**: [core NATS](https://docs.nats.io/nats-concepts/core-nats) +- **NATS.Client.Core**: [Core NATS](https://docs.nats.io/nats-concepts/core-nats) - **NATS.Client.Hosting**: extension to configure DI container -- **NATS.Client.JetStream**: JetStream *not yet implemented* +- **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream) -## Basic Usage +## Core NATS Quick Start [Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen on its default TCP port 4222. +```shell +$ nats-server +``` + Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending and receiving `Bar` objects: @@ -54,17 +69,120 @@ for (int i = 0; i < 10; i++) } ``` +## JetStream Quick Start + +This time run `nats-server` with JetStream enabled: + +```shell +$ nats-server -js +``` + +Before we can so anything, we need a JetStream context: + +```csharp +await using var nc = new NatsConnection(); +var js = new NatsJSContext(nc); +``` + +Let's create our stream first. In JetStream, a stream is simply a storage for messages: + +```csharp +await js.CreateStreamAsync(stream: "shop_orders", subjects: new []{"orders.>"}); +``` + +We can save messages in a stream by publishing them to the subjects the stream is interested in, which is `orders.>` in +our case, meaning any subject prefixed with `orders.` e.g. `orders.new.123`. Have a look at NATS documentation about +[wildcards in Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects#wildcards) for more information. + +Given that we have a record `Order`, we can publish and consume stream of `Order` objects: + +```csharp +public record Order(int OrderId); +``` + +We can publish to the `shop_orders` stream and receive a confirmation that our message is persisted: + +```csharp +for (var i = 0; i < 10; i++) +{ + // Notice we're using JetStream context to publish and receive ACKs + var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); + ack.EnsureSuccess(); +} +``` + +Now that we have a few messages in our stream, let's see its status using the [NATS command +line client](https://github.com/nats-io/natscli): + +```shell +$ nats stream ls +╭───────────────────────────────────────────────────────────────────────────────────╮ +│ Streams │ +├─────────────┬─────────────┬─────────────────────┬──────────┬───────┬──────────────┤ +│ Name │ Description │ Created │ Messages │ Size │ Last Message │ +├─────────────┼─────────────┼─────────────────────┼──────────┼───────┼──────────────┤ +│ shop_orders │ │ 2023-09-12 10:25:52 │ 10 │ 600 B │ 10.41s │ +╰─────────────┴─────────────┴─────────────────────┴──────────┴───────┴──────────────╯ +``` + +We need one more JetStream construct before we can start consuming our messages: a *consumer*: + +```csharp +var consumer = await js.CreateConsumerAsync(stream: "shop_orders", consumer: "order_processor"); +``` + +In JetStream, consumers are stored on the server. Clients don't need to worry about maintaining state separately. +You can think of JetStream consumers as pointers to messages in streams stored on the NATS JetStream server. Let's +see what our consumer's state is: + +```shell +$ nats consumer report shop_orders +╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ Consumer report for shop_orders with 1 consumers │ +├─────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤ +│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ +├─────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤ +│ order_processor │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 10 / 100% │ 0 │ │ +╰─────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯ +``` + +Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for more information on streams and consumers. + +Finally, we're ready to consume the messages we persisted in `shop_orders` stream: + +```csharp +await foreach (var msg in consumer.ConsumeAllAsync()) +{ + var order = msg.Data; + Console.WriteLine($"Processing {msg.Subject} {order}..."); + await msg.AckAsync(); + // this loop never ends unless there is an error +} +``` + +## Logging + You should also hook your logger to `NatsConnection` to make sure all is working as expected or to get help diagnosing any issues you might have: + ```csharp -var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; -await using var nats = new NatsConnection(options); +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +await using var nats = new NatsConnection(otps); ``` ## Contributing - Run `dotnet format` at root directory of project in order to clear warnings that can be auto-formatted +## Roadmap + +- [x] Core NATS +- [x] JetStream initial support +- [ ] KV initial support +- [ ] Object Store initial support +- [ ] .NET 8.0 support (e.g. Native AOT) +- [ ] Beta phase + ## Attribution This library is based on the excellent work in [Cysharp/AlterNats](https://github.com/Cysharp/AlterNats) From babb1c97a1b14dec685e5aaf9e0dbe55999f59ad Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 12 Sep 2023 12:21:31 +0100 Subject: [PATCH 08/10] JetStream intro tidy-up --- README.md | 156 +---------------------- docs/documentation/core/intro.md | 57 +++++++++ docs/documentation/intro.md | 6 +- docs/documentation/jetstream/intro.md | 93 +++++++++++--- docs/index.md | 174 +------------------------- 5 files changed, 144 insertions(+), 342 deletions(-) diff --git a/README.md b/README.md index 09812b8df..f06ed8a2a 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,10 @@ Please test and provide feedback: Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help supporting and developing NATS .NET V2 project. +## Documentation + +Check out the [documentation](https://nats-io.github.io/nats.net.v2/) for guides and examples. + ## NATS.NET V2 Goals - Only support Async I/O (async/await) @@ -27,162 +31,10 @@ supporting and developing NATS .NET V2 project. - **NATS.Client.Hosting**: extension to configure DI container - **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream) -## Core NATS Quick Start - -[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen -on its default TCP port 4222. - -```shell -$ nats-server -``` - -Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending -and receiving `Bar` objects: - -```csharp -public record Bar -{ - public int Id { get; set; } - public string Name { get; set; } -} -``` - -Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -await using sub = await nats.SubscribeAsync("bar.>"); -await foreach (var msg in sub.Msgs.ReadAllAsync()) -{ - Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); -} -``` - -Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -for (int i = 0; i < 10; i++) -{ - Console.WriteLine($" Publishing {i}..."); - await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); -} -``` - -## JetStream Quick Start - -This time run `nats-server` with JetStream enabled: - -```shell -$ nats-server -js -``` - -Before we can so anything, we need a JetStream context: - -```csharp -await using var nc = new NatsConnection(); -var js = new NatsJSContext(nc); -``` - -Let's create our stream first. In JetStream, a stream is simply a storage for messages: - -```csharp -await js.CreateStreamAsync(stream: "shop_orders", subjects: new []{"orders.>"}); -``` - -We can save messages in a stream by publishing them to the subjects the stream is interested in, which is `orders.>` in -our case, meaning any subject prefixed with `orders.` e.g. `orders.new.123`. Have a look at NATS documentation about -[wildcards in Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects#wildcards) for more information. - -Given that we have a record `Order`, we can publish and consume stream of `Order` objects: - -```csharp -public record Order(int OrderId); -``` - -We can publish to the `shop_orders` stream and receive a confirmation that our message is persisted: - -```csharp -for (var i = 0; i < 10; i++) -{ - // Notice we're using JetStream context to publish and receive ACKs - var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); - ack.EnsureSuccess(); -} -``` - -Now that we have a few messages in our stream, let's see its status using the [NATS command -line client](https://github.com/nats-io/natscli): - -```shell -$ nats stream ls -╭───────────────────────────────────────────────────────────────────────────────────╮ -│ Streams │ -├─────────────┬─────────────┬─────────────────────┬──────────┬───────┬──────────────┤ -│ Name │ Description │ Created │ Messages │ Size │ Last Message │ -├─────────────┼─────────────┼─────────────────────┼──────────┼───────┼──────────────┤ -│ shop_orders │ │ 2023-09-12 10:25:52 │ 10 │ 600 B │ 10.41s │ -╰─────────────┴─────────────┴─────────────────────┴──────────┴───────┴──────────────╯ -``` - -We need one more JetStream construct before we can start consuming our messages: a *consumer*: - -```csharp -var consumer = await js.CreateConsumerAsync(stream: "shop_orders", consumer: "order_processor"); -``` - -In JetStream, consumers are stored on the server. Clients don't need to worry about maintaining state separately. -You can think of JetStream consumers as pointers to messages in streams stored on the NATS JetStream server. Let's -see what our consumer's state is: - -```shell -$ nats consumer report shop_orders -╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ -│ Consumer report for shop_orders with 1 consumers │ -├─────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤ -│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ -├─────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤ -│ order_processor │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 10 / 100% │ 0 │ │ -╰─────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯ -``` - -Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for more information on streams and consumers. - -Finally, we're ready to consume the messages we persisted in `shop_orders` stream: - -```csharp -await foreach (var msg in consumer.ConsumeAllAsync()) -{ - var order = msg.Data; - Console.WriteLine($"Processing {msg.Subject} {order}..."); - await msg.AckAsync(); - // this loop never ends unless there is an error -} -``` - -## Logging - -You should also hook your logger to `NatsConnection` to make sure all is working as expected or -to get help diagnosing any issues you might have: - -```csharp -var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; -await using var nats = new NatsConnection(otps); -``` - ## Contributing - Run `dotnet format` at root directory of project in order to clear warnings that can be auto-formatted -## Roadmap - -- [x] Core NATS -- [x] JetStream initial support -- [ ] KV initial support -- [ ] Object Store initial support -- [ ] .NET 8.0 support (e.g. Native AOT) -- [ ] Beta phase - ## Attribution This library is based on the excellent work in [Cysharp/AlterNats](https://github.com/Cysharp/AlterNats) diff --git a/docs/documentation/core/intro.md b/docs/documentation/core/intro.md index f06e11687..48904a44c 100644 --- a/docs/documentation/core/intro.md +++ b/docs/documentation/core/intro.md @@ -6,6 +6,63 @@ of explanation, in a simplified sense you can think of Core NATS as the [wire protocol](https://docs.nats.io/reference/reference-protocols/nats-protocol) defining a simple but powerful pub/sub functionality and the concept of [Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects). +## Core NATS Quick Start + +[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen +on its default TCP port 4222. + +```shell +$ nats-server +``` + +Install `NATS.Client.Core` preview from Nuget. + +Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending +and receiving `Bar` objects: + +```csharp +public record Bar +{ + public int Id { get; set; } + public string Name { get; set; } +} +``` + +Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): + +```csharp +await using var nats = new NatsConnection(); + +await using sub = await nats.SubscribeAsync("bar.>"); +await foreach (var msg in sub.Msgs.ReadAllAsync()) +{ + Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); +} +``` + +Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects): +```csharp +await using var nats = new NatsConnection(); + +for (int i = 0; i < 10; i++) +{ + Console.WriteLine($" Publishing {i}..."); + await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); +} +``` + +## Logging + +You should also hook your logger to `NatsConnection` to make sure all is working as expected or +to get help diagnosing any issues you might have: + +```csharp +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +await using var nats = new NatsConnection(otps); +``` + +## What's Next + [Publish-Subscribe](pub-sub.md) is the message distribution model for one-to-many communication. [Request-Reply](req-rep.md) is a common pattern in modern distributed systems. A request is sent, and the application diff --git a/docs/documentation/intro.md b/docs/documentation/intro.md index 2776a1894..a44ce4111 100644 --- a/docs/documentation/intro.md +++ b/docs/documentation/intro.md @@ -1,13 +1,15 @@ # NATS.NET V2 Client -NATS.NET V2 Client is a .Net client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)! -It's build on top of the modern .Net 6+ platform, taking advantage of all the high performance features and +NATS.NET V2 Client is a .NET client for the Open Source [Connective Technology for Adaptive Edge & Distributed Systems - NATS](https://nats.io/)! +It's build on top of the modern .NET platform, taking advantage of all the high performance features and asynchronous programming model. NATS.NET V2 Client, just like NATS, is Open Source as is this documentation. Please [let us know](https://natsio.slack.com/channels/dotnet) if you have updates and/or suggestions for these docs. You can also create a Pull Request using the Edit on GitHub link on each page. +## What's Next + [Core NATS](core/intro.md) is the base set of functionalities and qualities of service offered by a NATS service infrastructure. [JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary. diff --git a/docs/documentation/jetstream/intro.md b/docs/documentation/jetstream/intro.md index d2ce3f2ad..04147a19a 100644 --- a/docs/documentation/jetstream/intro.md +++ b/docs/documentation/jetstream/intro.md @@ -26,7 +26,7 @@ in the defined storage system. A consumer is a stateful view of a stream. It acts as interface for clients to consume a subset of messages stored in a stream and will keep track of which messages were delivered and acknowledged by clients. -## Quick Start +## JetStream Quick Start [Download the latest](https://nats.io/download/) `nats-server` for your platform and run it with JetStream enabled: @@ -34,46 +34,103 @@ stream and will keep track of which messages were delivered and acknowledged by $ nats-server -js ``` -NATS server will listen on its default TCP port 4222. +Install `NATS.Client.JetStream` preview from Nuget. -Then create a .NET 6 (or above) console project, add `NATS.Client.JetStream` preview NuGet package and paste the below -code snippet into `Program.cs`: +Before we can so anything, we need a JetStream context: ```csharp -using System; -using Microsoft.Extensions.Logging; -using NATS.Client.Core; -using NATS.Client.JetStream; +await using var nc = new NatsConnection(); +var js = new NatsJSContext(nc); +``` -var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +Let's create our stream first. In JetStream, a stream is simply a storage for messages: -await using var nc = new NatsConnection(opts); -var js = new NatsJSContext(nc); +```csharp +await js.CreateStreamAsync(stream: "shop_orders", subjects: new []{"orders.>"}); +``` + +We can save messages in a stream by publishing them to the subjects the stream is interested in, which is `orders.>` in +our case, meaning any subject prefixed with `orders.` e.g. `orders.new.123`. Have a look at NATS documentation about +[wildcards in Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects#wildcards) for more information. + +Given that we have a record `Order`, we can publish and consume stream of `Order` objects: + +```csharp +public record Order(int OrderId); +``` -await js.CreateStreamAsync("orders", subjects: new []{"orders.>"}); +We can publish to the `shop_orders` stream and receive a confirmation that our message is persisted: +```csharp for (var i = 0; i < 10; i++) { + // Notice we're using JetStream context to publish and receive ACKs var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); ack.EnsureSuccess(); } +``` + +Now that we have a few messages in our stream, let's see its status using the [NATS command +line client](https://github.com/nats-io/natscli): + +```shell +$ nats stream ls +╭───────────────────────────────────────────────────────────────────────────────────╮ +│ Streams │ +├─────────────┬─────────────┬─────────────────────┬──────────┬───────┬──────────────┤ +│ Name │ Description │ Created │ Messages │ Size │ Last Message │ +├─────────────┼─────────────┼─────────────────────┼──────────┼───────┼──────────────┤ +│ shop_orders │ │ 2023-09-12 10:25:52 │ 10 │ 600 B │ 10.41s │ +╰─────────────┴─────────────┴─────────────────────┴──────────┴───────┴──────────────╯ +``` -var consumer = await js.CreateConsumerAsync("orders", "order_processor"); +We need one more JetStream construct before we can start consuming our messages: a *consumer*: -Console.WriteLine($"Consume..."); +```csharp +var consumer = await js.CreateConsumerAsync(stream: "shop_orders", consumer: "order_processor"); +``` + +In JetStream, consumers are stored on the server. Clients don't need to worry about maintaining state separately. +You can think of JetStream consumers as pointers to messages in streams stored on the NATS JetStream server. Let's +see what our consumer's state is: + +```shell +$ nats consumer report shop_orders +╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ Consumer report for shop_orders with 1 consumers │ +├─────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤ +│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ +├─────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤ +│ order_processor │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 10 / 100% │ 0 │ │ +╰─────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯ +``` + +Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for more information on streams and consumers. + +Finally, we're ready to consume the messages we persisted in `shop_orders` stream: + +```csharp await foreach (var msg in consumer.ConsumeAllAsync()) { var order = msg.Data; Console.WriteLine($"Processing {msg.Subject} {order}..."); await msg.AckAsync(); - if (order.OrderId == 5) - break; + // this loop never ends unless there is an error } -Console.WriteLine($"Done consuming."); +``` -public record Order(int OrderId); +## Logging + +You should also hook your logger to `NatsConnection` to make sure all is working as expected or +to get help diagnosing any issues you might have: + +```csharp +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +await using var nats = new NatsConnection(otps); ``` +## What's Next + [Managing JetStream](manage.md) covers how to create, update, get, list and delete streams and consumers. [Publishing messages to streams](publish.md) is achieved by simply publishing to a subject where a stream is configured diff --git a/docs/index.md b/docs/index.md index 09812b8df..a59e71793 100644 --- a/docs/index.md +++ b/docs/index.md @@ -5,174 +5,6 @@ NATS.NET V2 is a [NATS](https://nats.io) client for the modern [.NET](https://do ## Preview The NATS.NET V2 client is in preview and not recommended for production use yet. -Codebase is still under heavy development and we currently implemented [Core NATS](https://docs.nats.io/nats-concepts/core-nats) -and basic [JetStream](https://docs.nats.io/nats-concepts/jetstream) features. - -Please test and provide feedback: - -* on [slack.nats.io dotnet channel](https://natsio.slack.com/channels/dotnet) -* or use GitHub discussions, issues and PRs - -Thank you to our contributors so far. We feel we are growing slowly as a community and we appreciate your help -supporting and developing NATS .NET V2 project. - -## NATS.NET V2 Goals - -- Only support Async I/O (async/await) -- Target latest .NET LTS Release (currently .NET 6.0) - -## Packages - -- **NATS.Client.Core**: [Core NATS](https://docs.nats.io/nats-concepts/core-nats) -- **NATS.Client.Hosting**: extension to configure DI container -- **NATS.Client.JetStream**: [JetStream](https://docs.nats.io/nats-concepts/jetstream) - -## Core NATS Quick Start - -[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it without any arguments. `nats-server` will listen -on its default TCP port 4222. - -```shell -$ nats-server -``` - -Given that we have a plain class `Bar`, we can publish and subscribe to our `nats-server` sending -and receiving `Bar` objects: - -```csharp -public record Bar -{ - public int Id { get; set; } - public string Name { get; set; } -} -``` - -Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -await using sub = await nats.SubscribeAsync("bar.>"); -await foreach (var msg in sub.Msgs.ReadAllAsync()) -{ - Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n"); -} -``` - -Publish `Bar` objects to related `bar` [subjects](https://docs.nats.io/nats-concepts/subjects): -```csharp -await using var nats = new NatsConnection(); - -for (int i = 0; i < 10; i++) -{ - Console.WriteLine($" Publishing {i}..."); - await nats.PublishAsync($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" }); -} -``` - -## JetStream Quick Start - -This time run `nats-server` with JetStream enabled: - -```shell -$ nats-server -js -``` - -Before we can so anything, we need a JetStream context: - -```csharp -await using var nc = new NatsConnection(); -var js = new NatsJSContext(nc); -``` - -Let's create our stream first. In JetStream, a stream is simply a storage for messages: - -```csharp -await js.CreateStreamAsync(stream: "shop_orders", subjects: new []{"orders.>"}); -``` - -We can save messages in a stream by publishing them to the subjects the stream is interested in, which is `orders.>` in -our case, meaning any subject prefixed with `orders.` e.g. `orders.new.123`. Have a look at NATS documentation about -[wildcards in Subject-Based Messaging](https://docs.nats.io/nats-concepts/subjects#wildcards) for more information. - -Given that we have a record `Order`, we can publish and consume stream of `Order` objects: - -```csharp -public record Order(int OrderId); -``` - -We can publish to the `shop_orders` stream and receive a confirmation that our message is persisted: - -```csharp -for (var i = 0; i < 10; i++) -{ - // Notice we're using JetStream context to publish and receive ACKs - var ack = await js.PublishAsync($"orders.new.{i}", new Order(i)); - ack.EnsureSuccess(); -} -``` - -Now that we have a few messages in our stream, let's see its status using the [NATS command -line client](https://github.com/nats-io/natscli): - -```shell -$ nats stream ls -╭───────────────────────────────────────────────────────────────────────────────────╮ -│ Streams │ -├─────────────┬─────────────┬─────────────────────┬──────────┬───────┬──────────────┤ -│ Name │ Description │ Created │ Messages │ Size │ Last Message │ -├─────────────┼─────────────┼─────────────────────┼──────────┼───────┼──────────────┤ -│ shop_orders │ │ 2023-09-12 10:25:52 │ 10 │ 600 B │ 10.41s │ -╰─────────────┴─────────────┴─────────────────────┴──────────┴───────┴──────────────╯ -``` - -We need one more JetStream construct before we can start consuming our messages: a *consumer*: - -```csharp -var consumer = await js.CreateConsumerAsync(stream: "shop_orders", consumer: "order_processor"); -``` - -In JetStream, consumers are stored on the server. Clients don't need to worry about maintaining state separately. -You can think of JetStream consumers as pointers to messages in streams stored on the NATS JetStream server. Let's -see what our consumer's state is: - -```shell -$ nats consumer report shop_orders -╭────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ -│ Consumer report for shop_orders with 1 consumers │ -├─────────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤ -│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ -├─────────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤ -│ order_processor │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 10 / 100% │ 0 │ │ -╰─────────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯ -``` - -Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for more information on streams and consumers. - -Finally, we're ready to consume the messages we persisted in `shop_orders` stream: - -```csharp -await foreach (var msg in consumer.ConsumeAllAsync()) -{ - var order = msg.Data; - Console.WriteLine($"Processing {msg.Subject} {order}..."); - await msg.AckAsync(); - // this loop never ends unless there is an error -} -``` - -## Logging - -You should also hook your logger to `NatsConnection` to make sure all is working as expected or -to get help diagnosing any issues you might have: - -```csharp -var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; -await using var nats = new NatsConnection(otps); -``` - -## Contributing - -- Run `dotnet format` at root directory of project in order to clear warnings that can be auto-formatted ## Roadmap @@ -183,6 +15,8 @@ await using var nats = new NatsConnection(otps); - [ ] .NET 8.0 support (e.g. Native AOT) - [ ] Beta phase -## Attribution +## What's Next + +[Documentation](documentation/intro.md) can help you start writing code in no time. Just follow our quick start guides. -This library is based on the excellent work in [Cysharp/AlterNats](https://github.com/Cysharp/AlterNats) +[API](api/index.md) is the generated reference documentation. From 36783e2bbe90da0a862468d3b780cc79ba83a783 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 12 Sep 2023 13:37:02 +0100 Subject: [PATCH 09/10] Fixed test --- tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index efadfb7f2..0dec6922d 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -96,7 +96,7 @@ public async Task Consume_idle_heartbeat_test() var signal = new WaitSignal(TimeSpan.FromSeconds(30)); server.OnLog += log => { - if (log is { Category: "NATS.Client.JetStream.NatsJSConsume", LogLevel: LogLevel.Debug }) + if (log is { Category: "NATS.Client.JetStream.Internal.NatsJSConsume", LogLevel: LogLevel.Debug }) { if (log.EventId == NatsJSLogEvents.IdleTimeout) signal.Pulse(); From baac749f9ffc14c5282b70ffab2aa9591a60a5f8 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 13 Sep 2023 16:23:35 +0100 Subject: [PATCH 10/10] Added missing exception docs --- src/NATS.Client.JetStream/NatsJSConsumer.cs | 6 ++++++ src/NATS.Client.JetStream/NatsJSStream.cs | 14 +++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index ac910b7b8..86d8a357c 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -37,6 +37,8 @@ internal NatsJSConsumer(NatsJSContext context, ConsumerInfo info) /// There was an issue retrieving the response. /// Server responded with an error. /// After deletion this object can't be used anymore. + /// Server responded with an error. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); @@ -71,6 +73,7 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d /// Message type to deserialize. /// A consume object to manage the operation and retrieve messages. /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); @@ -138,6 +141,7 @@ await sub.CallMsgNextAsync( /// Message type to deserialize. /// Message retrieved from the stream or NULL /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. /// /// /// If the request to server expires (in 30 seconds by default) this call returns NULL. @@ -190,6 +194,7 @@ await sub.CallMsgNextAsync( /// Message type to deserialize. /// Async enumerable of messages which can be used in a await foreach loop. /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async IAsyncEnumerable> FetchAllAsync( NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -212,6 +217,7 @@ await sub.CallMsgNextAsync( /// Message type to deserialize. /// A fetch object to manage the operation and retrieve messages. /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. public async ValueTask> FetchAsync( NatsJSFetchOpts? opts = default, CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 5e1a9a410..0de05d852 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -28,7 +28,7 @@ internal NatsJSStream(NatsJSContext context, StreamInfo info) /// /// A used to cancel the API call. /// Whether delete was successful or not. - /// There was an issue retrieving the response. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. /// After deletion this object can't be used anymore. public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) @@ -42,7 +42,7 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d /// /// Stream update request to be sent to the server. /// A used to cancel the API call. - /// There was an issue retrieving the response. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. public async ValueTask UpdateAsync( StreamUpdateRequest request, @@ -60,7 +60,7 @@ public async ValueTask UpdateAsync( /// Ack policy to use. Must not be set to none. Default is explicit. /// A used to cancel the API call. /// The NATS JetStream consumer object which can be used retrieving data from the stream. - /// Ack policy is set to none or there was an issue retrieving the response. + /// Ack policy is set to none or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. public ValueTask CreateConsumerAsync(string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, CancellationToken cancellationToken = default) { @@ -74,7 +74,7 @@ public ValueTask CreateConsumerAsync(string consumer, ConsumerCo /// Consumer creation request to be sent to NATS JetStream server. /// A used to cancel the API call. /// The NATS JetStream consumer object which can be used retrieving data from the stream. - /// Ack policy is set to none or there was an issue retrieving the response. + /// Ack policy is set to none or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. public ValueTask CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default) { @@ -88,7 +88,7 @@ public ValueTask CreateConsumerAsync(ConsumerCreateRequest reque /// Consumer name. /// A used to cancel the API call. /// The NATS JetStream consumer object which can be used retrieving data from the stream. - /// There was an issue retrieving the response. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. public ValueTask GetConsumerAsync(string consumer, CancellationToken cancellationToken = default) { @@ -101,7 +101,7 @@ public ValueTask GetConsumerAsync(string consumer, CancellationT /// /// A used to cancel the API call. /// Async enumerable of consumer objects. Can be used in a await foreach loop. - /// There was an issue retrieving the response. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. /// /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. @@ -118,7 +118,7 @@ public IAsyncEnumerable ListConsumersAsync(CancellationToken can /// Consumer name to be deleted. /// A used to cancel the API call. /// Whether the deletion was successful. - /// There was an issue retrieving the response. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. public ValueTask DeleteConsumerAsync(string consumer, CancellationToken cancellationToken = default) {