From a4517e03a469594b2a14c0868bf6570346c25cce Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 12 Oct 2024 07:18:10 +0100 Subject: [PATCH] JetStream TryJSRequestAsync --- .../NatsJSErrorAwareJsonSerializer.cs | 41 ---------- .../NatsJSForcedJsonDocumentSerializer.cs | 21 ++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 74 +++++++++++++++---- src/NATS.Client.JetStream/NatsJSException.cs | 6 ++ .../NatsJSJsonSerializer.cs | 37 +++++----- .../NatsMsgTests.cs | 51 +++++++++++++ 6 files changed, 158 insertions(+), 72 deletions(-) delete mode 100644 src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs create mode 100644 src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs diff --git a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs deleted file mode 100644 index be22a165b..000000000 --- a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Buffers; -using System.Text.Json; -using NATS.Client.Core; -using NATS.Client.JetStream.Models; - -namespace NATS.Client.JetStream.Internal; - -internal sealed class NatsJSErrorAwareJsonSerializer : INatsDeserialize -{ - public static readonly NatsJSErrorAwareJsonSerializer Default = new(); - - public T? Deserialize(in ReadOnlySequence buffer) - { - if (buffer.Length == 0) - { - return default; - } - - // We need to determine what type we're deserializing into - // .NET 6 new APIs to the rescue: we can read the buffer once - // by deserializing into a document, inspect and using the new - // API deserialize to the final type from the document. - var jsonDocument = JsonDocument.Parse(buffer); - if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) - { - var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); - throw new NatsJSApiErrorException(error); - } - - return NatsJSJsonSerializer.Default.Deserialize(buffer); - } -} - -internal class NatsJSApiErrorException : NatsException -{ - public NatsJSApiErrorException(ApiError error) - : base("JetStream API error") - => Error = error; - - public ApiError Error { get; } -} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs new file mode 100644 index 000000000..5eee1ab2b --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSForcedJsonDocumentSerializer.cs @@ -0,0 +1,21 @@ +using System.Buffers; +using System.Text.Json; +using NATS.Client.Core; + +namespace NATS.Client.JetStream.Internal; + +internal sealed class NatsJSForcedJsonDocumentSerializer : INatsDeserialize +{ + public static readonly NatsJSForcedJsonDocumentSerializer Default = new(); + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (buffer.Length == 0) + { + return default; + } + + // Force return JsonDocument instead of T + return (T)(object)JsonDocument.Parse(buffer); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 8076dccec..a8e49e99c 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Text.Json; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream.Internal; @@ -285,6 +286,25 @@ internal async ValueTask> JSRequestAsync(subject, request, cancellationToken).ConfigureAwait(false); + if (exception != null) + { + throw exception; + } + + if (response != null) + return response.Value; + + throw new Exception("State error: No response received"); + } + + internal async ValueTask<(NatsJSResponse?, Exception?)> TryJSRequestAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class { if (request != null) { @@ -292,42 +312,62 @@ internal async ValueTask> JSRequestAsync( + await using var sub = await Connection.CreateRequestSubAsync( subject: subject, data: request, headers: default, replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, requestSerializer: NatsJSJsonSerializer.Default, - replySerializer: NatsJSErrorAwareJsonSerializer.Default, + replySerializer: NatsJSForcedJsonDocumentSerializer.Default, cancellationToken: cancellationToken) .ConfigureAwait(false); await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { - if (msg.Error is { } error) + // We need to determine what type we're deserializing into + // .NET 6 new APIs to the rescue: we can read the buffer once + // by deserializing into a document, inspect and using the new + // API deserialize to the final type from the document. + if (msg.Data == null) { - if (error.InnerException is NatsJSApiErrorException jsError) - { - return new NatsJSResponse(default, jsError.Error); - } + return (default, new NatsJSException("No response data received")); + } + + var jsonDocument = msg.Data; - throw error; + if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) + { + var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); + return (new NatsJSResponse(default, error), default); } - if (msg.Data == null) + var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse)); + if (jsonTypeInfo == null) { - throw new NatsJSException("No response data received"); + return (default, new NatsJSException($"Unknown response type {typeof(TResponse)}")); } - return new NatsJSResponse(msg.Data, default); + var response = (TResponse?)errorElement.Deserialize(jsonTypeInfo); + + if (msg.Error is { } messageError) + { + return (default, messageError); + } + + return (new NatsJSResponse(response, default), default); } if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb) { - throw sb.Exception; + return (default, sb.Exception); + } + + if (sub.EndReason != NatsSubEndReason.None) + { + return (default, new NatsJSApiNoResponseException(sub.EndReason)); } - throw new NatsJSApiNoResponseException(); + return (default, new NatsJSApiNoResponseException()); } private static void ConvertDomain(StreamSource streamSource) @@ -352,4 +392,12 @@ private static void ThrowInvalidStreamNameException(string? paramName) => [DoesNotReturn] private static void ThrowEmptyException(string? paramName) => throw new ArgumentException("The value cannot be an empty string.", paramName); + // + // [DoesNotReturn] + // private static void ThrowNoResponseDataReceived() => + // throw new NatsJSException("No response data received"); + // + // [DoesNotReturn] + // private static void ThrowUnknownResponseType() => + // throw new NatsJSException($"Unknown response type {typeof(TResponse)}"); } diff --git a/src/NATS.Client.JetStream/NatsJSException.cs b/src/NATS.Client.JetStream/NatsJSException.cs index 4f621a86e..14795338c 100644 --- a/src/NATS.Client.JetStream/NatsJSException.cs +++ b/src/NATS.Client.JetStream/NatsJSException.cs @@ -106,6 +106,12 @@ public NatsJSApiNoResponseException() : base("No API response received from the server") { } + + public NatsJSApiNoResponseException(NatsSubEndReason reason) + : base($"No API response received from the server: {reason}") => + Reason = reason; + + public NatsSubEndReason Reason { get; } } public class NatsJSTimeoutException : NatsJSException diff --git a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs index 375be743f..f6b84a0d7 100644 --- a/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs +++ b/src/NATS.Client.JetStream/NatsJSJsonSerializer.cs @@ -7,24 +7,7 @@ namespace NATS.Client.JetStream; public static class NatsJSJsonSerializer { -#if NET6_0 - public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsJSJsonSerializerContext.Default); -#else - public static readonly INatsSerializer Default = new NatsJsonContextSerializer(new NatsJSJsonSerializerContext(new JsonSerializerOptions - { - Converters = - { - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), - }, - })); -#endif + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsJSJsonSerializerContext.DefaultContext); } [JsonSerializable(typeof(AccountInfoResponse))] @@ -104,6 +87,24 @@ public static class NatsJSJsonSerializer [JsonSerializable(typeof(Tier))] internal partial class NatsJSJsonSerializerContext : JsonSerializerContext { +#if NET6_0 + internal static readonly NatsJSJsonSerializerContext DefaultContext = NatsJSJsonSerializerContext.Default; +#else + internal static readonly NatsJSJsonSerializerContext DefaultContext = new NatsJSJsonSerializerContext(new JsonSerializerOptions + { + Converters = + { + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower), + }, + }); +#endif } #if NET6_0 diff --git a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs index e482d1c03..d5dab6241 100644 --- a/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs +++ b/tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs @@ -22,6 +22,57 @@ public void Size_and_flags(int size, NatsMsgFlags flags, bool isEmpty, bool hasN [Fact] public void Check_struct_size() { + // Using https://github.com/SergeyTeplyakov/ObjectLayoutInspector + /* Size: 48 bytes. Paddings: 4 bytes (%8 of empty space) + |==============================================================| + | 0-7: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 8-15: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 16-23: NatsHeaders k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 24-31: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 32-39: INatsConnection k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 40-43: UInt32 _flagsAndSize (4 bytes) | + |--------------------------------------------------------------| + | 44-47: padding (4 bytes) | + |==============================================================| */ Assert.Equal(48, Unsafe.SizeOf>()); + + /* Size: 40 bytes. Paddings: 0 bytes (%0 of empty space) + |==============================================================| + | 0-7: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 8-15: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 16-23: NatsHeaders k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 24-31: INatsConnection k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 32-35: UInt32 _flagsAndSize (4 bytes) | + |--------------------------------------------------------------| + | 36-39: Int32 k__BackingField (4 bytes) | + |==============================================================| */ + Assert.Equal(40, Unsafe.SizeOf>()); + + /* Size: 40 bytes. Paddings: 3 bytes (%7 of empty space) + |==============================================================| + | 0-7: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 8-15: String k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 16-23: NatsHeaders k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 24-31: INatsConnection k__BackingField (8 bytes) | + |--------------------------------------------------------------| + | 32-35: UInt32 _flagsAndSize (4 bytes) | + |--------------------------------------------------------------| + | 36: Byte k__BackingField (1 byte) | + |--------------------------------------------------------------| + | 37-39: padding (3 bytes) | + |==============================================================| */ + Assert.Equal(40, Unsafe.SizeOf>()); } }