Skip to content

Commit

Permalink
fix(iot-device): Update the amqp layer to inspect the inner exception…
Browse files Browse the repository at this point in the history
… for an amqp error code (#2053)
  • Loading branch information
abhipsaMisra authored Jun 29, 2021
1 parent c83a1d5 commit a2a23d6
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 22 deletions.
124 changes: 110 additions & 14 deletions e2e/test/iothub/messaging/MessageSendE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,20 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
public partial class MessageSendE2ETests : E2EMsTestBase
{
private const int MessageBatchCount = 5;
private const int LargeMessageSizeInBytes = 255 * 1024; // The maximum message size for device to cloud messages is 256 KB. We are allowing 1 KB of buffer for message header information etc.
private readonly string DevicePrefix = $"{nameof(MessageSendE2ETests)}_";
private readonly string ModulePrefix = $"{nameof(MessageSendE2ETests)}_";
private static string ProxyServerAddress = TestConfiguration.IoTHub.ProxyServerAddress;

// The maximum message size for device to cloud messages is 256 KB. We are allowing 1 KB of buffer for message header information etc.
private const int LargeMessageSizeInBytes = 255 * 1024;

// The size of a device to cloud message. This exceeds the the maximum message size set by the hub; 256 KB.
private const int ExceedAllowedMessageSizeInBytes = 300 * 1024;

// The size of a device to cloud message. This overly exceeds the maximum message size set by the hub, which is 256 KB. The reason why we are testing for this case is because
// we noticed a different behavior between this case, and the case where the message size is less than 1 MB.
private const int OverlyExceedAllowedMessageSizeInBytes = 3000 * 1024;

private readonly string _devicePrefix = $"{nameof(MessageSendE2ETests)}_";
private readonly string _modulePrefix = $"{nameof(MessageSendE2ETests)}_";
private static string s_proxyServerAddress = TestConfiguration.IoTHub.ProxyServerAddress;

[LoggedTestMethod]
public async Task Message_DeviceSendSingleMessage_Amqp()
Expand Down Expand Up @@ -80,7 +90,7 @@ public async Task Message_DeviceSendSingleMessage_AmqpWs_WithHeartbeats()
public async Task Message_DeviceSendSingleMessage_Http_WithProxy()
{
Client.Http1TransportSettings httpTransportSettings = new Client.Http1TransportSettings();
httpTransportSettings.Proxy = new WebProxy(ProxyServerAddress);
httpTransportSettings.Proxy = new WebProxy(s_proxyServerAddress);
ITransportSettings[] transportSettings = new ITransportSettings[] { httpTransportSettings };

await SendSingleMessage(TestDeviceType.Sasl, transportSettings).ConfigureAwait(false);
Expand All @@ -105,7 +115,7 @@ public async Task Message_DeviceSendSingleMessage_Http_WithCustomProxy()
public async Task Message_DeviceSendSingleMessage_AmqpWs_WithProxy()
{
Client.AmqpTransportSettings amqpTransportSettings = new Client.AmqpTransportSettings(Client.TransportType.Amqp_WebSocket_Only);
amqpTransportSettings.Proxy = new WebProxy(ProxyServerAddress);
amqpTransportSettings.Proxy = new WebProxy(s_proxyServerAddress);
ITransportSettings[] transportSettings = new ITransportSettings[] { amqpTransportSettings };

await SendSingleMessage(TestDeviceType.Sasl, transportSettings).ConfigureAwait(false);
Expand All @@ -117,7 +127,7 @@ public async Task Message_DeviceSendSingleMessage_MqttWs_WithProxy()
{
Client.Transport.Mqtt.MqttTransportSettings mqttTransportSettings =
new Client.Transport.Mqtt.MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only);
mqttTransportSettings.Proxy = new WebProxy(ProxyServerAddress);
mqttTransportSettings.Proxy = new WebProxy(s_proxyServerAddress);
ITransportSettings[] transportSettings = new ITransportSettings[] { mqttTransportSettings };

await SendSingleMessage(TestDeviceType.Sasl, transportSettings).ConfigureAwait(false);
Expand All @@ -128,7 +138,7 @@ public async Task Message_DeviceSendSingleMessage_MqttWs_WithProxy()
public async Task Message_ModuleSendSingleMessage_AmqpWs_WithProxy()
{
Client.AmqpTransportSettings amqpTransportSettings = new Client.AmqpTransportSettings(Client.TransportType.Amqp_WebSocket_Only);
amqpTransportSettings.Proxy = new WebProxy(ProxyServerAddress);
amqpTransportSettings.Proxy = new WebProxy(s_proxyServerAddress);
ITransportSettings[] transportSettings = new ITransportSettings[] { amqpTransportSettings };

await SendSingleMessageModule(transportSettings).ConfigureAwait(false);
Expand All @@ -140,7 +150,7 @@ public async Task Message_ModuleSendSingleMessage_MqttWs_WithProxy()
{
Client.Transport.Mqtt.MqttTransportSettings mqttTransportSettings =
new Client.Transport.Mqtt.MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only);
mqttTransportSettings.Proxy = new WebProxy(ProxyServerAddress);
mqttTransportSettings.Proxy = new WebProxy(s_proxyServerAddress);
ITransportSettings[] transportSettings = new ITransportSettings[] { mqttTransportSettings };

await SendSingleMessageModule(transportSettings).ConfigureAwait(false);
Expand Down Expand Up @@ -212,7 +222,7 @@ public async Task X509_DeviceSendBatchMessages_Http()
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_ClientThrowsForMqttTopicNameTooLong()
{
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix).ConfigureAwait(false);
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix).ConfigureAwait(false);
using DeviceClient deviceClient = testDevice.CreateDeviceClient(Client.TransportType.Mqtt);

await deviceClient.OpenAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -245,9 +255,95 @@ public async Task Message_DeviceSendSingleLargeMessageAsync(TestDeviceType testD
await SendSingleMessage(testDeviceType, transportType, messageSize).ConfigureAwait(false);
}

[LoggedTestMethod]
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_DeviceSendMessageOverAllowedSize_Amqp()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_Tcp_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

[LoggedTestMethod]
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_DeviceSendMessageOverAllowedSize_AmqpWs()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_WebSocket_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

// MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than
// Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175
// This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it
// and throwing a MessageTooLargeException, if relevant.
[LoggedTestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task Message_DeviceSendMessageOverAllowedSize_Mqtt()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_Tcp_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

// MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than
// Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175
// This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it
// and throwing a MessageTooLargeException, if relevant.
[LoggedTestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task Message_DeviceSendMessageOverAllowedSize_MqttWs()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_WebSocket_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

[LoggedTestMethod]
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_DeviceSendMessageOverAllowedSize_Http()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Http1, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

[LoggedTestMethod]
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_DeviceSendMessageWayOverAllowedSize_Amqp()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_Tcp_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

[LoggedTestMethod]
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_DeviceSendMessageWayOverAllowedSize_AmqpWs()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_WebSocket_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

// MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than
// Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175
// This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it
// and throwing a MessageTooLargeException, if relevant.
[LoggedTestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task Message_DeviceSendMessageWayOverAllowedSize_Mqtt()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_Tcp_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

// MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than
// Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175
// This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it
// and throwing a MessageTooLargeException, if relevant.
[LoggedTestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public async Task Message_DeviceSendMessageWayOverAllowedSize_MqttWs()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_WebSocket_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

[LoggedTestMethod]
[ExpectedException(typeof(MessageTooLargeException))]
public async Task Message_DeviceSendMessageWayOverAllowedSize_Http()
{
await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Http1, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false);
}

private async Task SendSingleMessage(TestDeviceType type, Client.TransportType transport, int messageSize = 0)
{
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix, type).ConfigureAwait(false);
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix, type).ConfigureAwait(false);
using DeviceClient deviceClient = testDevice.CreateDeviceClient(transport);

await deviceClient.OpenAsync().ConfigureAwait(false);
Expand All @@ -257,7 +353,7 @@ private async Task SendSingleMessage(TestDeviceType type, Client.TransportType t

private async Task SendBatchMessages(TestDeviceType type, Client.TransportType transport)
{
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix, type).ConfigureAwait(false);
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix, type).ConfigureAwait(false);
using DeviceClient deviceClient = testDevice.CreateDeviceClient(transport);

await deviceClient.OpenAsync().ConfigureAwait(false);
Expand All @@ -267,7 +363,7 @@ private async Task SendBatchMessages(TestDeviceType type, Client.TransportType t

private async Task SendSingleMessage(TestDeviceType type, ITransportSettings[] transportSettings, int messageSize = 0)
{
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix, type).ConfigureAwait(false);
TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix, type).ConfigureAwait(false);
using DeviceClient deviceClient = testDevice.CreateDeviceClient(transportSettings);

await deviceClient.OpenAsync().ConfigureAwait(false);
Expand All @@ -277,7 +373,7 @@ private async Task SendSingleMessage(TestDeviceType type, ITransportSettings[] t

private async Task SendSingleMessageModule(ITransportSettings[] transportSettings)
{
TestModule testModule = await TestModule.GetTestModuleAsync(DevicePrefix, ModulePrefix, Logger).ConfigureAwait(false);
TestModule testModule = await TestModule.GetTestModuleAsync(_devicePrefix, _modulePrefix, Logger).ConfigureAwait(false);
using var moduleClient = ModuleClient.CreateFromConnectionString(testModule.ConnectionString, transportSettings);

await moduleClient.OpenAsync().ConfigureAwait(false);
Expand Down
21 changes: 15 additions & 6 deletions iothub/device/src/Transport/AmqpIot/AmqpIotExceptionAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@ internal static Exception ConvertToIotHubException(Exception exception)
{
return new IotHubCommunicationException(exception.Message, exception);
}
else if (exception is UnauthorizedAccessException)

if (exception is UnauthorizedAccessException)
{
return new UnauthorizedException(exception.Message, exception);
}
else

if (exception is OperationCanceledException
&& exception.InnerException is AmqpException innerAmqpException
&& innerAmqpException != null)
{
var amqpException = exception as AmqpException;
return amqpException == null
? exception
: AmqpIotErrorAdapter.ToIotHubClientContract(amqpException);
return AmqpIotErrorAdapter.ToIotHubClientContract(innerAmqpException);
}

if (exception is AmqpException amqpException
&& amqpException != null)
{
return AmqpIotErrorAdapter.ToIotHubClientContract(amqpException);
}

return exception;
}

internal static Exception ConvertToIotHubException(Exception exception, AmqpObject source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
namespace Microsoft.Azure.Devices.Client.Transport
{
/// <summary>
/// Transport handler router.
/// Tries to open the connection in the protocol order it was set.
/// Transport handler router.
/// Tries to open the connection in the protocol order it was set.
/// If fails tries to open the next one, etc.
/// </summary>
internal class ProtocolRoutingDelegatingHandler : DefaultDelegatingHandler
Expand All @@ -22,6 +22,7 @@ internal class ProtocolRoutingDelegatingHandler : DefaultDelegatingHandler
/// After we've verified that we could open the transport for any operation, we will stop attempting others in the list.
/// </summary>
private bool _transportSelectionComplete;

private int _nextTransportIndex;

private SemaphoreSlim _handlerLock = new SemaphoreSlim(1, 1);
Expand Down

0 comments on commit a2a23d6

Please sign in to comment.