-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: wrap timestamps in ROWTIME expressions with STRINGTOTIMESTAMP (#…
…3160) * docs(syntax): update docs for rowtime timestamp literals * feat: wrap timestamps in rowtime expressions with converter * docs: address review comments * refactor: add qtt tests and time zone support, change rowtime detection
- Loading branch information
Zara Lim
authored
Aug 8, 2019
1 parent
2bf8c70
commit 42acd78
Showing
6 changed files
with
394 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
101 changes: 101 additions & 0 deletions
101
ksql-functional-tests/src/test/resources/query-validation-tests/rowtime.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
{ | ||
"comments": [ | ||
"Tests covering filters using ROWTIME" | ||
], | ||
"tests": [ | ||
{ | ||
"name": "test ROWTIME", | ||
"statements": [ | ||
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", | ||
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME>'2018-01-01T00:00:00';" | ||
], | ||
"inputs": [ | ||
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 1546300808000}, | ||
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1546300800000}, | ||
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 0} | ||
], | ||
"outputs": [ | ||
{"topic": "OUTPUT", "key": 2, "value": {"THING": 2}, "timestamp": 1546300808000}, | ||
{"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1546300800000} | ||
] | ||
}, | ||
{ | ||
"name": "test ROWTIME with BETWEEN", | ||
"statements": [ | ||
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", | ||
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME BETWEEN '2018-01-01T00:00:00' AND '2019-12-31T23:59:59';" | ||
], | ||
"inputs": [ | ||
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300808000}, | ||
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 1536307808000}, | ||
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} | ||
], | ||
"outputs": [ | ||
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300808000}, | ||
{"topic": "OUTPUT", "key": 3, "value": {"THING": 3}, "timestamp": 1536307808000} | ||
] | ||
}, | ||
{ | ||
"name": "test ROWTIME with timezone", | ||
"statements": [ | ||
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", | ||
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME > '2019-01-01T00:00:00+0445';" | ||
], | ||
"inputs": [ | ||
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000}, | ||
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} | ||
], | ||
"outputs": [ | ||
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000}, | ||
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000} | ||
] | ||
}, | ||
{ | ||
"name": "test ROWTIME with AND", | ||
"statements": [ | ||
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", | ||
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2019-01-01T00:00:00' AND SOURCE=5;" | ||
], | ||
"inputs": [ | ||
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000}, | ||
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} | ||
], | ||
"outputs": [ | ||
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000} | ||
] | ||
}, | ||
{ | ||
"name": "test ROWTIME with inexact timestring", | ||
"statements": [ | ||
"CREATE STREAM TEST (source int) WITH (kafka_topic='test_topic', value_format='JSON');", | ||
"CREATE STREAM OUTPUT AS SELECT source AS THING FROM TEST WHERE ROWTIME >= '2018';" | ||
], | ||
"inputs": [ | ||
{"topic": "test_topic", "key": 0, "value": {"source": null}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 1, "value": {"source": 1}, "timestamp": 1546300800000}, | ||
{"topic": "test_topic", "key": 2, "value": {"source": 2}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 3, "value": {"source": 3}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 4, "value": {"source": 4}, "timestamp": 0}, | ||
{"topic": "test_topic", "key": 5, "value": {"source": 5}, "timestamp": 1600000000000} | ||
], | ||
"outputs": [ | ||
{"topic": "OUTPUT", "key": 1, "value": {"THING": 1}, "timestamp": 1546300800000}, | ||
{"topic": "OUTPUT", "key": 5, "value": {"THING": 5}, "timestamp": 1600000000000} | ||
] | ||
} | ||
] | ||
} |
165 changes: 165 additions & 0 deletions
165
ksql-parser/src/main/java/io/confluent/ksql/parser/rewrite/StatementRewriteForRowtime.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
/* | ||
* Copyright 2019 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.parser.rewrite; | ||
|
||
import io.confluent.ksql.parser.tree.BetweenPredicate; | ||
import io.confluent.ksql.parser.tree.ComparisonExpression; | ||
import io.confluent.ksql.parser.tree.DereferenceExpression; | ||
import io.confluent.ksql.parser.tree.Expression; | ||
import io.confluent.ksql.parser.tree.FunctionCall; | ||
import io.confluent.ksql.parser.tree.Node; | ||
import io.confluent.ksql.parser.tree.QualifiedName; | ||
import io.confluent.ksql.parser.tree.StringLiteral; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
public class StatementRewriteForRowtime { | ||
private final Expression expression; | ||
|
||
public StatementRewriteForRowtime(final Expression expression) { | ||
this.expression = Objects.requireNonNull(expression, "expression"); | ||
} | ||
|
||
public static boolean requiresRewrite(final Expression expression) { | ||
return expression.toString().contains("ROWTIME"); | ||
} | ||
|
||
public Expression rewriteForRowtime() { | ||
return (Expression) new RewriteWithTimestampTransform().process(expression, null); | ||
} | ||
|
||
private static class TimestampRewriter extends StatementRewriter { | ||
private static final String DATE_PATTERN = "yyyy-MM-dd"; | ||
private static final String TIME_PATTERN = "HH:mm:ss.SSS"; | ||
private static final String PATTERN = DATE_PATTERN + "'T'" + TIME_PATTERN; | ||
|
||
@Override | ||
public Expression visitFunctionCall(final FunctionCall node, final Object context) { | ||
return (Expression) new StatementRewriter().process(node, context); | ||
} | ||
|
||
@Override | ||
public Node visitStringLiteral(final StringLiteral node, final Object context) { | ||
if (!node.getValue().equals("ROWTIME")) { | ||
return new FunctionCall( | ||
QualifiedName.of("STRINGTOTIMESTAMP"), | ||
getFunctionArgs(node.getValue())); | ||
} | ||
return node; | ||
} | ||
|
||
private List<Expression> getFunctionArgs(final String datestring) { | ||
final List<Expression> args = new ArrayList<>(); | ||
final String date; | ||
final String time; | ||
final String timezone; | ||
if (datestring.contains("T")) { | ||
date = datestring.substring(0, datestring.indexOf('T')); | ||
final String withTimezone = completeTime(datestring.substring(datestring.indexOf('T') + 1)); | ||
timezone = getTimezone(withTimezone); | ||
time = completeTime(withTimezone.substring(0, timezone.length())); | ||
} else { | ||
date = completeDate(datestring); | ||
time = completeTime(""); | ||
timezone = ""; | ||
} | ||
|
||
if (timezone.length() > 0) { | ||
args.add(new StringLiteral(date + "T" + time)); | ||
args.add(new StringLiteral(PATTERN)); | ||
args.add(new StringLiteral(timezone)); | ||
} else { | ||
args.add(new StringLiteral(date + "T" + time)); | ||
args.add(new StringLiteral(PATTERN)); | ||
} | ||
return args; | ||
} | ||
|
||
private String getTimezone(final String time) { | ||
if (time.contains("+")) { | ||
return time.substring(time.indexOf('+')); | ||
} else if (time.contains("-")) { | ||
return time.substring(time.indexOf('-')); | ||
} else { | ||
return ""; | ||
} | ||
} | ||
|
||
private String completeDate(final String date) { | ||
final String[] parts = date.split("-"); | ||
if (parts.length == 1) { | ||
return date + "-01-01"; | ||
} else if (parts.length == 2) { | ||
return date + "-01"; | ||
} else { | ||
// It is either a complete date or an incorrectly formatted one. | ||
// In the latter case, we can pass the incorrectly formed string | ||
// to STRINGTITIMESTAMP which will deal with the error handling. | ||
return date; | ||
} | ||
} | ||
|
||
private String completeTime(final String time) { | ||
if (time.length() >= TIME_PATTERN.length()) { | ||
return time; | ||
} | ||
return time + TIME_PATTERN.substring(time.length()).replaceAll("[a-zA-Z]", "0"); | ||
} | ||
} | ||
|
||
private static class RewriteWithTimestampTransform extends StatementRewriter { | ||
@Override | ||
public Expression visitBetweenPredicate(final BetweenPredicate node, final Object context) { | ||
if (StatementRewriteForRowtime.requiresRewrite(node)) { | ||
return new BetweenPredicate( | ||
node.getLocation(), | ||
(Expression) new TimestampRewriter().process(node.getValue(), context), | ||
(Expression) new TimestampRewriter().process(node.getMin(), context), | ||
(Expression) new TimestampRewriter().process(node.getMax(), context)); | ||
} | ||
return new BetweenPredicate( | ||
node.getLocation(), | ||
(Expression) process(node.getValue(), context), | ||
(Expression) process(node.getMin(), context), | ||
(Expression) process(node.getMax(), context)); | ||
} | ||
|
||
@Override | ||
public Expression visitComparisonExpression( | ||
final ComparisonExpression node, | ||
final Object context) { | ||
if (expressionIsRowtime(node.getLeft()) || expressionIsRowtime(node.getRight())) { | ||
return new ComparisonExpression( | ||
node.getLocation(), | ||
node.getType(), | ||
(Expression) new TimestampRewriter().process(node.getLeft(), context), | ||
(Expression) new TimestampRewriter().process(node.getRight(), context)); | ||
} | ||
return new ComparisonExpression( | ||
node.getLocation(), | ||
node.getType(), | ||
(Expression) process(node.getLeft(), context), | ||
(Expression) process(node.getRight(), context)); | ||
} | ||
} | ||
|
||
private static boolean expressionIsRowtime(final Expression node) { | ||
return (node instanceof DereferenceExpression) | ||
&& ((DereferenceExpression) node).getFieldName().equals("ROWTIME"); | ||
} | ||
} |
Oops, something went wrong.