From a897fdbe67ffe7244710c42fc462071d60b53c22 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Sun, 18 Dec 2022 22:50:41 -0800 Subject: [PATCH 1/4] updates --- .../Amqp/AmqpAuthenticationRefresher.cs | 35 +++++++++++++------ .../Transport/Amqp/AmqpConnectionHolder.cs | 25 +++++++++---- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 33 ++++++++++------- .../Amqp/IAmqpAuthenticationRefresher.cs | 2 +- .../Transport/Amqp/IAmqpConnectionHolder.cs | 2 +- 5 files changed, 65 insertions(+), 32 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 2a7d577763..bf3b204b56 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -143,23 +143,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc } } - public void StopLoop() + public async Task StopLoopAsync() { - if (Logging.IsEnabled) - Logging.Enter(this, nameof(StopLoop)); - try { - _refresherCancellationTokenSource?.Cancel(); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(StopLoopAsync)); + + try + { + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync)); + } + + // Await the completion of _refreshLoop. + // This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. + await _refreshLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); } - catch (ObjectDisposedException) + finally { if (Logging.IsEnabled) - Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop)); + Logging.Exit(this, nameof(StopLoopAsync)); } - - if (Logging.IsEnabled) - Logging.Exit(this, nameof(StopLoop)); } public void Dispose() @@ -181,7 +195,6 @@ private void Dispose(bool disposing) { if (disposing) { - StopLoop(); _refresherCancellationTokenSource?.Dispose(); _amqpIotCbsTokenProvider?.Dispose(); } diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 6430abb022..b27d6bf60d 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -69,7 +69,11 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { - _amqpAuthenticationRefresher?.StopLoop(); + if (_amqpAuthenticationRefresher != null) + { + _ = _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + HashSet amqpUnits; lock (_unitsLock) { @@ -85,16 +89,20 @@ private void OnConnectionClosed(object o, EventArgs args) Logging.Exit(this, o, nameof(OnConnectionClosed)); } - public void Shutdown() + public async Task ShutdownAsync() { if (Logging.IsEnabled) - Logging.Enter(this, _amqpIotConnection, nameof(Shutdown)); + Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync)); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } - _amqpAuthenticationRefresher?.StopLoop(); _amqpIotConnection?.SafeClose(); if (Logging.IsEnabled) - Logging.Exit(this, _amqpIotConnection, nameof(Shutdown)); + Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -218,7 +226,10 @@ public async Task EnsureConnectionAsync(CancellationToken can } catch (Exception ex) when (!ex.IsFatal()) { - amqpAuthenticationRefresher?.StopLoop(); + if (amqpAuthenticationRefresher != null) + { + await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } amqpIotConnection?.SafeClose(); throw; } @@ -244,7 +255,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) if (_amqpUnits.Count == 0) { // TODO #887: handle gracefulDisconnect - Shutdown(); + _ = ShutdownAsync().ConfigureAwait(false); } } diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index b41f069d53..1064d20df8 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -173,7 +173,7 @@ internal async Task EnsureSessionIsOpenAsync(CancellationToken c } catch (Exception) { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); throw; } finally @@ -208,10 +208,11 @@ public async Task CloseAsync(CancellationToken cancellationToken) try { await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false); + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); } finally { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); } } } @@ -225,21 +226,25 @@ public async Task CloseAsync(CancellationToken cancellationToken) } } - private void Cleanup() + private async Task CleanupAsync() { if (Logging.IsEnabled) - Logging.Enter(this, nameof(Cleanup)); + Logging.Enter(this, nameof(CleanupAsync)); _amqpIotSession?.SafeClose(); - _amqpAuthenticationRefresher?.StopLoop(); - if (!_deviceIdentity.IsPooling()) + if (_amqpAuthenticationRefresher != null) { - _amqpConnectionHolder?.Shutdown(); + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + + if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null) + { + await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false); } if (Logging.IsEnabled) - Logging.Exit(this, nameof(Cleanup)); + Logging.Exit(this, nameof(CleanupAsync)); } #endregion Open-Close @@ -1056,7 +1061,10 @@ public void OnConnectionDisconnected() if (Logging.IsEnabled) Logging.Enter(this, nameof(OnConnectionDisconnected)); - _amqpAuthenticationRefresher?.StopLoop(); + if (_amqpAuthenticationRefresher != null) + { + _ = _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } _onUnitDisconnected(); if (Logging.IsEnabled) @@ -1070,7 +1078,10 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIotSession)) { - _amqpAuthenticationRefresher?.StopLoop(); + if (_amqpAuthenticationRefresher != null) + { + _ = _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); @@ -1102,7 +1113,6 @@ private void Dispose(bool disposing) { if (disposing) { - Cleanup(); if (!_deviceIdentity.IsPooling()) { _amqpConnectionHolder?.Dispose(); @@ -1110,7 +1120,6 @@ private void Dispose(bool disposing) // For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself, // so it needs to be explicitly disposed. - _amqpAuthenticationRefresher?.StopLoop(); _amqpAuthenticationRefresher?.Dispose(); _sessionSemaphore?.Dispose(); diff --git a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs index 83a7c7fe1e..216217702b 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp internal interface IAmqpAuthenticationRefresher : IDisposable { Task InitLoopAsync(CancellationToken cancellationToken); - void StopLoop(); void StartLoop(DateTime refreshOn, CancellationToken cancellationToken); + Task StopLoopAsync(); } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs index 19c444a71b..31a120ee43 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs @@ -16,6 +16,6 @@ internal interface IAmqpConnectionHolder : IDisposable Task CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken); - void Shutdown(); + Task ShutdownAsync(); } } From 6e62c8ea84b348a65023a29e919956af1ab77949 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Sun, 18 Dec 2022 22:57:02 -0800 Subject: [PATCH 2/4] moe --- .../device/src/Transport/Amqp/AmqpConnectionHolder.cs | 5 +---- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 10 ++-------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index b27d6bf60d..ec077836d5 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -69,10 +69,7 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { - if (_amqpAuthenticationRefresher != null) - { - _ = _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); - } + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); HashSet amqpUnits; lock (_unitsLock) diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 1064d20df8..abc44ee015 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -1061,10 +1061,7 @@ public void OnConnectionDisconnected() if (Logging.IsEnabled) Logging.Enter(this, nameof(OnConnectionDisconnected)); - if (_amqpAuthenticationRefresher != null) - { - _ = _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); - } + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); if (Logging.IsEnabled) @@ -1078,10 +1075,7 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIotSession)) { - if (_amqpAuthenticationRefresher != null) - { - _ = _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); - } + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); From 27e09c53f0fb01296866d773972af4b6f7c1a86a Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Sun, 18 Dec 2022 22:59:33 -0800 Subject: [PATCH 3/4] minor --- iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 07c13890af..44030223e1 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -70,7 +70,6 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); - HashSet amqpUnits; lock (_unitsLock) { From 76fb6625c7b63f7b3f28c67a297e7f3d77e6b8f0 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Sun, 18 Dec 2022 23:00:27 -0800 Subject: [PATCH 4/4] minor --- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index abc44ee015..65ee0da228 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -1061,7 +1061,7 @@ public void OnConnectionDisconnected() if (Logging.IsEnabled) Logging.Enter(this, nameof(OnConnectionDisconnected)); - _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); if (Logging.IsEnabled) @@ -1075,7 +1075,7 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIotSession)) { - _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected();