Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address corner cases around sas token refresh loop cleanup #3041

Merged
merged 6 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Track1 .NET Azure IoT Hub and DPS SDKs

* @drwill-ms @timtay-microsoft @abhipsaMisra @andyk-ms @brycewang-microsoft @tmahmood-microsoft @ngastelum-ms @patilsnr
* @drwill-ms @timtay-microsoft @abhipsaMisra @andyk-ms @brycewang-microsoft @tmahmood-microsoft @ngastelum-ms @patilsnr @schoims
114 changes: 86 additions & 28 deletions iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ internal class AmqpAuthenticationRefresher : IAmqpAuthenticationRefresher, IDisp
private readonly IConnectionCredentials _connectionCredentials;
private readonly AmqpIotCbsTokenProvider _amqpIotCbsTokenProvider;
private readonly string _audience;
private CancellationTokenSource _refresherCancellationTokenSource;
private Task _refreshLoop;
private CancellationTokenSource _loopCancellationTokenSource;
private bool _disposed;

internal AmqpAuthenticationRefresher(IConnectionCredentials connectionCredentials, AmqpIotCbsLink amqpCbsLink)
{
Expand Down Expand Up @@ -51,54 +52,83 @@ async Task IAmqpAuthenticationRefresher.InitLoopAsync(CancellationToken cancella
cancellationToken)
.ConfigureAwait(false);

// This cancellation token source is disposed when the authentication refresher is disposed
// or if this code block is executed more than once per instance of AmqpAuthenticationRefresher (not expected).

if (_refresherCancellationTokenSource != null)
{
if (Logging.IsEnabled)
Logging.Info(this, "_refresherCancellationTokenSource was already initialized, which was unexpected. Canceling and disposing the previous instance.", nameof(IAmqpAuthenticationRefresher.InitLoopAsync));

try
{
_refresherCancellationTokenSource.Cancel();
}
catch (ObjectDisposedException)
{
}
_refresherCancellationTokenSource.Dispose();
}
_refresherCancellationTokenSource = new CancellationTokenSource();

// AmqpAuthenticationRefresher.StartLoop is implemented as an explicit interface implementation.
// This is because we do not wish to make the interface and its methods public.
// Implicit interface implementation requires the interface and its methods to be public.
// Since an explicitly implemented StartLoop can only be called through a reference of type IAmqpAuthenticationRefresher, we have to add the type check.
if (refreshOn < DateTime.MaxValue
&& this is IAmqpAuthenticationRefresher refresher)
{
refresher.StartLoop(refreshOn);
refresher.StartLoop(refreshOn, _refresherCancellationTokenSource.Token);
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(IAmqpAuthenticationRefresher.InitLoopAsync));
}

void IAmqpAuthenticationRefresher.StartLoop(DateTime refreshOn)
void IAmqpAuthenticationRefresher.StartLoop(DateTime refreshOn, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Logging.Enter(this, refreshOn, nameof(IAmqpAuthenticationRefresher.StartLoop));

if (_loopCancellationTokenSource == null
|| _refreshLoop == null)
{
(this as IAmqpAuthenticationRefresher)?.StopLoop();
}

_loopCancellationTokenSource = new CancellationTokenSource();
_refreshLoop = RefreshLoopAsync(
refreshOn,
_loopCancellationTokenSource.Token);
// This task runs in the background; it is only awaited when StopLoopAsync is called.
// The loop ends when the supplied cancellation token is canceled, which StopLoopAsync does before awaiting the task.
_refreshLoop = RefreshLoopAsync(refreshOn, cancellationToken);

if (Logging.IsEnabled)
Logging.Exit(this, refreshOn, nameof(IAmqpAuthenticationRefresher.StartLoop));
}

async void IAmqpAuthenticationRefresher.StopLoop()
async Task IAmqpAuthenticationRefresher.StopLoopAsync()
{
_loopCancellationTokenSource?.Cancel();
if (_refreshLoop != null)
try
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(IAmqpAuthenticationRefresher.StopLoopAsync));

try
{
await _refreshLoop.ConfigureAwait(false);
_refresherCancellationTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(IAmqpAuthenticationRefresher.StopLoopAsync));
}
catch (OperationCanceledException) { }
_refreshLoop = null;
}

_loopCancellationTokenSource?.Dispose();
_loopCancellationTokenSource = null;

if (Logging.IsEnabled)
Logging.Info(this, nameof(IAmqpAuthenticationRefresher.StopLoop));
// Await the completion of _refreshLoop.
// This ensures that once StopLoopAsync returns, no token refresh attempts are still in progress.
await _refreshLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}");
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, nameof(IAmqpAuthenticationRefresher.StopLoopAsync));
}
}

private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken cancellationToken)
Expand All @@ -109,10 +139,13 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
while (!cancellationToken.IsCancellationRequested)
{
if (Logging.IsEnabled)
Logging.Info(this, refreshesOn, $"Before {nameof(RefreshLoopAsync)} with wait time {waitTime}.");
Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} before {nameof(RefreshLoopAsync)} with wait time {waitTime}.");

if (waitTime > TimeSpan.Zero)
{
if (Logging.IsEnabled)
Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}.");

await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -133,7 +166,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
catch (IotHubClientException ex) when (ex.ErrorCode is IotHubClientErrorCode.NetworkErrors)
{
if (Logging.IsEnabled)
Logging.Error(this, refreshesOn, $"Refresh token failed {ex}");
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}");
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -173,7 +206,32 @@ private static string CreateAmqpCbsAudience(IConnectionCredentials connectionCre

public void Dispose()
{
_loopCancellationTokenSource?.Dispose();
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Releases the resources held by this refresher. Only the first call does any work.
/// </summary>
/// <param name="disposing">
/// True when called from <see cref="Dispose()"/>; the cancellation token source is only
/// canceled and disposed in that case.
/// </param>
private void Dispose(bool disposing)
{
    try
    {
        if (Logging.IsEnabled)
            Logging.Enter(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}");

        if (!_disposed)
        {
            if (disposing && _refresherCancellationTokenSource != null)
            {
                // Disposing a CancellationTokenSource does not cancel it. Cancel first so that a refresh loop
                // that is still running (i.e., StopLoopAsync was not called) observes the cancellation and exits
                // instead of continuing to refresh tokens after this instance has been disposed.
                try
                {
                    _refresherCancellationTokenSource.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // The token source was already disposed elsewhere; there is nothing left to cancel.
                }
                catch (AggregateException ex)
                {
                    // Dispose must not throw, so a failing cancellation callback is only logged.
                    if (Logging.IsEnabled)
                        Logging.Error(this, $"Caught exception when canceling token refresh loop: {ex}");
                }

                _refresherCancellationTokenSource.Dispose();
            }

            _disposed = true;
        }
    }
    finally
    {
        if (Logging.IsEnabled)
            Logging.Exit(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}");
    }
}
}
}
22 changes: 15 additions & 7 deletions iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,24 @@ public AmqpUnit CreateAmqpUnit(
return amqpUnit;
}

public void Shutdown()
public async Task ShutdownAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, _amqpIotConnection, nameof(Shutdown));
Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync));

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

_amqpAuthenticationRefresher?.StopLoop();
if (_amqpIotConnection != null)
{
_amqpIotConnection.Closed -= OnConnectionClosed;
_amqpIotConnection.SafeClose();
}

if (Logging.IsEnabled)
Logging.Exit(this, _amqpIotConnection, nameof(Shutdown));
Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync));
}

public void Dispose()
Expand Down Expand Up @@ -185,7 +189,11 @@ public async Task<AmqpIotConnection> EnsureConnectionAsync(CancellationToken can
}
catch (Exception ex) when (!Fx.IsFatal(ex))
{
amqpAuthenticationRefresher?.StopLoop();
if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

amqpIotConnection?.SafeClose();
throw;
}
Expand Down Expand Up @@ -213,7 +221,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit)
// TODO #887: handle gracefulDisconnect
// Currently, when all devices have been removed, AmqpConnectionHolder terminates the TCP connection/websocket instead of performing a graceful disconnect (CloseAsync).
// This is tracking work to add a mechanism to gracefully disconnect within the last device's CloseAsync task context.
Shutdown();
_ = ShutdownAsync().ConfigureAwait(false);
}
}

Expand All @@ -233,7 +241,7 @@ private void OnConnectionClosed(object o, EventArgs args)

if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
HashSet<AmqpUnit> amqpUnits;
lock (_unitsLock)
{
Expand Down
37 changes: 23 additions & 14 deletions iothub/device/src/Transport/Amqp/AmqpUnit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ internal async Task<AmqpIotSession> EnsureSessionIsOpenAsync(CancellationToken c
}
catch (Exception)
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
throw;
}
finally
Expand Down Expand Up @@ -221,10 +221,15 @@ internal async Task CloseAsync(CancellationToken cancellationToken)
try
{
await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false);

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}
}
finally
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
}
}
}
Expand All @@ -238,25 +243,29 @@ internal async Task CloseAsync(CancellationToken cancellationToken)
}
}

private void Cleanup()
private async Task CleanupAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(Cleanup));
Logging.Enter(this, nameof(CleanupAsync));

if (_amqpIotSession != null)
{
_amqpIotSession.Closed -= OnSessionDisconnected;
_amqpIotSession.SafeClose();
}
_amqpAuthenticationRefresher?.StopLoop();

if (!IsPooled())
if (_amqpAuthenticationRefresher != null)
{
_amqpConnectionHolder?.Shutdown();
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

if (!IsPooled() && _amqpConnectionHolder != null)
{
await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false);
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(Cleanup));
Logging.Exit(this, nameof(CleanupAsync));
}

private bool IsPooled()
Expand Down Expand Up @@ -1021,7 +1030,7 @@ internal void OnConnectionDisconnected()
if (Logging.IsEnabled)
Logging.Enter(this, nameof(OnConnectionDisconnected));

_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
_onUnitDisconnected();

if (Logging.IsEnabled)
Expand All @@ -1035,7 +1044,7 @@ private void OnSessionDisconnected(object o, EventArgs args)

if (ReferenceEquals(o, _amqpIotSession))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);

// calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry
_onUnitDisconnected();
Expand All @@ -1060,15 +1069,15 @@ public void Dispose()

if (!_isDisposed)
{
Cleanup();
if (!IsPooled())
{
_amqpConnectionHolder?.Dispose();
}

// For device SAS authenticated clients the authentication refresher is associated with the AMQP unit itself,
// so it needs to be explicitly stopped.
_amqpAuthenticationRefresher?.StopLoop();
if (_amqpAuthenticationRefresher is AmqpAuthenticationRefresher refresher)
{
refresher.Dispose();
}

_sessionSemaphore?.Dispose();
_messageReceivingLinkSemaphore?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp
internal interface IAmqpAuthenticationRefresher
{
Task InitLoopAsync(CancellationToken cancellationToken);
void StartLoop(DateTime refreshOn);
void StopLoop();
void StartLoop(DateTime refreshOn, CancellationToken cancellationToken);
Task StopLoopAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ internal interface IAmqpConnectionHolder : IDisposable

Task<IAmqpAuthenticationRefresher> CreateRefresherAsync(IConnectionCredentials connectionCredentials, CancellationToken cancellationToken);

void Shutdown();
Task ShutdownAsync();
}
}