Skip to content

Commit

Permalink
Await completion of refresh loop (#3021)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhipsaMisra authored Dec 19, 2022
1 parent 07a7694 commit 297bee8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 33 deletions.
37 changes: 25 additions & 12 deletions iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
// then log the exception and continue.
// This task runs on an unmonitored thread so there is no point throwing these exceptions.
if (Logging.IsEnabled)
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}");
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}");
}
finally
{
Expand All @@ -143,23 +143,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
}
}

public void StopLoop()
public async Task StopLoopAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoop));

try
{
_refresherCancellationTokenSource?.Cancel();
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoopAsync));

try
{
_refresherCancellationTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync));
}

// Await the completion of _refreshLoop.
// This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress.
await _refreshLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}");
}
catch (ObjectDisposedException)
finally
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop));
Logging.Exit(this, nameof(StopLoopAsync));
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(StopLoop));
}

public void Dispose()
Expand All @@ -181,7 +195,6 @@ private void Dispose(bool disposing)
{
if (disposing)
{
StopLoop();
_refresherCancellationTokenSource?.Dispose();
_amqpIotCbsTokenProvider?.Dispose();
}
Expand Down
22 changes: 15 additions & 7 deletions iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private void OnConnectionClosed(object o, EventArgs args)

if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
HashSet<AmqpUnit> amqpUnits;
lock (_unitsLock)
{
Expand All @@ -85,16 +85,20 @@ private void OnConnectionClosed(object o, EventArgs args)
Logging.Exit(this, o, nameof(OnConnectionClosed));
}

public void Shutdown()
public async Task ShutdownAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, _amqpIotConnection, nameof(Shutdown));
Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync));

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

_amqpAuthenticationRefresher?.StopLoop();
_amqpIotConnection?.SafeClose();

if (Logging.IsEnabled)
Logging.Exit(this, _amqpIotConnection, nameof(Shutdown));
Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync));
}

public void Dispose()
Expand Down Expand Up @@ -218,7 +222,11 @@ public async Task<AmqpIotConnection> EnsureConnectionAsync(CancellationToken can
}
catch (Exception ex) when (!ex.IsFatal())
{
amqpAuthenticationRefresher?.StopLoop();
if (amqpAuthenticationRefresher != null)
{
await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

amqpIotConnection?.SafeClose();
throw;
}
Expand All @@ -244,7 +252,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit)
if (_amqpUnits.Count == 0)
{
// TODO #887: handle gracefulDisconnect
Shutdown();
_ = ShutdownAsync().ConfigureAwait(false);
}
}

Expand Down
31 changes: 19 additions & 12 deletions iothub/device/src/Transport/Amqp/AmqpUnit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ internal async Task<AmqpIotSession> EnsureSessionIsOpenAsync(CancellationToken c
}
catch (Exception)
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
throw;
}
finally
Expand Down Expand Up @@ -208,10 +208,15 @@ public async Task CloseAsync(CancellationToken cancellationToken)
try
{
await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false);

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}
}
finally
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
}
}
}
Expand All @@ -225,21 +230,25 @@ public async Task CloseAsync(CancellationToken cancellationToken)
}
}

private void Cleanup()
private async Task CleanupAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(Cleanup));
Logging.Enter(this, nameof(CleanupAsync));

_amqpIotSession?.SafeClose();
_amqpAuthenticationRefresher?.StopLoop();

if (!_deviceIdentity.IsPooling())
if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null)
{
_amqpConnectionHolder?.Shutdown();
await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false);
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(Cleanup));
Logging.Exit(this, nameof(CleanupAsync));
}

#endregion Open-Close
Expand Down Expand Up @@ -1056,7 +1065,7 @@ public void OnConnectionDisconnected()
if (Logging.IsEnabled)
Logging.Enter(this, nameof(OnConnectionDisconnected));

_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
_onUnitDisconnected();

if (Logging.IsEnabled)
Expand All @@ -1070,7 +1079,7 @@ private void OnSessionDisconnected(object o, EventArgs args)

if (ReferenceEquals(o, _amqpIotSession))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);

// calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry
_onUnitDisconnected();
Expand Down Expand Up @@ -1102,15 +1111,13 @@ private void Dispose(bool disposing)
{
if (disposing)
{
Cleanup();
if (!_deviceIdentity.IsPooling())
{
_amqpConnectionHolder?.Dispose();
}

// For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself,
// so it needs to be explicitly disposed.
_amqpAuthenticationRefresher?.StopLoop();
_amqpAuthenticationRefresher?.Dispose();

_sessionSemaphore?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp
internal interface IAmqpAuthenticationRefresher : IDisposable
{
Task InitLoopAsync(CancellationToken cancellationToken);
void StopLoop();
void StartLoop(DateTime refreshOn, CancellationToken cancellationToken);
Task StopLoopAsync();
}
}
2 changes: 1 addition & 1 deletion iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ internal interface IAmqpConnectionHolder : IDisposable

Task<IAmqpAuthenticationRefresher> CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken);

void Shutdown();
Task ShutdownAsync();
}
}

0 comments on commit 297bee8

Please sign in to comment.