diff --git a/.editorconfig b/.editorconfig index a41b130ef..408cdcc41 100644 --- a/.editorconfig +++ b/.editorconfig @@ -13,6 +13,7 @@ generated_code = true [*.cs] indent_size = 4 +max_line_length = 300 # changes from VS2019 defaults csharp_style_namespace_declarations = file_scoped diff --git a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs new file mode 100644 index 000000000..90e284c63 --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs @@ -0,0 +1,74 @@ +namespace NATS.Client.JetStream.Internal; + +internal static class ReplyToDateTimeAndSeq +{ + private const int V1TokenCounts = 9; + private const int V2TokenCounts = 12; + + private const int AckDomainTokenPos = 2; + private const int AckAccHashTokenPos = 3; + private const int AckStreamTokenPos = 4; + private const int AckConsumerTokenPos = 5; + private const int AckNumDeliveredTokenPos = 6; + private const int AckStreamSeqTokenPos = 7; + private const int AckConsumerSeqTokenPos = 8; + private const int AckTimestampSeqTokenPos = 9; + private const int AckNumPendingTokenPos = 10; + + internal static NatsJSMsgMetadata? Parse(string? reply) + { + if (reply == null) + { + return null; + } + + var originalTokens = reply.Split(".").AsSpan(); + var tokensLength = originalTokens.Length; + + // Parsed based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md#jsack + + // If lower than 9 or more than 9 but less than 11, report an error + if (tokensLength is < V1TokenCounts or > V1TokenCounts and < V2TokenCounts - 1) + { + return null; + } + + if (originalTokens[0] != "$JS" || originalTokens[1] != "ACK") + { + return null; + } + + var tokens = new string[V2TokenCounts].AsSpan(); + + if (tokensLength == V1TokenCounts) + { + originalTokens[..2].CopyTo(tokens); + originalTokens[2..].CopyTo(tokens[4..]); + + tokens[AckDomainTokenPos] = string.Empty; + tokens[AckAccHashTokenPos] = string.Empty; + } + else + { + tokens = originalTokens; + + if (tokens[AckDomainTokenPos] == "_") + { + tokens[AckDomainTokenPos] = string.Empty; + } + } + + var timestamp = long.Parse(tokens[AckTimestampSeqTokenPos]); + var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); + var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); + + return new NatsJSMsgMetadata( + new NatsJSSequencePair(ulong.Parse(tokens[AckStreamSeqTokenPos]), ulong.Parse(tokens[AckConsumerSeqTokenPos])), + ulong.Parse(tokens[AckNumDeliveredTokenPos]), + ulong.Parse(tokens[AckNumPendingTokenPos]), + dateTime, + tokens[AckStreamTokenPos], + tokens[AckConsumerTokenPos], + tokens[AckDomainTokenPos]); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index c0a4dfc54..ed4e49149 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -12,11 +12,13 @@ public readonly struct NatsJSMsg { private readonly NatsJSContext _context; private readonly NatsMsg _msg; + private readonly Lazy _replyToDateTimeAndSeq; internal NatsJSMsg(NatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; + _replyToDateTimeAndSeq = new Lazy(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); } /// @@ -50,6 +52,11 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// public INatsConnection? Connection => _msg.Connection; + /// + /// Additional metadata about the message. + /// + public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value; + /// /// Acknowledges the message was completely handled. /// @@ -102,10 +109,7 @@ private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = de return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts - { - WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, - }, + opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, cancellationToken: cancellationToken); } } diff --git a/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs b/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs new file mode 100644 index 000000000..e31341a5d --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs @@ -0,0 +1,38 @@ +namespace NATS.Client.JetStream; + +/// +/// Additional metadata about the message. +/// +/// +/// The sequence pair for the message. +/// +/// +/// The number of times the message was delivered. +/// +/// +/// The number of messages pending for the consumer. +/// +/// +/// The timestamp of the message. +/// +/// +/// The stream the message was sent to. +/// +/// +/// The consumer the message was sent to. +/// +/// +/// The domain the message was sent to. +/// +public readonly record struct NatsJSMsgMetadata(NatsJSSequencePair Sequence, ulong NumDelivered, ulong NumPending, DateTimeOffset Timestamp, string Stream, string Consumer, string Domain); + +/// +/// The sequence pair for the message. +/// +/// +/// The stream sequence number. +/// +/// +/// The consumer sequence number. +/// +public readonly record struct NatsJSSequencePair(ulong Stream, ulong Consumer); diff --git a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs new file mode 100644 index 000000000..bf1118e5c --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -0,0 +1,60 @@ +using NATS.Client.JetStream.Internal; + +namespace NATS.Client.JetStream.Tests.Internal; + +public class ReplyToDateTimeAndSeqTest +{ + [Fact] + public void ShouldParseV1ReplyToDateTimeAndSeq() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100); + natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1); + natsJSMsgMetadata.Value.NumDelivered.Should().Be(1); + natsJSMsgMetadata.Value.NumPending.Should().Be(0); + natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest"); + natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0"); + natsJSMsgMetadata.Value.Domain.Should().BeEmpty(); + } + + [Fact] + public void ShouldParseV2ReplyToDateTimeAndSeq() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.MyDomain.1234.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100); + natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1); + natsJSMsgMetadata.Value.NumDelivered.Should().Be(1); + natsJSMsgMetadata.Value.NumPending.Should().Be(0); + natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest"); + natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0"); + natsJSMsgMetadata.Value.Domain.Should().Be("MyDomain"); + } + + [Fact] + public void ShouldSetNullForReturnWhenReplyToIsNull() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse(null); + + natsJSMsgMetadata.Should().BeNull(); + } + + [Fact] + public void ShouldSetNullWhenReplyToIsASimpleReqResponse() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + + natsJSMsgMetadata.Should().BeNull(); + } + + [Fact] + public void ShouldSetNullWhenDoesNotStartWithJsAck() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("1.2.3.4.5.6.7.8.9"); + + natsJSMsgMetadata.Should().BeNull(); + } +}