From 1220541366849a0f9fbfb05f80044f29c40b01e5 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 6 Oct 2023 09:13:52 +0100 Subject: [PATCH] Support all binary payload APIs with only generics (#143) * API serializer proposal * Nested NATS default serializer * Fixed perf test * Micro benchmarks tidy up * Added switcher so we can add more benchmarks * Added a benchmark comparing pub wait-until-sent option * Removed string serializer String serialized is causing excess allocations. * Rename publishing and empty payload This is to make the purpose of this method more obvious. * Revert pub sentinel naming Complicates the API. Documentation should be enough. * Fixed test --- sandbox/ConsoleApp/Program.cs | 2 +- .../Example.Core.SubscribeHeaders/Program.cs | 4 +- .../Program.cs | 11 +- sandbox/Example.Core.SubscribeRaw/Program.cs | 4 +- .../RawDataSerializer.cs | 10 +- sandbox/MinimumWebApp/Program.cs | 5 +- sandbox/NatsBenchmark/Program.cs | 26 +--- src/NATS.Client.Core/INatsConnection.cs | 91 ++---------- src/NATS.Client.Core/INatsSerializer.cs | 105 ++++++++++++- src/NATS.Client.Core/INatsSub.cs | 27 ---- .../NatsConnection.Publish.cs | 14 +- .../NatsConnection.RequestReply.cs | 51 ------- .../NatsConnection.RequestSub.cs | 25 ---- .../NatsConnection.Subscribe.cs | 8 - src/NATS.Client.Core/NatsMsg.cs | 140 +----------------- src/NATS.Client.Core/NatsOpts.cs | 2 +- src/NATS.Client.Core/NatsRequestExtensions.cs | 41 +---- .../NatsRequestManyExtensions.cs | 35 +---- .../{NatsSub.cs => NatsSubUtils.cs} | 96 +++++------- .../Internal/NatsJSConsume.cs | 4 +- .../NatsJSErrorAwareJsonSerializer.cs | 5 +- .../Internal/NatsJSFetch.cs | 2 +- src/NATS.Client.JetStream/NatsJSMsg.cs | 7 +- .../NatsSubTests.cs | 4 +- .../CancellationTest.cs | 2 +- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 10 +- .../RequestReplyTest.cs | 21 +-- .../NATS.Client.Core.Tests/SerializerTest.cs | 4 +- tests/NATS.Client.Perf/Program.cs | 2 +- tests/NATS.Client.TestUtilities/Utils.cs | 26 ---- 30 files changed, 203 insertions(+), 581 deletions(-) rename src/NATS.Client.Core/{NatsSub.cs => NatsSubUtils.cs} (72%) diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index d670013f2..cee6c33d4 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -84,7 +84,7 @@ public Runner(INatsConnection connection) [RootCommand] public async Task Run() { - var subscription = await _connection.SubscribeAsync("foo"); + var subscription = await _connection.SubscribeAsync("foo"); _ = Task.Run(async () => { diff --git a/sandbox/Example.Core.SubscribeHeaders/Program.cs b/sandbox/Example.Core.SubscribeHeaders/Program.cs index b0d7bb6ed..ab8feee76 100644 --- a/sandbox/Example.Core.SubscribeHeaders/Program.cs +++ b/sandbox/Example.Core.SubscribeHeaders/Program.cs @@ -13,11 +13,11 @@ Print($"[SUB] Subscribing to subject '{subject}'...\n"); -var sub = await connection.SubscribeAsync(subject); +var sub = await connection.SubscribeAsync(subject); await foreach (var msg in sub.Msgs.ReadAllAsync()) { - Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data.Span)}\n"); + Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n"); if (msg.Headers != null) { foreach (var (key, values) in msg.Headers) diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs index 5f17ea87e..32cfab0fb 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs +++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs @@ -1,5 +1,4 @@ // > nats pub foo.xyz --count=10 "my_message_{{ Count }}" -using System.Text; using Microsoft.Extensions.Logging; using NATS.Client.Core; @@ -12,13 +11,12 @@ await using var connection1 = new NatsConnection(options); Print($"[1][SUB] Subscribing to subject '{subject}'...\n"); -var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers"); +var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers"); var task1 = Task.Run(async () => { await foreach (var msg in sub1.Msgs.ReadAllAsync()) { - var data = Encoding.UTF8.GetString(msg.Data.ToArray()); - Print($"[1][RCV] {msg.Subject}: {data}\n"); + Print($"[1][RCV] {msg.Subject}: {msg.Data}\n"); } }); @@ -28,13 +26,12 @@ await using var connection2 = new NatsConnection(options); Print($"[2][SUB] Subscribing to subject '{subject}'...\n"); -var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers"); +var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers"); var task2 = Task.Run(async () => { await foreach (var msg in sub2.Msgs.ReadAllAsync()) { - var data = Encoding.UTF8.GetString(msg.Data.ToArray()); - Print($"[2][RCV] {msg.Subject}: {data}\n"); + Print($"[2][RCV] {msg.Subject}: {msg.Data}\n"); } }); diff --git a/sandbox/Example.Core.SubscribeRaw/Program.cs b/sandbox/Example.Core.SubscribeRaw/Program.cs index 514c0ea79..7ab1559d8 100644 --- a/sandbox/Example.Core.SubscribeRaw/Program.cs +++ b/sandbox/Example.Core.SubscribeRaw/Program.cs @@ -11,11 +11,11 @@ Print($"[SUB] Subscribing to subject '{subject}'...\n"); -var sub = await connection.SubscribeAsync(subject); +var sub = await connection.SubscribeAsync(subject); await foreach (var msg in sub.Msgs.ReadAllAsync()) { - var data = Encoding.UTF8.GetString(msg.Data.ToArray()); + var data = Encoding.UTF8.GetString(msg.Data!); Print($"[RCV] {msg.Subject}: {data}\n"); } diff --git a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs index 6d26036a0..efdfb15fa 100644 --- a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs +++ b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs @@ -5,6 +5,8 @@ namespace Example.JetStream.PullConsumer; public class RawDataSerializer : INatsSerializer { + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) { if (value is RawData data) @@ -16,13 +18,11 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) throw new Exception($"Can only work with '{typeof(RawData)}'"); } - public T? Deserialize(in ReadOnlySequence buffer) => (T?)Deserialize(buffer, typeof(T)); - - public object? Deserialize(in ReadOnlySequence buffer, Type type) + public T? Deserialize(in ReadOnlySequence buffer) { - if (type != typeof(RawData)) + if (typeof(T) != typeof(RawData)) throw new Exception($"Can only work with '{typeof(RawData)}'"); - return new RawData(buffer.ToArray()); + return (T)(object)new RawData(buffer.ToArray()); } } diff --git a/sandbox/MinimumWebApp/Program.cs b/sandbox/MinimumWebApp/Program.cs index e62c9e059..d32e7b178 100644 --- a/sandbox/MinimumWebApp/Program.cs +++ b/sandbox/MinimumWebApp/Program.cs @@ -1,4 +1,3 @@ -using System.Text; using NATS.Client.Core; using NATS.Client.Hosting; @@ -11,13 +10,13 @@ app.MapGet("/subscribe", async (INatsConnection command) => { - var subscription = await command.SubscribeAsync("foo"); + var subscription = await command.SubscribeAsync("foo"); _ = Task.Run(async () => { await foreach (var msg in subscription.Msgs.ReadAllAsync()) { - Console.WriteLine($"Received {Encoding.UTF8.GetString(msg.Data.ToArray())}"); + Console.WriteLine($"Received {msg.Data}"); } }); }); diff --git a/sandbox/NatsBenchmark/Program.cs b/sandbox/NatsBenchmark/Program.cs index 7c71b73de..4152738d6 100644 --- a/sandbox/NatsBenchmark/Program.cs +++ b/sandbox/NatsBenchmark/Program.cs @@ -77,7 +77,7 @@ private void RunPubSubBenchmark(string testName, long testCount, long testSize, pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -152,7 +152,7 @@ private void RunPubSubBenchmarkBatch(string testName, long testCount, long testS pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -236,7 +236,7 @@ private void ProfilingRunPubSubBenchmarkAsync(string testName, long testCount, l pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -316,7 +316,7 @@ private void RunPubSubBenchmarkBatchRaw(string testName, long testCount, long te pubConn.ConnectAsync().AsTask().Wait(); subConn.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -421,7 +421,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes pubConn2.ConnectAsync().AsTask().Wait(); subConn2.ConnectAsync().AsTask().Wait(); - var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d = subConn.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount); @@ -435,7 +435,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes } } }); - var d2 = subConn2.SubscribeAsync(_subject).AsTask().Result.Register(_ => + var d2 = subConn2.SubscribeAsync(_subject).AsTask().Result.Register(_ => { Interlocked.Increment(ref subCount2); @@ -824,18 +824,4 @@ internal static class NatsMsgTestUtils }); return sub; } - - internal static INatsSub? Register(this INatsSub? sub, Action action) - { - if (sub == null) - return null; - Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - action(natsMsg); - } - }); - return sub; - } } diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index 647a8b52a..f927f2530 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -12,38 +12,32 @@ public interface INatsConnection ValueTask PingAsync(CancellationToken cancellationToken = default); /// - /// Publishes the message payload to the given subject name, optionally supplying a reply subject. + /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. /// /// The destination subject to publish to. - /// The message payload data. + /// Serializable data object. /// Optional message headers. /// Optional reply-to subject. /// A for publishing options. /// A used to cancel the command. + /// Specifies the type of data that may be sent to the NATS Server. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - - /// - /// Publishes the message payload to the given subject name, optionally supplying a reply subject. - /// - /// A representing message details. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// - /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. + /// Publishes an empty message payload to the given subject name, optionally supplying a reply subject. /// /// The destination subject to publish to. - /// Serializable data object. /// Optional message headers. /// Optional reply-to subject. /// A for publishing options. /// A used to cancel the command. - /// Specifies the type of data that may be sent to the NATS Server. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + /// + /// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action + /// or indicate an event for example and of messages. + /// + ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. @@ -55,21 +49,6 @@ public interface INatsConnection /// A that represents the asynchronous send operation. ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); - /// - /// Initiates a subscription to a subject, optionally joining a distributed queue group. - /// - /// The subject name to subscribe to. - /// If specified, the subscriber will join this queue group. - /// A for subscription options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Subscribers with the same queue group name, become a queue group, - /// and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); - /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. /// @@ -106,7 +85,7 @@ public interface INatsConnection /// Returns the received from the responder as reply. /// Raised when cancellation token is used /// - /// Response can be (null) or one . + /// Response can be (null) or one . /// Reply option's max messages will be set to 1. /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. /// @@ -118,30 +97,6 @@ public interface INatsConnection NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); - /// - /// Request and receive a single reply from a responder. - /// - /// Subject of the responder - /// Payload to send to responder - /// Optional message headers - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// Returns the received from the responder as reply. - /// Raised when cancellation token is used - /// - /// Response can be (null) or one . - /// Reply option's max messages will be set to 1 (one). - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - ValueTask RequestAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default); - /// /// Request and receive zero or more replies from a responder. /// @@ -153,7 +108,7 @@ public interface INatsConnection /// Cancel this request /// Request type /// Reply type - /// An asynchronous enumerable of objects + /// An asynchronous enumerable of objects /// Raised when cancellation token is used /// /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. @@ -165,26 +120,4 @@ public interface INatsConnection NatsPubOpts? requestOpts = default, NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default); - - /// - /// Request and receive zero or more replies from a responder. - /// - /// Subject of the responder - /// Payload to send to responder - /// Optional message headers - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// An asynchronous enumerable of objects - /// Raised when cancellation token is used - /// - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - IAsyncEnumerable RequestManyAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 4b78f71de..3b564f9de 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; @@ -6,11 +7,11 @@ namespace NATS.Client.Core; public interface INatsSerializer { + public INatsSerializer? Next { get; } + int Serialize(ICountableBufferWriter bufferWriter, T? value); T? Deserialize(in ReadOnlySequence buffer); - - object? Deserialize(in ReadOnlySequence buffer, Type type); } public interface ICountableBufferWriter : IBufferWriter @@ -18,6 +19,98 @@ public interface ICountableBufferWriter : IBufferWriter int WrittenCount { get; } } +public static class NatsDefaultSerializer +{ + public static readonly INatsSerializer Default = new NatsRawSerializer(NatsJsonSerializer.Default); +} + +public class NatsRawSerializer : INatsSerializer +{ + public NatsRawSerializer(INatsSerializer? next) => Next = next; + + public INatsSerializer? Next { get; } + + public int Serialize(ICountableBufferWriter bufferWriter, T? value) + { + if (value is byte[] bytes) + { + bufferWriter.Write(bytes); + return bytes.Length; + } + + if (value is Memory memory) + { + bufferWriter.Write(memory.Span); + return memory.Length; + } + + if (value is ReadOnlyMemory readOnlyMemory) + { + bufferWriter.Write(readOnlyMemory.Span); + return readOnlyMemory.Length; + } + + if (value is ReadOnlySequence readOnlySequence) + { + if (readOnlySequence.IsSingleSegment) + { + bufferWriter.Write(readOnlySequence.FirstSpan); + } + else + { + foreach (var source in readOnlySequence) + { + bufferWriter.Write(source.Span); + } + } + + return (int)readOnlySequence.Length; + } + + if (value is IMemoryOwner memoryOwner) + { + using (memoryOwner) + { + bufferWriter.Write(memoryOwner.Memory.Span); + return memoryOwner.Memory.Length; + } + } + + if (Next != null) + return Next.Serialize(bufferWriter, value); + + throw new NatsException($"Can't serialize {typeof(T)}"); + } + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (typeof(T) == typeof(byte[])) + { + return (T)(object)buffer.ToArray(); + } + + if (typeof(T) == typeof(Memory)) + { + return (T)(object)new Memory(buffer.ToArray()); + } + + if (typeof(T) == typeof(ReadOnlyMemory)) + { + return (T)(object)new ReadOnlyMemory(buffer.ToArray()); + } + + if (typeof(T) == typeof(ReadOnlySequence)) + { + return (T)(object)new ReadOnlySequence(buffer.ToArray()); + } + + if (Next != null) + return Next.Deserialize(buffer); + + throw new NatsException($"Can't deserialize {typeof(T)}"); + } +} + public sealed class NatsJsonSerializer : INatsSerializer { private static readonly JsonWriterOptions JsonWriterOpts = new JsonWriterOptions @@ -39,6 +132,8 @@ public sealed class NatsJsonSerializer : INatsSerializer DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, }); + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) { Utf8JsonWriter writer; @@ -65,12 +160,6 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) return JsonSerializer.Deserialize(ref reader, _opts); } - public object? Deserialize(in ReadOnlySequence buffer, Type type) - { - var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate. - return JsonSerializer.Deserialize(ref reader, type, _opts); - } - private sealed class NullBufferWriter : IBufferWriter { internal static readonly IBufferWriter Instance = new NullBufferWriter(); diff --git a/src/NATS.Client.Core/INatsSub.cs b/src/NATS.Client.Core/INatsSub.cs index 91ef3c48f..a74ecfb58 100644 --- a/src/NATS.Client.Core/INatsSub.cs +++ b/src/NATS.Client.Core/INatsSub.cs @@ -2,33 +2,6 @@ namespace NATS.Client.Core; -public interface INatsSub : IAsyncDisposable -{ - /// - /// Access incoming messages for your subscription. - /// - ChannelReader Msgs { get; } - - /// - /// The subject name to subscribe to. - /// - string Subject { get; } - - /// - /// If specified, the subscriber will join this queue group. Subscribers with the same queue group name, - /// become a queue group, and only one randomly chosen subscriber of the queue group will - /// consume a message each time a message is received by the queue group. - /// - string? QueueGroup { get; } - - /// - /// Complete the message channel, stop timers if they were used and send an unsubscribe - /// message to the server. - /// - /// A that represents the asynchronous server UNSUB operation. - public ValueTask UnsubscribeAsync(); -} - public interface INatsSub : IAsyncDisposable { /// diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 79094fa97..4417a5968 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -1,28 +1,20 @@ -using System.Buffers; - namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { - return PubAsync(subject, replyTo, payload, headers, cancellationToken); + return PubAsync(subject, replyTo, payload: default, headers, cancellationToken); } else { - return PubPostAsync(subject, replyTo, payload, headers, cancellationToken); + return PubPostAsync(subject, replyTo, payload: default, headers, cancellationToken); } } - /// - public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - return PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken); - } - /// public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 04a5c6491..c6fb41a1f 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -1,4 +1,3 @@ -using System.Buffers; using System.Runtime.CompilerServices; namespace NATS.Client.Core; @@ -35,30 +34,6 @@ public partial class NatsConnection return null; } - /// - public async ValueTask RequestAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - var opts = SetReplyOptsDefaults(replyOpts); - - await using var sub = await RequestSubAsync(subject, payload, headers, requestOpts, opts, cancellationToken).ConfigureAwait(false); - - if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - if (sub.Msgs.TryRead(out var msg)) - { - return msg; - } - } - - return null; - } - /// public async IAsyncEnumerable> RequestManyAsync( string subject, @@ -86,32 +61,6 @@ public partial class NatsConnection } } - /// - public async IAsyncEnumerable RequestManyAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - await using var sub = await RequestSubAsync(subject, payload, headers, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false); - - while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - while (sub.Msgs.TryRead(out var msg)) - { - // Received end of stream sentinel - if (msg.Data.Length == 0) - { - yield break; - } - - yield return msg; - } - } - } - private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts) { var opts = replyOpts ?? DefaultReplyOpts; diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index 654348e35..50a84fa26 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -1,32 +1,7 @@ -using System.Buffers; - namespace NATS.Client.Core; public partial class NatsConnection { - internal async ValueTask RequestSubAsync( - string subject, - ReadOnlySequence payload = default, - NatsHeaders? headers = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts); - await SubAsync(replyTo, queueGroup: default, replyOpts, sub, cancellationToken).ConfigureAwait(false); - if (requestOpts?.WaitUntilSent == true) - { - await PubAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); - } - else - { - await PubPostAsync(subject, replyTo, payload, headers, cancellationToken).ConfigureAwait(false); - } - - return sub; - } - internal async ValueTask> RequestSubAsync( string subject, TRequest? data, diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index f849cb331..3e6460420 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -2,14 +2,6 @@ namespace NATS.Client.Core; public partial class NatsConnection { - /// - public async ValueTask SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) - { - var sub = new NatsSub(this, SubscriptionManager, subject, queueGroup, opts); - await SubAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false); - return sub; - } - /// public async ValueTask> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 67edc2b61..14bd1c78a 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -3,110 +3,6 @@ namespace NATS.Client.Core; -/// -/// NATS message structure as defined by the protocol. -/// -/// The destination subject to publish to. -/// The reply subject that subscribers can use to send a response back to the publisher/requester. -/// Message size in bytes. -/// Pass additional information using name-value pairs. -/// The message payload data. -/// NATS connection this message is associated to. -/// -/// Connection property is used to provide reply functionality. -/// -/// Message size is calculated using the same method NATS server uses: -/// -/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; -/// -/// -/// -public readonly record struct NatsMsg( - string Subject, - string? ReplyTo, - int Size, - NatsHeaders? Headers, - ReadOnlyMemory Data, - INatsConnection? Connection) -{ - internal static NatsMsg Build( - string subject, - string? replyTo, - in ReadOnlySequence? headersBuffer, - in ReadOnlySequence payloadBuffer, - INatsConnection? connection, - NatsHeaderParser headerParser) - { - NatsHeaders? headers = null; - - if (headersBuffer != null) - { - headers = new NatsHeaders(); - if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) - { - throw new NatsException("Error parsing headers"); - } - - headers.SetReadOnly(); - } - - var size = subject.Length - + (replyTo?.Length ?? 0) - + (headersBuffer?.Length ?? 0) - + payloadBuffer.Length; - - return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); - } - - /// - /// Reply to this message. - /// - /// The message payload data. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload, headers, replyTo, opts, cancellationToken); - } - - /// - /// Reply to this message. - /// - /// A representing message details. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); - } - - [MemberNotNull(nameof(Connection))] - private void CheckReplyPreconditions() - { - if (Connection == default) - { - throw new NatsException("unable to send reply; message did not originate from a subscription"); - } - - if (string.IsNullOrWhiteSpace(ReplyTo)) - { - throw new NatsException("unable to send reply; ReplyTo is empty"); - } - } -} - /// /// NATS message structure as defined by the protocol. /// @@ -193,7 +89,7 @@ public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, /// /// Reply to this message. /// - /// A representing message details. + /// A representing message details. /// A for publishing options. /// A used to cancel the command. /// Specifies the type of data that may be sent to the NATS Server. @@ -207,40 +103,6 @@ public ValueTask ReplyAsync(NatsMsg msg, NatsPubOpts? opts = def return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); } - /// - /// Reply to this message. - /// - /// The message payload data. - /// Optional message headers. - /// Optional reply-to subject. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(ReadOnlySequence payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo!, payload: payload, headers, replyTo, opts, cancellationToken); - } - - /// - /// Reply to this message. - /// - /// A representing message details. - /// A for publishing options. - /// A used to cancel the command. - /// A that represents the asynchronous send operation. - /// - /// Publishes a new message using the reply-to subject from the this message as the destination subject. - /// - public ValueTask ReplyAsync(NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo! }, opts, cancellationToken); - } - [MemberNotNull(nameof(Connection))] private void CheckReplyPreconditions() { diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index bda3589ce..936b1e9ac 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -71,7 +71,7 @@ public sealed record NatsOpts Headers: true, AuthOpts: NatsAuthOpts.Default, TlsOpts: NatsTlsOpts.Default, - Serializer: NatsJsonSerializer.Default, + Serializer: NatsDefaultSerializer.Default, LoggerFactory: NullLoggerFactory.Instance, WriterBufferSize: 65534, // 32767 ReaderBufferSize: 1048576, diff --git a/src/NATS.Client.Core/NatsRequestExtensions.cs b/src/NATS.Client.Core/NatsRequestExtensions.cs index 99265e822..117041f94 100644 --- a/src/NATS.Client.Core/NatsRequestExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestExtensions.cs @@ -1,5 +1,3 @@ -using System.Buffers; - namespace NATS.Client.Core; public static class NatsRequestExtensions @@ -17,7 +15,7 @@ public static class NatsRequestExtensions /// Returns the received from the responder as reply. /// Raised when cancellation token is used /// - /// Response can be (null) or one . + /// Response can be (null) or one . /// Reply option's max messages will be set to 1. /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. /// @@ -39,48 +37,13 @@ public static class NatsRequestExtensions cancellationToken); } - /// - /// Request and receive a single reply from a responder. - /// - /// NATS connection - /// Message to be sent as request - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// Returns the received from the responder as reply. - /// Raised when cancellation token is used - /// - /// Response can be (null) or one . - /// Reply option's max messages will be set to 1. - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - public static ValueTask RequestAsync( - this INatsConnection nats, - in NatsMsg msg, - in NatsPubOpts? requestOpts = default, - in NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - CheckMsgForRequestReply(msg); - - return nats.RequestAsync( - msg.Subject, - payload: new ReadOnlySequence(msg.Data), - msg.Headers, - requestOpts, - replyOpts, - cancellationToken); - } - - internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); - internal static void CheckMsgForRequestReply(in NatsMsg msg) => CheckForRequestReply(msg.ReplyTo); private static void CheckForRequestReply(string? replyTo) { if (!string.IsNullOrWhiteSpace(replyTo)) { - throw new NatsException($"Can't set {nameof(NatsMsg.ReplyTo)} for a request"); + throw new NatsException($"Can't set reply-to for a request"); } } } diff --git a/src/NATS.Client.Core/NatsRequestManyExtensions.cs b/src/NATS.Client.Core/NatsRequestManyExtensions.cs index 425c7e0bc..967feeb25 100644 --- a/src/NATS.Client.Core/NatsRequestManyExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestManyExtensions.cs @@ -1,40 +1,7 @@ -using System.Buffers; - namespace NATS.Client.Core; public static class NatsRequestManyExtensions { - /// - /// Request and receive zero or more replies from a responder. - /// - /// NATS connection - /// Message to be sent as request - /// Request publish options - /// Reply handler subscription options - /// Cancel this request - /// An asynchronous enumerable of objects - /// Raised when cancellation token is used - /// - /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. - /// - public static IAsyncEnumerable RequestManyAsync( - this INatsConnection nats, - NatsMsg msg, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - NatsRequestExtensions.CheckMsgForRequestReply(msg); - - return nats.RequestManyAsync( - msg.Subject, - payload: new ReadOnlySequence(msg.Data), - msg.Headers, - requestOpts, - replyOpts, - cancellationToken); - } - /// /// Request and receive zero or more replies from a responder. /// @@ -45,7 +12,7 @@ public static IAsyncEnumerable RequestManyAsync( /// Cancel this request /// Request type /// Reply type - /// An asynchronous enumerable of objects + /// An asynchronous enumerable of objects /// Raised when cancellation token is used /// /// if reply option's timeout is not defined then it will be set to NatsOpts.RequestTimeout. diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSubUtils.cs similarity index 72% rename from src/NATS.Client.Core/NatsSub.cs rename to src/NATS.Client.Core/NatsSubUtils.cs index f7ef07f65..5ee655f57 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSubUtils.cs @@ -5,66 +5,6 @@ namespace NATS.Client.Core; -public sealed class NatsSub : NatsSubBase, INatsSub -{ - private static readonly BoundedChannelOptions DefaultChannelOpts = - new BoundedChannelOptions(1_000) - { - FullMode = BoundedChannelFullMode.Wait, - SingleWriter = true, - SingleReader = false, - AllowSynchronousContinuations = false, - }; - - private readonly Channel _msgs; - - internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, string? queueGroup, NatsSubOpts? opts) - : base(connection, manager, subject, queueGroup, opts) => - _msgs = Channel.CreateBounded( - GetChannelOpts(opts?.ChannelOpts)); - - public ChannelReader Msgs => _msgs.Reader; - - internal static BoundedChannelOptions GetChannelOpts( - NatsSubChannelOpts? subChannelOpts) - { - if (subChannelOpts is { } overrideOpts) - { - return new BoundedChannelOptions(overrideOpts.Capacity ?? - DefaultChannelOpts.Capacity) - { - AllowSynchronousContinuations = - DefaultChannelOpts.AllowSynchronousContinuations, - FullMode = - overrideOpts.FullMode ?? DefaultChannelOpts.FullMode, - SingleWriter = DefaultChannelOpts.SingleWriter, - SingleReader = DefaultChannelOpts.SingleReader, - }; - } - else - { - return DefaultChannelOpts; - } - } - - protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) - { - var natsMsg = NatsMsg.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - Connection, - Connection.HeaderParser); - - await _msgs.Writer.WriteAsync(natsMsg).ConfigureAwait(false); - - DecrementMaxMsgs(); - } - - protected override void TryComplete() => _msgs.Writer.TryComplete(); -} - public sealed class NatsSub : NatsSubBase, INatsSub { private readonly Channel> _msgs; @@ -79,7 +19,7 @@ internal NatsSub( : base(connection, manager, subject, queueGroup, opts) { _msgs = Channel.CreateBounded>( - NatsSub.GetChannelOpts(opts?.ChannelOpts)); + NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); Serializer = serializer; } @@ -123,3 +63,37 @@ public NatsSubException(string message, ExceptionDispatchInfo exception, Memory< public Memory Headers { get; } } + +internal sealed class NatsSubUtils +{ + private static readonly BoundedChannelOptions DefaultChannelOpts = + new BoundedChannelOptions(1_000) + { + FullMode = BoundedChannelFullMode.Wait, + SingleWriter = true, + SingleReader = false, + AllowSynchronousContinuations = false, + }; + + internal static BoundedChannelOptions GetChannelOpts( + NatsSubChannelOpts? subChannelOpts) + { + if (subChannelOpts is { } overrideOpts) + { + return new BoundedChannelOptions(overrideOpts.Capacity ?? + DefaultChannelOpts.Capacity) + { + AllowSynchronousContinuations = + DefaultChannelOpts.AllowSynchronousContinuations, + FullMode = + overrideOpts.FullMode ?? DefaultChannelOpts.FullMode, + SingleWriter = DefaultChannelOpts.SingleWriter, + SingleReader = DefaultChannelOpts.SingleReader, + }; + } + else + { + return DefaultChannelOpts; + } + } +} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index e8c268b9b..081e50eb8 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -105,10 +105,10 @@ public NatsJSConsume( Timeout.Infinite, Timeout.Infinite); - _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _userMsgs = Channel.CreateBounded>(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); Msgs = _userMsgs.Reader; - _pullRequests = Channel.CreateBounded(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _pullRequests = Channel.CreateBounded(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); _pullTask = Task.Run(PullLoop); ResetPending(); diff --git a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs index 1c1fbb463..f72ef1cd9 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs @@ -9,6 +9,8 @@ internal sealed class NatsJSErrorAwareJsonSerializer : INatsSerializer { public static readonly NatsJSErrorAwareJsonSerializer Default = new(); + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new NotSupportedException(); @@ -27,9 +29,6 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) => return jsonDocument.Deserialize(); } - - public object? Deserialize(in ReadOnlySequence buffer, Type type) => - throw new NotSupportedException(); } internal class NatsJSApiErrorException : Exception diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index a189be65e..08fcf32e9 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -57,7 +57,7 @@ public NatsJSFetch( _pendingMsgs = _maxMsgs; _pendingBytes = _maxBytes; - _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _userMsgs = Channel.CreateBounded>(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); Msgs = _userMsgs.Reader; if (_debug) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 5b5d040ec..ee657083e 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -108,8 +108,11 @@ private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = de throw new NatsJSException("No user message, can't acknowledge"); return _msg.ReplyAsync( - payload: payload, - opts: new NatsPubOpts { WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent }, + data: payload, + opts: new NatsPubOpts + { + WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, + }, cancellationToken: cancellationToken); } } diff --git a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs index bab7caaad..4290303f9 100644 --- a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs +++ b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs @@ -16,7 +16,7 @@ public void Subject_manager_should_not_hold_on_to_subscription_if_collected() async Task Isolator() { // Subscription is not being disposed here - var natsSub = await nats.SubscribeAsync("foo"); + var natsSub = await nats.SubscribeAsync("foo"); Assert.That(natsSub.Subject, Is.EqualTo("foo")); } @@ -26,7 +26,7 @@ async Task Isolator() dotMemory.Check(memory => { - var count = memory.GetObjects(where => where.Type.Is()).ObjectsCount; + var count = memory.GetObjects(where => where.Type.Is()).ObjectsCount; Assert.That(count, Is.EqualTo(0)); }); } diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index e6e63ce11..3a51ad62f 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -21,7 +21,7 @@ public async Task CommandTimeoutTest() await using var pubConnection = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromSeconds(1) }); await pubConnection.ConnectAsync(); - await subConnection.SubscribeAsync("foo"); + await subConnection.SubscribeAsync("foo"); var cmd = new SleepWriteCommand("PUB foo 5\r\naiueo", TimeSpan.FromSeconds(10)); pubConnection.PostDirectWrite(cmd); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index b045b31cd..021a687fa 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -112,9 +112,9 @@ void Log(string text) var (nats, proxy) = server.CreateProxiedClientConnection(); var sync = 0; - var signal1 = new WaitSignal(); - var signal2 = new WaitSignal(); - var sub = await nats.SubscribeAsync("foo.*"); + var signal1 = new WaitSignal>(); + var signal2 = new WaitSignal>(); + var sub = await nats.SubscribeAsync("foo.*"); var reg = sub.Register(m => { switch (m.Subject) @@ -140,7 +140,7 @@ await Retry.Until( Log("PUB notifications"); await nats.PublishAsync("foo.signal1"); var msg1 = await signal1; - Assert.Equal(0, msg1.Data.Length); + Assert.Equal(0, msg1.Data); Assert.Null(msg1.Headers); var pubFrame1 = proxy.Frames.First(f => f.Message.StartsWith("PUB foo.signal1")); Assert.Equal("PUB foo.signal1 0␍␊", pubFrame1.Message); @@ -150,7 +150,7 @@ await Retry.Until( Log("HPUB notifications"); await nats.PublishAsync("foo.signal2", headers: new NatsHeaders()); var msg2 = await signal2; - Assert.Equal(0, msg2.Data.Length); + Assert.Equal(0, msg2.Data); Assert.NotNull(msg2.Headers); Assert.Empty(msg2.Headers!); var pubFrame2 = proxy.Frames.First(f => f.Message.StartsWith("HPUB foo.signal2")); diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index f60b4993e..5c5e52954 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -260,11 +260,6 @@ public async Task Request_reply_many_test_sentinel() [Fact] public async Task Request_reply_binary_test() { - static ReadOnlySequence ToSeq(string input) - { - return new ReadOnlySequence(Encoding.ASCII.GetBytes(input)); - } - static string ToStr(ReadOnlyMemory input) { return Encoding.ASCII.GetString(input.Span); @@ -274,22 +269,22 @@ static string ToStr(ReadOnlyMemory input) await using var server = NatsServer.Start(); await using var nats = server.CreateClientConnection(); - await using var sub = await nats.SubscribeAsync("foo", cancellationToken: cts.Token); + await using var sub = await nats.SubscribeAsync("foo", cancellationToken: cts.Token); var reg = sub.Register(async m => { - if (ToStr(m.Data) == "1") + if (m.Data == "1") { - await m.ReplyAsync(payload: ToSeq("qw"), cancellationToken: cts.Token); - await m.ReplyAsync(payload: ToSeq("er"), cancellationToken: cts.Token); - await m.ReplyAsync(payload: ToSeq("ty"), cancellationToken: cts.Token); - await m.ReplyAsync(payload: default, cancellationToken: cts.Token); // sentinel + await m.ReplyAsync("qw", cancellationToken: cts.Token); + await m.ReplyAsync("er", cancellationToken: cts.Token); + await m.ReplyAsync("ty", cancellationToken: cts.Token); + await m.ReplyAsync(default(string), cancellationToken: cts.Token); // sentinel } }); var writer = new ArrayBufferWriter(); - await foreach (var msg in nats.RequestManyAsync("foo", ToSeq("1"), cancellationToken: cts.Token)) + await foreach (var msg in nats.RequestManyAsync("foo", "1", cancellationToken: cts.Token)) { - writer.Write(msg.Data.Span); + writer.Write(Encoding.UTF8.GetBytes(msg.Data!)); } var buffer = ToStr(writer.WrittenMemory); diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index 834f978ea..ecdf6555e 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -60,11 +60,11 @@ await nats.PublishAsync( public class TestSerializer : INatsSerializer { + public INatsSerializer? Next => default; + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); - - public object? Deserialize(in ReadOnlySequence buffer, Type type) => throw new TestSerializerException(); } public class TestSerializerException : Exception diff --git a/tests/NATS.Client.Perf/Program.cs b/tests/NATS.Client.Perf/Program.cs index 36fb9f142..4e4b0c0af 100644 --- a/tests/NATS.Client.Perf/Program.cs +++ b/tests/NATS.Client.Perf/Program.cs @@ -29,7 +29,7 @@ await nats1.PingAsync(); await nats2.PingAsync(); -await using var sub = await nats1.SubscribeAsync(t.Subject); +await using var sub = await nats1.SubscribeAsync(t.Subject); var stopwatch = Stopwatch.StartNew(); diff --git a/tests/NATS.Client.TestUtilities/Utils.cs b/tests/NATS.Client.TestUtilities/Utils.cs index 517349f22..a5315ffc5 100644 --- a/tests/NATS.Client.TestUtilities/Utils.cs +++ b/tests/NATS.Client.TestUtilities/Utils.cs @@ -72,32 +72,6 @@ public static Task Register(this INatsSub? sub, Func, Task> ac } }); } - - public static Task Register(this INatsSub? sub, Action action) - { - if (sub == null) - return Task.CompletedTask; - return Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - action(natsMsg); - } - }); - } - - public static Task Register(this INatsSub? sub, Func action) - { - if (sub == null) - return Task.CompletedTask; - return Task.Run(async () => - { - await foreach (var natsMsg in sub.Msgs.ReadAllAsync()) - { - await action(natsMsg); - } - }); - } } public static class BinaryUtils