Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set state to 'closed' when CloseAsync() is called (#3149) #3160

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions iothub/device/src/Transport/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

namespace Microsoft.Azure.Devices.Client.Transport
{
internal enum ClientTransportStatus
{
Closed = 0,
Open = 1,
}

internal class RetryDelegatingHandler : DefaultDelegatingHandler
{
// RetryCount is used for testing purpose and is equal to MaxValue in prod.
Expand All @@ -37,7 +43,7 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler
private bool _eventsEnabled;
private bool _deviceReceiveMessageEnabled;
private bool _isDisposing;
private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0.
private long _clientTransportState; // references the current client transport status as the int value of ClientTransportStatus

private Task _transportClosedTask;

Expand Down Expand Up @@ -763,6 +769,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
}
finally
{
SetClientTransportStatus(ClientTransportStatus.Closed);
Dispose(true);

if (Logging.IsEnabled)
Expand All @@ -785,15 +792,15 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Interlocked.Read(ref _isOpened) == 1)
if (GetClientTransportStatus() == ClientTransportStatus.Open)
{
return;
}

await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (Interlocked.Read(ref _isOpened) == 0)
if (GetClientTransportStatus() == ClientTransportStatus.Closed)
{
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));

Expand All @@ -811,7 +818,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat

if (!_disposed)
{
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_openCalled = true;

// Send the request for transport close notification.
Expand Down Expand Up @@ -853,7 +860,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Interlocked.Read(ref _isOpened) == 1)
if (GetClientTransportStatus() == ClientTransportStatus.Open)
{
return;
}
Expand All @@ -862,7 +869,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

try
{
if (Interlocked.Read(ref _isOpened) == 0)
if (GetClientTransportStatus() == ClientTransportStatus.Closed)
{
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));

Expand All @@ -880,7 +887,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

if (!_disposed)
{
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_openCalled = true;

// Send the request for transport close notification.
Expand Down Expand Up @@ -1042,7 +1049,7 @@ private async Task HandleDisconnectAsync()
Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync));

await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false);
_ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed"
SetClientTransportStatus(ClientTransportStatus.Closed);

try
{
Expand Down Expand Up @@ -1111,7 +1118,7 @@ await _internalRetryPolicy.ExecuteAsync(async () =>
// Send the request for transport close notification.
_transportClosedTask = HandleDisconnectAsync();

_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
SetClientTransportStatus(ClientTransportStatus.Open);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

Logging.Info(this, "Subscriptions recovered.", nameof(HandleDisconnectAsync));
Expand Down Expand Up @@ -1175,6 +1182,16 @@ private void HandleConnectionStatusExceptions(Exception exception, bool connectF
Logging.Info(this, $"Connection status change: status={status}, reason={reason}", nameof(HandleConnectionStatusExceptions));
}

private ClientTransportStatus GetClientTransportStatus()
{
return (ClientTransportStatus)Interlocked.Read(ref _clientTransportState);
}

private void SetClientTransportStatus(ClientTransportStatus clientTransportStatus)
{
_ = Interlocked.Exchange(ref _clientTransportState, (int)clientTransportStatus);
}

protected override void Dispose(bool disposing)
{
try
Expand All @@ -1187,8 +1204,10 @@ protected override void Dispose(bool disposing)
if (!_disposed)
{
_isDisposing = true;
SetClientTransportStatus(ClientTransportStatus.Closed);

base.Dispose(disposing);

if (disposing)
{
var disposables = new List<IDisposable>
Expand Down