From 5c54650e83c02a668f503ca347cf49af50e332bb Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 14:13:27 -0800 Subject: [PATCH 1/6] Address corner cases around sas token refresh loop cleanup --- .../Amqp/AmqpAuthenticationRefresher.cs | 116 +++++++++++++----- .../Transport/Amqp/AmqpConnectionHolder.cs | 22 ++-- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 37 +++--- .../Amqp/IAmqpAuthenticationRefresher.cs | 4 +- .../Transport/Amqp/IAmqpConnectionHolder.cs | 2 +- 5 files changed, 129 insertions(+), 52 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 1d5e84d840..b860ff849e 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -18,8 +18,9 @@ internal class AmqpAuthenticationRefresher : IAmqpAuthenticationRefresher, IDisp private readonly IConnectionCredentials _connectionCredentials; private readonly AmqpIotCbsTokenProvider _amqpIotCbsTokenProvider; private readonly string _audience; + private CancellationTokenSource _refresherCancellationTokenSource; private Task _refreshLoop; - private CancellationTokenSource _loopCancellationTokenSource; + private bool _disposed; internal AmqpAuthenticationRefresher(IConnectionCredentials connectionCredentials, AmqpIotCbsLink amqpCbsLink) { @@ -51,54 +52,81 @@ async Task IAmqpAuthenticationRefresher.InitLoopAsync(CancellationToken cancella cancellationToken) .ConfigureAwait(false); + // This cancellation token source is disposed when the authentication refresher is disposed + // or if this code block is executed more than once per instance of AmqpAuthenticationRefresher (not expected). + + if (_refresherCancellationTokenSource != null) + { + if (Logging.IsEnabled) + Logging.Info(this, "_refresherCancellationTokenSource was already initialized, whhich was unexpected. Canceling and disposing the previous instance.", nameof(IAmqpAuthenticationRefresher.InitLoopAsync)); + + try + { + _refresherCancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + } + _refresherCancellationTokenSource.Dispose(); + } + _refresherCancellationTokenSource = new CancellationTokenSource(); + + // AmqpAuthenticationRefresher.StartLoop + // TODO if (refreshOn < DateTime.MaxValue && this is IAmqpAuthenticationRefresher refresher) { - refresher.StartLoop(refreshOn); + refresher.StartLoop(refreshOn, _refresherCancellationTokenSource.Token); } if (Logging.IsEnabled) Logging.Exit(this, nameof(IAmqpAuthenticationRefresher.InitLoopAsync)); } - void IAmqpAuthenticationRefresher.StartLoop(DateTime refreshOn) + void IAmqpAuthenticationRefresher.StartLoop(DateTime refreshOn, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, refreshOn, nameof(IAmqpAuthenticationRefresher.StartLoop)); - if (_loopCancellationTokenSource == null - || _refreshLoop == null) - { - (this as IAmqpAuthenticationRefresher)?.StopLoop(); - } - - _loopCancellationTokenSource = new CancellationTokenSource(); - _refreshLoop = RefreshLoopAsync( - refreshOn, - _loopCancellationTokenSource.Token); + // This task runs in the background and is unmonitored. + // When this refresher is disposed it signals this task to be cancelled. + _refreshLoop = RefreshLoopAsync(refreshOn, cancellationToken); if (Logging.IsEnabled) Logging.Exit(this, refreshOn, nameof(IAmqpAuthenticationRefresher.StartLoop)); } - async void IAmqpAuthenticationRefresher.StopLoop() + async Task IAmqpAuthenticationRefresher.StopLoopAsync() { - _loopCancellationTokenSource?.Cancel(); - if (_refreshLoop != null) + try { + if (Logging.IsEnabled) + Logging.Enter(this, nameof(IAmqpAuthenticationRefresher.StopLoopAsync)); + try { - await _refreshLoop.ConfigureAwait(false); + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(IAmqpAuthenticationRefresher.StopLoopAsync)); } - catch (OperationCanceledException) { } - _refreshLoop = null; - } - - _loopCancellationTokenSource?.Dispose(); - _loopCancellationTokenSource = null; - if (Logging.IsEnabled) - Logging.Info(this, nameof(IAmqpAuthenticationRefresher.StopLoop)); + // Await the completion of _refreshLoop. + // This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. + await _refreshLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); + } + finally + { + if (Logging.IsEnabled) + Logging.Exit(this, nameof(IAmqpAuthenticationRefresher.StopLoopAsync)); + } } private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken cancellationToken) @@ -109,10 +137,13 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc while (!cancellationToken.IsCancellationRequested) { if (Logging.IsEnabled) - Logging.Info(this, refreshesOn, $"Before {nameof(RefreshLoopAsync)} with wait time {waitTime}."); + Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} before {nameof(RefreshLoopAsync)} with wait time {waitTime}."); if (waitTime > TimeSpan.Zero) { + if (Logging.IsEnabled) + Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}."); + await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false); } @@ -133,7 +164,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc catch (IotHubClientException ex) when (ex.ErrorCode is IotHubClientErrorCode.NetworkErrors) { if (Logging.IsEnabled) - Logging.Error(this, refreshesOn, $"Refresh token failed {ex}"); + Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}"); } catch (OperationCanceledException) { @@ -173,7 +204,36 @@ private static string CreateAmqpCbsAudience(IConnectionCredentials connectionCre public void Dispose() { - _loopCancellationTokenSource?.Dispose(); + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) + { + try + { + if (Logging.IsEnabled) + { + Logging.Enter(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); + } + + if (!_disposed) + { + if (disposing) + { + _refresherCancellationTokenSource?.Dispose(); + } + + _disposed = true; + } + } + finally + { + if (Logging.IsEnabled) + { + Logging.Exit(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); + } + } } } } diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 79eaf64860..89c0b541fc 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -67,12 +67,16 @@ public AmqpUnit CreateAmqpUnit( return amqpUnit; } - public void Shutdown() + public async Task ShutdownAsync() { if (Logging.IsEnabled) - Logging.Enter(this, _amqpIotConnection, nameof(Shutdown)); + Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync)); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } - _amqpAuthenticationRefresher?.StopLoop(); if (_amqpIotConnection != null) { _amqpIotConnection.Closed -= OnConnectionClosed; @@ -80,7 +84,7 @@ public void Shutdown() } if (Logging.IsEnabled) - Logging.Exit(this, _amqpIotConnection, nameof(Shutdown)); + Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -185,7 +189,11 @@ public async Task EnsureConnectionAsync(CancellationToken can } catch (Exception ex) when (!Fx.IsFatal(ex)) { - amqpAuthenticationRefresher?.StopLoop(); + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + amqpIotConnection?.SafeClose(); throw; } @@ -213,7 +221,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) // TODO #887: handle gracefulDisconnect // Currently, when all devices got removed, AmqpConnectionHolder will terminate the TCP connection/websocket instead of graceful disconnect(CloseAsync). // This is tracking work to add a mechanism to gracefully disconnect within the last device's CloseAsync task context. - Shutdown(); + _ = ShutdownAsync().ConfigureAwait(false); } } @@ -233,7 +241,7 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); HashSet amqpUnits; lock (_unitsLock) { diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 792be01ffb..7414b27508 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -193,7 +193,7 @@ internal async Task EnsureSessionIsOpenAsync(CancellationToken c } catch (Exception) { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); throw; } finally @@ -221,10 +221,15 @@ internal async Task CloseAsync(CancellationToken cancellationToken) try { await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } } finally { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); } } } @@ -238,25 +243,29 @@ internal async Task CloseAsync(CancellationToken cancellationToken) } } - private void Cleanup() + private async Task CleanupAsync() { if (Logging.IsEnabled) - Logging.Enter(this, nameof(Cleanup)); + Logging.Enter(this, nameof(CleanupAsync)); if (_amqpIotSession != null) { _amqpIotSession.Closed -= OnSessionDisconnected; _amqpIotSession.SafeClose(); } - _amqpAuthenticationRefresher?.StopLoop(); - if (!IsPooled()) + if (_amqpAuthenticationRefresher != null) { - _amqpConnectionHolder?.Shutdown(); + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + + if (!IsPooled() && _amqpConnectionHolder != null) + { + await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false); } if (Logging.IsEnabled) - Logging.Exit(this, nameof(Cleanup)); + Logging.Exit(this, nameof(CleanupAsync)); } private bool IsPooled() @@ -1021,7 +1030,7 @@ internal void OnConnectionDisconnected() if (Logging.IsEnabled) Logging.Enter(this, nameof(OnConnectionDisconnected)); - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); if (Logging.IsEnabled) @@ -1035,7 +1044,7 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIotSession)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); @@ -1060,15 +1069,15 @@ public void Dispose() if (!_isDisposed) { - Cleanup(); if (!IsPooled()) { _amqpConnectionHolder?.Dispose(); } - // For device SAS authenticated clients the authentication refresher is associated with the AMQP unit itself, - // so it needs to be explicitly stopped. - _amqpAuthenticationRefresher?.StopLoop(); + if (_amqpAuthenticationRefresher is AmqpAuthenticationRefresher refresher) + { + refresher.Dispose(); + } _sessionSemaphore?.Dispose(); _messageReceivingLinkSemaphore?.Dispose(); diff --git a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs index fecf23808a..7579e23252 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp internal interface IAmqpAuthenticationRefresher { Task InitLoopAsync(CancellationToken cancellationToken); - void StartLoop(DateTime refreshOn); - void StopLoop(); + void StartLoop(DateTime refreshOn, CancellationToken cancellationToken); + Task StopLoopAsync(); } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs index 119d7a2f97..9a94c75e33 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs @@ -16,6 +16,6 @@ internal interface IAmqpConnectionHolder : IDisposable Task CreateRefresherAsync(IConnectionCredentials connectionCredentials, CancellationToken cancellationToken); - void Shutdown(); + Task ShutdownAsync(); } } From bef73625766d22c4447e834588411c06f883e026 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 14:16:22 -0800 Subject: [PATCH 2/6] Add Sophia to codeowners --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 89a92e133d..a1ad40cd66 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,3 @@ # Track1 .NET Azure IoT Hub and DPS SDKs -* @drwill-ms @timtay-microsoft @abhipsaMisra @andyk-ms @brycewang-microsoft @tmahmood-microsoft @ngastelum-ms @patilsnr +* @drwill-ms @timtay-microsoft @abhipsaMisra @andyk-ms @brycewang-microsoft @tmahmood-microsoft @ngastelum-ms @patilsnr @schoims From 2a7a0c59f7a54a9db83f1c8497b848e40995cf6a Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 15:55:24 -0800 Subject: [PATCH 3/6] Update iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs Co-authored-by: brycewang-microsoft <94650966+brycewang-microsoft@users.noreply.github.com> --- iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index b860ff849e..b891440645 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -213,9 +213,7 @@ private void Dispose(bool disposing) try { if (Logging.IsEnabled) - { Logging.Enter(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); - } if (!_disposed) { From 472b99f20b83fab4fac66cbd122a1b97b223da78 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 15:55:36 -0800 Subject: [PATCH 4/6] Update iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs Co-authored-by: brycewang-microsoft <94650966+brycewang-microsoft@users.noreply.github.com> --- iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index b891440645..291c557c88 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -228,9 +228,7 @@ private void Dispose(bool disposing) finally { if (Logging.IsEnabled) - { Logging.Exit(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); - } } } } From 142dd0406ef48825c6fea101694ecfac3ae6ba1e Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 15:55:47 -0800 Subject: [PATCH 5/6] Update iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs Co-authored-by: brycewang-microsoft <94650966+brycewang-microsoft@users.noreply.github.com> --- iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 291c557c88..b532be65bd 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -58,7 +58,7 @@ async Task IAmqpAuthenticationRefresher.InitLoopAsync(CancellationToken cancella if (_refresherCancellationTokenSource != null) { if (Logging.IsEnabled) - Logging.Info(this, "_refresherCancellationTokenSource was already initialized, whhich was unexpected. Canceling and disposing the previous instance.", nameof(IAmqpAuthenticationRefresher.InitLoopAsync)); + Logging.Info(this, "_refresherCancellationTokenSource was already initialized, which was unexpected. Canceling and disposing the previous instance.", nameof(IAmqpAuthenticationRefresher.InitLoopAsync)); try { From afb818e17a421ab8817a2335ca2b0363a34c8a82 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 16:37:39 -0800 Subject: [PATCH 6/6] Update iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs --- .../src/Transport/Amqp/AmqpAuthenticationRefresher.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index b532be65bd..3a09227729 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -71,8 +71,10 @@ async Task IAmqpAuthenticationRefresher.InitLoopAsync(CancellationToken cancella } _refresherCancellationTokenSource = new CancellationTokenSource(); - // AmqpAuthenticationRefresher.StartLoop - // TODO + // AmqpAuthenticationRefresher.StartLoop is implemented as an explicit interface implementation. + // This is because we do not wish to make the interface and its methods public. + // Implicit interface implementation requires the interface and its methods to be public. + // Since StartLoop can only be accessed through an AmqpAuthenticationRefresher instance, we have to add the instance check. if (refreshOn < DateTime.MaxValue && this is IAmqpAuthenticationRefresher refresher) {