diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/table.json b/ksql-functional-tests/src/test/resources/query-validation-tests/table.json index b10e34d09aac..b971f5e3b8e6 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/table.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/table.json @@ -74,6 +74,22 @@ {"topic": "OUTPUT", "key": "1", "value": null}, {"topic": "OUTPUT", "key": "1", "value": "2"} ] + }, + { + "name": "should not blow up on null key", + "statements": [ + "CREATE TABLE INPUT (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE TABLE OUTPUT as SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": "1"}, + {"topic": "test_topic", "key": null, "value": "2"}, + {"topic": "test_topic", "key": "1", "value": "3"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": "1"}, + {"topic": "OUTPUT", "key": "1", "value": "3"} + ] } ] } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java index feae69a9e17c..153a584678c8 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java @@ -116,7 +116,7 @@ private static final class RowSerializer implements Serializer { @Override public byte[] serialize(final String topic, final Object struct) { - final Object value = ((Struct) struct).get(field); + final Object value = struct == null ? null : ((Struct) struct).get(field); return delegate.serialize(topic, value); } } @@ -140,6 +140,9 @@ private static final class RowDeserializer implements Deserializer { public Struct deserialize(final String topic, final byte[] bytes) { try { final Object primitive = delegate.deserialize(topic, bytes); + if (primitive == null) { + return null; + } final Struct struct = new Struct(schema); struct.put(field, primitive); return struct; diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java index d6839db609e8..bb12a1638824 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactoryTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.name.ColumnName; @@ -146,8 +147,31 @@ public void shouldThrowOnValidateIfStruct() { } @Test - public void shouldHandleNulls() { - shouldHandle(SqlTypes.INTEGER, null); + public void shouldSerializeNullAsNull() { + // Given: + final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER); + + final Serde serde = factory.createSerde(schema, ksqlConfig, srClientFactory); + + // When: + final byte[] result = serde.serializer().serialize("topic", null); + + // Then: + assertThat(result, is(nullValue())); + } + + @Test + public void shouldDeserializeNullAsNull() { + // Given: + final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER); + + final Serde serde = factory.createSerde(schema, ksqlConfig, srClientFactory); + + // When: + final Object result = serde.deserializer().deserialize("topic", null); + + // Then: + assertThat(result, is(nullValue())); } @Test