Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple client open-close semaphore from callback subscription semaphore #3135

Merged
merged 19 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions iothub/device/src/Pipeline/DefaultDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ namespace Microsoft.Azure.Devices.Client.Transport
{
internal abstract class DefaultDelegatingHandler : IDelegatingHandler
{
private volatile IDelegatingHandler _innerHandler;
protected const string ClientDisposedMessage = "The client has already been disposed and is no longer usable.";
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
protected volatile bool _isDisposed;
private volatile IDelegatingHandler _innerHandler;

protected DefaultDelegatingHandler(PipelineContext context, IDelegatingHandler innerHandler)
{
Expand Down Expand Up @@ -209,7 +210,7 @@ protected void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException("IoT hub client");
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}
}

Expand Down
38 changes: 19 additions & 19 deletions iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler

private RetryPolicy _internalRetryPolicy;

private readonly SemaphoreSlim _clientOpenCloseSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _clientOpenSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _cloudToDeviceMessageSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _cloudToDeviceEventSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _directMethodSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _twinEventsSubscriptionSemaphore = new SemaphoreSlim(1, 1);
private bool _openCalled;
private bool _opened;
private bool _methodsEnabled;
private bool _twinEnabled;
private bool _eventsEnabled;
private bool _deviceReceiveMessageEnabled;
private bool _isDisposing;
private bool _isAnEdgeModule = true;
private long _isOpened; // store the opened status in an int which can be accessed via Interlocked class. opened = 1, closed = 0.

private Task _transportClosedTask;
private readonly CancellationTokenSource _handleDisconnectCts = new CancellationTokenSource();
Expand Down Expand Up @@ -811,18 +811,18 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
// The current behavior does not support open->close->open
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(RetryDelegatingHandler));
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Volatile.Read(ref _opened))
if (Interlocked.Read(ref _isOpened) == 1)
{
return;
}

await _clientOpenCloseSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
try
{
if (!_opened)
if (Interlocked.Read(ref _isOpened) == 0)
{
if (Logging.IsEnabled)
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));
Expand All @@ -841,7 +841,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat

if (!_isDisposed)
{
_opened = true;
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
_openCalled = true;

// Send the request for transport close notification.
Expand All @@ -860,7 +860,7 @@ private async Task EnsureOpenedAsync(bool withRetry, CancellationToken cancellat
{
try
{
_clientOpenCloseSemaphore?.Release();
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
Expand All @@ -881,19 +881,19 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper
// The current behavior does not support open->close->open
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(RetryDelegatingHandler));
throw new ObjectDisposedException("IoT client", ClientDisposedMessage);
}

if (Volatile.Read(ref _opened))
if (Interlocked.Read(ref _isOpened) == 1)
{
return;
}

await _clientOpenCloseSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);
await _clientOpenSemaphore.WaitAsync(operationCts.Token).ConfigureAwait(false);

try
{
if (!_opened)
if (Interlocked.Read(ref _isOpened) == 0)
{
if (Logging.IsEnabled)
Logging.Info(this, "Opening connection", nameof(EnsureOpenedAsync));
Expand All @@ -912,7 +912,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper

if (!_isDisposed)
{
_opened = true;
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
_openCalled = true;

// Send the request for transport close notification.
Expand All @@ -931,7 +931,7 @@ private async Task EnsureOpenedAsync(bool withRetry, TimeoutHelper timeoutHelper
{
try
{
_clientOpenCloseSemaphore?.Release();
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
Expand Down Expand Up @@ -1085,8 +1085,8 @@ private async Task HandleDisconnectAsync()
if (Logging.IsEnabled)
Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync));

await _clientOpenCloseSemaphore.WaitAsync().ConfigureAwait(false);
_opened = false;
await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false);
_ = Interlocked.Exchange(ref _isOpened, 0); // set the state to "closed"

try
{
Expand Down Expand Up @@ -1157,7 +1157,7 @@ await _internalRetryPolicy.RunWithRetryAsync(async () =>
// Send the request for transport close notification.
_transportClosedTask = HandleDisconnectAsync();

_opened = true;
_ = Interlocked.Exchange(ref _isOpened, 1); // set the state to "opened"
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

if (Logging.IsEnabled)
Expand All @@ -1176,7 +1176,7 @@ await _internalRetryPolicy.RunWithRetryAsync(async () =>
{
try
{
_clientOpenCloseSemaphore?.Release();
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
Expand Down Expand Up @@ -1251,7 +1251,7 @@ protected override void Dispose(bool disposing)
{
_handleDisconnectCts,
_cancelPendingOperationsCts,
_clientOpenCloseSemaphore,
_clientOpenSemaphore,
_cloudToDeviceMessageSubscriptionSemaphore,
_cloudToDeviceEventSubscriptionSemaphore,
_directMethodSubscriptionSemaphore,
Expand Down