Skip to content

Commit

Permalink
KAFKA-16448: Add ProcessingExceptionHandler in Streams configuration
Browse files Browse the repository at this point in the history
Co-authored-by: Dabz <[email protected]>
Co-authored-by: sebastienviale <[email protected]>
  • Loading branch information
3 people committed Jun 7, 2024
1 parent e41e54d commit 7764007
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
16 changes: 16 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
Expand Down Expand Up @@ -554,6 +556,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";

/** {@code processing.exception.handler} */
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";

/** {@code default.dsl.store} */
@Deprecated
@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -931,6 +938,11 @@ public class StreamsConfig extends AbstractConfig {
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
FailOnInvalidTimestamp.class.getName(),
Expand Down Expand Up @@ -1925,6 +1937,10 @@ public ProductionExceptionHandler defaultProductionExceptionHandler() {
return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
}

public ProcessingExceptionHandler processingExceptionHandler() {
return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
}

/**
* Override any client properties in the original configs with overrides
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,33 @@ public void shouldDisableMetricCollectionOnMainConsumerOnly() {
);
}

@Test
public void shouldGetDefaultValueProcessingExceptionHandler() {
final StreamsConfig streamsConfig = new StreamsConfig(props);

assertEquals("org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName());
}

@Test
public void shouldOverrideDefaultProcessingExceptionHandler() {
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler");
final StreamsConfig streamsConfig = new StreamsConfig(props);

assertEquals("org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName());
}

@Test
public void testInvalidProcessingExceptionHandler() {
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler");
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));

assertThat(
exception.getMessage(),
containsString("Invalid value org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler " +
"for configuration processing.exception.handler: Class org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler could not be found.")
);
}

static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
Expand Down

0 comments on commit 7764007

Please sign in to comment.