Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(impl): Remove useless errorStrategy configuration entry #131

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Set;

import jakarta.annotation.Priority;
import jakarta.decorator.Delegate;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

Expand All @@ -44,7 +43,6 @@
import io.quarkiverse.kafkastreamsprocessor.impl.errors.ErrorHandlingStrategy;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
import io.quarkiverse.kafkastreamsprocessor.spi.SinkToTopicMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

Expand All @@ -54,7 +52,7 @@
* Uses a dead-letter sink from the topology, rather than a raw producer, to benefit from the same KStreams guarantees
* (at least once / exactly once).
*/
//@Decorator
// @Decorator
@Priority(ProcessorDecoratorPriorities.DLQ)
@Dependent
public class DlqDecorator extends AbstractProcessorDecorator {
Expand Down Expand Up @@ -105,19 +103,16 @@ public class DlqDecorator extends AbstractProcessorDecorator {
* the enricher of metadata before sending message to the dead letter queue
* @param metrics
* container of all metrics of the framework
* @param kStreamsProcessorConfig
* It contains the configuration for the error strategy configuration property value (default
* {@link ErrorHandlingStrategy#CONTINUE})
* and the configuration Kafka topic to use for dead letter queue (optional)
* @param errorHandlingStrategy
* tells whether DLQ is activated
*/
@Inject
public DlqDecorator(
SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler,
KafkaStreamsProcessorMetrics metrics,
KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config
ErrorHandlingStrategy errorHandlingStrategy) { // NOSONAR Optional with microprofile-config
this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
kStreamsProcessorConfig.dlq().topic()));
errorHandlingStrategy.shouldSendToDlq());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
*/
package io.quarkiverse.kafkastreamsprocessor.impl.errors;

import java.util.Optional;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;

/**
* Constants related to Kafka error handling
*/
@ApplicationScoped
public class ErrorHandlingStrategy {
/**
* Configuration property to check for the Kafka error handling strategy
Expand All @@ -45,28 +51,30 @@ public class ErrorHandlingStrategy {
*/
public static final String FAIL = "fail";

private final String errorStrategy;

private final KStreamsProcessorConfig kStreamsProcessorConfig;

@Inject
public ErrorHandlingStrategy(@ConfigProperty(name = CONFIG_PROPERTY, defaultValue = CONTINUE) String errorStrategy,
KStreamsProcessorConfig config) {
this.errorStrategy = errorStrategy;
this.kStreamsProcessorConfig = config;
}

/**
* Tells whether microservice-specific DLQ is activated and has a dedicated topic
*
* @param errorStrategy
* the error strategy chosen by an application between <code>continue</code>, <code>dead-letter-queue</code>
* and <code>fail</code>, configured with <code>kafka.error.strategy</code>
* @param dlqTopic
* the optional topic that is mandatory if the chosen error strategy is <code>dead-letter-queue</code>
* @return whether DLQ mechanism is activated by the configuration or not
*/
public static boolean shouldSendToDlq(String errorStrategy, Optional<String> dlqTopic) {
if (ErrorHandlingStrategy.DEAD_LETTER_QUEUE.equals(errorStrategy)) {
if (dlqTopic.isPresent()) {
public boolean shouldSendToDlq() {
if (DEAD_LETTER_QUEUE.equals(errorStrategy)) {
if (kStreamsProcessorConfig.dlq().topic().isPresent()) {
return true;
} else {
throw new IllegalStateException("DLQ strategy enabled but dlq.topic configuration property is missing");
}
}
return false;
}

private ErrorHandlingStrategy() {
// Prevent instantiation of utility class
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class LogAndSendToDlqExceptionHandlerDelegate implements DeserializationE
*/
private final KStreamsProcessorConfig kStreamsProcessorConfig;

private final ErrorHandlingStrategy errorHandlingStrategy;

/** True if the dead letter queue strategy is selected and properly configured */
boolean sendToDlq;

Expand All @@ -91,11 +93,13 @@ public class LogAndSendToDlqExceptionHandlerDelegate implements DeserializationE
public LogAndSendToDlqExceptionHandlerDelegate(KafkaClientSupplier kafkaClientSupplier,
KafkaStreamsProcessorMetrics metrics,
DlqMetadataHandler dlqMetadataHandler,
KStreamsProcessorConfig kStreamsProcessorConfig) {
KStreamsProcessorConfig kStreamsProcessorConfig,
ErrorHandlingStrategy errorHandlingStrategy) {
this.clientSupplier = kafkaClientSupplier;
this.metrics = metrics;
this.dlqMetadataHandler = dlqMetadataHandler;
this.kStreamsProcessorConfig = kStreamsProcessorConfig;
this.errorHandlingStrategy = errorHandlingStrategy;
}

/**
Expand Down Expand Up @@ -147,8 +151,7 @@ private void sendToDlq(final ProcessorContext context, final ConsumerRecord<byte
@Override
public void configure(final Map<String, ?> configs) {
// Resolve the DLQ strategy once to fail fast in case of misconfiguration
sendToDlq = ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
kStreamsProcessorConfig.dlq().topic());
sendToDlq = errorHandlingStrategy.shouldSendToDlq();
if (sendToDlq) {
Map<String, Object> dlqConfigMap = new HashMap<>(configs);
dlqConfigMap.put(KafkaClientSupplierDecorator.DLQ_PRODUCER, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,60 @@
*/
package io.quarkiverse.kafkastreamsprocessor.impl.errors;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import java.util.Optional;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import io.quarkiverse.kafkastreamsprocessor.spi.properties.DlqConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import io.quarkus.test.Mock;

@ExtendWith({ MockitoExtension.class })
class ErrorHandlingStrategyTest {

@Mock
KStreamsProcessorConfig extensionConfiguration;

@Mock
DlqConfig dlqConfig;

@BeforeEach
void setUp() {
when(extensionConfiguration.dlq()).thenReturn(dlqConfig);
}

@Test
void shouldSendToDlqIfRequested() {
when(dlqConfig.topic()).thenReturn(Optional.of("aTopicName"));

assertTrue(
ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, Optional.of("aTopicName")));
new ErrorHandlingStrategy(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, extensionConfiguration).shouldSendToDlq());
}

@Test
void shouldThrowIfDlqRequestedButNoTopic() {
Optional<String> noTopic = Optional.empty();
when(dlqConfig.topic()).thenReturn(noTopic);

assertThrows(IllegalStateException.class,
() -> ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, noTopic));
() -> new ErrorHandlingStrategy(ErrorHandlingStrategy.DEAD_LETTER_QUEUE, extensionConfiguration)
.shouldSendToDlq());
}

@Test
void shouldNotSendToDlqWithOtherStrategy() {
assertFalse(
ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.FAIL, Optional.of("aTopicName")));
assertFalse(
ErrorHandlingStrategy.shouldSendToDlq(ErrorHandlingStrategy.CONTINUE, Optional.of("aTopicName")));
when(dlqConfig.topic()).thenReturn(Optional.of("aTopicName"));

assertTrue(
new ErrorHandlingStrategy(ErrorHandlingStrategy.FAIL, extensionConfiguration).shouldSendToDlq());
assertTrue(
new ErrorHandlingStrategy(ErrorHandlingStrategy.CONTINUE, extensionConfiguration).shouldSendToDlq());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -70,6 +69,9 @@ class LogAndSendToDlqExceptionHandlerDelegateTest {
@Mock
private DlqConfig dlqConfig;

@Mock
private ErrorHandlingStrategy errorHandlingStrategy;

@Mock
private ProcessorContext context;

Expand Down Expand Up @@ -109,14 +111,14 @@ void shouldNotBlockAndSendToDlqIfPossible() {
when(record.partition()).thenReturn(PARTITION);
when(record.headers()).thenReturn(headers);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
when(errorHandlingStrategy.shouldSendToDlq()).thenReturn(true);
RecordHeaders headersWithMetadata = new RecordHeaders();
when(metadataHandler.withMetadata(any(Headers.class), anyString(), anyInt(), any(Exception.class)))
.thenReturn(headersWithMetadata);
when(kafkaClientSupplier.getProducer(any())).thenReturn(dlqProducerMock);

handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig, errorHandlingStrategy);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
Expand All @@ -131,12 +133,12 @@ void shouldNotBlockAndSendToDlqIfPossible() {

@Test
void shouldOnlyContinueIfDefaultErrorStrategy() {
when(kStreamsProcessorConfig.errorStrategy()).thenReturn("continue");
when(errorHandlingStrategy.shouldSendToDlq()).thenReturn(false);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.of(DLQ_TOPIC));
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);
kStreamsProcessorConfig, errorHandlingStrategy);
handler.configure(Collections.emptyMap());

DeserializationHandlerResponse response = handler.handle(context, record, exception);
Expand All @@ -146,14 +148,4 @@ void shouldOnlyContinueIfDefaultErrorStrategy() {
assertThat(metrics.processorErrorCounter().count(), closeTo(1d, 0.01d));
}

@Test
void shouldFailFastIfDlqStrategyWithoutTopic() {
when(dlqConfig.topic()).thenReturn(Optional.empty());
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(kStreamsProcessorConfig.errorStrategy()).thenReturn(ErrorHandlingStrategy.DEAD_LETTER_QUEUE);
handler = new LogAndSendToDlqExceptionHandlerDelegate(kafkaClientSupplier, metrics, metadataHandler,
kStreamsProcessorConfig);

assertThrows(IllegalStateException.class, () -> handler.configure(Collections.emptyMap()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public interface KStreamsProcessorConfig {

/**
* Kafka error handling strategy
*
* @deprecated Has actually no effect whatsoever, as it is rather the kafka config entry
* <code>kafka.error.strategy</code> that activates in KafkaStreams the DLQ mechanism.
*/
@WithDefault("continue")
@Deprecated(forRemoval = true, since = "3.0.2")
String errorStrategy();
edeweerd1A marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand Down
Loading