Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring amqp sas token refresh cleanup logic to LTS #3039

Merged
merged 5 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 84 additions & 38 deletions iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp
{
internal class AmqpAuthenticationRefresher : IAmqpAuthenticationRefresher, IDisposable
{
private static readonly string[] AccessRightsStringArray = AccessRightsHelper.AccessRightsToStringArray(AccessRights.DeviceConnect);
private static readonly string[] s_accessRightsStringArray = AccessRightsHelper.AccessRightsToStringArray(AccessRights.DeviceConnect);
private readonly AmqpIoTCbsLink _amqpIoTCbsLink;
private readonly IotHubConnectionString _connectionString;
private readonly AmqpIoTCbsTokenProvider _amqpIoTCbsTokenProvider;
private readonly string _audience;
private CancellationTokenSource _cancellationTokenSource;
private TimeSpan _operationTimeout;
private readonly TimeSpan _operationTimeout;
private Task _refreshLoop;
private bool _disposed;
private CancellationTokenSource _refresherCancellationTokenSource;

internal AmqpAuthenticationRefresher(DeviceIdentity deviceIdentity, AmqpIoTCbsLink amqpCbsLink)
{
Expand Down Expand Up @@ -49,24 +49,38 @@ public async Task InitLoopAsync(TimeSpan timeout)
Logging.Enter(this, timeout, $"{nameof(InitLoopAsync)}");
}

CancellationTokenSource oldTokenSource = _cancellationTokenSource;
_cancellationTokenSource = new CancellationTokenSource();
CancellationToken newToken = _cancellationTokenSource.Token;
oldTokenSource?.Cancel();

DateTime refreshOn = await _amqpIoTCbsLink
.SendTokenAsync(
_amqpIoTCbsTokenProvider,
_connectionString.AmqpEndpoint,
_audience,
_audience,
AccessRightsStringArray,
s_accessRightsStringArray,
timeout)
.ConfigureAwait(false);

// This cancellation token source is disposed when the authentication refresher is disposed
// or if this code block is executed more than once per instance of AmqpAuthenticationRefresher (not expected).

if (_refresherCancellationTokenSource != null)
{
if (Logging.IsEnabled)
Logging.Info(this, "_refresherCancellationTokenSource was already initialized, whhich was unexpected. Canceling and disposing the previous instance.", nameof(InitLoopAsync));

try
{
_refresherCancellationTokenSource.Cancel();
}
catch (ObjectDisposedException)
{
}
_refresherCancellationTokenSource.Dispose();
}
_refresherCancellationTokenSource = new CancellationTokenSource();

if (refreshOn < DateTime.MaxValue)
{
StartLoop(refreshOn, newToken);
StartLoop(refreshOn, _refresherCancellationTokenSource.Token);
}

if (Logging.IsEnabled)
Expand All @@ -82,6 +96,8 @@ public void StartLoop(DateTime refreshOn, CancellationToken cancellationToken)
Logging.Enter(this, refreshOn, $"{nameof(StartLoop)}");
}

// This task runs in the background and is unmonitored.
// When this refresher is disposed it signals this task to be cancelled.
_refreshLoop = RefreshLoopAsync(refreshOn, cancellationToken);
if (Logging.IsEnabled)
{
Expand All @@ -97,12 +113,13 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
while (!cancellationToken.IsCancellationRequested)
{
if (Logging.IsEnabled)
{
Logging.Info(this, refreshesOn, $"Before {nameof(RefreshLoopAsync)}");
}
Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} before {nameof(RefreshLoopAsync)}");

if (waitTime > TimeSpan.Zero)
{
if (Logging.IsEnabled)
Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}.");

await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -116,38 +133,60 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
_connectionString.AmqpEndpoint,
_audience,
_audience,
AccessRightsStringArray,
s_accessRightsStringArray,
_operationTimeout)
.ConfigureAwait(false);
}
catch (IotHubCommunicationException ex)
catch (Exception ex) when (ex is IotHubCommunicationException || ex is OperationCanceledException)
{
// In case the token refresh is not successful either due to a communication exception or cancellation token cancellation
// then log the exception and continue.
// This task runs on an unmonitored thread so there is no point throwing these exceptions.
if (Logging.IsEnabled)
{
Logging.Info(this, refreshesOn, $"Refresh token failed {ex}");
}
Logging.Error(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} refresh token failed: {ex}");
}
finally
{
if (Logging.IsEnabled)
{
Logging.Info(this, refreshesOn, $"After {nameof(RefreshLoopAsync)}");
}
Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} after {nameof(RefreshLoopAsync)}");
}

waitTime = refreshesOn - DateTime.UtcNow;
}
}
}

public void StopLoop()
public async Task StopLoopAsync()
{
if (Logging.IsEnabled)
try
{
Logging.Info(this, $"{nameof(StopLoop)}");
}
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoopAsync));

_cancellationTokenSource?.Cancel();
try
{
_refresherCancellationTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync));
}

// Await the completion of _refreshLoop.
// This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress.
await _refreshLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}");
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, nameof(StopLoopAsync));
}
}

public void Dispose()
Expand All @@ -158,23 +197,30 @@ public void Dispose()

private void Dispose(bool disposing)
{
if (_disposed)
try
{
return;
}
if (Logging.IsEnabled)
{
Logging.Enter(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}");
}

if (Logging.IsEnabled)
{
Logging.Info(this, disposing, $"{nameof(Dispose)}");
}
if (!_disposed)
{
if (disposing)
{
_refresherCancellationTokenSource?.Dispose();
}

if (disposing)
_disposed = true;
}
}
finally
{
StopLoop();
_cancellationTokenSource.Dispose();
if (Logging.IsEnabled)
{
Logging.Exit(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}");
}
}

_disposed = true;
}
}
}
23 changes: 14 additions & 9 deletions iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private void OnConnectionClosed(object o, EventArgs args)

if (_amqpIoTConnection != null && ReferenceEquals(_amqpIoTConnection, o))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
HashSet<AmqpUnit> amqpUnits;
lock (_unitsLock)
{
Expand All @@ -92,19 +92,20 @@ private void OnConnectionClosed(object o, EventArgs args)
}
}

public void Shutdown()
public async Task ShutdownAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, _amqpIoTConnection, nameof(ShutdownAsync));

if (_amqpAuthenticationRefresher != null)
{
Logging.Enter(this, _amqpIoTConnection, $"{nameof(Shutdown)}");
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

_amqpAuthenticationRefresher?.StopLoop();
_amqpIoTConnection?.SafeClose();

if (Logging.IsEnabled)
{
Logging.Exit(this, _amqpIoTConnection, $"{nameof(Shutdown)}");
}
Logging.Exit(this, _amqpIoTConnection, nameof(ShutdownAsync));
}

public void Dispose()
Expand Down Expand Up @@ -232,7 +233,11 @@ public async Task<AmqpIoTConnection> EnsureConnectionAsync(TimeSpan timeout)
}
catch (Exception ex) when (!ex.IsFatal())
{
amqpAuthenticationRefresher?.StopLoop();
if (amqpAuthenticationRefresher != null)
{
await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

amqpIoTConnection?.SafeClose();
throw;
}
Expand Down Expand Up @@ -261,7 +266,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit)
if (_amqpUnits.Count == 0)
{
// TODO #887: handle gracefulDisconnect
Shutdown();
_ = ShutdownAsync().ConfigureAwait(false);
}
}
if (Logging.IsEnabled)
Expand Down
Loading