From e85e9600159e40e53af9989c456675b872cdad55 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 30 Sep 2023 02:53:33 +0100 Subject: [PATCH 1/3] Raise publish serialization exception early When publishing, if there are serialization errors we need to throw the exception at the point of publish call being made. This changes buffer handling quite a bit. --- .../Commands/PublishCommand.cs | 10 +++- .../NatsPipeliningWriteProtocolProcessor.cs | 19 ++++++-- .../NATS.Client.Core.Tests/SerializerTest.cs | 46 +++++++++++++++++++ 3 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 tests/NATS.Client.Core.Tests/SerializerTest.cs diff --git a/src/NATS.Client.Core/Commands/PublishCommand.cs b/src/NATS.Client.Core/Commands/PublishCommand.cs index ecea042ed..e6facb783 100644 --- a/src/NATS.Client.Core/Commands/PublishCommand.cs +++ b/src/NATS.Client.Core/Commands/PublishCommand.cs @@ -5,6 +5,9 @@ namespace NATS.Client.Core.Commands; internal sealed class PublishCommand : CommandBase> { + // This buffer will be pooled with this command object + private readonly FixedArrayBufferWriter _buffer = new(); + private string? _subject; private string? _replyTo; private NatsHeaders? _headers; @@ -32,12 +35,17 @@ public static PublishCommand Create(ObjectPool pool, string subject, string? result._serializer = serializer; result._cancellationToken = cancellationToken; + // Serialize data as soon as possible to propagate any exceptions + // to the caller so that publish method will throw the exception + result._buffer.Reset(); + serializer.Serialize(result._buffer, value); + return result; } public override void Write(ProtocolWriter writer) { - writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!); + writer.WritePublish(_subject!, _replyTo, _headers, new ReadOnlySequence(_buffer.WrittenMemory)); } protected override void Reset() diff --git a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs index 8ea96dcdd..33227ee3d 100644 --- a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs @@ -129,14 +129,23 @@ private async Task WriteLoopAsync() continue; } - if (command is IBatchCommand batch) + try { - count += batch.Write(protocolWriter); + if (command is IBatchCommand batch) + { + count += batch.Write(protocolWriter); + } + else + { + command.Write(protocolWriter); + count++; + } } - else + catch (Exception e) { - command.Write(protocolWriter); - count++; + // flag potential serialization exceptions + ((IPromise)command).SetException(e); + continue; } if (command is IPromise p) diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs new file mode 100644 index 000000000..4eeda6b12 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -0,0 +1,46 @@ +using System.Buffers; + +namespace NATS.Client.Core.Tests; + +public class SerializerTest +{ + private readonly ITestOutputHelper _output; + + public SerializerTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Serializer_exceptions() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + await Assert.ThrowsAsync(async () => + { + await nats.PublishAsync( + "foo", + 0, + opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = false, }); + }); + + await Assert.ThrowsAsync(async () => + { + await nats.PublishAsync( + "foo", + 0, + opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = true, }); + }); + } +} + +public class TestSerializer : INatsSerializer +{ + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); + + public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); + + public object? Deserialize(in ReadOnlySequence buffer, Type type) => throw new TestSerializerException(); +} + +public class TestSerializerException : Exception +{ +} From 3e53bd3eaace2de9ef8caf15e8d37ee3c2cefbe0 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 3 Oct 2023 09:40:14 +0100 Subject: [PATCH 2/3] Early serialization option --- sandbox/MicroBenchmark/DefaultRun.cs | 169 +++++++++++++++++ sandbox/MicroBenchmark/MicroBenchmark.csproj | 2 +- sandbox/MicroBenchmark/Program.cs | 175 +----------------- .../SerializationBuffersBench.cs | 67 +++++++ .../Commands/ProtocolWriter.cs | 19 +- .../Commands/PublishCommand.cs | 60 ++++-- .../NatsConnection.LowLevelApi.cs | 8 +- .../NatsConnection.Publish.cs | 2 +- .../NatsConnection.RequestSub.cs | 2 +- src/NATS.Client.Core/NatsConnection.cs | 6 + src/NATS.Client.Core/NatsPubOpts.cs | 2 + .../Internal/NatsJSConsume.cs | 1 + .../Internal/NatsJSFetch.cs | 1 + 13 files changed, 316 insertions(+), 198 deletions(-) create mode 100644 sandbox/MicroBenchmark/DefaultRun.cs create mode 100644 sandbox/MicroBenchmark/SerializationBuffersBench.cs diff --git a/sandbox/MicroBenchmark/DefaultRun.cs b/sandbox/MicroBenchmark/DefaultRun.cs new file mode 100644 index 000000000..1e73aab6b --- /dev/null +++ b/sandbox/MicroBenchmark/DefaultRun.cs @@ -0,0 +1,169 @@ +#pragma warning disable IDE0044 +using System.Text.Json; +using BenchmarkDotNet.Attributes; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using StackExchange.Redis; +using ZLogger; + +namespace MicroBenchmark; + +public struct MyVector3 +{ + public float X { get; set; } + + public float Y { get; set; } + + public float Z { get; set; } +} + +// var run = new DefaultRun(); +// await run.SetupAsync(); +// await run.RunBenchmark(); +// await run.RunStackExchangeRedis(); + +// await run.CleanupAsync(); +#pragma warning disable CS8618 + +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class DefaultRun +{ +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. + private NatsConnection _connection; + private string _subject; + private ConnectionMultiplexer _redis; + private object _gate; + private Handler _handler; + private IDisposable _subscription = default!; + + [GlobalSetup] + public async Task SetupAsync() + { + var provider = new ServiceCollection() + .AddLogging(x => + { + x.ClearProviders(); + x.SetMinimumLevel(LogLevel.Information); + x.AddZLoggerConsole(); + }) + .BuildServiceProvider(); + + var loggerFactory = provider.GetRequiredService(); + var logger = loggerFactory.CreateLogger>(); + var options = NatsOpts.Default with + { + LoggerFactory = loggerFactory, + Echo = true, + Verbose = false, + }; + + _connection = new NATS.Client.Core.NatsConnection(options); + _subject = "foobar"; + await _connection.ConnectAsync(); + _gate = new object(); + _redis = StackExchange.Redis.ConnectionMultiplexer.Connect("localhost"); + + _handler = new Handler(); + + // subscription = connection.Subscribe(key, handler.Handle); + } + + // [Benchmark] + public async Task Nop() + { + await Task.Yield(); + } + + [Benchmark] + public async Task PublishAsync() + { + for (var i = 0; i < 1; i++) + { + await _connection.PublishAsync(_subject, default(MyVector3)); + } + } + + // [Benchmark] + public async Task PublishAsyncRedis() + { + for (var i = 0; i < 1; i++) + { + await _redis.GetDatabase().PublishAsync(_subject, JsonSerializer.Serialize(default(MyVector3))); + } + } + + // [Benchmark] + public void RunBenchmark() + { + const int count = 10000; + _handler.Gate = _gate; + _handler.Called = 0; + _handler.Max = count; + + for (var i = 0; i < count; i++) + { + _connection.PublishAsync(_subject, default(MyVector3)); + } + + lock (_gate) + { + // Monitor.Wait(gate); + Thread.Sleep(1000); + } + } + + // [Benchmark] + // public async Task RunStackExchangeRedis() + // { + // var tcs = new TaskCompletionSource(); + // var called = 0; + // redis.GetSubscriber().Subscribe(key.Key, (channel, v) => + // { + // if (Interlocked.Increment(ref called) == 1000) + // { + // tcs.TrySetResult(); + // } + // }); + + // for (int i = 0; i < 1000; i++) + // { + // _ = redis.GetDatabase().PublishAsync(key.Key, JsonSerializer.Serialize(new MyVector3()), StackExchange.Redis.CommandFlags.FireAndForget); + // } + + // await tcs.Task; + // } + [GlobalCleanup] + public async Task CleanupAsync() + { + _subscription?.Dispose(); + if (_connection != null) + { + await _connection.DisposeAsync(); + } + + _redis?.Dispose(); + } + + private class Handler + { +#pragma warning disable SA1401 + public int Called; + public int Max; + public object Gate; +#pragma warning restore SA1401 + + public void Handle(MyVector3 vec) + { + if (Interlocked.Increment(ref Called) == Max) + { + lock (Gate) + { + Monitor.PulseAll(Gate); + } + } + } + } +} diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 62a7481cd..53579c16d 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -9,7 +9,7 @@ - + diff --git a/sandbox/MicroBenchmark/Program.cs b/sandbox/MicroBenchmark/Program.cs index 8cdd34abc..c9a046727 100644 --- a/sandbox/MicroBenchmark/Program.cs +++ b/sandbox/MicroBenchmark/Program.cs @@ -1,174 +1,3 @@ -#pragma warning disable IDE0044 +using BenchmarkDotNet.Running; -using System.Text.Json; -using BenchmarkDotNet.Attributes; -using BenchmarkDotNet.Configs; -using BenchmarkDotNet.Diagnosers; -using BenchmarkDotNet.Exporters; -using BenchmarkDotNet.Jobs; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using NATS.Client.Core; -using StackExchange.Redis; -using ZLogger; - -var config = ManualConfig.CreateMinimumViable() - .AddDiagnoser(MemoryDiagnoser.Default) - .AddExporter(DefaultExporters.Plain) - .AddJob(Job.ShortRun); - -BenchmarkDotNet.Running.BenchmarkRunner.Run(config, args); - -public struct MyVector3 -{ - public float X { get; set; } - - public float Y { get; set; } - - public float Z { get; set; } -} - -// var run = new DefaultRun(); -// await run.SetupAsync(); -// await run.RunBenchmark(); -// await run.RunStackExchangeRedis(); - -// await run.CleanupAsync(); -public class DefaultRun -{ -#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. - private NatsConnection _connection; - private string _subject; - private ConnectionMultiplexer _redis; - private object _gate; - private Handler _handler; - private IDisposable _subscription = default!; - - [GlobalSetup] - public async Task SetupAsync() - { - var provider = new ServiceCollection() - .AddLogging(x => - { - x.ClearProviders(); - x.SetMinimumLevel(LogLevel.Information); - x.AddZLoggerConsole(); - }) - .BuildServiceProvider(); - - var loggerFactory = provider.GetRequiredService(); - var logger = loggerFactory.CreateLogger>(); - var options = NatsOpts.Default with - { - LoggerFactory = loggerFactory, - Echo = true, - Verbose = false, - }; - - _connection = new NATS.Client.Core.NatsConnection(options); - _subject = "foobar"; - await _connection.ConnectAsync(); - _gate = new object(); - _redis = StackExchange.Redis.ConnectionMultiplexer.Connect("localhost"); - - _handler = new Handler(); - - // subscription = connection.Subscribe(key, handler.Handle); - } - - // [Benchmark] - public async Task Nop() - { - await Task.Yield(); - } - - [Benchmark] - public async Task PublishAsync() - { - for (var i = 0; i < 1; i++) - { - await _connection.PublishAsync(_subject, default(MyVector3)); - } - } - - // [Benchmark] - public async Task PublishAsyncRedis() - { - for (var i = 0; i < 1; i++) - { - await _redis.GetDatabase().PublishAsync(_subject, JsonSerializer.Serialize(default(MyVector3))); - } - } - - // [Benchmark] - public void RunBenchmark() - { - const int count = 10000; - _handler.Gate = _gate; - _handler.Called = 0; - _handler.Max = count; - - for (var i = 0; i < count; i++) - { - _connection.PublishAsync(_subject, default(MyVector3)); - } - - lock (_gate) - { - // Monitor.Wait(gate); - Thread.Sleep(1000); - } - } - - // [Benchmark] - // public async Task RunStackExchangeRedis() - // { - // var tcs = new TaskCompletionSource(); - // var called = 0; - // redis.GetSubscriber().Subscribe(key.Key, (channel, v) => - // { - // if (Interlocked.Increment(ref called) == 1000) - // { - // tcs.TrySetResult(); - // } - // }); - - // for (int i = 0; i < 1000; i++) - // { - // _ = redis.GetDatabase().PublishAsync(key.Key, JsonSerializer.Serialize(new MyVector3()), StackExchange.Redis.CommandFlags.FireAndForget); - // } - - // await tcs.Task; - // } - [GlobalCleanup] - public async Task CleanupAsync() - { - _subscription?.Dispose(); - if (_connection != null) - { - await _connection.DisposeAsync(); - } - - _redis?.Dispose(); - } - - private class Handler - { -#pragma warning disable SA1401 - public int Called; - public int Max; - public object Gate; -#pragma warning restore SA1401 - - public void Handle(MyVector3 vec) - { - if (Interlocked.Increment(ref Called) == Max) - { - lock (Gate) - { - Monitor.PulseAll(Gate); - } - } - } - } -} +BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); diff --git a/sandbox/MicroBenchmark/SerializationBuffersBench.cs b/sandbox/MicroBenchmark/SerializationBuffersBench.cs new file mode 100644 index 000000000..0f3ff01b3 --- /dev/null +++ b/sandbox/MicroBenchmark/SerializationBuffersBench.cs @@ -0,0 +1,67 @@ +using BenchmarkDotNet.Attributes; +using NATS.Client.Core; +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. + +namespace MicroBenchmark; + +/* +| Method | Iter | Mean | Error | StdDev | Gen0 | Gen1 | Gen2 | Allocated | +|------------------------ |----- |------------:|-------------:|------------:|---------:|--------:|--------:|----------:| +| WaitUntilSentTrue | 64 | 2,828.9 us | 1,637.15 us | 89.74 us | - | - | - | 6996 B | +| WaitUntilSentFalse | 64 | 161.5 us | 39.35 us | 2.16 us | - | - | - | 602 B | +| WaitUntilSentFalseEarly | 64 | 216.9 us | 74.63 us | 4.09 us | 14.4043 | 7.0801 | - | 200043 B | +| WaitUntilSentTrue | 1000 | 43,930.1 us | 48,173.78 us | 2,640.57 us | - | - | - | 105673 B | +| WaitUntilSentFalse | 1000 | 723.5 us | 113.32 us | 6.21 us | 2.9297 | - | - | 48479 B | +| WaitUntilSentFalseEarly | 1000 | 1,136.7 us | 175.79 us | 9.64 us | 183.5938 | 91.7969 | 35.1563 | 2507530 B | + */ +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class SerializationBuffersBench +{ + private static readonly string Data = new('0', 126); + private static readonly NatsPubOpts OptsWaitUntilSentTrue = new() { WaitUntilSent = true }; + private static readonly NatsPubOpts OptsWaitUntilSentFalse = new() { WaitUntilSent = false }; + private static readonly NatsPubOpts OptsWaitUntilSentFalseEarly = new() { WaitUntilSent = false, SerializeEarly = true }; + + private NatsConnection _nats; + + [Params(64, 1_000)] + public int Iter { get; set; } + + [GlobalSetup] + public void Setup() => _nats = new NatsConnection(); + + [Benchmark] + public async ValueTask WaitUntilSentTrue() + { + for (var i = 0; i < Iter; i++) + { + await _nats.PublishAsync("foo", Data, opts: OptsWaitUntilSentTrue); + } + + return await _nats.PingAsync(); + } + + [Benchmark] + public async ValueTask WaitUntilSentFalse() + { + for (var i = 0; i < Iter; i++) + { + await _nats.PublishAsync("foo", Data, opts: OptsWaitUntilSentFalse); + } + + return await _nats.PingAsync(); + } + + [Benchmark] + public async ValueTask WaitUntilSentFalseEarly() + { + for (var i = 0; i < Iter; i++) + { + await _nats.PublishAsync("foo", Data, opts: OptsWaitUntilSentFalseEarly); + } + + return await _nats.PingAsync(); + } +} diff --git a/src/NATS.Client.Core/Commands/ProtocolWriter.cs b/src/NATS.Client.Core/Commands/ProtocolWriter.cs index f080c27e9..78ec5c7e2 100644 --- a/src/NATS.Client.Core/Commands/ProtocolWriter.cs +++ b/src/NATS.Client.Core/Commands/ProtocolWriter.cs @@ -48,8 +48,10 @@ public void WritePong() // https://docs.nats.io/reference/reference-protocols/nats-protocol#pub // PUB [reply-to] <#bytes>\r\n[payload]\r\n - public void WritePublish(string subject, string? replyTo, NatsHeaders? headers, ReadOnlySequence payload) + public void WritePublish(string subject, string? replyTo, NatsHeaders? headers, ReadOnlySequence payload, ReadOnlySpan payloadAsSpan = default) { + var payloadLength = payloadAsSpan == default ? payload.Length : payloadAsSpan.Length; + // We use a separate buffer to write the headers so that we can calculate the // size before we write to the output buffer '_writer'. if (headers != null) @@ -70,14 +72,14 @@ public void WritePublish(string subject, string? replyTo, NatsHeaders? headers, if (headers == null) { - _writer.WriteNumber(payload.Length); + _writer.WriteNumber(payloadLength); } else { var headersLength = _bufferHeaders.WrittenSpan.Length; _writer.WriteNumber(CommandConstants.NatsHeaders10NewLine.Length + headersLength); _writer.WriteSpace(); - var total = CommandConstants.NatsHeaders10NewLine.Length + headersLength + payload.Length; + var total = CommandConstants.NatsHeaders10NewLine.Length + headersLength + payloadLength; _writer.WriteNumber(total); } @@ -90,9 +92,16 @@ public void WritePublish(string subject, string? replyTo, NatsHeaders? headers, _writer.WriteSpan(_bufferHeaders.WrittenSpan); } - if (payload.Length != 0) + if (payloadLength != 0) { - _writer.WriteSequence(payload); + if (payloadAsSpan == default) + { + _writer.WriteSequence(payload); + } + else + { + _writer.WriteSpan(payloadAsSpan); + } } _writer.WriteNewLine(); diff --git a/src/NATS.Client.Core/Commands/PublishCommand.cs b/src/NATS.Client.Core/Commands/PublishCommand.cs index e6facb783..bf358ac4d 100644 --- a/src/NATS.Client.Core/Commands/PublishCommand.cs +++ b/src/NATS.Client.Core/Commands/PublishCommand.cs @@ -3,29 +3,38 @@ namespace NATS.Client.Core.Commands; -internal sealed class PublishCommand : CommandBase> +internal class BufferObjectPoolNode : IObjectPoolNode { - // This buffer will be pooled with this command object - private readonly FixedArrayBufferWriter _buffer = new(); + private BufferObjectPoolNode? _next; + + public BufferObjectPoolNode(FixedArrayBufferWriter writer) => Writer = writer; + + public ref BufferObjectPoolNode? NextNode => ref _next; + + public FixedArrayBufferWriter Writer { get; } +} +internal sealed class PublishCommand : CommandBase> +{ + private readonly ObjectPool _pool; + private BufferObjectPoolNode? _buffer; private string? _subject; private string? _replyTo; private NatsHeaders? _headers; private T? _value; private INatsSerializer? _serializer; + private bool _serializeEarly; private CancellationToken _cancellationToken; - private PublishCommand() - { - } + private PublishCommand(ObjectPool pool) => _pool = pool; public override bool IsCanceled => _cancellationToken.IsCancellationRequested; - public static PublishCommand Create(ObjectPool pool, string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer, CancellationToken cancellationToken) + public static PublishCommand Create(ObjectPool pool, string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer, bool serializeEarly, CancellationToken cancellationToken) { if (!TryRent(pool, out var result)) { - result = new PublishCommand(); + result = new PublishCommand(pool); } result._subject = subject; @@ -33,19 +42,42 @@ public static PublishCommand Create(ObjectPool pool, string subject, string? result._headers = headers; result._value = value; result._serializer = serializer; + result._serializeEarly = serializeEarly; result._cancellationToken = cancellationToken; - // Serialize data as soon as possible to propagate any exceptions - // to the caller so that publish method will throw the exception - result._buffer.Reset(); - serializer.Serialize(result._buffer, value); + if (serializeEarly) + { + // Serialize data as soon as possible to propagate any exceptions + // to the caller so that publish method will throw the exception + if (!result._pool.TryRent(out result._buffer)) + { + result._buffer = new BufferObjectPoolNode(new FixedArrayBufferWriter()); + } + + result._buffer.Writer.Reset(); + serializer.Serialize(result._buffer.Writer, value); + } return result; } public override void Write(ProtocolWriter writer) { - writer.WritePublish(_subject!, _replyTo, _headers, new ReadOnlySequence(_buffer.WrittenMemory)); + if (_serializeEarly) + { + try + { + writer.WritePublish(_subject!, _replyTo, _headers, default, _buffer!.Writer.WrittenSpan); + } + finally + { + _pool.Return(_buffer!); + } + } + else + { + writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!); + } } protected override void Reset() @@ -55,6 +87,8 @@ protected override void Reset() _value = default; _serializer = null; _cancellationToken = default; + _buffer = default; + _serializeEarly = default; } } diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index c010d43d1..0156320b2 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -28,20 +28,20 @@ internal ValueTask PubPostAsync(string subject, string? replyTo = default, ReadO } } - internal ValueTask PubModelPostAsync(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) + internal ValueTask PubModelPostAsync(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, bool serializeEarly = default, CancellationToken cancellationToken = default) { headers?.SetReadOnly(); if (ConnectionState == NatsConnectionState.Open) { - var command = PublishCommand.Create(_pool, subject, replyTo, headers, data, serializer, cancellationToken); + var command = PublishCommand.Create(_pool, subject, replyTo, headers, data, serializer, serializeEarly, cancellationToken); return EnqueueCommandAsync(command); } else { - return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, d, ser, c) => + return WithConnectAsync(subject, replyTo, headers, data, serializer, serializeEarly, cancellationToken, static (self, s, r, h, d, ser, se, c) => { - var command = PublishCommand.Create(self._pool, s, r, h, d, ser, c); + var command = PublishCommand.Create(self._pool, s, r, h, d, ser, se, c); return self.EnqueueCommandAsync(command); }); } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 388908106..5829e28bb 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -33,7 +33,7 @@ public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = } else { - return PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken); + return PubModelPostAsync(subject, data, serializer, replyTo, headers, opts?.SerializeEarly ?? false, cancellationToken); } } diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index cbdfa5320..26b740b11 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -49,7 +49,7 @@ internal async ValueTask> RequestSubAsync( } else { - await PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false); + await PubModelPostAsync(subject, data, serializer, replyTo, headers, requestOpts?.SerializeEarly ?? false, cancellationToken).ConfigureAwait(false); } return sub; diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index cf578cbee..0d5d6c9d0 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -747,6 +747,12 @@ private async ValueTask WithConnectAsync(T1 item1, T2 it await coreAsync(this, item1, item2, item3, item4, item5, item6).ConfigureAwait(false); } + private async ValueTask WithConnectAsync(T1 item1, T2 item2, T3 item3, T4 item4, T5 item5, T6 item6, T7 item7, Func coreAsync) + { + await ConnectAsync().ConfigureAwait(false); + await coreAsync(this, item1, item2, item3, item4, item5, item6, item7).ConfigureAwait(false); + } + private async ValueTask WithConnectAsync(Func> coreAsync) { await ConnectAsync().ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsPubOpts.cs b/src/NATS.Client.Core/NatsPubOpts.cs index dc1d14539..d5318400d 100644 --- a/src/NATS.Client.Core/NatsPubOpts.cs +++ b/src/NATS.Client.Core/NatsPubOpts.cs @@ -9,4 +9,6 @@ public record NatsPubOpts /// Default value is false, and calls to PublishAsync will complete after the publish command has been written to the Command Channel /// public bool? WaitUntilSent { get; init; } + + public bool? SerializeEarly { get; init; } } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 64b7e2df6..88f0ea730 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -165,6 +165,7 @@ internal override IEnumerable GetReconnectCommands(int sid) headers: default, value: request, serializer: NatsJsonSerializer.Default, + serializeEarly: true, cancellationToken: default); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index c04049b15..f5f29881a 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -151,6 +151,7 @@ internal override IEnumerable GetReconnectCommands(int sid) headers: default, value: request, serializer: NatsJsonSerializer.Default, + serializeEarly: true, cancellationToken: default); } From 0ac7b8f14b073eea96ed69580acf9588d5475875 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 3 Oct 2023 09:40:44 +0100 Subject: [PATCH 3/3] Serialization test fix --- tests/NATS.Client.Core.Tests/SerializerTest.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index 4eeda6b12..afeb3a75e 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; namespace NATS.Client.Core.Tests; @@ -19,7 +19,7 @@ await Assert.ThrowsAsync(async () => await nats.PublishAsync( "foo", 0, - opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = false, }); + opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = false, SerializeEarly = true }); }); await Assert.ThrowsAsync(async () => @@ -27,7 +27,7 @@ await Assert.ThrowsAsync(async () => await nats.PublishAsync( "foo", 0, - opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = true, }); + opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = true }); }); } }