Skip to content

Commit

Permalink
Add test for failed message send.
Browse files Browse the repository at this point in the history
Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
  • Loading branch information
b-abel committed Aug 4, 2020
1 parent 8a1602d commit a75f592
Showing 1 changed file with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.message.Message;
Expand Down Expand Up @@ -79,6 +80,7 @@
import io.vertx.junit5.VertxTestContext;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
Expand Down Expand Up @@ -579,6 +581,43 @@ public void testEventMessage() {

}

/**
* Verifies that when a message is being rejected by the remote, the connection to the device is closed.
*/
@Test
public void sendEventClosesEndpointWhenMessageIsRejected() {

// GIVEN a protocol gateway that sends every MQTT publish message as an event downstream and a connected MQTT
// endpoint
final TestMqttProtocolGateway gateway = createGateway();
final MqttEndpoint mqttEndpoint = connectTestDevice(gateway);

// WHEN sending a MQTT message...
ProtocolGatewayTestHelper.sendMessage(mqttEndpoint, Buffer.buffer("payload1"), "topic/1");
// ... that gets rejected by the remote
rejectAmqpMessage();

// THEN the endpoint has been closed
assertThat(mqttEndpoint.isConnected()).isFalse();
// ... and the callback onDeviceConnectionClose() has been invoked
assertThat(gateway.isConnectionClosed()).isTrue();

}

private void rejectAmqpMessage() {

@SuppressWarnings("unchecked")
final ArgumentCaptor<Handler<ProtonDelivery>> handlerArgumentCaptor = ArgumentCaptor.forClass(Handler.class);

verify(protonSender).send(any(), handlerArgumentCaptor.capture());

ProtonDelivery protonDelivery = mock(ProtonDelivery.class);
when(protonDelivery.getRemoteState()).thenReturn(new Rejected());
when(protonDelivery.remotelySettled()).thenReturn(true);

handlerArgumentCaptor.getValue().handle(protonDelivery);
}

/**
* Verifies that a telemetry message is being sent to the right address.
*/
Expand Down

0 comments on commit a75f592

Please sign in to comment.