Skip to content

Commit

Permalink
Fix various disposable issues (#625)
Browse files Browse the repository at this point in the history
* Fix various disposable issues

This PR fixes various disposable issues

* Fix

* Do not dispose certificates

* Build fixes

* Build fixes

* Build fixes

* Fix review issues

* Merge branch 'FixDisposable' of https://github.com/Bykiev/nats.net into FixDisposable

* Sign

---------

Co-authored-by: Ziya Suzen <[email protected]>
  • Loading branch information
Bykiev and mtmk authored Sep 4, 2024
1 parent 426ea8c commit 323124a
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void WriteConnect(IBufferWriter<byte> writer, ClientOpts opts)
BinaryPrimitives.WriteUInt64LittleEndian(span, ConnectSpace);
writer.Advance(ConnectSpaceLength);

var jsonWriter = new Utf8JsonWriter(writer);
using var jsonWriter = new Utf8JsonWriter(writer);
JsonSerializer.Serialize(jsonWriter, opts, JsonContext.Default.ClientOpts);

span = writer.GetSpan(UInt16Length);
Expand Down
7 changes: 7 additions & 0 deletions src/NATS.Client.Core/Internal/SslStreamConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public void SignalDisconnected(Exception exception)
public async Task AuthenticateAsClientAsync(NatsUri uri, TimeSpan timeout)
{
var options = await _tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(true);

#if NETSTANDARD2_0
if (_sslStream != null)
_sslStream.Dispose();

_sslStream = new SslStream(
innerStream: new NetworkStream(_socket, true),
leaveInnerStreamOpen: false,
Expand All @@ -134,6 +138,9 @@ await _sslStream.AuthenticateAsClientAsync(
throw new NatsException($"TLS authentication failed", ex);
}
#else
if (_sslStream != null)
await _sslStream.DisposeAsync().ConfigureAwait(false);

_sslStream = new SslStream(innerStream: new NetworkStream(_socket, true));
try
{
Expand Down
3 changes: 2 additions & 1 deletion src/NATS.Client.Core/NKeyPair.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public byte[] Sign(byte[] src)
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
Expand Down Expand Up @@ -269,7 +270,7 @@ internal static string Encode(byte prefixbyte, bool seed, byte[] src)
if (src.Length != 32)
throw new NatsException("Invalid seed size");

var stream = new MemoryStream();
using var stream = new MemoryStream();

if (seed)
{
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ private async void StartPingTimer(CancellationToken cancellationToken)

_logger.LogDebug(NatsLogEvents.Connection, "Starting ping timer");

var periodicTimer = new PeriodicTimer(Opts.PingInterval);
using var periodicTimer = new PeriodicTimer(Opts.PingInterval);
ResetPongCount();
try
{
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ protected void ResetIdleTimeout()
if (_startUpTimeoutTimer != null)
{
_startUpTimeoutTimer.Change(dueTime: Timeout.InfiniteTimeSpan, period: Timeout.InfiniteTimeSpan);
_startUpTimeoutTimer = null;
_startUpTimeoutTimer.Dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace NATS.Client.JetStream.Internal;

internal class NatsJSNotificationChannel : IAsyncDisposable
internal sealed class NatsJSNotificationChannel : IAsyncDisposable
{
private readonly Func<INatsJSNotification, CancellationToken, Task> _notificationHandler;
private readonly Action<Exception> _exceptionHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ public async ValueTask DisposeAsync()
{
_nats.ConnectionDisconnected -= OnDisconnected;

#if NETSTANDARD2_0
_timer.Dispose();
#else
await _timer.DisposeAsync().ConfigureAwait(false);
#endif

// For correctly Dispose,
// first stop the consumer Creation operations and then the command execution operations.
// It is necessary that all consumerCreation operations have time to complete before command CommandLoop stop
Expand Down
8 changes: 7 additions & 1 deletion src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public NatsKVWatchCommandMsg()
public NatsJSMsg<T> Msg { get; init; } = default;
}

internal class NatsKVWatcher<T> : IAsyncDisposable
internal sealed class NatsKVWatcher<T> : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly bool _debug;
Expand Down Expand Up @@ -144,6 +144,12 @@ public async ValueTask DisposeAsync()
await _sub.DisposeAsync();
}

#if NETSTANDARD2_0
_timer.Dispose();
#else
await _timer.DisposeAsync().ConfigureAwait(false);
#endif

_consumerCreateChannel.Writer.TryComplete();
_commandChannel.Writer.TryComplete();
_entryChannel.Writer.TryComplete();
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSCon
/// <returns>Object value as a byte array.</returns>
public async ValueTask<byte[]> GetBytesAsync(string key, CancellationToken cancellationToken = default)
{
var memoryStream = new MemoryStream();
using var memoryStream = new MemoryStream();
await GetAsync(key, memoryStream, cancellationToken: cancellationToken).ConfigureAwait(false);
return memoryStream.ToArray();
}
Expand Down

0 comments on commit 323124a

Please sign in to comment.