diff --git a/build.ps1 b/build.ps1 index a9b2bdceae..6a9151bdee 100644 --- a/build.ps1 +++ b/build.ps1 @@ -217,7 +217,7 @@ Function BuildPackage($path, $message) SignDotNetBinary $filesToSign } - & dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --output $localPackages + & dotnet pack --verbosity $verbosity --configuration $configuration --no-build --include-symbols --include-source --property:PackageOutputPath=$localPackages if ($LASTEXITCODE -ne 0) { diff --git a/e2e/test/iothub/FileUploadE2ETests.cs b/e2e/test/iothub/FileUploadE2ETests.cs index 421e93d475..c908a0d53e 100644 --- a/e2e/test/iothub/FileUploadE2ETests.cs +++ b/e2e/test/iothub/FileUploadE2ETests.cs @@ -114,6 +114,7 @@ public async Task FileUpload_SmallFile_Http_GranularSteps_x509() [TestMethod] [Timeout(TestTimeoutMilliseconds)] [TestCategory("LongRunning")] + [TestCategory("Proxy")] public async Task FileUpload_SmallFile_Http_GranularSteps_Proxy() { string filename = await GetTestFileNameAsync(FileSizeSmall).ConfigureAwait(false); diff --git a/e2e/test/iothub/service/RegistryManagerE2ETests.cs b/e2e/test/iothub/service/RegistryManagerE2ETests.cs index 2ddc085836..e64369d675 100644 --- a/e2e/test/iothub/service/RegistryManagerE2ETests.cs +++ b/e2e/test/iothub/service/RegistryManagerE2ETests.cs @@ -328,6 +328,7 @@ public async Task RegistryManager_RemoveDevices2Async_Works() [TestMethod] [Timeout(TestTimeoutMilliseconds)] + [TestCategory("Proxy")] public async Task RegistryManager_AddDeviceWithProxy() { string deviceId = _idPrefix + Guid.NewGuid(); diff --git a/iothub/device/src/Pipeline/DefaultDelegatingHandler.cs b/iothub/device/src/Pipeline/DefaultDelegatingHandler.cs index 4cdef80c32..269b78dc73 100644 --- a/iothub/device/src/Pipeline/DefaultDelegatingHandler.cs +++ b/iothub/device/src/Pipeline/DefaultDelegatingHandler.cs @@ -11,8 +11,9 @@ namespace Microsoft.Azure.Devices.Client.Transport { internal abstract class DefaultDelegatingHandler : IDelegatingHandler { - private volatile IDelegatingHandler _innerHandler; + protected const string ClientDisposedMessage = "The client has been disposed and is no longer usable."; protected volatile bool _isDisposed; + private volatile IDelegatingHandler _innerHandler; protected DefaultDelegatingHandler(PipelineContext context, IDelegatingHandler innerHandler) { @@ -209,7 +210,7 @@ protected void ThrowIfDisposed() { if (_isDisposed) { - throw new ObjectDisposedException("IoT hub client"); + throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } } diff --git a/iothub/device/src/Pipeline/RetryDelegatingHandler.cs b/iothub/device/src/Pipeline/RetryDelegatingHandler.cs index 91e6b19077..623a33c4f9 100644 --- a/iothub/device/src/Pipeline/RetryDelegatingHandler.cs +++ b/iothub/device/src/Pipeline/RetryDelegatingHandler.cs @@ -21,17 +21,23 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler private RetryPolicy _internalRetryPolicy; - private SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _clientOpenSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _cloudToDeviceMessageSubscriptionSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _cloudToDeviceEventSubscriptionSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _directMethodSubscriptionSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _twinEventsSubscriptionSemaphore = new SemaphoreSlim(1, 1); private bool _openCalled; - private bool _opened; private bool _methodsEnabled; private bool _twinEnabled; private bool _eventsEnabled; private bool _deviceReceiveMessageEnabled; + private bool _isDisposing; private bool _isAnEdgeModule = true; + private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0. private Task _transportClosedTask; private readonly CancellationTokenSource _handleDisconnectCts = new CancellationTokenSource(); + private readonly CancellationTokenSource _cancelPendingOperationsCts = new CancellationTokenSource(); private readonly ConnectionStatusChangesHandler _onConnectionStatusChanged; @@ -76,20 +82,22 @@ public override async Task SendEventAsync(Message message, CancellationToken can if (Logging.IsEnabled) Logging.Enter(this, message, cancellationToken, nameof(SendEventAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); if (message.IsBodyCalled) { message.ResetBody(); } - await base.SendEventAsync(message, cancellationToken).ConfigureAwait(false); + await base.SendEventAsync(message, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -106,11 +114,13 @@ public override async Task SendEventAsync(IEnumerable messages, Cancell if (Logging.IsEnabled) Logging.Enter(this, messages, cancellationToken, nameof(SendEventAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); foreach (Message m in messages) { @@ -120,9 +130,9 @@ await _internalRetryPolicy } } - await base.SendEventAsync(messages, cancellationToken).ConfigureAwait(false); + await base.SendEventAsync(messages, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -139,14 +149,16 @@ public override async Task SendMethodResponseAsync(MethodResponseInternal method if (Logging.IsEnabled) Logging.Enter(this, method, cancellationToken, nameof(SendMethodResponseAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await base.SendMethodResponseAsync(method, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.SendMethodResponseAsync(method, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -163,14 +175,16 @@ public override async Task ReceiveAsync(CancellationToken cancellationT if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(ReceiveAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + return await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - return await base.ReceiveAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + return await base.ReceiveAsync(operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -188,6 +202,8 @@ public override async Task ReceiveAsync(TimeoutHelper timeoutHelper) Logging.Enter(this, timeoutHelper, nameof(ReceiveAsync)); using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime()); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token); + return await _internalRetryPolicy .RunWithRetryAsync( async () => @@ -195,7 +211,7 @@ public override async Task ReceiveAsync(TimeoutHelper timeoutHelper) await EnsureOpenedAsync(false, timeoutHelper).ConfigureAwait(false); return await base.ReceiveAsync(timeoutHelper).ConfigureAwait(false); }, - cts.Token) + operationCts.Token) .ConfigureAwait(false); } finally @@ -212,28 +228,38 @@ public override async Task EnableReceiveMessageAsync(CancellationToken cancellat if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(EnableReceiveMessageAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - // Ensure that the connection has been opened, before enabling the callback for receiving messages. - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); - // Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // Wait to acquire the _cloudToDeviceSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. + await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { // The telemetry downlink needs to be enabled only for the first time that the callback is set. Debug.Assert(!_deviceReceiveMessageEnabled); - await base.EnableReceiveMessageAsync(cancellationToken).ConfigureAwait(false); + await base.EnableReceiveMessageAsync(operationCts.Token).ConfigureAwait(false); _deviceReceiveMessageEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceMessageSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -252,28 +278,38 @@ public override async Task EnsurePendingMessagesAreDeliveredAsync(CancellationTo if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(EnsurePendingMessagesAreDeliveredAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - // Ensure that the connection has been opened before returning pending messages to the callback. - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); - // Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. + await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { // Ensure that a callback for receiving messages has been previously set. Debug.Assert(_deviceReceiveMessageEnabled); - await base.EnsurePendingMessagesAreDeliveredAsync(cancellationToken).ConfigureAwait(false); + await base.EnsurePendingMessagesAreDeliveredAsync(operationCts.Token).ConfigureAwait(false); } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceMessageSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -290,28 +326,38 @@ public override async Task DisableReceiveMessageAsync(CancellationToken cancella if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(DisableReceiveMessageAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - // Ensure that the connection has been opened, before disabling the callback for receiving messages. - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); - // Wait to acquire the _handlerSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + // Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner. + await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { // Ensure that a callback for receiving messages has been previously set. Debug.Assert(_deviceReceiveMessageEnabled); - await base.DisableReceiveMessageAsync(cancellationToken).ConfigureAwait(false); + await base.DisableReceiveMessageAsync(operationCts.Token).ConfigureAwait(false); _deviceReceiveMessageEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceMessageSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device message subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -328,25 +374,35 @@ public override async Task EnableMethodsAsync(CancellationToken cancellationToke if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(EnableMethodsAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _directMethodSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(!_methodsEnabled); - await base.EnableMethodsAsync(cancellationToken).ConfigureAwait(false); + await base.EnableMethodsAsync(operationCts.Token).ConfigureAwait(false); _methodsEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _directMethodSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing direct method subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -363,24 +419,35 @@ public override async Task DisableMethodsAsync(CancellationToken cancellationTok if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(DisableMethodsAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _directMethodSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(_methodsEnabled); - await base.DisableMethodsAsync(cancellationToken).ConfigureAwait(false); + await base.DisableMethodsAsync(operationCts.Token).ConfigureAwait(false); _methodsEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _directMethodSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing direct method subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -394,28 +461,39 @@ public override async Task EnableEventReceiveAsync(bool isAnEdgeModule, Cancella { try { - _isAnEdgeModule = isAnEdgeModule; if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(EnableEventReceiveAsync)); + _isAnEdgeModule = isAnEdgeModule; + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _cloudToDeviceEventSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { - await base.EnableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false); Debug.Assert(!_eventsEnabled); + await base.EnableEventReceiveAsync(isAnEdgeModule, operationCts.Token).ConfigureAwait(false); _eventsEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceEventSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -429,28 +507,39 @@ public override async Task DisableEventReceiveAsync(bool isAnEdgeModule, Cancell { try { - _isAnEdgeModule = isAnEdgeModule; if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(DisableEventReceiveAsync)); + _isAnEdgeModule = isAnEdgeModule; + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _cloudToDeviceEventSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(_eventsEnabled); - await base.DisableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false); + await base.DisableEventReceiveAsync(isAnEdgeModule, operationCts.Token).ConfigureAwait(false); _eventsEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _cloudToDeviceEventSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing cloud-to-device event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -467,24 +556,35 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(EnableTwinPatchAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _twinEventsSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(!_twinEnabled); - await base.EnableTwinPatchAsync(cancellationToken).ConfigureAwait(false); + await base.EnableTwinPatchAsync(operationCts.Token).ConfigureAwait(false); _twinEnabled = true; } finally { - _handlerSemaphore?.Release(); + try + { + _twinEventsSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -501,24 +601,35 @@ public override async Task DisableTwinPatchAsync(CancellationToken cancellationT if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(DisableTwinPatchAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await _twinEventsSubscriptionSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { Debug.Assert(_twinEnabled); - await base.DisableTwinPatchAsync(cancellationToken).ConfigureAwait(false); + await base.DisableTwinPatchAsync(operationCts.Token).ConfigureAwait(false); _twinEnabled = false; } finally { - _handlerSemaphore?.Release(); + try + { + _twinEventsSubscriptionSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -535,14 +646,16 @@ public override async Task SendTwinGetAsync(CancellationToken cancellation if (Logging.IsEnabled) Logging.Enter(this, cancellationToken, nameof(SendTwinGetAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + return await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - return await base.SendTwinGetAsync(cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + return await base.SendTwinGetAsync(operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -559,14 +672,16 @@ public override async Task SendTwinPatchAsync(TwinCollection reportedProperties, if (Logging.IsEnabled) Logging.Enter(this, reportedProperties, cancellationToken, nameof(SendTwinPatchAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await base.SendTwinPatchAsync(reportedProperties, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.SendTwinPatchAsync(reportedProperties, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -583,14 +698,16 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can if (Logging.IsEnabled) Logging.Enter(this, lockToken, cancellationToken, nameof(CompleteAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await base.CompleteAsync(lockToken, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.CompleteAsync(lockToken, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -607,14 +724,16 @@ public override async Task AbandonAsync(string lockToken, CancellationToken canc if (Logging.IsEnabled) Logging.Enter(this, lockToken, cancellationToken, nameof(AbandonAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await base.AbandonAsync(lockToken, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.AbandonAsync(lockToken, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -631,14 +750,16 @@ public override async Task RejectAsync(string lockToken, CancellationToken cance if (Logging.IsEnabled) Logging.Enter(this, lockToken, cancellationToken, nameof(RejectAsync)); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + await _internalRetryPolicy .RunWithRetryAsync( async () => { - await EnsureOpenedAsync(false, cancellationToken).ConfigureAwait(false); - await base.RejectAsync(lockToken, cancellationToken).ConfigureAwait(false); + await EnsureOpenedAsync(false, operationCts.Token).ConfigureAwait(false); + await base.RejectAsync(lockToken, operationCts.Token).ConfigureAwait(false); }, - cancellationToken) + operationCts.Token) .ConfigureAwait(false); } finally @@ -655,7 +776,6 @@ public override Task OpenAsync(CancellationToken cancellationToken) public override async Task CloseAsync(CancellationToken cancellationToken) { - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (!_openCalled) @@ -667,15 +787,15 @@ public override async Task CloseAsync(CancellationToken cancellationToken) Logging.Enter(this, cancellationToken, nameof(CloseAsync)); _handleDisconnectCts.Cancel(); + _cancelPendingOperationsCts.Cancel(); await base.CloseAsync(cancellationToken).ConfigureAwait(false); } finally { + Dispose(true); + if (Logging.IsEnabled) Logging.Exit(this, cancellationToken, nameof(CloseAsync)); - - _handlerSemaphore?.Release(); - Dispose(true); } } @@ -684,23 +804,25 @@ public override async Task CloseAsync(CancellationToken cancellationToken) /// private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellationToken) { + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + // If this object has already been disposed, we will throw an exception indicating that. // This is the entry point for interacting with the client and this safety check should be done here. // The current behavior does not support open->close->open if (_isDisposed) { - throw new ObjectDisposedException(nameof(RetryDelegatingHandler)); + throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } - if (Volatile.Read(ref _opened)) + if (Interlocked.Read(ref _isOpened) == 1) { return; } - await _handlerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { - if (!_opened) + if (Interlocked.Read(ref _isOpened) == 0) { if (Logging.IsEnabled) Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync)); @@ -709,7 +831,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat // we are returning the corresponding connection status change event => disconnected: retry_expired. try { - await OpenInternalAsync(withRetry, cancellationToken).ConfigureAwait(false); + await OpenInternalAsync(withRetry, operationCts.Token).ConfigureAwait(false); } catch (Exception ex) when (!ex.IsFatal()) { @@ -719,7 +841,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat if (!_isDisposed) { - _opened = true; + _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" _openCalled = true; // Send the request for transport close notification. @@ -736,26 +858,42 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat } finally { - _handlerSemaphore?.Release(); + try + { + _clientOpenSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } } private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper) { - if (Volatile.Read(ref _opened)) + using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime()); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token); + + // If this object has already been disposed, we will throw an exception indicating that. + // This is the entry point for interacting with the client and this safety check should be done here. + // The current behavior does not support open->close->open + if (_isDisposed) { - return; + throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } - bool gain = await _handlerSemaphore.WaitAsync(timeoutHelper.GetRemainingTime()).ConfigureAwait(false); - if (!gain) + if (Interlocked.Read(ref _isOpened) == 1) { - throw new TimeoutException("Timed out to acquire handler lock."); + return; } + await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); + try { - if (!_opened) + if (Interlocked.Read(ref _isOpened) == 0) { if (Logging.IsEnabled) Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync)); @@ -774,7 +912,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper if (!_isDisposed) { - _opened = true; + _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" _openCalled = true; // Send the request for transport close notification. @@ -791,12 +929,23 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper } finally { - _handlerSemaphore?.Release(); + try + { + _clientOpenSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } } private async Task OpenInternalAsync(bool withRetry, CancellationToken cancellationToken) { + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancelPendingOperationsCts.Token); + if (withRetry) { await _internalRetryPolicy @@ -809,7 +958,7 @@ await _internalRetryPolicy Logging.Enter(this, cancellationToken, nameof(OpenAsync)); // Will throw on error. - await base.OpenAsync(cancellationToken).ConfigureAwait(false); + await base.OpenAsync(operationCts.Token).ConfigureAwait(false); _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); } catch (Exception ex) when (!ex.IsFatal()) @@ -823,7 +972,7 @@ await _internalRetryPolicy Logging.Exit(this, cancellationToken, nameof(OpenAsync)); } }, - cancellationToken).ConfigureAwait(false); + operationCts.Token).ConfigureAwait(false); } else { @@ -833,7 +982,7 @@ await _internalRetryPolicy Logging.Enter(this, cancellationToken, nameof(OpenAsync)); // Will throw on error. - await base.OpenAsync(cancellationToken).ConfigureAwait(false); + await base.OpenAsync(operationCts.Token).ConfigureAwait(false); _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); } catch (Exception ex) when (!ex.IsFatal()) @@ -852,6 +1001,7 @@ await _internalRetryPolicy private async Task OpenInternalAsync(bool withRetry, TimeoutHelper timeoutHelper) { using var cts = new CancellationTokenSource(timeoutHelper.GetRemainingTime()); + using var operationCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, _cancelPendingOperationsCts.Token); if (withRetry) { @@ -879,7 +1029,7 @@ await _internalRetryPolicy Logging.Exit(this, timeoutHelper, nameof(OpenAsync)); } }, - cts.Token) + operationCts.Token) .ConfigureAwait(false); } else @@ -904,8 +1054,6 @@ await _internalRetryPolicy Logging.Exit(this, timeoutHelper, nameof(OpenAsync)); } } - - } // Triggered from connection loss event @@ -937,8 +1085,8 @@ private async Task HandleDisconnectAsync() if (Logging.IsEnabled) Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync)); - await _handlerSemaphore.WaitAsync().ConfigureAwait(false); - _opened = false; + await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false); + _ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed" try { @@ -1009,7 +1157,7 @@ await _internalRetryPolicy.RunWithRetryAsync(async () => // Send the request for transport close notification. _transportClosedTask = HandleDisconnectAsync(); - _opened = true; + _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); if (Logging.IsEnabled) @@ -1026,7 +1174,16 @@ await _internalRetryPolicy.RunWithRetryAsync(async () => } finally { - _handlerSemaphore?.Release(); + try + { + _clientOpenSemaphore?.Release(); + } + catch (ObjectDisposedException) when (_isDisposing) + { + if (Logging.IsEnabled) + Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } } @@ -1085,20 +1242,41 @@ protected override void Dispose(bool disposing) if (!_isDisposed) { + _isDisposing = true; + base.Dispose(disposing); if (disposing) { + var disposables = new List + { + _handleDisconnectCts, + _cancelPendingOperationsCts, + _clientOpenSemaphore, + _cloudToDeviceMessageSubscriptionSemaphore, + _cloudToDeviceEventSubscriptionSemaphore, + _directMethodSubscriptionSemaphore, + _twinEventsSubscriptionSemaphore, + }; + _handleDisconnectCts?.Cancel(); - _handleDisconnectCts?.Dispose(); - if (_handlerSemaphore != null && _handlerSemaphore.CurrentCount == 0) + _cancelPendingOperationsCts?.Cancel(); + + foreach (IDisposable disposable in disposables) { - _handlerSemaphore.Release(); + try + { + disposable?.Dispose(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Tried disposing the IDisposable {disposable} but it has already been disposed by client disposal on a separate thread." + + "Ignoring this exception and continuing with client cleanup."); + } } - _handlerSemaphore?.Dispose(); - _handlerSemaphore = null; } - // the _disposed flag is inherited from the base class DefaultDelegatingHandler and is finally set to null there. + // the _disposed flag is inherited from the base class DefaultDelegatingHandler and is finally set to true there. } } finally diff --git a/iothub/device/tests/Pipeline/RetryDelegatingHandlerTests.cs b/iothub/device/tests/Pipeline/RetryDelegatingHandlerTests.cs index 7c59c216ff..b9e0448d5e 100644 --- a/iothub/device/tests/Pipeline/RetryDelegatingHandlerTests.cs +++ b/iothub/device/tests/Pipeline/RetryDelegatingHandlerTests.cs @@ -27,13 +27,12 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries() // arrange int callCounter = 0; - var ct = CancellationToken.None; PipelineContext contextMock = Substitute.For(); contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { }); IDelegatingHandler innerHandlerMock = Substitute.For(); innerHandlerMock - .OpenAsync(ct) + .OpenAsync(Arg.Any()) .Returns(t => { return ++callCounter == 1 @@ -45,7 +44,7 @@ public async Task RetryDelegatingHandler_OpenAsyncRetries() var retryDelegatingHandler = new RetryDelegatingHandler(contextMock, innerHandlerMock); // act - await retryDelegatingHandler.OpenAsync(ct).ConfigureAwait(false); + await retryDelegatingHandler.OpenAsync(CancellationToken.None).ConfigureAwait(false); // assert callCounter.Should().Be(2); @@ -255,12 +254,12 @@ public async Task DeviceNotFoundExceptionReturnsDeviceDisabledStatus() public async Task RetryTransientErrorThrownAfterNumberOfRetriesThrows() { // arrange - using var cts = new CancellationTokenSource(100); + using var cts = new CancellationTokenSource(1000); var contextMock = Substitute.For(); contextMock.ConnectionStatusChangesHandler = new ConnectionStatusChangesHandler(delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { }); var innerHandlerMock = Substitute.For(); innerHandlerMock - .OpenAsync(cts.Token) + .OpenAsync(Arg.Any()) .Returns(t => throw new IotHubException(TestExceptionMessage, isTransient: true)); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); @@ -352,12 +351,12 @@ public async Task RetrySetRetryPolicyVerifyInternalsSuccess() delegate (ConnectionStatus status, ConnectionStatusChangeReason reason) { }); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); - var retryPolicy = new TestRetryPolicy(); + var retryPolicy = new TestRetryPolicyRetryTwice(); sut.SetRetryPolicy(retryPolicy); int innerHandlerCallCounter = 0; - innerHandlerMock.OpenAsync(CancellationToken.None).Returns(t => + innerHandlerMock.OpenAsync(Arg.Any()).Returns(t => { innerHandlerCallCounter++; throw new IotHubCommunicationException(); @@ -397,7 +396,7 @@ public async Task RetryCancellationTokenCanceledAbandon() { // arrange var innerHandlerMock = Substitute.For(); - innerHandlerMock.AbandonAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask); + innerHandlerMock.AbandonAsync(null, Arg.Any()).ReturnsForAnyArgs(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); @@ -416,7 +415,7 @@ public async Task RetryCancellationTokenCanceledReject() { // arrange var innerHandlerMock = Substitute.For(); - innerHandlerMock.RejectAsync(null, CancellationToken.None).ReturnsForAnyArgs(TaskHelpers.CompletedTask); + innerHandlerMock.RejectAsync(null, Arg.Any()).ReturnsForAnyArgs(TaskHelpers.CompletedTask); var contextMock = Substitute.For(); var sut = new RetryDelegatingHandler(contextMock, innerHandlerMock); @@ -427,7 +426,7 @@ public async Task RetryCancellationTokenCanceledReject() await sut.RejectAsync(Arg.Any(), cts.Token).ExpectedAsync().ConfigureAwait(false); } - private class TestRetryPolicy : IRetryPolicy + private class TestRetryPolicyRetryTwice : IRetryPolicy { public int Counter { get; private set; } diff --git a/iothub/device/tests/Transport/Amqp/AmqpConnectionPoolTests.cs b/iothub/device/tests/Transport/Amqp/AmqpConnectionPoolTests.cs index 506aa2ff20..aff820c023 100644 --- a/iothub/device/tests/Transport/Amqp/AmqpConnectionPoolTests.cs +++ b/iothub/device/tests/Transport/Amqp/AmqpConnectionPoolTests.cs @@ -1,4 +1,7 @@ -using System; +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -13,6 +16,7 @@ namespace Microsoft.Azure.Devices.Client.Tests.Amqp { [TestClass] + [TestCategory("Unit")] public class AmqpConnectionPoolTests { internal class AmqpConnectionPoolTest : AmqpConnectionPool diff --git a/vsts/vsts.yaml b/vsts/vsts.yaml index 10cc14ade3..37e41849de 100644 --- a/vsts/vsts.yaml +++ b/vsts/vsts.yaml @@ -548,11 +548,7 @@ jobs: pool: vmImage: windows-2022 steps: - - script: | - rem Run dotnet first experience. - dotnet new - rem Start build - build.cmd -clean -build -configuration Debug -package + - powershell: .\build.ps1 -clean -build -configutaion Debug -package displayName: Build Package - task: ComponentGovernanceComponentDetection@0