diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 025a8a5b393ad..7b8943af1e5b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -39,6 +39,8 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.StreamsConfigUtils; @@ -554,6 +556,11 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; + /** {@code processing.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler"; + public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface."; + /** {@code default.dsl.store} */ @Deprecated @SuppressWarnings("WeakerAccess") @@ -931,6 +938,11 @@ public class StreamsConfig extends AbstractConfig { DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) + .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + LogAndFailProcessingExceptionHandler.class.getName(), + Importance.MEDIUM, + PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, FailOnInvalidTimestamp.class.getName(), @@ -1925,6 +1937,10 @@ public ProductionExceptionHandler defaultProductionExceptionHandler() { return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); } + public ProcessingExceptionHandler processingExceptionHandler() { + return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); + } + /** * Override any client properties in the original configs with overrides * diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 8c89132ae2f9c..28430a27e5d99 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -1586,6 +1586,33 @@ public void shouldDisableMetricCollectionOnMainConsumerOnly() { ); } + @Test + public void shouldGetDefaultValueProcessingExceptionHandler() { + final StreamsConfig streamsConfig = new StreamsConfig(props); + + assertEquals("org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName()); + } + + @Test + public void shouldOverrideDefaultProcessingExceptionHandler() { + props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler"); + final StreamsConfig streamsConfig = new StreamsConfig(props); + + assertEquals("org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName()); + } + + @Test + public void testInvalidProcessingExceptionHandler() { + props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler"); + final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + + assertThat( + exception.getMessage(), + containsString("Invalid value org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler " + + "for configuration processing.exception.handler: Class org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler could not be found.") + ); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) {