KAFKA-16448 [WIP]: Add Kafka Streams exception handler for exceptions occurring during processing #15973

Closed
wants to merge 31 commits
Changes from all commits
31 commits
d0f6cc7
KAFKA-16448: add new processor for error handling
sebastienviale May 15, 2024
bac5ed1
KAFKA-16448: enum to get the origin of a SerializationException
sebastienviale May 15, 2024
c97cf4c
KAFKA-16448: Add ProcessingExceptionHandler and update ProductionExce…
loicgreffier May 15, 2024
962e2e2
KAFKA-16448: Add ProcessingExceptionHandler impls
loicgreffier May 15, 2024
00e2bf3
KAFKA-16448: Add error handler context impl
loicgreffier May 15, 2024
d99b48f
KAFKA-16448: Catch processing exceptions at Stream Task level
loicgreffier May 15, 2024
5ffca71
KAFKA-16448: Move processing exception catch to processor context imp…
loicgreffier May 15, 2024
a5ce52b
KAFKA-16448: add error handling for deserialisation
sebastienviale May 16, 2024
3a71905
KAFKA-16448: add error handling for deserialisation
sebastienviale May 16, 2024
5f621c2
KAFKA-16448: fix checkstyle
sebastienviale May 16, 2024
2b7a528
KAFKA-16448: Provide rawSourceKey and rawSourceValue in the ErrorHand…
loicgreffier May 17, 2024
76cc4be
KAFKA-16448: add boolean to catch only one time an exception in the p…
sebastienviale May 17, 2024
6906dd7
KAFKA-16448: Fix deprecation in unit tests
loicgreffier May 17, 2024
259f01a
KAFKA-16448: Move raw record out of StampedRecord
loicgreffier May 18, 2024
521a51d
KAFKA-16448: Add tests for ProcessorContextImpl
loicgreffier May 18, 2024
ea13eea
KAFKA-16448: split deserializer key and value to get origin
sebastienviale May 20, 2024
5e3eaad
Merge branch 'KAFKA-16448' of https://github.com/loicgreffier/kafka i…
sebastienviale May 20, 2024
db45b6b
KAFKA-16448: fix unit tests after ClassCastException
sebastienviale May 20, 2024
2171fbe
KAFKA-16448: remove deprecated
sebastienviale May 20, 2024
028ba33
KAFKA-16448: Fix indent
loicgreffier May 22, 2024
7462001
Merge branch 'trunk' into KAFKA-16448
loicgreffier May 22, 2024
655f274
KAFKA-16448: Update from trunk
loicgreffier May 22, 2024
81b80f4
KAFKA-16448: Fix bad indent
loicgreffier May 22, 2024
99ae674
KAFKA-16448: Fix bad indent
loicgreffier May 22, 2024
a22f983
KAFKA-16448: Fix bad indent
loicgreffier May 22, 2024
50fa22d
KAFKA-16448: Handle processing exception on punctuate
loicgreffier May 24, 2024
8f95697
KAFKA-16448
sebastienviale May 24, 2024
9c84ceb
KAFKA-16448: Simplify unit tests and handle metrics
loicgreffier May 26, 2024
d3e2c58
KAFKA-16448: Add metrics in tests
loicgreffier May 26, 2024
c388a06
KAFKA-16448: Move topology test driver to integration tests
loicgreffier May 26, 2024
f011b32
KAFKA-16448: Pass source node id in deserialization handler
loicgreffier May 27, 2024
17 changes: 17 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -39,6 +39,8 @@
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingLogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
@@ -553,6 +555,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";

/** {@code processing.exception.handler} */
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";

/** {@code default.dsl.store} */
@Deprecated
@SuppressWarnings("WeakerAccess")
@@ -926,6 +933,11 @@ public class StreamsConfig extends AbstractConfig {
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
ProcessingLogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
FailOnInvalidTimestamp.class.getName(),
@@ -1915,6 +1927,11 @@ public ProductionExceptionHandler defaultProductionExceptionHandler() {
return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
}

@SuppressWarnings("WeakerAccess")
public ProcessingExceptionHandler processingExceptionHandler() {
return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
}

/**
* Override any client properties in the original configs with overrides
*
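For context, opting into the new handler from application code would look roughly like the sketch below; the application id, bootstrap servers, and topology are placeholders, and ProcessingLogAndFailExceptionHandler remains the default unless overridden:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessingLogAndContinueExceptionHandler;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processing-error-demo"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
// Override the default ProcessingLogAndFailExceptionHandler to skip failed records instead.
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        ProcessingLogAndContinueExceptionHandler.class);
final KafkaStreams streams = new KafkaStreams(topology, props); // topology built elsewhere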
20 changes: 17 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -22,15 +22,16 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;

import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
@@ -57,6 +58,7 @@
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;

/**
@@ -135,6 +137,7 @@ public class TopologyConfig extends AbstractConfig {
public final Class<?> dslStoreSuppliers;
public final Supplier<TimestampExtractor> timestampExtractorSupplier;
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandler;

public TopologyConfig(final StreamsConfig globalAppConfigs) {
this(null, globalAppConfigs, new Properties());
@@ -225,6 +228,13 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs,
deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
}

if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
processingExceptionHandler = () -> getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
log.info("Topology {} is overriding {} to {}", topologyName, PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG));
} else {
processingExceptionHandler = () -> globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
}

if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
storeType = getString(DEFAULT_DSL_STORE_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);
@@ -280,7 +290,8 @@ public TaskConfig getTaskConfig() {
maxBufferedSize,
timestampExtractorSupplier.get(),
deserializationExceptionHandlerSupplier.get(),
eosEnabled
eosEnabled,
processingExceptionHandler.get()
);
}

@@ -291,19 +302,22 @@ public static class TaskConfig {
public final TimestampExtractor timestampExtractor;
public final DeserializationExceptionHandler deserializationExceptionHandler;
public final boolean eosEnabled;
public final ProcessingExceptionHandler processingExceptionHandler;

private TaskConfig(final long maxTaskIdleMs,
final long taskTimeoutMs,
final int maxBufferedSize,
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
final boolean eosEnabled) {
final boolean eosEnabled,
final ProcessingExceptionHandler processingExceptionHandler) {
this.maxTaskIdleMs = maxTaskIdleMs;
this.taskTimeoutMs = taskTimeoutMs;
this.maxBufferedSize = maxBufferedSize;
this.timestampExtractor = timestampExtractor;
this.deserializationExceptionHandler = deserializationExceptionHandler;
this.eosEnabled = eosEnabled;
this.processingExceptionHandler = processingExceptionHandler;
}
}
}
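Given the isTopologyOverride branch above, the handler should also be settable per topology. A minimal sketch, assuming the three-argument constructor shown in this diff, an existing globalAppConfigs StreamsConfig, and a hypothetical topology name (note TopologyConfig is an internal class):

final Properties topologyOverrides = new Properties();
topologyOverrides.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        ProcessingLogAndContinueExceptionHandler.class.getName());
final TopologyConfig topologyConfig =
        new TopologyConfig("orders-topology", globalAppConfigs, topologyOverrides);
// topologyConfig.getTaskConfig().processingExceptionHandler now carries the override.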
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -18,14 +18,16 @@

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.processor.ErrorHandlerContext;

/**
* {@code ProductionExceptionHandler} that always instructs streams to fail when an exception
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
}
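With the new ErrorHandlerContext parameter, a custom production handler can log where the failure happened before deciding. A sketch; the class name and the continue-on-error policy are illustrative, not part of this PR:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndContinueProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // The context identifies the failing task; the record gives the sink topic.
        log.warn("Failed to produce record, taskId: {}, topic: {}",
            context.taskId(), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // ignore
    }
}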
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -16,10 +16,11 @@
*/
package org.apache.kafka.streams.errors;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ErrorHandlerContextImpl;

/**
* Interface that specifies how an exception from source node deserialization
@@ -37,11 +38,28 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
* @deprecated Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception);
@Deprecated
default DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

throw new UnsupportedOperationException();
}

/**
* Inspect a record and the exception received.
*
* @param context error handler context
* @param record record that failed deserialization
* @param exception the actual exception
*/
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

return handle(((ErrorHandlerContextImpl) context).convertToProcessorContext(), record, exception);
}

/**
* Enumeration that describes the response from the exception handler.
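For comparison, a custom implementation of the new overload might distinguish malformed records from unexpected failures. The class name and routing policy below are hypothetical:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.streams.processor.ErrorHandlerContext;

public class SkipMalformedRecordsExceptionHandler implements DeserializationExceptionHandler {
    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Skip records that are merely malformed; fail fast on anything unexpected.
        return exception instanceof SerializationException
                ? DeserializationHandlerResponse.CONTINUE
                : DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // ignore
    }
}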
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -18,6 +18,7 @@


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);

@Override
@Deprecated
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
@@ -44,6 +46,19 @@ public DeserializationHandlerResponse handle(final ProcessorContext context,
return DeserializationHandlerResponse.CONTINUE;
}

@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);

return DeserializationHandlerResponse.CONTINUE;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.errors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);

@Override
@Deprecated
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
@@ -44,6 +46,18 @@ public DeserializationHandlerResponse handle(final ProcessorContext context,
return DeserializationHandlerResponse.FAIL;
}

@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

log.error("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);

return DeserializationHandlerResponse.FAIL;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.apache.kafka.streams.processor.api.Record;

/**
* An interface that allows user code to inspect a record that has failed processing.
*/
public interface ProcessingExceptionHandler extends Configurable {
/**
* Inspect a record and the exception received.
*
* @param context processing context metadata
* @param record record where the exception occurred
* @param exception the actual exception
*/
ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception);

enum ProcessingHandlerResponse {
/* continue with processing */
CONTINUE(1, "CONTINUE"),
/* fail the processing and stop */
FAIL(2, "FAIL");

/**
* the permanent and immutable name of processing exception response
*/
public final String name;

/**
* the permanent and immutable id of processing exception response
*/
public final int id;

ProcessingHandlerResponse(final int id, final String name) {
this.id = id;
this.name = name;
}
}
}
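As a usage sketch of this interface, a handler could tolerate a known "poison pill" exception type and fail on everything else. The class name and the ArithmeticException policy are illustrative only:

import java.util.Map;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAndSkipArithmeticExceptionHandler implements ProcessingExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndSkipArithmeticExceptionHandler.class);

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
        if (exception instanceof ArithmeticException) {
            // Known poison pill: log it with its source position and keep processing.
            log.warn("Skipping record, processor node: {}, taskId: {}, offset: {}",
                context.processorNodeId(), context.taskId(), context.offset(), exception);
            return ProcessingHandlerResponse.CONTINUE;
        }
        return ProcessingHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // ignore
    }
}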
streams/src/main/java/org/apache/kafka/streams/errors/ProcessingLogAndContinueExceptionHandler.java
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.streams.processor.ErrorHandlerContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to continue processing more records.
*/
public class ProcessingLogAndContinueExceptionHandler implements ProcessingExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(ProcessingLogAndContinueExceptionHandler.class);

@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
log.warn("Exception caught during message processing, " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);

return ProcessingHandlerResponse.CONTINUE;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
}
}
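Putting it together, a hedged end-to-end sketch: with this handler configured, a record whose value cannot be parsed is logged and skipped instead of killing the stream thread. The topic names and the failing mapValues step are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessingLogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
       .mapValues(value -> Integer.parseInt(value)) // throws NumberFormatException on bad input
       .to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

final Properties props = new Properties(); // plus application.id and bootstrap.servers
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
        ProcessingLogAndContinueExceptionHandler.class);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);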