Skip to content

Commit

Permalink
Return false for null rows in predicate. (#1603)
Browse files Browse the repository at this point in the history
* Return false for null rows in predicate.

* Added unit test.

* Fixed test fail resulted from merge.
  • Loading branch information
hjafarpour authored Aug 2, 2018
1 parent 97ead03 commit 7019545
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ private Predicate<String, GenericRow> getStringKeyPredicate() {
final ExpressionMetadata expressionEvaluator = createExpressionMetadata();

return (key, row) -> {
if (row == null) {
return false;
}
try {
Kudf[] kudfs = expressionEvaluator.getUdfs();
Object[] values = new Object[columnIndexes.length];
Expand Down Expand Up @@ -157,6 +160,9 @@ private ExpressionMetadata createExpressionMetadata() {
private Predicate getWindowedKeyPredicate() {
final ExpressionMetadata expressionEvaluator = createExpressionMetadata();
return (Predicate<Windowed<String>, GenericRow>) (key, row) -> {
if (row == null) {
return false;
}
try {
Kudf[] kudfs = expressionEvaluator.getUdfs();
Object[] values = new Object[columnIndexes.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package io.confluent.ksql.structured;

import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QuerySpecification;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -132,4 +136,23 @@ public void testFilterBiggerExpression() throws Exception {

}

@Test
@SuppressWarnings("unchecked")
public void shouldIgnoreNullRows() {
  // A null row (e.g. a tombstone record) must never reach the generated filter
  // code; the predicate is expected to reject it outright by returning false.
  final String selectQuery = "SELECT col0 FROM test1 WHERE col0 > 100;";
  final List<Statement> statements = KSQL_PARSER.buildAst(selectQuery, metaStore);
  final QuerySpecification querySpecification =
      (QuerySpecification) ((Query) statements.get(0)).getQueryBody();
  final Expression filterExpr = querySpecification.getWhere().get();
  final PlanNode logicalPlan = buildLogicalPlan(selectQuery);

  initialSchemaKStream = new SchemaKStream(logicalPlan.getTheSourceNode().getSchema(),
      kStream,
      ksqlStream.getKeyField(), new ArrayList<>(),
      SchemaKStream.Type.SOURCE, ksqlConfig, functionRegistry, new MockSchemaRegistryClient());
  final SqlPredicate sqlPredicate = new SqlPredicate(
      filterExpr, initialSchemaKStream.getSchema(), false, ksqlConfig, functionRegistry);
  // The key value is irrelevant here: a null row alone must short-circuit to false.
  Assert.assertFalse(sqlPredicate.getPredicate().test("key", null));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@
"outputs": [
{"topic": "S1", "key": 0, "value": "4294967296,456,foo", "timestamp": 0}
]
},
{
"name": "Null row filter",
"statements": [
"CREATE STREAM TEST (ID bigint, THING MAP<VARCHAR, VARCHAR>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM S3 as SELECT ID FROM TEST WHERE THING['status']='false';"
],
"inputs": [
{"topic": "test_topic", "key": 0, "timestamp": 0, "value": {"id": 1, "thing": {"other": 11, "status": false}}},
{"topic": "test_topic", "key": 0, "timestamp": 0, "value": null}
],
"outputs": [
{"topic": "S3", "key": 0, "timestamp": 0, "value": {"ID":1}}
]
}
]
}

0 comments on commit 7019545

Please sign in to comment.