diff --git a/e2e/test/iothub/messaging/MessageSendE2ETests.cs b/e2e/test/iothub/messaging/MessageSendE2ETests.cs index 974be9733e..7cca652d71 100644 --- a/e2e/test/iothub/messaging/MessageSendE2ETests.cs +++ b/e2e/test/iothub/messaging/MessageSendE2ETests.cs @@ -20,10 +20,20 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging public partial class MessageSendE2ETests : E2EMsTestBase { private const int MessageBatchCount = 5; - private const int LargeMessageSizeInBytes = 255 * 1024; // The maximum message size for device to cloud messages is 256 KB. We are allowing 1 KB of buffer for message header information etc. - private readonly string DevicePrefix = $"{nameof(MessageSendE2ETests)}_"; - private readonly string ModulePrefix = $"{nameof(MessageSendE2ETests)}_"; - private static string ProxyServerAddress = TestConfiguration.IoTHub.ProxyServerAddress; + + // The maximum message size for device to cloud messages is 256 KB. We are allowing 1 KB of buffer for message header information etc. + private const int LargeMessageSizeInBytes = 255 * 1024; + + // The size of a device to cloud message. This exceeds the the maximum message size set by the hub; 256 KB. + private const int ExceedAllowedMessageSizeInBytes = 300 * 1024; + + // The size of a device to cloud message. This overly exceeds the maximum message size set by the hub, which is 256 KB. The reason why we are testing for this case is because + // we noticed a different behavior between this case, and the case where the message size is less than 1 MB. + private const int OverlyExceedAllowedMessageSizeInBytes = 3000 * 1024; + + private readonly string _devicePrefix = $"{nameof(MessageSendE2ETests)}_"; + private readonly string _modulePrefix = $"{nameof(MessageSendE2ETests)}_"; + private static string s_proxyServerAddress = TestConfiguration.IoTHub.ProxyServerAddress; [LoggedTestMethod] public async Task Message_DeviceSendSingleMessage_Amqp() @@ -80,7 +90,7 @@ public async Task Message_DeviceSendSingleMessage_AmqpWs_WithHeartbeats() public async Task Message_DeviceSendSingleMessage_Http_WithProxy() { Client.Http1TransportSettings httpTransportSettings = new Client.Http1TransportSettings(); - httpTransportSettings.Proxy = new WebProxy(ProxyServerAddress); + httpTransportSettings.Proxy = new WebProxy(s_proxyServerAddress); ITransportSettings[] transportSettings = new ITransportSettings[] { httpTransportSettings }; await SendSingleMessage(TestDeviceType.Sasl, transportSettings).ConfigureAwait(false); @@ -105,7 +115,7 @@ public async Task Message_DeviceSendSingleMessage_Http_WithCustomProxy() public async Task Message_DeviceSendSingleMessage_AmqpWs_WithProxy() { Client.AmqpTransportSettings amqpTransportSettings = new Client.AmqpTransportSettings(Client.TransportType.Amqp_WebSocket_Only); - amqpTransportSettings.Proxy = new WebProxy(ProxyServerAddress); + amqpTransportSettings.Proxy = new WebProxy(s_proxyServerAddress); ITransportSettings[] transportSettings = new ITransportSettings[] { amqpTransportSettings }; await SendSingleMessage(TestDeviceType.Sasl, transportSettings).ConfigureAwait(false); @@ -117,7 +127,7 @@ public async Task Message_DeviceSendSingleMessage_MqttWs_WithProxy() { Client.Transport.Mqtt.MqttTransportSettings mqttTransportSettings = new Client.Transport.Mqtt.MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only); - mqttTransportSettings.Proxy = new WebProxy(ProxyServerAddress); + mqttTransportSettings.Proxy = new WebProxy(s_proxyServerAddress); ITransportSettings[] transportSettings = new ITransportSettings[] { mqttTransportSettings }; await SendSingleMessage(TestDeviceType.Sasl, transportSettings).ConfigureAwait(false); @@ -128,7 +138,7 @@ public async Task Message_DeviceSendSingleMessage_MqttWs_WithProxy() public async Task Message_ModuleSendSingleMessage_AmqpWs_WithProxy() { Client.AmqpTransportSettings amqpTransportSettings = new Client.AmqpTransportSettings(Client.TransportType.Amqp_WebSocket_Only); - amqpTransportSettings.Proxy = new WebProxy(ProxyServerAddress); + amqpTransportSettings.Proxy = new WebProxy(s_proxyServerAddress); ITransportSettings[] transportSettings = new ITransportSettings[] { amqpTransportSettings }; await SendSingleMessageModule(transportSettings).ConfigureAwait(false); @@ -140,7 +150,7 @@ public async Task Message_ModuleSendSingleMessage_MqttWs_WithProxy() { Client.Transport.Mqtt.MqttTransportSettings mqttTransportSettings = new Client.Transport.Mqtt.MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only); - mqttTransportSettings.Proxy = new WebProxy(ProxyServerAddress); + mqttTransportSettings.Proxy = new WebProxy(s_proxyServerAddress); ITransportSettings[] transportSettings = new ITransportSettings[] { mqttTransportSettings }; await SendSingleMessageModule(transportSettings).ConfigureAwait(false); @@ -212,7 +222,7 @@ public async Task X509_DeviceSendBatchMessages_Http() [ExpectedException(typeof(MessageTooLargeException))] public async Task Message_ClientThrowsForMqttTopicNameTooLong() { - TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix).ConfigureAwait(false); + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix).ConfigureAwait(false); using DeviceClient deviceClient = testDevice.CreateDeviceClient(Client.TransportType.Mqtt); await deviceClient.OpenAsync().ConfigureAwait(false); @@ -245,9 +255,95 @@ public async Task Message_DeviceSendSingleLargeMessageAsync(TestDeviceType testD await SendSingleMessage(testDeviceType, transportType, messageSize).ConfigureAwait(false); } + [LoggedTestMethod] + [ExpectedException(typeof(MessageTooLargeException))] + public async Task Message_DeviceSendMessageOverAllowedSize_Amqp() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_Tcp_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + [LoggedTestMethod] + [ExpectedException(typeof(MessageTooLargeException))] + public async Task Message_DeviceSendMessageOverAllowedSize_AmqpWs() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_WebSocket_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + // MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than + // Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175 + // This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it + // and throwing a MessageTooLargeException, if relevant. + [LoggedTestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task Message_DeviceSendMessageOverAllowedSize_Mqtt() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_Tcp_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + // MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than + // Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175 + // This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it + // and throwing a MessageTooLargeException, if relevant. + [LoggedTestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task Message_DeviceSendMessageOverAllowedSize_MqttWs() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_WebSocket_Only, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + [LoggedTestMethod] + [ExpectedException(typeof(MessageTooLargeException))] + public async Task Message_DeviceSendMessageOverAllowedSize_Http() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Http1, ExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + [LoggedTestMethod] + [ExpectedException(typeof(MessageTooLargeException))] + public async Task Message_DeviceSendMessageWayOverAllowedSize_Amqp() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_Tcp_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + [LoggedTestMethod] + [ExpectedException(typeof(MessageTooLargeException))] + public async Task Message_DeviceSendMessageWayOverAllowedSize_AmqpWs() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Amqp_WebSocket_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + // MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than + // Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175 + // This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it + // and throwing a MessageTooLargeException, if relevant. + [LoggedTestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task Message_DeviceSendMessageWayOverAllowedSize_Mqtt() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_Tcp_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + // MQTT protocol will throw an InvalidOperationException if the PUBLISH packet is greater than + // Hub limits: https://github.com/Azure/azure-iot-sdk-csharp/blob/d46e0f07fe8d80e21e07b41c2e75b0bd1fcb8f80/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs#L1175 + // This flow is a bit different from other protocols where we do not inspect the packet being sent but rather rely on service validating it + // and throwing a MessageTooLargeException, if relevant. + [LoggedTestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public async Task Message_DeviceSendMessageWayOverAllowedSize_MqttWs() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Mqtt_WebSocket_Only, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + + [LoggedTestMethod] + [ExpectedException(typeof(MessageTooLargeException))] + public async Task Message_DeviceSendMessageWayOverAllowedSize_Http() + { + await SendSingleMessage(TestDeviceType.Sasl, Client.TransportType.Http1, OverlyExceedAllowedMessageSizeInBytes).ConfigureAwait(false); + } + private async Task SendSingleMessage(TestDeviceType type, Client.TransportType transport, int messageSize = 0) { - TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix, type).ConfigureAwait(false); + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix, type).ConfigureAwait(false); using DeviceClient deviceClient = testDevice.CreateDeviceClient(transport); await deviceClient.OpenAsync().ConfigureAwait(false); @@ -257,7 +353,7 @@ private async Task SendSingleMessage(TestDeviceType type, Client.TransportType t private async Task SendBatchMessages(TestDeviceType type, Client.TransportType transport) { - TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix, type).ConfigureAwait(false); + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix, type).ConfigureAwait(false); using DeviceClient deviceClient = testDevice.CreateDeviceClient(transport); await deviceClient.OpenAsync().ConfigureAwait(false); @@ -267,7 +363,7 @@ private async Task SendBatchMessages(TestDeviceType type, Client.TransportType t private async Task SendSingleMessage(TestDeviceType type, ITransportSettings[] transportSettings, int messageSize = 0) { - TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, DevicePrefix, type).ConfigureAwait(false); + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix, type).ConfigureAwait(false); using DeviceClient deviceClient = testDevice.CreateDeviceClient(transportSettings); await deviceClient.OpenAsync().ConfigureAwait(false); @@ -277,7 +373,7 @@ private async Task SendSingleMessage(TestDeviceType type, ITransportSettings[] t private async Task SendSingleMessageModule(ITransportSettings[] transportSettings) { - TestModule testModule = await TestModule.GetTestModuleAsync(DevicePrefix, ModulePrefix, Logger).ConfigureAwait(false); + TestModule testModule = await TestModule.GetTestModuleAsync(_devicePrefix, _modulePrefix, Logger).ConfigureAwait(false); using var moduleClient = ModuleClient.CreateFromConnectionString(testModule.ConnectionString, transportSettings); await moduleClient.OpenAsync().ConfigureAwait(false); diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotExceptionAdapter.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotExceptionAdapter.cs index fff5e23334..57babf2e13 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotExceptionAdapter.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotExceptionAdapter.cs @@ -15,17 +15,26 @@ internal static Exception ConvertToIotHubException(Exception exception) { return new IotHubCommunicationException(exception.Message, exception); } - else if (exception is UnauthorizedAccessException) + + if (exception is UnauthorizedAccessException) { return new UnauthorizedException(exception.Message, exception); } - else + + if (exception is OperationCanceledException + && exception.InnerException is AmqpException innerAmqpException + && innerAmqpException != null) { - var amqpException = exception as AmqpException; - return amqpException == null - ? exception - : AmqpIotErrorAdapter.ToIotHubClientContract(amqpException); + return AmqpIotErrorAdapter.ToIotHubClientContract(innerAmqpException); } + + if (exception is AmqpException amqpException + && amqpException != null) + { + return AmqpIotErrorAdapter.ToIotHubClientContract(amqpException); + } + + return exception; } internal static Exception ConvertToIotHubException(Exception exception, AmqpObject source) diff --git a/iothub/device/src/Transport/ProtocolRoutingDelegatingHandler.cs b/iothub/device/src/Transport/ProtocolRoutingDelegatingHandler.cs index d0b624cd8e..20636f8ef3 100644 --- a/iothub/device/src/Transport/ProtocolRoutingDelegatingHandler.cs +++ b/iothub/device/src/Transport/ProtocolRoutingDelegatingHandler.cs @@ -10,8 +10,8 @@ namespace Microsoft.Azure.Devices.Client.Transport { /// - /// Transport handler router. - /// Tries to open the connection in the protocol order it was set. + /// Transport handler router. + /// Tries to open the connection in the protocol order it was set. /// If fails tries to open the next one, etc. /// internal class ProtocolRoutingDelegatingHandler : DefaultDelegatingHandler @@ -22,6 +22,7 @@ internal class ProtocolRoutingDelegatingHandler : DefaultDelegatingHandler /// After we've verified that we could open the transport for any operation, we will stop attempting others in the list. /// private bool _transportSelectionComplete; + private int _nextTransportIndex; private SemaphoreSlim _handlerLock = new SemaphoreSlim(1, 1);