From 4b231fc7ee000388f59858b197b6246888a0264a Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 04:32:53 +0100 Subject: [PATCH 01/15] Add flags to NatsMsg --- src/NATS.Client.Core/NatsMsg.cs | 162 +++++++++++++++--- src/NATS.Client.Core/NatsSubBase.cs | 6 +- .../NatsMsgTests.cs | 27 +++ 3 files changed, 170 insertions(+), 25 deletions(-) create mode 100644 tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 3a1f0d929..3e07e9678 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -4,6 +4,14 @@ namespace NATS.Client.Core; +[Flags] +public enum NatsMsgFlags : byte +{ + None = 0, + Empty = 1, + NoResponders = 2, +} + /// /// This interface provides an optional contract when passing /// messages to processing methods which is usually helpful in @@ -103,12 +111,6 @@ public interface INatsMsg /// /// NATS message structure as defined by the protocol. /// -/// The destination subject to publish to. -/// The reply subject that subscribers can use to send a response back to the publisher/requester. -/// Message size in bytes. -/// Pass additional information using name-value pairs. -/// Serializable data object. -/// NATS connection this message is associated to. /// Specifies the type of data that may be sent to the NATS Server. /// /// Connection property is used to provide reply functionality. @@ -119,20 +121,108 @@ public interface INatsMsg /// /// /// -public readonly record struct NatsMsg( - string Subject, - string? ReplyTo, - int Size, - NatsHeaders? Headers, - T? Data, - INatsConnection? Connection) : INatsMsg +public readonly record struct NatsMsg : INatsMsg { + private readonly uint _flagsAndSize; + + /// + /// NATS message structure as defined by the protocol. + /// + /// The destination subject to publish to. + /// The reply subject that subscribers can use to send a response back to the publisher/requester. + /// Message size in bytes. + /// Pass additional information using name-value pairs. + /// Serializable data object. + /// NATS connection this message is associated to. + /// Message flags to indicate no responders and empty payloads. + /// Specifies the type of data that may be sent to the NATS Server. + /// + /// Connection property is used to provide reply functionality. + /// + /// Message size is calculated using the same method NATS server uses: + /// + /// int size = subject.Length + replyTo.Length + headers.Length + payload.Length; + /// + /// + /// + public NatsMsg( + string subject, + string? replyTo, + int size, + NatsHeaders? headers, + T? data, + INatsConnection? connection, + NatsMsgFlags flags = default) + { + Subject = subject; + ReplyTo = replyTo; + Size = size; + Flags = flags; + Headers = headers; + Data = data; + Connection = connection; + } + /// public NatsException? Error => Headers?.Error; + /// The destination subject to publish to. + public string Subject { get; init; } + + /// The reply subject that subscribers can use to send a response back to the publisher/requester. + public string? ReplyTo { get; init; } + + /// Message size in bytes. + public int Size + { + // Extract the lower 30 bits + get => (int)(_flagsAndSize & 0x3FFFFFFF); + + // Clear the lower 30 bits and set the new number + init + { + // Mask the input value to fit within 30 bits (clear upper bits) + var numberPart = (uint)(value & 0x3FFFFFFF); + + // Clear the lower 30 bits and set the new number value + // Preserve the flags, update the number + _flagsAndSize = (_flagsAndSize & 0xC0000000) | numberPart; + } + } + + public NatsMsgFlags Flags + { + // Extract the two leftmost bits (31st and 30th bit) + // Mask with 0b11 to get two bits + get => (NatsMsgFlags)((_flagsAndSize >> 30) & 0b11); + + init + { + // Clear the current flag bits (set to 0) and then set the new flag value + var flagsPart = (uint)value << 30; + _flagsAndSize = (_flagsAndSize & 0x3FFFFFFF) | flagsPart; + } + } + + /// Pass additional information using name-value pairs. + public NatsHeaders? Headers { get; init; } + + /// Serializable data object. + public T? Data { get; init; } + + /// NATS connection this message is associated to. + public INatsConnection? Connection { get; init; } + + public bool IsEmpty => (Flags & NatsMsgFlags.Empty) == NatsMsgFlags.Empty; + + public bool HasNoResponders => (Flags & NatsMsgFlags.NoResponders) == NatsMsgFlags.NoResponders; + /// public void EnsureSuccess() { + if (HasNoResponders) + throw new NatsNoRespondersException(); + if (Error != null) throw Error; } @@ -197,6 +287,17 @@ public ValueTask ReplyAsync(NatsMsg msg, INatsSerialize? return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken); } + public void Deconstruct(out string subject, out string? replyTo, out int size, out NatsHeaders? headers, out T? data, out INatsConnection? connection, out NatsMsgFlags flags) + { + subject = Subject; + replyTo = ReplyTo; + size = Size; + headers = Headers; + data = Data; + connection = Connection; + flags = Flags; + } + internal static NatsMsg Build( string subject, string? replyTo, @@ -207,22 +308,35 @@ internal static NatsMsg Build( INatsDeserialize serializer) { NatsHeaders? headers = null; + var flags = NatsMsgFlags.None; - if (headersBuffer != null) + if (payloadBuffer.Length == 0) { - headers = new NatsHeaders(); + flags |= NatsMsgFlags.Empty; + } - try + if (headersBuffer != null) + { + if (NatsSubBase.IsHeader503(headersBuffer)) { - // Parsing can also throw an exception. - if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) - { - throw new NatsException("Error parsing headers"); - } + flags |= NatsMsgFlags.NoResponders; } - catch (Exception e) + else { - headers.Error ??= new NatsHeaderParseException(headersBuffer.Value.ToArray(), e); + headers = new NatsHeaders(); + + try + { + // Parsing can also throw an exception. + if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + { + throw new NatsException("Error parsing headers"); + } + } + catch (Exception e) + { + headers.Error ??= new NatsHeaderParseException(headersBuffer.Value.ToArray(), e); + } } } @@ -277,7 +391,7 @@ internal static NatsMsg Build( } } - return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); + return new NatsMsg(subject, replyTo, (int)size, headers, data, connection, flags); } [MemberNotNull(nameof(Connection))] diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index c6c2d699b..2737cb6b7 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -271,7 +271,7 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea { switch (Opts) { - case { ThrowIfNoResponders: true } when headersBuffer is { Length: >= 12 } && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence): + case { ThrowIfNoResponders: true } when IsHeader503(headersBuffer): SetException(new NatsNoRespondersException()); return; case { StopOnEmptyMsg: true }: @@ -311,6 +311,10 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea } } + internal static bool IsHeader503(ReadOnlySequence? headersBuffer) => + headersBuffer is { Length: >= 12 } + && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence); + internal void ClearException() => Interlocked.Exchange(ref _exception, null); /// diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs new file mode 100644 index 000000000..9b3d5dd3c --- /dev/null +++ b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs @@ -0,0 +1,27 @@ +using System.Runtime.CompilerServices; + +namespace NATS.Client.CoreUnit.Tests; + +public class NatsMsgTests +{ + [Theory] + [InlineData(42, NatsMsgFlags.None, false, false)] + [InlineData(42, NatsMsgFlags.Empty, true, false)] + [InlineData(42, NatsMsgFlags.NoResponders, false, true)] + [InlineData(42, NatsMsgFlags.Empty | NatsMsgFlags.NoResponders, true, true)] + [InlineData(1024 * 1024 * 128, NatsMsgFlags.Empty, true, false)] + public void Size_and_flags(int size, NatsMsgFlags flags, bool isEmpty, bool hasNoResponders) + { + var msg = new NatsMsg { Size = size, Flags = flags }; + Assert.Equal(size, msg.Size); + Assert.Equal(flags, msg.Flags); + Assert.Equal(isEmpty, msg.IsEmpty); + Assert.Equal(hasNoResponders, msg.HasNoResponders); + } + + [Fact] + public void Check_struct_size() + { + Assert.Equal(48, Unsafe.SizeOf>()); + } +} From d8c80f9640631c127f98b21ce2a39227e19a5e25 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 04:48:47 +0100 Subject: [PATCH 02/15] Code comments for msg size --- src/NATS.Client.Core/NatsMsg.cs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 3e07e9678..4014ed091 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -123,6 +123,20 @@ public interface INatsMsg /// public readonly record struct NatsMsg : INatsMsg { + /* + 2 30 + +--+------------------------------+ + |EN| Message Size | + +--+------------------------------+ + E: Empty flag + N: No responders flag + + # Size is 30 bits: + Max Size: 1,073,741,823 (0x3FFFFFFF / 00111111111111111111111111111111) + Uint.Max: 4,294,967,295 + Int.Max: 2,147,483,647 + 8mb: 8,388,608 + */ private readonly uint _flagsAndSize; /// From be0334fbc6cef024874d032d26b648404f5f3eff Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 04:51:07 +0100 Subject: [PATCH 03/15] Format --- tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs index 9b3d5dd3c..e482d1c03 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs @@ -1,4 +1,4 @@ -using System.Runtime.CompilerServices; +using System.Runtime.CompilerServices; namespace NATS.Client.CoreUnit.Tests; From dfa7d0314532f7eacc1b0ec90e5ac33a55c03569 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 05:01:53 +0100 Subject: [PATCH 04/15] Fix warnings --- src/NATS.Client.Core/NatsMsg.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 4014ed091..c71cabb05 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -149,7 +149,6 @@ 2 30 /// Serializable data object. /// NATS connection this message is associated to. /// Message flags to indicate no responders and empty payloads. - /// Specifies the type of data that may be sent to the NATS Server. /// /// Connection property is used to provide reply functionality. /// From d68ec5baa420771aac22d51160e93ae4f500f75b Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 05:27:25 +0100 Subject: [PATCH 05/15] Keep 503 header --- src/NATS.Client.Core/NatsMsg.cs | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index c71cabb05..cd2bc878b 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -326,31 +326,27 @@ internal static NatsMsg Build( if (payloadBuffer.Length == 0) { flags |= NatsMsgFlags.Empty; - } - - if (headersBuffer != null) - { if (NatsSubBase.IsHeader503(headersBuffer)) { flags |= NatsMsgFlags.NoResponders; } - else - { - headers = new NatsHeaders(); + } - try - { - // Parsing can also throw an exception. - if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) - { - throw new NatsException("Error parsing headers"); - } - } - catch (Exception e) + if (headersBuffer != null) + { + headers = new NatsHeaders(); + try + { + // Parsing can also throw an exception. + if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) { - headers.Error ??= new NatsHeaderParseException(headersBuffer.Value.ToArray(), e); + throw new NatsException("Error parsing headers"); } } + catch (Exception e) + { + headers.Error ??= new NatsHeaderParseException(headersBuffer.Value.ToArray(), e); + } } headers?.SetReadOnly(); From 93c3ab7c3e11431f06952c2ef7bfa40233a81ac4 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 05:29:57 +0100 Subject: [PATCH 06/15] Format --- src/NATS.Client.Core/NatsMsg.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index cd2bc878b..13d884c68 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -335,6 +335,7 @@ internal static NatsMsg Build( if (headersBuffer != null) { headers = new NatsHeaders(); + try { // Parsing can also throw an exception. From a4517e03a469594b2a14c0868bf6570346c25cce Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 07:18:10 +0100 Subject: [PATCH 07/15] JetStream TryJSRequestAsync --- .../NatsJSErrorAwareJsonSerializer.cs | 41 ---------- .../NatsJSForcedJsonDocumentSerializer.cs | 21 ++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 74 +++++++++++++++---- src/NATS.Client.JetStream/NatsJSException.cs | 6 ++ .../NatsJSJsonSerializer.cs | 37 +++++----- .../NatsMsgTests.cs | 51 +++++++++++++ 6 files changed, 158 insertions(+), 72 deletions(-) delete mode 100644 src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs create mode 100644 src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs diff --git a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs deleted file mode 100644 index be22a165b..000000000 --- a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Buffers; -using System.Text.Json; -using NATS.Client.Core; -using NATS.Client.JetStream.Models; - -namespace NATS.Client.JetStream.Internal; - -internal sealed class NatsJSErrorAwareJsonSerializer : INatsDeserialize -{ - public static readonly NatsJSErrorAwareJsonSerializer Default = new(); - - public T? Deserialize(in ReadOnlySequence buffer) - { - if (buffer.Length == 0) - { - return default; - } - - // We need to determine what type we're deserializing into - // .NET 6 new APIs to the rescue: we can read the buffer once - // by deserializing into a document, inspect and using the new - // API deserialize to the final type from the document. - var jsonDocument = JsonDocument.Parse(buffer); - if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) - { - var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); - throw new NatsJSApiErrorException(error); - } - - return NatsJSJsonSerializer.Default.Deserialize(buffer); - } -} - -internal class NatsJSApiErrorException : NatsException -{ - public NatsJSApiErrorException(ApiError error) - : base("JetStream API error") - => Error = error; - - public ApiError Error { get; } -} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs new file mode 100644 index 000000000..5eee1ab2b --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs @@ -0,0 +1,21 @@ +using System.Buffers; +using System.Text.Json; +using NATS.Client.Core; + +namespace NATS.Client.JetStream.Internal; + +internal sealed class NatsJSForcedJsonDocumentSerializer : INatsDeserialize +{ + public static readonly NatsJSForcedJsonDocumentSerializer Default = new(); + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (buffer.Length == 0) + { + return default; + } + + // Force return JsonDocument instead of T + return (T)(object)JsonDocument.Parse(buffer); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 8076dccec..a8e49e99c 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Text.Json; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream.Internal; @@ -285,6 +286,25 @@ internal async ValueTask> JSRequestAsync(subject, request, cancellationToken).ConfigureAwait(false); + if (exception != null) + { + throw exception; + } + + if (response != null) + return response.Value; + + throw new Exception("State error: No response received"); + } + + internal async ValueTask<(NatsJSResponse?, Exception?)> TryJSRequestAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class { if (request != null) { @@ -292,42 +312,62 @@ internal async ValueTask> JSRequestAsync( + await using var sub = await Connection.CreateRequestSubAsync( subject: subject, data: request, headers: default, replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, requestSerializer: NatsJSJsonSerializer.Default, - replySerializer: NatsJSErrorAwareJsonSerializer.Default, + replySerializer: NatsJSForcedJsonDocumentSerializer.Default, cancellationToken: cancellationToken) .ConfigureAwait(false); await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { - if (msg.Error is { } error) + // We need to determine what type we're deserializing into + // .NET 6 new APIs to the rescue: we can read the buffer once + // by deserializing into a document, inspect and using the new + // API deserialize to the final type from the document. + if (msg.Data == null) { - if (error.InnerException is NatsJSApiErrorException jsError) - { - return new NatsJSResponse(default, jsError.Error); - } + return (default, new NatsJSException("No response data received")); + } + + var jsonDocument = msg.Data; - throw error; + if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) + { + var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); + return (new NatsJSResponse(default, error), default); } - if (msg.Data == null) + var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse)); + if (jsonTypeInfo == null) { - throw new NatsJSException("No response data received"); + return (default, new NatsJSException($"Unknown response type {typeof(TResponse)}")); } - return new NatsJSResponse(msg.Data, default); + var response = (TResponse?)errorElement.Deserialize(jsonTypeInfo); + + if (msg.Error is { } messageError) + { + return (default, messageError); + } + + return (new NatsJSResponse(response, default), default); } if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb) { - throw sb.Exception; + return (default, sb.Exception); + } + + if (sub.EndReason != NatsSubEndReason.None) + { + return (default, new NatsJSApiNoResponseException(sub.EndReason)); } - throw new NatsJSApiNoResponseException(); + return (default, new NatsJSApiNoResponseException()); } private static void ConvertDomain(StreamSource streamSource) @@ -352,4 +392,12 @@ private static void ThrowInvalidStreamNameException(string? paramName) => [DoesNotReturn] private static void ThrowEmptyException(string? paramName) => throw new ArgumentException("The value cannot be an empty string.", paramName); + // + // [DoesNotReturn] + // private static void ThrowNoResponseDataReceived() => + // throw new NatsJSException("No response data received"); + // + // [DoesNotReturn] + // private static void ThrowUnknownResponseType() => + // throw new NatsJSException($"Unknown response type {typeof(TResponse)}"); } diff --git a/src/NATS.Client.JetStream/NatsJSException.cs b/src/NATS.Client.JetStream/NatsJSException.cs index 4f621a86e..14795338c 100644 --- a/src/NATS.Client.JetStream/NatsJSException.cs +++ b/src/NATS.Client.JetStream/NatsJSException.cs @@ -106,6 +106,12 @@ public NatsJSApiNoResponseException() : base("No API response received from the server") { } + + public NatsJSApiNoResponseException(NatsSubEndReason reason) + : base($"No API response received from the server: {reason}") => + Reason = reason; + + public NatsSubEndReason Reason { get; } } public class NatsJSTimeoutException : NatsJSException diff --git a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs index 375be743f..f6b84a0d7 100644 --- a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs +++ b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs @@ -7,24 +7,7 @@ namespace NATS.Client.JetStream; public static class NatsJSJsonSerializer { -#if NET6_0 - public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsJSJsonSerializerContext.Default); -#else - public static readonly INatsSerializer Default = new NatsJsonContextSerializer(new NatsJSJsonSerializerContext(new JsonSerializerOptions - { - Converters = - { - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - }, - })); -#endif + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsJSJsonSerializerContext.DefaultContext); } [JsonSerializable(typeof(AccountInfoResponse))] @@ -104,6 +87,24 @@ public static class NatsJSJsonSerializer [JsonSerializable(typeof(Tier))] internal partial class NatsJSJsonSerializerContext : JsonSerializerContext { +#if NET6_0 + internal static readonly NatsJSJsonSerializerContext DefaultContext = NatsJSJsonSerializerContext.Default; +#else + internal static readonly NatsJSJsonSerializerContext DefaultContext = new NatsJSJsonSerializerContext(new JsonSerializerOptions + { + Converters = + { + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + }, + }); +#endif } #if NET6_0 diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs index e482d1c03..d5dab6241 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs @@ -22,6 +22,57 @@ public void Size_and_flags(int size, NatsMsgFlags flags, bool isEmpty, bool hasN [Fact] public void Check_struct_size() { + // Using https://github.com/SergeyTeplyakov/ObjectLayoutInspector + /* Size: 48 bytes. Paddings: 4 bytes (%8 of empty space) + |==============================================================| + | 0-7: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 8-15: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 16-23: NatsHeaders k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 24-31: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 32-39: INatsConnection k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 40-43: UInt32 _flagsAndSize (4 bytes) | + |--------------------------------------------------------------| + | 44-47: padding (4 bytes) | + |==============================================================| */ Assert.Equal(48, Unsafe.SizeOf>()); + + /* Size: 40 bytes. Paddings: 0 bytes (%0 of empty space) + |==============================================================| + | 0-7: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 8-15: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 16-23: NatsHeaders k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 24-31: INatsConnection k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 32-35: UInt32 _flagsAndSize (4 bytes) | + |--------------------------------------------------------------| + | 36-39: Int32 k__BackingField (4 bytes) | + |==============================================================| */ + Assert.Equal(40, Unsafe.SizeOf>()); + + /* Size: 40 bytes. Paddings: 3 bytes (%7 of empty space) + |==============================================================| + | 0-7: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 8-15: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 16-23: NatsHeaders k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 24-31: INatsConnection k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 32-35: UInt32 _flagsAndSize (4 bytes) | + |--------------------------------------------------------------| + | 36: Byte k__BackingField (1 byte) | + |--------------------------------------------------------------| + | 37-39: padding (3 bytes) | + |==============================================================| */ + Assert.Equal(40, Unsafe.SizeOf>()); } } From 2f8e90e8e7cb24a950c448d2cda95ea04abe1757 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 07:24:15 +0100 Subject: [PATCH 08/15] Format --- src/NATS.Client.JetStream/NatsJSContext.cs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index a8e49e99c..3994a4a6e 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -392,12 +392,4 @@ private static void ThrowInvalidStreamNameException(string? paramName) => [DoesNotReturn] private static void ThrowEmptyException(string? paramName) => throw new ArgumentException("The value cannot be an empty string.", paramName); - // - // [DoesNotReturn] - // private static void ThrowNoResponseDataReceived() => - // throw new NatsJSException("No response data received"); - // - // [DoesNotReturn] - // private static void ThrowUnknownResponseType() => - // throw new NatsJSException($"Unknown response type {typeof(TResponse)}"); } From a64fd08eb6003b19a119f08fab007d2675675cc3 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 16:57:37 +0100 Subject: [PATCH 09/15] Fix tests --- src/NATS.Client.JetStream/NatsJSContext.cs | 2 +- src/NATS.Client.JetStream/NatsJSJsonSerializer.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 3994a4a6e..3a1d6ac44 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -347,7 +347,7 @@ internal async ValueTask> JSRequestAsync internal partial class NatsJSJsonSerializerContext : JsonSerializerContext { #if NET6_0 - internal static readonly NatsJSJsonSerializerContext DefaultContext = NatsJSJsonSerializerContext.Default; + internal static readonly NatsJSJsonSerializerContext DefaultContext = new NatsJSJsonSerializerContext(new JsonSerializerOptions()); #else internal static readonly NatsJSJsonSerializerContext DefaultContext = new NatsJSJsonSerializerContext(new JsonSerializerOptions { From 71ed9d9f3d3c94f2bd78b9a4cca34edba2b9f8eb Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 13 Oct 2024 02:32:12 +0100 Subject: [PATCH 10/15] Refactor JetStream method results to use NatsResult --- src/NATS.Client.Core/NatsResult.cs | 38 ++++++++++++++++++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 29 ++++++++--------- 2 files changed, 51 insertions(+), 16 deletions(-) create mode 100644 src/NATS.Client.Core/NatsResult.cs diff --git a/src/NATS.Client.Core/NatsResult.cs b/src/NATS.Client.Core/NatsResult.cs new file mode 100644 index 000000000..3b246aa6b --- /dev/null +++ b/src/NATS.Client.Core/NatsResult.cs @@ -0,0 +1,38 @@ +using System.Runtime.CompilerServices; + +namespace NATS.Client.Core; + +public readonly struct NatsResult +{ + private readonly T? _value; + private readonly Exception? _error; + + public NatsResult(T value) + { + _value = value; + _error = null; + } + + public NatsResult(Exception error) + { + _value = default; + _error = error; + } + + public T Value => _value ?? ThrowValueIsNotSetException(); + + public Exception Error => _error ?? ThrowErrorIsNotSetException(); + + public bool Success => _error == null; + + public static implicit operator NatsResult(T value) => new(value); + + public static implicit operator NatsResult(Exception error) => new(error); + + private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set"); + + private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set"); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message); +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 3a1d6ac44..c90f225e2 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -287,19 +287,16 @@ internal async ValueTask> JSRequestAsync(subject, request, cancellationToken).ConfigureAwait(false); - if (exception != null) + var result = await TryJSRequestAsync(subject, request, cancellationToken).ConfigureAwait(false); + if (!result.Success) { - throw exception; + throw result.Error; } - if (response != null) - return response.Value; - - throw new Exception("State error: No response received"); + return result.Value; } - internal async ValueTask<(NatsJSResponse?, Exception?)> TryJSRequestAsync( + internal async ValueTask>> TryJSRequestAsync( string subject, TRequest? request, CancellationToken cancellationToken = default) @@ -330,7 +327,7 @@ internal async ValueTask> JSRequestAsync> JSRequestAsync(default, error), default); + return new NatsJSResponse(default, error); } var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse)); if (jsonTypeInfo == null) { - return (default, new NatsJSException($"Unknown response type {typeof(TResponse)}")); + return new NatsJSException($"Unknown response type {typeof(TResponse)}"); } var response = (TResponse?)jsonDocument.RootElement.Deserialize(jsonTypeInfo); if (msg.Error is { } messageError) { - return (default, messageError); + return messageError; } - return (new NatsJSResponse(response, default), default); + return new NatsJSResponse(response, default); } if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb) { - return (default, sb.Exception); + return sb.Exception; } if (sub.EndReason != NatsSubEndReason.None) { - return (default, new NatsJSApiNoResponseException(sub.EndReason)); + return new NatsJSApiNoResponseException(sub.EndReason); } - return (default, new NatsJSApiNoResponseException()); + return new NatsJSApiNoResponseException(); } private static void ConvertDomain(StreamSource streamSource) From 7e5b6997a23f6c8ea0eeec0ec636151a1b165942 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 13 Oct 2024 03:00:21 +0100 Subject: [PATCH 11/15] Refactor NatsMsg size and flags handling in tests and class --- src/NATS.Client.Core/NatsMsg.cs | 3 +- .../NatsMsgTests.cs | 31 ++++++++++++++++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 13d884c68..d837e312f 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -169,8 +169,7 @@ public NatsMsg( { Subject = subject; ReplyTo = replyTo; - Size = size; - Flags = flags; + _flagsAndSize = ((uint)flags << 30) | (uint)(size & 0x3FFFFFFF); Headers = headers; Data = data; Connection = connection; diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs index d5dab6241..3500eefe9 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs @@ -10,13 +10,34 @@ public class NatsMsgTests [InlineData(42, NatsMsgFlags.NoResponders, false, true)] [InlineData(42, NatsMsgFlags.Empty | NatsMsgFlags.NoResponders, true, true)] [InlineData(1024 * 1024 * 128, NatsMsgFlags.Empty, true, false)] + [InlineData(1024 * 1024 * 1000, NatsMsgFlags.Empty, true, false)] + [InlineData(0b00111111111111111111111111111111, NatsMsgFlags.Empty, true, false)] public void Size_and_flags(int size, NatsMsgFlags flags, bool isEmpty, bool hasNoResponders) { - var msg = new NatsMsg { Size = size, Flags = flags }; - Assert.Equal(size, msg.Size); - Assert.Equal(flags, msg.Flags); - Assert.Equal(isEmpty, msg.IsEmpty); - Assert.Equal(hasNoResponders, msg.HasNoResponders); + AssertSizeAndFlags(new NatsMsg + { + Size = size, + Flags = flags, + }); + + AssertSizeAndFlags(new NatsMsg( + subject: "x", + replyTo: null, + size: size, + headers: null, + data: null, + connection: null, + flags: flags)); + + return; + + void AssertSizeAndFlags(NatsMsg msg) + { + Assert.Equal(size, msg.Size); + Assert.Equal(flags, msg.Flags); + Assert.Equal(isEmpty, msg.IsEmpty); + Assert.Equal(hasNoResponders, msg.HasNoResponders); + } } [Fact] From 643e3d532760c7bd7763d0762e74fe5283041798 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 13 Oct 2024 03:12:17 +0100 Subject: [PATCH 12/15] Refactor JSON document deserialization --- .../NatsJSForcedJsonDocumentSerializer.cs | 21 ------------------- .../Internal/NatsJSJsonDocumentSerializer.cs | 12 +++++++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 2 +- 3 files changed, 13 insertions(+), 22 deletions(-) delete mode 100644 src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs create mode 100644 src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs diff --git a/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs deleted file mode 100644 index 5eee1ab2b..000000000 --- a/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System.Buffers; -using System.Text.Json; -using NATS.Client.Core; - -namespace NATS.Client.JetStream.Internal; - -internal sealed class NatsJSForcedJsonDocumentSerializer : INatsDeserialize -{ - public static readonly NatsJSForcedJsonDocumentSerializer Default = new(); - - public T? Deserialize(in ReadOnlySequence buffer) - { - if (buffer.Length == 0) - { - return default; - } - - // Force return JsonDocument instead of T - return (T)(object)JsonDocument.Parse(buffer); - } -} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs new file mode 100644 index 000000000..5f930d114 --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs @@ -0,0 +1,12 @@ +using System.Buffers; +using System.Text.Json; +using NATS.Client.Core; + +namespace NATS.Client.JetStream.Internal; + +internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize +{ + public static readonly NatsJSJsonDocumentSerializer Default = new(); + + public JsonDocument? Deserialize(in ReadOnlySequence buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer); +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index c90f225e2..ec4f8c623 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -315,7 +315,7 @@ internal async ValueTask>> TryJSRequestAsyn headers: default, replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, requestSerializer: NatsJSJsonSerializer.Default, - replySerializer: NatsJSForcedJsonDocumentSerializer.Default, + replySerializer: NatsJSJsonDocumentSerializer.Default, cancellationToken: cancellationToken) .ConfigureAwait(false); From 2f9e8152b72b2544203188ca5310146c9bd9db93 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 13 Oct 2024 15:30:26 +0100 Subject: [PATCH 13/15] Refactor NatsJSContext error handling logic. Reordered error checks in `NatsJSContext.cs` to prioritize handling `HasNoResponders` and `null` data conditions early. Modified flag checks in `NatsMsg.cs` for `IsEmpty` and `HasNoResponders` by directly using bitwise operations instead of enum comparisons. --- src/NATS.Client.Core/NatsMsg.cs | 5 +++-- src/NATS.Client.JetStream/NatsJSContext.cs | 13 +++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index d837e312f..8c7541dde 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; +using System.Xml.Linq; using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -225,9 +226,9 @@ public NatsMsgFlags Flags /// NATS connection this message is associated to. public INatsConnection? Connection { get; init; } - public bool IsEmpty => (Flags & NatsMsgFlags.Empty) == NatsMsgFlags.Empty; + public bool IsEmpty => (_flagsAndSize & 0x40000000) != 0; - public bool HasNoResponders => (Flags & NatsMsgFlags.NoResponders) == NatsMsgFlags.NoResponders; + public bool HasNoResponders => (_flagsAndSize & 0x80000000) != 0; /// public void EnsureSuccess() diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index ec4f8c623..359c86bb9 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -321,15 +321,20 @@ internal async ValueTask>> TryJSRequestAsyn await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { - // We need to determine what type we're deserializing into - // .NET 6 new APIs to the rescue: we can read the buffer once - // by deserializing into a document, inspect and using the new - // API deserialize to the final type from the document. + if (msg.HasNoResponders) + { + return new NatsNoRespondersException(); + } + if (msg.Data == null) { return new NatsJSException("No response data received"); } + // We need to determine what type we're deserializing into + // .NET 6 new APIs to the rescue: we can read the buffer once + // by deserializing into a document, inspect and using the new + // API deserialize to the final type from the document. var jsonDocument = msg.Data; if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) From 6a47672056b3dc985d20d31643c7eb90a181b6ef Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 21 Nov 2024 06:57:22 +0000 Subject: [PATCH 14/15] Fix memory leak by disposing JsonDocument properly --- src/NATS.Client.JetStream/NatsJSContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 359c86bb9..8876a200c 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -335,7 +335,7 @@ internal async ValueTask>> TryJSRequestAsyn // .NET 6 new APIs to the rescue: we can read the buffer once // by deserializing into a document, inspect and using the new // API deserialize to the final type from the document. - var jsonDocument = msg.Data; + using var jsonDocument = msg.Data; if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) { From c3357510dac161cbea61e3241253a1c1e0446bde Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 21 Nov 2024 07:22:56 +0000 Subject: [PATCH 15/15] Skip slow tests for unique NUID generation --- tests/NATS.Client.CoreUnit.Tests/NuidTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/NATS.Client.CoreUnit.Tests/NuidTests.cs b/tests/NATS.Client.CoreUnit.Tests/NuidTests.cs index eb4effb70..cbcc17255 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NuidTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NuidTests.cs @@ -242,7 +242,7 @@ public void AllNuidsAreUnique() } } - [Fact] + [Fact(Skip = "slow")] public void AllNuidsAreUnique_SmallSequentials() { var writeFailed = false; @@ -285,7 +285,7 @@ public void AllNuidsAreUnique_SmallSequentials() Assert.Equal(string.Empty, duplicateFailure); } - [Fact] + [Fact(Skip = "slow")] public void AllNuidsAreUnique_ZeroSequential() { var writeFailed = false;