Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Options tidy-up #128

Merged
merged 4 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/documentation/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ var replyTasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
// Create three subscriptions all on the same queue group
var opts = new NatsSubOpts { QueueGroup = "maths-service" };
var sub = await nats.SubscribeAsync<int>("math.double", opts);
var sub = await nats.SubscribeAsync<int>("math.double", queueGroup: "maths-service");

subs.Add(sub);

Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.PublishHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
await connection.PublishAsync<Bar>(
subject,
new Bar { Id = i, Name = "Baz" },
new NatsPubOpts { Headers = new NatsHeaders { ["XFoo"] = $"bar{i}" } });
headers: new NatsHeaders { ["XFoo"] = $"bar{i}" });
}

void Print(string message)
Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" });
var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers");
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
Expand All @@ -28,7 +28,7 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" });
var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers");
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
Expand Down
39 changes: 31 additions & 8 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Buffers;
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;

Expand All @@ -17,10 +16,12 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="payload">The message payload data.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -35,41 +36,55 @@ public interface INatsConnection
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="data">Serializable data object</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg{T}"/> representing message details.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<INatsSub> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
ValueTask<INatsSub> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
Expand All @@ -82,6 +97,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -97,6 +113,7 @@ public interface INatsConnection
ValueTask<NatsMsg<TReply?>?> RequestAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand All @@ -106,6 +123,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -119,6 +137,7 @@ public interface INatsConnection
ValueTask<NatsMsg?> RequestAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand All @@ -128,6 +147,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -141,6 +161,7 @@ public interface INatsConnection
IAsyncEnumerable<NatsMsg<TReply?>> RequestManyAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand All @@ -150,6 +171,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -161,6 +183,7 @@ public interface INatsConnection
IAsyncEnumerable<NatsMsg> RequestManyAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public InboxSub(
NatsSubOpts? opts,
NatsConnection connection,
ISubscriptionManager manager)
: base(connection, manager, subject, opts)
: base(connection, manager, subject, queueGroup: default, opts)
{
_inbox = inbox;
_connection = connection;
Expand Down
9 changes: 5 additions & 4 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

internal InboxSubBuilder InboxSubBuilder { get; }

public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
public async ValueTask SubscribeAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
if (IsInboxSubject(subject))
{
await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
}
else
{
await SubscribeInternalAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
await SubscribeInternalAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -179,6 +179,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
_inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
queueGroup: default,
opts: default,
_inboxSub,
cancellationToken).ConfigureAwait(false);
Expand All @@ -193,7 +194,7 @@ await SubscribeInternalAsync(
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
}

private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
var sid = GetNextSid();
lock (_gate)
Expand All @@ -204,7 +205,7 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts

try
{
await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken)
await _connection.SubscribeCoreAsync(sid, subject, queueGroup, opts?.MaxMsgs, cancellationToken)
.ConfigureAwait(false);
await sub.ReadyAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Buffers;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

Expand Down Expand Up @@ -100,17 +99,17 @@ internal ValueTask PubModelAsync<T>(string subject, T? data, INatsSerializer ser
}
}

internal ValueTask SubAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default)
internal ValueTask SubAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default)
{
if (ConnectionState == NatsConnectionState.Open)
{
return SubscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken);
return SubscriptionManager.SubscribeAsync(subject, queueGroup, opts, sub, cancellationToken);
}
else
{
return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, b, token) =>
return WithConnectAsync(subject, queueGroup, opts, sub, cancellationToken, static (self, s, q, o, b, token) =>
{
return self.SubscriptionManager.SubscribeAsync(s, o, b, token);
return self.SubscriptionManager.SubscribeAsync(s, q, o, b, token);
});
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/NATS.Client.Core/NatsConnection.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,41 @@ namespace NATS.Client.Core;
public partial class NatsConnection
{
/// <inheritdoc />
public ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
public ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
if (opts?.WaitUntilSent ?? false)
{
return PubAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken);
return PubAsync(subject, replyTo, payload, headers, cancellationToken);
}
else
{
return PubPostAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken);
return PubPostAsync(subject, replyTo, payload, headers, cancellationToken);
}
}

/// <inheritdoc />
public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken);
return PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken);
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(string subject, T? data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Opts.Serializer;
if (opts?.WaitUntilSent ?? false)
{
return PubModelAsync<T>(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken);
return PubModelAsync<T>(subject, data, serializer, replyTo, headers, cancellationToken);
}
else
{
return PubModelPostAsync<T>(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken);
return PubModelPostAsync<T>(subject, data, serializer, replyTo, headers, cancellationToken);
}
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
return PublishAsync<T>(msg.Subject, msg.Data, opts, cancellationToken);
return PublishAsync<T>(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken);
}
}
Loading