Skip to content

Commit

Permalink
Fix format and warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 10, 2023
1 parent 41f1d6f commit 4f536be
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 93 deletions.
2 changes: 1 addition & 1 deletion sandbox/Example.KeyValueStore.Watcher/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
Expand Down
6 changes: 3 additions & 3 deletions sandbox/MicroBenchmark/NewInboxBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ public class NewInboxBenchmarks
{
private static readonly NatsOpts LongPrefixOpt = NatsOpts.Default
with
{
InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here",
};
{
InboxPrefix = "this-is-a-rather-long-prefix-that-we-use-here",
};

private static readonly NatsConnection ConnectionDefaultPrefix = new();
private static readonly NatsConnection ConnectionLongPrefix = new(LongPrefixOpt);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/INatsSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
if (buffer.Length == 0)
{

}

return (T)(object)buffer.ToArray();
}

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/NuidWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/INatsKVWatcher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading.Channels;
using System.Threading.Channels;

namespace NATS.Client.KeyValueStore;

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Buffers;
using System.Buffers;
using System.Threading.Channels;
using NATS.Client.Core;
using NATS.Client.JetStream;
Expand Down
165 changes: 87 additions & 78 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading.Channels;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
Expand Down Expand Up @@ -39,7 +39,7 @@ internal class NatsKVWatcher<T> : INatsKVWatcher<T>
private readonly NatsConnection _nats;
private readonly Channel<NatsKVWatchCommandMsg<T>> _commandChannel;
private readonly Channel<NatsKVEntry<T?>> _entryChannel;
private readonly Channel<(string origin, string consumer)> _consumerCreateChannel;
private readonly Channel<string> _consumerCreateChannel;
private readonly Timer _timer;
private readonly int _hbTimeout;
private readonly long _idleHbNanos;
Expand Down Expand Up @@ -105,7 +105,7 @@ public NatsKVWatcher(
// A single request to create the consumer is enough because we don't want to create a new consumer
// back to back in case the consumer is being recreated due to a timeout and a mismatch in consumer
// sequence for example; creating the consumer once would solve both the issues.
_consumerCreateChannel = Channel.CreateBounded<(string origin, string consumer)>(new BoundedChannelOptions(1)
_consumerCreateChannel = Channel.CreateBounded<string>(new BoundedChannelOptions(1)
{
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropOldest,
Expand Down Expand Up @@ -133,7 +133,11 @@ public async ValueTask DisposeAsync()
await _commandTask;
}

internal void Init() => CreateSub("init");
internal ValueTask InitAsync()
{
Consumer = NewNuid();
return CreatePushConsumer("init");
}

private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer();

Expand Down Expand Up @@ -279,79 +283,7 @@ private async Task ConsumerCreateLoop()
{
try
{
if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.NewConsumer, "Creating new consumer {Consumer} from {Origin}", Consumer, origin);
}

if (_sub != null)
{
if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.DeleteOldDeliverySubject, "Deleting old delivery subject {Subject}", _sub.Subject);
}

await _sub.UnsubscribeAsync();
await _sub.DisposeAsync();
}

_sub = new NatsKVWatchSub<T>(_context, _commandChannel, _subOpts, _cancellationToken);
await _context.Connection.SubAsync(_sub, _cancellationToken).ConfigureAwait(false);

if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.NewDeliverySubject, "New delivery subject {Subject}", _sub.Subject);
}

Interlocked.Exchange(ref _sequenceConsumer, 0);

var sequence = Volatile.Read(ref _sequenceStream);

var config = new ConsumerConfiguration
{
Name = Consumer,
DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
AckPolicy = ConsumerConfigurationAckPolicy.none,
DeliverSubject = _sub.Subject,
FilterSubject = _filter,
FlowControl = true,
IdleHeartbeat = _idleHbNanos,
AckWait = _ackWaitNanos,
MaxDeliver = 1,
MemStorage = true,
NumReplicas = 1,
ReplayPolicy = ConsumerConfigurationReplayPolicy.instant,
};

if (!_opts.IncludeHistory)
{
config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.last_per_subject;
}

if (_opts.UpdatesOnly)
{
config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.@new;
}

if (_opts.MetaOnly)
{
config.HeadersOnly = true;
}

if (sequence > 0)
{
config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence;
config.OptStartSeq = sequence + 1;
}

await _context.CreateConsumerAsync(
new ConsumerCreateRequest { StreamName = _stream, Config = config, },
cancellationToken: _cancellationToken);

if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin);
}
await CreatePushConsumer(origin);
}
catch (Exception e)
{
Expand All @@ -369,6 +301,83 @@ await _context.CreateConsumerAsync(
}
}

private async ValueTask CreatePushConsumer(string origin)
{
if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.NewConsumer, "Creating new consumer {Consumer} from {Origin}", Consumer, origin);
}

if (_sub != null)
{
if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.DeleteOldDeliverySubject, "Deleting old delivery subject {Subject}", _sub.Subject);
}

await _sub.UnsubscribeAsync();
await _sub.DisposeAsync();
}

_sub = new NatsKVWatchSub<T>(_context, _commandChannel, _subOpts, _cancellationToken);
await _context.Connection.SubAsync(_sub, _cancellationToken).ConfigureAwait(false);

if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.NewDeliverySubject, "New delivery subject {Subject}", _sub.Subject);
}

Interlocked.Exchange(ref _sequenceConsumer, 0);

var sequence = Volatile.Read(ref _sequenceStream);

var config = new ConsumerConfiguration
{
Name = Consumer,
DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
AckPolicy = ConsumerConfigurationAckPolicy.none,
DeliverSubject = _sub.Subject,
FilterSubject = _filter,
FlowControl = true,
IdleHeartbeat = _idleHbNanos,
AckWait = _ackWaitNanos,
MaxDeliver = 1,
MemStorage = true,
NumReplicas = 1,
ReplayPolicy = ConsumerConfigurationReplayPolicy.instant,
};

if (!_opts.IncludeHistory)
{
config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.last_per_subject;
}

if (_opts.UpdatesOnly)
{
config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.@new;
}

if (_opts.MetaOnly)
{
config.HeadersOnly = true;
}

if (sequence > 0)
{
config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence;
config.OptStartSeq = sequence + 1;
}

await _context.CreateConsumerAsync(
new ConsumerCreateRequest { StreamName = _stream, Config = config, },
cancellationToken: _cancellationToken);

if (_debug)
{
_logger.LogDebug(NatsKVLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin);
}
}

private string NewNuid()
{
Span<char> buffer = stackalloc char[22];
Expand All @@ -387,6 +396,6 @@ private string NewNuid()
private void CreateSub(string origin)
{
Consumer = NewNuid();
_consumerCreateChannel.Writer.TryWrite((origin, Consumer));
_consumerCreateChannel.Writer.TryWrite(origin);
}
}
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/NatsKVOpts.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using NATS.Client.Core;
using NATS.Client.Core;

namespace NATS.Client.KeyValueStore;

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async ValueTask<INatsKVWatcher<T>> WatchAsync<T>(string key, NatsKVWatchO
subOpts: default,
cancellationToken);

watcher.Init();
await watcher.InitAsync();

return watcher;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/NuidWriterTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Reflection;
using System.Security.Cryptography;
using System.Text.RegularExpressions;
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.Tests/SerializerTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Buffers;
using System.Buffers;

namespace NATS.Client.Core.Tests;

Expand Down
6 changes: 3 additions & 3 deletions tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Buffers;
using System.Buffers;
using System.Buffers.Text;
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Tests;
Expand Down Expand Up @@ -82,7 +82,7 @@ public async Task Watcher_reconnect()
}
else
{
Assert.Fail("Not a number (2)");
Assert.Fail("Not a number (2)");
}
}
}
Expand Down Expand Up @@ -234,7 +234,7 @@ await Retry.Until(
}
else
{
Assert.Fail("Not a number (2)");
Assert.Fail("Not a number (2)");
}
}
}
Expand Down

0 comments on commit 4f536be

Please sign in to comment.