diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java index 99e5562e6db4..5f6d48f12bef 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SqlPredicate.java @@ -119,6 +119,9 @@ private Predicate getStringKeyPredicate() { final ExpressionMetadata expressionEvaluator = createExpressionMetadata(); return (key, row) -> { + if (row == null) { + return false; + } try { Kudf[] kudfs = expressionEvaluator.getUdfs(); Object[] values = new Object[columnIndexes.length]; @@ -157,6 +160,9 @@ private ExpressionMetadata createExpressionMetadata() { private Predicate getWindowedKeyPredicate() { final ExpressionMetadata expressionEvaluator = createExpressionMetadata(); return (Predicate, GenericRow>) (key, row) -> { + if (row == null) { + return false; + } try { Kudf[] kudfs = expressionEvaluator.getUdfs(); Object[] values = new Object[columnIndexes.length]; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java index 8a99b6019f39..15547fd7d771 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java @@ -16,10 +16,14 @@ package io.confluent.ksql.structured; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.parser.tree.QuerySpecification; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -132,4 +136,23 @@ public void testFilterBiggerExpression() throws Exception { } + @Test + @SuppressWarnings("unchecked") + public void shouldIgnoreNullRows() { + final String selectQuery = "SELECT col0 FROM test1 WHERE col0 > 100;"; + final List statements = KSQL_PARSER.buildAst(selectQuery, metaStore); + final QuerySpecification querySpecification = (QuerySpecification)((Query) statements.get(0)).getQueryBody(); + final Expression filterExpr = querySpecification.getWhere().get(); + final PlanNode logicalPlan = buildLogicalPlan(selectQuery); + final FilterNode filterNode = (FilterNode) logicalPlan.getSources().get(0).getSources().get(0); + + initialSchemaKStream = new SchemaKStream(logicalPlan.getTheSourceNode().getSchema(), + kStream, + ksqlStream.getKeyField(), new ArrayList<>(), + SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, new MockSchemaRegistryClient()); + final SqlPredicate sqlPredicate = new SqlPredicate(filterExpr, initialSchemaKStream.getSchema(), false, ksqlConfig, functionRegistry); + final boolean result = sqlPredicate.getPredicate().test("key", null); + Assert.assertFalse(result); + } + } diff --git a/ksql-engine/src/test/resources/query-validation-tests/project-filter.json b/ksql-engine/src/test/resources/query-validation-tests/project-filter.json index dca324cc61a0..5517a4d74a1d 100644 --- a/ksql-engine/src/test/resources/query-validation-tests/project-filter.json +++ b/ksql-engine/src/test/resources/query-validation-tests/project-filter.json @@ -150,6 +150,20 @@ "outputs": [ {"topic": "S1", "key": 0, "value": "4294967296,456,foo", "timestamp": 0} ] + }, + { + "name": "Null row filter", + "statements": [ + "CREATE STREAM TEST (ID bigint, THING MAP) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM S3 as SELECT ID FROM TEST WHERE THING['status']='false';" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "timestamp": 0, "value": {"id": 1, "thing": {"other": 11, "status": false}}}, + {"topic": "test_topic", "key": 0, "timestamp": 0, "value": null} + ], + "outputs": [ + {"topic": "S3", "key": 0, "timestamp": 0, "value": {"ID":1}} + ] } ] } \ No newline at end of file