Skip to content

Commit

Permalink
KAFKA-16448: Add ProcessingExceptionHandler interface and implementat…
Browse files Browse the repository at this point in the history
…ions (apache#16090)

This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.

This PR brings ProcessingExceptionHandler interface and default implementations.

Co-authored-by: Dabz <[email protected]>
Co-authored-by: sebastienviale <[email protected]>

Reviewer: Bruno Cadonna <[email protected]>
  • Loading branch information
loicgreffier authored May 29, 2024
1 parent b73f479 commit 8d11d95
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;

/**
* This interface allows user code to inspect the context of a record that has failed processing.
*/
public interface ErrorHandlerContext {
/**
* Return the topic name of the current input record; could be {@code null} if it is not
* available.
*
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record won't have an associated topic.
* Another example is
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid topic name, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the topic name
*/
String topic();

/**
* Return the partition ID of the current input record; could be {@code -1} if it is not
* available.
*
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record won't have an associated partition ID.
* Another example is
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid partition ID, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the partition ID
*/
int partition();

/**
* Return the offset of the current input record; could be {@code -1} if it is not
* available.
*
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record won't have an associated offset.
* Another example is
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid offset, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the offset
*/
long offset();

/**
* Return the headers of the current source record; could be an empty header if it is not
* available.
*
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record might not have any associated headers.
* Another example is
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide valid headers, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
*
* @return the headers
*/
Headers headers();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
* to the repartition topic.
*
* @return the raw byte of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
* to the repartition topic.
*
* @return the raw byte of the value of the source message
*/
byte[] sourceRawValue();

/**
* Return the current processor node ID.
*
* @return the processor node ID
*/
String processorNodeId();

/**
* Return the task ID.
*
* @return the task ID
*/
TaskId taskId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to continue processing more records.
*/
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);

@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
log.warn("Exception caught during message processing, " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);

return ProcessingHandlerResponse.CONTINUE;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailProcessingExceptionHandler implements ProcessingExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class);

@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
log.warn("Exception caught during message processing, " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);

return ProcessingHandlerResponse.FAIL;
}

@Override
public void configure(final Map<String, ?> configs) {
// ignore
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.api.Record;

/**
* An interface that allows user code to inspect a record that has failed processing
*/
public interface ProcessingExceptionHandler extends Configurable {
/**
* Inspect a record and the exception received
*
* @param context processing context metadata
* @param record record where the exception occurred
* @param exception the actual exception
*/
ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception);

enum ProcessingHandlerResponse {
/* continue with processing */
CONTINUE(1, "CONTINUE"),
/* fail the processing and stop */
FAIL(2, "FAIL");

/**
* the permanent and immutable name of processing exception response
*/
public final String name;

/**
* the permanent and immutable id of processing exception response
*/
public final int id;

ProcessingHandlerResponse(final int id, final String name) {
this.id = id;
this.name = name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors.internals;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.processor.TaskId;

public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final String topic;
private final int partition;
private final long offset;
private final Headers headers;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;
private final String processorNodeId;
private final TaskId taskId;

public DefaultErrorHandlerContext(final String topic,
final int partition,
final long offset,
final Headers headers,
final byte[] sourceRawKey,
final byte[] sourceRawValue,
final String processorNodeId,
final TaskId taskId) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.headers = headers;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
this.processorNodeId = processorNodeId;
this.taskId = taskId;
}

@Override
public String topic() {
return this.topic;
}

@Override
public int partition() {
return this.partition;
}

@Override
public long offset() {
return this.offset;
}

@Override
public Headers headers() {
return this.headers;
}

@Override
public byte[] sourceRawKey() {
return this.sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return this.sourceRawValue;
}

@Override
public String processorNodeId() {
return this.processorNodeId;
}

@Override
public TaskId taskId() {
return this.taskId;
}
}

0 comments on commit 8d11d95

Please sign in to comment.