Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support all binary payload APIs with only generics #143

Merged
merged 9 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Runner(INatsConnection connection)
[RootCommand]
public async Task Run()
{
var subscription = await _connection.SubscribeAsync("foo");
var subscription = await _connection.SubscribeAsync<string>("foo");

_ = Task.Run(async () =>
{
Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync(subject);
var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data.Span)}\n");
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n");
if (msg.Headers != null)
{
foreach (var (key, values) in msg.Headers)
Expand Down
11 changes: 4 additions & 7 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// > nats pub foo.xyz --count=10 "my_message_{{ Count }}"
using System.Text;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

Expand All @@ -12,13 +11,12 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers");
var sub1 = await connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
Print($"[1][RCV] {msg.Subject}: {data}\n");
Print($"[1][RCV] {msg.Subject}: {msg.Data}\n");
}
});

Expand All @@ -28,13 +26,12 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers");
var sub2 = await connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
Print($"[2][RCV] {msg.Subject}: {data}\n");
Print($"[2][RCV] {msg.Subject}: {msg.Data}\n");
}
});

Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeRaw/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync(subject);
var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
var data = Encoding.UTF8.GetString(msg.Data!);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the null-forgiving operator needed here? Is SubscribeAsync<T> eventually returning T? instead? Would be nice if we could avoid that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default values in general yield empty/sentinel payloads that's why there are T?s in a lot of places. Do you think we shouldn't allow nulls? How would we handle sending and receiving empty payloads?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I guess there is no way to avoid it for reference types when default may be retruned. Seeing as System.Text.JSON even does it

Print($"[RCV] {msg.Subject}: {data}\n");
}

Expand Down
10 changes: 5 additions & 5 deletions sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace Example.JetStream.PullConsumer;

public class RawDataSerializer : INatsSerializer
{
public INatsSerializer? Next => default;

public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
if (value is RawData data)
Expand All @@ -16,13 +18,11 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
throw new Exception($"Can only work with '{typeof(RawData)}'");
}

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => (T?)Deserialize(buffer, typeof(T));

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type)
public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
{
if (type != typeof(RawData))
if (typeof(T) != typeof(RawData))
throw new Exception($"Can only work with '{typeof(RawData)}'");

return new RawData(buffer.ToArray());
return (T)(object)new RawData(buffer.ToArray());
}
}
5 changes: 2 additions & 3 deletions sandbox/MinimumWebApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Text;
using NATS.Client.Core;
using NATS.Client.Hosting;

Expand All @@ -11,13 +10,13 @@

app.MapGet("/subscribe", async (INatsConnection command) =>
{
var subscription = await command.SubscribeAsync("foo");
var subscription = await command.SubscribeAsync<int>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {Encoding.UTF8.GetString(msg.Data.ToArray())}");
Console.WriteLine($"Received {msg.Data}");
}
});
});
Expand Down
26 changes: 6 additions & 20 deletions sandbox/NatsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void RunPubSubBenchmark(string testName, long testCount, long testSize,
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -152,7 +152,7 @@ private void RunPubSubBenchmarkBatch(string testName, long testCount, long testS
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -236,7 +236,7 @@ private void ProfilingRunPubSubBenchmarkAsync(string testName, long testCount, l
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -316,7 +316,7 @@ private void RunPubSubBenchmarkBatchRaw(string testName, long testCount, long te
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -421,7 +421,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes
pubConn2.ConnectAsync().AsTask().Wait();
subConn2.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand All @@ -435,7 +435,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes
}
}
});
var d2 = subConn2.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d2 = subConn2.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount2);

Expand Down Expand Up @@ -824,18 +824,4 @@ internal static class NatsMsgTestUtils
});
return sub;
}

internal static INatsSub? Register(this INatsSub? sub, Action<NatsMsg> action)
{
if (sub == null)
return null;
Task.Run(async () =>
{
await foreach (var natsMsg in sub.Msgs.ReadAllAsync())
{
action(natsMsg);
}
});
return sub;
}
}
91 changes: 12 additions & 79 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,32 @@ public interface INatsConnection
ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="payload">The message payload data.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg"/> representing message details.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// Publishes an empty message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
/// <remarks>
/// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action
/// or indicate an event for example and of messages.
/// </remarks>
ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
caleblloyd marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -55,21 +49,6 @@ public interface INatsConnection
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
ValueTask<INatsSub> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
Expand Down Expand Up @@ -106,7 +85,7 @@ public interface INatsConnection
/// <returns>Returns the <see cref="NatsMsg{T}"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg"/>.
/// Response can be (null) or one <see cref="NatsMsg{T}"/>.
/// Reply option's max messages will be set to 1.
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
Expand All @@ -118,30 +97,6 @@ public interface INatsConnection
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <returns>Returns the <see cref="NatsMsg"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg"/>.
/// Reply option's max messages will be set to 1 (one).
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
ValueTask<NatsMsg?> RequestAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive zero or more replies from a responder.
/// </summary>
Expand All @@ -153,7 +108,7 @@ public interface INatsConnection
/// <param name="cancellationToken">Cancel this request</param>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TReply">Reply type</typeparam>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg"/> objects</returns>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg{T}"/> objects</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
Expand All @@ -165,26 +120,4 @@ public interface INatsConnection
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive zero or more replies from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg"/> objects</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
IAsyncEnumerable<NatsMsg> RequestManyAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
}
Loading