From 92e25cee945116a63afe85927f1978bf3b188a5b Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Fri, 29 Sep 2023 23:50:27 +0200 Subject: [PATCH 1/5] Add seq and datetime to msg parsed from reply string --- .../Internal/ReplyToDateTimeAndSeq.cs | 15 ++++++++++ src/NATS.Client.Core/NatsMsg.cs | 29 +++++++++++++++++-- .../Internal/ReplyToDateTimeAndSeqTest.cs | 13 +++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs create mode 100644 tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs diff --git a/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs new file mode 100644 index 000000000..4734024b1 --- /dev/null +++ b/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs @@ -0,0 +1,15 @@ +namespace NATS.Client.Core.Internal; + +internal static class ReplyToDateTimeAndSeq +{ + internal static (DateTimeOffset DateTime, long Seq) Parse(string reply) + { + var ackSeperated = reply.Split("."); + var timestamp = long.Parse(ackSeperated[^2]); + var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); + var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); + var seq = long.Parse(ackSeperated[5]); + + return (dateTime, seq); + } +} diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 67edc2b61..374991f94 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; +using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -24,6 +25,8 @@ namespace NATS.Client.Core; public readonly record struct NatsMsg( string Subject, string? ReplyTo, + long? Seq, + DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, ReadOnlyMemory Data, @@ -50,12 +53,22 @@ internal static NatsMsg Build( headers.SetReadOnly(); } + long? seq = null; + DateTimeOffset? dateTime = null; + + if (replyTo != null) + { + var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); + seq = dateTimeAndSeq.Seq; + dateTime = dateTimeAndSeq.DateTime; + } + var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); + return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, payloadBuffer.ToArray(), connection); } /// @@ -129,6 +142,8 @@ private void CheckReplyPreconditions() public readonly record struct NatsMsg( string Subject, string? ReplyTo, + long? Seq, + DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, T? Data, @@ -163,12 +178,22 @@ internal static NatsMsg Build( headers.SetReadOnly(); } + long? seq = null; + DateTimeOffset? dateTime = null; + + if (replyTo != null) + { + var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); + seq = dateTimeAndSeq.Seq; + dateTime = dateTimeAndSeq.DateTime; + } + var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); + return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, data, connection); } /// diff --git a/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs new file mode 100644 index 000000000..dcc6c27f3 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -0,0 +1,13 @@ +namespace NATS.Client.Core.Tests.Internal; + +public class ReplyToDateTimeAndSeqTest +{ + [Fact] + public void ShouldParseReplyToDateTimeAndSeq() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + dateTime.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + seq.Should().Be(100); + } +} From 02ebb8c2bfae9bad73960d6f1c18ff0b34190723 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 00:09:57 +0200 Subject: [PATCH 2/5] Map NatsJSMsg Seq, DateTime to the underlying message --- src/NATS.Client.JetStream/NatsJSMsg.cs | 30 ++++++++++++++++++-------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index c0a4dfc54..cdb68a30a 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -45,6 +45,16 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// public T? Data => _msg.Data; + /// + /// The sequence number of the message + /// + public long? Seq => _msg.Seq; + + /// + /// The time of the message + /// + public DateTimeOffset? DateTime => _msg.DateTime; + /// /// The connection messages was delivered on. /// @@ -56,7 +66,8 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); + public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); /// /// Signals that the message will not be processed now and processing can move onto the next message. @@ -67,7 +78,8 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// /// Messages rejected using NACK will be resent by the NATS JetStream server after the configured timeout. /// - public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); /// /// Indicates that work is ongoing and the wait period should be extended. @@ -85,7 +97,8 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// by another amount of time equal to ack_wait by the NATS JetStream server. /// /// - public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); /// /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. @@ -93,19 +106,18 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); + public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => + SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); - private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) + private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, + CancellationToken cancellationToken = default) { if (_msg == default) throw new NatsJSException("No user message, can't acknowledge"); return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts - { - WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, - }, + opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, cancellationToken: cancellationToken); } } From 3c5abccbc066e4fdc2889b4c58b27131ee664eba Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 00:12:33 +0200 Subject: [PATCH 3/5] Correct formatting --- src/NATS.Client.JetStream/NatsJSMsg.cs | 32 ++++++++++++-------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index cdb68a30a..03b735f24 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -46,19 +46,19 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) public T? Data => _msg.Data; /// - /// The sequence number of the message + /// The connection messages was delivered on. /// - public long? Seq => _msg.Seq; + public INatsConnection? Connection => _msg.Connection; /// - /// The time of the message + /// The sequence number of the message. /// - public DateTimeOffset? DateTime => _msg.DateTime; + public long? Sequence => _msg.Seq; /// - /// The connection messages was delivered on. + /// The time of the message. /// - public INatsConnection? Connection => _msg.Connection; + public DateTimeOffset? DateTime => _msg.DateTime; /// /// Acknowledges the message was completely handled. @@ -66,8 +66,7 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); + public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); /// /// Signals that the message will not be processed now and processing can move onto the next message. @@ -78,8 +77,7 @@ public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellation /// /// Messages rejected using NACK will be resent by the NATS JetStream server after the configured timeout. /// - public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); /// /// Indicates that work is ongoing and the wait period should be extended. @@ -97,8 +95,7 @@ public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellatio /// by another amount of time equal to ack_wait by the NATS JetStream server. /// /// - public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); /// /// Instructs the server to stop redelivery of the message without acknowledging it as successfully processed. @@ -106,18 +103,19 @@ public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken canc /// Ack options. /// A used to cancel the call. /// A representing the async call. - public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => - SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); + public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); - private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, - CancellationToken cancellationToken = default) + private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) { if (_msg == default) throw new NatsJSException("No user message, can't acknowledge"); return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, + opts: new NatsPubOpts + { + WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, + }, cancellationToken: cancellationToken); } } From 76ff2bd408ee06a5764fd98a65d55dd07a61f822 Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 13:16:11 +0200 Subject: [PATCH 4/5] Implement Msg DateTime and Sequence only in JetStream --- .editorconfig | 1 + .../Internal/ReplyToDateTimeAndSeq.cs | 15 --------- src/NATS.Client.Core/NatsMsg.cs | 29 ++-------------- .../Internal/ReplyToDateTimeAndSeq.cs | 28 ++++++++++++++++ src/NATS.Client.JetStream/NatsJSMsg.cs | 11 +++---- .../Internal/ReplyToDateTimeAndSeqTest.cs | 13 -------- .../Internal/ReplyToDateTimeAndSeqTest.cs | 33 +++++++++++++++++++ 7 files changed, 69 insertions(+), 61 deletions(-) delete mode 100644 src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs create mode 100644 src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs delete mode 100644 tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs create mode 100644 tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs diff --git a/.editorconfig b/.editorconfig index a41b130ef..408cdcc41 100644 --- a/.editorconfig +++ b/.editorconfig @@ -13,6 +13,7 @@ generated_code = true [*.cs] indent_size = 4 +max_line_length = 300 # changes from VS2019 defaults csharp_style_namespace_declarations = file_scoped diff --git a/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs deleted file mode 100644 index 4734024b1..000000000 --- a/src/NATS.Client.Core/Internal/ReplyToDateTimeAndSeq.cs +++ /dev/null @@ -1,15 +0,0 @@ -namespace NATS.Client.Core.Internal; - -internal static class ReplyToDateTimeAndSeq -{ - internal static (DateTimeOffset DateTime, long Seq) Parse(string reply) - { - var ackSeperated = reply.Split("."); - var timestamp = long.Parse(ackSeperated[^2]); - var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); - var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); - var seq = long.Parse(ackSeperated[5]); - - return (dateTime, seq); - } -} diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 374991f94..67edc2b61 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; -using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -25,8 +24,6 @@ namespace NATS.Client.Core; public readonly record struct NatsMsg( string Subject, string? ReplyTo, - long? Seq, - DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, ReadOnlyMemory Data, @@ -53,22 +50,12 @@ internal static NatsMsg Build( headers.SetReadOnly(); } - long? seq = null; - DateTimeOffset? dateTime = null; - - if (replyTo != null) - { - var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); - seq = dateTimeAndSeq.Seq; - dateTime = dateTimeAndSeq.DateTime; - } - var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, payloadBuffer.ToArray(), connection); + return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); } /// @@ -142,8 +129,6 @@ private void CheckReplyPreconditions() public readonly record struct NatsMsg( string Subject, string? ReplyTo, - long? Seq, - DateTimeOffset? DateTime, int Size, NatsHeaders? Headers, T? Data, @@ -178,22 +163,12 @@ internal static NatsMsg Build( headers.SetReadOnly(); } - long? seq = null; - DateTimeOffset? dateTime = null; - - if (replyTo != null) - { - var dateTimeAndSeq = ReplyToDateTimeAndSeq.Parse(replyTo); - seq = dateTimeAndSeq.Seq; - dateTime = dateTimeAndSeq.DateTime; - } - var size = subject.Length + (replyTo?.Length ?? 0) + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; - return new NatsMsg(subject, replyTo, seq, dateTime, (int) size, headers, data, connection); + return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); } /// diff --git a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs new file mode 100644 index 000000000..035858c9f --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs @@ -0,0 +1,28 @@ +namespace NATS.Client.JetStream.Internal; + +internal static class ReplyToDateTimeAndSeq +{ + internal static (DateTimeOffset? DateTime, long? Seq) Parse(string? reply) + { + if (reply == null) + { + return (null, null); + } + + var ackSeperated = reply.Split("."); + + // It must be seperated by 9 dots as defined in + // https://docs.nats.io/reference/reference-protocols/nats_api_reference#acknowledging-messages + if (ackSeperated.Length != 9) + { + return (null, null); + } + + var timestamp = long.Parse(ackSeperated[^2]); + var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); + var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); + var seq = long.Parse(ackSeperated[5]); + + return (dateTime, seq); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 03b735f24..6fde0aa71 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -12,11 +12,13 @@ public readonly struct NatsJSMsg { private readonly NatsJSContext _context; private readonly NatsMsg _msg; + private readonly Lazy<(DateTimeOffset? DateTime, long? Sequence)> _replyToDateTimeAndSeq; internal NatsJSMsg(NatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; + _replyToDateTimeAndSeq = new Lazy<(DateTimeOffset? DateTime, long? Sequnce)>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); } /// @@ -53,12 +55,12 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// /// The sequence number of the message. /// - public long? Sequence => _msg.Seq; + public long? Sequence => _replyToDateTimeAndSeq.Value.Sequence; /// /// The time of the message. /// - public DateTimeOffset? DateTime => _msg.DateTime; + public DateTimeOffset? DateTime => _replyToDateTimeAndSeq.Value.DateTime; /// /// Acknowledges the message was completely handled. @@ -112,10 +114,7 @@ private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = de return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts - { - WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, - }, + opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, cancellationToken: cancellationToken); } } diff --git a/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs deleted file mode 100644 index dcc6c27f3..000000000 --- a/tests/NATS.Client.Core.Tests/Internal/ReplyToDateTimeAndSeqTest.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace NATS.Client.Core.Tests.Internal; - -public class ReplyToDateTimeAndSeqTest -{ - [Fact] - public void ShouldParseReplyToDateTimeAndSeq() - { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); - - dateTime.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); - seq.Should().Be(100); - } -} diff --git a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs new file mode 100644 index 000000000..7c9a64bb8 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -0,0 +1,33 @@ +using NATS.Client.JetStream.Internal; + +namespace NATS.Client.JetStream.Tests.Internal; + +public class ReplyToDateTimeAndSeqTest +{ + [Fact] + public void ShouldParseReplyToDateTimeAndSeq() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + dateTime!.Value.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + seq.Should().Be(100); + } + + [Fact] + public void ShouldSetNullForReturnWhenReplyToIsNull() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse(null); + + dateTime.Should().BeNull(); + seq.Should().BeNull(); + } + + [Fact] + public void ShouldSetNullWhenReplyToIsASimpleReqResponse() + { + var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + + dateTime.Should().BeNull(); + seq.Should().BeNull(); + } +} From 298cc5ec9fbcac576b8140c224a05dc8b8f1653d Mon Sep 17 00:00:00 2001 From: Simon Hoss Date: Sat, 30 Sep 2023 18:37:52 +0200 Subject: [PATCH 5/5] Parse reply based on official JSAck ADR --- .../Internal/ReplyToDateTimeAndSeq.cs | 66 ++++++++++++++++--- src/NATS.Client.JetStream/NatsJSMsg.cs | 13 ++-- .../NatsJSMsgMetadata.cs | 38 +++++++++++ .../Internal/ReplyToDateTimeAndSeqTest.cs | 47 ++++++++++--- 4 files changed, 135 insertions(+), 29 deletions(-) create mode 100644 src/NATS.Client.JetStream/NatsJSMsgMetadata.cs diff --git a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs index 035858c9f..90e284c63 100644 --- a/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs +++ b/src/NATS.Client.JetStream/Internal/ReplyToDateTimeAndSeq.cs @@ -2,27 +2,73 @@ namespace NATS.Client.JetStream.Internal; internal static class ReplyToDateTimeAndSeq { - internal static (DateTimeOffset? DateTime, long? Seq) Parse(string? reply) + private const int V1TokenCounts = 9; + private const int V2TokenCounts = 12; + + private const int AckDomainTokenPos = 2; + private const int AckAccHashTokenPos = 3; + private const int AckStreamTokenPos = 4; + private const int AckConsumerTokenPos = 5; + private const int AckNumDeliveredTokenPos = 6; + private const int AckStreamSeqTokenPos = 7; + private const int AckConsumerSeqTokenPos = 8; + private const int AckTimestampSeqTokenPos = 9; + private const int AckNumPendingTokenPos = 10; + + internal static NatsJSMsgMetadata? Parse(string? reply) { if (reply == null) { - return (null, null); + return null; + } + + var originalTokens = reply.Split(".").AsSpan(); + var tokensLength = originalTokens.Length; + + // Parsed based on https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-15.md#jsack + + // If lower than 9 or more than 9 but less than 11, report an error + if (tokensLength is < V1TokenCounts or > V1TokenCounts and < V2TokenCounts - 1) + { + return null; } - var ackSeperated = reply.Split("."); + if (originalTokens[0] != "$JS" || originalTokens[1] != "ACK") + { + return null; + } - // It must be seperated by 9 dots as defined in - // https://docs.nats.io/reference/reference-protocols/nats_api_reference#acknowledging-messages - if (ackSeperated.Length != 9) + var tokens = new string[V2TokenCounts].AsSpan(); + + if (tokensLength == V1TokenCounts) { - return (null, null); + originalTokens[..2].CopyTo(tokens); + originalTokens[2..].CopyTo(tokens[4..]); + + tokens[AckDomainTokenPos] = string.Empty; + tokens[AckAccHashTokenPos] = string.Empty; + } + else + { + tokens = originalTokens; + + if (tokens[AckDomainTokenPos] == "_") + { + tokens[AckDomainTokenPos] = string.Empty; + } } - var timestamp = long.Parse(ackSeperated[^2]); + var timestamp = long.Parse(tokens[AckTimestampSeqTokenPos]); var offset = DateTimeOffset.FromUnixTimeMilliseconds(timestamp / 1000000); var dateTime = new DateTimeOffset(offset.Ticks, TimeSpan.Zero); - var seq = long.Parse(ackSeperated[5]); - return (dateTime, seq); + return new NatsJSMsgMetadata( + new NatsJSSequencePair(ulong.Parse(tokens[AckStreamSeqTokenPos]), ulong.Parse(tokens[AckConsumerSeqTokenPos])), + ulong.Parse(tokens[AckNumDeliveredTokenPos]), + ulong.Parse(tokens[AckNumPendingTokenPos]), + dateTime, + tokens[AckStreamTokenPos], + tokens[AckConsumerTokenPos], + tokens[AckDomainTokenPos]); } } diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index 6fde0aa71..ed4e49149 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -12,13 +12,13 @@ public readonly struct NatsJSMsg { private readonly NatsJSContext _context; private readonly NatsMsg _msg; - private readonly Lazy<(DateTimeOffset? DateTime, long? Sequence)> _replyToDateTimeAndSeq; + private readonly Lazy _replyToDateTimeAndSeq; internal NatsJSMsg(NatsMsg msg, NatsJSContext context) { _msg = msg; _context = context; - _replyToDateTimeAndSeq = new Lazy<(DateTimeOffset? DateTime, long? Sequnce)>(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); + _replyToDateTimeAndSeq = new Lazy(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); } /// @@ -53,14 +53,9 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) public INatsConnection? Connection => _msg.Connection; /// - /// The sequence number of the message. + /// Additional metadata about the message. /// - public long? Sequence => _replyToDateTimeAndSeq.Value.Sequence; - - /// - /// The time of the message. - /// - public DateTimeOffset? DateTime => _replyToDateTimeAndSeq.Value.DateTime; + public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value; /// /// Acknowledges the message was completely handled. diff --git a/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs b/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs new file mode 100644 index 000000000..e31341a5d --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSMsgMetadata.cs @@ -0,0 +1,38 @@ +namespace NATS.Client.JetStream; + +/// +/// Additional metadata about the message. +/// +/// +/// The sequence pair for the message. +/// +/// +/// The number of times the message was delivered. +/// +/// +/// The number of messages pending for the consumer. +/// +/// +/// The timestamp of the message. +/// +/// +/// The stream the message was sent to. +/// +/// +/// The consumer the message was sent to. +/// +/// +/// The domain the message was sent to. +/// +public readonly record struct NatsJSMsgMetadata(NatsJSSequencePair Sequence, ulong NumDelivered, ulong NumPending, DateTimeOffset Timestamp, string Stream, string Consumer, string Domain); + +/// +/// The sequence pair for the message. +/// +/// +/// The stream sequence number. +/// +/// +/// The consumer sequence number. +/// +public readonly record struct NatsJSSequencePair(ulong Stream, ulong Consumer); diff --git a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs index 7c9a64bb8..bf1118e5c 100644 --- a/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs +++ b/tests/NATS.Client.JetStream.Tests/Internal/ReplyToDateTimeAndSeqTest.cs @@ -5,29 +5,56 @@ namespace NATS.Client.JetStream.Tests.Internal; public class ReplyToDateTimeAndSeqTest { [Fact] - public void ShouldParseReplyToDateTimeAndSeq() + public void ShouldParseV1ReplyToDateTimeAndSeq() { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); - dateTime!.Value.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); - seq.Should().Be(100); + natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100); + natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1); + natsJSMsgMetadata.Value.NumDelivered.Should().Be(1); + natsJSMsgMetadata.Value.NumPending.Should().Be(0); + natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest"); + natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0"); + natsJSMsgMetadata.Value.Domain.Should().BeEmpty(); + } + + [Fact] + public void ShouldParseV2ReplyToDateTimeAndSeq() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("$JS.ACK.MyDomain.1234.UnitTest.GetEvents_0.1.100.1.1696023331771188000.0"); + + natsJSMsgMetadata!.Value.Timestamp.ToString("O").Should().Be("2023-09-29T21:35:31.7710000+00:00"); + natsJSMsgMetadata.Value.Sequence.Stream.Should().Be(100); + natsJSMsgMetadata.Value.Sequence.Consumer.Should().Be(1); + natsJSMsgMetadata.Value.NumDelivered.Should().Be(1); + natsJSMsgMetadata.Value.NumPending.Should().Be(0); + natsJSMsgMetadata.Value.Stream.Should().Be("UnitTest"); + natsJSMsgMetadata.Value.Consumer.Should().Be("GetEvents_0"); + natsJSMsgMetadata.Value.Domain.Should().Be("MyDomain"); } [Fact] public void ShouldSetNullForReturnWhenReplyToIsNull() { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse(null); + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse(null); - dateTime.Should().BeNull(); - seq.Should().BeNull(); + natsJSMsgMetadata.Should().BeNull(); } [Fact] public void ShouldSetNullWhenReplyToIsASimpleReqResponse() { - var (dateTime, seq) = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("_INBOX.1"); + + natsJSMsgMetadata.Should().BeNull(); + } + + [Fact] + public void ShouldSetNullWhenDoesNotStartWithJsAck() + { + var natsJSMsgMetadata = ReplyToDateTimeAndSeq.Parse("1.2.3.4.5.6.7.8.9"); - dateTime.Should().BeNull(); - seq.Should().BeNull(); + natsJSMsgMetadata.Should().BeNull(); } }