Skip to content

Commit

Permalink
Merge pull request #8 from flazarus1A/feature/Add-retry-configmapping
Browse files Browse the repository at this point in the history
feat(config): Add Retry configmapping object
  • Loading branch information
flazarus1A authored Jan 17, 2024
2 parents 9e7a15f + c8e3b42 commit 7283abc
Show file tree
Hide file tree
Showing 14 changed files with 421 additions and 50 deletions.
12 changes: 12 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ We use this information to acknowledge your contributions in release announcemen

If you're considering anything more than correcting a typo or fixing a minor bug, please discuss it by [creating an issue on our issue tracker](https://github.com/quarkiverse/quarkus-kafka-streams-processor/issues) before submitting a pull request. We're happy to provide guidance but please spend an hour or two researching the subject on your own including searching the forums for prior discussions.

### Native build support

This extension is compatible with native compilation.
You can validate your changes early following Quarkus' documentation [here](https://quarkus.io/guides/building-native-image), using either a builder image or a local installation of GraalVM.

```sh
# With GraalVM installed
mvn install -Dnative
# With a builder image
mvn install -Dnative -Dquarkus.native.container-build=true
```

### Code reviews

All submissions, need to be reviewed by at least one committer before
Expand Down
9 changes: 9 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config-core</artifactId>
</dependency>
<!-- for javadoc references only -->
<dependency>
<groupId>org.eclipse.microprofile.fault-tolerance</groupId>
<artifactId>microprofile-fault-tolerance-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>

<!-- Test -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
*/
package io.quarkiverse.kafkastreamsprocessor.api.exception;

import io.quarkus.runtime.annotations.RegisterForReflection;

/**
* Generic {@link RuntimeException} use by the RetryDecorator to specify that a message should be processed again.
*/
@RegisterForReflection
public class RetryableException extends RuntimeException {
/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.quarkiverse.kafkastreamsprocessor.api.properties;

import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;

Expand Down Expand Up @@ -50,4 +51,10 @@ public interface KStreamsProcessorConfig {
*/
@WithDefault("continue")
String errorStrategy();

/**
* All configuration related to the RetryDecorator and reprocessing a record when a {@link RetryableException} has
* been caught
*/
RetryConfig retry();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*-
* #%L
* Quarkus Kafka Streams Processor
* %%
* Copyright (C) 2024 Amadeus s.a.s.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.quarkiverse.kafkastreamsprocessor.api.properties;

import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;

import org.eclipse.microprofile.faulttolerance.Retry;

import io.smallrye.config.WithDefault;

public interface RetryConfig {

/**
* Max number of retries.
*
* @see Retry#maxRetries()
*/
@WithDefault("-1")
int maxRetries();

/**
* The delay between retries.
*
* @see Retry#delay()
*/
@WithDefault("0")
long delay();

/**
* The unit for {@link #delay()}. Default milliseconds.
*
* @see Retry#delayUnit()
*/
@WithDefault("MILLIS")
ChronoUnit delayUnit();

/**
* The max duration.
*
* @see Retry#maxDuration()
*/
@WithDefault("180000")
long maxDuration();

/**
* The duration unit for {@link #maxDuration()}.
* <p>
* Milliseconds by default.
* </p>
*
* @see Retry#durationUnit()
*/
@WithDefault("MILLIS")
ChronoUnit durationUnit();

/**
* Jitter value to randomly vary retry delays for.
*
* @see Retry#jitter()
*/
@WithDefault("200")
long jitter();

/**
* The delay unit for {@link #jitter()}. Default is milliseconds.
*
* @see Retry#jitterDelayUnit()
*/
@WithDefault("MILLIS")
ChronoUnit jitterDelayUnit();

/**
* The list of exception types that should trigger a retry.
* <p>
* Default is the provided {@link io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException}.
* </p>
*
* @see Retry#retryOn()
*/
@WithDefault("io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException")
List<Class<? extends Throwable>> retryOn();

/**
* The list of exception types that should <i>not</i> trigger a retry.
* <p>
* Default is empty list
* </p>
*
* @see Retry#abortOn()
*/
@WithDefault("")
Optional<List<Class<? extends Throwable>>> abortOn();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package io.quarkiverse.kafkastreamsprocessor.kafka.streams.deployment;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.DotName;
Expand Down Expand Up @@ -55,4 +57,20 @@ public void configureNativeExecutable(CombinedIndexBuildItem combinedIndex,
}
}
}

@BuildStep
public void registerRetryExceptions(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
Config config = ConfigProvider.getConfig();

config.getOptionalValue("kafkastreamsprocessor.retry.retry-on", String[].class)
.ifPresent(retryExceptions -> reflectiveClass.produce(ReflectiveClassBuildItem.builder(retryExceptions)
.methods(false)
.fields(false)
.build()));
config.getOptionalValue("kafkastreamsprocessor.retry.abort-on", String[].class)
.ifPresent(abortExceptions -> reflectiveClass.produce(ReflectiveClassBuildItem.builder(abortExceptions)
.methods(false)
.fields(false)
.build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.kafkastreamsprocessor.api.Processor;
import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException;
import io.quarkus.builder.BuildChainBuilder;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
Expand Down Expand Up @@ -71,6 +71,8 @@ private static void checkProperClassesAreRegistered() {
.collect(Collectors.toList());

assertThat(allRegisteredClasses, hasItem(MyProcessor.class.getName()));
// Default retryOn exception for Fault Tolerance
assertThat(allRegisteredClasses, hasItem(RetryableException.class.getName()));
}

@Test
Expand All @@ -79,9 +81,4 @@ void shouldRegisterTypesForReflection() {
assertNull(registeredClasses);
}

@Processor
static class MyProcessor {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*-
* #%L
* Quarkus Kafka Streams Processor
* %%
* Copyright (C) 2024 Amadeus s.a.s.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.quarkiverse.kafkastreamsprocessor.kafka.streams.test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.kafkastreamsprocessor.api.exception.RetryableException;
import io.quarkus.builder.BuildChainBuilder;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.test.QuarkusUnitTest;

public class KafkaStreamsProcessorProcessorWithRetryTest {

private static volatile List<ReflectiveClassBuildItem> registeredClasses;

@RegisterExtension
static QuarkusUnitTest runner = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClass(io.quarkiverse.kafkastreamsprocessor.kafka.streams.test.MyProcessor.class))
.overrideConfigKey("kafkastreamsprocessor.input.topics", "ping-events")
.overrideConfigKey("kafkastreamsprocessor.output.topic", "pong-events")
.overrideConfigKey("quarkus.kafka-streams.topics", "ping-events,pong-events")
.overrideConfigKey("kafkastreamsprocessor.retry.retry-on",
"io.quarkiverse.kafkastreamsprocessor.kafka.streams.test.KafkaStreamsProcessorProcessorWithRetryTest$RetryException")
.overrideConfigKey("kafkastreamsprocessor.retry.abort-on",
"io.quarkiverse.kafkastreamsprocessor.kafka.streams.test.KafkaStreamsProcessorProcessorWithRetryTest$AbortException")
.addBuildChainCustomizer(buildCustomizer());

private static Consumer<BuildChainBuilder> buildCustomizer() {
return chainBuilder -> chainBuilder.addBuildStep(
context -> {
registeredClasses = context.consumeMulti(ReflectiveClassBuildItem.class);
checkProperClassesAreRegistered();
})
.consumes(ReflectiveClassBuildItem.class)
.produces(GeneratedResourceBuildItem.class)
.build();
}

private static void checkProperClassesAreRegistered() {
assertNotNull(registeredClasses);

List<String> allRegisteredClasses = registeredClasses.stream()
.flatMap(c -> c.getClassNames().stream())
.collect(Collectors.toList());

assertThat(allRegisteredClasses, hasItem(MyProcessor.class.getName()));
// Default retryOn exception for Fault Tolerance
assertThat(allRegisteredClasses, hasItem(RetryableException.class.getName()));
// Explicit retryOn & abortOn exceptions
assertThat(allRegisteredClasses, hasItem(RetryException.class.getName()));
assertThat(allRegisteredClasses, hasItem(AbortException.class.getName()));
}

@Test
void shouldRegisterTypesForReflection() {
// if it gets there, it succeeded
assertNull(registeredClasses);
}

public static class RetryException {

}

public static class AbortException {

}

}
Loading

0 comments on commit 7283abc

Please sign in to comment.