diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 6528970d0d..9b8306e503 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -13,15 +13,15 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp { internal class AmqpAuthenticationRefresher : IAmqpAuthenticationRefresher, IDisposable { - private static readonly string[] AccessRightsStringArray = AccessRightsHelper.AccessRightsToStringArray(AccessRights.DeviceConnect); + private static readonly string[] s_accessRightsStringArray = AccessRightsHelper.AccessRightsToStringArray(AccessRights.DeviceConnect); private readonly AmqpIoTCbsLink _amqpIoTCbsLink; private readonly IotHubConnectionString _connectionString; private readonly AmqpIoTCbsTokenProvider _amqpIoTCbsTokenProvider; private readonly string _audience; - private CancellationTokenSource _cancellationTokenSource; - private TimeSpan _operationTimeout; + private readonly TimeSpan _operationTimeout; private Task _refreshLoop; private bool _disposed; + private CancellationTokenSource _refresherCancellationTokenSource; internal AmqpAuthenticationRefresher(DeviceIdentity deviceIdentity, AmqpIoTCbsLink amqpCbsLink) { @@ -49,24 +49,38 @@ public async Task InitLoopAsync(TimeSpan timeout) Logging.Enter(this, timeout, $"{nameof(InitLoopAsync)}"); } - CancellationTokenSource oldTokenSource = _cancellationTokenSource; - _cancellationTokenSource = new CancellationTokenSource(); - CancellationToken newToken = _cancellationTokenSource.Token; - oldTokenSource?.Cancel(); - DateTime refreshOn = await _amqpIoTCbsLink .SendTokenAsync( _amqpIoTCbsTokenProvider, _connectionString.AmqpEndpoint, _audience, _audience, - AccessRightsStringArray, + s_accessRightsStringArray, timeout) .ConfigureAwait(false); + // This cancellation token source is disposed when the authentication refresher is disposed + // or if this code block is executed more than once per instance of AmqpAuthenticationRefresher (not expected). + + if (_refresherCancellationTokenSource != null) + { + if (Logging.IsEnabled) + Logging.Info(this, "_refresherCancellationTokenSource was already initialized, whhich was unexpected. Canceling and disposing the previous instance.", nameof(InitLoopAsync)); + + try + { + _refresherCancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + } + _refresherCancellationTokenSource.Dispose(); + } + _refresherCancellationTokenSource = new CancellationTokenSource(); + if (refreshOn < DateTime.MaxValue) { - StartLoop(refreshOn, newToken); + StartLoop(refreshOn, _refresherCancellationTokenSource.Token); } if (Logging.IsEnabled) @@ -82,6 +96,8 @@ public void StartLoop(DateTime refreshOn, CancellationToken cancellationToken) Logging.Enter(this, refreshOn, $"{nameof(StartLoop)}"); } + // This task runs in the background and is unmonitored. + // When this refresher is disposed it signals this task to be cancelled. _refreshLoop = RefreshLoopAsync(refreshOn, cancellationToken); if (Logging.IsEnabled) { @@ -97,12 +113,13 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc while (!cancellationToken.IsCancellationRequested) { if (Logging.IsEnabled) - { - Logging.Info(this, refreshesOn, $"Before {nameof(RefreshLoopAsync)}"); - } + Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} before {nameof(RefreshLoopAsync)}"); if (waitTime > TimeSpan.Zero) { + if (Logging.IsEnabled) + Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}."); + await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false); } @@ -116,23 +133,22 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc _connectionString.AmqpEndpoint, _audience, _audience, - AccessRightsStringArray, + s_accessRightsStringArray, _operationTimeout) .ConfigureAwait(false); } - catch (IotHubCommunicationException ex) + catch (Exception ex) when (ex is IotHubCommunicationException || ex is OperationCanceledException) { + // In case the token refresh is not successful either due to a communication exception or cancellation token cancellation + // then log the exception and continue. + // This task runs on an unmonitored thread so there is no point throwing these exceptions. if (Logging.IsEnabled) - { - Logging.Info(this, refreshesOn, $"Refresh token failed {ex}"); - } + Logging.Error(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} refresh token failed: {ex}"); } finally { if (Logging.IsEnabled) - { - Logging.Info(this, refreshesOn, $"After {nameof(RefreshLoopAsync)}"); - } + Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} after {nameof(RefreshLoopAsync)}"); } waitTime = refreshesOn - DateTime.UtcNow; @@ -140,14 +156,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc } } - public void StopLoop() + public async Task StopLoopAsync() { - if (Logging.IsEnabled) + try { - Logging.Info(this, $"{nameof(StopLoop)}"); - } + if (Logging.IsEnabled) + Logging.Enter(this, nameof(StopLoopAsync)); - _cancellationTokenSource?.Cancel(); + try + { + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync)); + } + + // Await the completion of _refreshLoop. + // This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. + await _refreshLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); + } + finally + { + if (Logging.IsEnabled) + Logging.Exit(this, nameof(StopLoopAsync)); + } } public void Dispose() @@ -158,23 +197,30 @@ public void Dispose() private void Dispose(bool disposing) { - if (_disposed) + try { - return; - } + if (Logging.IsEnabled) + { + Logging.Enter(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); + } - if (Logging.IsEnabled) - { - Logging.Info(this, disposing, $"{nameof(Dispose)}"); - } + if (!_disposed) + { + if (disposing) + { + _refresherCancellationTokenSource?.Dispose(); + } - if (disposing) + _disposed = true; + } + } + finally { - StopLoop(); - _cancellationTokenSource.Dispose(); + if (Logging.IsEnabled) + { + Logging.Exit(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); + } } - - _disposed = true; } } } diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index f3d0eaea15..b10e96bc98 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -75,7 +75,7 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIoTConnection != null && ReferenceEquals(_amqpIoTConnection, o)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); HashSet amqpUnits; lock (_unitsLock) { @@ -92,19 +92,20 @@ private void OnConnectionClosed(object o, EventArgs args) } } - public void Shutdown() + public async Task ShutdownAsync() { if (Logging.IsEnabled) + Logging.Enter(this, _amqpIoTConnection, nameof(ShutdownAsync)); + + if (_amqpAuthenticationRefresher != null) { - Logging.Enter(this, _amqpIoTConnection, $"{nameof(Shutdown)}"); + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); } - _amqpAuthenticationRefresher?.StopLoop(); _amqpIoTConnection?.SafeClose(); + if (Logging.IsEnabled) - { - Logging.Exit(this, _amqpIoTConnection, $"{nameof(Shutdown)}"); - } + Logging.Exit(this, _amqpIoTConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -232,7 +233,11 @@ public async Task EnsureConnectionAsync(TimeSpan timeout) } catch (Exception ex) when (!ex.IsFatal()) { - amqpAuthenticationRefresher?.StopLoop(); + if (amqpAuthenticationRefresher != null) + { + await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + amqpIoTConnection?.SafeClose(); throw; } @@ -261,7 +266,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) if (_amqpUnits.Count == 0) { // TODO #887: handle gracefulDisconnect - Shutdown(); + _ = ShutdownAsync().ConfigureAwait(false); } } if (Logging.IsEnabled) diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 4b08a12f94..5ee391f9c3 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -139,7 +139,7 @@ internal async Task EnsureSessionIsOpenAsync(TimeSpan timeout) } catch (Exception) { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); throw; } finally @@ -169,10 +169,15 @@ public async Task CloseAsync(TimeSpan timeout) try { await _amqpIoTSession.CloseAsync(timeout).ConfigureAwait(false); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } } finally { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); } } } @@ -185,18 +190,25 @@ public async Task CloseAsync(TimeSpan timeout) } } - private void Cleanup() + private async Task CleanupAsync() { - Logging.Enter(this, nameof(Cleanup)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CleanupAsync)); _amqpIoTSession?.SafeClose(); - _amqpAuthenticationRefresher?.StopLoop(); - if (!_deviceIdentity.IsPooling()) + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + + if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null) { - _amqpConnectionHolder?.Shutdown(); + await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false); } - Logging.Exit(this, nameof(Cleanup)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(CleanupAsync)); } #endregion Open-Close @@ -804,7 +816,7 @@ public void OnConnectionDisconnected() { Logging.Enter(this, nameof(OnConnectionDisconnected)); - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); Logging.Exit(this, nameof(OnConnectionDisconnected)); @@ -816,7 +828,9 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIoTSession)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); + + // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); } Logging.Exit(this, o, nameof(OnSessionDisconnected)); @@ -834,31 +848,45 @@ public void Dispose() private void Dispose(bool disposing) { - if (_disposed) + try { - return; - } + if (Logging.IsEnabled) + { + Logging.Enter(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}"); + } + + if (!_disposed) + { + if (disposing) + { + if (!_deviceIdentity.IsPooling()) + { + _amqpConnectionHolder?.Dispose(); + } - _disposed = true; + // For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself, + // so it needs to be explicitly disposed. + _amqpAuthenticationRefresher?.Dispose(); - if (disposing) - { - Logging.Enter(this, disposing, nameof(Dispose)); + _sessionSemaphore?.Dispose(); + _messageReceivingLinkSemaphore?.Dispose(); + _messageReceivingCallbackSemaphore?.Dispose(); + _eventReceivingLinkSemaphore?.Dispose(); + _methodLinkSemaphore?.Dispose(); + _twinLinksSemaphore?.Dispose(); - Cleanup(); - if (!_deviceIdentity.IsPooling()) - { - _amqpConnectionHolder?.Dispose(); + Logging.Exit(this, disposing, nameof(Dispose)); + } } - _sessionSemaphore?.Dispose(); - _messageReceivingLinkSemaphore?.Dispose(); - _messageReceivingCallbackSemaphore?.Dispose(); - _eventReceivingLinkSemaphore?.Dispose(); - _methodLinkSemaphore?.Dispose(); - _twinLinksSemaphore?.Dispose(); - - Logging.Exit(this, disposing, nameof(Dispose)); + _disposed = true; + } + finally + { + if (Logging.IsEnabled) + { + Logging.Exit(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}"); + } } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs index 3f7d4b6bc6..9f3c4432a6 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp internal interface IAmqpAuthenticationRefresher : IDisposable { Task InitLoopAsync(TimeSpan timeout); - void StopLoop(); void StartLoop(DateTime refreshOn, CancellationToken cancellationToken); + Task StopLoopAsync(); } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs index a3130f8044..35f4af6b6f 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs @@ -12,6 +12,6 @@ internal interface IAmqpConnectionHolder : IDisposable Task OpenSessionAsync(DeviceIdentity deviceIdentity, TimeSpan timeout); Task EnsureConnectionAsync(TimeSpan timeout); Task CreateRefresherAsync(DeviceIdentity deviceIdentity, TimeSpan timeout); - void Shutdown(); + Task ShutdownAsync(); } } diff --git a/vsts/vsts.yaml b/vsts/vsts.yaml index 0493cbd463..092663b121 100644 --- a/vsts/vsts.yaml +++ b/vsts/vsts.yaml @@ -485,10 +485,13 @@ jobs: inputs: script: 'choco install dotnet4.5.1' - - task: CmdLine@2 - displayName: 'Install .NET Core 2.1' + - task: UseDotNet@2 + displayName: 'Use .NET Core SDK 2.1' inputs: - script: 'choco install dotnetcore-2.1-sdk' + packageType: sdk + version: 2.1.x + performMultiLevelLookup: true + installationPath: $(Agent.ToolsDirectory)/dotnet - task: PublishTestResults@2 displayName: "Publish Test Results **/*.trx"