From a91da64ff0eeebcc64fb19dc7b75d4f4c593fc78 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 13 Sep 2023 15:39:02 +0100 Subject: [PATCH] Options tidy-up (#128) * Options tidy-up * Removed variable PubOpts Headers and ReplyTo * Added headers and replyTo params to Publish methods * Make all Opts classes (not structs) * Added default options for JS context * Removed useless queue-group from request-reply * Swapped publish reply-to and header params * Headers is more likely to be used than reply-to * Added reference documentation for NatsMsg * Removed unnecessary exception throw --- docs/documentation/queue.md | 3 +- .../Example.Core.PublishHeaders/Program.cs | 2 +- .../Program.cs | 4 +- src/NATS.Client.Core/INatsConnection.cs | 39 ++++-- src/NATS.Client.Core/Internal/InboxSub.cs | 2 +- .../Internal/SubscriptionManager.cs | 9 +- .../NatsConnection.LowLevelApi.cs | 9 +- .../NatsConnection.Publish.cs | 16 +-- .../NatsConnection.RequestReply.cs | 18 ++- .../NatsConnection.RequestSub.cs | 40 ++++-- .../NatsConnection.Subscribe.cs | 15 +-- src/NATS.Client.Core/NatsMsg.cs | 125 ++++++++++++++++-- src/NATS.Client.Core/NatsPubOpts.cs | 6 +- src/NATS.Client.Core/NatsRequestExtensions.cs | 44 ++++-- .../NatsRequestManyExtensions.cs | 32 +++-- src/NATS.Client.Core/NatsSub.cs | 10 +- src/NATS.Client.Core/NatsSubBase.cs | 5 +- src/NATS.Client.Core/NatsSubChannelOpts.cs | 2 +- src/NATS.Client.Core/NatsSubOpts.cs | 12 +- src/NATS.Client.JetStream/NatsJSConsume.cs | 3 +- src/NATS.Client.JetStream/NatsJSConsumer.cs | 23 +++- src/NATS.Client.JetStream/NatsJSContext.cs | 5 +- src/NATS.Client.JetStream/NatsJSFetch.cs | 3 +- src/NATS.Client.JetStream/NatsJSOpts.cs | 21 +++ .../CancellationTest.cs | 2 +- .../NATS.Client.Core.Tests/LowLevelApiTest.cs | 4 +- .../NatsConnectionTest.Headers.cs | 4 +- .../NatsConnectionTest.QueueGroups.cs | 4 +- .../NatsConnectionTest.cs | 2 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 10 +- .../SubscriptionTest.cs | 10 +- .../JetStreamTest.cs | 8 +- .../PublishTest.cs | 4 +- 33 files changed, 346 insertions(+), 150 deletions(-) diff --git a/docs/documentation/queue.md b/docs/documentation/queue.md index 175a0b20b..9ed3d56fa 100644 --- a/docs/documentation/queue.md +++ b/docs/documentation/queue.md @@ -21,8 +21,7 @@ var replyTasks = new List(); for (int i = 0; i < 3; i++) { // Create three subscriptions all on the same queue group - var opts = new NatsSubOpts { QueueGroup = "maths-service" }; - var sub = await nats.SubscribeAsync("math.double", opts); + var sub = await nats.SubscribeAsync("math.double", queueGroup: "maths-service"); subs.Add(sub); diff --git a/sandbox/Example.Core.PublishHeaders/Program.cs b/sandbox/Example.Core.PublishHeaders/Program.cs index 2227d45c9..696f22e0b 100644 --- a/sandbox/Example.Core.PublishHeaders/Program.cs +++ b/sandbox/Example.Core.PublishHeaders/Program.cs @@ -15,7 +15,7 @@ await connection.PublishAsync( subject, new Bar { Id = i, Name = "Baz" }, - new NatsPubOpts { Headers = new NatsHeaders { ["XFoo"] = $"bar{i}" } }); + headers: new NatsHeaders { ["XFoo"] = $"bar{i}" }); } void Print(string message) diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs index 2f91b5a8e..5f17ea87e 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs +++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs @@ -12,7 +12,7 @@ await using var connection1 = new NatsConnection(options); Print($"[1][SUB] Subscribing to subject '{subject}'...\n"); -var sub1 = await connection1.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" }); +var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers"); var task1 = Task.Run(async () => { await foreach (var msg in sub1.Msgs.ReadAllAsync()) @@ -28,7 +28,7 @@ await using var connection2 = new NatsConnection(options); Print($"[2][SUB] Subscribing to subject '{subject}'...\n"); -var sub2 = await connection2.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" }); +var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers"); var task2 = Task.Run(async () => { await foreach (var msg in sub2.Msgs.ReadAllAsync()) diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index cce160449..647a8b52a 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -1,5 +1,4 @@ using System.Buffers; -using System.Runtime.CompilerServices; namespace NATS.Client.Core; @@ -17,10 +16,12 @@ public interface INatsConnection /// /// The destination subject to publish to. /// The message payload data. + /// Optional message headers. + /// Optional reply-to subject. /// A for publishing options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes the message payload to the given subject name, optionally supplying a reply subject. @@ -35,12 +36,14 @@ public interface INatsConnection /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. /// /// The destination subject to publish to. - /// Serializable data object + /// Serializable data object. + /// Optional message headers. + /// Optional reply-to subject. /// A for publishing options. /// A used to cancel the command. - /// Specifies the type of data that may be send to the NATS Server. + /// Specifies the type of data that may be sent to the NATS Server. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, T data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. @@ -48,7 +51,7 @@ public interface INatsConnection /// A representing message details. /// A for publishing options. /// A used to cancel the command. - /// Specifies the type of data that may be send to the NATS Server. + /// Specifies the type of data that may be sent to the NATS Server. /// A that represents the asynchronous send operation. ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); @@ -56,20 +59,32 @@ public interface INatsConnection /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// /// The subject name to subscribe to. + /// If specified, the subscriber will join this queue group. /// A for subscription options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + /// + /// Subscribers with the same queue group name, become a queue group, + /// and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// /// The subject name to subscribe to. + /// If specified, the subscriber will join this queue group. /// A for subscription options. /// A used to cancel the command. /// Specifies the type of data that may be received from the NATS Server. /// A that represents the asynchronous send operation. - ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + /// + /// Subscribers with the same queue group name, become a queue group, + /// and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + ValueTask> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID} @@ -82,6 +97,7 @@ public interface INatsConnection /// /// Subject of the responder /// Data to send to responder + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -97,6 +113,7 @@ public interface INatsConnection ValueTask?> RequestAsync( string subject, TRequest? data, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); @@ -106,6 +123,7 @@ public interface INatsConnection /// /// Subject of the responder /// Payload to send to responder + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -119,6 +137,7 @@ public interface INatsConnection ValueTask RequestAsync( string subject, ReadOnlySequence payload = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); @@ -128,6 +147,7 @@ public interface INatsConnection /// /// Subject of the responder /// Data to send to responder + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -141,6 +161,7 @@ public interface INatsConnection IAsyncEnumerable> RequestManyAsync( string subject, TRequest? data, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); @@ -150,6 +171,7 @@ public interface INatsConnection /// /// Subject of the responder /// Payload to send to responder + /// Optional message headers /// Request publish options /// Reply handler subscription options /// Cancel this request @@ -161,6 +183,7 @@ public interface INatsConnection IAsyncEnumerable RequestManyAsync( string subject, ReadOnlySequence payload = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); diff --git a/src/NATS.Client.Core/Internal/InboxSub.cs b/src/NATS.Client.Core/Internal/InboxSub.cs index e9373307c..bda5fa3e5 100644 --- a/src/NATS.Client.Core/Internal/InboxSub.cs +++ b/src/NATS.Client.Core/Internal/InboxSub.cs @@ -16,7 +16,7 @@ public InboxSub( NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) - : base(connection, manager, subject, opts) + : base(connection, manager, subject, queueGroup: default, opts) { _inbox = inbox; _connection = connection; diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index ed09a7544..cd4ad43bf 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -47,7 +47,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) internal InboxSubBuilder InboxSubBuilder { get; } - public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) + public async ValueTask SubscribeAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { if (IsInboxSubject(subject)) { @@ -55,7 +55,7 @@ public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSub } else { - await SubscribeInternalAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + await SubscribeInternalAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); } } @@ -179,6 +179,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N _inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this); await SubscribeInternalAsync( inboxSubject, + queueGroup: default, opts: default, _inboxSub, cancellationToken).ConfigureAwait(false); @@ -193,7 +194,7 @@ await SubscribeInternalAsync( await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false); } - private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) + private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { var sid = GetNextSid(); lock (_gate) @@ -204,7 +205,7 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts try { - await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken) + await _connection.SubscribeCoreAsync(sid, subject, queueGroup, opts?.MaxMsgs, cancellationToken) .ConfigureAwait(false); await sub.ReadyAsync().ConfigureAwait(false); } diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index e3b0f3e30..c010d43d1 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -1,6 +1,5 @@ using System.Buffers; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -100,17 +99,17 @@ internal ValueTask PubModelAsync(string subject, T? data, INatsSerializer ser } } - internal ValueTask SubAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default) + internal ValueTask SubAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default) { if (ConnectionState == NatsConnectionState.Open) { - return SubscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken); + return SubscriptionManager.SubscribeAsync(subject, queueGroup, opts, sub, cancellationToken); } else { - return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, b, token) => + return WithConnectAsync(subject, queueGroup, opts, sub, cancellationToken, static (self, s, q, o, b, token) => { - return self.SubscriptionManager.SubscribeAsync(s, o, b, token); + return self.SubscriptionManager.SubscribeAsync(s, q, o, b, token); }); } } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index b12e032c0..388908106 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -5,41 +5,41 @@ namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { - return PubAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken); + return PubAsync(subject, replyTo, payload, headers, cancellationToken); } else { - return PubPostAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken); + return PubPostAsync(subject, replyTo, payload, headers, cancellationToken); } } /// public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken); + return PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken); } /// - public ValueTask PublishAsync(string subject, T? data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Opts.Serializer; if (opts?.WaitUntilSent ?? false) { - return PubModelAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); + return PubModelAsync(subject, data, serializer, replyTo, headers, cancellationToken); } else { - return PubModelPostAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); + return PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken); } } /// public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken); + return PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken); } } diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 12ac3b7bc..04a5c6491 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -5,6 +5,8 @@ namespace NATS.Client.Core; public partial class NatsConnection { + private static readonly NatsSubOpts DefaultReplyOpts = new() { MaxMsgs = 1 }; + /// public string NewInbox() => $"{InboxPrefix}{Guid.NewGuid():n}"; @@ -12,13 +14,14 @@ public partial class NatsConnection public async ValueTask?> RequestAsync( string subject, TRequest? data, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { var opts = SetReplyOptsDefaults(replyOpts); - await using var sub = await RequestSubAsync(subject, data, requestOpts, opts, cancellationToken) + await using var sub = await RequestSubAsync(subject, data, headers, requestOpts, opts, cancellationToken) .ConfigureAwait(false); if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) @@ -36,13 +39,14 @@ public partial class NatsConnection public async ValueTask RequestAsync( string subject, ReadOnlySequence payload = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { var opts = SetReplyOptsDefaults(replyOpts); - await using var sub = await RequestSubAsync(subject, payload, requestOpts, opts, cancellationToken).ConfigureAwait(false); + await using var sub = await RequestSubAsync(subject, payload, headers, requestOpts, opts, cancellationToken).ConfigureAwait(false); if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { @@ -59,11 +63,12 @@ public partial class NatsConnection public async IAsyncEnumerable> RequestManyAsync( string subject, TRequest? data, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await using var sub = await RequestSubAsync(subject, data, requestOpts, replyOpts, cancellationToken) + await using var sub = await RequestSubAsync(subject, data, headers, requestOpts, replyOpts, cancellationToken) .ConfigureAwait(false); while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) @@ -85,11 +90,12 @@ public partial class NatsConnection public async IAsyncEnumerable RequestManyAsync( string subject, ReadOnlySequence payload = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await using var sub = await RequestSubAsync(subject, payload, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false); + await using var sub = await RequestSubAsync(subject, payload, headers, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false); while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { @@ -106,9 +112,9 @@ public async IAsyncEnumerable RequestManyAsync( } } - private NatsSubOpts SetReplyOptsDefaults(in NatsSubOpts? replyOpts) + private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts) { - var opts = (replyOpts ?? default) with { MaxMsgs = 1, }; + var opts = replyOpts ?? DefaultReplyOpts; if ((opts.Timeout ?? default) == default) { diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index d1ac0860e..cbdfa5320 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -1,5 +1,4 @@ using System.Buffers; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -8,20 +7,30 @@ public partial class NatsConnection internal async ValueTask RequestSubAsync( string subject, ReadOnlySequence payload = default, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts); - await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); - await PubAsync(subject, replyTo, payload, requestOpts?.Headers, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts); + await SubAsync(replyTo, queueGroup: default, replyOpts, sub, cancellationToken).ConfigureAwait(false); + if (requestOpts?.WaitUntilSent == true) + { + await PubAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); + } + else + { + await PubPostAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); + } + return sub; } internal async ValueTask> RequestSubAsync( string subject, TRequest? data, + NatsHeaders? headers = default, NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) @@ -29,16 +38,19 @@ internal async ValueTask> RequestSubAsync( var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; var replySerializer = replyOpts?.Serializer ?? Opts.Serializer; - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts, replySerializer); - await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); - - await PubModelAsync( - subject, - data, - requestOpts?.Serializer ?? Opts.Serializer, - replyTo, - requestOpts?.Headers, - cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer); + await SubAsync(replyTo, queueGroup: default, replyOpts, sub, cancellationToken).ConfigureAwait(false); + + var serializer = requestOpts?.Serializer ?? Opts.Serializer; + + if (requestOpts?.WaitUntilSent == true) + { + await PubModelAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false); + } + else + { + await PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false); + } return sub; } diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index 77deb3475..f849cb331 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -1,24 +1,21 @@ -using System.Collections.Concurrent; -using NATS.Client.Core.Internal; - namespace NATS.Client.Core; public partial class NatsConnection { /// - public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + public async ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { - var sub = new NatsSub(this, SubscriptionManager, subject, opts); - await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager, subject, queueGroup, opts); + await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); return sub; } /// - public async ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + public async ValueTask> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Opts.Serializer; - var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, opts, serializer); - await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer); + await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); return sub; } } diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 4c4cd7a8c..67edc2b61 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -3,6 +3,24 @@ namespace NATS.Client.Core; +/// +/// NATS message structure as defined by the protocol. +/// +/// The destination subject to publish to. +/// The reply subject that subscribers can use to send a response back to the publisher/requester. +/// Message size in bytes. +/// Pass additional information using name-value pairs. +/// The message payload data. +/// NATS connection this message is associated to. +/// +/// Connection property is used to provide reply functionality. +/// +/// Message size is calculated using the same method NATS server uses: +/// +/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; +/// +/// +/// public readonly record struct NatsMsg( string Subject, string? ReplyTo, @@ -40,12 +58,34 @@ internal static NatsMsg Build( return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); } - public ValueTask ReplyAsync(ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + /// + /// Reply to this message. + /// + /// The message payload data. + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + public ValueTask ReplyAsync(ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload, opts, cancellationToken); + return Connection.PublishAsync(ReplyTo!, payload, headers, replyTo, opts, cancellationToken); } + /// + /// Reply to this message. + /// + /// A representing message details. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); @@ -67,6 +107,25 @@ private void CheckReplyPreconditions() } } +/// +/// NATS message structure as defined by the protocol. +/// +/// The destination subject to publish to. +/// The reply subject that subscribers can use to send a response back to the publisher/requester. +/// Message size in bytes. +/// Pass additional information using name-value pairs. +/// Serializable data object. +/// NATS connection this message is associated to. +/// Specifies the type of data that may be sent to the NATS Server. +/// +/// Connection property is used to provide reply functionality. +/// +/// Message size is calculated using the same method NATS server uses: +/// +/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; +/// +/// +/// public readonly record struct NatsMsg( string Subject, string? ReplyTo, @@ -112,28 +171,74 @@ internal static NatsMsg Build( return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); } - public ValueTask ReplyAsync(TReply data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + /// + /// Reply to this message. + /// + /// Serializable data object. + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, data, opts, cancellationToken); + return Connection.PublishAsync(ReplyTo!, data, headers, replyTo, opts, cancellationToken); } - public ValueTask ReplyAsync(NatsMsg msg) + /// + /// Reply to this message. + /// + /// A representing message details. + /// A for publishing options. + /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + public ValueTask ReplyAsync(NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo! }); + return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); } - public ValueTask ReplyAsync(ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + /// + /// Reply to this message. + /// + /// The message payload data. + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + public ValueTask ReplyAsync(ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload: payload, opts, cancellationToken); + return Connection.PublishAsync(ReplyTo!, payload: payload, headers, replyTo, opts, cancellationToken); } - public ValueTask ReplyAsync(NatsMsg msg) + /// + /// Reply to this message. + /// + /// A representing message details. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishes a new message using the reply-to subject from the this message as the destination subject. + /// + public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo! }); + return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); } [MemberNotNull(nameof(Connection))] diff --git a/src/NATS.Client.Core/NatsPubOpts.cs b/src/NATS.Client.Core/NatsPubOpts.cs index 576758292..dc1d14539 100644 --- a/src/NATS.Client.Core/NatsPubOpts.cs +++ b/src/NATS.Client.Core/NatsPubOpts.cs @@ -1,11 +1,7 @@ namespace NATS.Client.Core; -public readonly record struct NatsPubOpts +public record NatsPubOpts { - public string? ReplyTo { get; init; } - - public NatsHeaders? Headers { get; init; } - public INatsSerializer? Serializer { get; init; } /// diff --git a/src/NATS.Client.Core/NatsRequestExtensions.cs b/src/NATS.Client.Core/NatsRequestExtensions.cs index 64d6387bf..99265e822 100644 --- a/src/NATS.Client.Core/NatsRequestExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestExtensions.cs @@ -9,6 +9,7 @@ public static class NatsRequestExtensions /// /// NATS connection /// Message to be sent as request + /// Request publish options /// Reply handler subscription options /// Cancel this request /// Request type @@ -23,23 +24,27 @@ public static class NatsRequestExtensions public static ValueTask?> RequestAsync( this INatsConnection nats, in NatsMsg msg, + NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestAsync( + CancellationToken cancellationToken = default) + { + CheckMsgForRequestReply(msg); + + return nats.RequestAsync( msg.Subject, msg.Data, - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } /// /// Request and receive a single reply from a responder. /// /// NATS connection /// Message to be sent as request + /// Request publish options /// Reply handler subscription options /// Cancel this request /// Returns the received from the responder as reply. @@ -52,15 +57,30 @@ public static class NatsRequestExtensions public static ValueTask RequestAsync( this INatsConnection nats, in NatsMsg msg, + in NatsPubOpts? requestOpts = default, in NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestAsync( + CancellationToken cancellationToken = default) + { + CheckMsgForRequestReply(msg); + + return nats.RequestAsync( msg.Subject, payload: new ReadOnlySequence(msg.Data), - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } + + internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); + + internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); + + private static void CheckForRequestReply(string? replyTo) + { + if (!string.IsNullOrWhiteSpace(replyTo)) + { + throw new NatsException($"Can't set {nameof(NatsMsg.ReplyTo)} for a request"); + } + } } diff --git a/src/NATS.Client.Core/NatsRequestManyExtensions.cs b/src/NATS.Client.Core/NatsRequestManyExtensions.cs index bee9b6aba..425c7e0bc 100644 --- a/src/NATS.Client.Core/NatsRequestManyExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestManyExtensions.cs @@ -9,6 +9,7 @@ public static class NatsRequestManyExtensions /// /// NATS connection /// Message to be sent as request + /// Request publish options /// Reply handler subscription options /// Cancel this request /// An asynchronous enumerable of objects @@ -19,23 +20,27 @@ public static class NatsRequestManyExtensions public static IAsyncEnumerable RequestManyAsync( this INatsConnection nats, NatsMsg msg, + NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestManyAsync( + CancellationToken cancellationToken = default) + { + NatsRequestExtensions.CheckMsgForRequestReply(msg); + + return nats.RequestManyAsync( msg.Subject, payload: new ReadOnlySequence(msg.Data), - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } /// /// Request and receive zero or more replies from a responder. /// /// NATS connection /// Message to be sent as request + /// Request publish options /// Reply handler subscription options /// Cancel this request /// Request type @@ -48,15 +53,18 @@ public static IAsyncEnumerable RequestManyAsync( public static IAsyncEnumerable> RequestManyAsync( this INatsConnection nats, NatsMsg msg, + NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) => - nats.RequestManyAsync( + CancellationToken cancellationToken = default) + { + NatsRequestExtensions.CheckMsgForRequestReply(msg); + + return nats.RequestManyAsync( msg.Subject, msg.Data, - requestOpts: new NatsPubOpts - { - Headers = msg.Headers, - }, + msg.Headers, + requestOpts, replyOpts, cancellationToken); + } } diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index f25067367..f7ef07f65 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -18,8 +18,8 @@ public sealed class NatsSub : NatsSubBase, INatsSub private readonly Channel _msgs; - internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, NatsSubOpts? opts) - : base(connection, manager, subject, opts) => + internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, NatsSubOpts? opts) + : base(connection, manager, subject, queueGroup, opts) => _msgs = Channel.CreateBounded( GetChannelOpts(opts?.ChannelOpts)); @@ -28,9 +28,8 @@ internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string internal static BoundedChannelOptions GetChannelOpts( NatsSubChannelOpts? subChannelOpts) { - if (subChannelOpts != null) + if (subChannelOpts is { } overrideOpts) { - var overrideOpts = subChannelOpts.Value; return new BoundedChannelOptions(overrideOpts.Capacity ?? DefaultChannelOpts.Capacity) { @@ -74,9 +73,10 @@ internal NatsSub( NatsConnection connection, ISubscriptionManager manager, string subject, + string? queueGroup, NatsSubOpts? opts, INatsSerializer serializer) - : base(connection, manager, subject, opts) + : base(connection, manager, subject, queueGroup, opts) { _msgs = Channel.CreateBounded>( NatsSub.GetChannelOpts(opts?.ChannelOpts)); diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index b99957a7c..1eb6164c8 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -43,12 +43,13 @@ internal NatsSubBase( NatsConnection connection, ISubscriptionManager manager, string subject, + string? queueGroup, NatsSubOpts? opts) { _logger = connection.Opts.LoggerFactory.CreateLogger(); _debug = _logger.IsEnabled(LogLevel.Debug); _manager = manager; - _pendingMsgs = opts is { MaxMsgs: > 0 } ? opts.Value.MaxMsgs ?? -1 : -1; + _pendingMsgs = opts is { MaxMsgs: > 0 } ? opts.MaxMsgs ?? -1 : -1; _countPendingMsgs = _pendingMsgs > 0; _idleTimeout = opts?.IdleTimeout ?? default; _startUpTimeout = opts?.StartUpTimeout ?? default; @@ -56,7 +57,7 @@ internal NatsSubBase( Connection = connection; Subject = subject; - QueueGroup = opts?.QueueGroup; + QueueGroup = queueGroup; // Only allocate timers if necessary to reduce GC pressure if (_idleTimeout != default) diff --git a/src/NATS.Client.Core/NatsSubChannelOpts.cs b/src/NATS.Client.Core/NatsSubChannelOpts.cs index fc63aa973..fcc9197f9 100644 --- a/src/NATS.Client.Core/NatsSubChannelOpts.cs +++ b/src/NATS.Client.Core/NatsSubChannelOpts.cs @@ -5,7 +5,7 @@ namespace NATS.Client.Core; /// /// Options For setting the FullMode and Capacity for a the created for Subscriptions /// -public readonly record struct NatsSubChannelOpts +public record NatsSubChannelOpts { /// /// The Behavior of the Subscription's Channel when the Capacity has been reached. diff --git a/src/NATS.Client.Core/NatsSubOpts.cs b/src/NATS.Client.Core/NatsSubOpts.cs index 2cae34fd2..c46cb1771 100644 --- a/src/NATS.Client.Core/NatsSubOpts.cs +++ b/src/NATS.Client.Core/NatsSubOpts.cs @@ -2,18 +2,8 @@ namespace NATS.Client.Core; -public readonly record struct NatsSubOpts +public record NatsSubOpts { - /// - /// If specified, the subscriber will join this queue group. - /// - /// - /// Subscribers with the same queue group name, become a queue group, - /// and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - public string? QueueGroup { get; init; } - /// /// Serializer to use to deserialize the message if a model is being used. /// diff --git a/src/NATS.Client.JetStream/NatsJSConsume.cs b/src/NATS.Client.JetStream/NatsJSConsume.cs index becc9e582..2cc560eae 100644 --- a/src/NATS.Client.JetStream/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/NatsJSConsume.cs @@ -44,8 +44,9 @@ public NatsJSConsume( string stream, string consumer, string subject, + string? queueGroup, NatsSubOpts? opts) - : base(context.Connection, context.Connection.SubscriptionManager, subject, opts) + : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { _logger = Connection.Opts.LoggerFactory.CreateLogger>(); _debug = _logger.IsEnabled(LogLevel.Debug); diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 66b085af4..e89d15b8f 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -30,9 +30,10 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d } public async IAsyncEnumerable> ConsumeAllAsync( - NatsJSConsumeOpts opts, + NatsJSConsumeOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { + opts ??= _context.Opts.DefaultConsumeOpts; await using var cc = await ConsumeAsync(opts, cancellationToken); await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken)) { @@ -40,9 +41,10 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d } } - public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts, CancellationToken cancellationToken = default) + public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); + opts ??= _context.Opts.DefaultConsumeOpts; var inbox = _context.NewInbox(); @@ -67,6 +69,7 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts consumer: _consumer, context: _context, subject: inbox, + queueGroup: default, opts: requestOpts, maxMsgs: max.MaxMsgs, maxBytes: max.MaxBytes, @@ -77,6 +80,7 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts await _context.Connection.SubAsync( subject: inbox, + queueGroup: default, opts: requestOpts, sub: sub, cancellationToken); @@ -97,8 +101,11 @@ await sub.CallMsgNextAsync( return sub; } - public async ValueTask?> NextAsync(NatsJSNextOpts opts, CancellationToken cancellationToken = default) + public async ValueTask?> NextAsync(NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default) { + ThrowIfDeleted(); + opts ??= _context.Opts.DefaultNextOpts; + await using var f = await FetchAsync( new NatsJSFetchOpts { @@ -118,9 +125,12 @@ await sub.CallMsgNextAsync( } public async IAsyncEnumerable> FetchAllAsync( - NatsJSFetchOpts opts, + NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { + ThrowIfDeleted(); + opts ??= _context.Opts.DefaultFetchOpts; + await using var fc = await FetchAsync(opts, cancellationToken); await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) { @@ -129,10 +139,11 @@ await sub.CallMsgNextAsync( } public async ValueTask> FetchAsync( - NatsJSFetchOpts opts, + NatsJSFetchOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); + opts ??= _context.Opts.DefaultFetchOpts; var inbox = _context.NewInbox(); @@ -157,6 +168,7 @@ public async ValueTask> FetchAsync( consumer: _consumer, context: _context, subject: inbox, + queueGroup: default, opts: requestOpts, maxMsgs: max.MaxMsgs, maxBytes: max.MaxBytes, @@ -165,6 +177,7 @@ public async ValueTask> FetchAsync( await _context.Connection.SubAsync( subject: inbox, + queueGroup: default, opts: requestOpts, sub: sub, cancellationToken); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index d2d3bbc71..e1a846edb 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -31,12 +31,14 @@ public ValueTask GetAccountInfoAsync(CancellationToken canc public async ValueTask PublishAsync( string subject, T? data, - NatsPubOpts opts = default, + NatsHeaders? headers = default, + NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { await using var sub = await Connection.RequestSubAsync( subject: subject, data: data, + headers: headers, requestOpts: opts, replyOpts: default, cancellationToken) @@ -87,6 +89,7 @@ internal async ValueTask> JSRequestAsync( subject: subject, data: request, + headers: default, requestOpts: default, replyOpts: new NatsSubOpts { Serializer = NatsJSErrorAwareJsonSerializer.Default }, cancellationToken) diff --git a/src/NATS.Client.JetStream/NatsJSFetch.cs b/src/NATS.Client.JetStream/NatsJSFetch.cs index d17c278c1..fe2c0d750 100644 --- a/src/NATS.Client.JetStream/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/NatsJSFetch.cs @@ -39,8 +39,9 @@ public NatsJSFetch( string stream, string consumer, string subject, + string? queueGroup, NatsSubOpts? opts) - : base(context.Connection, context.Connection.SubscriptionManager, subject, opts) + : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { _logger = Connection.Opts.LoggerFactory.CreateLogger>(); _debug = _logger.IsEnabled(LogLevel.Debug); diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index 1a9342b91..af87b587d 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -33,6 +33,27 @@ public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = d /// These options are used as the defaults when acknowledging messages received from a stream using a consumer. /// public AckOpts AckOpts { get; init; } + + /// + /// Default consume options to be used in consume calls in this context. + /// + /// + /// Defaults to MaxMsgs = 1,000. + /// + public NatsJSConsumeOpts DefaultConsumeOpts { get; init; } = new() { MaxMsgs = 1_000 }; + + /// + /// Default fetch options to be used in fetch calls in this context. + /// + /// + /// Defaults to MaxMsgs = 1,000. + /// + public NatsJSFetchOpts DefaultFetchOpts { get; init; } = new() { MaxMsgs = 1_000 }; + + /// + /// Default next options to be used in next calls in this context. + /// + public NatsJSNextOpts DefaultNextOpts { get; init; } = new(); } public record NatsJSConsumeOpts diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index 96e8b7149..e6e63ce11 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -28,7 +28,7 @@ public async Task CommandTimeoutTest() var timeoutException = await Assert.ThrowsAsync(async () => { - await pubConnection.PublishAsync("foo", "aiueo", new NatsPubOpts { WaitUntilSent = true }); + await pubConnection.PublishAsync("foo", "aiueo", opts: new NatsPubOpts { WaitUntilSent = true }); }); timeoutException.Message.Should().Contain("1 seconds elapsing"); diff --git a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs index 357b1ec41..71365a21f 100644 --- a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs +++ b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs @@ -18,7 +18,7 @@ public async Task Sub_custom_builder_test() var subject = "foo.*"; var builder = new NatsSubCustomTestBuilder(_output); var sub = builder.Build(subject, default, nats, nats.SubscriptionManager); - await nats.SubAsync(subject, opts: default, sub); + await nats.SubAsync(subject, queueGroup: default, opts: default, sub: sub); await Retry.Until( "subscription is ready", @@ -45,7 +45,7 @@ private class NatsSubTest : NatsSubBase private readonly ITestOutputHelper _output; public NatsSubTest(string subject, NatsConnection connection, NatsSubCustomTestBuilder builder, ITestOutputHelper output, ISubscriptionManager manager) - : base(connection, manager, subject, default) + : base(connection, manager, subject, default, default) { _builder = builder; _output = output; diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs index 12a12fb18..77023f0ef 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Headers.cs @@ -40,7 +40,7 @@ await Retry.Until( Assert.False(headers.IsReadOnly); // Send with headers - await nats.PublishAsync("foo", 100, new NatsPubOpts { Headers = headers }); + await nats.PublishAsync("foo", 100, headers: headers); Assert.True(headers.IsReadOnly); Assert.Throws(() => @@ -63,7 +63,7 @@ await Retry.Until( Assert.Equal("multi-value-1", msg1.Headers["Multi"][1]); // Send empty headers - await nats.PublishAsync("foo", 200, new NatsPubOpts { Headers = new NatsHeaders() }); + await nats.PublishAsync("foo", 200, headers: new NatsHeaders()); var msg2 = await signal2; Assert.Equal(200, msg2.Data); diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs index 28c9e1f83..811d06776 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.QueueGroups.cs @@ -14,8 +14,8 @@ public async Task QueueGroupsTest() await using var conn2 = server.CreateClientConnection(); await using var conn3 = server.CreateClientConnection(); - var sub1 = await conn1.SubscribeAsync("foo.*", new NatsSubOpts { QueueGroup = "my-group" }); - var sub2 = await conn2.SubscribeAsync("foo.*", new NatsSubOpts { QueueGroup = "my-group" }); + var sub1 = await conn1.SubscribeAsync("foo.*", queueGroup: "my-group"); + var sub2 = await conn2.SubscribeAsync("foo.*", queueGroup: "my-group"); var signal = new WaitSignal(); var cts = new CancellationTokenSource(); diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs index 06a82d067..98c1407d7 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs @@ -129,7 +129,7 @@ public async Task RequestTest(int minSize) await Retry.Until( "reply handle is ready", () => Volatile.Read(ref sync) == 1, - async () => await pubConnection.PublishAsync(subject, 1, new NatsPubOpts { ReplyTo = "ignore" }), + async () => await pubConnection.PublishAsync(subject, 1, replyTo: "ignore"), retryDelay: TimeSpan.FromSeconds(1)); var v = await pubConnection.RequestAsync(subject, 9999); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 60b6f41b9..b045b31cd 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -148,7 +148,7 @@ await Retry.Until( Assert.Matches(@"^MSG foo.signal1 \w+ 0␍␊$", msgFrame1.Message); Log("HPUB notifications"); - await nats.PublishAsync("foo.signal2", opts: new NatsPubOpts { Headers = new NatsHeaders() }); + await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); var msg2 = await signal2; Assert.Equal(0, msg2.Data.Length); Assert.NotNull(msg2.Headers); @@ -182,7 +182,7 @@ void Log(string text) Log("### Auto-unsubscribe after consuming max-msgs"); { var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - await using var sub = await nats.SubscribeAsync("foo", opts); + await using var sub = await nats.SubscribeAsync("foo", opts: opts); sid++; await Retry.Until("all frames arrived", () => proxy.Frames.Count >= 2); @@ -246,7 +246,7 @@ void Log(string text) proxy.Reset(); var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - var sub = await nats.SubscribeAsync("foo3", opts); + var sub = await nats.SubscribeAsync("foo3", opts: opts); sid++; var count = 0; var reg = sub.Register(_ => Interlocked.Increment(ref count)); @@ -303,7 +303,7 @@ public async Task Reconnect_with_sub_and_additional_commands() var sync = 0; await using var sub = new NatsSubReconnectTest(nats, subject, i => Interlocked.Exchange(ref sync, i)); - await nats.SubAsync(sub.Subject, opts: default, sub); + await nats.SubAsync(sub.Subject, queueGroup: default, opts: default, sub: sub); await Retry.Until( "subscribed", @@ -341,7 +341,7 @@ private sealed class NatsSubReconnectTest : NatsSubBase private readonly Action _callback; internal NatsSubReconnectTest(NatsConnection connection, string subject, Action callback) - : base(connection, connection.SubscriptionManager, subject, default) => + : base(connection, connection.SubscriptionManager, subject, queueGroup: default, opts: default) => _callback = callback; internal override IEnumerable GetReconnectCommands(int sid) diff --git a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs index 9e40e5ce8..9b70d56c8 100644 --- a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs +++ b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs @@ -90,8 +90,8 @@ public async Task Auto_unsubscribe_on_max_messages_with_inbox_subscription_test( await using var nats = server.CreateClientConnection(); var subject = nats.NewInbox(); - await using var sub1 = await nats.SubscribeAsync(subject, new NatsSubOpts { MaxMsgs = 1 }); - await using var sub2 = await nats.SubscribeAsync(subject, new NatsSubOpts { MaxMsgs = 2 }); + await using var sub1 = await nats.SubscribeAsync(subject, opts: new NatsSubOpts { MaxMsgs = 1 }); + await using var sub2 = await nats.SubscribeAsync(subject, opts: new NatsSubOpts { MaxMsgs = 2 }); for (var i = 0; i < 3; i++) { @@ -131,7 +131,7 @@ public async Task Auto_unsubscribe_on_max_messages_test() const int maxMsgs = 99; var opts = new NatsSubOpts { MaxMsgs = maxMsgs }; - await using var sub = await nats.SubscribeAsync(subject, opts); + await using var sub = await nats.SubscribeAsync(subject, opts: opts); // send more messages than max to check we only get max for (var i = 0; i < maxMsgs + 10; i++) @@ -161,7 +161,7 @@ public async Task Auto_unsubscribe_on_timeout_test() const string subject = "foo2"; var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(1) }; - await using var sub = await nats.SubscribeAsync(subject, opts); + await using var sub = await nats.SubscribeAsync(subject, opts: opts); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; @@ -183,7 +183,7 @@ public async Task Auto_unsubscribe_on_idle_timeout_test() const string subject = "foo3"; var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(3) }; - await using var sub = await nats.SubscribeAsync(subject, opts); + await using var sub = await nats.SubscribeAsync(subject, opts: opts); await nats.PublishAsync(subject, 0); await nats.PublishAsync(subject, 1); diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index f53558fe6..14c73fde7 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -62,8 +62,8 @@ public async Task Create_stream_test() ack = await js.PublishAsync( "events.foo", new TestData { Test = 2 }, - new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, }, - cts1.Token); + headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + cancellationToken: cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(2, ack.Seq); @@ -73,8 +73,8 @@ public async Task Create_stream_test() ack = await js.PublishAsync( "events.foo", new TestData { Test = 2 }, - new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, }, - cts1.Token); + headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + cancellationToken: cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(2, ack.Seq); diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs index ffdc2aba3..dffa40110 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -33,7 +33,7 @@ public async Task Publish_test() var ack1 = await js.PublishAsync( subject: "s1.foo", data: new TestData { Test = 2 }, - opts: new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "2" } } }, + headers: new NatsHeaders { { "Nats-Msg-Id", "2" } }, cancellationToken: cts.Token); Assert.Null(ack1.Error); Assert.Equal(2, ack1.Seq); @@ -42,7 +42,7 @@ public async Task Publish_test() var ack2 = await js.PublishAsync( subject: "s1.foo", data: new TestData { Test = 2 }, - opts: new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "2" } } }, + headers: new NatsHeaders { { "Nats-Msg-Id", "2" } }, cancellationToken: cts.Token); Assert.Null(ack2.Error); Assert.True(ack2.Duplicate);