From e7f6c1e1a7590244cca88622c9e1f6b3545b1dc1 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 6 Mar 2020 23:27:04 -0800 Subject: [PATCH] MINOR: fix ClassCastException handling (#8156) Reviewers: John Roesler --- .../internals/RecordCollectorImpl.java | 17 +++ .../streams/processor/internals/SinkNode.java | 16 +-- .../internals/RecordCollectorTest.java | 115 ++++++++++++++++++ .../processor/internals/SinkNodeTest.java | 58 +-------- 4 files changed, 134 insertions(+), 72 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index c5ac440a809e8..4fbcdb982d844 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -152,6 +152,23 @@ public void send(final String topic, try { keyBytes = keySerializer.serialize(topic, headers, key); valBytes = valueSerializer.serialize(topic, headers, value); + } catch (final ClassCastException exception) { + final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); + final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); + throw new StreamsException( + String.format( + "ClassCastException while producing data to topic %s. " + + "A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + + "(key type: %s / value type: %s).
" + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with " + + "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", + topic, + keySerializer.getClass().getName(), + valueSerializer.getClass().getName(), + keyClass, + valueClass), + exception); } catch (final RuntimeException exception) { final String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString()); throw new StreamsException(errorMessage, exception); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index e3333be03e06e..e0f2510de9364 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -85,21 +85,7 @@ public void process(final K key, final V value) { final String topic = topicExtractor.extract(key, value, this.context.recordContext()); - try { - collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner); - } catch (final ClassCastException e) { - final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); - final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); - throw new StreamsException( - String.format("ClassCastException while producing data to a sink topic. A serializer (key: %s / value: %s) is not compatible to the actual key or value type " + - "(key type: %s / value type: %s). 
Change the default Serdes in StreamConfig or " + - "provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", - keySerializer.getClass().getName(), - valSerializer.getClass().getName(), - keyClass, - valueClass), - e); - } + collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 7c6752d21d5a9..e09d99c952cab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -42,6 +42,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; @@ -71,6 +73,7 @@ import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -342,6 +345,118 @@ public void shouldAbortTxIfEosEnabled() { verify(streamsProducer); } + @SuppressWarnings("unchecked") + @Test + public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() { + final StreamsException expected = 
assertThrows( + StreamsException.class, + () -> this.collector.send( + "topic", + "key", + "value", + new RecordHeaders(), + 0, + 0L, + (Serializer) new LongSerializer(), // need to add cast to trigger `ClassCastException` + new StringSerializer()) + ); + + assertThat(expected.getCause(), instanceOf(ClassCastException.class)); + assertThat( + expected.getMessage(), + equalTo( + "ClassCastException while producing data to topic topic. " + + "A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) " + + "is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). " + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + ); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldThrowInformativeStreamsExceptionOnKeyAndNullValueClassCastException() { + final StreamsException expected = assertThrows( + StreamsException.class, + () -> this.collector.send( + "topic", + "key", + null, + new RecordHeaders(), + 0, + 0L, + (Serializer) new LongSerializer(), // need to add cast to trigger `ClassCastException` + new StringSerializer()) + ); + + assertThat(expected.getCause(), instanceOf(ClassCastException.class)); + assertThat( + expected.getMessage(), + equalTo( + "ClassCastException while producing data to topic topic. " + + "A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) " + + "is not compatible to the actual key or value type (key type: java.lang.String / value type: unknown because value is null).
" + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + ); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldThrowInformativeStreamsExceptionOnValueClassCastException() { + final StreamsException expected = assertThrows( + StreamsException.class, + () -> this.collector.send( + "topic", + "key", + "value", + new RecordHeaders(), + 0, + 0L, + new StringSerializer(), + (Serializer) new LongSerializer()) // need to add cast to trigger `ClassCastException` + ); + + assertThat(expected.getCause(), instanceOf(ClassCastException.class)); + assertThat( + expected.getMessage(), + equalTo( + "ClassCastException while producing data to topic topic. " + + "A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) " + + "is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). " + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + ); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldThrowInformativeStreamsExceptionOnValueAndNullKeyClassCastException() { + final StreamsException expected = assertThrows( + StreamsException.class, + () -> this.collector.send( + "topic", + null, + "value", + new RecordHeaders(), + 0, + 0L, + new StringSerializer(), + (Serializer) new LongSerializer()) // need to add cast to trigger `ClassCastException` + ); + + assertThat(expected.getCause(), instanceOf(ClassCastException.class)); + assertThat( + expected.getMessage(), + equalTo( + "ClassCastException while producing data to topic topic. 
" + + "A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) " + + "is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). " + + "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters " + + "(for example if using the DSL, `#to(String topic, Produced produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).") + ); + } + @Test public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedInCallback() { final KafkaException exception = new ProducerFencedException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index bee33dd454edd..0c303d26d24dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -16,40 +16,22 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; import org.junit.Before; import org.junit.Test; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; public class SinkNodeTest { private final StateSerdes anyStateSerde = 
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); private final Serializer anySerializer = Serdes.ByteArray().serializer(); - private final RecordCollector recordCollector = new MockRecordCollector() { - @Override - public void send(final String topic, - final K key, - final V value, - final Headers headers, - final Long timestamp, - final Serializer keySerializer, - final Serializer valueSerializer, - final StreamPartitioner partitioner) { - throw new ClassCastException("boom"); - } - }; - + private final RecordCollector recordCollector = new MockRecordCollector(); private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector); private final SinkNode sink = new SinkNode<>("anyNodeName", new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null); @@ -75,42 +57,4 @@ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { } } - @Test - public void shouldThrowStreamsExceptionWithClassCastFromRecordCollector() { - // When/Then - context.setTime(0); - try { - illTypedSink.process("key", "value"); - fail("Should have thrown StreamsException"); - } catch (final StreamsException e) { - assertThat(e.getCause(), instanceOf(ClassCastException.class)); - } - } - - @Test - public void shouldThrowStreamsExceptionNullKeyWithClassCastFromRecordCollector() { - // When/Then - context.setTime(1); - try { - illTypedSink.process(null, ""); - fail("Should have thrown StreamsException"); - } catch (final StreamsException e) { - assertThat(e.getCause(), instanceOf(ClassCastException.class)); - assertThat(e.getMessage(), containsString("unknown because key is null")); - } - } - - @Test - public void shouldThrowStreamsExceptionNullValueWithClassCastFromRecordCollector() { - // When/Then - context.setTime(1); - try { - illTypedSink.process("", null); - fail("Should have thrown StreamsException"); - } catch (final StreamsException e) { - assertThat(e.getCause(),
instanceOf(ClassCastException.class)); - assertThat(e.getMessage(), containsString("unknown because value is null")); - } - } - }