From 0dd61af3e6a7e6d92f121d53ebb85b429416ffce Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 6 Jul 2020 12:07:52 +0200 Subject: [PATCH] Integrate health support to reactive messaging. Also fix the durability bug when sending messages to a non-durable AMQP address --- bom/application/pom.xml | 7 ++++- docs/src/main/asciidoc/amqp.adoc | 1 - .../src/test/resources/application.properties | 1 - .../deployment/pom.xml | 4 +++ .../ReactiveMessagingBuildTimeConfig.java | 14 ++++++++++ .../SmallRyeReactiveMessagingProcessor.java | 11 ++++++++ .../runtime/pom.xml | 26 +++++++++++++++++++ .../src/main/resources/application.properties | 1 - 8 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 064d06c3f180b..0490d28e7a0e7 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -43,7 +43,7 @@ 1.0.13 1.0.13 1.1.0 - 2.1.1 + 2.2.0 3.28.0 1.2.1 1.3.5 @@ -3601,6 +3601,11 @@ smallrye-reactive-messaging-api ${smallrye-reactive-messaging.version} + + io.smallrye.reactive + smallrye-reactive-messaging-health + ${smallrye-reactive-messaging.version} + io.smallrye.reactive smallrye-reactive-messaging-mqtt diff --git a/docs/src/main/asciidoc/amqp.adoc b/docs/src/main/asciidoc/amqp.adoc index 294f72305f3c3..c234a5802699a 100644 --- a/docs/src/main/asciidoc/amqp.adoc +++ b/docs/src/main/asciidoc/amqp.adoc @@ -231,7 +231,6 @@ amqp-password=quarkus # Configure the AMQP connector to write to the `prices` address mp.messaging.outgoing.generated-price.connector=smallrye-amqp mp.messaging.outgoing.generated-price.address=prices -mp.messaging.outgoing.generated-price.durable=true # Configure the AMQP connector to read from the `prices` queue mp.messaging.incoming.prices.connector=smallrye-amqp diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties index f6e591ab27f7b..5506833771e38 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/test/resources/application.properties @@ -1,7 +1,6 @@ amqp-port=5672 mp.messaging.outgoing.source.connector=smallrye-amqp -mp.messaging.outgoing.source.durable=true mp.messaging.incoming.in.connector=smallrye-amqp mp.messaging.incoming.in.address=source diff --git a/extensions/smallrye-reactive-messaging/deployment/pom.xml b/extensions/smallrye-reactive-messaging/deployment/pom.xml index 642c14ed4ed16..44d80bfdbbf23 100644 --- a/extensions/smallrye-reactive-messaging/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging/deployment/pom.xml @@ -27,6 +27,10 @@ io.quarkus quarkus-smallrye-reactive-messaging + + io.quarkus + quarkus-smallrye-health-spi + io.quarkus quarkus-mutiny-reactive-streams-operators-deployment diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java new file mode 100644 index 0000000000000..818b1e21243f5 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java @@ -0,0 +1,14 @@ +package io.quarkus.smallrye.reactivemessaging.deployment; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "reactive-messaging", phase = ConfigPhase.BUILD_TIME) +public class ReactiveMessagingBuildTimeConfig { + /** + * Whether or not an health check is published in case the smallrye-health extension is present. + */ + @ConfigItem(name = "health.enabled", defaultValue = "true") + public boolean healthEnabled; +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index 646353c071a6b..1edb4efb88cf6 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -60,6 +60,7 @@ import io.quarkus.gizmo.MethodCreator; import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; +import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry; import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration; @@ -67,6 +68,8 @@ import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder; import io.smallrye.reactive.messaging.Invoker; import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingLivenessCheck; +import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingReadinessCheck; /** * @@ -302,6 +305,14 @@ public void transform(AnnotationsTransformer.TransformationContext ctx) { } } + @BuildStep + public void enableHealth(ReactiveMessagingBuildTimeConfig buildTimeConfig, BuildProducer producer) { + producer.produce( + new HealthBuildItem(SmallRyeReactiveMessagingLivenessCheck.class.getName(), buildTimeConfig.healthEnabled)); + producer.produce( + new HealthBuildItem(SmallRyeReactiveMessagingReadinessCheck.class.getName(), buildTimeConfig.healthEnabled)); + } + @BuildStep @Record(STATIC_INIT) public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext recorderContext, diff --git a/extensions/smallrye-reactive-messaging/runtime/pom.xml b/extensions/smallrye-reactive-messaging/runtime/pom.xml index 6a4c8114db9d3..e6be5e835ea56 100644 --- a/extensions/smallrye-reactive-messaging/runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging/runtime/pom.xml @@ -43,6 +43,32 @@ + + io.smallrye.reactive + smallrye-reactive-messaging-health + + + com.fasterxml.jackson.core + jackson-core + + + javax.annotation + javax.annotation-api + + + javax.inject + javax.inject + + + org.osgi + org.osgi.annotation.versioning + + + javax.enterprise + cdi-api + + + jakarta.annotation jakarta.annotation-api diff --git a/integration-tests/reactive-messaging-amqp/src/main/resources/application.properties b/integration-tests/reactive-messaging-amqp/src/main/resources/application.properties index 03b5e793d41ba..4a815794676f5 100644 --- a/integration-tests/reactive-messaging-amqp/src/main/resources/application.properties +++ b/integration-tests/reactive-messaging-amqp/src/main/resources/application.properties @@ -7,7 +7,6 @@ amqp-username=artemis amqp-password=artemis mp.messaging.outgoing.people-out.connector=smallrye-amqp -mp.messaging.outgoing.people-out.durable=true mp.messaging.outgoing.people-out.address=people mp.messaging.incoming.people-in.connector=smallrye-amqp mp.messaging.incoming.people-in.durable=true