From 11e07f0138456489b7a54677641de97ef5486a5e Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 29 Sep 2023 17:27:44 +0100 Subject: [PATCH 1/8] API serializer proposal --- sandbox/ConsoleApp/Program.cs | 2 +- .../Example.Core.SubscribeHeaders/Program.cs | 4 +- .../Program.cs | 11 +- sandbox/Example.Core.SubscribeRaw/Program.cs | 4 +- sandbox/MinimumWebApp/Program.cs | 5 +- sandbox/NatsBenchmark/Program.cs | 26 +--- src/NATS.Client.Core/INatsConnection.cs | 84 +---------- src/NATS.Client.Core/INatsSerializer.cs | 15 ++ src/NATS.Client.Core/INatsSub.cs | 27 ---- .../NatsConnection.Publish.cs | 14 +- .../NatsConnection.RequestReply.cs | 51 ------- .../NatsConnection.RequestSub.cs | 25 ---- .../NatsConnection.Subscribe.cs | 8 - src/NATS.Client.Core/NatsMsg.cs | 138 ------------------ src/NATS.Client.Core/NatsRequestExtensions.cs | 41 +----- .../NatsRequestManyExtensions.cs | 35 +---- .../{NatsSub.cs => NatsSubUtils.cs} | 96 +++++------- .../Internal/NatsJSConsume.cs | 4 +- .../Internal/NatsJSFetch.cs | 2 +- src/NATS.Client.JetStream/NatsJSMsg.cs | 7 +- .../NatsSubTests.cs | 4 +- .../CancellationTest.cs | 2 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 10 +- .../RequestReplyTest.cs | 21 +-- tests/NATS.Client.Perf/Program.cs | 2 +- tests/NATS.Client.TestUtilities/Utils.cs | 26 ---- 26 files changed, 100 insertions(+), 564 deletions(-) rename src/NATS.Client.Core/{NatsSub.cs => NatsSubUtils.cs} (72%) diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index d670013f2..cee6c33d4 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -84,7 +84,7 @@ public Runner(INatsConnection connection) [RootCommand] public async Task Run() { - var subscription = await _connection.SubscribeAsync("foo"); + var subscription = await _connection.SubscribeAsync("foo"); _ = Task.Run(async () => { diff --git a/sandbox/Example.Core.SubscribeHeaders/Program.cs b/sandbox/Example.Core.SubscribeHeaders/Program.cs index b0d7bb6ed..ab8feee76 100644 --- a/sandbox/Example.Core.SubscribeHeaders/Program.cs +++ b/sandbox/Example.Core.SubscribeHeaders/Program.cs @@ -13,11 +13,11 @@ Print($"[SUB] Subscribing to subject '{subject}'...\n"); -var sub = await connection.SubscribeAsync(subject); +var sub = await connection.SubscribeAsync(subject); await foreach (var msg in sub.Msgs.ReadAllAsync()) { - Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data.Span)}\n"); + Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n"); if (msg.Headers != null) { foreach (var (key, values) in msg.Headers) diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs index 5f17ea87e..32cfab0fb 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs +++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs @@ -1,5 +1,4 @@ // > nats pub foo.xyz --count=10 "my_message_{{ Count }}" -using System.Text; using Microsoft.Extensions.Logging; using NATS.Client.Core; @@ -12,13 +11,12 @@ await using var connection1 = new NatsConnection(options); Print($"[1][SUB] Subscribing to subject '{subject}'...\n"); -var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers"); +var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers"); var task1 = Task.Run(async () => { await foreach (var msg in sub1.Msgs.ReadAllAsync()) { - var data = Encoding.UTF8.GetString(msg.Data.ToArray()); - Print($"[1][RCV] {msg.Subject}: {data}\n"); + Print($"[1][RCV] {msg.Subject}: {msg.Data}\n"); } }); @@ -28,13 +26,12 @@ await using var connection2 = new NatsConnection(options); Print($"[2][SUB] Subscribing to subject '{subject}'...\n"); -var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers"); +var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers"); var task2 = Task.Run(async () => { await foreach (var msg in sub2.Msgs.ReadAllAsync()) { - var data = Encoding.UTF8.GetString(msg.Data.ToArray()); - Print($"[2][RCV] {msg.Subject}: {data}\n"); + Print($"[2][RCV] {msg.Subject}: {msg.Data}\n"); } }); diff --git a/sandbox/Example.Core.SubscribeRaw/Program.cs b/sandbox/Example.Core.SubscribeRaw/Program.cs index 514c0ea79..7ab1559d8 100644 --- a/sandbox/Example.Core.SubscribeRaw/Program.cs +++ b/sandbox/Example.Core.SubscribeRaw/Program.cs @@ -11,11 +11,11 @@ Print($"[SUB] Subscribing to subject '{subject}'...\n"); -var sub = await connection.SubscribeAsync(subject); +var sub = await connection.SubscribeAsync(subject); await foreach (var msg in sub.Msgs.ReadAllAsync()) { - var data = Encoding.UTF8.GetString(msg.Data.ToArray()); + var data = Encoding.UTF8.GetString(msg.Data!); Print($"[RCV] {msg.Subject}: {data}\n"); } diff --git a/sandbox/MinimumWebApp/Program.cs b/sandbox/MinimumWebApp/Program.cs index e62c9e059..d32e7b178 100644 --- a/sandbox/MinimumWebApp/Program.cs +++ b/sandbox/MinimumWebApp/Program.cs @@ -1,4 +1,3 @@ -using System.Text; using NATS.Client.Core; using NATS.Client.Hosting; @@ -11,13 +10,13 @@ app.MapGet("/subscribe", async (INatsConnection command) => { - var subscription = await command.SubscribeAsync("foo"); + var subscription = await command.SubscribeAsync("foo"); _ = Task.Run(async () => { await foreach (var msg in subscription.Msgs.ReadAllAsync()) { - Console.WriteLine($"Received {Encoding.UTF8.GetString(msg.Data.ToArray())}"); + Console.WriteLine($"Received {msg.Data}"); } }); }); diff --git a/sandbox/NatsBenchmark/Program.cs b/sandbox/NatsBenchmark/Program.cs index 7c71b73de..4152738d6 100644 --- a/sandbox/NatsBenchmark/Program.cs +++ b/sandbox/NatsBenchmark/Program.cs @@ -77,7 +77,7 @@ private void RunPubSubBenchmark(string testName, long testCount, long testSize, pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -152,7 +152,7 @@ private void RunPubSubBenchmarkBatch(string testName, long testCount, long testS pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -236,7 +236,7 @@ private void ProfilingRunPubSubBenchmarkAsync(string testName, long testCount, l pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -316,7 +316,7 @@ private void RunPubSubBenchmarkBatchRaw(string testName, long testCount, long te pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -421,7 +421,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes pubConn2.ConnectAsync().AsTask().Wait(); subConn2.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -435,7 +435,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes } } }); - var d2 = subConn2.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d2 = subConn2.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount2); @@ -824,18 +824,4 @@ internal static class NatsMsgTestUtils }); return sub; } - - internal static INatsSub? Register(this INatsSub? sub, Action action) - { - if (sub == null) - return null; - Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - action(natsMsg); - } - }); - return sub; - } } diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index 647a8b52a..e1a43d8f5 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -11,27 +11,6 @@ public interface INatsConnection /// A that represents the asynchronous round trip operation. ValueTask PingAsync(CancellationToken cancellationToken = default); - /// - /// Publishes the message payload to the given subject name, optionally supplying a reply subject. - /// - /// The destination subject to publish to. - /// The message payload data. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - - /// - /// Publishes the message payload to the given subject name, optionally supplying a reply subject. - /// - /// A representing message details. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. /// @@ -45,6 +24,8 @@ public interface INatsConnection /// A that represents the asynchronous send operation. ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. /// @@ -55,21 +36,6 @@ public interface INatsConnection /// A that represents the asynchronous send operation. ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// - /// Initiates a subscription to a subject, optionally joining a distributed queue group. - /// - /// The subject name to subscribe to. - /// If specified, the subscriber will join this queue group. - /// A for subscription options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Subscribers with the same queue group name, become a queue group, - /// and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); - /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// @@ -118,30 +84,6 @@ public interface INatsConnection NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); - /// - /// Request and receive a single reply from a responder. - /// - /// Subject of the responder - /// Payload to send to responder - /// Optional message headers - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// Returns the received from the responder as reply. - /// Raised when cancellation token is used - /// - /// Response can be (null) or one . - /// Reply option's max messages will be set to 1 (one). - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - ValueTask RequestAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default); - /// /// Request and receive zero or more replies from a responder. /// @@ -165,26 +107,4 @@ public interface INatsConnection NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); - - /// - /// Request and receive zero or more replies from a responder. - /// - /// Subject of the responder - /// Payload to send to responder - /// Optional message headers - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// An asynchronous enumerable of objects - /// Raised when cancellation token is used - /// - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - IAsyncEnumerable RequestManyAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 4b78f71de..a8b2d6132 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -84,3 +84,18 @@ public void Advance(int count) public Span GetSpan(int sizeHint = 0) => Array.Empty(); } } + +public abstract class NatsBaseSerializer : INatsSerializer +{ + protected NatsBaseSerializer(NatsBaseSerializer next) => Next = next; + + public NatsBaseSerializer Next { get; } + + public abstract int Serialize(ICountableBufferWriter bufferWriter, T? value); + + public abstract T? Deserialize(in ReadOnlySequence buffer); + + public abstract object? Deserialize(in ReadOnlySequence buffer, Type type); +} + +public class diff --git a/src/NATS.Client.Core/INatsSub.cs b/src/NATS.Client.Core/INatsSub.cs index 91ef3c48f..a74ecfb58 100644 --- a/src/NATS.Client.Core/INatsSub.cs +++ b/src/NATS.Client.Core/INatsSub.cs @@ -2,33 +2,6 @@ namespace NATS.Client.Core; -public interface INatsSub : IAsyncDisposable -{ - /// - /// Access incoming messages for your subscription. - /// - ChannelReader Msgs { get; } - - /// - /// The subject name to subscribe to. - /// - string Subject { get; } - - /// - /// If specified, the subscriber will join this queue group. Subscribers with the same queue group name, - /// become a queue group, and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - string? QueueGroup { get; } - - /// - /// Complete the message channel, stop timers if they were used and send an unsubscribe - /// message to the server. - /// - /// A that represents the asynchronous server UNSUB operation. - public ValueTask UnsubscribeAsync(); -} - public interface INatsSub : IAsyncDisposable { /// diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 388908106..f98f5a573 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -1,28 +1,20 @@ -using System.Buffers; - namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { - return PubAsync(subject, replyTo, payload, headers, cancellationToken); + return PubAsync(subject, replyTo, payload: default, headers, cancellationToken); } else { - return PubPostAsync(subject, replyTo, payload, headers, cancellationToken); + return PubPostAsync(subject, replyTo, payload: default, headers, cancellationToken); } } - /// - public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - return PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken); - } - /// public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 04a5c6491..c6fb41a1f 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -1,4 +1,3 @@ -using System.Buffers; using System.Runtime.CompilerServices; namespace NATS.Client.Core; @@ -35,30 +34,6 @@ public partial class NatsConnection return null; } - /// - public async ValueTask RequestAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - var opts = SetReplyOptsDefaults(replyOpts); - - await using var sub = await RequestSubAsync(subject, payload, headers, requestOpts, opts, cancellationToken).ConfigureAwait(false); - - if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - if (sub.Msgs.TryRead(out var msg)) - { - return msg; - } - } - - return null; - } - /// public async IAsyncEnumerable> RequestManyAsync( string subject, @@ -86,32 +61,6 @@ public partial class NatsConnection } } - /// - public async IAsyncEnumerable RequestManyAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - await using var sub = await RequestSubAsync(subject, payload, headers, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false); - - while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - while (sub.Msgs.TryRead(out var msg)) - { - // Received end of stream sentinel - if (msg.Data.Length == 0) - { - yield break; - } - - yield return msg; - } - } - } - private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts) { var opts = replyOpts ?? DefaultReplyOpts; diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index cbdfa5320..ae81ddb3f 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -1,32 +1,7 @@ -using System.Buffers; - namespace NATS.Client.Core; public partial class NatsConnection { - internal async ValueTask RequestSubAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts); - await SubAsync(replyTo, queueGroup: default, replyOpts, sub, cancellationToken).ConfigureAwait(false); - if (requestOpts?.WaitUntilSent == true) - { - await PubAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); - } - else - { - await PubPostAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); - } - - return sub; - } - internal async ValueTask> RequestSubAsync( string subject, TRequest? data, diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index f849cb331..3e6460420 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -2,14 +2,6 @@ namespace NATS.Client.Core; public partial class NatsConnection { - /// - public async ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) - { - var sub = new NatsSub(this, SubscriptionManager, subject, queueGroup, opts); - await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); - return sub; - } - /// public async ValueTask> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 67edc2b61..43dab1ce6 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -3,110 +3,6 @@ namespace NATS.Client.Core; -/// -/// NATS message structure as defined by the protocol. -/// -/// The destination subject to publish to. -/// The reply subject that subscribers can use to send a response back to the publisher/requester. -/// Message size in bytes. -/// Pass additional information using name-value pairs. -/// The message payload data. -/// NATS connection this message is associated to. -/// -/// Connection property is used to provide reply functionality. -/// -/// Message size is calculated using the same method NATS server uses: -/// -/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; -/// -/// -/// -public readonly record struct NatsMsg( - string Subject, - string? ReplyTo, - int Size, - NatsHeaders? Headers, - ReadOnlyMemory Data, - INatsConnection? Connection) -{ - internal static NatsMsg Build( - string subject, - string? replyTo, - in ReadOnlySequence? headersBuffer, - in ReadOnlySequence payloadBuffer, - INatsConnection? connection, - NatsHeaderParser headerParser) - { - NatsHeaders? headers = null; - - if (headersBuffer != null) - { - headers = new NatsHeaders(); - if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) - { - throw new NatsException("Error parsing headers"); - } - - headers.SetReadOnly(); - } - - var size = subject.Length - + (replyTo?.Length ?? 0) - + (headersBuffer?.Length ?? 0) - + payloadBuffer.Length; - - return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); - } - - /// - /// Reply to this message. - /// - /// The message payload data. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload, headers, replyTo, opts, cancellationToken); - } - - /// - /// Reply to this message. - /// - /// A representing message details. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); - } - - [MemberNotNull(nameof(Connection))] - private void CheckReplyPreconditions() - { - if (Connection == default) - { - throw new NatsException("unable to send reply; message did not originate from a subscription"); - } - - if (string.IsNullOrWhiteSpace(ReplyTo)) - { - throw new NatsException("unable to send reply; ReplyTo is empty"); - } - } -} - /// /// NATS message structure as defined by the protocol. /// @@ -207,40 +103,6 @@ public ValueTask ReplyAsync(NatsMsg msg, NatsPubOpts? opts = def return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); } - /// - /// Reply to this message. - /// - /// The message payload data. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload: payload, headers, replyTo, opts, cancellationToken); - } - - /// - /// Reply to this message. - /// - /// A representing message details. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); - } - [MemberNotNull(nameof(Connection))] private void CheckReplyPreconditions() { diff --git a/src/NATS.Client.Core/NatsRequestExtensions.cs b/src/NATS.Client.Core/NatsRequestExtensions.cs index 99265e822..117041f94 100644 --- a/src/NATS.Client.Core/NatsRequestExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestExtensions.cs @@ -1,5 +1,3 @@ -using System.Buffers; - namespace NATS.Client.Core; public static class NatsRequestExtensions @@ -17,7 +15,7 @@ public static class NatsRequestExtensions /// Returns the received from the responder as reply. /// Raised when cancellation token is used /// - /// Response can be (null) or one . + /// Response can be (null) or one . /// Reply option's max messages will be set to 1. /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. /// @@ -39,48 +37,13 @@ public static class NatsRequestExtensions cancellationToken); } - /// - /// Request and receive a single reply from a responder. - /// - /// NATS connection - /// Message to be sent as request - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// Returns the received from the responder as reply. - /// Raised when cancellation token is used - /// - /// Response can be (null) or one . - /// Reply option's max messages will be set to 1. - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - public static ValueTask RequestAsync( - this INatsConnection nats, - in NatsMsg msg, - in NatsPubOpts? requestOpts = default, - in NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - CheckMsgForRequestReply(msg); - - return nats.RequestAsync( - msg.Subject, - payload: new ReadOnlySequence(msg.Data), - msg.Headers, - requestOpts, - replyOpts, - cancellationToken); - } - - internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); - internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); private static void CheckForRequestReply(string? replyTo) { if (!string.IsNullOrWhiteSpace(replyTo)) { - throw new NatsException($"Can't set {nameof(NatsMsg.ReplyTo)} for a request"); + throw new NatsException($"Can't set reply-to for a request"); } } } diff --git a/src/NATS.Client.Core/NatsRequestManyExtensions.cs b/src/NATS.Client.Core/NatsRequestManyExtensions.cs index 425c7e0bc..967feeb25 100644 --- a/src/NATS.Client.Core/NatsRequestManyExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestManyExtensions.cs @@ -1,40 +1,7 @@ -using System.Buffers; - namespace NATS.Client.Core; public static class NatsRequestManyExtensions { - /// - /// Request and receive zero or more replies from a responder. - /// - /// NATS connection - /// Message to be sent as request - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// An asynchronous enumerable of objects - /// Raised when cancellation token is used - /// - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - public static IAsyncEnumerable RequestManyAsync( - this INatsConnection nats, - NatsMsg msg, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - NatsRequestExtensions.CheckMsgForRequestReply(msg); - - return nats.RequestManyAsync( - msg.Subject, - payload: new ReadOnlySequence(msg.Data), - msg.Headers, - requestOpts, - replyOpts, - cancellationToken); - } - /// /// Request and receive zero or more replies from a responder. /// @@ -45,7 +12,7 @@ public static IAsyncEnumerable RequestManyAsync( /// Cancel this request /// Request type /// Reply type - /// An asynchronous enumerable of objects + /// An asynchronous enumerable of objects /// Raised when cancellation token is used /// /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSubUtils.cs similarity index 72% rename from src/NATS.Client.Core/NatsSub.cs rename to src/NATS.Client.Core/NatsSubUtils.cs index f7ef07f65..5ee655f57 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSubUtils.cs @@ -5,66 +5,6 @@ namespace NATS.Client.Core; -public sealed class NatsSub : NatsSubBase, INatsSub -{ - private static readonly BoundedChannelOptions DefaultChannelOpts = - new BoundedChannelOptions(1_000) - { - FullMode = BoundedChannelFullMode.Wait, - SingleWriter = true, - SingleReader = false, - AllowSynchronousContinuations = false, - }; - - private readonly Channel _msgs; - - internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, NatsSubOpts? opts) - : base(connection, manager, subject, queueGroup, opts) => - _msgs = Channel.CreateBounded( - GetChannelOpts(opts?.ChannelOpts)); - - public ChannelReader Msgs => _msgs.Reader; - - internal static BoundedChannelOptions GetChannelOpts( - NatsSubChannelOpts? subChannelOpts) - { - if (subChannelOpts is { } overrideOpts) - { - return new BoundedChannelOptions(overrideOpts.Capacity ?? - DefaultChannelOpts.Capacity) - { - AllowSynchronousContinuations = - DefaultChannelOpts.AllowSynchronousContinuations, - FullMode = - overrideOpts.FullMode ?? DefaultChannelOpts.FullMode, - SingleWriter = DefaultChannelOpts.SingleWriter, - SingleReader = DefaultChannelOpts.SingleReader, - }; - } - else - { - return DefaultChannelOpts; - } - } - - protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) - { - var natsMsg = NatsMsg.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - Connection, - Connection.HeaderParser); - - await _msgs.Writer.WriteAsync(natsMsg).ConfigureAwait(false); - - DecrementMaxMsgs(); - } - - protected override void TryComplete() => _msgs.Writer.TryComplete(); -} - public sealed class NatsSub : NatsSubBase, INatsSub { private readonly Channel> _msgs; @@ -79,7 +19,7 @@ internal NatsSub( : base(connection, manager, subject, queueGroup, opts) { _msgs = Channel.CreateBounded>( - NatsSub.GetChannelOpts(opts?.ChannelOpts)); + NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); Serializer = serializer; } @@ -123,3 +63,37 @@ public NatsSubException(string message, ExceptionDispatchInfo exception, Memory< public Memory Headers { get; } } + +internal sealed class NatsSubUtils +{ + private static readonly BoundedChannelOptions DefaultChannelOpts = + new BoundedChannelOptions(1_000) + { + FullMode = BoundedChannelFullMode.Wait, + SingleWriter = true, + SingleReader = false, + AllowSynchronousContinuations = false, + }; + + internal static BoundedChannelOptions GetChannelOpts( + NatsSubChannelOpts? subChannelOpts) + { + if (subChannelOpts is { } overrideOpts) + { + return new BoundedChannelOptions(overrideOpts.Capacity ?? + DefaultChannelOpts.Capacity) + { + AllowSynchronousContinuations = + DefaultChannelOpts.AllowSynchronousContinuations, + FullMode = + overrideOpts.FullMode ?? DefaultChannelOpts.FullMode, + SingleWriter = DefaultChannelOpts.SingleWriter, + SingleReader = DefaultChannelOpts.SingleReader, + }; + } + else + { + return DefaultChannelOpts; + } + } +} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 64b7e2df6..76f594049 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -105,10 +105,10 @@ public NatsJSConsume( Timeout.Infinite, Timeout.Infinite); - _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _userMsgs = Channel.CreateBounded>(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); Msgs = _userMsgs.Reader; - _pullRequests = Channel.CreateBounded(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _pullRequests = Channel.CreateBounded(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); _pullTask = Task.Run(PullLoop); ResetPending(); diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index c04049b15..ea69635af 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -57,7 +57,7 @@ public NatsJSFetch( _pendingMsgs = _maxMsgs; _pendingBytes = _maxBytes; - _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _userMsgs = Channel.CreateBounded>(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); Msgs = _userMsgs.Reader; if (_debug) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 5b5d040ec..ee657083e 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -108,8 +108,11 @@ private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = de throw new NatsJSException("No user message, can't acknowledge"); return _msg.ReplyAsync( - payload: payload, - opts: new NatsPubOpts { WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent }, + data: payload, + opts: new NatsPubOpts + { + WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, + }, cancellationToken: cancellationToken); } } diff --git a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs index bab7caaad..4290303f9 100644 --- a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs +++ b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs @@ -16,7 +16,7 @@ public void Subject_manager_should_not_hold_on_to_subscription_if_collected() async Task Isolator() { // Subscription is not being disposed here - var natsSub = await nats.SubscribeAsync("foo"); + var natsSub = await nats.SubscribeAsync("foo"); Assert.That(natsSub.Subject, Is.EqualTo("foo")); } @@ -26,7 +26,7 @@ async Task Isolator() dotMemory.Check(memory => { - var count = memory.GetObjects(where => where.Type.Is()).ObjectsCount; + var count = memory.GetObjects(where => where.Type.Is()).ObjectsCount; Assert.That(count, Is.EqualTo(0)); }); } diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index e6e63ce11..3a51ad62f 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -21,7 +21,7 @@ public async Task CommandTimeoutTest() await using var pubConnection = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromSeconds(1) }); await pubConnection.ConnectAsync(); - await subConnection.SubscribeAsync("foo"); + await subConnection.SubscribeAsync("foo"); var cmd = new SleepWriteCommand("PUB foo 5\r\naiueo", TimeSpan.FromSeconds(10)); pubConnection.PostDirectWrite(cmd); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index b045b31cd..021a687fa 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -112,9 +112,9 @@ void Log(string text) var (nats, proxy) = server.CreateProxiedClientConnection(); var sync = 0; - var signal1 = new WaitSignal(); - var signal2 = new WaitSignal(); - var sub = await nats.SubscribeAsync("foo.*"); + var signal1 = new WaitSignal>(); + var signal2 = new WaitSignal>(); + var sub = await nats.SubscribeAsync("foo.*"); var reg = sub.Register(m => { switch (m.Subject) @@ -140,7 +140,7 @@ await Retry.Until( Log("PUB notifications"); await nats.PublishAsync("foo.signal1"); var msg1 = await signal1; - Assert.Equal(0, msg1.Data.Length); + Assert.Equal(0, msg1.Data); Assert.Null(msg1.Headers); var pubFrame1 = proxy.Frames.First(f => f.Message.StartsWith("PUB foo.signal1")); Assert.Equal("PUB foo.signal1 0␍␊", pubFrame1.Message); @@ -150,7 +150,7 @@ await Retry.Until( Log("HPUB notifications"); await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); var msg2 = await signal2; - Assert.Equal(0, msg2.Data.Length); + Assert.Equal(0, msg2.Data); Assert.NotNull(msg2.Headers); Assert.Empty(msg2.Headers!); var pubFrame2 = proxy.Frames.First(f => f.Message.StartsWith("HPUB foo.signal2")); diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index f60b4993e..5c5e52954 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -260,11 +260,6 @@ public async Task Request_reply_many_test_sentinel() [Fact] public async Task Request_reply_binary_test() { - static ReadOnlySequence ToSeq(string input) - { - return new ReadOnlySequence(Encoding.ASCII.GetBytes(input)); - } - static string ToStr(ReadOnlyMemory input) { return Encoding.ASCII.GetString(input.Span); @@ -274,22 +269,22 @@ static string ToStr(ReadOnlyMemory input) await using var server = NatsServer.Start(); await using var nats = server.CreateClientConnection(); - await using var sub = await nats.SubscribeAsync("foo", cancellationToken: cts.Token); + await using var sub = await nats.SubscribeAsync("foo", cancellationToken: cts.Token); var reg = sub.Register(async m => { - if (ToStr(m.Data) == "1") + if (m.Data == "1") { - await m.ReplyAsync(payload: ToSeq("qw"), cancellationToken: cts.Token); - await m.ReplyAsync(payload: ToSeq("er"), cancellationToken: cts.Token); - await m.ReplyAsync(payload: ToSeq("ty"), cancellationToken: cts.Token); - await m.ReplyAsync(payload: default, cancellationToken: cts.Token); // sentinel + await m.ReplyAsync("qw", cancellationToken: cts.Token); + await m.ReplyAsync("er", cancellationToken: cts.Token); + await m.ReplyAsync("ty", cancellationToken: cts.Token); + await m.ReplyAsync(default(string), cancellationToken: cts.Token); // sentinel } }); var writer = new ArrayBufferWriter(); - await foreach (var msg in nats.RequestManyAsync("foo", ToSeq("1"), cancellationToken: cts.Token)) + await foreach (var msg in nats.RequestManyAsync("foo", "1", cancellationToken: cts.Token)) { - writer.Write(msg.Data.Span); + writer.Write(Encoding.UTF8.GetBytes(msg.Data!)); } var buffer = ToStr(writer.WrittenMemory); diff --git a/tests/NATS.Client.Perf/Program.cs b/tests/NATS.Client.Perf/Program.cs index 36fb9f142..14a1977ae 100644 --- a/tests/NATS.Client.Perf/Program.cs +++ b/tests/NATS.Client.Perf/Program.cs @@ -29,7 +29,7 @@ await nats1.PingAsync(); await nats2.PingAsync(); -await using var sub = await nats1.SubscribeAsync(t.Subject); +await using var sub = await nats1.SubscribeAsync(t.Subject); var stopwatch = Stopwatch.StartNew(); diff --git a/tests/NATS.Client.TestUtilities/Utils.cs b/tests/NATS.Client.TestUtilities/Utils.cs index 517349f22..a5315ffc5 100644 --- a/tests/NATS.Client.TestUtilities/Utils.cs +++ b/tests/NATS.Client.TestUtilities/Utils.cs @@ -72,32 +72,6 @@ public static Task Register(this INatsSub? sub, Func, Task> ac } }); } - - public static Task Register(this INatsSub? sub, Action action) - { - if (sub == null) - return Task.CompletedTask; - return Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - action(natsMsg); - } - }); - } - - public static Task Register(this INatsSub? sub, Func action) - { - if (sub == null) - return Task.CompletedTask; - return Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - await action(natsMsg); - } - }); - } } public static class BinaryUtils From ad780d6547006711de614cf72aab57733824b655 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 00:59:10 +0100 Subject: [PATCH 2/8] Nested NATS default serializer --- .../RawDataSerializer.cs | 10 +- src/NATS.Client.Core/INatsConnection.cs | 4 +- src/NATS.Client.Core/INatsSerializer.cs | 165 +++++++++++++++--- src/NATS.Client.Core/NatsMsg.cs | 2 +- src/NATS.Client.Core/NatsOpts.cs | 2 +- .../NatsJSErrorAwareJsonSerializer.cs | 5 +- 6 files changed, 153 insertions(+), 35 deletions(-) diff --git a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs index 6d26036a0..efdfb15fa 100644 --- a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs +++ b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs @@ -5,6 +5,8 @@ namespace Example.JetStream.PullConsumer; public class RawDataSerializer : INatsSerializer { + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) { if (value is RawData data) @@ -16,13 +18,11 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) throw new Exception($"Can only work with '{typeof(RawData)}'"); } - public T? Deserialize(in ReadOnlySequence buffer) => (T?)Deserialize(buffer, typeof(T)); - - public object? Deserialize(in ReadOnlySequence buffer, Type type) + public T? Deserialize(in ReadOnlySequence buffer) { - if (type != typeof(RawData)) + if (typeof(T) != typeof(RawData)) throw new Exception($"Can only work with '{typeof(RawData)}'"); - return new RawData(buffer.ToArray()); + return (T)(object)new RawData(buffer.ToArray()); } } diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index e1a43d8f5..05995453f 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -72,7 +72,7 @@ public interface INatsConnection /// Returns the received from the responder as reply. /// Raised when cancellation token is used /// - /// Response can be (null) or one . + /// Response can be (null) or one . /// Reply option's max messages will be set to 1. /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. /// @@ -95,7 +95,7 @@ public interface INatsConnection /// Cancel this request /// Request type /// Reply type - /// An asynchronous enumerable of objects + /// An asynchronous enumerable of objects /// Raised when cancellation token is used /// /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index a8b2d6132..f0fbba874 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; @@ -6,11 +7,11 @@ namespace NATS.Client.Core; public interface INatsSerializer { + public INatsSerializer? Next { get; } + int Serialize(ICountableBufferWriter bufferWriter, T? value); T? Deserialize(in ReadOnlySequence buffer); - - object? Deserialize(in ReadOnlySequence buffer, Type type); } public interface ICountableBufferWriter : IBufferWriter @@ -18,6 +19,143 @@ public interface ICountableBufferWriter : IBufferWriter int WrittenCount { get; } } +public static class NatsDefaultSerializer +{ + public static readonly INatsSerializer Default = + new NatsRawSerializer( + new NatsStringSerializer( + Encoding.UTF8, + NatsJsonSerializer.Default)); +} + +public class NatsRawSerializer : INatsSerializer +{ + public NatsRawSerializer(INatsSerializer? next) => Next = next; + + public INatsSerializer? Next { get; } + + public int Serialize(ICountableBufferWriter bufferWriter, T? value) + { + if (value is byte[] bytes) + { + bufferWriter.Write(bytes); + return bytes.Length; + } + + if (value is Memory memory) + { + bufferWriter.Write(memory.Span); + return memory.Length; + } + + if (value is ReadOnlyMemory readOnlyMemory) + { + bufferWriter.Write(readOnlyMemory.Span); + return readOnlyMemory.Length; + } + + if (value is ReadOnlySequence readOnlySequence) + { + if (readOnlySequence.IsSingleSegment) + { + bufferWriter.Write(readOnlySequence.FirstSpan); + } + else + { + foreach (var source in readOnlySequence) + { + bufferWriter.Write(source.Span); + } + } + + return (int)readOnlySequence.Length; + } + + if (value is IMemoryOwner memoryOwner) + { + using (memoryOwner) + { + bufferWriter.Write(memoryOwner.Memory.Span); + return memoryOwner.Memory.Length; + } + } + + if (Next != null) + return Next.Serialize(bufferWriter, value); + + throw new NatsException($"Can't serialize {typeof(T)}"); + } + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (typeof(T) == typeof(byte[])) + { + return (T)(object)buffer.ToArray(); + } + + if (typeof(T) == typeof(Memory)) + { + return (T)(object)new Memory(buffer.ToArray()); + } + + if (typeof(T) == typeof(ReadOnlyMemory)) + { + return (T)(object)new ReadOnlyMemory(buffer.ToArray()); + } + + if (typeof(T) == typeof(ReadOnlySequence)) + { + return (T)(object)new ReadOnlySequence(buffer.ToArray()); + } + + if (Next != null) + return Next.Deserialize(buffer); + + throw new NatsException($"Can't deserialize {typeof(T)}"); + } +} + +public class NatsStringSerializer : INatsSerializer +{ + private readonly Encoding _encoding; + + public NatsStringSerializer(Encoding encoding, INatsSerializer? next) + { + _encoding = encoding; + Next = next; + } + + public INatsSerializer? Next { get; } + + public int Serialize(ICountableBufferWriter bufferWriter, T? value) + { + if (value is string str) + { + var bytes = _encoding.GetBytes(str); + bufferWriter.Write(bytes); + return bytes.Length; + } + + if (Next != null) + return Next.Serialize(bufferWriter, value); + + throw new NatsException($"Can't serialize {typeof(T)}"); + } + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (typeof(T) == typeof(string)) + { + return (T)(object)_encoding.GetString(buffer.ToArray()); + } + + if (Next != null) + return Next.Deserialize(buffer); + + throw new NatsException($"Can't deserialize {typeof(T)}"); + } +} + public sealed class NatsJsonSerializer : INatsSerializer { private static readonly JsonWriterOptions JsonWriterOpts = new JsonWriterOptions @@ -39,6 +177,8 @@ public sealed class NatsJsonSerializer : INatsSerializer DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, }); + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) { Utf8JsonWriter writer; @@ -65,12 +205,6 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) return JsonSerializer.Deserialize(ref reader, _opts); } - public object? Deserialize(in ReadOnlySequence buffer, Type type) - { - var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate. - return JsonSerializer.Deserialize(ref reader, type, _opts); - } - private sealed class NullBufferWriter : IBufferWriter { internal static readonly IBufferWriter Instance = new NullBufferWriter(); @@ -84,18 +218,3 @@ public void Advance(int count) public Span GetSpan(int sizeHint = 0) => Array.Empty(); } } - -public abstract class NatsBaseSerializer : INatsSerializer -{ - protected NatsBaseSerializer(NatsBaseSerializer next) => Next = next; - - public NatsBaseSerializer Next { get; } - - public abstract int Serialize(ICountableBufferWriter bufferWriter, T? value); - - public abstract T? Deserialize(in ReadOnlySequence buffer); - - public abstract object? Deserialize(in ReadOnlySequence buffer, Type type); -} - -public class diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 43dab1ce6..14bd1c78a 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -89,7 +89,7 @@ public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, /// /// Reply to this message. /// - /// A representing message details. + /// A representing message details. /// A for publishing options. /// A used to cancel the command. /// Specifies the type of data that may be sent to the NATS Server. diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index bda3589ce..936b1e9ac 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -71,7 +71,7 @@ public sealed record NatsOpts Headers: true, AuthOpts: NatsAuthOpts.Default, TlsOpts: NatsTlsOpts.Default, - Serializer: NatsJsonSerializer.Default, + Serializer: NatsDefaultSerializer.Default, LoggerFactory: NullLoggerFactory.Instance, WriterBufferSize: 65534, // 32767 ReaderBufferSize: 1048576, diff --git a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs index 1c1fbb463..f72ef1cd9 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs @@ -9,6 +9,8 @@ internal sealed class NatsJSErrorAwareJsonSerializer : INatsSerializer { public static readonly NatsJSErrorAwareJsonSerializer Default = new(); + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new NotSupportedException(); @@ -27,9 +29,6 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) => return jsonDocument.Deserialize(); } - - public object? Deserialize(in ReadOnlySequence buffer, Type type) => - throw new NotSupportedException(); } internal class NatsJSApiErrorException : Exception From aaa9410de936bf1493e06d731cc1abbaaa43b639 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 01:24:36 +0100 Subject: [PATCH 3/8] Fixed perf test --- tests/NATS.Client.Perf/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/NATS.Client.Perf/Program.cs b/tests/NATS.Client.Perf/Program.cs index 14a1977ae..4e4b0c0af 100644 --- a/tests/NATS.Client.Perf/Program.cs +++ b/tests/NATS.Client.Perf/Program.cs @@ -29,7 +29,7 @@ await nats1.PingAsync(); await nats2.PingAsync(); -await using var sub = await nats1.SubscribeAsync(t.Subject); +await using var sub = await nats1.SubscribeAsync(t.Subject); var stopwatch = Stopwatch.StartNew(); From f2355064a2a490e273bd37dd4d8526a183899f47 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 11:55:06 +0100 Subject: [PATCH 4/8] Micro benchmarks tidy up * Added switcher so we can add more benchmarks * Added a benchmark comparing pub wait-until-sent option --- sandbox/MicroBenchmark/DefaultBench.cs | 169 +++++++++++++++++ sandbox/MicroBenchmark/MicroBenchmark.csproj | 2 +- sandbox/MicroBenchmark/Program.cs | 175 +----------------- .../SerializationBuffersBench.cs | 45 +++++ 4 files changed, 217 insertions(+), 174 deletions(-) create mode 100644 sandbox/MicroBenchmark/DefaultBench.cs create mode 100644 sandbox/MicroBenchmark/SerializationBuffersBench.cs diff --git a/sandbox/MicroBenchmark/DefaultBench.cs b/sandbox/MicroBenchmark/DefaultBench.cs new file mode 100644 index 000000000..717b42ab4 --- /dev/null +++ b/sandbox/MicroBenchmark/DefaultBench.cs @@ -0,0 +1,169 @@ +#pragma warning disable IDE0044 +using System.Text.Json; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using StackExchange.Redis; +using ZLogger; + +namespace MicroBenchmark; + +public struct MyVector3 +{ + public float X { get; set; } + + public float Y { get; set; } + + public float Z { get; set; } +} + +// var run = new DefaultRun(); +// await run.SetupAsync(); +// await run.RunBenchmark(); +// await run.RunStackExchangeRedis(); + +// await run.CleanupAsync(); +#pragma warning disable CS8618 + +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class DefaultBench +{ +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. + private NatsConnection _connection; + private string _subject; + private ConnectionMultiplexer _redis; + private object _gate; + private Handler _handler; + private IDisposable _subscription = default!; + + [GlobalSetup] + public async Task SetupAsync() + { + var provider = new ServiceCollection() + .AddLogging(x => + { + x.ClearProviders(); + x.SetMinimumLevel(LogLevel.Information); + x.AddZLoggerConsole(); + }) + .BuildServiceProvider(); + + var loggerFactory = provider.GetRequiredService(); + var logger = loggerFactory.CreateLogger>(); + var options = NatsOpts.Default with + { + LoggerFactory = loggerFactory, + Echo = true, + Verbose = false, + }; + + _connection = new NATS.Client.Core.NatsConnection(options); + _subject = "foobar"; + await _connection.ConnectAsync(); + _gate = new object(); + _redis = StackExchange.Redis.ConnectionMultiplexer.Connect("localhost"); + + _handler = new Handler(); + + // subscription = connection.Subscribe(key, handler.Handle); + } + + // [Benchmark] + public async Task Nop() + { + await Task.Yield(); + } + + [Benchmark] + public async Task PublishAsync() + { + for (var i = 0; i < 1; i++) + { + await _connection.PublishAsync(_subject, default(MyVector3)); + } + } + + // [Benchmark] + public async Task PublishAsyncRedis() + { + for (var i = 0; i < 1; i++) + { + await _redis.GetDatabase().PublishAsync(_subject, JsonSerializer.Serialize(default(MyVector3))); + } + } + + // [Benchmark] + public void RunBenchmark() + { + const int count = 10000; + _handler.Gate = _gate; + _handler.Called = 0; + _handler.Max = count; + + for (var i = 0; i < count; i++) + { + _connection.PublishAsync(_subject, default(MyVector3)); + } + + lock (_gate) + { + // Monitor.Wait(gate); + Thread.Sleep(1000); + } + } + + // [Benchmark] + // public async Task RunStackExchangeRedis() + // { + // var tcs = new TaskCompletionSource(); + // var called = 0; + // redis.GetSubscriber().Subscribe(key.Key, (channel, v) => + // { + // if (Interlocked.Increment(ref called) == 1000) + // { + // tcs.TrySetResult(); + // } + // }); + + // for (int i = 0; i < 1000; i++) + // { + // _ = redis.GetDatabase().PublishAsync(key.Key, JsonSerializer.Serialize(new MyVector3()), StackExchange.Redis.CommandFlags.FireAndForget); + // } + + // await tcs.Task; + // } + [GlobalCleanup] + public async Task CleanupAsync() + { + _subscription?.Dispose(); + if (_connection != null) + { + await _connection.DisposeAsync(); + } + + _redis?.Dispose(); + } + + private class Handler + { +#pragma warning disable SA1401 + public int Called; + public int Max; + public object Gate; +#pragma warning restore SA1401 + + public void Handle(MyVector3 vec) + { + if (Interlocked.Increment(ref Called) == Max) + { + lock (Gate) + { + Monitor.PulseAll(Gate); + } + } + } + } +} diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 62a7481cd..53579c16d 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -9,7 +9,7 @@ - + diff --git a/sandbox/MicroBenchmark/Program.cs b/sandbox/MicroBenchmark/Program.cs index 8cdd34abc..c9a046727 100644 --- a/sandbox/MicroBenchmark/Program.cs +++ b/sandbox/MicroBenchmark/Program.cs @@ -1,174 +1,3 @@ -#pragma warning disable IDE0044 +using BenchmarkDotNet.Running; -using System.Text.Json; -using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Configs; -using BenchmarkDotNet.Diagnosers; -using BenchmarkDotNet.Exporters; -using BenchmarkDotNet.Jobs; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using NATS.Client.Core; -using StackExchange.Redis; -using ZLogger; - -var config = ManualConfig.CreateMinimumViable() - .AddDiagnoser(MemoryDiagnoser.Default) - .AddExporter(DefaultExporters.Plain) - .AddJob(Job.ShortRun); - -BenchmarkDotNet.Running.BenchmarkRunner.Run(config, args); - -public struct MyVector3 -{ - public float X { get; set; } - - public float Y { get; set; } - - public float Z { get; set; } -} - -// var run = new DefaultRun(); -// await run.SetupAsync(); -// await run.RunBenchmark(); -// await run.RunStackExchangeRedis(); - -// await run.CleanupAsync(); -public class DefaultRun -{ -#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. - private NatsConnection _connection; - private string _subject; - private ConnectionMultiplexer _redis; - private object _gate; - private Handler _handler; - private IDisposable _subscription = default!; - - [GlobalSetup] - public async Task SetupAsync() - { - var provider = new ServiceCollection() - .AddLogging(x => - { - x.ClearProviders(); - x.SetMinimumLevel(LogLevel.Information); - x.AddZLoggerConsole(); - }) - .BuildServiceProvider(); - - var loggerFactory = provider.GetRequiredService(); - var logger = loggerFactory.CreateLogger>(); - var options = NatsOpts.Default with - { - LoggerFactory = loggerFactory, - Echo = true, - Verbose = false, - }; - - _connection = new NATS.Client.Core.NatsConnection(options); - _subject = "foobar"; - await _connection.ConnectAsync(); - _gate = new object(); - _redis = StackExchange.Redis.ConnectionMultiplexer.Connect("localhost"); - - _handler = new Handler(); - - // subscription = connection.Subscribe(key, handler.Handle); - } - - // [Benchmark] - public async Task Nop() - { - await Task.Yield(); - } - - [Benchmark] - public async Task PublishAsync() - { - for (var i = 0; i < 1; i++) - { - await _connection.PublishAsync(_subject, default(MyVector3)); - } - } - - // [Benchmark] - public async Task PublishAsyncRedis() - { - for (var i = 0; i < 1; i++) - { - await _redis.GetDatabase().PublishAsync(_subject, JsonSerializer.Serialize(default(MyVector3))); - } - } - - // [Benchmark] - public void RunBenchmark() - { - const int count = 10000; - _handler.Gate = _gate; - _handler.Called = 0; - _handler.Max = count; - - for (var i = 0; i < count; i++) - { - _connection.PublishAsync(_subject, default(MyVector3)); - } - - lock (_gate) - { - // Monitor.Wait(gate); - Thread.Sleep(1000); - } - } - - // [Benchmark] - // public async Task RunStackExchangeRedis() - // { - // var tcs = new TaskCompletionSource(); - // var called = 0; - // redis.GetSubscriber().Subscribe(key.Key, (channel, v) => - // { - // if (Interlocked.Increment(ref called) == 1000) - // { - // tcs.TrySetResult(); - // } - // }); - - // for (int i = 0; i < 1000; i++) - // { - // _ = redis.GetDatabase().PublishAsync(key.Key, JsonSerializer.Serialize(new MyVector3()), StackExchange.Redis.CommandFlags.FireAndForget); - // } - - // await tcs.Task; - // } - [GlobalCleanup] - public async Task CleanupAsync() - { - _subscription?.Dispose(); - if (_connection != null) - { - await _connection.DisposeAsync(); - } - - _redis?.Dispose(); - } - - private class Handler - { -#pragma warning disable SA1401 - public int Called; - public int Max; - public object Gate; -#pragma warning restore SA1401 - - public void Handle(MyVector3 vec) - { - if (Interlocked.Increment(ref Called) == Max) - { - lock (Gate) - { - Monitor.PulseAll(Gate); - } - } - } - } -} +BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); diff --git a/sandbox/MicroBenchmark/SerializationBuffersBench.cs b/sandbox/MicroBenchmark/SerializationBuffersBench.cs new file mode 100644 index 000000000..4bd864473 --- /dev/null +++ b/sandbox/MicroBenchmark/SerializationBuffersBench.cs @@ -0,0 +1,45 @@ +using BenchmarkDotNet.Attributes; +using NATS.Client.Core; +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. + +namespace MicroBenchmark; + +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class SerializationBuffersBench +{ + private static readonly string Data = new('0', 126); + private static readonly NatsPubOpts OptsWaitUntilSentTrue = new() { WaitUntilSent = true }; + private static readonly NatsPubOpts OptsWaitUntilSentFalse = new() { WaitUntilSent = false }; + + private NatsConnection _nats; + + [Params(64, 512, 1024)] + public int Iter { get; set; } + + [GlobalSetup] + public void Setup() => _nats = new NatsConnection(); + + [Benchmark] + public async ValueTask WaitUntilSentTrue() + { + for (var i = 0; i < Iter; i++) + { + await _nats.PublishAsync("foo", Data, opts: OptsWaitUntilSentTrue); + } + + return await _nats.PingAsync(); + } + + [Benchmark] + public async ValueTask WaitUntilSentFalse() + { + for (var i = 0; i < Iter; i++) + { + await _nats.PublishAsync("foo", Data, opts: OptsWaitUntilSentFalse); + } + + return await _nats.PingAsync(); + } +} From cf285c25a733d015f97f0ddcba37c49c31309b35 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 13:18:40 +0100 Subject: [PATCH 5/8] Removed string serializer String serialized is causing excess allocations. --- src/NATS.Client.Core/INatsSerializer.cs | 47 +------------------------ 1 file changed, 1 insertion(+), 46 deletions(-) diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index f0fbba874..3b564f9de 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -21,11 +21,7 @@ public interface ICountableBufferWriter : IBufferWriter public static class NatsDefaultSerializer { - public static readonly INatsSerializer Default = - new NatsRawSerializer( - new NatsStringSerializer( - Encoding.UTF8, - NatsJsonSerializer.Default)); + public static readonly INatsSerializer Default = new NatsRawSerializer(NatsJsonSerializer.Default); } public class NatsRawSerializer : INatsSerializer @@ -115,47 +111,6 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) } } -public class NatsStringSerializer : INatsSerializer -{ - private readonly Encoding _encoding; - - public NatsStringSerializer(Encoding encoding, INatsSerializer? next) - { - _encoding = encoding; - Next = next; - } - - public INatsSerializer? Next { get; } - - public int Serialize(ICountableBufferWriter bufferWriter, T? value) - { - if (value is string str) - { - var bytes = _encoding.GetBytes(str); - bufferWriter.Write(bytes); - return bytes.Length; - } - - if (Next != null) - return Next.Serialize(bufferWriter, value); - - throw new NatsException($"Can't serialize {typeof(T)}"); - } - - public T? Deserialize(in ReadOnlySequence buffer) - { - if (typeof(T) == typeof(string)) - { - return (T)(object)_encoding.GetString(buffer.ToArray()); - } - - if (Next != null) - return Next.Deserialize(buffer); - - throw new NatsException($"Can't deserialize {typeof(T)}"); - } -} - public sealed class NatsJsonSerializer : INatsSerializer { private static readonly JsonWriterOptions JsonWriterOpts = new JsonWriterOptions From 4709d2d66b9f227d397547af6a60729ec867ff66 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 15:54:23 +0100 Subject: [PATCH 6/8] Rename publishing and empty payload This is to make the purpose of this method more obvious. --- sandbox/ConsoleApp/Program.cs | 2 +- src/NATS.Client.Core/INatsConnection.cs | 15 ++++++++++++++- src/NATS.Client.Core/NatsConnection.Publish.cs | 2 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 6 +++--- tests/NATS.Client.TestUtilities/NatsProxy.cs | 2 +- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index cee6c33d4..77b90f3a9 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -95,7 +95,7 @@ public async Task Run() }); await _connection.PingAsync(); - await _connection.PublishAsync("foo"); + await _connection.PublishSentinelAsync("foo"); } } diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index 05995453f..78fdeae18 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -24,7 +24,20 @@ public interface INatsConnection /// A that represents the asynchronous send operation. ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + /// + /// Publishes an empty message payload to the given subject name, optionally supplying a reply subject. + /// + /// The destination subject to publish to. + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + /// + /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action + /// or indicate an event for example and of messages. + /// + ValueTask PublishSentinelAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index f98f5a573..e62f8b945 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -3,7 +3,7 @@ namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishSentinelAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 021a687fa..649ed0bd0 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -134,11 +134,11 @@ void Log(string text) await Retry.Until( "subscription is active", () => Volatile.Read(ref sync) == 1, - async () => await nats.PublishAsync("foo.sync"), + async () => await nats.PublishSentinelAsync("foo.sync"), retryDelay: TimeSpan.FromSeconds(1)); Log("PUB notifications"); - await nats.PublishAsync("foo.signal1"); + await nats.PublishSentinelAsync("foo.signal1"); var msg1 = await signal1; Assert.Equal(0, msg1.Data); Assert.Null(msg1.Headers); @@ -148,7 +148,7 @@ await Retry.Until( Assert.Matches(@"^MSG foo.signal1 \w+ 0␍␊$", msgFrame1.Message); Log("HPUB notifications"); - await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); + await nats.PublishSentinelAsync("foo.signal2", headers: new NatsHeaders()); var msg2 = await signal2; Assert.Equal(0, msg2.Data); Assert.NotNull(msg2.Headers); diff --git a/tests/NATS.Client.TestUtilities/NatsProxy.cs b/tests/NATS.Client.TestUtilities/NatsProxy.cs index 5695c50e9..4e8e64c8a 100644 --- a/tests/NATS.Client.TestUtilities/NatsProxy.cs +++ b/tests/NATS.Client.TestUtilities/NatsProxy.cs @@ -151,7 +151,7 @@ public async Task FlushFramesAsync(NatsConnection nats) { var subject = $"_SIGNAL_SYNC_{Interlocked.Increment(ref _syncCount)}"; - await nats.PublishAsync(subject); + await nats.PublishSentinelAsync(subject); await Retry.Until( "flush sync frame", From f17b4e5eabe1ba904a1e11d6cd12c7a7b7e8ae63 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 20:40:32 +0100 Subject: [PATCH 7/8] Revert pub sentinel naming Complicates the API. Documentation should be enough. --- sandbox/ConsoleApp/Program.cs | 2 +- src/NATS.Client.Core/INatsConnection.cs | 2 +- src/NATS.Client.Core/NatsConnection.Publish.cs | 2 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 6 +++--- tests/NATS.Client.TestUtilities/NatsProxy.cs | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index 77b90f3a9..cee6c33d4 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -95,7 +95,7 @@ public async Task Run() }); await _connection.PingAsync(); - await _connection.PublishSentinelAsync("foo"); + await _connection.PublishAsync("foo"); } } diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index 78fdeae18..f927f2530 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -37,7 +37,7 @@ public interface INatsConnection /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action /// or indicate an event for example and of messages. /// - ValueTask PublishSentinelAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index e62f8b945..f98f5a573 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -3,7 +3,7 @@ namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishSentinelAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 649ed0bd0..021a687fa 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -134,11 +134,11 @@ void Log(string text) await Retry.Until( "subscription is active", () => Volatile.Read(ref sync) == 1, - async () => await nats.PublishSentinelAsync("foo.sync"), + async () => await nats.PublishAsync("foo.sync"), retryDelay: TimeSpan.FromSeconds(1)); Log("PUB notifications"); - await nats.PublishSentinelAsync("foo.signal1"); + await nats.PublishAsync("foo.signal1"); var msg1 = await signal1; Assert.Equal(0, msg1.Data); Assert.Null(msg1.Headers); @@ -148,7 +148,7 @@ await Retry.Until( Assert.Matches(@"^MSG foo.signal1 \w+ 0␍␊$", msgFrame1.Message); Log("HPUB notifications"); - await nats.PublishSentinelAsync("foo.signal2", headers: new NatsHeaders()); + await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); var msg2 = await signal2; Assert.Equal(0, msg2.Data); Assert.NotNull(msg2.Headers); diff --git a/tests/NATS.Client.TestUtilities/NatsProxy.cs b/tests/NATS.Client.TestUtilities/NatsProxy.cs index 4e8e64c8a..5695c50e9 100644 --- a/tests/NATS.Client.TestUtilities/NatsProxy.cs +++ b/tests/NATS.Client.TestUtilities/NatsProxy.cs @@ -151,7 +151,7 @@ public async Task FlushFramesAsync(NatsConnection nats) { var subject = $"_SIGNAL_SYNC_{Interlocked.Increment(ref _syncCount)}"; - await nats.PublishSentinelAsync(subject); + await nats.PublishAsync(subject); await Retry.Until( "flush sync frame", From 882f993e474d8858971bbaf3e143deb01fda5b8c Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Oct 2023 21:06:42 +0100 Subject: [PATCH 8/8] Fixed test --- tests/NATS.Client.Core.Tests/SerializerTest.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index 834f978ea..ecdf6555e 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -60,11 +60,11 @@ await nats.PublishAsync( public class TestSerializer : INatsSerializer { + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); - - public object? Deserialize(in ReadOnlySequence buffer, Type type) => throw new TestSerializerException(); } public class TestSerializerException : Exception