diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index caca6cd..133be05 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,6 +36,18 @@ We use this information to acknowledge your contributions in release announcemen If you're considering anything more than correcting a typo or fixing a minor bug, please discuss it by [creating an issue on our issue tracker](https://github.com/quarkiverse/quarkus-kafka-streams-processor/issues) before submitting a pull request. We're happy to provide guidance but please spend an hour or two researching the subject on your own including searching the forums for prior discussions. +### Native build support + +This extension is compatible with native compilation. +You can validate your changes early following Quarkus' documentation [here](https://quarkus.io/guides/building-native-image), using either a builder image or a local installation of GraalVM. + +```sh +# With GraalVM installed +mvn install -Dnative +# With a builder image +mvn install -Dnative -Dquarkus.native.container-build=true +``` + ### Code reviews All submissions, need to be reviewed by at least one committer before diff --git a/api/pom.xml b/api/pom.xml index b573a9b..0a6c6b0 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -61,6 +61,15 @@ io.smallrye.config smallrye-config-core + + + org.eclipse.microprofile.fault-tolerance + microprofile-fault-tolerance-api + + + io.quarkus + quarkus-core + diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/exception/RetryableException.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/exception/RetryableException.java index 01102c7..d514dd0 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/exception/RetryableException.java +++ b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/exception/RetryableException.java @@ -19,9 +19,12 @@ */ package io.quarkiverse.kafkastreamsprocessor.api.exception; +import io.quarkus.runtime.annotations.RegisterForReflection; + /** * Generic {@link RuntimeException} use by the RetryDecorator to specify that a message should be processed again. */ +@RegisterForReflection public class RetryableException extends RuntimeException { /** * {@inheritDoc} diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java index 2e56bb6..b23a1e0 100644 --- a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java +++ b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/KStreamsProcessorConfig.java @@ -20,6 +20,7 @@ package io.quarkiverse.kafkastreamsprocessor.api.properties; +import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; @@ -50,4 +51,10 @@ public interface KStreamsProcessorConfig { */ @WithDefault("continue") String errorStrategy(); + + /** + * All configuration related to the RetryDecorator and reprocessing a record when a {@link RetryableException} has + * been caught + */ + RetryConfig retry(); } diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/RetryConfig.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/RetryConfig.java new file mode 100644 index 0000000..2167684 --- /dev/null +++ b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/properties/RetryConfig.java @@ -0,0 +1,112 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.api.properties; + +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Optional; + +import org.eclipse.microprofile.faulttolerance.Retry; + +import io.smallrye.config.WithDefault; + +public interface RetryConfig { + + /** + * Max number of retries. + * + * @see Retry#maxRetries() + */ + @WithDefault("-1") + int maxRetries(); + + /** + * The delay between retries. + * + * @see Retry#delay() + */ + @WithDefault("0") + long delay(); + + /** + * The unit for {@link #delay()}. Default milliseconds. + * + * @see Retry#delayUnit() + */ + @WithDefault("MILLIS") + ChronoUnit delayUnit(); + + /** + * The max duration. + * + * @see Retry#maxDuration() + */ + @WithDefault("180000") + long maxDuration(); + + /** + * The duration unit for {@link #maxDuration()}. + *

+ * Milliseconds by default. + *

+ * + * @see Retry#durationUnit() + */ + @WithDefault("MILLIS") + ChronoUnit durationUnit(); + + /** + * Jitter value to randomly vary retry delays for. + * + * @see Retry#jitter() + */ + @WithDefault("200") + long jitter(); + + /** + * The delay unit for {@link #jitter()}. Default is milliseconds. + * + * @see Retry#jitterDelayUnit() + */ + @WithDefault("MILLIS") + ChronoUnit jitterDelayUnit(); + + /** + * The list of exception types that should trigger a retry. + *

+ * Default is the provided {@link io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException}. + *

+ * + * @see Retry#retryOn() + */ + @WithDefault("io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException") + List> retryOn(); + + /** + * The list of exception types that should not trigger a retry. + *

+ * Default is empty list + *

+ * + * @see Retry#abortOn() + */ + @WithDefault("") + Optional>> abortOn(); +} \ No newline at end of file diff --git a/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java b/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java index 0a3dd76..5d8e5f8 100644 --- a/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java +++ b/deployment/src/main/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/deployment/KafkaStreamsProcessorProcessor.java @@ -19,6 +19,8 @@ */ package io.quarkiverse.kafkastreamsprocessor.kafka.streams.deployment; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.AnnotationTarget; import org.jboss.jandex.DotName; @@ -55,4 +57,20 @@ public void configureNativeExecutable(CombinedIndexBuildItem combinedIndex, } } } + + @BuildStep + public void registerRetryExceptions(BuildProducer reflectiveClass) { + Config config = ConfigProvider.getConfig(); + + config.getOptionalValue("kafkastreamsprocessor.retry.retry-on", String[].class) + .ifPresent(retryExceptions -> reflectiveClass.produce(ReflectiveClassBuildItem.builder(retryExceptions) + .methods(false) + .fields(false) + .build())); + config.getOptionalValue("kafkastreamsprocessor.retry.abort-on", String[].class) + .ifPresent(abortExceptions -> reflectiveClass.produce(ReflectiveClassBuildItem.builder(abortExceptions) + .methods(false) + .fields(false) + .build())); + } } diff --git a/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java b/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java index f8223f5..7ca5620 100644 --- a/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java +++ b/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorTest.java @@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; import io.quarkus.builder.BuildChainBuilder; import io.quarkus.deployment.builditem.GeneratedResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; @@ -71,6 +71,8 @@ private static void checkProperClassesAreRegistered() { .collect(Collectors.toList()); assertThat(allRegisteredClasses, hasItem(MyProcessor.class.getName())); + // Default retryOn exception for Fault Tolerance + assertThat(allRegisteredClasses, hasItem(RetryableException.class.getName())); } @Test @@ -79,9 +81,4 @@ void shouldRegisterTypesForReflection() { assertNull(registeredClasses); } - @Processor - static class MyProcessor { - - } - } diff --git a/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorWithRetryTest.java b/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorWithRetryTest.java new file mode 100644 index 0000000..ed7bdfa --- /dev/null +++ b/deployment/src/test/java/io/quarkiverse/kafkastreamsprocessor/kafka/streams/test/KafkaStreamsProcessorProcessorWithRetryTest.java @@ -0,0 +1,99 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.kafka.streams.test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException; +import io.quarkus.builder.BuildChainBuilder; +import io.quarkus.deployment.builditem.GeneratedResourceBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.test.QuarkusUnitTest; + +public class KafkaStreamsProcessorProcessorWithRetryTest { + + private static volatile List registeredClasses; + + @RegisterExtension + static QuarkusUnitTest runner = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClass(io.quarkiverse.kafkastreamsprocessor.kafka.streams.test.MyProcessor.class)) + .overrideConfigKey("kafkastreamsprocessor.input.topics", "ping-events") + .overrideConfigKey("kafkastreamsprocessor.output.topic", "pong-events") + .overrideConfigKey("quarkus.kafka-streams.topics", "ping-events,pong-events") + .overrideConfigKey("kafkastreamsprocessor.retry.retry-on", + "io.quarkiverse.kafkastreamsprocessor.kafka.streams.test.KafkaStreamsProcessorProcessorWithRetryTest$RetryException") + .overrideConfigKey("kafkastreamsprocessor.retry.abort-on", + "io.quarkiverse.kafkastreamsprocessor.kafka.streams.test.KafkaStreamsProcessorProcessorWithRetryTest$AbortException") + .addBuildChainCustomizer(buildCustomizer()); + + private static Consumer buildCustomizer() { + return chainBuilder -> chainBuilder.addBuildStep( + context -> { + registeredClasses = context.consumeMulti(ReflectiveClassBuildItem.class); + checkProperClassesAreRegistered(); + }) + .consumes(ReflectiveClassBuildItem.class) + .produces(GeneratedResourceBuildItem.class) + .build(); + } + + private static void checkProperClassesAreRegistered() { + assertNotNull(registeredClasses); + + List allRegisteredClasses = registeredClasses.stream() + .flatMap(c -> c.getClassNames().stream()) + .collect(Collectors.toList()); + + assertThat(allRegisteredClasses, hasItem(MyProcessor.class.getName())); + // Default retryOn exception for Fault Tolerance + assertThat(allRegisteredClasses, hasItem(RetryableException.class.getName())); + // Explicit retryOn & abortOn exceptions + assertThat(allRegisteredClasses, hasItem(RetryException.class.getName())); + assertThat(allRegisteredClasses, hasItem(AbortException.class.getName())); + } + + @Test + void shouldRegisterTypesForReflection() { + // if it gets there, it succeeded + assertNull(registeredClasses); + } + + public static class RetryException { + + } + + public static class AbortException { + + } + +} diff --git a/docs/modules/ROOT/pages/includes/kafka-streams-processor-configuration-keys.adoc b/docs/modules/ROOT/pages/includes/kafka-streams-processor-configuration-keys.adoc index 93ffd43..17f0d26 100644 --- a/docs/modules/ROOT/pages/includes/kafka-streams-processor-configuration-keys.adoc +++ b/docs/modules/ROOT/pages/includes/kafka-streams-processor-configuration-keys.adoc @@ -1,3 +1,4 @@ +:retryLink: https://download.eclipse.org/microprofile/microprofile-fault-tolerance-3.0/microprofile-fault-tolerance-spec-3.0.html#retry :summaryTableId: kafka-streams-processor-configuration-keys [.configuration-legend] icon:lock[title=Fixed at build time] Configuration property fixed at build time - All other configuration properties are overridable at runtime @@ -40,17 +41,17 @@ endif::add-copy-button-to-env-var[] -- a| string | -a| [[kafka-streams-processor-configuration-keys_kafka.streams.processor-sink-topic]]`link:#kafka-streams-processor-configuration-keys_kafka.streams.processor-sink-topic[kafkastreamsprocessor.output.sinks..topic]` +a| [[kafka-streams-processor-configuration-keys_kafka.streams.processor-sink-topic]]`link:#kafka-streams-processor-configuration-keys_kafka.streams.processor-sink-topic[kafka.streams.processor.topic]` [.description] -- The Kafka topic for outgoing messages for the given sink name. ifdef::add-copy-button-to-env-var[] -Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_OUTPUT_SINKS_sink in uppercase_TOPIC+++[] +Environment variable: env_var_with_copy_button:+++KAFKA_STREAMS_PROCESSOR__sink__TOPIC+++[] endif::add-copy-button-to-env-var[] ifndef::add-copy-button-to-env-var[] -Environment variable: `+++KAFKASTREAMSPROCESSOR_OUTPUT_SINKS_sink in uppercase_TOPIC+++` +Environment variable: `+++KAFKA_STREAMS_PROCESSOR__sink__TOPIC+++` endif::add-copy-button-to-env-var[] --| string | @@ -118,53 +119,168 @@ endif::add-copy-button-to-env-var[] --| boolean | false -a| [[kafka-streams-processor-configuration-keys_kafka-streams-processor.retry.maxRetries]]`link:#kafka-streams-processor-configuration-keys_kafka-streams-processor.retry.maxRetries[kafka-streams-processor.retry.maxRetries]` +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.error-strategy]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.error-strategy[kafkastreamsprocessor.error-strategy]` [.description] -- -The retries setting determines how many times the producer will attempt to send a message before marking it as failed. +Kafka Streams Processor error strategy ifdef::add-copy-button-to-env-var[] -Environment variable: env_var_with_copy_button:+++KAFKA_STREAMS_PROCESSOR_RETRY_MAXRETRIES+++[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_ERROR_STRATEGY+++[] endif::add-copy-button-to-env-var[] ifndef::add-copy-button-to-env-var[] -Environment variable: `+++KAFKA_STREAMS_PROCESSOR_RETRY_MAXRETRIES+++` +Environment variable: `+++KAFKASTREAMSPROCESSOR_ERROR_STRATEGY+++` endif::add-copy-button-to-env-var[] ---| int +--| String +| continue + +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.max-retries]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.max-retries[kafkastreamsprocessor.retry.max-retries]` + +[.description] +-- + +Max number of retries. See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_MAX_RETRIES+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_MAX_RETRIES+++` +endif::add-copy-button-to-env-var[] +--| String +| -1 + +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.delay]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.delay[kafkastreamsprocessor.retry.delay]` + +[.description] +-- + +The delay between retries. See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_DELAY+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_DELAY+++` +endif::add-copy-button-to-env-var[] +--| long | 0 -a| [[kafka-streams-processor-configuration-keys_kafka-streams-processor.retry.retryOn]]`link:#kafka-streams-processor-configuration-keys_kafka-streams-processor.retry.retryOn[kafka-streams-processor.retry.retryOn]` +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.delay-unit]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.delay-unit[kafkastreamsprocessor.retry.delay-unit]` [.description] -- -Specify the failures to retry on +The unit for delay. Default milliseconds. See {retryLink}[microprofile doc]. ifdef::add-copy-button-to-env-var[] -Environment variable: env_var_with_copy_button:+++KAFKA_STREAMS_PROCESSOR_RETRY_RETRYON+++[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_DELAY_UNIT+++[] endif::add-copy-button-to-env-var[] ifndef::add-copy-button-to-env-var[] -Environment variable: `+++KAFKA_STREAMS_PROCESSOR_RETRY_RETRYON+++` +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_DELAY_UNIT+++` endif::add-copy-button-to-env-var[] ---| Error.class -| +--| ChronoUnit +| MILLIS -a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.error-strategy]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.error-strategy[kafkastreamsprocessor.error-strategy]` +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.max-duration]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.max-duration[kafkastreamsprocessor.retry.max-duration]` [.description] -- -Kafka Streams Processor error strategy +The max duration. See {retryLink}[microprofile doc]. ifdef::add-copy-button-to-env-var[] -Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_ERROR_STRATEGY+++[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_MAX_DURATION+++[] endif::add-copy-button-to-env-var[] ifndef::add-copy-button-to-env-var[] -Environment variable: `+++KAFKASTREAMSPROCESSOR_ERROR_STRATEGY+++` +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_MAX_DURATION+++` endif::add-copy-button-to-env-var[] ---| String -| continue +--| long +| 180000 -|=== +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.duration-unit]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.duration-unit[kafkastreamsprocessor.retry.duration-unit]` + +[.description] +-- + +The unit for max duration. See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_DURATION_UNIT+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_DURATION_UNIT+++` +endif::add-copy-button-to-env-var[] +--| ChronoUnit +| MILLIS + +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.jitter]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.jitter[kafkastreamsprocessor.retry.jitter]` + +[.description] +-- + +Jitter value to randomly vary retry delays for. See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_JITTER+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_JITTER+++` +endif::add-copy-button-to-env-var[] +--| long +| 200 + +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.jitter-delay-unit]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.jitter-delay-unit[kafkastreamsprocessor.retry.jitter-delay-unit]` + +[.description] +-- + +The delay unit for jitter. Default is milliseconds. See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_JITTER_DELAY_UNIT+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_JITTER_DELAY_UNIT+++` +endif::add-copy-button-to-env-var[] +--| ChronoUnit +| MILLIS +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.retry-on]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.retry-on[kafkastreamsprocessor.retry.retry-on]` + +[.description] +-- + +The list of exception types that should trigger a retry. +Default is the provided io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException. +See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_RETRY_ON+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_RETRY_ON+++` +endif::add-copy-button-to-env-var[] +--| Exception class names +| `[io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException]` + +a| [[kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.abort-on]]`link:#kafka-streams-processor-configuration-keys_kafkastreamsprocessor.retry.abort-on[kafkastreamsprocessor.retry.abort-on]` + +[.description] +-- + +The list of exception types that should not trigger a retry. +Default is empty list +See {retryLink}[microprofile doc]. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++KAFKASTREAMSPROCESSOR_RETRY_ABORT_ON+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++KAFKASTREAMSPROCESSOR_RETRY_ABORT_ON+++` +endif::add-copy-button-to-env-var[] +--| Exception class names +| `[]` + +|=== diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc index 6941ed4..b42a104 100644 --- a/docs/modules/ROOT/pages/index.adoc +++ b/docs/modules/ROOT/pages/index.adoc @@ -385,11 +385,11 @@ The default behavior can be overriden via the following configuration, ie: .application.properties [source,properties] ---- -kafka-streams-processor.retry.maxRetries=-1 -kafka-streams-processor.retry.retryOn=io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException +kafkastreamsprocessor.retry.max-retries=-1 +kafkastreamsprocessor.retry.retry-on=io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException ---- -As this implementation is based on https://download.eclipse.org/microprofile/microprofile-fault-tolerance-3.0/microprofile-fault-tolerance-spec-3.0.html#_retry_usage[Eclipse Microprofile Fault Tolerance] it's also possible to override some default policy (ie: maxDuration, jitter...) under the prefix `kafka-streams-processor.retry.`. +As this implementation is based on https://download.eclipse.org/microprofile/microprofile-fault-tolerance-3.0/microprofile-fault-tolerance-spec-3.0.html#_retry_usage[Eclipse Microprofile Fault Tolerance] it's also possible to override some default policy (ie: maxDuration, jitter...) under the prefix `kafkastreamsprocessor.retry.`. ==== Ad hoc usage diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java index 84d1af2..92d75a7 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java @@ -85,7 +85,7 @@ public void process(Record record) { log.info("An exception that has been raised by the processor will not be retried.\n" + "Possible causes:\n" + "- That's not a managed retryable exception\n" - + "- maxRetries or maxDuration limits have been reached"); + + "- max-retries or max-duration limits have been reached"); throw e; } } diff --git a/impl/src/main/resources/META-INF/microprofile-config.properties b/impl/src/main/resources/META-INF/microprofile-config.properties index d995326..b3f2dae 100644 --- a/impl/src/main/resources/META-INF/microprofile-config.properties +++ b/impl/src/main/resources/META-INF/microprofile-config.properties @@ -4,24 +4,15 @@ kafka-streams.default.deserialization.exception.handler=io.quarkiverse.kafkastre kafka-streams.auto.offset.reset=latest %test.kafka-streams.auto.offset.reset=earliest # Uncomment this part to override at runtime default values for the Retry of the RetryDecoratorDelegate -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/maxRetries=${kafka-streams-processor.retry.maxRetries} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/delay=${kafka-streams-processor.retry.delay} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/delayUnit=${kafka-streams-processor.retry.delayUnit} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/maxDuration=${kafka-streams-processor.retry.maxDuration} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/durationUnit=${kafka-streams-processor.retry.durationUnit} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/jitter=${kafka-streams-processor.retry.jitter} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/jitterDelayUnit=${kafka-streams-processor.retry.jitterDelayUnit} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/retryOn=${kafka-streams-processor.retry.retryOn} -io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/abortOn=${kafka-streams-processor.retry.abortOn} -kafka-streams-processor.retry.maxRetries=-1 -kafka-streams-processor.retry.delay=0 -kafka-streams-processor.retry.delayUnit=Millis -kafka-streams-processor.retry.maxDuration=180000 -kafka-streams-processor.retry.durationUnit=Millis -kafka-streams-processor.retry.jitter=200 -kafka-streams-processor.retry.jitterDelayUnit=Millis -kafka-streams-processor.retry.retryOn=io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException -kafka-streams-processor.retry.abortOn= +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/maxRetries=${kafkastreamsprocessor.retry.max-retries} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/delay=${kafkastreamsprocessor.retry.delay} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/delayUnit=${kafkastreamsprocessor.retry.delay-unit} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/maxDuration=${kafkastreamsprocessor.retry.max-duration} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/durationUnit=${kafkastreamsprocessor.retry.duration-unit} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/jitter=${kafkastreamsprocessor.retry.jitter} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/jitterDelayUnit=${kafkastreamsprocessor.retry.jitter-delay-unit} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/retryOn=${kafkastreamsprocessor.retry.retry-on} +io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.RetryDecoratorDelegate/retryableProcess/Retry/abortOn=${kafkastreamsprocessor.retry.abort-on} # Notify Kafka on pod shutdown to react faster to topology changes # Warning: This setting is not public and may disappear. https://issues.apache.org/jira/browse/KAFKA-6995 kafka-streams.internal.leave.group.on.close=true diff --git a/integration-tests/kafka-to-rest/src/main/resources/application.properties b/integration-tests/kafka-to-rest/src/main/resources/application.properties index 2efa75a..d7dcbd3 100644 --- a/integration-tests/kafka-to-rest/src/main/resources/application.properties +++ b/integration-tests/kafka-to-rest/src/main/resources/application.properties @@ -8,7 +8,7 @@ kafka-streams.producer.linger.ms=0 # REST client ping.endpoint/mp-rest/url=http://localhost:9095 # Override max retry default value -kafka-streams-processor.retry.maxRetries=3 +kafkastreamsprocessor.retry.max-retries=3 quarkus.kafka.devservices.topic-partitions.pong-events=1 quarkus.kafka.devservices.topic-partitions.ping-events=1 %test.quarkus.http.test-port=0 diff --git a/integration-tests/protobuf-binding/pom.xml b/integration-tests/protobuf-binding/pom.xml index 3ba7260..ca194b5 100644 --- a/integration-tests/protobuf-binding/pom.xml +++ b/integration-tests/protobuf-binding/pom.xml @@ -79,6 +79,13 @@ + + io.quarkus + quarkus-maven-plugin + + true + + \ No newline at end of file