Skip to content

Commit

Permalink
Add smallrye coverage to integrations tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Gonzalez Granados authored and pjgg committed Oct 1, 2023
1 parent ab41c12 commit 0318763
Show file tree
Hide file tree
Showing 22 changed files with 204 additions and 361 deletions.
12 changes: 11 additions & 1 deletion .all-contributorsrc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@
"code",
"maintenance"
]
}
},
{
"login": "pjgg",
"name": "pjgg",
"avatar_url": "https://avatars.githubusercontent.com/u/3541131?v=4",
"profile": "https://github.com/pjgg",
"contributions": [
"code",
"maintenance"
]
}
],
"contributorsPerLine": 7,
"projectName": "quarkus-hivemq-client",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
<version>1.0.1-SNAPSHOT</version>
</parent>

<artifactId>quarkus-hivemq-client-integration-tests-minimal</artifactId>
<name>Quarkus - Hivemq Client - Integration Tests Vanilla</name>
<artifactId>quarkus-hivemq-client-integration-tests-smallrye</artifactId>
<name>Quarkus - Hivemq Client - Integration Tests smallrye</name>

<properties>
<hive.testcontainers.version>1.17.6</hive.testcontainers.version>
Expand All @@ -19,15 +19,11 @@
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
<artifactId>quarkus-smallrye-reactive-messaging-mqtt</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.hivemqclient</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.quarkiverse.hivemqclient.test.smallrye;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import io.smallrye.reactive.messaging.annotations.Broadcast;

/**
* A bean consuming data from the "prices" MQTT topic and applying some conversion.
* The result is pushed to the "my-data-stream" stream which is an in-memory stream.
*/
@ApplicationScoped
public class PriceConverter {

private static final Logger LOG = Logger.getLogger(PriceConverter.class);
private static final double CONVERSION_RATE = 0.88;

@Incoming("prices")
@Outgoing("my-data-stream")
@Broadcast
public double process(byte[] priceRaw) {
int priceInUsd = Integer.parseInt(new String(priceRaw));
LOG.infof("Receiving price: %d ", priceInUsd);
return priceInUsd * CONVERSION_RATE;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkiverse.hivemqclient.test.smallrye;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import io.smallrye.mutiny.Multi;

/**
* A bean producing random prices every second.
* The prices are written to a MQTT topic (prices). The MQTT configuration is specified in the application configuration.
*/
@ApplicationScoped
public class PriceGenerator {

private static final Logger LOG = Logger.getLogger(PriceGenerator.class);

private Random random = new Random();

@Outgoing("topic-price")
public Multi<Integer> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop()
.map(tick -> {
int price = random.nextInt(100);
LOG.infof("Sending price: %d", price);
return price;
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkiverse.hivemqclient.test.smallrye;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Multi;

/**
* A simple resource retrieving the "in-memory" "my-data-stream" and sending the items to a server sent event.
*/
@Path("/prices")
public class PriceResource {

@Inject
@Channel("my-data-stream")
Multi<Double> prices;

@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "hello";
}

@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Double> stream() {
return prices;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Configure the MQTT sink (we write to it)
mp.messaging.outgoing.topic-price.type=smallrye-mqtt-hivemq
mp.messaging.outgoing.topic-price.topic=prices
mp.messaging.outgoing.topic-price.host=localhost
mp.messaging.outgoing.topic-price.port=1883
mp.messaging.outgoing.topic-price.auto-generated-client-id=true

# Configure the MQTT source (we read from it)
mp.messaging.incoming.prices.type=smallrye-mqtt-hivemq
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.host=localhost
mp.messaging.incoming.prices.port=1883
mp.messaging.incoming.prices.auto-generated-client-id=true
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.hivemqclient.test.vanilla;
package io.quarkiverse.hivemqclient.test.smallrye;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.hivemqclient.test.vanilla;
package io.quarkiverse.hivemqclient.test.smallrye;

import static org.testcontainers.utility.DockerImageName.parse;

Expand All @@ -17,9 +17,10 @@ public class HivemqResources implements QuarkusTestResourceLifecycleManager {
public Map<String, String> start() {
hivemqContainer.start();
Map<String, String> config = new HashMap<>();
config.put("hivemq.cluster.host", hivemqContainer.getHost());
config.put("hivemq.cluster.port", "" + hivemqContainer.getMqttPort());

config.put("mp.messaging.outgoing.topic-price.host", hivemqContainer.getHost());
config.put("mp.messaging.outgoing.topic-price.port", "" + hivemqContainer.getMqttPort());
config.put("mp.messaging.incoming.prices.host", hivemqContainer.getHost());
config.put("mp.messaging.incoming.prices.port", "" + hivemqContainer.getMqttPort());
return config;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkiverse.hivemqclient.test.smallrye;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class PriceResourceIT extends PriceResourceTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.quarkiverse.hivemqclient.test.smallrye;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;

import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@TestProfile(HivemqDefaultProfile.class)
@QuarkusTest
public class PriceResourceTest {

private static final Logger LOG = Logger.getLogger(PriceResourceTest.class);

@TestHTTPResource("prices/stream")
URI pricesUrl;

@Test
public void shouldGetHello() {
given()
.when().get("/prices")
.then()
.statusCode(200)
.body(is("hello"));
}

@Test
public void shouldGetStreamOfPrices() {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(pricesUrl);

AtomicInteger priceCount = new AtomicInteger();

try (SseEventSource source = SseEventSource.target(target).build()) {
source.register(event -> {
Double value = event.readData(Double.class);
LOG.infof("Received price: %f", value);
priceCount.incrementAndGet();
});
source.open();
Thread.sleep(15 * 1000L);
} catch (InterruptedException ignored) {
}

int count = priceCount.get();
assertTrue(count > 1, "Expected more than 2 prices read from the source, got " + count);
}

}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 0318763

Please sign in to comment.