Skip to content

Commit

Permalink
Log possibly unobserved exceptions in MQTT (Azure#3319)
Browse files Browse the repository at this point in the history
* Log possibly unobserved exceptions in MQTT

* Apply other IDE suggestions
  • Loading branch information
drwill-ms authored May 2, 2023
1 parent 9fc5c6d commit 1a37b4f
Showing 1 changed file with 62 additions and 56 deletions.
118 changes: 62 additions & 56 deletions iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,12 @@ internal MqttTransportHandler(
else
{
ClientOptions options = context.ClientOptions;
switch (settings.GetTransportType())
_channelFactory = settings.GetTransportType() switch
{
case TransportType.Mqtt_Tcp_Only:
_channelFactory = CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
break;

case TransportType.Mqtt_WebSocket_Only:
_channelFactory = CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options);
break;

default:
throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType()));
}
TransportType.Mqtt_Tcp_Only => CreateChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
TransportType.Mqtt_WebSocket_Only => CreateWebSocketChannelFactory(iotHubConnectionString, settings, context.ProductInfo, options),
_ => throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(settings.GetTransportType())),
};
}

_closeRetryPolicy = new RetryPolicy(new TransientErrorIgnoreStrategy(), 5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
Expand Down Expand Up @@ -277,37 +270,35 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT

return null;
}
else
{
try
{
if (Logging.IsEnabled)
Logging.Enter(
this,
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");

cancellationToken.ThrowIfCancellationRequested();
try
{
if (Logging.IsEnabled)
Logging.Enter(
this,
cancellationToken, $"ReceiveAsync() called with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");

EnsureValidState();
cancellationToken.ThrowIfCancellationRequested();

if (State != TransportState.Receiving)
{
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
}
EnsureValidState();

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessC2dMessage();
}
finally
if (State != TransportState.Receiving)
{
if (Logging.IsEnabled)
Logging.Exit(
this,
cancellationToken,
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");
await SubscribeCloudToDeviceMessagesAsync().ConfigureAwait(false);
}

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessC2dMessage();
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(
this,
cancellationToken,
$"Exiting ReceiveAsync() with cancellation requested state of: {cancellationToken.IsCancellationRequested}",
$"{nameof(ReceiveAsync)}");
}
}

Expand All @@ -320,6 +311,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)

return null;
}

try
{
if (Logging.IsEnabled)
Expand Down Expand Up @@ -405,7 +397,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can
isTransient: false);
}

if (_completionQueue.Count == 0)
if (_completionQueue.IsEmpty)
{
throw new IotHubException("Unknown lock token.", isTransient: false);
}
Expand Down Expand Up @@ -444,9 +436,10 @@ protected override void Dispose(bool disposing)
try
{
if (Logging.IsEnabled)
{
Logging.Enter(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
Logging.Enter(
this,
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");

if (!_isDisposed)
{
Expand All @@ -458,6 +451,18 @@ protected override void Dispose(bool disposing)
CleanUpAsync().GetAwaiter().GetResult();
}

// Log the task completion source tasks' exceptions and avoid unobserved exceptions.
if (_connectCompletion.Task.Exception != null)
{
if (Logging.IsEnabled)
Logging.Error(this, $"{_connectCompletion} has exception {_connectCompletion.Task.Exception}", nameof(Dispose));
}
if (_subscribeCompletionSource.Task.Exception != null)
{
if (Logging.IsEnabled)
Logging.Error(this, $"{_subscribeCompletionSource} has exception {_subscribeCompletionSource.Task.Exception}", nameof(Dispose));
}

_disconnectAwaitersCancellationSource?.Dispose();
_disconnectAwaitersCancellationSource = null;

Expand All @@ -480,9 +485,10 @@ protected override void Dispose(bool disposing)
finally
{
if (Logging.IsEnabled)
{
Logging.Exit(this, $"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}", $"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
Logging.Exit(
this,
$"{nameof(DefaultDelegatingHandler)}.Disposed={_isDisposed}; disposing={disposing}",
$"{nameof(MqttTransportHandler)}.{nameof(Dispose)}");
}
}

Expand Down Expand Up @@ -726,8 +732,9 @@ public async void OnError(Exception exception)
break;

default:
Debug.Fail($"Unknown transport state: {previousState}");
throw new InvalidOperationException();
string error = $"Unknown transport state: {previousState}";
Debug.Fail(error);
throw new InvalidOperationException(error);
}

await _closeRetryPolicy.RunWithRetryAsync(CleanUpImplAsync).ConfigureAwait(true);
Expand All @@ -753,6 +760,7 @@ private TransportState MoveToStateIfPossible(TransportState destination, Transpo
{
return previousState;
}

TransportState prevState;
if ((prevState = (TransportState)Interlocked.CompareExchange(ref _state, (int)destination, (int)previousState)) == previousState)
{
Expand Down Expand Up @@ -944,11 +952,9 @@ public override async Task<Twin> SendTwinGetAsync(CancellationToken cancellation
Properties = JsonConvert.DeserializeObject<TwinProperties>(body),
};
}
catch (JsonReaderException ex)
catch (JsonReaderException ex) when (Logging.IsEnabled)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");

Logging.Error(this, $"Failed to parse Twin JSON: {ex}. Message body: '{body}'");
throw;
}
}
Expand Down Expand Up @@ -1117,7 +1123,7 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
Message response = null; ;
ExceptionDispatchInfo responseException = null;

Action<Message> onTwinResponse = (Message possibleResponse) =>
void OnTwinResponse(Message possibleResponse)
{
try
{
Expand Down Expand Up @@ -1152,11 +1158,11 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
responseException = ExceptionDispatchInfo.Capture(e);
responseReceived.Release();
}
};
}

try
{
_twinResponseEvent += onTwinResponse;
_twinResponseEvent += OnTwinResponse;

await SendEventAsync(request, cancellationToken).ConfigureAwait(false);

Expand All @@ -1175,7 +1181,7 @@ private async Task<Message> SendTwinRequestAsync(Message request, string rid, Ca
}
finally
{
_twinResponseEvent -= onTwinResponse;
_twinResponseEvent -= OnTwinResponse;
}
}

Expand All @@ -1185,7 +1191,7 @@ private Func<IPAddress[], int, Task<IChannel>> CreateChannelFactory(IotHubConnec
{
IChannel channel = null;

Func<Stream, SslStream> streamFactory = stream => new SslStream(stream, true, settings.RemoteCertificateValidationCallback);
SslStream StreamFactory(Stream stream) => new SslStream(stream, true, settings.RemoteCertificateValidationCallback);

List<X509Certificate> certs = settings.ClientCertificate == null
? new List<X509Certificate>(0)
Expand Down Expand Up @@ -1216,7 +1222,7 @@ private Func<IPAddress[], int, Task<IChannel>> CreateChannelFactory(IotHubConnec
.Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
.Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
{
var tlsHandler = new TlsHandler(streamFactory, clientTlsSettings);
var tlsHandler = new TlsHandler(StreamFactory, clientTlsSettings);
ch.Pipeline.AddLast(
tlsHandler,
MqttEncoder.Instance,
Expand Down

0 comments on commit 1a37b4f

Please sign in to comment.