diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java index f3a6eaf..e7dc2b2 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java @@ -23,7 +23,6 @@ import java.util.Set; import jakarta.annotation.Priority; -import jakarta.decorator.Delegate; import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; @@ -44,7 +43,6 @@ import io.quarkiverse.kafkastreamsprocessor.impl.errors.ErrorHandlingStrategy; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; import io.quarkiverse.kafkastreamsprocessor.spi.SinkToTopicMappingBuilder; -import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; @@ -54,7 +52,7 @@ * Uses a dead-letter sink from the topology, rather than a raw producer, to benefit from the same KStreams guarantees * (at least once / exactly once). */ -//@Decorator +// @Decorator @Priority(ProcessorDecoratorPriorities.DLQ) @Dependent public class DlqDecorator extends AbstractProcessorDecorator { @@ -105,19 +103,16 @@ public class DlqDecorator extends AbstractProcessorDecorator { * the enricher of metadata before sending message to the dead letter queue * @param metrics * container of all metrics of the framework - * @param kStreamsProcessorConfig - * It contains the configuration for the error strategy configuration property value (default - * {@link ErrorHandlingStrategy#CONTINUE}) - * and the configuration Kafka topic to use for dead letter queue (optional) + * @param errorHandlingStrategy + * tells whether DLQ is activated */ @Inject public DlqDecorator( SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler, KafkaStreamsProcessorMetrics metrics, - KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config + ErrorHandlingStrategy errorHandlingStrategy) { // NOSONAR Optional with microprofile-config this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics, - ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(), - kStreamsProcessorConfig.dlq().topic())); + errorHandlingStrategy.shouldSendToDlq()); } /** diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategy.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategy.java index 68b2978..11854bb 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategy.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategy.java @@ -19,11 +19,17 @@ */ package io.quarkiverse.kafkastreamsprocessor.impl.errors; -import java.util.Optional; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; /** * Constants related to Kafka error handling */ +@ApplicationScoped public class ErrorHandlingStrategy { /** * Configuration property to check for the Kafka error handling strategy @@ -45,19 +51,25 @@ public class ErrorHandlingStrategy { */ public static final String FAIL = "fail"; + private final String errorStrategy; + + private final KStreamsProcessorConfig kStreamsProcessorConfig; + + @Inject + public ErrorHandlingStrategy(@ConfigProperty(name = CONFIG_PROPERTY, defaultValue = CONTINUE) String errorStrategy, + KStreamsProcessorConfig config) { + this.errorStrategy = errorStrategy; + this.kStreamsProcessorConfig = config; + } + /** * Tells whether microservice-specific DLQ is activated and has a dedicated topic * - * @param errorStrategy - * the error strategy chosen by an application between continue, dead-letter-queue - * and fail, configured with kafka.error.strategy - * @param dlqTopic - * the optional topic that is mandatory if the chosen error strategy is dead-letter-queue * @return whether DLQ mechanism is activated by the configuration or not */ - public static boolean shouldSendToDlq(String errorStrategy, Optional dlqTopic) { - if (ErrorHandlingStrategy.DEAD_LETTER_QUEUE.equals(errorStrategy)) { - if (dlqTopic.isPresent()) { + public boolean shouldSendToDlq() { + if (DEAD_LETTER_QUEUE.equals(errorStrategy)) { + if (kStreamsProcessorConfig.dlq().topic().isPresent()) { return true; } else { throw new IllegalStateException("DLQ strategy enabled but dlq.topic configuration property is missing"); @@ -65,8 +77,4 @@ public static boolean shouldSendToDlq(String errorStrategy, Optional dlq } return false; } - - private ErrorHandlingStrategy() { - // Prevent instantiation of utility class - } } diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java index 8d55f1d..fcd8770 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegate.java @@ -72,6 +72,8 @@ public class LogAndSendToDlqExceptionHandlerDelegate implements DeserializationE */ private final KStreamsProcessorConfig kStreamsProcessorConfig; + private final ErrorHandlingStrategy errorHandlingStrategy; + /** True if the dead letter queue strategy is selected and properly configured */ boolean sendToDlq; @@ -91,11 +93,13 @@ public class LogAndSendToDlqExceptionHandlerDelegate implements DeserializationE public LogAndSendToDlqExceptionHandlerDelegate(KafkaClientSupplier kafkaClientSupplier, KafkaStreamsProcessorMetrics metrics, DlqMetadataHandler dlqMetadataHandler, - KStreamsProcessorConfig kStreamsProcessorConfig) { + KStreamsProcessorConfig kStreamsProcessorConfig, + ErrorHandlingStrategy errorHandlingStrategy) { this.clientSupplier = kafkaClientSupplier; this.metrics = metrics; this.dlqMetadataHandler = dlqMetadataHandler; this.kStreamsProcessorConfig = kStreamsProcessorConfig; + this.errorHandlingStrategy = errorHandlingStrategy; } /** @@ -147,8 +151,7 @@ private void sendToDlq(final ProcessorContext context, final ConsumerRecord configs) { // Resolve the DLQ strategy once to fail fast in case of misconfiguration - sendToDlq = ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(), - kStreamsProcessorConfig.dlq().topic()); + sendToDlq = errorHandlingStrategy.shouldSendToDlq(); if (sendToDlq) { Map dlqConfigMap = new HashMap<>(configs); dlqConfigMap.put(KafkaClientSupplierDecorator.DLQ_PRODUCER, true); diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyTest.java index a02a772..530759b 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/ErrorHandlingStrategyTest.java @@ -19,33 +19,60 @@ */ package io.quarkiverse.kafkastreamsprocessor.impl.errors; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.DlqConfig; +import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; +import io.quarkus.test.Mock; + +@ExtendWith({ MockitoExtension.class }) class ErrorHandlingStrategyTest { + + @Mock + KStreamsProcessorConfig extensionConfiguration; + + @Mock + DlqConfig dlqConfig; + + @BeforeEach + void setUp() { + when(extensionConfiguration.dlq()).thenReturn(dlqConfig); + } + @Test void shouldSendToDlqIfRequested() { + when(dlqConfig.topic()).thenReturn(Optional.of("aTopicName")); + assertTrue( - ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, Optional.of("aTopicName"))); + new ErrorHandlingStrategy(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, extensionConfiguration).shouldSendToDlq()); } @Test void shouldThrowIfDlqRequestedButNoTopic() { Optional noTopic = Optional.empty(); + when(dlqConfig.topic()).thenReturn(noTopic); + assertThrows(IllegalStateException.class, - () -> ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, noTopic)); + () -> new ErrorHandlingStrategy(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, extensionConfiguration) + .shouldSendToDlq()); } @Test void shouldNotSendToDlqWithOtherStrategy() { - assertFalse( - ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.FAIL, Optional.of("aTopicName"))); - assertFalse( - ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.CONTINUE, Optional.of("aTopicName"))); + when(dlqConfig.topic()).thenReturn(Optional.of("aTopicName")); + + assertTrue( + new ErrorHandlingStrategy(ErrorHandlingStrategy.FAIL, extensionConfiguration).shouldSendToDlq()); + assertTrue( + new ErrorHandlingStrategy(ErrorHandlingStrategy.CONTINUE, extensionConfiguration).shouldSendToDlq()); } } diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java index e0a8c5b..215e429 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -70,6 +69,9 @@ class LogAndSendToDlqExceptionHandlerDelegateTest { @Mock private DlqConfig dlqConfig; + @Mock + private ErrorHandlingStrategy errorHandlingStrategy; + @Mock private ProcessorContext context; @@ -109,14 +111,14 @@ void shouldNotBlockAndSendToDlqIfPossible() { when(record.partition()).thenReturn(PARTITION); when(record.headers()).thenReturn(headers); when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC)); - when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE); + when(errorHandlingStrategy.shouldSendToDlq()).thenReturn(true); RecordHeaders headersWithMetadata = new RecordHeaders(); when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class))) .thenReturn(headersWithMetadata); when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock); handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, - kStreamsProcessorConfig); + kStreamsProcessorConfig, errorHandlingStrategy); handler.configure(Collections.emptyMap()); DeserializationHandlerResponse response = handler.handle(context, record, exception); @@ -131,12 +133,12 @@ void shouldNotBlockAndSendToDlqIfPossible() { @Test void shouldOnlyContinueIfDefaultErrorStrategy() { - when(kStreamsProcessorConfig.errorStrategy()).thenReturn("continue"); + when(errorHandlingStrategy.shouldSendToDlq()).thenReturn(false); when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC)); when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig); when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC)); handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, - kStreamsProcessorConfig); + kStreamsProcessorConfig, errorHandlingStrategy); handler.configure(Collections.emptyMap()); DeserializationHandlerResponse response = handler.handle(context, record, exception); @@ -146,14 +148,4 @@ void shouldOnlyContinueIfDefaultErrorStrategy() { assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d)); } - @Test - void shouldFailFastIfDlqStrategyWithoutTopic() { - when(dlqConfig.topic()).thenReturn(Optional.empty()); - when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig); - when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE); - handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler, - kStreamsProcessorConfig); - - assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap())); - } } diff --git a/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java index a491861..1055460 100644 --- a/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java +++ b/spi/src/main/java/io/quarkiverse/kafkastreamsprocessor/spi/properties/KStreamsProcessorConfig.java @@ -47,8 +47,12 @@ public interface KStreamsProcessorConfig { /** * Kafka error handling strategy + * + * @deprecated Has actually no effect whatsoever, as it is rather the kafka config entry + * kafka.error.strategy that activates in KafkaStreams the DLQ mechanism. */ @WithDefault("continue") + @Deprecated(forRemoval = true, since = "3.0.2") String errorStrategy(); /**