Skip to content

Commit

Permalink
Fix JetStream API serialization with typed results
Browse files Browse the repository at this point in the history
Replace JsonDocument-based responses with strongly-typed `NatsJSApiResult<T>`
for improved type safety and error handling. Added new deserialization logic
and tests to cover valid responses, errors, and edge cases like empty buffers.
  • Loading branch information
mtmk committed Jan 13, 2025
1 parent e100805 commit 800b793
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 27 deletions.
59 changes: 59 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSApiResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Runtime.CompilerServices;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Internal;

internal readonly struct NatsJSApiResult<T>
{
private readonly T? _value;
private readonly ApiError? _error;
private readonly Exception? _exception;

public NatsJSApiResult(T value)
{
_value = value;
_error = null;
_exception = null;
}

public NatsJSApiResult(ApiError error)
{
_value = default;
_error = error;
_exception = null;
}

public NatsJSApiResult(Exception exception)
{
_value = default;
_error = null;
_exception = exception;
}

public T Value => _value ?? ThrowValueIsNotSetException();

public ApiError Error => _error ?? ThrowErrorIsNotSetException();

public Exception Exception => _exception ?? ThrowExceptionIsNotSetException();

public bool Success => _error == null && _exception == null;

public bool HasError => _error != null;

public bool HasException => _exception != null;

public static implicit operator NatsJSApiResult<T>(T value) => new(value);

public static implicit operator NatsJSApiResult<T>(ApiError error) => new(error);

public static implicit operator NatsJSApiResult<T>(Exception exception) => new(exception);

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static ApiError ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");

private static Exception ThrowExceptionIsNotSetException() => throw CreateInvalidOperationException("Result exception is not set");

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message);
}
35 changes: 32 additions & 3 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,38 @@

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize<JsonDocument>
internal sealed class NatsJSJsonDocumentSerializer<T> : INatsDeserialize<NatsJSApiResult<T>>
{
public static readonly NatsJSJsonDocumentSerializer Default = new();
public static readonly NatsJSJsonDocumentSerializer<T> Default = new();

public JsonDocument? Deserialize(in ReadOnlySequence<byte> buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer);
public NatsJSApiResult<T> Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return new NatsJSException("Buffer is empty");
}

using var jsonDocument = JsonDocument.Parse(buffer);

if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return error;
}

var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(T));
if (jsonTypeInfo == null)
{
return new NatsJSException($"Unknown response type {typeof(T)}");
}

var result = (T?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (result == null)
{
return new NatsJSException("Null result");
}

return result;
}
}
33 changes: 9 additions & 24 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,13 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
// Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await Connection.CreateRequestSubAsync<TRequest, JsonDocument>(
await using var sub = await Connection.CreateRequestSubAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
headers: default,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSJsonDocumentSerializer.Default,
replySerializer: NatsJSJsonDocumentSerializer<TResponse>.Default,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

Expand All @@ -326,37 +326,22 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
return new NatsNoRespondersException();
}

if (msg.Data == null)
{
return new NatsJSException("No response data received");
}

// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
// API deserialize to the final type from the document.
using var jsonDocument = msg.Data;

if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
if (msg.Error is { } messageError)
{
var error = errorElement.Deserialize(JetStream.NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
return new NatsJSResponse<TResponse>(default, error);
return messageError;
}

var jsonTypeInfo = NatsJSJsonSerializerContext.DefaultContext.GetTypeInfo(typeof(TResponse));
if (jsonTypeInfo == null)
if (msg.Data.HasException)
{
return new NatsJSException($"Unknown response type {typeof(TResponse)}");
return msg.Data.Exception;
}

var response = (TResponse?)jsonDocument.RootElement.Deserialize(jsonTypeInfo);

if (msg.Error is { } messageError)
if (msg.Data.HasError)
{
return messageError;
return new NatsJSResponse<TResponse>(null, msg.Data.Error);
}

return new NatsJSResponse<TResponse>(response, default);
return new NatsJSResponse<TResponse>(msg.Data.Value, null);
}

if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb)
Expand Down
27 changes: 27 additions & 0 deletions tests/NATS.Client.JetStream.Tests/JetStreamApiSerializerTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Buffers;
using System.Text;
using NATS.Client.Core2.Tests;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;
using JsonSerializer = System.Text.Json.JsonSerializer;

Expand Down Expand Up @@ -98,4 +101,28 @@ public async Task Should_respect_buffers_lifecycle()
{
}
}

[Fact]
public void Deserialize_value()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("""{"memory":1}""")));
result.Value.Memory.Should().Be(1);
}

[Fact]
public void Deserialize_empty_buffer()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(ReadOnlySequence<byte>.Empty);
result.Exception.Message.Should().Be("Buffer is empty");
}

[Fact]
public void Deserialize_error()
{
var serializer = NatsJSJsonDocumentSerializer<AccountInfoResponse>.Default;
var result = serializer.Deserialize(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes("""{"error":{"code":2}}""")));
result.Error.Code.Should().Be(2);
}
}

0 comments on commit 800b793

Please sign in to comment.