diff --git a/sandbox/Example.KeyValueStore.Watcher/Program.cs b/sandbox/Example.KeyValueStore.Watcher/Program.cs index 35c35913f..3bfed8697 100644 --- a/sandbox/Example.KeyValueStore.Watcher/Program.cs +++ b/sandbox/Example.KeyValueStore.Watcher/Program.cs @@ -1,4 +1,4 @@ -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.KeyValueStore; diff --git a/sandbox/MicroBenchmark/NewInboxBenchmarks.cs b/sandbox/MicroBenchmark/NewInboxBenchmarks.cs index e55c47967..8e53884e2 100644 --- a/sandbox/MicroBenchmark/NewInboxBenchmarks.cs +++ b/sandbox/MicroBenchmark/NewInboxBenchmarks.cs @@ -15,9 +15,9 @@ public class NewInboxBenchmarks { private static readonly NatsOpts LongPrefixOpt = NatsOpts.Default with - { - InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here", - }; + { + InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here", + }; private static readonly NatsConnection ConnectionDefaultPrefix = new(); private static readonly NatsConnection ConnectionLongPrefix = new(LongPrefixOpt); diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 3f4653e61..34340863a 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -88,8 +88,8 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) { if (buffer.Length == 0) { - } + return (T)(object)buffer.ToArray(); } diff --git a/src/NATS.Client.Core/Internal/NuidWriter.cs b/src/NATS.Client.Core/Internal/NuidWriter.cs index 5061c078a..3783c22a9 100644 --- a/src/NATS.Client.Core/Internal/NuidWriter.cs +++ b/src/NATS.Client.Core/Internal/NuidWriter.cs @@ -1,4 +1,4 @@ -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Security.Cryptography; diff --git a/src/NATS.Client.KeyValueStore/INatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/INatsKVWatcher.cs index ae8d08a53..263cd1168 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVWatcher.cs @@ -1,4 +1,4 @@ -using System.Threading.Channels; +using System.Threading.Channels; namespace NATS.Client.KeyValueStore; diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index 8edcc7bcc..f4f12d357 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; using System.Threading.Channels; using NATS.Client.Core; using NATS.Client.JetStream; diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 00b9ad240..81641503c 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -1,4 +1,4 @@ -using System.Threading.Channels; +using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Internal; @@ -39,7 +39,7 @@ internal class NatsKVWatcher : INatsKVWatcher private readonly NatsConnection _nats; private readonly Channel> _commandChannel; private readonly Channel> _entryChannel; - private readonly Channel<(string origin, string consumer)> _consumerCreateChannel; + private readonly Channel _consumerCreateChannel; private readonly Timer _timer; private readonly int _hbTimeout; private readonly long _idleHbNanos; @@ -105,7 +105,7 @@ public NatsKVWatcher( // A single request to create the consumer is enough because we don't want to create a new consumer // back to back in case the consumer is being recreated due to a timeout and a mismatch in consumer // sequence for example; creating the consumer once would solve both the issues. - _consumerCreateChannel = Channel.CreateBounded<(string origin, string consumer)>(new BoundedChannelOptions(1) + _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { AllowSynchronousContinuations = false, FullMode = BoundedChannelFullMode.DropOldest, @@ -133,7 +133,11 @@ public async ValueTask DisposeAsync() await _commandTask; } - internal void Init() => CreateSub("init"); + internal ValueTask InitAsync() + { + Consumer = NewNuid(); + return CreatePushConsumer("init"); + } private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer(); @@ -279,79 +283,7 @@ private async Task ConsumerCreateLoop() { try { - if (_debug) - { - _logger.LogDebug(NatsKVLogEvents.NewConsumer, "Creating new consumer {Consumer} from {Origin}", Consumer, origin); - } - - if (_sub != null) - { - if (_debug) - { - _logger.LogDebug(NatsKVLogEvents.DeleteOldDeliverySubject, "Deleting old delivery subject {Subject}", _sub.Subject); - } - - await _sub.UnsubscribeAsync(); - await _sub.DisposeAsync(); - } - - _sub = new NatsKVWatchSub(_context, _commandChannel, _subOpts, _cancellationToken); - await _context.Connection.SubAsync(_sub, _cancellationToken).ConfigureAwait(false); - - if (_debug) - { - _logger.LogDebug(NatsKVLogEvents.NewDeliverySubject, "New delivery subject {Subject}", _sub.Subject); - } - - Interlocked.Exchange(ref _sequenceConsumer, 0); - - var sequence = Volatile.Read(ref _sequenceStream); - - var config = new ConsumerConfiguration - { - Name = Consumer, - DeliverPolicy = ConsumerConfigurationDeliverPolicy.all, - AckPolicy = ConsumerConfigurationAckPolicy.none, - DeliverSubject = _sub.Subject, - FilterSubject = _filter, - FlowControl = true, - IdleHeartbeat = _idleHbNanos, - AckWait = _ackWaitNanos, - MaxDeliver = 1, - MemStorage = true, - NumReplicas = 1, - ReplayPolicy = ConsumerConfigurationReplayPolicy.instant, - }; - - if (!_opts.IncludeHistory) - { - config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.last_per_subject; - } - - if (_opts.UpdatesOnly) - { - config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.@new; - } - - if (_opts.MetaOnly) - { - config.HeadersOnly = true; - } - - if (sequence > 0) - { - config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence; - config.OptStartSeq = sequence + 1; - } - - await _context.CreateConsumerAsync( - new ConsumerCreateRequest { StreamName = _stream, Config = config, }, - cancellationToken: _cancellationToken); - - if (_debug) - { - _logger.LogDebug(NatsKVLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin); - } + await CreatePushConsumer(origin); } catch (Exception e) { @@ -369,6 +301,83 @@ await _context.CreateConsumerAsync( } } + private async ValueTask CreatePushConsumer(string origin) + { + if (_debug) + { + _logger.LogDebug(NatsKVLogEvents.NewConsumer, "Creating new consumer {Consumer} from {Origin}", Consumer, origin); + } + + if (_sub != null) + { + if (_debug) + { + _logger.LogDebug(NatsKVLogEvents.DeleteOldDeliverySubject, "Deleting old delivery subject {Subject}", _sub.Subject); + } + + await _sub.UnsubscribeAsync(); + await _sub.DisposeAsync(); + } + + _sub = new NatsKVWatchSub(_context, _commandChannel, _subOpts, _cancellationToken); + await _context.Connection.SubAsync(_sub, _cancellationToken).ConfigureAwait(false); + + if (_debug) + { + _logger.LogDebug(NatsKVLogEvents.NewDeliverySubject, "New delivery subject {Subject}", _sub.Subject); + } + + Interlocked.Exchange(ref _sequenceConsumer, 0); + + var sequence = Volatile.Read(ref _sequenceStream); + + var config = new ConsumerConfiguration + { + Name = Consumer, + DeliverPolicy = ConsumerConfigurationDeliverPolicy.all, + AckPolicy = ConsumerConfigurationAckPolicy.none, + DeliverSubject = _sub.Subject, + FilterSubject = _filter, + FlowControl = true, + IdleHeartbeat = _idleHbNanos, + AckWait = _ackWaitNanos, + MaxDeliver = 1, + MemStorage = true, + NumReplicas = 1, + ReplayPolicy = ConsumerConfigurationReplayPolicy.instant, + }; + + if (!_opts.IncludeHistory) + { + config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.last_per_subject; + } + + if (_opts.UpdatesOnly) + { + config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.@new; + } + + if (_opts.MetaOnly) + { + config.HeadersOnly = true; + } + + if (sequence > 0) + { + config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence; + config.OptStartSeq = sequence + 1; + } + + await _context.CreateConsumerAsync( + new ConsumerCreateRequest { StreamName = _stream, Config = config, }, + cancellationToken: _cancellationToken); + + if (_debug) + { + _logger.LogDebug(NatsKVLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin); + } + } + private string NewNuid() { Span buffer = stackalloc char[22]; @@ -387,6 +396,6 @@ private string NewNuid() private void CreateSub(string origin) { Consumer = NewNuid(); - _consumerCreateChannel.Writer.TryWrite((origin, Consumer)); + _consumerCreateChannel.Writer.TryWrite(origin); } } diff --git a/src/NATS.Client.KeyValueStore/NatsKVOpts.cs b/src/NATS.Client.KeyValueStore/NatsKVOpts.cs index 7c1ff4094..a744e9d0d 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVOpts.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVOpts.cs @@ -1,4 +1,4 @@ -using NATS.Client.Core; +using NATS.Client.Core; namespace NATS.Client.KeyValueStore; diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index bd1a18805..1495587d3 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -148,7 +148,7 @@ public async ValueTask> WatchAsync(string key, NatsKVWatchO subOpts: default, cancellationToken); - watcher.Init(); + await watcher.InitAsync(); return watcher; } diff --git a/tests/NATS.Client.Core.Tests/NuidWriterTests.cs b/tests/NATS.Client.Core.Tests/NuidWriterTests.cs index 84577d296..d22dac0e2 100644 --- a/tests/NATS.Client.Core.Tests/NuidWriterTests.cs +++ b/tests/NATS.Client.Core.Tests/NuidWriterTests.cs @@ -1,4 +1,4 @@ -using System.Collections.Concurrent; +using System.Collections.Concurrent; using System.Reflection; using System.Security.Cryptography; using System.Text.RegularExpressions; diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index ecdf6555e..8128b4905 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; namespace NATS.Client.Core.Tests; diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs index 3d5af47e8..f4ceef136 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; using System.Buffers.Text; using Microsoft.Extensions.Logging; using NATS.Client.Core.Tests; @@ -82,7 +82,7 @@ public async Task Watcher_reconnect() } else { - Assert.Fail("Not a number (2)"); + Assert.Fail("Not a number (2)"); } } } @@ -234,7 +234,7 @@ await Retry.Until( } else { - Assert.Fail("Not a number (2)"); + Assert.Fail("Not a number (2)"); } } }