From 1dc5eeff1d6102ed1eaad229381e701cea267bfd Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 13 Mar 2023 17:39:28 -0700 Subject: [PATCH] Set state to 'closed' when CloseAsync() is called (#3149) --- .../src/Pipeline/RetryDelegatingHandler.cs | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/iothub/device/src/Pipeline/RetryDelegatingHandler.cs b/iothub/device/src/Pipeline/RetryDelegatingHandler.cs index 623a33c4f9..95b3cbb9a7 100644 --- a/iothub/device/src/Pipeline/RetryDelegatingHandler.cs +++ b/iothub/device/src/Pipeline/RetryDelegatingHandler.cs @@ -14,6 +14,12 @@ namespace Microsoft.Azure.Devices.Client.Transport { + internal enum ClientTransportStatus + { + Closed = 0, + Open = 1, + } + internal class RetryDelegatingHandler : DefaultDelegatingHandler { // RetryCount is used for testing purpose and is equal to MaxValue in prod. @@ -33,7 +39,7 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler private bool _deviceReceiveMessageEnabled; private bool _isDisposing; private bool _isAnEdgeModule = true; - private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0. + private long _clientTransportState; // references the current client transport status as the int value of ClientTransportStatus private Task _transportClosedTask; private readonly CancellationTokenSource _handleDisconnectCts = new CancellationTokenSource(); @@ -792,6 +798,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken) } finally { + SetClientTransportStatus(ClientTransportStatus.Closed); Dispose(true); if (Logging.IsEnabled) @@ -814,7 +821,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } - if (Interlocked.Read(ref _isOpened) == 1) + if (GetClientTransportStatus() == ClientTransportStatus.Open) { return; } @@ -822,7 +829,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false); try { - if (Interlocked.Read(ref _isOpened) == 0) + if (GetClientTransportStatus() == ClientTransportStatus.Closed) { if (Logging.IsEnabled) Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync)); @@ -841,7 +848,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat if (!_isDisposed) { - _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" + SetClientTransportStatus(ClientTransportStatus.Open); _openCalled = true; // Send the request for transport close notification. @@ -884,7 +891,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper throw new ObjectDisposedException("IoT client", ClientDisposedMessage); } - if (Interlocked.Read(ref _isOpened) == 1) + if (GetClientTransportStatus() == ClientTransportStatus.Open) { return; } @@ -893,7 +900,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper try { - if (Interlocked.Read(ref _isOpened) == 0) + if (GetClientTransportStatus() == ClientTransportStatus.Closed) { if (Logging.IsEnabled) Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync)); @@ -912,7 +919,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper if (!_isDisposed) { - _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" + SetClientTransportStatus(ClientTransportStatus.Open); _openCalled = true; // Send the request for transport close notification. @@ -1086,7 +1093,7 @@ private async Task HandleDisconnectAsync() Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync)); await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false); - _ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed" + SetClientTransportStatus(ClientTransportStatus.Closed); try { @@ -1157,7 +1164,7 @@ await _internalRetryPolicy.RunWithRetryAsync(async () => // Send the request for transport close notification. _transportClosedTask = HandleDisconnectAsync(); - _ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened" + SetClientTransportStatus(ClientTransportStatus.Open); _onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok); if (Logging.IsEnabled) @@ -1231,6 +1238,16 @@ private void HandleConnectionStatusExceptions(Exception exception, bool retryAtt nameof(HandleConnectionStatusExceptions)); } + private ClientTransportStatus GetClientTransportStatus() + { + return (ClientTransportStatus)Interlocked.Read(ref _clientTransportState); + } + + private void SetClientTransportStatus(ClientTransportStatus clientTransportStatus) + { + _ = Interlocked.Exchange(ref _clientTransportState, (int)clientTransportStatus); + } + protected override void Dispose(bool disposing) { try @@ -1243,8 +1260,10 @@ protected override void Dispose(bool disposing) if (!_isDisposed) { _isDisposing = true; + SetClientTransportStatus(ClientTransportStatus.Closed); base.Dispose(disposing); + if (disposing) { var disposables = new List