Skip to content

Commit

Permalink
Set state to 'closed' when CloseAsync() is called (Azure#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhipsaMisra authored Mar 14, 2023
1 parent 34bb38b commit 696f281
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

namespace Microsoft.Azure.Devices.Client.Transport
{
internal enum ClientTransportStatus
{
Closed = 0,
Open = 1,
}

internal class RetryDelegatingHandler : DefaultDelegatingHandler
{
// RetryCount is used for testing purpose and is equal to MaxValue in prod.
Expand All @@ -33,7 +39,7 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler
private bool _deviceReceiveMessageEnabled;
private bool _isDisposing;
private bool _isAnEdgeModule = true;
private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0.
private long _clientTransportState; // references the current client transport status as the int value of ClientTransportStatus

private Task _transportClosedTask;
private readonly CancellationTokenSource _handleDisconnectCts = new CancellationTokenSource();
Expand Down Expand Up @@ -792,6 +798,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
}
finally
{
SetClientTransportStatus(ClientTransportStatus.Closed);
Dispose(true);

if (Logging.IsEnabled)
Expand All @@ -814,15 +821,15 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Interlocked.Read(ref _isOpened) == 1)
if (GetClientTransportStatus() == ClientTransportStatus.Open)
{
return;
}

await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (Interlocked.Read(ref _isOpened) == 0)
if (GetClientTransportStatus() == ClientTransportStatus.Closed)
{
if (Logging.IsEnabled)
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));
Expand All @@ -841,7 +848,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat

if (!_isDisposed)
{
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_openCalled = true;

// Send the request for transport close notification.
Expand Down Expand Up @@ -884,7 +891,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Interlocked.Read(ref _isOpened) == 1)
if (GetClientTransportStatus() == ClientTransportStatus.Open)
{
return;
}
Expand All @@ -893,7 +900,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

try
{
if (Interlocked.Read(ref _isOpened) == 0)
if (GetClientTransportStatus() == ClientTransportStatus.Closed)
{
if (Logging.IsEnabled)
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));
Expand All @@ -912,7 +919,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

if (!_isDisposed)
{
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_openCalled = true;

// Send the request for transport close notification.
Expand Down Expand Up @@ -1086,7 +1093,7 @@ private async Task HandleDisconnectAsync()
Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync));

await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false);
_ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed"
SetClientTransportStatus(ClientTransportStatus.Closed);

try
{
Expand Down Expand Up @@ -1157,7 +1164,7 @@ await _internalRetryPolicy.RunWithRetryAsync(async () =>
// Send the request for transport close notification.
_transportClosedTask = HandleDisconnectAsync();

_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

if (Logging.IsEnabled)
Expand Down Expand Up @@ -1231,6 +1238,16 @@ private void HandleConnectionStatusExceptions(Exception exception, bool retryAtt
nameof(HandleConnectionStatusExceptions));
}

private ClientTransportStatus GetClientTransportStatus()
{
return (ClientTransportStatus)Interlocked.Read(ref _clientTransportState);
}

private void SetClientTransportStatus(ClientTransportStatus clientTransportStatus)
{
_ = Interlocked.Exchange(ref _clientTransportState, (int)clientTransportStatus);
}

protected override void Dispose(bool disposing)
{
try
Expand All @@ -1243,8 +1260,10 @@ protected override void Dispose(bool disposing)
if (!_isDisposed)
{
_isDisposing = true;
SetClientTransportStatus(ClientTransportStatus.Closed);

base.Dispose(disposing);

if (disposing)
{
var disposables = new List<IDisposable>
Expand Down

0 comments on commit 696f281

Please sign in to comment.