From 9c3cbf8144b49b83c14120bac7b74211190b5590 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Mon, 10 Feb 2020 23:41:22 +0000 Subject: [PATCH] fix: support conversion of STRING to BIGINT for window bounds (#4500) Fixes: https://github.com/confluentinc/ksql/issues/4482 Comparison and Between expressions in the WHERE clause already support magic conversion from a STRING containing a ISO formatted datetime into a BIGINT for the `ROWTIME` column. This changes extends the support to cover the `WINDOWSTART` and `WINDOWEND` columns. The change also fixes a bug where by a numeric Between expression on `ROWTIME` resulted in a class-cast exception, e.g. `WHERE ROWTIME < 123546794894`. --- .../rewrite/ExpressionTreeRewriter.java | 6 +- ...tementRewriteForMagicPseudoTimestamp.java} | 87 +++-- .../ksql/structured/SchemaKStream.java | 17 +- .../ksql/structured/SchemaKTable.java | 2 +- ...entRewriteForMagicPseudoTimestampTest.java | 329 ++++++++++++++++++ .../StatementRewriteForRowtimeTest.java | 212 ----------- .../expression/tree/ComparisonExpression.java | 40 --- .../magic-timestamp-conversion.json | 178 ++++++++++ .../query-validation-tests/rowtime.json | 101 ------ 9 files changed, 569 insertions(+), 403 deletions(-) rename ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/{StatementRewriteForRowtime.java => StatementRewriteForMagicPseudoTimestamp.java} (55%) create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestampTest.java delete mode 100644 ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/magic-timestamp-conversion.json delete mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java index f6f2b6f976d8..1bfed37935f8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/ExpressionTreeRewriter.java @@ -93,18 +93,16 @@ public Expression process(final Expression expression) { private final RewritingVisitor rewriter; - @SuppressWarnings("unchecked") public static T rewriteWith( final BiFunction, Optional> plugin, final T expression) { return rewriteWith(plugin, expression, null); } - @SuppressWarnings("unchecked") public static T rewriteWith( final BiFunction, Optional> plugin, final T expression, final C context) { - return new ExpressionTreeRewriter(plugin).rewrite(expression, context); + return new ExpressionTreeRewriter<>(plugin).rewrite(expression, context); } @SuppressWarnings("unchecked") @@ -182,7 +180,7 @@ public Expression visitSubscriptExpression( final SubscriptExpression node, final C context) { final Optional result - = plugin.apply(node, new Context(context, this)); + = plugin.apply(node, new Context<>(context, this)); if (result.isPresent()) { return result.get(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestamp.java similarity index 55% rename from ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java rename to ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestamp.java index 0c53859abc2b..fd6743829148 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtime.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestamp.java @@ -16,6 +16,7 @@ package io.confluent.ksql.engine.rewrite; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context; import io.confluent.ksql.execution.expression.tree.BetweenPredicate; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; @@ -24,28 +25,33 @@ import io.confluent.ksql.execution.expression.tree.LongLiteral; import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; +import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser; import java.util.Objects; import java.util.Optional; +import java.util.Set; + +public class StatementRewriteForMagicPseudoTimestamp { + + private static final Set SUPPORTED_COLUMNS = ImmutableSet.builder() + .addAll(SchemaUtil.windowBoundsColumnNames()) + .add(SchemaUtil.ROWTIME_NAME) + .build(); -public class StatementRewriteForRowtime { private final PartialStringToTimestampParser parser; - public StatementRewriteForRowtime() { + public StatementRewriteForMagicPseudoTimestamp() { this(new PartialStringToTimestampParser()); } @VisibleForTesting - StatementRewriteForRowtime(final PartialStringToTimestampParser parser) { + StatementRewriteForMagicPseudoTimestamp(final PartialStringToTimestampParser parser) { this.parser = Objects.requireNonNull(parser, "parser"); } - public Expression rewriteForRowtime(final Expression expression) { - if (noRewriteRequired(expression)) { - return expression; - } + public Expression rewrite(final Expression expression) { return new ExpressionTreeRewriter<>(new OperatorPlugin()::process) .rewrite(expression, null); } @@ -66,7 +72,13 @@ public Optional visitBetweenPredicate( final BetweenPredicate node, final Context context ) { - if (noRewriteRequired(node.getValue())) { + if (!supportedColumnRef(node.getValue())) { + return Optional.empty(); + } + + final Optional min = maybeRewriteTimestamp(node.getMin()); + final Optional max = maybeRewriteTimestamp(node.getMax()); + if (!min.isPresent() && !max.isPresent()) { return Optional.empty(); } @@ -74,8 +86,8 @@ public Optional visitBetweenPredicate( new BetweenPredicate( node.getLocation(), node.getValue(), - rewriteTimestamp(((StringLiteral) node.getMin()).getValue()), - rewriteTimestamp(((StringLiteral) node.getMax()).getValue()) + min.orElse(node.getMin()), + max.orElse(node.getMax()) ) ); } @@ -85,42 +97,45 @@ public Optional visitComparisonExpression( final ComparisonExpression node, final Context context ) { - if (expressionIsRowtime(node.getLeft()) && node.getRight() instanceof StringLiteral) { - return Optional.of( - new ComparisonExpression( - node.getLocation(), - node.getType(), - node.getLeft(), - rewriteTimestamp(((StringLiteral) node.getRight()).getValue()) - ) - ); + if (supportedColumnRef(node.getLeft())) { + final Optional right = maybeRewriteTimestamp(node.getRight()); + return right.map(r -> new ComparisonExpression( + node.getLocation(), + node.getType(), + node.getLeft(), + r + )); } - if (expressionIsRowtime(node.getRight()) && node.getLeft() instanceof StringLiteral) { - return Optional.of( - new ComparisonExpression( - node.getLocation(), - node.getType(), - rewriteTimestamp(((StringLiteral) node.getLeft()).getValue()), - node.getRight() - ) - ); + if (supportedColumnRef(node.getRight())) { + final Optional left = maybeRewriteTimestamp(node.getLeft()); + return left.map(l -> new ComparisonExpression( + node.getLocation(), + node.getType(), + l, + node.getRight() + )); } return Optional.empty(); } } - private static boolean refIsRowtime(final ColumnReferenceExp node) { - return node.getReference().equals(SchemaUtil.ROWTIME_NAME); - } + private Optional maybeRewriteTimestamp(final Expression maybeTimestamp) { + if (!(maybeTimestamp instanceof StringLiteral)) { + return Optional.empty(); + } - private static boolean expressionIsRowtime(final Expression node) { - return (node instanceof ColumnReferenceExp) - && refIsRowtime((ColumnReferenceExp) node); + final String text = ((StringLiteral) maybeTimestamp).getValue(); + + return Optional.of(new LongLiteral(parser.parse(text))); } - private LongLiteral rewriteTimestamp(final String timestamp) { - return new LongLiteral(parser.parse(timestamp)); + private static boolean supportedColumnRef(final Expression maybeColumnRef) { + if (!(maybeColumnRef instanceof ColumnReferenceExp)) { + return false; + } + + return SUPPORTED_COLUMNS.contains(((ColumnReferenceExp) maybeColumnRef).getReference()); } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index bd8b07985b7d..d6cc5cc0e74a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -17,7 +17,7 @@ import static java.util.Objects.requireNonNull; -import io.confluent.ksql.engine.rewrite.StatementRewriteForRowtime; +import io.confluent.ksql.engine.rewrite.StatementRewriteForMagicPseudoTimestamp; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryContext.Stacker; @@ -136,8 +136,7 @@ public SchemaKStream filter( } static Expression rewriteTimeComparisonForFilter(final Expression expression) { - return new StatementRewriteForRowtime() - .rewriteForRowtime(expression); + return new StatementRewriteForMagicPseudoTimestamp().rewrite(expression); } public SchemaKStream select( @@ -325,7 +324,7 @@ public SchemaKStream selectKey( final Expression keyExpression, final QueryContext.Stacker contextStacker ) { - if (!needsRepartition(keyExpression)) { + if (repartitionNotNeeded(keyExpression)) { return (SchemaKStream) this; } @@ -360,9 +359,9 @@ private KeyField getNewKeyField(final Expression expression) { return getSchema().isMetaColumn(columnName) ? KeyField.none() : newKeyField; } - protected boolean needsRepartition(final Expression expression) { + protected boolean repartitionNotNeeded(final Expression expression) { if (!(expression instanceof UnqualifiedColumnReferenceExp)) { - return true; + return false; } final ColumnName columnName = ((UnqualifiedColumnReferenceExp) expression).getReference(); @@ -379,7 +378,7 @@ protected boolean needsRepartition(final Expression expression) { .map(kf -> kf.ref().equals(proposedKey.ref())) .orElse(false); - return !namesMatch && !isRowKey(columnName); + return namesMatch || isRowKey(columnName); } private boolean isRowKey(final ColumnName fieldName) { @@ -453,7 +452,7 @@ public SchemaKGroupedStream groupBy( ); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) private SchemaKGroupedStream groupByKey( final KeyFormat rekeyedKeyFormat, final ValueFormat valueFormat, @@ -534,7 +533,7 @@ LogicalSchema resolveSchema(final ExecutionStep step) { return new StepSchemaResolver(ksqlConfig, functionRegistry).resolve(step, schema); } - LogicalSchema resolveSchema(final ExecutionStep step, final SchemaKStream right) { + LogicalSchema resolveSchema(final ExecutionStep step, final SchemaKStream right) { return new StepSchemaResolver(ksqlConfig, functionRegistry).resolve( step, schema, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index e74006e2f3b7..86312cdc1148 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -139,7 +139,7 @@ public SchemaKTable select( @Override public SchemaKStream selectKey(final Expression keyExpression, final Stacker contextStacker) { - if (!needsRepartition(keyExpression)) { + if (repartitionNotNeeded(keyExpression)) { return (SchemaKStream) this; } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestampTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestampTest.java new file mode 100644 index 000000000000..1a7a74d3582b --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForMagicPseudoTimestampTest.java @@ -0,0 +1,329 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.engine.rewrite; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.function.FunctionRegistry; +import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.parser.tree.Query; +import io.confluent.ksql.util.KsqlParserTestUtil; +import io.confluent.ksql.util.MetaStoreFixture; +import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@SuppressWarnings("OptionalGetWithoutIsPresent") +@RunWith(MockitoJUnitRunner.class) +public class StatementRewriteForMagicPseudoTimestampTest { + + private static final long A_TIMESTAMP = 1234567890L; + private static final long ANOTHER_TIMESTAMP = 1234568890L; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private PartialStringToTimestampParser parser; + private MetaStore metaStore; + private StatementRewriteForMagicPseudoTimestamp rewritter; + + @Before + public void init() { + metaStore = MetaStoreFixture.getNewMetaStore(mock(FunctionRegistry.class)); + rewritter = new StatementRewriteForMagicPseudoTimestamp(parser); + + when(parser.parse(any())) + .thenReturn(A_TIMESTAMP) + .thenReturn(ANOTHER_TIMESTAMP); + } + + @Test + public void shouldPassRowTimeStringsToTheParser() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME = '2017-01-01T00:44:00.000';"); + + // When: + rewritter.rewrite(predicate); + + // Then: + verify(parser).parse("2017-01-01T00:44:00.000"); + } + + @Test + public void shouldNotReplaceComparisonRowTimeAndNonString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME > 10.25;"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten, is(predicate)); + } + + @Test + public void shouldReplaceComparisonOfRowTimeAndString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME > '2017-01-01T00:00:00.000';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten.toString(), is(String.format("(ORDERS.ROWTIME > %d)", A_TIMESTAMP))); + } + + @Test + public void shouldReplaceComparisonOfWindowStartAndString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where WINDOWSTART > '2017-01-01T00:00:00.000';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten.toString(), is(String.format("(ORDERS.WINDOWSTART > %d)", A_TIMESTAMP))); + } + + @Test + public void shouldReplaceComparisonOfWindowEndAndString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where WINDOWEND > '2017-01-01T00:00:00.000';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten.toString(), is(String.format("(ORDERS.WINDOWEND > %d)", A_TIMESTAMP))); + } + + @Test + public void shouldReplaceComparisonInReverseOrder() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where '2017-01-01T00:00:00.000' < ROWTIME;"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten.toString(), is(String.format("(%d < ORDERS.ROWTIME)", A_TIMESTAMP))); + } + + @Test + public void shouldReplaceBetweenRowTimeAndStrings() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME BETWEEN '2017-01-01' AND '2017-02-01';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat( + rewritten.toString(), + is(String.format("(ORDERS.ROWTIME BETWEEN %d AND %d)", A_TIMESTAMP, ANOTHER_TIMESTAMP)) + ); + } + + @Test + public void shouldReplaceBetweenWindowStartAndStrings() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where WINDOWSTART BETWEEN '2017-01-01' AND '2017-02-01';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat( + rewritten.toString(), + is(String.format("(ORDERS.WINDOWSTART BETWEEN %d AND %d)", A_TIMESTAMP, ANOTHER_TIMESTAMP)) + ); + } + + @Test + public void shouldReplaceBetweenWindowEndAndStrings() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where WINDOWEND BETWEEN '2017-01-01' AND '2017-02-01';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat( + rewritten.toString(), + is(String.format("(ORDERS.WINDOWEND BETWEEN %d AND %d)", A_TIMESTAMP, ANOTHER_TIMESTAMP)) + ); + } + + @Test + public void shouldReplaceBetweenOnMinString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where WINDOWSTART BETWEEN '2017-01-01' AND 1236987;"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat( + rewritten.toString(), + is(String.format("(ORDERS.WINDOWSTART BETWEEN %d AND 1236987)", A_TIMESTAMP)) + ); + } + + @Test + public void shouldReplaceBetweenOnMaxString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where WINDOWEND BETWEEN 1236987 AND '2017-01-01';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat( + rewritten.toString(), + is(String.format("(ORDERS.WINDOWEND BETWEEN 1236987 AND %d)", A_TIMESTAMP)) + ); + } + + @Test + public void shouldNotReplaceBetweenExpressionOnNonString() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME BETWEEN 123456 AND 147258;"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten, is(predicate)); + } + + @Test + public void shouldReplaceQualifiedColumns() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ORDERS.ROWTIME > '2017-01-01T00:00:00.000';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten.toString(), is(String.format("(ORDERS.ROWTIME > %d)", A_TIMESTAMP))); + } + + @Test + public void shouldNotReplaceStringsInFunctions() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME = foo('2017-01-01');"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + verify(parser, never()).parse(any()); + assertThat(rewritten.toString(), is("(ORDERS.ROWTIME = FOO('2017-01-01'))")); + } + + @Test + public void shouldNotReplaceUnsupportedColumns() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where ROWTIME > '2017-01-01' AND ROWKEY = '2017-01-01';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + assertThat(rewritten.toString(), + is(String.format("((ORDERS.ROWTIME > %d) AND (ORDERS.ROWKEY = '2017-01-01'))", + A_TIMESTAMP))); + } + + @Test + public void shouldNotReplaceSupportedInFunction() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where foo(ROWTIME) = '2017-01-01';"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + verify(parser, never()).parse(any()); + assertThat(rewritten.toString(), containsString("(FOO(ORDERS.ROWTIME) = '2017-01-01')")); + } + + @Test + public void shouldNotReplaceArithmetic() { + // Given: + final Expression predicate = getPredicate( + "SELECT * FROM orders where '2017-01-01' + 10000 > ROWTIME;"); + + // When: + final Expression rewritten = rewritter.rewrite(predicate); + + // Then: + verify(parser, never()).parse(any()); + assertThat(rewritten.toString(), containsString("(('2017-01-01' + 10000) > ORDERS.ROWTIME)")); + } + + @Test + public void shouldThrowParseError() { + // Given: + final Expression predicate = getPredicate("SELECT * FROM orders where ROWTIME = '2017-01-01';"); + when(parser.parse(any())).thenThrow(new IllegalArgumentException("it no good")); + + // Expect: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("it no good"); + + // When: + rewritter.rewrite(predicate); + } + + private Expression getPredicate(final String querySql) { + final Query statement = (Query) KsqlParserTestUtil + .buildSingleAst(querySql, metaStore) + .getStatement(); + + return statement.getWhere().get(); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java deleted file mode 100644 index d8f42c913591..000000000000 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriteForRowtimeTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.engine.rewrite; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.function.FunctionRegistry; -import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.parser.tree.Query; -import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.util.KsqlParserTestUtil; -import io.confluent.ksql.util.MetaStoreFixture; -import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser; -import io.confluent.ksql.util.timestamp.StringToTimestampParser; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@SuppressWarnings("OptionalGetWithoutIsPresent") -@RunWith(MockitoJUnitRunner.class) -public class StatementRewriteForRowtimeTest { - - private static final StringToTimestampParser PARSER = - new StringToTimestampParser("yyyy-MM-dd'T'HH:mm:ss.SSS"); - - private static final long A_TIMESTAMP = 1234567890L; - - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - - @Mock - private PartialStringToTimestampParser parser; - private MetaStore metaStore; - private StatementRewriteForRowtime rewritter; - - @Before - public void init() { - metaStore = MetaStoreFixture.getNewMetaStore(mock(FunctionRegistry.class)); - rewritter = new StatementRewriteForRowtime(parser); - - when(parser.parse(any())).thenReturn(A_TIMESTAMP); - } - - @Test - public void shouldPassRowTimeStringsToTheParser() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where ROWTIME = '2017-01-01T00:44:00.000';"); - final Expression predicate = statement.getWhere().get(); - - // When: - rewritter.rewriteForRowtime(predicate); - - // Then: - verify(parser).parse("2017-01-01T00:44:00.000"); - } - - @Test - public void shouldReplaceDateString() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where ROWTIME > '2017-01-01T00:00:00.000';"); - final Expression predicate = statement.getWhere().get(); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - assertThat(rewritten.toString(), is(String.format("(ORDERS.ROWTIME > %d)", A_TIMESTAMP))); - } - - @Test - public void shouldReplaceDateStringComparedToQualifiedRowtimeColumnReference() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where ORDERS.ROWTIME > '2017-01-01T00:00:00.000';"); - final Expression predicate = statement.getWhere().get(); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - assertThat(rewritten.toString(), is(String.format("(ORDERS.ROWTIME > %d)", A_TIMESTAMP))); - } - - @Test - public void shouldHandleBetweenExpression() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where ROWTIME BETWEEN '2017-01-01' AND '2017-02-01';"); - final Expression predicate = statement.getWhere().get(); - - when(parser.parse(any())) - .thenReturn(A_TIMESTAMP) - .thenReturn(7654L); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - assertThat( - rewritten.toString(), - is(String.format("(ORDERS.ROWTIME BETWEEN %d AND %d)", A_TIMESTAMP, 7654L)) - ); - } - - @Test - public void shouldNotProcessStringsInFunctions() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where ROWTIME = foo('2017-01-01');"); - final Expression predicate = statement.getWhere().get(); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - verify(parser, never()).parse(any()); - assertThat(rewritten.toString(), is("(ORDERS.ROWTIME = FOO('2017-01-01'))")); - } - - @Test - public void shouldIgnoreNonRowtimeStrings() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where ROWTIME > '2017-01-01' AND ROWKEY = '2017-01-01';"); - final Expression predicate = statement.getWhere().get(); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - assertThat(rewritten.toString(), - is(String.format("((ORDERS.ROWTIME > %d) AND (ORDERS.ROWKEY = '2017-01-01'))", - A_TIMESTAMP))); - } - - @Test - public void shouldNotProcessWhenRowtimeInFunction() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where foo(ROWTIME) = '2017-01-01';"); - final Expression predicate = statement.getWhere().get(); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - verify(parser, never()).parse(any()); - assertThat(rewritten.toString(), containsString("(FOO(ORDERS.ROWTIME) = '2017-01-01')")); - } - - @Test - public void shouldNotProcessArithmetic() { - // Given: - final Query statement = buildSingleAst( - "SELECT * FROM orders where '2017-01-01' + 10000 > ROWTIME;"); - final Expression predicate = statement.getWhere().get(); - - // When: - final Expression rewritten = rewritter.rewriteForRowtime(predicate); - - // Then: - verify(parser, never()).parse(any()); - assertThat(rewritten.toString(), containsString("(('2017-01-01' + 10000) > ORDERS.ROWTIME)")); - } - - @Test - public void shouldThrowParseError() { - // Given: - final Query statement = buildSingleAst("SELECT * FROM orders where ROWTIME = '2017-01-01';"); - final Expression predicate = statement.getWhere().get(); - when(parser.parse(any())).thenThrow(new IllegalArgumentException("it no good")); - - // Expect: - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("it no good"); - - // When: - rewritter.rewriteForRowtime(predicate); - } - - @SuppressWarnings("unchecked") - private T buildSingleAst(final String query) { - return (T) KsqlParserTestUtil.buildSingleAst(query, metaStore).getStatement(); - } -} \ No newline at end of file diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java index e97a238d8735..853347e0a85f 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/expression/tree/ComparisonExpression.java @@ -43,46 +43,6 @@ public enum Type { public String getValue() { return value; } - - public Type flip() { - switch (this) { - case EQUAL: - return EQUAL; - case NOT_EQUAL: - return NOT_EQUAL; - case LESS_THAN: - return GREATER_THAN; - case LESS_THAN_OR_EQUAL: - return GREATER_THAN_OR_EQUAL; - case GREATER_THAN: - return LESS_THAN; - case GREATER_THAN_OR_EQUAL: - return LESS_THAN_OR_EQUAL; - case IS_DISTINCT_FROM: - return IS_DISTINCT_FROM; - default: - throw new IllegalArgumentException("Unsupported comparison: " + this); - } - } - - public Type negate() { - switch (this) { - case EQUAL: - return NOT_EQUAL; - case NOT_EQUAL: - return EQUAL; - case LESS_THAN: - return GREATER_THAN_OR_EQUAL; - case LESS_THAN_OR_EQUAL: - return GREATER_THAN; - case GREATER_THAN: - return LESS_THAN_OR_EQUAL; - case GREATER_THAN_OR_EQUAL: - return LESS_THAN; - default: - throw new IllegalArgumentException("Unsupported comparison: " + this); - } - } } private final Type type; diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/magic-timestamp-conversion.json b/ksql-functional-tests/src/test/resources/query-validation-tests/magic-timestamp-conversion.json new file mode 100644 index 000000000000..0fcf347dfcd3 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/magic-timestamp-conversion.json @@ -0,0 +1,178 @@ +{ + "comments": [ + "Tests covering the magic conversion of STRING to BIGINT for ROWTIME, WINDOWSTART and WINDOWEND system columns" + ], + "tests": [ + { + "name": "comparison predicate on BIGINT ROWTIME in WHERE", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > 1514764800001;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1514764800001}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 1546300808000}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 1514764800002}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 1514764800000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "2", "value": {"THING": 2}, "timestamp": 1546300808000}, + {"topic": "OUTPUT", "key": "3", "value": {"THING": 3}, "timestamp": 1514764800002} + ] + }, + { + "name": "comparison predicate on STRING ROWTIME in WHERE", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME>'2018-01-01T00:00:00.001';" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1514764800001}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 1546300808000}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 1514764800002}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 1514764800000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "2", "value": {"THING": 2}, "timestamp": 1546300808000}, + {"topic": "OUTPUT", "key": "3", "value": {"THING": 3}, "timestamp": 1514764800002} + ] + }, + { + "name": "comparison predicate on BIGINT window bounds in WHERE", + "statements": [ + "CREATE STREAM INPUT (source INT) WITH (kafka_topic='test_topic', value_format='JSON', WINDOW_TYPE='Session');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT WHERE 1581323504001 <= WINDOWSTART AND WINDOWEND <= 1581323505001;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"source": 1}, "window": {"start": 1581323504000, "end": 1581323505001, "type": "session"}}, + {"topic": "test_topic", "value": {"source": 2}, "window": {"start": 1581323504001, "end": 1581323505001, "type": "session"}}, + {"topic": "test_topic", "value": {"source": 3}, "window": {"start": 1581323504001, "end": 1581323505002, "type": "session"}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"SOURCE": 2}, "window": {"start": 1581323504001, "end": 1581323505001, "type": "session"}} + ] + }, + { + "name": "comparison predicate on STRING window bounds in WHERE", + "statements": [ + "CREATE STREAM INPUT (source INT) WITH (kafka_topic='test_topic', value_format='JSON', WINDOW_TYPE='Session');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT WHERE '2020-02-10T08:31:44.001+0000' <= WINDOWSTART AND WINDOWEND <= '2020-02-10T08:31:45.001+0000';" + ], + "inputs": [ + {"topic": "test_topic", "value": {"source": 1}, "window": {"start": 1581323504000, "end": 1581323505001, "type": "session"}}, + {"topic": "test_topic", "value": {"source": 2}, "window": {"start": 1581323504001, "end": 1581323505001, "type": "session"}}, + {"topic": "test_topic", "value": {"source": 3}, "window": {"start": 1581323504001, "end": 1581323505002, "type": "session"}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"SOURCE": 2}, "window": {"start": 1581323504001, "end": 1581323505001, "type": "session"}} + ] + }, + { + "name": "between predicate on BIGINT ROWTIME in WHERE", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN 1514764800001 AND 1514764801000;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1514764799999}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 1514764800001}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 1514764801000}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 1514764801001} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "2", "value": {"THING": 2}, "timestamp": 1514764800001}, + {"topic": "OUTPUT", "key": "3", "value": {"THING": 3}, "timestamp": 1514764801000} + ] + }, + { + "name": "between predicate on STRING ROWTIME in WHERE", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN '2018-01-01T00:00:00.001' AND '2018-01-01T00:00:01';" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1514764799999}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 1514764800001}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 1514764801000}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 1514764801001} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "2", "value": {"THING": 2}, "timestamp": 1514764800001}, + {"topic": "OUTPUT", "key": "3", "value": {"THING": 3}, "timestamp": 1514764801000} + ] + }, + { + "name": "between predicate on STRING WINDOWSTART in WHERE", + "statements": [ + "CREATE STREAM INPUT (source INT) WITH (kafka_topic='test_topic', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='1 SECOND');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT WHERE WINDOWSTART BETWEEN '2020-02-10T08:31:44.001+0000' AND '2020-02-10T08:31:44.011+0000';" + ], + "inputs": [ + {"topic": "test_topic", "value": {"source": 1}, "window": {"start": 1581323504000, "end": 1581323505000, "type": "time"}}, + {"topic": "test_topic", "value": {"source": 2}, "window": {"start": 1581323504001, "end": 1581323505001, "type": "time"}}, + {"topic": "test_topic", "value": {"source": 3}, "window": {"start": 1581323504011, "end": 1581323505011, "type": "time"}}, + {"topic": "test_topic", "value": {"source": 4}, "window": {"start": 1581323504012, "end": 1581323505012, "type": "time"}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"SOURCE": 2}, "window": {"start": 1581323504001, "end": 1581323505001, "type": "time"}}, + {"topic": "OUTPUT", "value": {"SOURCE": 3}, "window": {"start": 1581323504011, "end": 1581323505011, "type": "time"}} + ] + }, + { + "name": "comparison predicate with STRING ROWTIME containing TZ in WHERE", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > '2019-01-01T00:00:00+0445';" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 0}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"THING": 1}, "timestamp": 1546300800000}, + {"topic": "OUTPUT", "key": "5", "value": {"THING": 5}, "timestamp": 1600000000000} + ] + }, + { + "name": "nested comparison expression on STRING ROWTIME in WHERE", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2019-01-01T00:00:00' AND SOURCE=5;" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 0}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "5", "value": {"THING": 5}, "timestamp": 1600000000000} + ] + }, + { + "name": "partial STRING ROWTIME", + "statements": [ + "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2018';" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300800000}, + {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, + {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 0}, + {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, + {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"THING": 1}, "timestamp": 1546300800000}, + {"topic": "OUTPUT", "key": "5", "value": {"THING": 5}, "timestamp": 1600000000000} + ] + } + ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json b/ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json deleted file mode 100644 index d6c2c35b70e7..000000000000 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json +++ /dev/null @@ -1,101 +0,0 @@ -{ - "comments": [ - "Tests covering filters using ROWTIME" - ], - "tests": [ - { - "name": "test ROWTIME", - "statements": [ - "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME>'2018-01-01T00:00:00';" - ], - "inputs": [ - {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, - {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 0}, - {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 1546300808000}, - {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 1546300800000}, - {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, - {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 0} - ], - "outputs": [ - {"topic": "OUTPUT", "key": "2", "value": {"THING": 2}, "timestamp": 1546300808000}, - {"topic": "OUTPUT", "key": "3", "value": {"THING": 3}, "timestamp": 1546300800000} - ] - }, - { - "name": "test ROWTIME with BETWEEN", - "statements": [ - "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN '2018-01-01T00:00:00' AND '2019-12-31T23:59:59';" - ], - "inputs": [ - {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, - {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300808000}, - {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, - {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 1536307808000}, - {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, - {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} - ], - "outputs": [ - {"topic": "OUTPUT", "key": "1", "value": {"THING": 1}, "timestamp": 1546300808000}, - {"topic": "OUTPUT", "key": "3", "value": {"THING": 3}, "timestamp": 1536307808000} - ] - }, - { - "name": "test ROWTIME with timezone", - "statements": [ - "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > '2019-01-01T00:00:00+0445';" - ], - "inputs": [ - {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, - {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300800000}, - {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, - {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 0}, - {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, - {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} - ], - "outputs": [ - {"topic": "OUTPUT", "key": "1", "value": {"THING": 1}, "timestamp": 1546300800000}, - {"topic": "OUTPUT", "key": "5", "value": {"THING": 5}, "timestamp": 1600000000000} - ] - }, - { - "name": "test ROWTIME with AND", - "statements": [ - "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2019-01-01T00:00:00' AND SOURCE=5;" - ], - "inputs": [ - {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, - {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300800000}, - {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, - {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 0}, - {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, - {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} - ], - "outputs": [ - {"topic": "OUTPUT", "key": "5", "value": {"THING": 5}, "timestamp": 1600000000000} - ] - }, - { - "name": "test ROWTIME with inexact timestring", - "statements": [ - "CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2018';" - ], - "inputs": [ - {"topic": "test_topic", "key": "0", "value": {"source": null}, "timestamp": 0}, - {"topic": "test_topic", "key": "1", "value": {"source": 1}, "timestamp": 1546300800000}, - {"topic": "test_topic", "key": "2", "value": {"source": 2}, "timestamp": 0}, - {"topic": "test_topic", "key": "3", "value": {"source": 3}, "timestamp": 0}, - {"topic": "test_topic", "key": "4", "value": {"source": 4}, "timestamp": 0}, - {"topic": "test_topic", "key": "5", "value": {"source": 5}, "timestamp": 1600000000000} - ], - "outputs": [ - {"topic": "OUTPUT", "key": "1", "value": {"THING": 1}, "timestamp": 1546300800000}, - {"topic": "OUTPUT", "key": "5", "value": {"THING": 5}, "timestamp": 1600000000000} - ] - } - ] -} \ No newline at end of file