Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move statement re-write logic into engine #3400

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@

package io.confluent.ksql.engine;

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.ddl.commands.CommandFactories;
import io.confluent.ksql.ddl.commands.DdlCommandExec;
import io.confluent.ksql.engine.rewrite.StatementRewriteForStruct;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommandResult;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
Expand All @@ -27,6 +31,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
Expand All @@ -39,7 +44,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
Expand All @@ -55,42 +59,49 @@ final class EngineContext {
private final DdlCommandExec ddlCommandExec;
private final QueryIdGenerator queryIdGenerator;
private final ProcessingLogContext processingLogContext;
private final KsqlParser parser = new DefaultKsqlParser();
private final KsqlParser parser;
private final BiConsumer<ServiceContext, QueryMetadata> outerOnQueryCloseCallback;
private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
private final StatementRewriteForStruct rewriter;

private EngineContext(
static EngineContext create(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final MutableMetaStore metaStore,
final QueryIdGenerator queryIdGenerator,
final BiConsumer<ServiceContext, QueryMetadata> onQueryCloseCallback
) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.metaStore = Objects.requireNonNull(metaStore, "metaStore");
this.queryIdGenerator = Objects.requireNonNull(queryIdGenerator, "queryIdGenerator");
this.ddlCommandFactory = new CommandFactories(serviceContext, metaStore);
this.outerOnQueryCloseCallback = Objects
.requireNonNull(onQueryCloseCallback, "onQueryCloseCallback");
this.ddlCommandExec = new DdlCommandExec(metaStore);
this.persistentQueries = new ConcurrentHashMap<>();
this.processingLogContext = Objects
.requireNonNull(processingLogContext, "processingLogContext");
return new EngineContext(
serviceContext,
processingLogContext,
metaStore,
queryIdGenerator,
onQueryCloseCallback,
new DefaultKsqlParser(),
new StatementRewriteForStruct()
);
}

static EngineContext create(
@VisibleForTesting
EngineContext(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final MutableMetaStore metaStore,
final QueryIdGenerator queryIdGenerator,
final BiConsumer<ServiceContext, QueryMetadata> onQueryCloseCallback
final BiConsumer<ServiceContext, QueryMetadata> onQueryCloseCallback,
final KsqlParser parser,
final StatementRewriteForStruct rewriter
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
) {
return new EngineContext(
serviceContext,
processingLogContext,
metaStore,
queryIdGenerator,
onQueryCloseCallback);
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.metaStore = requireNonNull(metaStore, "metaStore");
this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator");
this.ddlCommandFactory = new CommandFactories(serviceContext, metaStore);
this.outerOnQueryCloseCallback = requireNonNull(onQueryCloseCallback, "onQueryCloseCallback");
this.ddlCommandExec = new DdlCommandExec(metaStore);
this.persistentQueries = new ConcurrentHashMap<>();
this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext");
this.parser = requireNonNull(parser, "parser");
this.rewriter = requireNonNull(rewriter, "rewriter");
}

EngineContext createSandbox(final ServiceContext serviceContext) {
Expand Down Expand Up @@ -132,7 +143,11 @@ List<ParsedStatement> parse(final String sql) {

PreparedStatement<?> prepare(final ParsedStatement stmt) {
try {
return parser.prepare(stmt, metaStore);
final PreparedStatement<?> prepared = parser.prepare(stmt, metaStore);

final Statement rewritten = rewriter.rewriteForStruct(prepared.getStatement());

return PreparedStatement.of(stmt.getStatementText(), rewritten);
} catch (final KsqlException e) {
throw e;
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Confluent Inc.
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand All @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;
package io.confluent.ksql.engine.rewrite;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,39 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;
package io.confluent.ksql.engine.rewrite;

import static io.confluent.ksql.util.KsqlConstants.DATE_TIME_PATTERN;
import static io.confluent.ksql.util.KsqlConstants.TIME_PATTERN;

import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.execution.expression.tree.BetweenPredicate;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.LongLiteral;
import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Optional;

public class StatementRewriteForRowtime {
private final Expression expression;

public StatementRewriteForRowtime(final Expression expression) {
this.expression = Objects.requireNonNull(expression, "expression");
@SuppressWarnings("MethodMayBeStatic") // Used for DI
public Expression rewriteForRowtime(final Expression expression) {
if (!requiresRewrite(expression)) {
return expression;
}
return new ExpressionTreeRewriter<>(
new OperatorPlugin()::process).rewrite(expression, null);
}

public static boolean requiresRewrite(final Expression expression) {
private static boolean requiresRewrite(final Expression expression) {
return expression.toString().contains("ROWTIME");
}

public Expression rewriteForRowtime() {
return new ExpressionTreeRewriter<>(
new OperatorPlugin()::process).rewrite(expression, null);
}

private static final class OperatorPlugin
extends VisitParentExpressionVisitor<Optional<Expression>, Context<Void>> {
private OperatorPlugin() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Confluent Inc.
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand All @@ -13,46 +13,47 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;
package io.confluent.ksql.engine.rewrite;

import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.execution.expression.tree.DereferenceExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.parser.tree.Explain;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Statement;
import java.util.Objects;
import java.util.Optional;

public final class StatementRewriteForStruct {

private final Statement statement;

public StatementRewriteForStruct(final Statement statement) {
this.statement = Objects.requireNonNull(statement, "statement");
}
@SuppressWarnings("MethodMayBeStatic") // Used for DI
public Statement rewriteForStruct(final Statement statement) {
if (!requiresRewrite(statement)) {
return statement;
}

public Statement rewriteForStruct() {
return (Statement) new StatementRewriter<>(
final StatementRewriter<Object> rewritter = new StatementRewriter<>(
(e, c) -> ExpressionTreeRewriter.rewriteWith(new Plugin()::process, e)
).rewrite(statement, null);
);

return (Statement) rewritter.rewrite(statement, null);
}

public static boolean requiresRewrite(final Statement statement) {
private static boolean requiresRewrite(final Statement statement) {
return statement instanceof Query
|| statement instanceof QueryContainer
|| statement instanceof Explain;
}

private static final class Plugin
extends VisitParentExpressionVisitor<Optional<Expression>, Context<Void>> {

private Plugin() {
super(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Confluent Inc.
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand All @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.rewrite;
package io.confluent.ksql.engine.rewrite;

import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.Type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.expression.tree.Expression;
Expand All @@ -36,8 +38,6 @@
import io.confluent.ksql.function.udaf.KudafInitializer;
import io.confluent.ksql.materialization.MaterializationInfo;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.engine.rewrite.StatementRewriteForRowtime;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
Expand Down Expand Up @@ -59,7 +60,6 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
import io.confluent.ksql.parser.rewrite.StatementRewriteForRowtime;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand Down Expand Up @@ -332,11 +332,9 @@ public SchemaKStream<K> filter(
);
}

Expression rewriteTimeComparisonForFilter(final Expression expression) {
if (StatementRewriteForRowtime.requiresRewrite(expression)) {
return new StatementRewriteForRowtime(expression).rewriteForRowtime();
}
return expression;
static Expression rewriteTimeComparisonForFilter(final Expression expression) {
return new StatementRewriteForRowtime()
.rewriteForRowtime(expression);
}

public SchemaKStream<K> select(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

package io.confluent.ksql.util;

import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter.Context;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void shouldThrowIfSelectContainsReversedStringConcatExpression() {
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"Non-aggregate SELECT expression(s) not part of GROUP BY: "
+ "[(ORDERS.ITEMID + FETCH_FIELD_FROM_STRUCT(ORDERS.ADDRESS, 'STREET'))]"
+ "[(ORDERS.ITEMID + ORDERS.ADDRESS->STREET)]"
);

// When:
Expand Down
Loading