Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await completion of refresh loop #3021

Merged
merged 3 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
// then log the exception and continue.
// This task runs on an unmonitored thread so there is no point throwing these exceptions.
if (Logging.IsEnabled)
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}");
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}");
}
finally
{
Expand All @@ -143,23 +143,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
}
}

public void StopLoop()
public async Task StopLoopAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoop));

try
{
_refresherCancellationTokenSource?.Cancel();
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoopAsync));

try
{
_refresherCancellationTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync));
}

// Await the completion of _refreshLoop.
// This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress.
await _refreshLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}");
}
catch (ObjectDisposedException)
finally
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop));
Logging.Exit(this, nameof(StopLoopAsync));
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(StopLoop));
}

public void Dispose()
Expand All @@ -181,7 +195,6 @@ private void Dispose(bool disposing)
{
if (disposing)
{
StopLoop();
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
_refresherCancellationTokenSource?.Dispose();
_amqpIotCbsTokenProvider?.Dispose();
}
Expand Down
22 changes: 15 additions & 7 deletions iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private void OnConnectionClosed(object o, EventArgs args)

if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
HashSet<AmqpUnit> amqpUnits;
lock (_unitsLock)
{
Expand All @@ -85,16 +85,20 @@ private void OnConnectionClosed(object o, EventArgs args)
Logging.Exit(this, o, nameof(OnConnectionClosed));
}

public void Shutdown()
public async Task ShutdownAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, _amqpIotConnection, nameof(Shutdown));
Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync));

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

_amqpAuthenticationRefresher?.StopLoop();
_amqpIotConnection?.SafeClose();

if (Logging.IsEnabled)
Logging.Exit(this, _amqpIotConnection, nameof(Shutdown));
Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync));
}

public void Dispose()
Expand Down Expand Up @@ -218,7 +222,11 @@ public async Task<AmqpIotConnection> EnsureConnectionAsync(CancellationToken can
}
catch (Exception ex) when (!ex.IsFatal())
{
amqpAuthenticationRefresher?.StopLoop();
if (amqpAuthenticationRefresher != null)
{
await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

amqpIotConnection?.SafeClose();
throw;
}
Expand All @@ -244,7 +252,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit)
if (_amqpUnits.Count == 0)
{
// TODO #887: handle gracefulDisconnect
Shutdown();
_ = ShutdownAsync().ConfigureAwait(false);
}
}

Expand Down
31 changes: 19 additions & 12 deletions iothub/device/src/Transport/Amqp/AmqpUnit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ internal async Task<AmqpIotSession> EnsureSessionIsOpenAsync(CancellationToken c
}
catch (Exception)
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
throw;
}
finally
Expand Down Expand Up @@ -208,10 +208,15 @@ public async Task CloseAsync(CancellationToken cancellationToken)
try
{
await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false);

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
}
}
}
Expand All @@ -225,21 +230,25 @@ public async Task CloseAsync(CancellationToken cancellationToken)
}
}

private void Cleanup()
private async Task CleanupAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(Cleanup));
Logging.Enter(this, nameof(CleanupAsync));

_amqpIotSession?.SafeClose();
_amqpAuthenticationRefresher?.StopLoop();

if (!_deviceIdentity.IsPooling())
if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved

if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null)
{
_amqpConnectionHolder?.Shutdown();
await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false);
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(Cleanup));
Logging.Exit(this, nameof(CleanupAsync));
}

#endregion Open-Close
Expand Down Expand Up @@ -1056,7 +1065,7 @@ public void OnConnectionDisconnected()
if (Logging.IsEnabled)
Logging.Enter(this, nameof(OnConnectionDisconnected));

_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
_onUnitDisconnected();

if (Logging.IsEnabled)
Expand All @@ -1070,7 +1079,7 @@ private void OnSessionDisconnected(object o, EventArgs args)

if (ReferenceEquals(o, _amqpIotSession))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);

// calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry
_onUnitDisconnected();
Expand Down Expand Up @@ -1102,15 +1111,13 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
if (!_deviceIdentity.IsPooling())
{
_amqpConnectionHolder?.Dispose();
}

// For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself,
// so it needs to be explicitly disposed.
_amqpAuthenticationRefresher?.StopLoop();
abhipsaMisra marked this conversation as resolved.
Show resolved Hide resolved
_amqpAuthenticationRefresher?.Dispose();

_sessionSemaphore?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp
internal interface IAmqpAuthenticationRefresher : IDisposable
{
Task InitLoopAsync(CancellationToken cancellationToken);
void StopLoop();
void StartLoop(DateTime refreshOn, CancellationToken cancellationToken);
Task StopLoopAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ internal interface IAmqpConnectionHolder : IDisposable

Task<IAmqpAuthenticationRefresher> CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken);

void Shutdown();
Task ShutdownAsync();
}
}