Skip to content

Commit

Permalink
Merge branch 'main' into kv-initial-impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 6, 2023
2 parents 45e97f9 + 1220541 commit 6b9cf77
Show file tree
Hide file tree
Showing 30 changed files with 203 additions and 581 deletions.
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Runner(INatsConnection connection)
[RootCommand]
public async Task Run()
{
var subscription = await _connection.SubscribeAsync("foo");
var subscription = await _connection.SubscribeAsync<string>("foo");

_ = Task.Run(async () =>
{
Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync(subject);
var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data.Span)}\n");
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n");
if (msg.Headers != null)
{
foreach (var (key, values) in msg.Headers)
Expand Down
11 changes: 4 additions & 7 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// > nats pub foo.xyz --count=10 "my_message_{{ Count }}"
using System.Text;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

Expand All @@ -12,13 +11,12 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers");
var sub1 = await connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
Print($"[1][RCV] {msg.Subject}: {data}\n");
Print($"[1][RCV] {msg.Subject}: {msg.Data}\n");
}
});

Expand All @@ -28,13 +26,12 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers");
var sub2 = await connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
Print($"[2][RCV] {msg.Subject}: {data}\n");
Print($"[2][RCV] {msg.Subject}: {msg.Data}\n");
}
});

Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeRaw/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync(subject);
var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
var data = Encoding.UTF8.GetString(msg.Data.ToArray());
var data = Encoding.UTF8.GetString(msg.Data!);
Print($"[RCV] {msg.Subject}: {data}\n");
}

Expand Down
10 changes: 5 additions & 5 deletions sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace Example.JetStream.PullConsumer;

public class RawDataSerializer : INatsSerializer
{
public INatsSerializer? Next => default;

public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
if (value is RawData data)
Expand All @@ -16,13 +18,11 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
throw new Exception($"Can only work with '{typeof(RawData)}'");
}

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => (T?)Deserialize(buffer, typeof(T));

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type)
public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
{
if (type != typeof(RawData))
if (typeof(T) != typeof(RawData))
throw new Exception($"Can only work with '{typeof(RawData)}'");

return new RawData(buffer.ToArray());
return (T)(object)new RawData(buffer.ToArray());
}
}
5 changes: 2 additions & 3 deletions sandbox/MinimumWebApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Text;
using NATS.Client.Core;
using NATS.Client.Hosting;

Expand All @@ -11,13 +10,13 @@

app.MapGet("/subscribe", async (INatsConnection command) =>
{
var subscription = await command.SubscribeAsync("foo");
var subscription = await command.SubscribeAsync<int>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {Encoding.UTF8.GetString(msg.Data.ToArray())}");
Console.WriteLine($"Received {msg.Data}");
}
});
});
Expand Down
26 changes: 6 additions & 20 deletions sandbox/NatsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void RunPubSubBenchmark(string testName, long testCount, long testSize,
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -152,7 +152,7 @@ private void RunPubSubBenchmarkBatch(string testName, long testCount, long testS
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -236,7 +236,7 @@ private void ProfilingRunPubSubBenchmarkAsync(string testName, long testCount, l
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -316,7 +316,7 @@ private void RunPubSubBenchmarkBatchRaw(string testName, long testCount, long te
pubConn.ConnectAsync().AsTask().Wait();
subConn.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand Down Expand Up @@ -421,7 +421,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes
pubConn2.ConnectAsync().AsTask().Wait();
subConn2.ConnectAsync().AsTask().Wait();

var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d = subConn.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount);

Expand All @@ -435,7 +435,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes
}
}
});
var d2 = subConn2.SubscribeAsync(_subject).AsTask().Result.Register(_ =>
var d2 = subConn2.SubscribeAsync<byte[]>(_subject).AsTask().Result.Register(_ =>
{
Interlocked.Increment(ref subCount2);

Expand Down Expand Up @@ -824,18 +824,4 @@ internal static class NatsMsgTestUtils
});
return sub;
}

internal static INatsSub? Register(this INatsSub? sub, Action<NatsMsg> action)
{
if (sub == null)
return null;
Task.Run(async () =>
{
await foreach (var natsMsg in sub.Msgs.ReadAllAsync())
{
action(natsMsg);
}
});
return sub;
}
}
91 changes: 12 additions & 79 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,32 @@ public interface INatsConnection
ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="payload">The message payload data.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg"/> representing message details.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// Publishes an empty message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
/// <remarks>
/// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action
/// or indicate an event for example and of messages.
/// </remarks>
ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -55,21 +49,6 @@ public interface INatsConnection
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
ValueTask<INatsSub> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
Expand Down Expand Up @@ -106,7 +85,7 @@ public interface INatsConnection
/// <returns>Returns the <see cref="NatsMsg{T}"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg"/>.
/// Response can be (null) or one <see cref="NatsMsg{T}"/>.
/// Reply option's max messages will be set to 1.
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
Expand All @@ -118,30 +97,6 @@ public interface INatsConnection
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <returns>Returns the <see cref="NatsMsg"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg"/>.
/// Reply option's max messages will be set to 1 (one).
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
ValueTask<NatsMsg?> RequestAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive zero or more replies from a responder.
/// </summary>
Expand All @@ -153,7 +108,7 @@ public interface INatsConnection
/// <param name="cancellationToken">Cancel this request</param>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TReply">Reply type</typeparam>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg"/> objects</returns>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg{T}"/> objects</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
Expand All @@ -165,26 +120,4 @@ public interface INatsConnection
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive zero or more replies from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg"/> objects</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
IAsyncEnumerable<NatsMsg> RequestManyAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
}
Loading

0 comments on commit 6b9cf77

Please sign in to comment.