Skip to content

Commit

Permalink
Add JetStream metadata to msg parsed from reply string (#139)
Browse files Browse the repository at this point in the history
* Add seq and datetime to msg parsed from reply string

* Map NatsJSMsg Seq, DateTime to the underlying message

* Correct formatting

* Implement Msg DateTime and Sequence only in JetStream

* Parse reply based on official JSAck ADR

---------

Co-authored-by: Simon Hoss <[email protected]>
  • Loading branch information
simonhoss and simonhoss-oag authored Sep 30, 2023
1 parent c440282 commit fabec64
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 4 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ generated_code = true

[*.cs]
indent_size = 4
max_line_length = 300

# changes from VS2019 defaults
csharp_style_namespace_declarations = file_scoped
Expand Down
74 changes: 74 additions & 0 deletions src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
namespace NATS.Client.JetStream.Internal;

internal static class ReplyToDateTimeAndSeq
{
private const int V1TokenCounts = 9;
private const int V2TokenCounts = 12;

private const int AckDomainTokenPos = 2;
private const int AckAccHashTokenPos = 3;
private const int AckStreamTokenPos = 4;
private const int AckConsumerTokenPos = 5;
private const int AckNumDeliveredTokenPos = 6;
private const int AckStreamSeqTokenPos = 7;
private const int AckConsumerSeqTokenPos = 8;
private const int AckTimestampSeqTokenPos = 9;
private const int AckNumPendingTokenPos = 10;

internal static NatsJSMsgMetadata? Parse(string? reply)
{
if (reply == null)
{
return null;
}

var originalTokens = reply.Split(".").AsSpan();
var tokensLength = originalTokens.Length;

// Parsed based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md#jsack

// If lower than 9 or more than 9 but less than 11, report an error
if (tokensLength is < V1TokenCounts or > V1TokenCounts and < V2TokenCounts - 1)
{
return null;
}

if (originalTokens[0] != "$JS" || originalTokens[1] != "ACK")
{
return null;
}

var tokens = new string[V2TokenCounts].AsSpan();

if (tokensLength == V1TokenCounts)
{
originalTokens[..2].CopyTo(tokens);
originalTokens[2..].CopyTo(tokens[4..]);

tokens[AckDomainTokenPos] = string.Empty;
tokens[AckAccHashTokenPos] = string.Empty;
}
else
{
tokens = originalTokens;

if (tokens[AckDomainTokenPos] == "_")
{
tokens[AckDomainTokenPos] = string.Empty;
}
}

var timestamp = long.Parse(tokens[AckTimestampSeqTokenPos]);
var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000);
var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero);

return new NatsJSMsgMetadata(
new NatsJSSequencePair(ulong.Parse(tokens[AckStreamSeqTokenPos]), ulong.Parse(tokens[AckConsumerSeqTokenPos])),
ulong.Parse(tokens[AckNumDeliveredTokenPos]),
ulong.Parse(tokens[AckNumPendingTokenPos]),
dateTime,
tokens[AckStreamTokenPos],
tokens[AckConsumerTokenPos],
tokens[AckDomainTokenPos]);
}
}
12 changes: 8 additions & 4 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ public readonly struct NatsJSMsg<T>
{
private readonly NatsJSContext _context;
private readonly NatsMsg<T> _msg;
private readonly Lazy<NatsJSMsgMetadata?> _replyToDateTimeAndSeq;

internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
{
_msg = msg;
_context = context;
_replyToDateTimeAndSeq = new Lazy<NatsJSMsgMetadata?>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo));
}

/// <summary>
Expand Down Expand Up @@ -50,6 +52,11 @@ internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
/// </summary>
public INatsConnection? Connection => _msg.Connection;

/// <summary>
/// Additional metadata about the message.
/// </summary>
public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value;

/// <summary>
/// Acknowledges the message was completely handled.
/// </summary>
Expand Down Expand Up @@ -102,10 +109,7 @@ private ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opts = de

return _msg.ReplyAsync(
payload: payload,
opts: new NatsPubOpts
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,},

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

cancellationToken: cancellationToken);
}
}
Expand Down
38 changes: 38 additions & 0 deletions src/NATS.Client.JetStream/NatsJSMsgMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NATS.Client.JetStream;

/// <summary>
/// Additional metadata about the message.
/// </summary>
/// <param name="Sequence">
/// The sequence pair for the message.
/// </param>
/// <param name="NumDelivered">
/// The number of times the message was delivered.
/// </param>
/// <param name="NumPending">
/// The number of messages pending for the consumer.
/// </param>
/// <param name="Timestamp">
/// The timestamp of the message.
/// </param>
/// <param name="Stream">
/// The stream the message was sent to.
/// </param>
/// <param name="Consumer">
/// The consumer the message was sent to.
/// </param>
/// <param name="Domain">
/// The domain the message was sent to.
/// </param>
public readonly record struct NatsJSMsgMetadata(NatsJSSequencePair Sequence, ulong NumDelivered, ulong NumPending, DateTimeOffset Timestamp, string Stream, string Consumer, string Domain);

/// <summary>
/// The sequence pair for the message.
/// </summary>
/// <param name="Stream">
/// The stream sequence number.
/// </param>
/// <param name="Consumer">
/// The consumer sequence number.
/// </param>
public readonly record struct NatsJSSequencePair(ulong Stream, ulong Consumer);
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Tests.Internal;

public class ReplyToDateTimeAndSeqTest
{
[Fact]
public void ShouldParseV1ReplyToDateTimeAndSeq()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0");

natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00");
natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100);
natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1);
natsJSMsgMetadata.Value.NumDelivered.Should().Be(1);
natsJSMsgMetadata.Value.NumPending.Should().Be(0);
natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest");
natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0");
natsJSMsgMetadata.Value.Domain.Should().BeEmpty();
}

[Fact]
public void ShouldParseV2ReplyToDateTimeAndSeq()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.MyDomain.1234.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0");

natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00");
natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100);
natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1);
natsJSMsgMetadata.Value.NumDelivered.Should().Be(1);
natsJSMsgMetadata.Value.NumPending.Should().Be(0);
natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest");
natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0");
natsJSMsgMetadata.Value.Domain.Should().Be("MyDomain");
}

[Fact]
public void ShouldSetNullForReturnWhenReplyToIsNull()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse(null);

natsJSMsgMetadata.Should().BeNull();
}

[Fact]
public void ShouldSetNullWhenReplyToIsASimpleReqResponse()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("_INBOX.1");

natsJSMsgMetadata.Should().BeNull();
}

[Fact]
public void ShouldSetNullWhenDoesNotStartWithJsAck()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("1.2.3.4.5.6.7.8.9");

natsJSMsgMetadata.Should().BeNull();
}
}

0 comments on commit fabec64

Please sign in to comment.