From 808b04ebbb95cfaedda6429138957a322c9a4b73 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 26 Nov 2019 15:29:18 -0800 Subject: [PATCH] fix: unify behavior for PARTITION BY and GROUP BY BREAKING CHANGE: this change makes it so that PARTITION BY statements use the _source_ schema, not the value/projection schema, when selecting the value to partition by. This is consistent with GROUP BY, and standard SQL for GROUP by. Any statement that previously used PARTITION BY may need to be reworked. --- docs/developer-guide/syntax-reference.rst | 2 +- .../io/confluent/ksql/analyzer/Analyzer.java | 15 ++- .../ksql/engine/rewrite/AstSanitizer.java | 6 +- .../engine/rewrite/StatementRewriter.java | 18 ++-- .../ksql/planner/LogicalPlanner.java | 69 +++++++------- .../plan/KsqlStructuredDataOutputNode.java | 49 +--------- .../ksql/planner/plan/RepartitionNode.java | 76 +++++++++++++++ .../engine/rewrite/StatementRewriterTest.java | 64 +++++++++---- .../physical/PhysicalPlanBuilderTest.java | 8 +- .../KsqlStructuredDataOutputNodeTest.java | 92 ------------------- ...partition_by_(-)___key_in_value___aliasing | 69 -------------- ...tition_by_(-)___key_in_value___no_aliasing | 24 ++--- ...y_(different)___key_in_value___no_aliasing | 24 ++--- ...rtition-by_-_aliased_key_field_-_same_name | 19 +--- .../0_6_0-pre/partition-by_-_partition_by | 24 ++--- .../partition-by_-_partition_by_ROWTIME | 24 ++--- ..._partition_by_with_null_partition_by_value | 24 ++--- ...artition-by_-_partition_by_with_null_value | 24 ++--- ..._-_partition_by_with_projection_select_all | 24 ++--- ...-_partition_by_with_projection_select_some | 24 ++--- .../query-validation-tests/key-field.json | 88 +++++++++--------- .../query-validation-tests/partition-by.json | 21 ++--- .../io/confluent/ksql/parser/SqlBase.g4 | 6 +- .../io/confluent/ksql/parser/AstBuilder.java | 7 +- .../confluent/ksql/parser/SqlFormatter.java | 9 +- .../ksql/parser/tree/CreateAsSelect.java | 18 +--- .../parser/tree/CreateStreamAsSelect.java | 13 +-- .../ksql/parser/tree/CreateTableAsSelect.java | 4 +- .../ksql/parser/tree/InsertInto.java | 18 ++-- .../io/confluent/ksql/parser/tree/Query.java | 16 ++++ .../io/confluent/ksql/parser/tree/Sink.java | 36 +------- .../ksql/parser/SqlFormatterTest.java | 8 +- .../parser/tree/CreateStreamAsSelectTest.java | 20 ++-- .../parser/tree/CreateTableAsSelectTest.java | 2 +- .../ksql/parser/tree/InsertIntoTest.java | 16 ++-- .../ksql/parser/tree/ParserModelTest.java | 1 + .../confluent/ksql/parser/tree/QueryTest.java | 31 ++++--- .../rest/server/StandaloneExecutorTest.java | 10 +- .../streaming/WSQueryEndpointTest.java | 1 + 39 files changed, 427 insertions(+), 577 deletions(-) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java delete mode 100644 ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___aliasing diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 24e6c7c452cb..8632b971af5e 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -635,7 +635,7 @@ its corresponding topic. If the PARTITION BY clause is present, then the resulting stream will have the specified column as its key. The `column_name` must be present -in the `select_expr`. For more information, see :ref:`partition-data-to-enable-joins`. +in the `from_stream`. For more information, see :ref:`partition-data-to-enable-joins`. For joins, the key of the resulting stream will be the value from the column from the left stream that was used in the join criteria. This column will be diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 495f730671a3..fc2d8f80f494 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -164,9 +164,7 @@ private final class Visitor extends DefaultTraversalVisitor { private void analyzeNonStdOutSink(final Sink sink) { analysis.setProperties(sink.getProperties()); - sink.getPartitionBy() - .map(name -> ColumnRef.withoutSource(name.name())) - .ifPresent(analysis::setPartitionBy); + setSerdeOptions(sink); @@ -317,6 +315,7 @@ protected AstNode visitQuery( node.getWhere().ifPresent(this::analyzeWhere); node.getGroupBy().ifPresent(this::analyzeGroupBy); + node.getPartitionBy().ifPresent(this::analyzePartitionBy); node.getWindow().ifPresent(this::analyzeWindowExpression); node.getHaving().ifPresent(this::analyzeHaving); node.getLimit().ifPresent(analysis::setLimitClause); @@ -543,6 +542,16 @@ private void analyzeGroupBy(final GroupBy groupBy) { } } + private void analyzePartitionBy(final Expression partitionBy) { + if (partitionBy instanceof ColumnReferenceExp) { + analysis.setPartitionBy(((ColumnReferenceExp) partitionBy).getReference()); + return; + } + + throw new KsqlException( + "Expected partition by to be a valid column but got " + partitionBy); + } + private void analyzeWindowExpression(final WindowExpression windowExpression) { analysis.setWindowExpression(windowExpression); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/AstSanitizer.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/AstSanitizer.java index 872fc9f9fded..414d050d94bf 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/AstSanitizer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/AstSanitizer.java @@ -88,8 +88,7 @@ protected Optional visitCreateStreamAsSelect( node.getName(), (Query) ctx.process(node.getQuery()), node.isNotExists(), - node.getProperties(), - node.getPartitionByColumn() + node.getProperties() ) ); } @@ -117,8 +116,7 @@ protected Optional visitInsertInto( new InsertInto( node.getLocation(), node.getTarget(), - (Query) ctx.process(node.getQuery()), - node.getPartitionByColumn() + (Query) ctx.process(node.getQuery()) ) ); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java index 531c32187c5c..626a49b2c69a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java @@ -188,6 +188,10 @@ protected AstNode visitQuery(final Query node, final C context) { final Optional groupBy = node.getGroupBy() .map(exp -> ((GroupBy) rewriter.apply(exp, context))); + // don't rewrite the partitionBy because we expect it to be + // exactly as it was (a single, un-aliased, column reference) + final Optional partitionBy = node.getPartitionBy(); + final Optional having = node.getHaving() .map(exp -> (processExpression(exp, context))); @@ -198,6 +202,7 @@ protected AstNode visitQuery(final Query node, final C context) { windowExpression, where, groupBy, + partitionBy, having, node.getResultMaterialization(), node.isPullQuery(), @@ -364,16 +369,12 @@ protected AstNode visitCreateStreamAsSelect( return result.get(); } - final Optional partitionBy = node.getPartitionByColumn() - .map(exp -> processExpression(exp, context)); - return new CreateStreamAsSelect( node.getLocation(), node.getName(), (Query) rewriter.apply(node.getQuery(), context), node.isNotExists(), - node.getProperties(), - partitionBy + node.getProperties() ); } @@ -416,14 +417,11 @@ protected AstNode visitInsertInto(final InsertInto node, final C context) { return result.get(); } - final Optional rewrittenPartitionBy = node.getPartitionByColumn() - .map(exp -> processExpression(exp, context)); - return new InsertInto( node.getLocation(), node.getTarget(), - (Query) rewriter.apply(node.getQuery(), context), - rewrittenPartitionBy); + (Query) rewriter.apply(node.getQuery(), context) + ); } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 218e866c961b..64dd8e0fe2d9 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -38,9 +38,9 @@ import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.PlanNodeId; import io.confluent.ksql.planner.plan.ProjectNode; +import io.confluent.ksql.planner.plan.RepartitionNode; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.ColumnRef; -import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; import io.confluent.ksql.schema.ksql.types.SqlType; @@ -81,6 +81,10 @@ public OutputNode buildPlan() { currentNode = buildFilterNode(currentNode, analysis.getWhereExpression().get()); } + if (analysis.getPartitionBy().isPresent()) { + currentNode = buildRepartitionNode(currentNode, analysis.getPartitionBy().get()); + } + if (!analysis.getTableFunctions().isEmpty()) { currentNode = buildFlatMapNode(currentNode); } @@ -111,25 +115,13 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) { final Into intoDataSource = analysis.getInto().get(); - final Optional partitionByField = analysis.getPartitionBy(); - - partitionByField.ifPresent(keyName -> - inputSchema.findValueColumn(keyName) - .orElseThrow(() -> new KsqlException( - "Column " + keyName.name().toString(FormatOptions.noEscape()) - + " does not exist in the result schema. Error in Partition By clause.") - )); - - final KeyField keyField = buildOutputKeyField(sourcePlanNode); - return new KsqlStructuredDataOutputNode( new PlanNodeId(intoDataSource.getName().name()), sourcePlanNode, inputSchema, extractionPolicy, - keyField, + sourcePlanNode.getKeyField(), intoDataSource.getKsqlTopic(), - partitionByField, analysis.getLimitClause(), intoDataSource.isCreate(), analysis.getSerdeOptions(), @@ -137,30 +129,6 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) { ); } - private KeyField buildOutputKeyField( - final PlanNode sourcePlanNode - ) { - final KeyField sourceKeyField = sourcePlanNode.getKeyField(); - - final Optional partitionByField = analysis.getPartitionBy(); - if (!partitionByField.isPresent()) { - return sourceKeyField; - } - - final ColumnRef partitionBy = partitionByField.get(); - final LogicalSchema schema = sourcePlanNode.getSchema(); - - if (schema.isMetaColumn(partitionBy.name())) { - return KeyField.none(); - } - - if (schema.isKeyColumn(partitionBy.name())) { - return sourceKeyField; - } - - return KeyField.of(partitionBy); - } - private TimestampExtractionPolicy getTimestampExtractionPolicy( final LogicalSchema inputSchema, final Analysis analysis @@ -229,6 +197,31 @@ private static FilterNode buildFilterNode( return new FilterNode(new PlanNodeId("Filter"), sourcePlanNode, filterExpression); } + private static RepartitionNode buildRepartitionNode( + final PlanNode sourceNode, + final ColumnRef partitionBy + ) { + if (!sourceNode.getSchema().withoutAlias().findValueColumn(partitionBy).isPresent()) { + throw new KsqlException("Invalid identifier for PARTITION BY clause: " + partitionBy); + } + + final KeyField keyField; + final LogicalSchema schema = sourceNode.getSchema(); + if (schema.isMetaColumn(partitionBy.name())) { + keyField = KeyField.none(); + } else if (schema.isKeyColumn(partitionBy.name())) { + keyField = sourceNode.getKeyField(); + } else { + keyField = KeyField.of(partitionBy); + } + + return new RepartitionNode( + new PlanNodeId("PartitionBy"), + sourceNode, + partitionBy, + keyField); + } + private FlatMapNode buildFlatMapNode(final PlanNode sourcePlanNode) { return new FlatMapNode(new PlanNodeId("FlatMap"), sourcePlanNode, functionRegistry, analysis); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java index f1ea8b8f93b9..bff4fcc8f893 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java @@ -26,14 +26,10 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.QueryIdGenerator; -import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.structured.SchemaKStream; -import io.confluent.ksql.structured.SchemaKTable; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; -import java.util.Objects; -import java.util.Optional; import java.util.OptionalInt; import java.util.Set; @@ -41,7 +37,6 @@ public class KsqlStructuredDataOutputNode extends OutputNode { private final KsqlTopic ksqlTopic; private final KeyField keyField; - private final Optional partitionByField; private final boolean doCreateInto; private final Set serdeOptions; private final SourceName intoSourceName; @@ -54,7 +49,6 @@ public KsqlStructuredDataOutputNode( final TimestampExtractionPolicy timestampExtractionPolicy, final KeyField keyField, final KsqlTopic ksqlTopic, - final Optional partitionByField, final OptionalInt limit, final boolean doCreateInto, final Set serdeOptions, @@ -76,11 +70,8 @@ public KsqlStructuredDataOutputNode( this.keyField = requireNonNull(keyField, "keyField") .validateKeyExistsIn(schema); this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic"); - this.partitionByField = Objects.requireNonNull(partitionByField, "partitionByField"); this.doCreateInto = doCreateInto; this.intoSourceName = requireNonNull(intoSourceName, "intoSourceName"); - - validatePartitionByField(); } public boolean isDoCreateInto() { @@ -119,16 +110,11 @@ public KeyField getKeyField() { @Override public SchemaKStream buildStream(final KsqlQueryBuilder builder) { final PlanNode source = getSource(); - final SchemaKStream schemaKStream = source.buildStream(builder); + final SchemaKStream schemaKStream = source.buildStream(builder); final QueryContext.Stacker contextStacker = builder.buildNodeContext(getId().toString()); - final SchemaKStream result = createOutputStream( - schemaKStream, - contextStacker - ); - - return result.into( + return schemaKStream.into( getKsqlTopic().getKafkaTopicName(), getSchema(), getKsqlTopic().getValueFormat(), @@ -136,35 +122,4 @@ public SchemaKStream buildStream(final KsqlQueryBuilder builder) { contextStacker ); } - - private SchemaKStream createOutputStream( - final SchemaKStream schemaKStream, - final QueryContext.Stacker contextStacker - ) { - if (schemaKStream instanceof SchemaKTable) { - return schemaKStream; - } - - if (!partitionByField.isPresent()) { - return schemaKStream; - } - - return schemaKStream.selectKey(partitionByField.get(), false, contextStacker); - } - - private void validatePartitionByField() { - if (!partitionByField.isPresent()) { - return; - } - - final ColumnRef fieldName = partitionByField.get(); - - if (getSchema().isMetaColumn(fieldName.name()) || getSchema().isKeyColumn(fieldName.name())) { - return; - } - - if (!keyField.ref().equals(Optional.of(fieldName))) { - throw new IllegalArgumentException("keyField must match partition by field"); - } - } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java new file mode 100644 index 000000000000..018459c63426 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/RepartitionNode.java @@ -0,0 +1,76 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.planner.plan; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.plan.SelectExpression; +import io.confluent.ksql.metastore.model.KeyField; +import io.confluent.ksql.name.SourceName; +import io.confluent.ksql.schema.ksql.ColumnRef; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.structured.SchemaKStream; +import java.util.List; +import java.util.Objects; + +@Immutable +public class RepartitionNode extends PlanNode { + + private final PlanNode source; + private final ColumnRef partitionBy; + private final KeyField keyField; + + public RepartitionNode(PlanNodeId id, PlanNode source, ColumnRef partitionBy, KeyField keyField) { + super(id, source.getNodeOutputType()); + final SourceName alias = source.getTheSourceNode().getAlias(); + this.source = Objects.requireNonNull(source, "source"); + this.partitionBy = Objects.requireNonNull(partitionBy, "partitionBy").withSource(alias); + this.keyField = Objects.requireNonNull(keyField, "keyField"); + } + + @Override + public LogicalSchema getSchema() { + return source.getSchema(); + } + + @Override + public KeyField getKeyField() { + return keyField; + } + + @Override + public List getSources() { + return ImmutableList.of(source); + } + + @Override + public List getSelectExpressions() { + return source.getSelectExpressions(); + } + + @Override + protected int getPartitions(KafkaTopicClient kafkaTopicClient) { + return source.getPartitions(kafkaTopicClient); + } + + @Override + public SchemaKStream buildStream(KsqlQueryBuilder builder) { + return source.buildStream(builder) + .selectKey(partitionBy, true, builder.buildNodeContext(getId().toString())); + } +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java index bd643c782d79..869f30f9e4ef 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java @@ -166,6 +166,7 @@ private Query givenQuery( final Optional window, final Optional where, final Optional groupBy, + final Optional partitionBy, final Optional having ) { when(mockRewriter.apply(select, context)).thenReturn(rewrittenSelect); @@ -177,6 +178,7 @@ private Query givenQuery( window, where, groupBy, + partitionBy, having, resultMaterialization, false, @@ -188,7 +190,7 @@ private Query givenQuery( public void shouldRewriteQuery() { // Given: final Query query = - givenQuery(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); + givenQuery(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); // When: final AstNode rewritten = rewriter.rewrite(query, context); @@ -202,6 +204,7 @@ public void shouldRewriteQuery() { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), resultMaterialization, false, optionalInt)) @@ -223,7 +226,7 @@ public void shouldRewriteQueryUsingPlugin() { public void shouldRewriteQueryWithFilter() { // Given: final Query query = - givenQuery(Optional.empty(), Optional.of(expression), Optional.empty(), Optional.empty()); + givenQuery(Optional.empty(), Optional.of(expression), Optional.empty(), Optional.empty(), Optional.empty()); when(expressionRewriter.apply(expression, context)).thenReturn(rewrittenExpression); // When: @@ -238,6 +241,7 @@ public void shouldRewriteQueryWithFilter() { Optional.of(rewrittenExpression), Optional.empty(), Optional.empty(), + Optional.empty(), resultMaterialization, false, optionalInt)) @@ -250,7 +254,7 @@ public void shouldRewriteQueryWithGroupBy() { final GroupBy groupBy = mock(GroupBy.class); final GroupBy rewrittenGroupBy = mock(GroupBy.class); final Query query = - givenQuery(Optional.empty(), Optional.empty(), Optional.of(groupBy), Optional.empty()); + givenQuery(Optional.empty(), Optional.empty(), Optional.of(groupBy), Optional.empty(), Optional.empty()); when(mockRewriter.apply(groupBy, context)).thenReturn(rewrittenGroupBy); // When: @@ -265,6 +269,32 @@ public void shouldRewriteQueryWithGroupBy() { Optional.empty(), Optional.of(rewrittenGroupBy), Optional.empty(), + Optional.empty(), + resultMaterialization, + false, + optionalInt)) + ); + } + + @Test + public void shouldNotRewriteQueryWithPartitionBy() { + // Given: + final Query query = + givenQuery(Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(expression), Optional.empty()); + + // When: + final AstNode rewritten = rewriter.rewrite(query, context); + + // Then: + assertThat(rewritten, equalTo(new Query( + location, + rewrittenSelect, + rewrittenRelation, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.of(expression), + Optional.empty(), resultMaterialization, false, optionalInt)) @@ -277,7 +307,8 @@ public void shouldRewriteQueryWithWindow() { final WindowExpression window = mock(WindowExpression.class); final WindowExpression rewrittenWindow = mock(WindowExpression.class); final Query query = - givenQuery(Optional.of(window), Optional.empty(), Optional.empty(), Optional.empty()); + givenQuery(Optional.of(window), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); when(mockRewriter.apply(window, context)).thenReturn(rewrittenWindow); // When: @@ -292,6 +323,7 @@ public void shouldRewriteQueryWithWindow() { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), resultMaterialization, false, optionalInt)) @@ -302,7 +334,7 @@ public void shouldRewriteQueryWithWindow() { public void shouldRewriteQueryWithHaving() { // Given: final Query query = - givenQuery(Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(expression)); + givenQuery(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(expression)); when(expressionRewriter.apply(expression, context)).thenReturn(rewrittenExpression); // When: @@ -316,6 +348,7 @@ public void shouldRewriteQueryWithHaving() { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.of(rewrittenExpression), resultMaterialization, false, @@ -531,8 +564,7 @@ public void shouldRewriteCSAS() { sourceName, query, false, - csasProperties, - Optional.empty() + csasProperties ); when(mockRewriter.apply(query, context)).thenReturn(rewrittenQuery); @@ -546,8 +578,7 @@ public void shouldRewriteCSAS() { sourceName, rewrittenQuery, false, - csasProperties, - Optional.empty() + csasProperties ) ) ); @@ -560,8 +591,7 @@ public void shouldRewriteCSASWithPartitionBy() { sourceName, query, false, - csasProperties, - Optional.of(expression) + csasProperties ); when(mockRewriter.apply(query, context)).thenReturn(rewrittenQuery); when(expressionRewriter.apply(expression, context)).thenReturn(rewrittenExpression); @@ -576,8 +606,7 @@ public void shouldRewriteCSASWithPartitionBy() { sourceName, rewrittenQuery, false, - csasProperties, - Optional.of(rewrittenExpression) + csasProperties ) ) ); @@ -684,7 +713,7 @@ public void shouldRewriteCreateTableAsSelectUsingPlugin() { @Test public void shouldRewriteInsertInto() { // Given: - final InsertInto ii = new InsertInto(location, sourceName, query, Optional.empty()); + final InsertInto ii = new InsertInto(location, sourceName, query); when(mockRewriter.apply(query, context)).thenReturn(rewrittenQuery); // When: @@ -693,14 +722,14 @@ public void shouldRewriteInsertInto() { // Then: assertThat( rewritten, - equalTo(new InsertInto(location, sourceName, rewrittenQuery, Optional.empty())) + equalTo(new InsertInto(location, sourceName, rewrittenQuery)) ); } @Test public void shouldRewriteInsertIntoWithPartitionBy() { // Given: - final InsertInto ii = new InsertInto(location, sourceName, query, Optional.of(expression)); + final InsertInto ii = new InsertInto(location, sourceName, query); when(mockRewriter.apply(query, context)).thenReturn(rewrittenQuery); when(expressionRewriter.apply(expression, context)).thenReturn(rewrittenExpression); @@ -714,8 +743,7 @@ public void shouldRewriteInsertIntoWithPartitionBy() { new InsertInto( location, sourceName, - rewrittenQuery, - Optional.of(rewrittenExpression) + rewrittenQuery ) ) ); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index bd65a16cd6c0..90ce230238d6 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -291,10 +291,10 @@ public void shouldRekeyIfPartitionByDoesNotMatchResultKey() { assertThat(lines.length, equalTo(4)); assertThat(lines[0], equalTo(" > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 " + "DOUBLE] | Logger: InsertQuery_1.S1")); - assertThat(lines[1], - equalTo("\t\t > [ REKEY ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] " - + "| Logger: InsertQuery_1.S1")); - assertThat(lines[2], equalTo("\t\t\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING" + assertThat(lines[2], + containsString("[ REKEY ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] " + + "| Logger: InsertQuery_1.PartitionBy")); + assertThat(lines[1], containsString("[ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING" + ", COL2 DOUBLE] | Logger: InsertQuery_1.Project")); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index 04650c209d33..2fb9c640c9d9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -106,14 +106,12 @@ public class KsqlStructuredDataOutputNodeTest { private KsqlStructuredDataOutputNode outputNode; private LogicalSchema schema; - private Optional partitionBy; private boolean createInto; @SuppressWarnings("unchecked") @Before public void before() { schema = SCHEMA; - partitionBy = Optional.empty(); createInto = true; when(queryIdGenerator.getNext()).thenReturn(QUERY_ID_VALUE); @@ -137,40 +135,6 @@ public void before() { buildNode(); } - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfPartitionByAndKeyFieldNone() { - // When: - new KsqlStructuredDataOutputNode( - new PlanNodeId("0"), - sourceNode, - SCHEMA, - new LongColumnTimestampExtractionPolicy(ColumnRef.withoutSource(ColumnName.of("timestamp"))), - KeyField.none(), - ksqlTopic, - Optional.of(ColumnRef.withoutSource(ColumnName.of("something"))), - OptionalInt.empty(), - false, - SerdeOption.none(), - SourceName.of("0")); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfPartitionByDoesNotMatchKeyField() { - // When: - new KsqlStructuredDataOutputNode( - new PlanNodeId("0"), - sourceNode, - SCHEMA, - new LongColumnTimestampExtractionPolicy(ColumnRef.withoutSource(ColumnName.of("timestamp"))), - KeyField.of(Optional.of(ColumnRef.withoutSource(ColumnName.of("something else")))), - ksqlTopic, - Optional.of(ColumnRef.withoutSource(ColumnName.of("something"))), - OptionalInt.empty(), - false, - SerdeOption.none(), - SourceName.of("0")); - } - @Test public void shouldBuildSourceNode() { // When: @@ -180,60 +144,6 @@ public void shouldBuildSourceNode() { verify(sourceNode).buildStream(ksqlStreamBuilder); } - @Test - public void shouldPartitionByFieldNameInPartitionByProperty() { - // Given: - givenNodePartitioningByKey("key"); - - // When: - final SchemaKStream result = outputNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(sourceStream).selectKey( - KEY_FIELD.ref().get(), - false, - new QueryContext.Stacker().push(PLAN_NODE_ID.toString()) - ); - - assertThat(result, is(sameInstance(sinkStreamWithKeySelected))); - } - - @Test - public void shouldPartitionByRowKey() { - // Given: - givenNodePartitioningByKey("ROWKEY"); - - // When: - final SchemaKStream result = outputNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(sourceStream).selectKey( - ColumnRef.withoutSource(ColumnName.of("ROWKEY")), - false, - new QueryContext.Stacker().push(PLAN_NODE_ID.toString()) - ); - - assertThat(result, is(sameInstance(sinkStreamWithKeySelected))); - } - - @Test - public void shouldPartitionByRowTime() { - // Given: - givenNodePartitioningByKey("ROWTIME"); - - // When: - final SchemaKStream result = outputNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(sourceStream).selectKey( - ColumnRef.withoutSource(ColumnName.of("ROWTIME")), - false, - new QueryContext.Stacker().push(PLAN_NODE_ID.toString()) - ); - - assertThat(result, is(sameInstance(sinkStreamWithKeySelected))); - } - @Test public void shouldComputeQueryIdCorrectlyForStream() { // When: @@ -314,7 +224,6 @@ private void givenInsertIntoNode() { } private void givenNodePartitioningByKey(final String field) { - this.partitionBy = Optional.of(ColumnRef.withoutSource(ColumnName.of(field))); buildNode(); } @@ -331,7 +240,6 @@ private void buildNode() { new LongColumnTimestampExtractionPolicy(ColumnRef.withoutSource(ColumnName.of("timestamp"))), KEY_FIELD, ksqlTopic, - partitionBy, OptionalInt.empty(), createInto, SerdeOption.none(), diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___aliasing b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___aliasing deleted file mode 100644 index be8884035959..000000000000 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___aliasing +++ /dev/null @@ -1,69 +0,0 @@ -{ - "ksql.extension.dir" : "ext", - "ksql.streams.cache.max.bytes.buffering" : "0", - "ksql.security.extension.class" : null, - "ksql.transient.prefix" : "transient_", - "ksql.persistence.wrap.single.values" : "true", - "ksql.named.internal.topics" : "on", - "ksql.windowed.session.key.legacy" : "false", - "ksql.query.stream.groupby.rowkey.repartition" : "false", - "ksql.schema.registry.url" : "http://localhost:8081", - "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", - "ksql.output.topic.name.prefix" : "", - "ksql.streams.auto.offset.reset" : "earliest", - "ksql.sink.partitions" : null, - "ksql.connect.url" : "http://localhost:8083", - "ksql.avro.maps.named" : "true", - "ksql.service.id" : "some.ksql.service.id", - "ksql.internal.topic.replicas" : "1", - "ksql.insert.into.values.enabled" : "true", - "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", - "ksql.access.validator.enable" : "auto", - "ksql.streams.bootstrap.servers" : "localhost:0", - "ksql.streams.commit.interval.ms" : "2000", - "ksql.metric.reporters" : "", - "ksql.streams.auto.commit.interval.ms" : "0", - "ksql.sink.replicas" : null, - "ksql.metrics.extension" : null, - "ksql.streams.topology.optimization" : "all", - "ksql.streams.num.stream.threads" : "4", - "ksql.metrics.tags.custom" : "", - "ksql.udfs.enabled" : "true", - "ksql.udf.enable.security.manager" : "true", - "ksql.functions.substring.legacy.args" : "false", - "ksql.connect.worker.config" : "", - "ksql.sink.window.change.log.additional.retention" : "1000000", - "ksql.query.inject.legacy.map.values.node" : "false", - "ksql.udf.collect.metrics" : "false", - "ksql.persistent.prefix" : "query_", - "ksql.query.persistent.active.limit" : "2147483647" -} -CONFIGS_END -CSAS_OUTPUT_0.KsqlTopic.source = STRUCT NOT NULL -CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL -SCHEMAS_END -Topologies: - Sub-topology: 0 - Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) - --> KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 - <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 - <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) - --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 - Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) - --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 - Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) - <-- KSTREAM-MAPVALUES-0000000006 - diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___no_aliasing b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___no_aliasing index 6832c784b602..2bb8c2ad95f8 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___no_aliasing +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_null___partition_by_(-)___key_in_value___no_aliasing @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_set___partition_by_(different)___key_in_value___no_aliasing b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_set___partition_by_(different)___key_in_value___no_aliasing index 6832c784b602..2bb8c2ad95f8 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_set___partition_by_(different)___key_in_value___no_aliasing +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/key-field_-_stream___initially_set___partition_by_(different)___key_in_value___no_aliasing @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_aliased_key_field_-_same_name b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_aliased_key_field_-_same_name index cca99ebd5e4e..83a754624e9b 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_aliased_key_field_-_same_name +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_aliased_key_field_-_same_name @@ -50,20 +50,11 @@ Topologies: --> SELECT-0 <-- KSTREAM-SOURCE-0000000000 Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + --> KSTREAM-MAPVALUES-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 + Processor: KSTREAM-MAPVALUES-0000000003 (stores: []) + --> KSTREAM-SINK-0000000004 <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) - --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 - Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) - --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 - Sink: KSTREAM-SINK-0000000007 (topic: REPARTITIONED) - <-- KSTREAM-MAPVALUES-0000000006 + Sink: KSTREAM-SINK-0000000004 (topic: REPARTITIONED) + <-- KSTREAM-MAPVALUES-0000000003 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by index c78b322a99f4..c58b8aef7482 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_ROWTIME b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_ROWTIME index 0437592be6dd..3f4037de65d8 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_ROWTIME +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_ROWTIME @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_partition_by_value b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_partition_by_value index 2c22a13212cf..2e69d72025da 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_partition_by_value +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_partition_by_value @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: REPARTITIONED) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_value b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_value index 2c22a13212cf..2e69d72025da 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_value +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_null_value @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: REPARTITIONED) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_all b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_all index b4130c58e365..233fbea3d134 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_all +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_all @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: REPARTITIONED) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_some b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_some index 2c22a13212cf..2e69d72025da 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_some +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/partition-by_-_partition_by_with_projection_select_some @@ -47,23 +47,23 @@ Topologies: Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> SELECT-0 + --> KSTREAM-FILTER-0000000002 <-- KSTREAM-SOURCE-0000000000 - Processor: SELECT-0 (stores: []) - --> KSTREAM-FILTER-0000000003 + Processor: KSTREAM-FILTER-0000000002 (stores: []) + --> KSTREAM-KEY-SELECT-0000000003 <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-FILTER-0000000003 (stores: []) - --> KSTREAM-KEY-SELECT-0000000004 - <-- SELECT-0 - Processor: KSTREAM-KEY-SELECT-0000000004 (stores: []) - --> KSTREAM-MAPVALUES-0000000005 - <-- KSTREAM-FILTER-0000000003 - Processor: KSTREAM-MAPVALUES-0000000005 (stores: []) + Processor: KSTREAM-KEY-SELECT-0000000003 (stores: []) + --> KSTREAM-MAPVALUES-0000000004 + <-- KSTREAM-FILTER-0000000002 + Processor: KSTREAM-MAPVALUES-0000000004 (stores: []) + --> SELECT-0 + <-- KSTREAM-KEY-SELECT-0000000003 + Processor: SELECT-0 (stores: []) --> KSTREAM-MAPVALUES-0000000006 - <-- KSTREAM-KEY-SELECT-0000000004 + <-- KSTREAM-MAPVALUES-0000000004 Processor: KSTREAM-MAPVALUES-0000000006 (stores: []) --> KSTREAM-SINK-0000000007 - <-- KSTREAM-MAPVALUES-0000000005 + <-- SELECT-0 Sink: KSTREAM-SINK-0000000007 (topic: REPARTITIONED) <-- KSTREAM-MAPVALUES-0000000006 diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json b/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json index 1fc393ba8351..152585f366da 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/key-field.json @@ -90,30 +90,30 @@ "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT foo AS ALIASED, bar FROM INPUT PARTITION BY ALIASED;" ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Invalid identifier for PARTITION BY clause: ALIASED" + } + }, + { + "name": "stream | initially null | partition by (-) | key not in value | -", + "statements": [ + "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT bar FROM INPUT PARTITION BY foo;" + ], "inputs": [ {"topic": "input_topic", "value": {"foo": 1, "bar": 2}} ], "outputs": [ - {"topic": "OUTPUT", "key": "1", "value": {"ALIASED":1, "BAR": 2}} + {"topic": "OUTPUT", "key": "1", "value": {"BAR": 2}} ], "post": { "sources": [ {"name": "INPUT", "type": "stream", "keyField": null}, - {"name": "OUTPUT", "type": "stream", "keyField": "ALIASED"} + {"name": "OUTPUT", "type": "stream", "keyField": null} ] } }, - { - "name": "stream | initially null | partition by (-) | key not in value | -", - "statements": [ - "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT bar FROM INPUT PARTITION BY foo;" - ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Column FOO does not exist in the result schema. Error in Partition By clause." - } - }, { "name": "stream | initially null | group by (-) | key in value | no aliasing", "statements": [ @@ -280,17 +280,9 @@ "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='foo', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT foo AS aliased, bar FROM INPUT PARTITION BY aliased;" ], - "inputs": [ - {"topic": "input_topic", "key": "1", "value": {"foo": 1, "bar": 2}} - ], - "outputs": [ - {"topic": "OUTPUT", "key":"1", "value": {"ALIASED":1, "BAR": 2}} - ], - "post": { - "sources": [ - {"name": "INPUT", "type": "stream", "keyField": "FOO"}, - {"name": "OUTPUT", "type": "stream", "keyField": "ALIASED"} - ] + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Invalid identifier for PARTITION BY clause: ALIASED" } }, { @@ -299,9 +291,20 @@ "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='foo', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT bar FROM INPUT PARTITION BY foo;" ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Column FOO does not exist in the result schema. Error in Partition By clause." + "inputs": [ + {"topic": "input_topic", "key": "1", "value": {"foo": 1, "bar": 2}} + ], + "outputs": [ + {"topic": "OUTPUT", "key":"1", "value": {"BAR": 2}} + ], + "post": { + "sources": [ + {"name": "INPUT", "type": "stream", "keyField": "FOO"}, + {"name": "OUTPUT", "type": "stream", "keyField": null} + ], + "topics": { + "blacklist": ".*-repartition" + } } }, { @@ -329,20 +332,9 @@ "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='foo', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT foo, bar AS aliased FROM INPUT PARTITION BY aliased;" ], - "inputs": [ - {"topic": "input_topic", "key": "1", "value": {"foo": 1, "bar": 2}} - ], - "outputs": [ - {"topic": "OUTPUT", "key":"2", "value": {"FOO": 1, "ALIASED": 2}} - ], - "post": { - "sources": [ - {"name": "INPUT", "type": "stream", "keyField": "FOO"}, - {"name": "OUTPUT", "type": "stream", "keyField": "ALIASED"} - ], - "topics": { - "blacklist": ".*-repartition" - } + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Invalid identifier for PARTITION BY clause: ALIASED" } }, { @@ -351,9 +343,17 @@ "CREATE STREAM INPUT (foo INT, bar INT) WITH (kafka_topic='input_topic', key='foo', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT foo FROM INPUT PARTITION BY bar;" ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Column BAR does not exist in the result schema. Error in Partition By clause." + "inputs": [ + {"topic": "input_topic", "key": "1", "value": {"foo": 1, "bar": 2}} + ], + "outputs": [ + {"topic": "OUTPUT", "key":"2", "value": {"FOO":1}} + ], + "post": { + "sources": [ + {"name": "INPUT", "type": "stream", "keyField": "FOO"}, + {"name": "OUTPUT", "type": "stream", "keyField": null} + ] } }, { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json index f356a81fa9c9..44e3636c1f3c 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -72,18 +72,9 @@ "CREATE STREAM TEST (ID varchar, NAME varchar) with (kafka_topic='test_topic', value_format = 'delimited', key='ID');", "CREATE STREAM REPARTITIONED AS select ID + '_new' AS ID_new, NAME from TEST partition by ID_NEW;" ], - "inputs": [ - {"topic": "test_topic", "key": 0, "value": "0,a"}, - {"topic": "test_topic", "key": 1, "value": "1,b"} - ], - "outputs": [ - {"topic": "REPARTITIONED", "key": "0_new", "value": "0_new,a"}, - {"topic": "REPARTITIONED", "key": "1_new", "value": "1_new,b"} - ], - "post": { - "sources": [ - {"name": "REPARTITIONED", "type": "stream", "keyField": "ID_NEW"} - ] + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Invalid identifier for PARTITION BY clause: ID_NEW" } }, { @@ -100,12 +91,12 @@ {"topic": "test_topic", "key": 1, "value": "1,b"} ], "outputs": [ - {"topic": "REPARTITIONED", "key": "0_new", "value": "0_new,a"}, - {"topic": "REPARTITIONED", "key": "1_new", "value": "1_new,b"} + {"topic": "REPARTITIONED", "key": 0, "value": "0_new,a"}, + {"topic": "REPARTITIONED", "key": 1, "value": "1_new,b"} ], "post": { "sources": [ - {"name": "REPARTITIONED", "type": "stream", "keyField": "ID"} + {"name": "REPARTITIONED", "type": "stream", "keyField": null} ] } }, diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 4a0ae292bbf4..11dd95284393 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -53,15 +53,14 @@ statement (tableElements)? (WITH tableProperties)? #createStream | CREATE STREAM (IF NOT EXISTS)? sourceName - (WITH tableProperties)? AS query - (PARTITION BY identifier)? #createStreamAs + (WITH tableProperties)? AS query #createStreamAs | CREATE TABLE (IF NOT EXISTS)? sourceName (tableElements)? (WITH tableProperties)? #createTable | CREATE TABLE (IF NOT EXISTS)? sourceName (WITH tableProperties)? AS query #createTableAs | CREATE (SINK | SOURCE) CONNECTOR identifier WITH tableProperties #createConnector - | INSERT INTO sourceName query (PARTITION BY identifier)? #insertInto + | INSERT INTO sourceName query #insertInto | INSERT INTO sourceName (columns)? VALUES values #insertValues | DROP STREAM (IF EXISTS)? sourceName (DELETE TOPIC)? #dropStream | DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable @@ -78,6 +77,7 @@ query (WINDOW windowExpression)? (WHERE where=booleanExpression)? (GROUP BY groupBy)? + (PARTITION BY partitionBy=identifier)? (HAVING having=booleanExpression)? (EMIT resultMaterialization)? limitClause? diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 40bb3ed4a140..e802934d7c82 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -279,8 +279,7 @@ public Node visitCreateStreamAs(final SqlBaseParser.CreateStreamAsContext contex ParserUtil.getSourceName(context.sourceName()), query, context.EXISTS() != null, - CreateSourceAsProperties.from(properties), - getPartitionBy(context.identifier()) + CreateSourceAsProperties.from(properties) ); } @@ -320,8 +319,7 @@ public Node visitInsertInto(final SqlBaseParser.InsertIntoContext context) { return new InsertInto( getLocation(context), targetName, - query, - getPartitionBy(context.identifier())); + query); } @Override @@ -406,6 +404,7 @@ public Query visitQuery(final SqlBaseParser.QueryContext context) { visitIfPresent(context.windowExpression(), WindowExpression.class), visitIfPresent(context.where, Expression.class), visitIfPresent(context.groupBy(), GroupBy.class), + getPartitionBy(context.partitionBy), visitIfPresent(context.having, Expression.class), resultMaterialization, pullQuery, diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index dddde01cb1d8..308b30fa3e5d 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -120,6 +120,13 @@ protected Void visitQuery(final Query node, final Integer indent) { .append('\n'); } + if (node.getPartitionBy().isPresent()) { + append(indent, "PARTITION BY " + + ExpressionFormatterUtil + .formatExpression(node.getPartitionBy().get())) + .append('\n'); + } + if (node.getHaving().isPresent()) { append(indent, "HAVING " + ExpressionFormatterUtil.formatExpression(node.getHaving().get())) @@ -292,7 +299,6 @@ protected Void visitInsertInto(final InsertInto node, final Integer indent) { builder.append(escapedName(node.getTarget())); builder.append(" "); process(node.getQuery(), indent); - processPartitionBy(node.getPartitionByColumn(), indent); return null; } @@ -481,7 +487,6 @@ private void formatCreateAs(final CreateAsSelect node, final Integer indent) { builder.append(" AS "); process(node.getQuery(), indent); - processPartitionBy(node.getPartitionByColumn(), indent); } private static String formatTableElement(final TableElement e) { diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java index 1ead566bfe96..b78946e69935 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateAsSelect.java @@ -17,7 +17,6 @@ import static java.util.Objects.requireNonNull; -import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; @@ -30,22 +29,19 @@ public abstract class CreateAsSelect extends Statement implements QueryContainer private final Query query; private final boolean notExists; private final CreateSourceAsProperties properties; - private final Optional partitionByColumn; CreateAsSelect( final Optional location, final SourceName name, final Query query, final boolean notExists, - final CreateSourceAsProperties properties, - final Optional partitionByColumn + final CreateSourceAsProperties properties ) { super(location); this.name = requireNonNull(name, "name"); this.query = requireNonNull(query, "query"); this.notExists = notExists; this.properties = requireNonNull(properties, "properties"); - this.partitionByColumn = requireNonNull(partitionByColumn, "partitionByColumn"); } CreateAsSelect( @@ -57,8 +53,8 @@ public abstract class CreateAsSelect extends Statement implements QueryContainer other.name, other.query, other.notExists, - properties, - other.partitionByColumn); + properties + ); } public abstract CreateAsSelect copyWith(CreateSourceAsProperties properties); @@ -80,10 +76,6 @@ public CreateSourceAsProperties getProperties() { return properties; } - public Optional getPartitionByColumn() { - return partitionByColumn; - } - @Override public int hashCode() { return Objects.hash(name, query, properties, notExists, getClass()); @@ -101,8 +93,7 @@ public boolean equals(final Object obj) { return Objects.equals(name, o.name) && Objects.equals(query, o.query) && Objects.equals(notExists, o.notExists) - && Objects.equals(properties, o.properties) - && Objects.equals(partitionByColumn, o.partitionByColumn); + && Objects.equals(properties, o.properties); } @Override @@ -111,7 +102,6 @@ public String toString() { + ", query=" + query + ", notExists=" + notExists + ", properties=" + properties - + ", partitionByColumn=" + partitionByColumn + '}'; } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java index 21cac30bf758..3faca3877fab 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateStreamAsSelect.java @@ -16,7 +16,6 @@ package io.confluent.ksql.parser.tree; import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.NodeLocation; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; @@ -29,10 +28,9 @@ public CreateStreamAsSelect( final SourceName name, final Query query, final boolean notExists, - final CreateSourceAsProperties properties, - final Optional partitionByColumn + final CreateSourceAsProperties properties ) { - this(Optional.empty(), name, query, notExists, properties, partitionByColumn); + this(Optional.empty(), name, query, notExists, properties); } public CreateStreamAsSelect( @@ -40,9 +38,8 @@ public CreateStreamAsSelect( final SourceName name, final Query query, final boolean notExists, - final CreateSourceAsProperties properties, - final Optional partitionByColumn) { - super(location, name, query, notExists, properties, partitionByColumn); + final CreateSourceAsProperties properties) { + super(location, name, query, notExists, properties); } private CreateStreamAsSelect( @@ -54,7 +51,7 @@ private CreateStreamAsSelect( @Override public Sink getSink() { - return Sink.of(getName(), true, getProperties(), getPartitionByColumn()); + return Sink.of(getName(), true, getProperties()); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java index 0375bb1c0adc..3163d9db8e9e 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/CreateTableAsSelect.java @@ -40,7 +40,7 @@ public CreateTableAsSelect( final boolean notExists, final CreateSourceAsProperties properties ) { - super(location, name, query, notExists, properties, Optional.empty()); + super(location, name, query, notExists, properties); } private CreateTableAsSelect( @@ -62,7 +62,7 @@ public R accept(final AstVisitor visitor, final C context) { @Override public Sink getSink() { - return Sink.of(getName(), true, getProperties(), Optional.empty()); + return Sink.of(getName(), true, getProperties()); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/InsertInto.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/InsertInto.java index 319d01066a1e..9c350457ab67 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/InsertInto.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/InsertInto.java @@ -33,26 +33,22 @@ public class InsertInto private final SourceName target; private final Query query; - private final Optional partitionByColumn; public InsertInto( final SourceName target, - final Query query, - final Optional partitionByColumn + final Query query ) { - this(Optional.empty(), target, query, partitionByColumn); + this(Optional.empty(), target, query); } public InsertInto( final Optional location, final SourceName target, - final Query query, - final Optional partitionByColumn + final Query query ) { super(location); this.target = requireNonNull(target, "target"); this.query = requireNonNull(query, "query"); - this.partitionByColumn = requireNonNull(partitionByColumn, "partitionByColumn"); } public SourceName getTarget() { @@ -65,12 +61,12 @@ public Query getQuery() { } public Optional getPartitionByColumn() { - return partitionByColumn; + return query.getPartitionBy(); } @Override public Sink getSink() { - return Sink.of(target, false, CreateSourceAsProperties.none(), partitionByColumn); + return Sink.of(target, false, CreateSourceAsProperties.none()); } @Override @@ -93,8 +89,7 @@ public boolean equals(final Object obj) { } final InsertInto o = (InsertInto) obj; return Objects.equals(target, o.target) - && Objects.equals(query, o.query) - && Objects.equals(partitionByColumn, o.partitionByColumn); + && Objects.equals(query, o.query); } @Override @@ -102,7 +97,6 @@ public String toString() { return toStringHelper(this) .add("target", target) .add("query", query) - .add("partitionBy", partitionByColumn) .toString(); } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Query.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Query.java index 13504e71548e..ce09a84568a2 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Query.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Query.java @@ -21,6 +21,7 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.parser.NodeLocation; +import io.confluent.ksql.util.KsqlPreconditions; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; @@ -33,18 +34,22 @@ public class Query extends Statement { private final Optional window; private final Optional where; private final Optional groupBy; + private final Optional partitionBy; private final Optional having; private final ResultMaterialization resultMaterialization; private final boolean pullQuery; private final OptionalInt limit; + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck public Query( + // CHECKSTYLE_RULES.ON: ParameterNumberCheck final Optional location, final Select select, final Relation from, final Optional window, final Optional where, final Optional groupBy, + final Optional partitionBy, final Optional having, final ResultMaterialization resultMaterialization, final boolean pullQuery, @@ -56,10 +61,15 @@ public Query( this.window = requireNonNull(window, "window"); this.where = requireNonNull(where, "where"); this.groupBy = requireNonNull(groupBy, "groupBy"); + this.partitionBy = requireNonNull(partitionBy, "partitionBy"); this.having = requireNonNull(having, "having"); this.resultMaterialization = requireNonNull(resultMaterialization, "resultMaterialization"); this.pullQuery = pullQuery; this.limit = requireNonNull(limit, "limit"); + + KsqlPreconditions.checkArgument( + !(partitionBy.isPresent() && groupBy.isPresent()), + "Queries only support one of PARTITION BY and GROUP BY"); } public Select getSelect() { @@ -82,6 +92,10 @@ public Optional getGroupBy() { return groupBy; } + public Optional getPartitionBy() { + return partitionBy; + } + public Optional getHaving() { return having; } @@ -111,6 +125,7 @@ public String toString() { .add("window", window.orElse(null)) .add("where", where.orElse(null)) .add("groupBy", groupBy.orElse(null)) + .add("partitionBy", partitionBy.orElse(null)) .add("having", having.orElse(null)) .add("resultMaterialization", resultMaterialization) .add("pullQuery", pullQuery) @@ -137,6 +152,7 @@ public boolean equals(final Object obj) { && Objects.equals(where, o.where) && Objects.equals(window, o.window) && Objects.equals(groupBy, o.groupBy) + && Objects.equals(partitionBy, o.partitionBy) && Objects.equals(having, o.having) && Objects.equals(resultMaterialization, o.resultMaterialization) && Objects.equals(limit, o.limit); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Sink.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Sink.java index ddb89e477ebd..834810bf3226 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Sink.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/Sink.java @@ -18,14 +18,8 @@ import static java.util.Objects.requireNonNull; import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; -import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; -import io.confluent.ksql.schema.ksql.ColumnRef; -import io.confluent.ksql.util.KsqlException; -import java.util.Objects; -import java.util.Optional; /** * Pojo holding sink information @@ -36,7 +30,6 @@ public final class Sink { private final SourceName name; private final boolean createSink; private final CreateSourceAsProperties properties; - private final Optional partitionBy; /** * Info about the sink of a query. @@ -44,43 +37,24 @@ public final class Sink { * @param name the name of the sink. * @param createSink indicates if name should be created, (CSAS/CTAS), or not (INSERT INTO). * @param properties properties of the sink. - * @param partitionBy an optional partition by expression * @return the pojo. */ public static Sink of( final SourceName name, final boolean createSink, - final CreateSourceAsProperties properties, - final Optional partitionBy + final CreateSourceAsProperties properties ) { - if (partitionBy.isPresent()) { - final Expression partitionByExp = partitionBy.get(); - if (partitionByExp instanceof ColumnReferenceExp) { - return new Sink( - name, - createSink, - properties, - Optional.of(((ColumnReferenceExp) partitionByExp).getReference()) - ); - } - - throw new KsqlException( - "Expected partition by to be a valid column but got " + partitionByExp); - } - - return new Sink(name, createSink, properties, Optional.empty()); + return new Sink(name, createSink, properties); } private Sink( final SourceName name, final boolean createSink, - final CreateSourceAsProperties properties, - final Optional partitionBy + final CreateSourceAsProperties properties ) { this.name = requireNonNull(name, "name"); this.properties = requireNonNull(properties, "properties"); this.createSink = createSink; - this.partitionBy = Objects.requireNonNull(partitionBy, "partitionBy"); } public SourceName getName() { @@ -94,8 +68,4 @@ public boolean shouldCreateSink() { public CreateSourceAsProperties getProperties() { return properties; } - - public Optional getPartitionBy() { - return partitionBy; - } } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index fa2fca8ace41..5a44c344009c 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -495,8 +495,8 @@ public void shouldFormatCsasPartitionBy() { assertThat(result, is("CREATE STREAM S AS SELECT *\n" + "FROM ADDRESS ADDRESS\n" - + "EMIT CHANGES\n" - + "PARTITION BY ADDRESS" + + "PARTITION BY ADDRESS\n" + + "EMIT CHANGES" )); } @@ -509,8 +509,8 @@ public void shouldFormatInsertIntoPartitionBy() { assertThat(result, startsWith("INSERT INTO ADDRESS SELECT *\n" + "FROM ADDRESS ADDRESS\n" - + "EMIT CHANGES\n" - + "PARTITION BY ADDRESS" + + "PARTITION BY ADDRESS\n" + + "EMIT CHANGES" )); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamAsSelectTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamAsSelectTest.java index 368b6cde9e3e..9e45a54060db 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamAsSelectTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamAsSelectTest.java @@ -36,33 +36,29 @@ public class CreateStreamAsSelectTest { ImmutableMap.of("KAFKA_TOPIC", new StringLiteral("value")) ); private static final Query SOME_QUERY = mock(Query.class); - private static final Optional EXPRESSION = Optional.of(mock(Expression.class)); @Test public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, SOME_PROPS, EXPRESSION), + new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, SOME_PROPS), new CreateStreamAsSelect(Optional.of(SOME_LOCATION), SOME_NAME, SOME_QUERY, true, - SOME_PROPS, EXPRESSION) + SOME_PROPS) ) .addEqualityGroup( - new CreateStreamAsSelect(SourceName.of("diff"), SOME_QUERY, true, SOME_PROPS, - EXPRESSION) + new CreateStreamAsSelect(SourceName.of("diff"), SOME_QUERY, true, SOME_PROPS + ) ) .addEqualityGroup( - new CreateStreamAsSelect(SOME_NAME, mock(Query.class), true, SOME_PROPS, EXPRESSION) + new CreateStreamAsSelect(SOME_NAME, mock(Query.class), true, SOME_PROPS) ) .addEqualityGroup( - new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, false, SOME_PROPS, EXPRESSION) + new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, false, SOME_PROPS) ) .addEqualityGroup( - new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, CreateSourceAsProperties.none(), - EXPRESSION) - ) - .addEqualityGroup( - new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, SOME_PROPS, Optional.empty()) + new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, CreateSourceAsProperties.none() + ) ) .testEquals(); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java index 0dfaa10e7cb2..bd8372423817 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableAsSelectTest.java @@ -57,7 +57,7 @@ public void shouldImplementHashCodeAndEqualsProperty() { new CreateTableAsSelect(SOME_NAME, SOME_QUERY, true, CreateSourceAsProperties.none()) ) .addEqualityGroup( - new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, CreateSourceAsProperties.none(), Optional.empty()) + new CreateStreamAsSelect(SOME_NAME, SOME_QUERY, true, CreateSourceAsProperties.none()) ) .testEquals(); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/InsertIntoTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/InsertIntoTest.java index 14b40ceb858d..b388598231c9 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/InsertIntoTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/InsertIntoTest.java @@ -30,26 +30,22 @@ public class InsertIntoTest { public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); private static final SourceName SOME_NAME = SourceName.of("bob"); private static final Query SOME_QUERY = mock(Query.class); - private static final Optional SOME_COLUMN = Optional.of(mock(Expression.class)); @Test public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new InsertInto(SOME_NAME, SOME_QUERY, SOME_COLUMN), - new InsertInto(SOME_NAME, SOME_QUERY, SOME_COLUMN), - new InsertInto(Optional.of(SOME_LOCATION), SOME_NAME, SOME_QUERY, SOME_COLUMN), - new InsertInto(Optional.of(OTHER_LOCATION), SOME_NAME, SOME_QUERY, SOME_COLUMN) + new InsertInto(SOME_NAME, SOME_QUERY), + new InsertInto(SOME_NAME, SOME_QUERY), + new InsertInto(Optional.of(SOME_LOCATION), SOME_NAME, SOME_QUERY), + new InsertInto(Optional.of(OTHER_LOCATION), SOME_NAME, SOME_QUERY) ) .addEqualityGroup( - new InsertInto(SourceName.of("jim"), SOME_QUERY, SOME_COLUMN) + new InsertInto(SourceName.of("jim"), SOME_QUERY) ) .addEqualityGroup( - new InsertInto(SOME_NAME, mock(Query.class), SOME_COLUMN) - ) - .addEqualityGroup( - new InsertInto(SOME_NAME, SOME_QUERY, Optional.empty()) + new InsertInto(SOME_NAME, mock(Query.class)) ) .testEquals(); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java index d9606f17ffdb..76273b87e224 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ParserModelTest.java @@ -85,6 +85,7 @@ public class ParserModelTest { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), ResultMaterialization.CHANGES, false, OptionalInt.empty() diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/QueryTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/QueryTest.java index 988a37a097bc..08b8d3399c3a 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/QueryTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/QueryTest.java @@ -52,6 +52,9 @@ public class QueryTest { private static final Optional SOME_GROUP_BY = Optional.of( mock(GroupBy.class) ); + private static final Optional SOME_PARTITION_BY = Optional.of( + mock(Expression.class) + ); private static final Optional SOME_HAVING = Optional.of( mock(Expression.class) ); @@ -62,48 +65,52 @@ public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT), + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT), new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT), + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT), new Query(Optional.of(OTHER_LOCATION), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT) + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) + ) + .addEqualityGroup( + new Query(Optional.empty(), OTHER_SELECT, SOME_FROM, SOME_WINDOW, + SOME_WHERE, Optional.empty(), SOME_PARTITION_BY, SOME_HAVING, FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), OTHER_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT) + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, OTHER_RELATION, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT) + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, Optional.empty(), SOME_WHERE, - SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT) + SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, Optional.empty(), - SOME_GROUP_BY, SOME_HAVING, FINAL, true, SOME_LIMIT) + SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) + SOME_WHERE, Optional.empty(), Optional.empty(), SOME_HAVING, FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, Optional.empty(), FINAL, true, SOME_LIMIT) + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), Optional.empty(), FINAL, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, SOME_WHERE, - SOME_GROUP_BY, SOME_HAVING, CHANGES, true, SOME_LIMIT) + SOME_GROUP_BY, Optional.empty(), SOME_HAVING, CHANGES, true, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, false, SOME_LIMIT) + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, false, SOME_LIMIT) ) .addEqualityGroup( new Query(Optional.empty(), SOME_SELECT, SOME_FROM, SOME_WINDOW, - SOME_WHERE, SOME_GROUP_BY, SOME_HAVING, FINAL, true, OptionalInt.empty()) + SOME_WHERE, SOME_GROUP_BY, Optional.empty(), SOME_HAVING, FINAL, true, OptionalInt.empty()) ) .testEquals(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index b46efc49e1ec..2c994b3c6eae 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -146,13 +146,13 @@ public class StandaloneExecutorTest { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), ResultMaterialization.CHANGES, false, OptionalInt.empty() ), false, - CreateSourceAsProperties.none(), - Optional.empty() + CreateSourceAsProperties.none() ); @@ -540,7 +540,7 @@ public void shouldRunUnSetStatements() { public void shouldRunCsasStatements() { // Given: final PreparedStatement csas = PreparedStatement.of("CSAS1", - new CreateStreamAsSelect(SOME_NAME, query, false, CreateSourceAsProperties.none(), Optional.empty())); + new CreateStreamAsSelect(SOME_NAME, query, false, CreateSourceAsProperties.none())); final ConfiguredStatement configured = ConfiguredStatement.of(csas, emptyMap(), ksqlConfig); givenQueryFileParsesTo(csas); @@ -577,7 +577,7 @@ public void shouldRunCtasStatements() { public void shouldRunInsertIntoStatements() { // Given: final PreparedStatement insertInto = PreparedStatement.of("InsertInto", - new InsertInto(SOME_NAME, query, Optional.empty())); + new InsertInto(SOME_NAME, query)); final ConfiguredStatement configured = ConfiguredStatement.of(insertInto, emptyMap(), ksqlConfig); givenQueryFileParsesTo(insertInto); @@ -772,7 +772,7 @@ private void givenExecutorWillFailOnNoQueries() { private void givenFileContainsAPersistentQuery() { givenQueryFileParsesTo( - PreparedStatement.of("InsertInto", new InsertInto(SOME_NAME, query, Optional.empty())) + PreparedStatement.of("InsertInto", new InsertInto(SOME_NAME, query)) ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 7be34e9d95f1..2d3bd33861b5 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -660,6 +660,7 @@ private void givenQueryIs(final QueryType type) { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), ResultMaterialization.CHANGES, type == QueryType.PULL, OptionalInt.empty()