diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 2a7d577763..e3e9efa774 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -130,7 +130,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc // then log the exception and continue. // This task runs on an unmonitored thread so there is no point throwing these exceptions. if (Logging.IsEnabled) - Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}"); + Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}"); } finally { @@ -143,23 +143,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc } } - public void StopLoop() + public async Task StopLoopAsync() { - if (Logging.IsEnabled) - Logging.Enter(this, nameof(StopLoop)); - try { - _refresherCancellationTokenSource?.Cancel(); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(StopLoopAsync)); + + try + { + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync)); + } + + // Await the completion of _refreshLoop. + // This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. + await _refreshLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); } - catch (ObjectDisposedException) + finally { if (Logging.IsEnabled) - Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop)); + Logging.Exit(this, nameof(StopLoopAsync)); } - - if (Logging.IsEnabled) - Logging.Exit(this, nameof(StopLoop)); } public void Dispose() @@ -181,7 +195,6 @@ private void Dispose(bool disposing) { if (disposing) { - StopLoop(); _refresherCancellationTokenSource?.Dispose(); _amqpIotCbsTokenProvider?.Dispose(); } diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 6430abb022..44030223e1 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -69,7 +69,7 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); HashSet amqpUnits; lock (_unitsLock) { @@ -85,16 +85,20 @@ private void OnConnectionClosed(object o, EventArgs args) Logging.Exit(this, o, nameof(OnConnectionClosed)); } - public void Shutdown() + public async Task ShutdownAsync() { if (Logging.IsEnabled) - Logging.Enter(this, _amqpIotConnection, nameof(Shutdown)); + Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync)); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } - _amqpAuthenticationRefresher?.StopLoop(); _amqpIotConnection?.SafeClose(); if (Logging.IsEnabled) - Logging.Exit(this, _amqpIotConnection, nameof(Shutdown)); + Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -218,7 +222,11 @@ public async Task EnsureConnectionAsync(CancellationToken can } catch (Exception ex) when (!ex.IsFatal()) { - amqpAuthenticationRefresher?.StopLoop(); + if (amqpAuthenticationRefresher != null) + { + await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + amqpIotConnection?.SafeClose(); throw; } @@ -244,7 +252,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) if (_amqpUnits.Count == 0) { // TODO #887: handle gracefulDisconnect - Shutdown(); + _ = ShutdownAsync().ConfigureAwait(false); } } diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index b41f069d53..5558d6519e 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -173,7 +173,7 @@ internal async Task EnsureSessionIsOpenAsync(CancellationToken c } catch (Exception) { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); throw; } finally @@ -208,10 +208,15 @@ public async Task CloseAsync(CancellationToken cancellationToken) try { await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } } finally { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); } } } @@ -225,21 +230,25 @@ public async Task CloseAsync(CancellationToken cancellationToken) } } - private void Cleanup() + private async Task CleanupAsync() { if (Logging.IsEnabled) - Logging.Enter(this, nameof(Cleanup)); + Logging.Enter(this, nameof(CleanupAsync)); _amqpIotSession?.SafeClose(); - _amqpAuthenticationRefresher?.StopLoop(); - if (!_deviceIdentity.IsPooling()) + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + + if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null) { - _amqpConnectionHolder?.Shutdown(); + await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false); } if (Logging.IsEnabled) - Logging.Exit(this, nameof(Cleanup)); + Logging.Exit(this, nameof(CleanupAsync)); } #endregion Open-Close @@ -1056,7 +1065,7 @@ public void OnConnectionDisconnected() if (Logging.IsEnabled) Logging.Enter(this, nameof(OnConnectionDisconnected)); - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); if (Logging.IsEnabled) @@ -1070,7 +1079,7 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIotSession)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); @@ -1102,7 +1111,6 @@ private void Dispose(bool disposing) { if (disposing) { - Cleanup(); if (!_deviceIdentity.IsPooling()) { _amqpConnectionHolder?.Dispose(); @@ -1110,7 +1118,6 @@ private void Dispose(bool disposing) // For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself, // so it needs to be explicitly disposed. - _amqpAuthenticationRefresher?.StopLoop(); _amqpAuthenticationRefresher?.Dispose(); _sessionSemaphore?.Dispose(); diff --git a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs index 83a7c7fe1e..216217702b 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp internal interface IAmqpAuthenticationRefresher : IDisposable { Task InitLoopAsync(CancellationToken cancellationToken); - void StopLoop(); void StartLoop(DateTime refreshOn, CancellationToken cancellationToken); + Task StopLoopAsync(); } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs index 19c444a71b..31a120ee43 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs @@ -16,6 +16,6 @@ internal interface IAmqpConnectionHolder : IDisposable Task CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken); - void Shutdown(); + Task ShutdownAsync(); } }