Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JetStream metadata to msg parsed from reply string #139

Merged
merged 5 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ generated_code = true

[*.cs]
indent_size = 4
max_line_length = 300

# changes from VS2019 defaults
csharp_style_namespace_declarations = file_scoped
Expand Down
74 changes: 74 additions & 0 deletions src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
namespace NATS.Client.JetStream.Internal;

internal static class ReplyToDateTimeAndSeq
{
private const int V1TokenCounts = 9;
private const int V2TokenCounts = 12;

private const int AckDomainTokenPos = 2;
private const int AckAccHashTokenPos = 3;
private const int AckStreamTokenPos = 4;
private const int AckConsumerTokenPos = 5;
private const int AckNumDeliveredTokenPos = 6;
private const int AckStreamSeqTokenPos = 7;
private const int AckConsumerSeqTokenPos = 8;
private const int AckTimestampSeqTokenPos = 9;
private const int AckNumPendingTokenPos = 10;

internal static NatsJSMsgMetadata? Parse(string? reply)
{
if (reply == null)
{
return null;
}

var originalTokens = reply.Split(".").AsSpan();
var tokensLength = originalTokens.Length;

// Parsed based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md#jsack

// If lower than 9 or more than 9 but less than 11, report an error
if (tokensLength is < V1TokenCounts or > V1TokenCounts and < V2TokenCounts - 1)
{
return null;
}

if (originalTokens[0] != "$JS" || originalTokens[1] != "ACK")
{
return null;
}

var tokens = new string[V2TokenCounts].AsSpan();

if (tokensLength == V1TokenCounts)
{
originalTokens[..2].CopyTo(tokens);
originalTokens[2..].CopyTo(tokens[4..]);

tokens[AckDomainTokenPos] = string.Empty;
tokens[AckAccHashTokenPos] = string.Empty;
}
else
{
tokens = originalTokens;

if (tokens[AckDomainTokenPos] == "_")
{
tokens[AckDomainTokenPos] = string.Empty;
}
}

var timestamp = long.Parse(tokens[AckTimestampSeqTokenPos]);
var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000);
var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero);

return new NatsJSMsgMetadata(
new NatsJSSequencePair(ulong.Parse(tokens[AckStreamSeqTokenPos]), ulong.Parse(tokens[AckConsumerSeqTokenPos])),
ulong.Parse(tokens[AckNumDeliveredTokenPos]),
ulong.Parse(tokens[AckNumPendingTokenPos]),
dateTime,
tokens[AckStreamTokenPos],
tokens[AckConsumerTokenPos],
tokens[AckDomainTokenPos]);
}
}
12 changes: 8 additions & 4 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
{
private readonly NatsJSContext _context;
private readonly NatsMsg<T> _msg;
private readonly Lazy<NatsJSMsgMetadata?> _replyToDateTimeAndSeq;

internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
{
_msg = msg;
_context = context;
_replyToDateTimeAndSeq = new Lazy<NatsJSMsgMetadata?>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo));
}

/// <summary>
Expand Down Expand Up @@ -50,6 +52,11 @@
/// </summary>
public INatsConnection? Connection => _msg.Connection;

/// <summary>
/// Additional metadata about the message.
/// </summary>
public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value;

/// <summary>
/// Acknowledges the message was completely handled.
/// </summary>
Expand Down Expand Up @@ -102,10 +109,7 @@

return _msg.ReplyAsync(
payload: payload,
opts: new NatsPubOpts
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,},

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 112 in src/NATS.Client.JetStream/NatsJSMsg.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

cancellationToken: cancellationToken);
}
}
Expand Down
38 changes: 38 additions & 0 deletions src/NATS.Client.JetStream/NatsJSMsgMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NATS.Client.JetStream;

/// <summary>
/// Additional metadata about the message.
/// </summary>
/// <param name="Sequence">
/// The sequence pair for the message.
/// </param>
/// <param name="NumDelivered">
/// The number of times the message was delivered.
/// </param>
/// <param name="NumPending">
/// The number of messages pending for the consumer.
/// </param>
/// <param name="Timestamp">
/// The timestamp of the message.
/// </param>
/// <param name="Stream">
/// The stream the message was sent to.
/// </param>
/// <param name="Consumer">
/// The consumer the message was sent to.
/// </param>
/// <param name="Domain">
/// The domain the message was sent to.
/// </param>
public readonly record struct NatsJSMsgMetadata(NatsJSSequencePair Sequence, ulong NumDelivered, ulong NumPending, DateTimeOffset Timestamp, string Stream, string Consumer, string Domain);

/// <summary>
/// The sequence pair for the message.
/// </summary>
/// <param name="Stream">
/// The stream sequence number.
/// </param>
/// <param name="Consumer">
/// The consumer sequence number.
/// </param>
public readonly record struct NatsJSSequencePair(ulong Stream, ulong Consumer);
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Tests.Internal;

public class ReplyToDateTimeAndSeqTest
{
[Fact]
public void ShouldParseV1ReplyToDateTimeAndSeq()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0");

natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00");
natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100);
natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1);
natsJSMsgMetadata.Value.NumDelivered.Should().Be(1);
natsJSMsgMetadata.Value.NumPending.Should().Be(0);
natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest");
natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0");
natsJSMsgMetadata.Value.Domain.Should().BeEmpty();
}

[Fact]
public void ShouldParseV2ReplyToDateTimeAndSeq()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.MyDomain.1234.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0");

natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00");
natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100);
natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1);
natsJSMsgMetadata.Value.NumDelivered.Should().Be(1);
natsJSMsgMetadata.Value.NumPending.Should().Be(0);
natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest");
natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0");
natsJSMsgMetadata.Value.Domain.Should().Be("MyDomain");
}

[Fact]
public void ShouldSetNullForReturnWhenReplyToIsNull()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse(null);

natsJSMsgMetadata.Should().BeNull();
}

[Fact]
public void ShouldSetNullWhenReplyToIsASimpleReqResponse()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("_INBOX.1");

natsJSMsgMetadata.Should().BeNull();
}

[Fact]
public void ShouldSetNullWhenDoesNotStartWithJsAck()
{
var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("1.2.3.4.5.6.7.8.9");

natsJSMsgMetadata.Should().BeNull();
}
}
Loading