From a75f592f05dade0f25aedaabf7de03e3704f21e9 Mon Sep 17 00:00:00 2001 From: Abel Buechner-Mihaljevic Date: Tue, 4 Aug 2020 12:31:36 +0200 Subject: [PATCH] Add test for failed message send. Signed-off-by: Abel Buechner-Mihaljevic --- .../AbstractMqttProtocolGatewayTest.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java index e5dcca78..90801ca9 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.transport.Target; import org.apache.qpid.proton.message.Message; @@ -79,6 +80,7 @@ import io.vertx.junit5.VertxTestContext; import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.MqttServerOptions; +import io.vertx.proton.ProtonDelivery; import io.vertx.proton.ProtonQoS; import io.vertx.proton.ProtonReceiver; import io.vertx.proton.ProtonSender; @@ -579,6 +581,43 @@ public void testEventMessage() { } + /** + * Verifies that when a message is being rejected by the remote, the connection to the device is closed. + */ + @Test + public void sendEventClosesEndpointWhenMessageIsRejected() { + + // GIVEN a protocol gateway that sends every MQTT publish message as an event downstream and a connected MQTT + // endpoint + final TestMqttProtocolGateway gateway = createGateway(); + final MqttEndpoint mqttEndpoint = connectTestDevice(gateway); + + // WHEN sending a MQTT message... + ProtocolGatewayTestHelper.sendMessage(mqttEndpoint, Buffer.buffer("payload1"), "topic/1"); + // ... that gets rejected by the remote + rejectAmqpMessage(); + + // THEN the endpoint has been closed + assertThat(mqttEndpoint.isConnected()).isFalse(); + // ... and the callback onDeviceConnectionClose() has been invoked + assertThat(gateway.isConnectionClosed()).isTrue(); + + } + + private void rejectAmqpMessage() { + + @SuppressWarnings("unchecked") + final ArgumentCaptor> handlerArgumentCaptor = ArgumentCaptor.forClass(Handler.class); + + verify(protonSender).send(any(), handlerArgumentCaptor.capture()); + + ProtonDelivery protonDelivery = mock(ProtonDelivery.class); + when(protonDelivery.getRemoteState()).thenReturn(new Rejected()); + when(protonDelivery.remotelySettled()).thenReturn(true); + + handlerArgumentCaptor.getValue().handle(protonDelivery); + } + /** * Verifies that a telemetry message is being sent to the right address. */