diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java deleted file mode 100644 index 0c50547549027..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors; - -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.TaskId; - -/** - * This interface allows user code to inspect the context of a record that has failed processing. - */ -public interface ErrorHandlerContext { - /** - * Return the topic name of the current input record; could be {@code null} if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record won't have an associated topic. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide a valid topic name, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the topic name - */ - String topic(); - - /** - * Return the partition ID of the current input record; could be {@code -1} if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record won't have an associated partition ID. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide a valid partition ID, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the partition ID - */ - int partition(); - - /** - * Return the offset of the current input record; could be {@code -1} if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record won't have an associated offset. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide a valid offset, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the offset - */ - long offset(); - - /** - * Return the headers of the current source record; could be an empty header if it is not - * available. - * - *

For example, if this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, the record might not have any associated headers. - * Another example is - * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)} - * (and siblings), that do not always guarantee to provide valid headers, as they might be - * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. - * - * @return the headers - */ - Headers headers(); - - /** - * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. - * - *

If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - *

If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent - * to the repartition topic. - * - * @return the raw byte of the key of the source message - */ - byte[] sourceRawKey(); - - /** - * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. - * - *

If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - *

If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent - * to the repartition topic. - * - * @return the raw byte of the value of the source message - */ - byte[] sourceRawValue(); - - /** - * Return the current processor node ID. - * - * @return the processor node ID - */ - String processorNodeId(); - - /** - * Return the task ID. - * - * @return the task ID - */ - TaskId taskId(); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java deleted file mode 100644 index bcfe9e37cedbf..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors; - -import org.apache.kafka.streams.processor.api.Record; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Processing exception handler that logs a processing exception and then - * signals the processing pipeline to continue processing more records. - */ -public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler { - private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class); - - @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { - log.warn("Exception caught during message processing, " + - "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", - context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(), - exception); - - return ProcessingHandlerResponse.CONTINUE; - } - - @Override - public void configure(final Map configs) { - // ignore - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java deleted file mode 100644 index d271199c099c0..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors; - -import org.apache.kafka.streams.processor.api.Record; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Processing exception handler that logs a processing exception and then - * signals the processing pipeline to stop processing more records and fail. - */ -public class LogAndFailProcessingExceptionHandler implements ProcessingExceptionHandler { - private static final Logger log = LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class); - - @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { - log.warn("Exception caught during message processing, " + - "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", - context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(), - exception); - - return ProcessingHandlerResponse.FAIL; - } - - @Override - public void configure(final Map configs) { - // ignore - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java deleted file mode 100644 index 33b2596be1227..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors; - -import org.apache.kafka.common.Configurable; -import org.apache.kafka.streams.processor.api.Record; - -/** - * An interface that allows user code to inspect a record that has failed processing - */ -public interface ProcessingExceptionHandler extends Configurable { - /** - * Inspect a record and the exception received - * - * @param context processing context metadata - * @param record record where the exception occurred - * @param exception the actual exception - */ - ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception); - - enum ProcessingHandlerResponse { - /* continue with processing */ - CONTINUE(1, "CONTINUE"), - /* fail the processing and stop */ - FAIL(2, "FAIL"); - - /** - * the permanent and immutable name of processing exception response - */ - public final String name; - - /** - * the permanent and immutable id of processing exception response - */ - public final int id; - - ProcessingHandlerResponse(final int id, final String name) { - this.id = id; - this.name = name; - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java deleted file mode 100644 index 4723f247db603..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.errors.internals; - -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.errors.ErrorHandlerContext; -import org.apache.kafka.streams.processor.TaskId; - -public class DefaultErrorHandlerContext implements ErrorHandlerContext { - private final String topic; - private final int partition; - private final long offset; - private final Headers headers; - private final byte[] sourceRawKey; - private final byte[] sourceRawValue; - private final String processorNodeId; - private final TaskId taskId; - - public DefaultErrorHandlerContext(final String topic, - final int partition, - final long offset, - final Headers headers, - final byte[] sourceRawKey, - final byte[] sourceRawValue, - final String processorNodeId, - final TaskId taskId) { - this.topic = topic; - this.partition = partition; - this.offset = offset; - this.headers = headers; - this.sourceRawKey = sourceRawKey; - this.sourceRawValue = sourceRawValue; - this.processorNodeId = processorNodeId; - this.taskId = taskId; - } - - @Override - public String topic() { - return this.topic; - } - - @Override - public int partition() { - return this.partition; - } - - @Override - public long offset() { - return this.offset; - } - - @Override - public Headers headers() { - return this.headers; - } - - @Override - public byte[] sourceRawKey() { - return this.sourceRawKey; - } - - @Override - public byte[] sourceRawValue() { - return this.sourceRawValue; - } - - @Override - public String processorNodeId() { - return this.processorNodeId; - } - - @Override - public TaskId taskId() { - return this.taskId; - } -}