Skip to content

Commit

Permalink
Update impl
Browse files Browse the repository at this point in the history
  • Loading branch information
divyeshio committed Sep 15, 2024
1 parent 7124125 commit cdf2a6f
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 64 deletions.
17 changes: 12 additions & 5 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ internal sealed class CommandWriter : IAsyncDisposable
private readonly int _arrayPoolInitialSize;
private readonly object _lock = new();
private readonly CancellationTokenSource _cts;
private readonly NatsMetrics _metrics;
private readonly Memory<byte> _consolidateMem = new byte[SendMemSize].AsMemory();
private readonly TimeSpan _defaultCommandTimeout;
private readonly Action<PingCommand> _enqueuePing;
Expand All @@ -55,7 +54,7 @@ internal sealed class CommandWriter : IAsyncDisposable
private CancellationTokenSource? _ctsReader;
private volatile bool _disposed;

public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, NatsMetrics metrics, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
{
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
_trace = _logger.IsEnabled(LogLevel.Trace);
Expand All @@ -67,7 +66,6 @@ public CommandWriter(string name, NatsConnection connection, ObjectPool pool, Na
// avoid defining another option.
_arrayPoolInitialSize = opts.WriterBufferSize / 256;

_metrics = metrics;
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
_enqueuePing = enqueuePing;
_protocolWriter = new ProtocolWriter(opts.SubjectEncoding);
Expand Down Expand Up @@ -693,11 +691,20 @@ private void EnqueueCommand()
return;
}

_metrics.AddPendingMessages(1);
NatsMetrics.AddPendingMessages(1);

_channelSize.Writer.TryWrite(size);
var flush = _pipeWriter.FlushAsync();
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();

if (flush.IsCompletedSuccessfully)
{
_flushTask = null;
NatsMetrics.AddPendingMessages(-1);
}
else
{
_flushTask = flush.AsTask();
}
}

private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken)
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/Commands/PriorityCommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable
{
private int _disposed;

public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, NatsMetrics metrics, Action<PingCommand> enqueuePing)
public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, Action<PingCommand> enqueuePing)
{
CommandWriter = new CommandWriter("init", connection, pool, opts, metrics, enqueuePing);
CommandWriter = new CommandWriter("init", connection, pool, opts, enqueuePing);
CommandWriter.Reset(socketConnection);
}

Expand Down
100 changes: 60 additions & 40 deletions src/NATS.Client.Core/Internal/NatsMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,93 @@

namespace NATS.Client.Core.Internal;

public sealed class NatsMetrics
public class NatsMetrics
{
public const string MeterName = "NATS.Client";

private readonly Meter _meter;

private readonly Counter<long> _subscriptionCounter;
private readonly Counter<long> _sentBytesCounter;
private readonly Counter<long> _receivedBytesCounter;
private readonly Counter<long> _pendingMessagesCounter;
private readonly Counter<long> _sentMessagesCounter;
private readonly Counter<long> _receivedMessagesCounter;

public NatsMetrics(IMeterFactory meterFactory)
public const string PendingMessagesInstrumentName = $"{InstrumentPrefix}.pending.messages";
public const string SentBytesInstrumentName = $"{InstrumentPrefix}.sent.bytes";
public const string ReceivedBytesInstrumentName = $"{InstrumentPrefix}.received.bytes";
public const string SentMessagesInstrumentName = $"{InstrumentPrefix}.sent.messages";
public const string ReceivedMessagesInstrumentName = $"{InstrumentPrefix}.received.messages";
public const string SubscriptionInstrumentName = $"{InstrumentPrefix}.subscription.count";

private const string InstrumentPrefix = "nats.client";

private static readonly Meter _meter;
private static readonly Counter<long> _subscriptionCounter;
private static readonly Counter<long> _pendingMessagesCounter;
private static readonly Counter<long> _sentBytesCounter;
private static readonly Counter<long> _receivedBytesCounter;
private static readonly Counter<long> _sentMessagesCounter;
private static readonly Counter<long> _receivedMessagesCounter;

static NatsMetrics()
{
_meter = meterFactory.Create(MeterName);
_meter = new Meter(MeterName);

_subscriptionCounter = _meter.CreateCounter<long>(
"nats.client.subscription.count",
SubscriptionInstrumentName,
unit: "{subscriptions}",
description: "Number of subscriptions");

_pendingMessagesCounter = _meter.CreateCounter<long>(
PendingMessagesInstrumentName,
unit: "{messages}",
description: "Number of pending messages");

_sentBytesCounter = _meter.CreateCounter<long>(
"nats.client.sent.bytes",
unit: "bytes",
SentBytesInstrumentName,
unit: "{bytes}",
description: "Number of bytes sent");

_receivedBytesCounter = _meter.CreateCounter<long>(
"nats.client.received.bytes",
unit: "bytes",
ReceivedBytesInstrumentName,
unit: "{bytes}",
description: "Number of bytes received");

_pendingMessagesCounter = _meter.CreateCounter<long>(
"nats.client.pending.messages",
unit: "messages",
description: "Number of pending messages");

_sentMessagesCounter = _meter.CreateCounter<long>(
"nats.client.sent.messages",
unit: "messages",
SentMessagesInstrumentName,
unit: "{messages}",
description: "Number of messages sent");

_receivedMessagesCounter = _meter.CreateCounter<long>(
"nats.client.received.messages",
unit: "messages",
ReceivedMessagesInstrumentName,
unit: "{messages}",
description: "Number of messages received");
}

public void IncrementSubscriptionCount() => _subscriptionCounter.Add(1);

public void DecrementSubscriptionCount() => _subscriptionCounter.Add(-1);

public void AddSentBytes(long bytes) => _sentBytesCounter.Add(bytes);
public static void IncrementSubscriptionCount()
{
_subscriptionCounter.Add(1);
}

public void AddReceivedBytes(long bytes) => _receivedBytesCounter.Add(bytes);
public static void DecrementSubscriptionCount()
{
_subscriptionCounter.Add(-1);
}

public void AddPendingMessages(long messages) => _pendingMessagesCounter.Add(messages);
public static void AddPendingMessages(long messages)
{
_pendingMessagesCounter.Add(messages);
}

public void AddSentMessages(long messages) => _sentMessagesCounter.Add(messages);
public static void AddSentBytes(long bytes)
{
_sentBytesCounter.Add(bytes);
}

public void AddReceivedMessages(long messages) => _receivedMessagesCounter.Add(messages);
public static void AddReceivedBytes(long bytes)
{
_receivedBytesCounter.Add(bytes);
}

// This factory used when type is created without DI.
internal sealed class DummyMeterFactory : IMeterFactory
public static void AddSentMessages(long messages)
{
public Meter Create(MeterOptions options) => new(options);
_sentMessagesCounter.Add(messages);
}

public void Dispose() { }
public static void AddReceivedMessages(long messages)
{
_receivedMessagesCounter.Add(messages);
}
}
7 changes: 3 additions & 4 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable
private readonly Task _infoParsed; // wait for an upgrade
private readonly ConcurrentQueue<PingCommand> _pingCommands; // wait for pong
private readonly ILogger<NatsReadProtocolProcessor> _logger;
private readonly NatsMetrics _metrics;
private readonly bool _trace;
private int _disposed;

Expand All @@ -34,7 +33,7 @@ public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnect
_waitForPongOrErrorSignal = waitForPongOrErrorSignal;
_infoParsed = infoParsed;
_pingCommands = new ConcurrentQueue<PingCommand>();
_socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, _connection.Metrics, connection.Opts.LoggerFactory);
_socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Opts.LoggerFactory);
_readLoop = Task.Run(ReadLoopAsync);
}

Expand Down Expand Up @@ -157,7 +156,7 @@ private async Task ReadLoopAsync()
code = GetCode(buffer);
}

_metrics.AddReceivedMessages(1);
NatsMetrics.AddReceivedMessages(1);

// Optimize for Msg parsing, Inline async code
if (code == ServerOpCodes.Msg)
Expand Down Expand Up @@ -444,7 +443,7 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
{
// reaches invalid line, log warn and try to get newline and go to nextloop.
_logger.LogWarning(NatsLogEvents.Protocol, "Reached invalid line");
_metrics.AddReceivedMessages(-1);
NatsMetrics.AddReceivedMessages(-1);

var position = buffer.PositionOf((byte)'\n');
if (position == null)
Expand Down
8 changes: 3 additions & 5 deletions src/NATS.Client.Core/Internal/SocketReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace NATS.Client.Core.Internal;
internal sealed class SocketReader
{
private readonly int _minimumBufferSize;
private readonly NatsMetrics _metrics;
private readonly SeqeunceBuilder _seqeunceBuilder = new SeqeunceBuilder();
private readonly Stopwatch _stopwatch = new Stopwatch();
private readonly ILogger<SocketReader> _logger;
Expand All @@ -18,11 +17,10 @@ internal sealed class SocketReader

private Memory<byte> _availableMemory;

public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, NatsMetrics metrics, ILoggerFactory loggerFactory)
public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ILoggerFactory loggerFactory)
{
_socketConnection = socketConnection;
_minimumBufferSize = minimumBufferSize;
_metrics = metrics;
_logger = loggerFactory.CreateLogger<SocketReader>();
_isTraceLogging = _logger.IsEnabled(LogLevel.Trace);
}
Expand Down Expand Up @@ -66,7 +64,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize)
}

totalRead += read;
_metrics.AddReceivedBytes(read);
NatsMetrics.AddReceivedBytes(read);
_seqeunceBuilder.Append(_availableMemory.Slice(0, read));
_availableMemory = _availableMemory.Slice(read);
}
Expand Down Expand Up @@ -112,7 +110,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadUntilReceiveNewLineAsync()
throw ex;
}

_metrics.AddReceivedBytes(read);
NatsMetrics.AddReceivedBytes(read);
var appendMemory = _availableMemory.Slice(0, read);
_seqeunceBuilder.Append(appendMemory);
_availableMemory = _availableMemory.Slice(read);
Expand Down
7 changes: 2 additions & 5 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;
using static NATS.Client.Core.Internal.NatsMetrics;

#if NETSTANDARD
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
Expand Down Expand Up @@ -41,7 +40,6 @@ public partial class NatsConnection : INatsConnection
#pragma warning restore SA1401
private readonly object _gate = new object();
private readonly ILogger<NatsConnection> _logger;
internal readonly NatsMetrics Metrics;
private readonly ObjectPool _pool;
private readonly CancellationTokenSource _disposedCancellationTokenSource;
private readonly string _name;
Expand Down Expand Up @@ -82,8 +80,7 @@ public NatsConnection(NatsOpts opts)
_disposedCancellationTokenSource = new CancellationTokenSource();
_pool = new ObjectPool(opts.ObjectPoolSize);
_name = opts.Name;
Metrics = new NatsMetrics(new DummyMeterFactory());
CommandWriter = new CommandWriter("main", this, _pool, Opts, Metrics, EnqueuePing);
CommandWriter = new CommandWriter("main", this, _pool, Opts, EnqueuePing);
InboxPrefix = NewInbox(opts.InboxPrefix);
SubscriptionManager = new SubscriptionManager(this, InboxPrefix);
_clientOpts = ClientOpts.Create(Opts);
Expand Down Expand Up @@ -454,7 +451,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect)
// Authentication
_userCredentials?.Authenticate(_clientOpts, WritableServerInfo);

await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, Metrics, EnqueuePing))
await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, EnqueuePing))
{
// add CONNECT and PING command to priority lane
await priorityCommandWriter.CommandWriter.ConnectAsync(_clientOpts, CancellationToken.None).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Core.Internal;

namespace NATS.Extensions.Microsoft.DependencyInjection;

Expand Down Expand Up @@ -99,8 +98,6 @@ internal IServiceCollection Build()
}
}

_services.AddSingleton<NatsMetrics>();

return _services;
}

Expand Down

0 comments on commit cdf2a6f

Please sign in to comment.