Skip to content

Commit

Permalink
JetStream TryJSRequestAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 12, 2024
1 parent 93c3ab7 commit a4517e0
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 72 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System.Buffers;
using System.Text.Json;
using NATS.Client.Core;

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSForcedJsonDocumentSerializer<T> : INatsDeserialize<T>
{
public static readonly NatsJSForcedJsonDocumentSerializer<T> Default = new();

public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return default;
}

// Force return JsonDocument instead of T
return (T)(object)JsonDocument.Parse(buffer);
}
}
74 changes: 61 additions & 13 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream.Internal;
Expand Down Expand Up @@ -285,49 +286,88 @@ internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TRe
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var (response, exception) = await TryJSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken).ConfigureAwait(false);
if (exception != null)
{
throw exception;
}

if (response != null)
return response.Value;

throw new Exception("State error: No response received");
}

internal async ValueTask<(NatsJSResponse<TResponse>?, Exception?)> TryJSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
if (request != null)
{
// TODO: Can't validate using JSON serializer context at the moment.
// Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await Connection.CreateRequestSubAsync<TRequest, TResponse>(
await using var sub = await Connection.CreateRequestSubAsync<TRequest, JsonDocument>(
subject: subject,
data: request,
headers: default,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSErrorAwareJsonSerializer<TResponse>.Default,
replySerializer: NatsJSForcedJsonDocumentSerializer<JsonDocument>.Default,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
if (msg.Error is { } error)
// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
// API deserialize to the final type from the document.
if (msg.Data == null)
{
if (error.InnerException is NatsJSApiErrorException jsError)
{
return new NatsJSResponse<TResponse>(default, jsError.Error);
}
return (default, new NatsJSException("No response data received"));
}

var jsonDocument = msg.Data;

throw error;
if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return (new NatsJSResponse<TResponse>(default, error), default);
}

if (msg.Data == null)
var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse));
if (jsonTypeInfo == null)
{
throw new NatsJSException("No response data received");
return (default, new NatsJSException($"Unknown response type {typeof(TResponse)}"));
}

return new NatsJSResponse<TResponse>(msg.Data, default);
var response = (TResponse?)errorElement.Deserialize(jsonTypeInfo);

if (msg.Error is { } messageError)
{
return (default, messageError);
}

return (new NatsJSResponse<TResponse>(response, default), default);
}

if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
{
throw sb.Exception;
return (default, sb.Exception);
}

if (sub.EndReason != NatsSubEndReason.None)
{
return (default, new NatsJSApiNoResponseException(sub.EndReason));
}

throw new NatsJSApiNoResponseException();
return (default, new NatsJSApiNoResponseException());
}

private static void ConvertDomain(StreamSource streamSource)
Expand All @@ -352,4 +392,12 @@ private static void ThrowInvalidStreamNameException(string? paramName) =>
[DoesNotReturn]
private static void ThrowEmptyException(string? paramName) =>
throw new ArgumentException("The value cannot be an empty string.", paramName);
//

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Single-line comment should be preceded by blank line

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Comments should contain text

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Single-line comment should be preceded by blank line

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Comments should contain text

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Single-line comment should be preceded by blank line

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Comments should contain text

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Single-line comment should be preceded by blank line

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / check

Comments should contain text

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Check warning on line 395 in src/NATS.Client.JetStream/NatsJSContext.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

// [DoesNotReturn]
// private static void ThrowNoResponseDataReceived() =>
// throw new NatsJSException("No response data received");
//
// [DoesNotReturn]
// private static void ThrowUnknownResponseType<TResponse>() =>
// throw new NatsJSException($"Unknown response type {typeof(TResponse)}");
}
6 changes: 6 additions & 0 deletions src/NATS.Client.JetStream/NatsJSException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public NatsJSApiNoResponseException()
: base("No API response received from the server")
{
}

public NatsJSApiNoResponseException(NatsSubEndReason reason)
: base($"No API response received from the server: {reason}") =>
Reason = reason;

public NatsSubEndReason Reason { get; }
}

public class NatsJSTimeoutException : NatsJSException
Expand Down
37 changes: 19 additions & 18 deletions src/NATS.Client.JetStream/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,7 @@ namespace NATS.Client.JetStream;

public static class NatsJSJsonSerializer<T>
{
#if NET6_0
public static readonly INatsSerializer<T> Default = new NatsJsonContextSerializer<T>(NatsJSJsonSerializerContext.Default);
#else
public static readonly INatsSerializer<T> Default = new NatsJsonContextSerializer<T>(new NatsJSJsonSerializerContext(new JsonSerializerOptions
{
Converters =
{
new JsonStringEnumConverter<ConsumerConfigDeliverPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerConfigAckPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerConfigReplayPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigCompression>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigDiscard>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigRetention>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigStorage>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerCreateAction>(JsonNamingPolicy.SnakeCaseLower),
},
}));
#endif
public static readonly INatsSerializer<T> Default = new NatsJsonContextSerializer<T>(NatsJSJsonSerializerContext.DefaultContext);
}

[JsonSerializable(typeof(AccountInfoResponse))]
Expand Down Expand Up @@ -104,6 +87,24 @@ public static class NatsJSJsonSerializer<T>
[JsonSerializable(typeof(Tier))]
internal partial class NatsJSJsonSerializerContext : JsonSerializerContext
{
#if NET6_0
internal static readonly NatsJSJsonSerializerContext DefaultContext = NatsJSJsonSerializerContext.Default;

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

Possible null reference assignment.

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

Possible null reference assignment.

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Possible null reference assignment.

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

Possible null reference assignment.

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

Possible null reference assignment.

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

Possible null reference assignment.

Check warning on line 91 in src/NATS.Client.JetStream/NatsJSJsonSerializer.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

Possible null reference assignment.
#else
internal static readonly NatsJSJsonSerializerContext DefaultContext = new NatsJSJsonSerializerContext(new JsonSerializerOptions
{
Converters =
{
new JsonStringEnumConverter<ConsumerConfigDeliverPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerConfigAckPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerConfigReplayPolicy>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigCompression>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigDiscard>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigRetention>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigStorage>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerCreateAction>(JsonNamingPolicy.SnakeCaseLower),
},
});
#endif
}

#if NET6_0
Expand Down
51 changes: 51 additions & 0 deletions tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,57 @@ public void Size_and_flags(int size, NatsMsgFlags flags, bool isEmpty, bool hasN
[Fact]
public void Check_struct_size()
{
// Using https://github.com/SergeyTeplyakov/ObjectLayoutInspector
/* Size: 48 bytes. Paddings: 4 bytes (%8 of empty space)
|==============================================================|
| 0-7: String <Subject>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 8-15: String <ReplyTo>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 16-23: NatsHeaders <Headers>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 24-31: String <Data>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 32-39: INatsConnection <Connection>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 40-43: UInt32 _flagsAndSize (4 bytes) |
|--------------------------------------------------------------|
| 44-47: padding (4 bytes) |
|==============================================================| */
Assert.Equal(48, Unsafe.SizeOf<NatsMsg<string>>());

/* Size: 40 bytes. Paddings: 0 bytes (%0 of empty space)
|==============================================================|
| 0-7: String <Subject>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 8-15: String <ReplyTo>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 16-23: NatsHeaders <Headers>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 24-31: INatsConnection <Connection>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 32-35: UInt32 _flagsAndSize (4 bytes) |
|--------------------------------------------------------------|
| 36-39: Int32 <Data>k__BackingField (4 bytes) |
|==============================================================| */
Assert.Equal(40, Unsafe.SizeOf<NatsMsg<int>>());

/* Size: 40 bytes. Paddings: 3 bytes (%7 of empty space)
|==============================================================|
| 0-7: String <Subject>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 8-15: String <ReplyTo>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 16-23: NatsHeaders <Headers>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 24-31: INatsConnection <Connection>k__BackingField (8 bytes) |
|--------------------------------------------------------------|
| 32-35: UInt32 _flagsAndSize (4 bytes) |
|--------------------------------------------------------------|
| 36: Byte <Data>k__BackingField (1 byte) |
|--------------------------------------------------------------|
| 37-39: padding (3 bytes) |
|==============================================================| */
Assert.Equal(40, Unsafe.SizeOf<NatsMsg<byte>>());
}
}

0 comments on commit a4517e0

Please sign in to comment.