diff --git a/.all-contributorsrc b/.all-contributorsrc
index 0c8efc1..6636ccd 100644
--- a/.all-contributorsrc
+++ b/.all-contributorsrc
@@ -14,7 +14,17 @@
"code",
"maintenance"
]
- }
+ },
+ {
+ "login": "pjgg",
+ "name": "pjgg",
+ "avatar_url": "https://avatars.githubusercontent.com/u/3541131?v=4",
+ "profile": "https://github.com/pjgg",
+ "contributions": [
+ "code",
+ "maintenance"
+ ]
+ }
],
"contributorsPerLine": 7,
"projectName": "quarkus-hivemq-client",
diff --git a/integration-tests/hivemq-client-vanilla/pom.xml b/integration-tests/hivemq-client-smallrye/pom.xml
similarity index 88%
rename from integration-tests/hivemq-client-vanilla/pom.xml
rename to integration-tests/hivemq-client-smallrye/pom.xml
index 9f6255c..12d9f99 100644
--- a/integration-tests/hivemq-client-vanilla/pom.xml
+++ b/integration-tests/hivemq-client-smallrye/pom.xml
@@ -7,8 +7,8 @@
1.0.1-SNAPSHOT
- quarkus-hivemq-client-integration-tests-minimal
- Quarkus - Hivemq Client - Integration Tests Vanilla
+ quarkus-hivemq-client-integration-tests-smallrye
+ Quarkus - Hivemq Client - Integration Tests smallrye
1.17.6
@@ -19,15 +19,11 @@
io.quarkus
- quarkus-vertx
+ quarkus-resteasy-reactive
io.quarkus
- quarkus-resteasy-reactive-jackson
-
-
- io.quarkus
- quarkus-smallrye-openapi
+ quarkus-smallrye-reactive-messaging-mqtt
io.quarkiverse.hivemqclient
diff --git a/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java
new file mode 100644
index 0000000..a519b40
--- /dev/null
+++ b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceConverter.java
@@ -0,0 +1,30 @@
+package io.quarkiverse.hivemqclient.test.smallrye;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.jboss.logging.Logger;
+
+import io.smallrye.reactive.messaging.annotations.Broadcast;
+
+/**
+ * A bean consuming data from the "prices" MQTT topic and applying some conversion.
+ * The result is pushed to the "my-data-stream" stream which is an in-memory stream.
+ */
+@ApplicationScoped
+public class PriceConverter {
+
+ private static final Logger LOG = Logger.getLogger(PriceConverter.class);
+ private static final double CONVERSION_RATE = 0.88;
+
+ @Incoming("prices")
+ @Outgoing("my-data-stream")
+ @Broadcast
+ public double process(byte[] priceRaw) {
+ int priceInUsd = Integer.parseInt(new String(priceRaw));
+ LOG.infof("Receiving price: %d ", priceInUsd);
+ return priceInUsd * CONVERSION_RATE;
+ }
+
+}
diff --git a/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java
new file mode 100644
index 0000000..940e99a
--- /dev/null
+++ b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceGenerator.java
@@ -0,0 +1,35 @@
+package io.quarkiverse.hivemqclient.test.smallrye;
+
+import java.time.Duration;
+import java.util.Random;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.jboss.logging.Logger;
+
+import io.smallrye.mutiny.Multi;
+
+/**
+ * A bean producing random prices every second.
+ * The prices are written to a MQTT topic (prices). The MQTT configuration is specified in the application configuration.
+ */
+@ApplicationScoped
+public class PriceGenerator {
+
+ private static final Logger LOG = Logger.getLogger(PriceGenerator.class);
+
+ private Random random = new Random();
+
+ @Outgoing("topic-price")
+ public Multi generate() {
+ return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
+ .onOverflow().drop()
+ .map(tick -> {
+ int price = random.nextInt(100);
+ LOG.infof("Sending price: %d", price);
+ return price;
+ });
+ }
+
+}
diff --git a/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java
new file mode 100644
index 0000000..1e9da7b
--- /dev/null
+++ b/integration-tests/hivemq-client-smallrye/src/main/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResource.java
@@ -0,0 +1,35 @@
+package io.quarkiverse.hivemqclient.test.smallrye;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+
+import io.smallrye.mutiny.Multi;
+
+/**
+ * A simple resource retrieving the "in-memory" "my-data-stream" and sending the items to a server sent event.
+ */
+@Path("/prices")
+public class PriceResource {
+
+ @Inject
+ @Channel("my-data-stream")
+ Multi prices;
+
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String hello() {
+ return "hello";
+ }
+
+ @GET
+ @Path("/stream")
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ public Multi stream() {
+ return prices;
+ }
+}
diff --git a/integration-tests/hivemq-client-smallrye/src/main/resources/application.properties b/integration-tests/hivemq-client-smallrye/src/main/resources/application.properties
new file mode 100644
index 0000000..8abf690
--- /dev/null
+++ b/integration-tests/hivemq-client-smallrye/src/main/resources/application.properties
@@ -0,0 +1,13 @@
+# Configure the MQTT sink (we write to it)
+mp.messaging.outgoing.topic-price.type=smallrye-mqtt-hivemq
+mp.messaging.outgoing.topic-price.topic=prices
+mp.messaging.outgoing.topic-price.host=localhost
+mp.messaging.outgoing.topic-price.port=1883
+mp.messaging.outgoing.topic-price.auto-generated-client-id=true
+
+# Configure the MQTT source (we read from it)
+mp.messaging.incoming.prices.type=smallrye-mqtt-hivemq
+mp.messaging.incoming.prices.topic=prices
+mp.messaging.incoming.prices.host=localhost
+mp.messaging.incoming.prices.port=1883
+mp.messaging.incoming.prices.auto-generated-client-id=true
\ No newline at end of file
diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqDefaultProfile.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqDefaultProfile.java
similarity index 86%
rename from integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqDefaultProfile.java
rename to integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqDefaultProfile.java
index 093dcd0..956d6aa 100644
--- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqDefaultProfile.java
+++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqDefaultProfile.java
@@ -1,4 +1,4 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
+package io.quarkiverse.hivemqclient.test.smallrye;
import java.util.Collections;
import java.util.List;
diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqResources.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqResources.java
similarity index 61%
rename from integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqResources.java
rename to integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqResources.java
index 717907d..0598b0f 100644
--- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/HivemqResources.java
+++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/HivemqResources.java
@@ -1,4 +1,4 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
+package io.quarkiverse.hivemqclient.test.smallrye;
import static org.testcontainers.utility.DockerImageName.parse;
@@ -17,9 +17,10 @@ public class HivemqResources implements QuarkusTestResourceLifecycleManager {
public Map start() {
hivemqContainer.start();
Map config = new HashMap<>();
- config.put("hivemq.cluster.host", hivemqContainer.getHost());
- config.put("hivemq.cluster.port", "" + hivemqContainer.getMqttPort());
-
+ config.put("mp.messaging.outgoing.topic-price.host", hivemqContainer.getHost());
+ config.put("mp.messaging.outgoing.topic-price.port", "" + hivemqContainer.getMqttPort());
+ config.put("mp.messaging.incoming.prices.host", hivemqContainer.getHost());
+ config.put("mp.messaging.incoming.prices.port", "" + hivemqContainer.getMqttPort());
return config;
}
diff --git a/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java
new file mode 100644
index 0000000..7ad8d25
--- /dev/null
+++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceIT.java
@@ -0,0 +1,7 @@
+package io.quarkiverse.hivemqclient.test.smallrye;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+@QuarkusIntegrationTest
+public class PriceResourceIT extends PriceResourceTest {
+}
diff --git a/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java
new file mode 100644
index 0000000..f2b8fb2
--- /dev/null
+++ b/integration-tests/hivemq-client-smallrye/src/test/java/io/quarkiverse/hivemqclient/test/smallrye/PriceResourceTest.java
@@ -0,0 +1,62 @@
+package io.quarkiverse.hivemqclient.test.smallrye;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.TestProfile;
+
+@TestProfile(HivemqDefaultProfile.class)
+@QuarkusTest
+public class PriceResourceTest {
+
+ private static final Logger LOG = Logger.getLogger(PriceResourceTest.class);
+
+ @TestHTTPResource("prices/stream")
+ URI pricesUrl;
+
+ @Test
+ public void shouldGetHello() {
+ given()
+ .when().get("/prices")
+ .then()
+ .statusCode(200)
+ .body(is("hello"));
+ }
+
+ @Test
+ public void shouldGetStreamOfPrices() {
+ Client client = ClientBuilder.newClient();
+ WebTarget target = client.target(pricesUrl);
+
+ AtomicInteger priceCount = new AtomicInteger();
+
+ try (SseEventSource source = SseEventSource.target(target).build()) {
+ source.register(event -> {
+ Double value = event.readData(Double.class);
+ LOG.infof("Received price: %f", value);
+ priceCount.incrementAndGet();
+ });
+ source.open();
+ Thread.sleep(15 * 1000L);
+ } catch (InterruptedException ignored) {
+ }
+
+ int count = priceCount.get();
+ assertTrue(count > 1, "Expected more than 2 prices read from the source, got " + count);
+ }
+
+}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java
deleted file mode 100644
index 1db3e92..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/HttpBridgeApplication.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
-
-import javax.ws.rs.core.Application;
-
-import org.eclipse.microprofile.openapi.annotations.OpenAPIDefinition;
-import org.eclipse.microprofile.openapi.annotations.info.Contact;
-import org.eclipse.microprofile.openapi.annotations.info.Info;
-import org.eclipse.microprofile.openapi.annotations.info.License;
-
-@OpenAPIDefinition(info = @Info(title = "Http / HiveMQ bridge API example", version = "1.0.0", contact = @Contact(name = "Http / HiveMQ bridge API Support", url = "http://exampleurl.com/contact", email = "fake-support-examples@hivemq.com"), license = @License(name = "Apache 2.0", url = "https://www.apache.org/licenses/LICENSE-2.0.html")))
-public class HttpBridgeApplication extends Application {
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java
deleted file mode 100644
index 742f826..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/MessageDto.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla.dto;
-
-public class MessageDto {
-
- private String value;
-
- public MessageDto() {
- }
-
- public MessageDto(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
- }
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java
deleted file mode 100644
index 6c65fe4..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/dto/PublishMsgRespDto.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla.dto;
-
-public class PublishMsgRespDto {
-
- private final String topic;
-
- public PublishMsgRespDto(String topic) {
- this.topic = topic;
- }
-
- public String getTopic() {
- return topic;
- }
-
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java
deleted file mode 100644
index e75dc8e..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerPushToTopicResources.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla.resources;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.jboss.resteasy.reactive.RestPath;
-
-import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
-
-import io.quarkiverse.hivemqclient.test.vanilla.dto.MessageDto;
-import io.quarkiverse.hivemqclient.test.vanilla.dto.PublishMsgRespDto;
-import io.quarkiverse.hivemqclient.test.vanilla.services.HiveProxy;
-import io.smallrye.mutiny.Uni;
-
-@Consumes(MediaType.APPLICATION_JSON)
-@Produces(MediaType.APPLICATION_JSON)
-@Path("/mqtt/{brokerName}/push")
-public class BrokerPushToTopicResources {
-
- private HiveProxy hiveProxy;
-
- BrokerPushToTopicResources(HiveProxy hiveProxy) {
- this.hiveProxy = hiveProxy;
- }
-
- @POST
- @Path("/{topicName}")
- public Uni pushMessage(@RestPath("brokerName") String brokerName, @RestPath("topicName") String topicName,
- MessageDto message) {
- return hiveProxy
- .pushSimpleMsg(brokerName, topicName, message.getValue())
- .onItem().transform(payload -> makePushMsgResponse(topicName, payload));
- }
-
- private static Response makePushMsgResponse(String topicName, Mqtt5PublishResult payload) {
- if (payload.getError().isPresent()) {
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(payload.getError().get().getMessage()).build();
- }
-
- return Response.status(Response.Status.CREATED).entity(new PublishMsgRespDto(topicName)).build();
- }
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java
deleted file mode 100644
index 368928b..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/resources/BrokerSubscribeToTopicResources.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla.resources;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.MediaType;
-
-import org.jboss.resteasy.reactive.RestPath;
-import org.jboss.resteasy.reactive.RestStreamElementType;
-
-import io.quarkiverse.hivemqclient.test.vanilla.services.HiveProxy;
-import io.smallrye.mutiny.Multi;
-import io.vertx.mutiny.core.eventbus.EventBus;
-
-@Path("/mqtt/{brokerName}/subscribe")
-public class BrokerSubscribeToTopicResources {
-
- private EventBus bus;
- private HiveProxy hiveProxy;
-
- BrokerSubscribeToTopicResources(EventBus bus, HiveProxy hiveProxy) {
- this.bus = bus;
- this.hiveProxy = hiveProxy;
- }
-
- @GET
- @Path("/{topicName}")
- @RestStreamElementType(MediaType.TEXT_PLAIN)
- public Multi subscribeToTopic(@RestPath("brokerName") String brokerName, @RestPath("topicName") String topicName) {
- return hiveProxy.subscribeTo(brokerName, topicName).toMulti()
- .flatMap(topic -> bus. consumer(topic).bodyStream().toMulti());
- }
-
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java b/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java
deleted file mode 100644
index f5f6c85..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/java/io/quarkiverse/hivemqclient/test/vanilla/services/HiveProxy.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla.services;
-
-import static com.hivemq.client.mqtt.MqttGlobalPublishFilter.ALL;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.inject.Singleton;
-
-import org.eclipse.microprofile.config.inject.ConfigProperty;
-
-import com.hivemq.client.mqtt.MqttClient;
-import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
-import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
-import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
-
-import io.smallrye.mutiny.Uni;
-import io.vertx.mutiny.core.eventbus.EventBus;
-
-@Singleton
-public class HiveProxy {
-
- private EventBus bus;
- private String clusterHostName;
- private int clusterPort;
- private final Map connectionsPool = new ConcurrentHashMap<>();
-
- HiveProxy(
- EventBus bus,
- @ConfigProperty(name = "hivemq.cluster.host", defaultValue = "localhost") String clusterHostName,
- @ConfigProperty(name = "hivemq.cluster.port") int clusterPort) {
-
- this.bus = bus;
- this.clusterHostName = clusterHostName;
- this.clusterPort = clusterPort;
- }
-
- public Uni pushSimpleMsg(final String brokerName, final String topic, final String msg) {
- return getMqttClient(brokerName)
- .flatMap(conn -> Uni.createFrom()
- .completionStage(() -> conn.publishWith().topic(topic).payload(UTF_8.encode(msg)).send()));
- }
-
- public Uni subscribeTo(final String brokerName, final String topic) {
- String internalTopicName = getInternalSubscriptionTopicName(brokerName, topic);
- return getMqttClient(brokerName)
- .map(conn -> {
- conn.subscribeWith().topicFilter(topic).send();
- conn.toAsync().publishes(ALL,
- event -> bus.send(internalTopicName, UTF_8.decode(event.getPayload().get()).toString()));
- return internalTopicName;
- });
- }
-
- private Uni getMqttClient(final String brokerName) {
- Mqtt5AsyncClient client = connectionsPool.get(brokerName);
- if (Objects.isNull(client)) {
- return newMQTTClient().invoke(asyncClient -> connectionsPool.put(brokerName, asyncClient));
- }
-
- return Uni.createFrom().item(client);
- }
-
- private Uni newMQTTClient() {
- Mqtt5AsyncClient asyncClient = MqttClient.builder()
- .useMqttVersion5()
- .serverHost(clusterHostName)
- .serverPort(clusterPort)
- .buildAsync();
-
- Uni ack = Uni.createFrom().completionStage(asyncClient::connect);
- return ack.map(completed -> asyncClient);
- }
-
- private String getInternalSubscriptionTopicName(final String brokerName, final String topic) {
- return brokerName + "_" + topic;
- }
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/main/resources/application.properties b/integration-tests/hivemq-client-vanilla/src/main/resources/application.properties
deleted file mode 100644
index 282a529..0000000
--- a/integration-tests/hivemq-client-vanilla/src/main/resources/application.properties
+++ /dev/null
@@ -1,4 +0,0 @@
-# hiveMQ config
-hivemq.cluster.host=localhost
-hivemq.cluster.port=8883
-
diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java
deleted file mode 100644
index bcb6c64..0000000
--- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerPushToTopicResourcesTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
-
-import javax.ws.rs.core.Response;
-
-import org.junit.jupiter.api.Test;
-
-import io.quarkus.test.junit.QuarkusTest;
-import io.quarkus.test.junit.TestProfile;
-
-@TestProfile(HivemqDefaultProfile.class)
-@QuarkusTest
-public class BrokerPushToTopicResourcesTest extends CommonsTestSuite {
-
- @Test
- public void verifyMessageIsPushed() {
- final String message = "helloWorld!";
- final String topicName = "hello";
-
- pushMessage(DEFAULT_BROKER_NAME, topicName, message, Response.Status.CREATED);
- }
-
- @Test
- public void verifyMassivePush() {
- final int amountOfEvents = 1000;
- final String message = "helloWorld!";
- final String topicName = "hello";
-
- for (int i = 0; i < amountOfEvents; i++) {
- pushMessage(DEFAULT_BROKER_NAME, topicName, message, Response.Status.CREATED);
- }
- }
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java
deleted file mode 100644
index bc91990..0000000
--- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/BrokerSubscribeToTopicResourceTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
-
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.sse.SseEventSource;
-
-import org.hamcrest.Matchers;
-import org.junit.jupiter.api.Test;
-
-import io.quarkus.test.common.http.TestHTTPResource;
-import io.quarkus.test.junit.QuarkusTest;
-import io.quarkus.test.junit.TestProfile;
-
-@TestProfile(HivemqDefaultProfile.class)
-@QuarkusTest
-class BrokerSubscribeToTopicResourceTest extends CommonsTestSuite {
- private final String message = "helloWorld!";
- private final String topicName = "hello";
-
- @TestHTTPResource("/mqtt/")
- private URI brokerSubscribeToTopicResource;
-
- @Test
- void verifyServerSentEventSubscription() throws InterruptedException {
-
- // create a client for `BrokerSubscribeToTopicResources` and collect the consumed resources in a list
- WebTarget target = ClientBuilder.newClient()
- .target(brokerSubscribeToTopicResource + DEFAULT_BROKER_NAME + "/subscribe/hello");
- List received = new CopyOnWriteArrayList<>();
- SseEventSource source = SseEventSource.target(target).build();
- source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
-
- // in a separate thread, feed the `BrokerPushToTopicResource`
- ExecutorService msgSender = startSendingMsg();
- source.open();
- // check if, after at most 5 seconds, we have at least 'expectedMsgAmount' items collected, and they are what we expect
- await().atMost(30, SECONDS).until(() -> received.size() >= 2);
- assertThat(received, Matchers.hasItems("helloWorld!", "helloWorld!", "helloWorld!"));
- source.close();
-
- // shutdown the executor that is feeding the `BrokerPushToTopicResource`
- msgSender.shutdown();
- msgSender.awaitTermination(5, SECONDS);
- }
-
- private ExecutorService startSendingMsg() {
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- executorService.execute(() -> {
- while (true) {
- try {
- pushMessage(DEFAULT_BROKER_NAME, topicName, message, Response.Status.CREATED);
- } catch (Exception e) {
- // Ignore errors when test is completed: Connection refused
- }
- }
- });
- return executorService;
- }
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java
deleted file mode 100644
index c24513d..0000000
--- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/CommonsTestSuite.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
-
-import static io.restassured.RestAssured.given;
-import static org.hamcrest.CoreMatchers.equalTo;
-
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import io.quarkiverse.hivemqclient.test.vanilla.dto.MessageDto;
-
-public abstract class CommonsTestSuite {
-
- protected final static String DEFAULT_BROKER_NAME = "test";
-
- protected void pushMessage(final String brokerName, final String topic, final String msg,
- final Response.Status ExpectedStatusCode) {
- given()
- .when()
- .header("Content-Type", MediaType.APPLICATION_JSON)
- .body(new MessageDto(msg))
- .post("/mqtt/" + brokerName + "/push/" + topic)
- .then()
- .statusCode(ExpectedStatusCode.getStatusCode())
- .body("topic", equalTo(topic));
- }
-}
diff --git a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java b/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java
deleted file mode 100644
index b68471e..0000000
--- a/integration-tests/hivemq-client-vanilla/src/test/java/io/quarkiverse/hivemqclient/test/vanilla/NativeBrokerPushToTopicResourcesIT.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package io.quarkiverse.hivemqclient.test.vanilla;
-
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-
-@QuarkusIntegrationTest
-public class NativeBrokerPushToTopicResourcesIT extends BrokerPushToTopicResourcesTest {
-}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index db962f7..990882d 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -12,7 +12,7 @@
Quarkus - Hivemq Client - Integration Tests
- hivemq-client-vanilla
+ hivemq-client-smallrye
kitchensink