From e2ee84c5123c78014a4bb127727159a5375a30c5 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 6 Jun 2024 00:17:01 +0100 Subject: [PATCH 1/7] Update consumer channel capacity to maxMsgs The capacity now has a limit set to maxMsgs to prevent pulling excessive messages from the server. It also has been bounded to a maximum of 1024 to avoid large object heap allocations. --- src/NATS.Client.JetStream/Internal/NatsJSConsume.cs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index d94c9b88b..3c6480275 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -128,11 +128,12 @@ public NatsJSConsume( Timeout.Infinite, Timeout.Infinite); - // This channel is used to pass messages - // to the user from the subscription channel (which should be set to a - // sufficiently large value to avoid blocking socket reads in the - // NATS connection). - _userMsgs = Channel.CreateBounded>(1000); + // This channel is used to pass messages to the user from the subscription channel. + // Capacity is set to maxMsgs to avoid pulling too many messages from the server + // resulting in a large number of pending messages (which can be a problem if the + // application is slow to process messages). The capacity is bounded to a maximum of 1024 + // to avoid LOH allocations. + _userMsgs = Channel.CreateBounded>(maxMsgs > 1024 ? 1024 : (int)maxMsgs); Msgs = _userMsgs.Reader; // Capacity as 1 is enough here since it's used for signaling only. From 3ff8b06b0db76c22e778760940fd444505dd5104 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 6 Jun 2024 01:41:19 +0100 Subject: [PATCH 2/7] Set minimum message capacity --- src/NATS.Client.JetStream/Internal/NatsJSConsume.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 3c6480275..fc44e176c 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -132,8 +132,9 @@ public NatsJSConsume( // Capacity is set to maxMsgs to avoid pulling too many messages from the server // resulting in a large number of pending messages (which can be a problem if the // application is slow to process messages). The capacity is bounded to a maximum of 1024 - // to avoid LOH allocations. - _userMsgs = Channel.CreateBounded>(maxMsgs > 1024 ? 1024 : (int)maxMsgs); + // to avoid LOH allocations. Also, the capacity is set to a minimum of 64 to avoid + // socket reads being blocked. + _userMsgs = Channel.CreateBounded>(maxMsgs > 1024 ? 1024 : maxMsgs < 64 ? 64 : (int)maxMsgs); Msgs = _userMsgs.Reader; // Capacity as 1 is enough here since it's used for signaling only. From cb044fe1a9e3729aa75554d65feede61d9bd3ae9 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 6 Jun 2024 10:27:24 +0100 Subject: [PATCH 3/7] Adjust lower limit for message channel capacity --- src/NATS.Client.JetStream/Internal/NatsJSConsume.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index fc44e176c..816cf599f 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -134,7 +134,7 @@ public NatsJSConsume( // application is slow to process messages). The capacity is bounded to a maximum of 1024 // to avoid LOH allocations. Also, the capacity is set to a minimum of 64 to avoid // socket reads being blocked. - _userMsgs = Channel.CreateBounded>(maxMsgs > 1024 ? 1024 : maxMsgs < 64 ? 64 : (int)maxMsgs); + _userMsgs = Channel.CreateBounded>(maxMsgs > 1024 ? 1024 : maxMsgs < 1 ? 1 : (int)maxMsgs); Msgs = _userMsgs.Reader; // Capacity as 1 is enough here since it's used for signaling only. From 2484d1ca5082bb46783c1f0bd0bd257c045496d9 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 6 Jun 2024 11:47:57 +0100 Subject: [PATCH 4/7] Update behavior for maxMsgs --- src/NATS.Client.JetStream/Internal/NatsJSConsume.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 816cf599f..76615fd6c 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -132,9 +132,9 @@ public NatsJSConsume( // Capacity is set to maxMsgs to avoid pulling too many messages from the server // resulting in a large number of pending messages (which can be a problem if the // application is slow to process messages). The capacity is bounded to a maximum of 1024 - // to avoid LOH allocations. Also, the capacity is set to a minimum of 64 to avoid - // socket reads being blocked. - _userMsgs = Channel.CreateBounded>(maxMsgs > 1024 ? 1024 : maxMsgs < 1 ? 1 : (int)maxMsgs); + // to avoid LOH allocations. Also, if maxMsgs is set to 0, we default to 1024 since it + // means maxBytes is set. + _userMsgs = Channel.CreateBounded>(maxMsgs is > 1024 or <= 0 ? 1024 : (int)maxMsgs); Msgs = _userMsgs.Reader; // Capacity as 1 is enough here since it's used for signaling only. From b6b8c3210b3cf9287fe2525d14849ab2c98935df Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 6 Jun 2024 19:22:43 +0100 Subject: [PATCH 5/7] Moved pending checks after delivery --- .../Internal/NatsJSConsume.cs | 117 +++++++++++------- src/NATS.Client.JetStream/NatsJSConsumer.cs | 1 + 2 files changed, 70 insertions(+), 48 deletions(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 76615fd6c..5fcf4f30c 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -4,7 +4,6 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Internal; @@ -113,8 +112,7 @@ public NatsJSConsume( if (self.Connection.ConnectionState == NatsConnectionState.Open) { - self.Pull("heartbeat-timeout", self._maxMsgs, self._maxBytes); - self.ResetPending(); + self.CheckPending("heartbeat-timeout"); if (self._debug) { self._logger.LogDebug( @@ -129,12 +127,8 @@ public NatsJSConsume( Timeout.Infinite); // This channel is used to pass messages to the user from the subscription channel. - // Capacity is set to maxMsgs to avoid pulling too many messages from the server - // resulting in a large number of pending messages (which can be a problem if the - // application is slow to process messages). The capacity is bounded to a maximum of 1024 - // to avoid LOH allocations. Also, if maxMsgs is set to 0, we default to 1024 since it - // means maxBytes is set. - _userMsgs = Channel.CreateBounded>(maxMsgs is > 1024 or <= 0 ? 1024 : (int)maxMsgs); + // Capacity is bounded to a maximum of 1024 to avoid LOH allocations. + _userMsgs = Channel.CreateBounded>(1024); Msgs = _userMsgs.Reader; // Capacity as 1 is enough here since it's used for signaling only. @@ -166,6 +160,28 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, _hbTimeout); + public void Delivered(int msgSize) + { + lock (_pendingGate) + { + if (_pendingMsgs > 0) + _pendingMsgs--; + } + + if (_maxBytes > 0) + { + if (_debug) + _logger.LogDebug(NatsJSLogEvents.MessageProperty, "Message size {Size}", msgSize); + + lock (_pendingGate) + { + _pendingBytes -= msgSize; + } + } + + CheckPending("delivered"); + } + public override async ValueTask DisposeAsync() { Interlocked.Exchange(ref _disposed, 1); @@ -181,26 +197,48 @@ public override async ValueTask DisposeAsync() internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter commandWriter, int sid) { await base.WriteReconnectCommandsAsync(commandWriter, sid); - ResetPending(); - - var request = new ConsumerGetnextRequest - { - Batch = _maxMsgs, - MaxBytes = _maxBytes, - IdleHeartbeat = _idle, - Expires = _expires, - }; if (_cancellationToken.IsCancellationRequested) return; - await commandWriter.PublishAsync( - subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", - value: request, - headers: default, - replyTo: Subject, - serializer: NatsJSJsonSerializer.Default, - cancellationToken: CancellationToken.None); + long maxMsgs = 0; + long maxBytes = 0; + + // We have to do the pending check here because we can't access + // the publish method here since the connection state is not open yet + // and we're just writing the reconnect commands. + lock (_pendingGate) + { + if (_maxBytes > 0 && _pendingBytes <= _thresholdBytes) + { + maxBytes = _maxBytes - _pendingBytes; + } + else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs && _pendingMsgs < _maxMsgs) + { + maxMsgs = _maxMsgs - _pendingMsgs; + } + } + + if (maxMsgs > 0 || maxBytes > 0) + { + var request = new ConsumerGetnextRequest + { + Batch = maxMsgs, + MaxBytes = maxBytes, + IdleHeartbeat = _idle, + Expires = _expires, + }; + + await commandWriter.PublishAsync( + subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + value: request, + headers: default, + replyTo: Subject, + serializer: NatsJSJsonSerializer.Default, + cancellationToken: CancellationToken.None); + + ResetPending(); + } } protected override async ValueTask ReceiveInternalAsync( @@ -321,6 +359,8 @@ protected override async ValueTask ReceiveInternalAsync( { throw new NatsJSException("No header found"); } + + CheckPending("control-msg"); } else { @@ -335,23 +375,6 @@ protected override async ValueTask ReceiveInternalAsync( _serializer), _context); - lock (_pendingGate) - { - if (_pendingMsgs > 0) - _pendingMsgs--; - } - - if (_maxBytes > 0) - { - if (_debug) - _logger.LogDebug(NatsJSLogEvents.MessageProperty, "Message size {Size}", msg.Size); - - lock (_pendingGate) - { - _pendingBytes -= msg.Size; - } - } - // Stop feeding the user if we are disposed. // We need to exit as soon as possible. if (Volatile.Read(ref _disposed) == 0) @@ -359,11 +382,9 @@ protected override async ValueTask ReceiveInternalAsync( // We can't pass cancellation token here because we need to hand // the message to the user to be processed. Writer will be completed // when the user calls Stop() or when the subscription is closed. - await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + await _userMsgs.Writer.WriteAsync(msg, CancellationToken.None).ConfigureAwait(false); } } - - CheckPending(); } protected override void TryComplete() @@ -381,7 +402,7 @@ private void ResetPending() } } - private void CheckPending() + private void CheckPending(string origin) { lock (_pendingGate) { @@ -390,7 +411,7 @@ private void CheckPending() if (_debug) _logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending bytes {Pending}, {MaxBytes}", _pendingBytes, _maxBytes); - Pull("chk-bytes", _maxMsgs, _maxBytes - _pendingBytes); + Pull($"chk-bytes({origin})", _maxMsgs, _maxBytes - _pendingBytes); ResetPending(); } else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs && _pendingMsgs < _maxMsgs) @@ -398,7 +419,7 @@ private void CheckPending() if (_debug) _logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending messages {Pending}, {MaxMsgs}", _pendingMsgs, _maxMsgs); - Pull("chk-msgs", _maxMsgs - _pendingMsgs, 0); + Pull($"chk-msgs({origin})", _maxMsgs - _pendingMsgs, 0); ResetPending(); } } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index e1c7b400d..de84b7f72 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -96,6 +96,7 @@ public async IAsyncEnumerable> ConsumeAsync( break; yield return jsMsg; + cc.Delivered(jsMsg.Size); } } } From 11bc11c57b4033943d8d403ed3967a59b15e5dac Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 9 Sep 2024 14:17:06 +0100 Subject: [PATCH 6/7] Fix tests --- .../Internal/NatsJSConsume.cs | 3 ++- .../ConsumerConsumeTest.cs | 17 +++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index d45e0a58a..f7ce93f94 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -112,7 +112,8 @@ public NatsJSConsume( if (self.Connection.ConnectionState == NatsConnectionState.Open) { - self.CheckPending("heartbeat-timeout"); + self.Pull("heartbeat-timeout", self._maxMsgs, self._maxBytes); + self.ResetPending(); if (self._debug) { self._logger.LogDebug( diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index d01d6831a..5b9cf705d 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -76,8 +76,7 @@ public async Task Consume_msgs_test() var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10 }; var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await using var cc = await consumer.ConsumeInternalAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token); - await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) + await foreach (var msg in consumer.ConsumeAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token)) { await msg.AckAsync(cancellationToken: cts.Token); Assert.Equal(count, msg.Data!.Test); @@ -92,7 +91,7 @@ public async Task Consume_msgs_test() await Retry.Until( reason: "received enough pulls", - condition: () => PullCount() > 5, + condition: () => PullCount() >= 4, action: () => { _output.WriteLine($"### PullCount:{PullCount()}"); @@ -215,12 +214,10 @@ public async Task Consume_reconnect_test() // Not interested in management messages sent upto this point await proxy.FlushFramesAsync(nats); - var cc = await consumer.ConsumeInternalAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token); - var readerTask = Task.Run(async () => { var count = 0; - await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) + await foreach (var msg in consumer.ConsumeAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token)) { await msg.AckAsync(cancellationToken: cts.Token); Assert.Equal(count, msg.Data!.Test); @@ -230,6 +227,8 @@ public async Task Consume_reconnect_test() if (count == 2) break; } + + return count; }); // Send a message before reconnect @@ -258,11 +257,9 @@ await Retry.Until( ack.EnsureSuccess(); } - await Retry.Until( - "acked", - () => proxy.ClientFrames.Any(f => f.Message.Contains("CONSUMER.MSG.NEXT"))); + var count = await readerTask; + Assert.Equal(2, count); - await readerTask; await nats.DisposeAsync(); } From 4d82af9c21134866d4d1611e4347a1e2cd3ae305 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 9 Sep 2024 18:41:02 +0100 Subject: [PATCH 7/7] Test --- .../ConsumerConsumeTest.cs | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index 5b9cf705d..c6064642c 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -443,4 +443,61 @@ public async Task Serialization_errors() break; } } + + [Fact] + public async Task Consume_right_amount_of_messages() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", ["s1.*"], cts.Token); + + var payload = new byte[1024]; + for (var i = 0; i < 50; i++) + { + var ack = await js.PublishAsync("s1.foo", payload, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + // Max messages + { + var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var opts = new NatsJSConsumeOpts { MaxMsgs = 10, }; + var count = 0; + await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + if (++count == 4) + break; + } + + await Retry.Until("consumer stats updated", async () => + { + var info = (await js.GetConsumerAsync("s1", "c1", cts.Token)).Info; + return info is { NumAckPending: 6, NumPending: 40 }; + }); + } + + // Max bytes + { + var consumer = await js.CreateOrUpdateConsumerAsync("s1", "c2", cancellationToken: cts.Token); + var opts = new NatsJSConsumeOpts { MaxBytes = 10 * (1024 + 50), }; + var count = 0; + await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + if (++count == 4) + break; + } + + await Retry.Until("consumer stats updated", async () => + { + var info = (await js.GetConsumerAsync("s1", "c2", cts.Token)).Info; + return info is { NumAckPending: 6, NumPending: 40 }; + }); + } + } }