From 0318763abd0b88e122cfb1c6a68298379db8daf8 Mon Sep 17 00:00:00 2001 From: Pablo Gonzalez Granados Date: Wed, 8 Mar 2023 21:12:29 +0100 Subject: [PATCH] Add smallrye coverage to integrations tests --- .all-contributorsrc | 12 ++- .../pom.xml | 12 +-- .../test/smallrye/PriceConverter.java | 30 +++++++ .../test/smallrye/PriceGenerator.java | 35 ++++++++ .../test/smallrye/PriceResource.java | 35 ++++++++ .../src/main/resources/application.properties | 13 +++ .../test/smallrye}/HivemqDefaultProfile.java | 2 +- .../test/smallrye}/HivemqResources.java | 9 ++- .../test/smallrye/PriceResourceIT.java | 7 ++ .../test/smallrye/PriceResourceTest.java | 62 ++++++++++++++ .../test/vanilla/HttpBridgeApplication.java | 12 --- .../test/vanilla/dto/MessageDto.java | 21 ----- .../test/vanilla/dto/PublishMsgRespDto.java | 15 ---- .../resources/BrokerPushToTopicResources.java | 46 ----------- .../BrokerSubscribeToTopicResources.java | 33 -------- .../test/vanilla/services/HiveProxy.java | 80 ------------------- .../src/main/resources/application.properties | 4 - .../BrokerPushToTopicResourcesTest.java | 32 -------- .../BrokerSubscribeToTopicResourceTest.java | 70 ---------------- .../test/vanilla/CommonsTestSuite.java | 26 ------ .../NativeBrokerPushToTopicResourcesIT.java | 7 -- integration-tests/pom.xml | 2 +- 22 files changed, 204 insertions(+), 361 deletions(-) rename integration-tests/{hivemq-client-vanilla => hivemq-client-smallrye}/pom.xml (88%) create mode 100644 integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java create mode 100644 integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java create mode 100644 integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java create mode 100644 integration-tests/hivemq-client-smallrye/src/main/resources/application.properties rename integration-tests/{hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla => hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye}/HivemqDefaultProfile.java (86%) rename integration-tests/{hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla => hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye}/HivemqResources.java (61%) create mode 100644 integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java create mode 100644 integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/main/resources/application.properties delete mode 100644 integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java delete mode 100644 integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java diff --git a/.all-contributorsrc b/.all-contributorsrc index 0c8efc1..6636ccd 100644 --- a/.all-contributorsrc +++ b/.all-contributorsrc @@ -14,7 +14,17 @@ "code", "maintenance" ] - } + }, + { + "login": "pjgg", + "name": "pjgg", + "avatar_url": "https://avatars.githubusercontent.com/u/3541131?v=4", + "profile": "https://github.com/pjgg", + "contributions": [ + "code", + "maintenance" + ] + } ], "contributorsPerLine": 7, "projectName": "quarkus-hivemq-client", diff --git a/integration-tests/hivemq-client-vanilla/pom.xml b/integration-tests/hivemq-client-smallrye/pom.xml similarity index 88% rename from integration-tests/hivemq-client-vanilla/pom.xml rename to integration-tests/hivemq-client-smallrye/pom.xml index 9f6255c..12d9f99 100644 --- a/integration-tests/hivemq-client-vanilla/pom.xml +++ b/integration-tests/hivemq-client-smallrye/pom.xml @@ -7,8 +7,8 @@ 1.0.1-SNAPSHOT - quarkus-hivemq-client-integration-tests-minimal - Quarkus - Hivemq Client - Integration Tests Vanilla + quarkus-hivemq-client-integration-tests-smallrye + Quarkus - Hivemq Client - Integration Tests smallrye 1.17.6 @@ -19,15 +19,11 @@ io.quarkus - quarkus-vertx + quarkus-resteasy-reactive io.quarkus - quarkus-resteasy-reactive-jackson - - - io.quarkus - quarkus-smallrye-openapi + quarkus-smallrye-reactive-messaging-mqtt io.quarkiverse.hivemqclient diff --git a/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java new file mode 100644 index 0000000..a519b40 --- /dev/null +++ b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java @@ -0,0 +1,30 @@ +package io.quarkiverse.hivemqclient.test.smallrye; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.logging.Logger; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +/** + * A bean consuming data from the "prices" MQTT topic and applying some conversion. + * The result is pushed to the "my-data-stream" stream which is an in-memory stream. + */ +@ApplicationScoped +public class PriceConverter { + + private static final Logger LOG = Logger.getLogger(PriceConverter.class); + private static final double CONVERSION_RATE = 0.88; + + @Incoming("prices") + @Outgoing("my-data-stream") + @Broadcast + public double process(byte[] priceRaw) { + int priceInUsd = Integer.parseInt(new String(priceRaw)); + LOG.infof("Receiving price: %d ", priceInUsd); + return priceInUsd * CONVERSION_RATE; + } + +} diff --git a/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java new file mode 100644 index 0000000..940e99a --- /dev/null +++ b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java @@ -0,0 +1,35 @@ +package io.quarkiverse.hivemqclient.test.smallrye; + +import java.time.Duration; +import java.util.Random; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.logging.Logger; + +import io.smallrye.mutiny.Multi; + +/** + * A bean producing random prices every second. + * The prices are written to a MQTT topic (prices). The MQTT configuration is specified in the application configuration. + */ +@ApplicationScoped +public class PriceGenerator { + + private static final Logger LOG = Logger.getLogger(PriceGenerator.class); + + private Random random = new Random(); + + @Outgoing("topic-price") + public Multi generate() { + return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) + .onOverflow().drop() + .map(tick -> { + int price = random.nextInt(100); + LOG.infof("Sending price: %d", price); + return price; + }); + } + +} diff --git a/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java new file mode 100644 index 0000000..1e9da7b --- /dev/null +++ b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java @@ -0,0 +1,35 @@ +package io.quarkiverse.hivemqclient.test.smallrye; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; + +/** + * A simple resource retrieving the "in-memory" "my-data-stream" and sending the items to a server sent event. + */ +@Path("/prices") +public class PriceResource { + + @Inject + @Channel("my-data-stream") + Multi prices; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public String hello() { + return "hello"; + } + + @GET + @Path("/stream") + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi stream() { + return prices; + } +} diff --git a/integration-tests/hivemq-client-smallrye/src/main/resources/application.properties b/integration-tests/hivemq-client-smallrye/src/main/resources/application.properties new file mode 100644 index 0000000..8abf690 --- /dev/null +++ b/integration-tests/hivemq-client-smallrye/src/main/resources/application.properties @@ -0,0 +1,13 @@ +# Configure the MQTT sink (we write to it) +mp.messaging.outgoing.topic-price.type=smallrye-mqtt-hivemq +mp.messaging.outgoing.topic-price.topic=prices +mp.messaging.outgoing.topic-price.host=localhost +mp.messaging.outgoing.topic-price.port=1883 +mp.messaging.outgoing.topic-price.auto-generated-client-id=true + +# Configure the MQTT source (we read from it) +mp.messaging.incoming.prices.type=smallrye-mqtt-hivemq +mp.messaging.incoming.prices.topic=prices +mp.messaging.incoming.prices.host=localhost +mp.messaging.incoming.prices.port=1883 +mp.messaging.incoming.prices.auto-generated-client-id=true \ No newline at end of file diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqDefaultProfile.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqDefaultProfile.java similarity index 86% rename from integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqDefaultProfile.java rename to integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqDefaultProfile.java index 093dcd0..956d6aa 100644 --- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqDefaultProfile.java +++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqDefaultProfile.java @@ -1,4 +1,4 @@ -package io.quarkiverse.hivemqclient.test.vanilla; +package io.quarkiverse.hivemqclient.test.smallrye; import java.util.Collections; import java.util.List; diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqResources.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqResources.java similarity index 61% rename from integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqResources.java rename to integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqResources.java index 717907d..0598b0f 100644 --- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqResources.java +++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqResources.java @@ -1,4 +1,4 @@ -package io.quarkiverse.hivemqclient.test.vanilla; +package io.quarkiverse.hivemqclient.test.smallrye; import static org.testcontainers.utility.DockerImageName.parse; @@ -17,9 +17,10 @@ public class HivemqResources implements QuarkusTestResourceLifecycleManager { public Map start() { hivemqContainer.start(); Map config = new HashMap<>(); - config.put("hivemq.cluster.host", hivemqContainer.getHost()); - config.put("hivemq.cluster.port", "" + hivemqContainer.getMqttPort()); - + config.put("mp.messaging.outgoing.topic-price.host", hivemqContainer.getHost()); + config.put("mp.messaging.outgoing.topic-price.port", "" + hivemqContainer.getMqttPort()); + config.put("mp.messaging.incoming.prices.host", hivemqContainer.getHost()); + config.put("mp.messaging.incoming.prices.port", "" + hivemqContainer.getMqttPort()); return config; } diff --git a/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java new file mode 100644 index 0000000..7ad8d25 --- /dev/null +++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java @@ -0,0 +1,7 @@ +package io.quarkiverse.hivemqclient.test.smallrye; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class PriceResourceIT extends PriceResourceTest { +} diff --git a/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java new file mode 100644 index 0000000..f2b8fb2 --- /dev/null +++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java @@ -0,0 +1,62 @@ +package io.quarkiverse.hivemqclient.test.smallrye; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.sse.SseEventSource; + +import org.jboss.logging.Logger; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@TestProfile(HivemqDefaultProfile.class) +@QuarkusTest +public class PriceResourceTest { + + private static final Logger LOG = Logger.getLogger(PriceResourceTest.class); + + @TestHTTPResource("prices/stream") + URI pricesUrl; + + @Test + public void shouldGetHello() { + given() + .when().get("/prices") + .then() + .statusCode(200) + .body(is("hello")); + } + + @Test + public void shouldGetStreamOfPrices() { + Client client = ClientBuilder.newClient(); + WebTarget target = client.target(pricesUrl); + + AtomicInteger priceCount = new AtomicInteger(); + + try (SseEventSource source = SseEventSource.target(target).build()) { + source.register(event -> { + Double value = event.readData(Double.class); + LOG.infof("Received price: %f", value); + priceCount.incrementAndGet(); + }); + source.open(); + Thread.sleep(15 * 1000L); + } catch (InterruptedException ignored) { + } + + int count = priceCount.get(); + assertTrue(count > 1, "Expected more than 2 prices read from the source, got " + count); + } + +} diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java deleted file mode 100644 index 1db3e92..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla; - -import javax.ws.rs.core.Application; - -import org.eclipse.microprofile.openapi.annotations.OpenAPIDefinition; -import org.eclipse.microprofile.openapi.annotations.info.Contact; -import org.eclipse.microprofile.openapi.annotations.info.Info; -import org.eclipse.microprofile.openapi.annotations.info.License; - -@OpenAPIDefinition(info = @Info(title = "Http / HiveMQ bridge API example", version = "1.0.0", contact = @Contact(name = "Http / HiveMQ bridge API Support", url = "http://exampleurl.com/contact", email = "fake-support-examples@hivemq.com"), license = @License(name = "Apache 2.0", url = "https://www.apache.org/licenses/LICENSE-2.0.html"))) -public class HttpBridgeApplication extends Application { -} diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java deleted file mode 100644 index 742f826..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla.dto; - -public class MessageDto { - - private String value; - - public MessageDto() { - } - - public MessageDto(String value) { - this.value = value; - } - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - } -} diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java deleted file mode 100644 index 6c65fe4..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla.dto; - -public class PublishMsgRespDto { - - private final String topic; - - public PublishMsgRespDto(String topic) { - this.topic = topic; - } - - public String getTopic() { - return topic; - } - -} diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java deleted file mode 100644 index e75dc8e..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java +++ /dev/null @@ -1,46 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla.resources; - -import javax.ws.rs.Consumes; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import org.jboss.resteasy.reactive.RestPath; - -import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; - -import io.quarkiverse.hivemqclient.test.vanilla.dto.MessageDto; -import io.quarkiverse.hivemqclient.test.vanilla.dto.PublishMsgRespDto; -import io.quarkiverse.hivemqclient.test.vanilla.services.HiveProxy; -import io.smallrye.mutiny.Uni; - -@Consumes(MediaType.APPLICATION_JSON) -@Produces(MediaType.APPLICATION_JSON) -@Path("/mqtt/{brokerName}/push") -public class BrokerPushToTopicResources { - - private HiveProxy hiveProxy; - - BrokerPushToTopicResources(HiveProxy hiveProxy) { - this.hiveProxy = hiveProxy; - } - - @POST - @Path("/{topicName}") - public Uni pushMessage(@RestPath("brokerName") String brokerName, @RestPath("topicName") String topicName, - MessageDto message) { - return hiveProxy - .pushSimpleMsg(brokerName, topicName, message.getValue()) - .onItem().transform(payload -> makePushMsgResponse(topicName, payload)); - } - - private static Response makePushMsgResponse(String topicName, Mqtt5PublishResult payload) { - if (payload.getError().isPresent()) { - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(payload.getError().get().getMessage()).build(); - } - - return Response.status(Response.Status.CREATED).entity(new PublishMsgRespDto(topicName)).build(); - } -} diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java deleted file mode 100644 index 368928b..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla.resources; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.core.MediaType; - -import org.jboss.resteasy.reactive.RestPath; -import org.jboss.resteasy.reactive.RestStreamElementType; - -import io.quarkiverse.hivemqclient.test.vanilla.services.HiveProxy; -import io.smallrye.mutiny.Multi; -import io.vertx.mutiny.core.eventbus.EventBus; - -@Path("/mqtt/{brokerName}/subscribe") -public class BrokerSubscribeToTopicResources { - - private EventBus bus; - private HiveProxy hiveProxy; - - BrokerSubscribeToTopicResources(EventBus bus, HiveProxy hiveProxy) { - this.bus = bus; - this.hiveProxy = hiveProxy; - } - - @GET - @Path("/{topicName}") - @RestStreamElementType(MediaType.TEXT_PLAIN) - public Multi subscribeToTopic(@RestPath("brokerName") String brokerName, @RestPath("topicName") String topicName) { - return hiveProxy.subscribeTo(brokerName, topicName).toMulti() - .flatMap(topic -> bus. consumer(topic).bodyStream().toMulti()); - } - -} diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java deleted file mode 100644 index f5f6c85..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla.services; - -import static com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -import javax.inject.Singleton; - -import org.eclipse.microprofile.config.inject.ConfigProperty; - -import com.hivemq.client.mqtt.MqttClient; -import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; -import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; -import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; - -import io.smallrye.mutiny.Uni; -import io.vertx.mutiny.core.eventbus.EventBus; - -@Singleton -public class HiveProxy { - - private EventBus bus; - private String clusterHostName; - private int clusterPort; - private final Map connectionsPool = new ConcurrentHashMap<>(); - - HiveProxy( - EventBus bus, - @ConfigProperty(name = "hivemq.cluster.host", defaultValue = "localhost") String clusterHostName, - @ConfigProperty(name = "hivemq.cluster.port") int clusterPort) { - - this.bus = bus; - this.clusterHostName = clusterHostName; - this.clusterPort = clusterPort; - } - - public Uni pushSimpleMsg(final String brokerName, final String topic, final String msg) { - return getMqttClient(brokerName) - .flatMap(conn -> Uni.createFrom() - .completionStage(() -> conn.publishWith().topic(topic).payload(UTF_8.encode(msg)).send())); - } - - public Uni subscribeTo(final String brokerName, final String topic) { - String internalTopicName = getInternalSubscriptionTopicName(brokerName, topic); - return getMqttClient(brokerName) - .map(conn -> { - conn.subscribeWith().topicFilter(topic).send(); - conn.toAsync().publishes(ALL, - event -> bus.send(internalTopicName, UTF_8.decode(event.getPayload().get()).toString())); - return internalTopicName; - }); - } - - private Uni getMqttClient(final String brokerName) { - Mqtt5AsyncClient client = connectionsPool.get(brokerName); - if (Objects.isNull(client)) { - return newMQTTClient().invoke(asyncClient -> connectionsPool.put(brokerName, asyncClient)); - } - - return Uni.createFrom().item(client); - } - - private Uni newMQTTClient() { - Mqtt5AsyncClient asyncClient = MqttClient.builder() - .useMqttVersion5() - .serverHost(clusterHostName) - .serverPort(clusterPort) - .buildAsync(); - - Uni ack = Uni.createFrom().completionStage(asyncClient::connect); - return ack.map(completed -> asyncClient); - } - - private String getInternalSubscriptionTopicName(final String brokerName, final String topic) { - return brokerName + "_" + topic; - } -} diff --git a/integration-tests/hivemq-client-vanilla/src/main/resources/application.properties b/integration-tests/hivemq-client-vanilla/src/main/resources/application.properties deleted file mode 100644 index 282a529..0000000 --- a/integration-tests/hivemq-client-vanilla/src/main/resources/application.properties +++ /dev/null @@ -1,4 +0,0 @@ -# hiveMQ config -hivemq.cluster.host=localhost -hivemq.cluster.port=8883 - diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java deleted file mode 100644 index bcb6c64..0000000 --- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla; - -import javax.ws.rs.core.Response; - -import org.junit.jupiter.api.Test; - -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.TestProfile; - -@TestProfile(HivemqDefaultProfile.class) -@QuarkusTest -public class BrokerPushToTopicResourcesTest extends CommonsTestSuite { - - @Test - public void verifyMessageIsPushed() { - final String message = "helloWorld!"; - final String topicName = "hello"; - - pushMessage(DEFAULT_BROKER_NAME, topicName, message, Response.Status.CREATED); - } - - @Test - public void verifyMassivePush() { - final int amountOfEvents = 1000; - final String message = "helloWorld!"; - final String topicName = "hello"; - - for (int i = 0; i < amountOfEvents; i++) { - pushMessage(DEFAULT_BROKER_NAME, topicName, message, Response.Status.CREATED); - } - } -} diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java deleted file mode 100644 index bc91990..0000000 --- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.testcontainers.shaded.org.awaitility.Awaitility.await; - -import java.net.URI; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.Response; -import javax.ws.rs.sse.SseEventSource; - -import org.hamcrest.Matchers; -import org.junit.jupiter.api.Test; - -import io.quarkus.test.common.http.TestHTTPResource; -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.TestProfile; - -@TestProfile(HivemqDefaultProfile.class) -@QuarkusTest -class BrokerSubscribeToTopicResourceTest extends CommonsTestSuite { - private final String message = "helloWorld!"; - private final String topicName = "hello"; - - @TestHTTPResource("/mqtt/") - private URI brokerSubscribeToTopicResource; - - @Test - void verifyServerSentEventSubscription() throws InterruptedException { - - // create a client for `BrokerSubscribeToTopicResources` and collect the consumed resources in a list - WebTarget target = ClientBuilder.newClient() - .target(brokerSubscribeToTopicResource + DEFAULT_BROKER_NAME + "/subscribe/hello"); - List received = new CopyOnWriteArrayList<>(); - SseEventSource source = SseEventSource.target(target).build(); - source.register(inboundSseEvent -> received.add(inboundSseEvent.readData())); - - // in a separate thread, feed the `BrokerPushToTopicResource` - ExecutorService msgSender = startSendingMsg(); - source.open(); - // check if, after at most 5 seconds, we have at least 'expectedMsgAmount' items collected, and they are what we expect - await().atMost(30, SECONDS).until(() -> received.size() >= 2); - assertThat(received, Matchers.hasItems("helloWorld!", "helloWorld!", "helloWorld!")); - source.close(); - - // shutdown the executor that is feeding the `BrokerPushToTopicResource` - msgSender.shutdown(); - msgSender.awaitTermination(5, SECONDS); - } - - private ExecutorService startSendingMsg() { - ExecutorService executorService = Executors.newSingleThreadExecutor(); - executorService.execute(() -> { - while (true) { - try { - pushMessage(DEFAULT_BROKER_NAME, topicName, message, Response.Status.CREATED); - } catch (Exception e) { - // Ignore errors when test is completed: Connection refused - } - } - }); - return executorService; - } -} diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java deleted file mode 100644 index c24513d..0000000 --- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla; - -import static io.restassured.RestAssured.given; -import static org.hamcrest.CoreMatchers.equalTo; - -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import io.quarkiverse.hivemqclient.test.vanilla.dto.MessageDto; - -public abstract class CommonsTestSuite { - - protected final static String DEFAULT_BROKER_NAME = "test"; - - protected void pushMessage(final String brokerName, final String topic, final String msg, - final Response.Status ExpectedStatusCode) { - given() - .when() - .header("Content-Type", MediaType.APPLICATION_JSON) - .body(new MessageDto(msg)) - .post("/mqtt/" + brokerName + "/push/" + topic) - .then() - .statusCode(ExpectedStatusCode.getStatusCode()) - .body("topic", equalTo(topic)); - } -} diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java deleted file mode 100644 index b68471e..0000000 --- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.quarkiverse.hivemqclient.test.vanilla; - -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -public class NativeBrokerPushToTopicResourcesIT extends BrokerPushToTopicResourcesTest { -} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index db962f7..990882d 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -12,7 +12,7 @@ Quarkus - Hivemq Client - Integration Tests - hivemq-client-vanilla + hivemq-client-smallrye kitchensink