From eda2e345868ac517977601c74475eee0da3930c8 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Fri, 7 Feb 2020 10:28:06 +0000 Subject: [PATCH] feat: remove WindowStart() and WindowEnd() UDAFs (#4459) * feat: remove WindowStart() and WindowEnd() UDAFs These two UDAFs were introduced to allow access to the start and end times of the window in a windowed source. `WINDOWSTART` and `WINDOWEND` are now accessible as columns to be used in the SELECT of a query, (outside of UDAFs). This makes the two UDAFs redundant. BREAKING CHANGE: The `WindowStart()` and `WindowEnd()` UDAFs have been removed from KSQL. Use the `WindowStart` and `WindowEnd` system columns to access the window bounds within the SELECT expression instead. --- .../time-and-windows-in-ksqldb-queries.md | 4 +- .../ksqldb-reference/aggregate-functions.md | 18 --- docs-md/tutorials/examples.md | 4 +- .../time-and-windows-in-ksql-queries.rst | 4 +- docs/developer-guide/syntax-reference.rst | 8 +- docs/tutorials/examples.rst | 4 +- .../ksql/analyzer/AggregateAnalyzer.java | 2 +- .../ksql/analyzer/QueryAnalyzer.java | 25 +++- .../function/udaf/window/WindowEndKudaf.java | 49 -------- .../udaf/window/WindowStartKudaf.java | 49 -------- .../ksql/planner/plan/AggregateNode.java | 23 ++-- .../udaf/window/WindowEndKudafTest.java | 38 ------ .../udaf/window/WindowStartKudafTest.java | 38 ------ .../transform/window/WindowSelectMapper.java | 97 --------------- .../window/WindowSelectMapperTest.java | 115 ------------------ .../0_6_0-pre/having_-_table_having | 6 +- .../0_6_0-pre/hopping-windows_-_count | 6 +- .../0_6_0-pre/hopping-windows_-_max_hopping | 6 +- .../0_6_0-pre/hopping-windows_-_min_hopping | 6 +- .../0_6_0-pre/hopping-windows_-_topk_hopping | 6 +- .../hopping-windows_-_topkdistinct_hopping | 6 +- .../0_6_0-pre/session-windows_-_max_session | 6 +- .../0_6_0-pre/tumbling-windows_-_max_tumbling | 6 +- .../0_6_0-pre/tumbling-windows_-_min_tumbling | 6 +- .../tumbling-windows_-_topk_tumbling | 6 +- .../tumbling-windows_-_topkdistinct_tumbling | 6 +- .../0_6_0-pre/window-bounds_-_in_expressions | 23 ++-- .../0_6_0-pre/window-bounds_-_none | 71 ----------- .../0_6_0-pre/window-bounds_-_table_hopping | 23 ++-- .../0_6_0-pre/window-bounds_-_table_session | 23 ++-- .../0_6_0-pre/window-bounds_-_table_tumbling | 23 ++-- .../query-validation-tests/group-by.json | 22 ++++ .../query-validation-tests/window-bounds.json | 98 +++++++-------- ...eries-against-materialized-aggregates.json | 22 ---- .../server/execution/PullQueryExecutor.java | 2 +- .../execution/streams/AggregateParams.java | 9 -- .../streams/AggregateParamsFactory.java | 2 - .../streams/StreamAggregateBuilder.java | 17 +-- .../streams/AggregateParamsFactoryTest.java | 40 ------ .../streams/StreamAggregateBuilderTest.java | 35 +----- 40 files changed, 189 insertions(+), 765 deletions(-) delete mode 100644 ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java delete mode 100644 ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java delete mode 100644 ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java delete mode 100644 ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java delete mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java delete mode 100644 ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java delete mode 100644 ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_none diff --git a/docs-md/concepts/time-and-windows-in-ksqldb-queries.md b/docs-md/concepts/time-and-windows-in-ksqldb-queries.md index e2b82203c5f..4355cf0fac2 100644 --- a/docs-md/concepts/time-and-windows-in-ksqldb-queries.md +++ b/docs-md/concepts/time-and-windows-in-ksqldb-queries.md @@ -178,7 +178,7 @@ boundaries are named *windows*. ![Diagram showing the relationship between records and time in a ksqlDB stream](../img/ksql-window.png) A window has a start time and an end time, which you access in your -queries by using the WINDOWSTART() and WINDOWEND() functions. +queries by using the WINDOWSTART and WINDOWEND system columns. !!! important ksqlDB is based on the Unix epoch time in the UTC timezone, and this can @@ -339,7 +339,7 @@ session window to have zero records. If a session window contains exactly one record, the record's ROWTIME timestamp is identical to the window's own start and end times. Access -these by using the WINDOWSTART() and WINDOWEND() functions. +these by using the WINDOWSTART and WINDOWEND system columns. If a session window contains two or more records, then the earliest/oldest record's ROWTIME timestamp is identical to the diff --git a/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md b/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md index 4ef6f2889d4..0ff72fceb9b 100644 --- a/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md +++ b/docs-md/developer-guide/ksqldb-reference/aggregate-functions.md @@ -170,23 +170,5 @@ Stream Return the distinct Top *K* values for the given column and window Note: rows where `col1` is null will be ignored. -WindowStart ------------ - -`WindowStart()` - -Stream, Table - -Extract the start time of the current window, in milliseconds. -If the query is not windowed the function will return null. - -WindowEnd ---------- - -`WindowEnd()` - -Extract the end time of the current window, in milliseconds. -If the query is not windowed the function will return null. - Page last revised on: {{ git_revision_date }} diff --git a/docs-md/tutorials/examples.md b/docs-md/tutorials/examples.md index 1214f85fdf3..d2d4ce8eac2 100644 --- a/docs-md/tutorials/examples.md +++ b/docs-md/tutorials/examples.md @@ -312,8 +312,8 @@ current session window into fields within output rows. ```sql CREATE TABLE pageviews_per_region_per_session AS SELECT regionid, - windowStart(), - windowEnd(), + windowStart AS WSTART, + windowEnd AS WEND, count(*) FROM pageviews_enriched WINDOW SESSION (60 SECONDS) diff --git a/docs/concepts/time-and-windows-in-ksql-queries.rst b/docs/concepts/time-and-windows-in-ksql-queries.rst index 3c7ef88cac8..1449cfbb07f 100644 --- a/docs/concepts/time-and-windows-in-ksql-queries.rst +++ b/docs/concepts/time-and-windows-in-ksql-queries.rst @@ -165,7 +165,7 @@ are named *windows*. :alt: Diagram showing the relationship between records and time in a KSQL stream A window has a start time and an end time, which you access in your queries by -using the WINDOWSTART() and WINDOWEND() functions. +using the WINDOWSTART and WINDOWEND system columns. .. important:: @@ -339,7 +339,7 @@ window to have zero records. If a session window contains exactly one record, the record's ROWTIME timestamp is identical to the window's own start and end times. Access these by using the -WINDOWSTART() and WINDOWEND() functions. +WINDOWSTART and WINDOWEND system columns. If a session window contains two or more records, then the earliest/oldest record's ROWTIME timestamp is identical to the window's start time, and the diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 0c71bfa5052..65f327936a0 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -1290,7 +1290,7 @@ Pulls the current value from the materialized table and terminate. The result of this statement will not be persisted in a Kafka topic and will only be printed out in the console. -The WHERE clause must contain a single value of ``ROWKEY`` to retieve and may optionally include +The WHERE clause must contain a single value of ``ROWKEY`` to retrieve and may optionally include bounds on WINDOWSTART if the materialized table is windowed. Example: @@ -2157,12 +2157,6 @@ Aggregate functions | TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window | | | | | Note: rows where ``col1`` is null will be ignored. | +------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| WindowStart | ``WindowStart()`` | Stream | Extract the start time of the current window, in milliseconds. | -| | | Table | If the query is not windowed the function will return null. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ -| WindowEnd | ``WindowEnd()`` | Stream | Extract the end time of the current window, in milliseconds. | -| | | Table | If the query is not windowed the function will return null. | -+------------------------+---------------------------+------------+---------------------------------------------------------------------+ For more information, see :ref:`aggregate-streaming-data-with-ksql`. diff --git a/docs/tutorials/examples.rst b/docs/tutorials/examples.rst index 2ffad7fad12..51cccc59270 100644 --- a/docs/tutorials/examples.rst +++ b/docs/tutorials/examples.rst @@ -293,8 +293,8 @@ end time of the current session window into fields within output rows. CREATE TABLE pageviews_per_region_per_session AS SELECT regionid, - windowStart(), - windowEnd(), + windowStart AS WSTART, + windowEnd AS WEND, count(*) FROM pageviews_enriched WINDOW SESSION (60 SECONDS) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java index 6a7da020ee1..47ed11d2ec4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/AggregateAnalyzer.java @@ -110,7 +110,7 @@ private void throwOnWindowBoundColumnIfWindowedAggregate(final ColumnReferenceEx if (SchemaUtil.isWindowBound(node.getReference())) { throw new KsqlException( "Window bounds column " + node + " can only be used in the SELECT clause of " - + "windowed aggregations." + + "windowed aggregations and can not be passed to aggregate functions." + System.lineSeparator() + "See https://github.com/confluentinc/ksql/issues/4397" ); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java index 24b83af4f46..3aaa9ecd85e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; import io.confluent.ksql.analyzer.Analysis.AliasedDataSource; @@ -213,16 +214,13 @@ private static void enforceAggregateRules( "GROUP BY requires columns using aggregate functions in SELECT clause."); } - final Set groupByExprs = ImmutableSet.copyOf(analysis.getGroupByExpressions()); + final Set groupByExprs = getGroupByExpressions(analysis); final List unmatchedSelects = aggregateAnalysis.getNonAggregateSelectExpressions() .entrySet() .stream() // Remove any that exactly match a group by expression: .filter(e -> !groupByExprs.contains(e.getKey())) - // Remove any window bounds expressions, which are implicit: - .filter(e -> !(e.getKey() instanceof ColumnReferenceExp - && SchemaUtil.isWindowBound(((ColumnReferenceExp) e.getKey()).getReference()))) // Remove any that are constants, // or expressions where all params exactly match a group by expression: .filter(e -> !Sets.difference(e.getValue(), groupByExprs).isEmpty()) @@ -253,4 +251,23 @@ private static void enforceAggregateRules( "Non-aggregate HAVING expression not part of GROUP BY: " + havingOnly); } } + + private static Set getGroupByExpressions(final Analysis analysis) { + if (!analysis.getWindowExpression().isPresent()) { + return ImmutableSet.copyOf(analysis.getGroupByExpressions()); + } + + // Add in window bounds columns as implicit group by columns: + final AliasedDataSource source = Iterables.getOnlyElement(analysis.getFromDataSources()); + + final Set windowBoundColumnRefs = + SchemaUtil.windowBoundsColumnNames().stream() + .map(cn -> new QualifiedColumnReferenceExp(source.getAlias(), cn)) + .collect(Collectors.toSet()); + + return ImmutableSet.builder() + .addAll(analysis.getGroupByExpressions()) + .addAll(windowBoundColumnRefs) + .build(); + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java deleted file mode 100644 index d1668bfe0fe..00000000000 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.function.udaf.window; - -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; -import io.confluent.ksql.function.udaf.TableUdaf; -import io.confluent.ksql.function.udaf.UdafDescription; -import io.confluent.ksql.function.udaf.UdafFactory; -import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; -import io.confluent.ksql.util.KsqlConstants; - -/** - * A placeholder KUDAF for extracting window end times. - * - *

The KUDAF itself does nothing. It's just a placeholder. - * - * @see WindowSelectMapper - */ -@SuppressWarnings("WeakerAccess") // Invoked via reflection. -@UdafDescription(name = "WindowEnd", author = KsqlConstants.CONFLUENT_AUTHOR, - description = "Returns the window end time, in milliseconds, for the given record. " - + "If the given record is not part of a window the function will return NULL.") -public final class WindowEndKudaf { - - private WindowEndKudaf() { - } - - static String getFunctionName() { - return WindowSelectMapper.WINDOW_END_NAME; - } - - @UdafFactory(description = "Extracts the window end time") - public static TableUdaf createWindowEnd() { - return PlaceholderTableUdaf.INSTANCE; - } -} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java deleted file mode 100644 index 17bd9a6f6d4..00000000000 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.function.udaf.window; - -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; -import io.confluent.ksql.function.udaf.TableUdaf; -import io.confluent.ksql.function.udaf.UdafDescription; -import io.confluent.ksql.function.udaf.UdafFactory; -import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; -import io.confluent.ksql.util.KsqlConstants; - -/** - * A placeholder KUDAF for extracting window start times. - * - *

The KUDAF itself does nothing. It's just a placeholder. - * - * @see WindowSelectMapper - */ -@SuppressWarnings("WeakerAccess") // Invoked via reflection. -@UdafDescription(name = "WindowStart", author = KsqlConstants.CONFLUENT_AUTHOR, - description = "Returns the window start time, in milliseconds, for the given record. " - + "If the given record is not part of a window the function will return NULL.") -public final class WindowStartKudaf { - - private WindowStartKudaf() { - } - - static String getFunctionName() { - return WindowSelectMapper.WINDOW_START_NAME; - } - - @UdafFactory(description = "Extracts the window start time") - public static TableUdaf createWindowStart() { - return PlaceholderTableUdaf.INSTANCE; - } -} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index 84bd1859ef6..4b001b830d2 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -360,10 +360,6 @@ List getAggArgExpansionList() { } private Expression resolveToInternal(final Expression exp) { - if (isWindowBound(exp)) { - return exp; - } - final ColumnName name = expressionToInternalColumnName.get(exp.toString()); if (name != null) { return new UnqualifiedColumnReferenceExp( @@ -374,15 +370,6 @@ private Expression resolveToInternal(final Expression exp) { return ExpressionTreeRewriter.rewriteWith(new ResolveToInternalRewriter()::process, exp); } - private static boolean isWindowBound(final Expression exp) { - if (!(exp instanceof ColumnReferenceExp)) { - return false; - } - - final ColumnReferenceExp column = (ColumnReferenceExp)exp; - return SchemaUtil.isWindowBound(column.getReference()); - } - private final class ResolveToInternalRewriter extends VisitParentExpressionVisitor, Context> { private ResolveToInternalRewriter() { @@ -404,9 +391,15 @@ public Optional visitColumnReference( } final boolean isAggregate = node.getReference().isAggregate(); + final boolean windowBounds = SchemaUtil.isWindowBound(node.getReference()); + + if (isAggregate && windowBounds) { + throw new KsqlException("Window bound " + node + " is not available as a parameter " + + "to aggregate functions"); + } - if (!isAggregate) { - throw new KsqlException("Unknown source column: " + node.toString()); + if (!isAggregate && !windowBounds) { + throw new KsqlException("Unknown source column: " + node); } return Optional.of(node); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java deleted file mode 100644 index 592781f34c7..00000000000 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowEndKudafTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.function.udaf.window; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; - -import io.confluent.ksql.function.udaf.UdafDescription; -import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; -import org.junit.Test; - -public class WindowEndKudafTest { - - @Test - public void shouldReturnCorrectFunctionName() { - assertThat(WindowEndKudaf.getFunctionName(), - is(WindowEndKudaf.class.getAnnotation(UdafDescription.class).name())); - } - - @Test - public void shouldCreatePlaceholderTableUdaf() { - assertThat(WindowEndKudaf.createWindowEnd(), is(instanceOf(PlaceholderTableUdaf.class))); - } -} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java deleted file mode 100644 index 6442599d0cd..00000000000 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udaf/window/WindowStartKudafTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.function.udaf.window; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; - -import io.confluent.ksql.function.udaf.UdafDescription; -import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; -import org.junit.Test; - -public class WindowStartKudafTest { - - @Test - public void shouldReturnCorrectFunctionName() { - assertThat(WindowStartKudaf.getFunctionName(), - is(WindowStartKudaf.class.getAnnotation(UdafDescription.class).name())); - } - - @Test - public void shouldCreatePlaceholderTableUdaf() { - assertThat(WindowStartKudaf.createWindowStart(), is(instanceOf(PlaceholderTableUdaf.class))); - } -} \ No newline at end of file diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java deleted file mode 100644 index 9e0fd4d0c73..00000000000 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.execution.transform.window; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.execution.transform.KsqlProcessingContext; -import io.confluent.ksql.execution.transform.KsqlTransformer; -import io.confluent.ksql.function.KsqlAggregateFunction; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; - -/** - * Used to handle the special cased {WindowStart} and {WindowEnd}. - */ -public final class WindowSelectMapper { - - public static final String WINDOW_START_NAME = "WindowStart"; - public static final String WINDOW_END_NAME = "WindowEnd"; - - private static final Map WINDOW_FUNCTION_NAMES = ImmutableMap.of( - WINDOW_START_NAME.toUpperCase(), Type.StartTime, - WINDOW_END_NAME.toUpperCase(), Type.EndTime - ); - - private final Map windowSelects; - - public WindowSelectMapper( - final int initialUdafIndex, - final List> functions - ) { - final Builder selectsBuilder = new Builder<>(); - for (int i = 0; i < functions.size(); i++) { - final String name = functions.get(i).name().name().toUpperCase(); - if (WINDOW_FUNCTION_NAMES.containsKey(name)) { - selectsBuilder.put(initialUdafIndex + i, WINDOW_FUNCTION_NAMES.get(name)); - } - } - windowSelects = selectsBuilder.build(); - } - - public boolean hasSelects() { - return !windowSelects.isEmpty(); - } - - public KsqlTransformer, GenericRow> getTransformer() { - return new Transformer(); - } - - private final class Transformer implements KsqlTransformer, GenericRow> { - - @Override - public GenericRow transform( - final Windowed readOnlyKey, - final GenericRow value, - final KsqlProcessingContext ctx - ) { - if (value == null) { - return null; - } - - final Window window = readOnlyKey.window(); - - windowSelects.forEach((index, type) -> value.set(index, type.mapper.apply(window))); - - return value; - } - } - - private enum Type { - StartTime(Window::start), EndTime(Window::end); - - private final Function mapper; - - Type(final Function mapper) { - this.mapper = mapper; - } - } -} diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java deleted file mode 100644 index 3d78278ae02..00000000000 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.execution.transform.window; - -import static io.confluent.ksql.GenericRow.genericRow; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.execution.transform.KsqlProcessingContext; -import io.confluent.ksql.execution.transform.KsqlTransformer; -import io.confluent.ksql.execution.util.StructKeyUtil; -import io.confluent.ksql.function.KsqlAggregateFunction; -import io.confluent.ksql.name.FunctionName; -import io.confluent.ksql.schema.ksql.types.SqlTypes; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.SessionWindow; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class WindowSelectMapperTest { - - private static final Struct A_KEY = StructKeyUtil.keyBuilder(SqlTypes.STRING) - .build("key"); - - @Mock - private KsqlAggregateFunction windowStartFunc; - @Mock - private KsqlAggregateFunction windowEndFunc; - @Mock - private KsqlAggregateFunction otherFunc; - @Mock - private KsqlProcessingContext ctx; - - @Before - public void setUp() { - when(windowStartFunc.name()).thenReturn(FunctionName.of("WinDowStarT")); - when(windowEndFunc.name()).thenReturn(FunctionName.of("WinDowEnD")); - when(otherFunc.name()).thenReturn(FunctionName.of("NotWindowStartOrWindowEnd")); - } - - @Test - public void shouldNotDetectNonWindowBoundsSelects() { - assertThat(new WindowSelectMapper(5, ImmutableList.of(otherFunc)).hasSelects(), - is(false)); - } - - @Test - public void shouldDetectWindowStartSelects() { - assertThat(new WindowSelectMapper(5, ImmutableList.of(windowStartFunc)).hasSelects(), - is(true)); - } - - @Test - public void shouldDetectWindowEndSelects() { - assertThat(new WindowSelectMapper(5, ImmutableList.of(windowEndFunc)).hasSelects(), - is(true)); - } - - @Test - public void shouldUpdateRowWithWindowBounds() { - // Given: - final KsqlTransformer, GenericRow> mapper = new WindowSelectMapper( - 1, - ImmutableList.of(otherFunc, windowStartFunc, windowEndFunc, windowStartFunc) - ).getTransformer(); - - final Window window = new SessionWindow(12345L, 54321L); - final GenericRow row = genericRow(0, 1, 2, 3, 4, 5); - - // When: - final GenericRow result = mapper.transform(new Windowed<>(A_KEY, window), row, ctx); - - // Then: - assertThat(result, is(sameInstance(row))); - assertThat(row.values(), is(ImmutableList.of(0, 1, 12345L, 54321L, 12345L, 5))); - } - - @Test(expected = IndexOutOfBoundsException.class) - public void shouldThrowIfRowNotBigEnough() { - // Given: - final KsqlTransformer, GenericRow> mapper = new WindowSelectMapper( - 0, - ImmutableList.of(windowStartFunc) - ).getTransformer(); - - final Window window = new SessionWindow(12345L, 54321L); - final GenericRow row = genericRow(); - - // When: - mapper.transform(new Windowed<>(A_KEY, window), row, ctx); - } -} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/having_-_table_having b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/having_-_table_having index b2c3bcba17d..855771de899 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/having_-_table_having +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/having_-_table_having @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-HavingFilter-ApplyPredicate <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-HavingFilter-ApplyPredicate (stores: []) --> Aggregate-HavingFilter-Filter - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: Aggregate-HavingFilter-Filter (stores: []) --> Aggregate-HavingFilter-PostProcess <-- Aggregate-HavingFilter-ApplyPredicate diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_count b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_count index 8191cc8eb8c..69912f359f6 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_count +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_count @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_max_hopping b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_max_hopping index 227e2109026..3ccd4ddce20 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_max_hopping +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_max_hopping @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_min_hopping b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_min_hopping index 227e2109026..3ccd4ddce20 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_min_hopping +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_min_hopping @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topk_hopping b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topk_hopping index 2623177596e..41eb12b265c 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topk_hopping +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topk_hopping @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topkdistinct_hopping b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topkdistinct_hopping index 2623177596e..41eb12b265c 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topkdistinct_hopping +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/hopping-windows_-_topkdistinct_hopping @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/session-windows_-_max_session b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/session-windows_-_max_session index 227e2109026..3ccd4ddce20 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/session-windows_-_max_session +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/session-windows_-_max_session @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_max_tumbling b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_max_tumbling index 227e2109026..3ccd4ddce20 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_max_tumbling +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_max_tumbling @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_min_tumbling b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_min_tumbling index 227e2109026..3ccd4ddce20 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_min_tumbling +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_min_tumbling @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topk_tumbling b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topk_tumbling index 2623177596e..41eb12b265c 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topk_tumbling +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topk_tumbling @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topkdistinct_tumbling b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topkdistinct_tumbling index 2623177596e..41eb12b265c 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topkdistinct_tumbling +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/tumbling-windows_-_topkdistinct_tumbling @@ -58,14 +58,14 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 + --> Aggregate-Aggregate-WindowSelect <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) + Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) --> KTABLE-TOSTREAM-0000000007 - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-WindowSelect Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- Aggregate-Project diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_in_expressions b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_in_expressions index 14df146817e..8349c929087 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_in_expressions +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_in_expressions @@ -40,9 +40,9 @@ } CONFIGS_END CTAS_S2_0.KsqlTopic.Source = STRUCT NOT NULL -CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL -CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL -CTAS_S2_0.S2 = STRUCT NOT NULL +CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL +CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL +CTAS_S2_0.S2 = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 @@ -58,20 +58,17 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 - <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) --> Aggregate-Aggregate-WindowSelect - <-- Aggregate-Aggregate-ToOutputSchema + <-- KSTREAM-AGGREGATE-0000000003 Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) - --> KTABLE-TOSTREAM-0000000008 + --> KTABLE-TOSTREAM-0000000007 <-- Aggregate-Aggregate-WindowSelect - Processor: KTABLE-TOSTREAM-0000000008 (stores: []) - --> KSTREAM-SINK-0000000009 + Processor: KTABLE-TOSTREAM-0000000007 (stores: []) + --> KSTREAM-SINK-0000000008 <-- Aggregate-Project - Sink: KSTREAM-SINK-0000000009 (topic: S2) - <-- KTABLE-TOSTREAM-0000000008 + Sink: KSTREAM-SINK-0000000008 (topic: S2) + <-- KTABLE-TOSTREAM-0000000007 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_none b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_none deleted file mode 100644 index 7af3577ee09..00000000000 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_none +++ /dev/null @@ -1,71 +0,0 @@ -{ - "ksql.extension.dir" : "ext", - "ksql.streams.cache.max.bytes.buffering" : "0", - "ksql.security.extension.class" : null, - "ksql.transient.prefix" : "transient_", - "ksql.persistence.wrap.single.values" : "true", - "ksql.authorization.cache.expiry.time.secs" : "30", - "ksql.schema.registry.url" : "", - "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", - "ksql.output.topic.name.prefix" : "", - "ksql.streams.auto.offset.reset" : "earliest", - "ksql.connect.url" : "http://localhost:8083", - "ksql.service.id" : "some.ksql.service.id", - "ksql.internal.topic.min.insync.replicas" : "1", - "ksql.streams.shutdown.timeout.ms" : "300000", - "ksql.internal.topic.replicas" : "1", - "ksql.insert.into.values.enabled" : "true", - "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", - "ksql.access.validator.enable" : "auto", - "ksql.streams.bootstrap.servers" : "localhost:0", - "ksql.streams.commit.interval.ms" : "2000", - "ksql.metric.reporters" : "", - "ksql.streams.auto.commit.interval.ms" : "0", - "ksql.metrics.extension" : null, - "ksql.streams.topology.optimization" : "all", - "ksql.execution.plan.enable" : "false", - "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", - "ksql.streams.num.stream.threads" : "4", - "ksql.authorization.cache.max.entries" : "10000", - "ksql.metrics.tags.custom" : "", - "ksql.pull.queries.enable" : "true", - "ksql.udfs.enabled" : "true", - "ksql.udf.enable.security.manager" : "true", - "ksql.connect.worker.config" : "", - "ksql.query.pull.routing.timeout.ms" : "30000", - "ksql.sink.window.change.log.additional.retention" : "1000000", - "ksql.udf.collect.metrics" : "false", - "ksql.persistent.prefix" : "query_", - "ksql.query.persistent.active.limit" : "2147483647" -} -CONFIGS_END -CTAS_S2_0.KsqlTopic.Source = STRUCT NOT NULL -CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL -CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL -CTAS_S2_0.S2 = STRUCT NOT NULL -SCHEMAS_END -Topologies: - Sub-topology: 0 - Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) - --> KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) - --> Aggregate-Prepare - <-- KSTREAM-SOURCE-0000000000 - Processor: Aggregate-Prepare (stores: []) - --> KSTREAM-AGGREGATE-0000000003 - <-- KSTREAM-TRANSFORMVALUES-0000000001 - Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) - --> Aggregate-Aggregate-ToOutputSchema - <-- Aggregate-Prepare - Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Project - <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Project (stores: []) - --> KTABLE-TOSTREAM-0000000006 - <-- Aggregate-Aggregate-ToOutputSchema - Processor: KTABLE-TOSTREAM-0000000006 (stores: []) - --> KSTREAM-SINK-0000000007 - <-- Aggregate-Project - Sink: KSTREAM-SINK-0000000007 (topic: S2) - <-- KTABLE-TOSTREAM-0000000006 - diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_hopping b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_hopping index 14df146817e..cfc532eb288 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_hopping +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_hopping @@ -40,9 +40,9 @@ } CONFIGS_END CTAS_S2_0.KsqlTopic.Source = STRUCT NOT NULL -CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL -CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL -CTAS_S2_0.S2 = STRUCT NOT NULL +CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL +CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL +CTAS_S2_0.S2 = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 @@ -58,20 +58,17 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 - <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) --> Aggregate-Aggregate-WindowSelect - <-- Aggregate-Aggregate-ToOutputSchema + <-- KSTREAM-AGGREGATE-0000000003 Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) - --> KTABLE-TOSTREAM-0000000008 + --> KTABLE-TOSTREAM-0000000007 <-- Aggregate-Aggregate-WindowSelect - Processor: KTABLE-TOSTREAM-0000000008 (stores: []) - --> KSTREAM-SINK-0000000009 + Processor: KTABLE-TOSTREAM-0000000007 (stores: []) + --> KSTREAM-SINK-0000000008 <-- Aggregate-Project - Sink: KSTREAM-SINK-0000000009 (topic: S2) - <-- KTABLE-TOSTREAM-0000000008 + Sink: KSTREAM-SINK-0000000008 (topic: S2) + <-- KTABLE-TOSTREAM-0000000007 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_session b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_session index 14df146817e..cfc532eb288 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_session +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_session @@ -40,9 +40,9 @@ } CONFIGS_END CTAS_S2_0.KsqlTopic.Source = STRUCT NOT NULL -CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL -CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL -CTAS_S2_0.S2 = STRUCT NOT NULL +CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL +CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL +CTAS_S2_0.S2 = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 @@ -58,20 +58,17 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 - <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) --> Aggregate-Aggregate-WindowSelect - <-- Aggregate-Aggregate-ToOutputSchema + <-- KSTREAM-AGGREGATE-0000000003 Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) - --> KTABLE-TOSTREAM-0000000008 + --> KTABLE-TOSTREAM-0000000007 <-- Aggregate-Aggregate-WindowSelect - Processor: KTABLE-TOSTREAM-0000000008 (stores: []) - --> KSTREAM-SINK-0000000009 + Processor: KTABLE-TOSTREAM-0000000007 (stores: []) + --> KSTREAM-SINK-0000000008 <-- Aggregate-Project - Sink: KSTREAM-SINK-0000000009 (topic: S2) - <-- KTABLE-TOSTREAM-0000000008 + Sink: KSTREAM-SINK-0000000008 (topic: S2) + <-- KTABLE-TOSTREAM-0000000007 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_tumbling b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_tumbling index 14df146817e..cfc532eb288 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_tumbling +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/window-bounds_-_table_tumbling @@ -40,9 +40,9 @@ } CONFIGS_END CTAS_S2_0.KsqlTopic.Source = STRUCT NOT NULL -CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL -CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL -CTAS_S2_0.S2 = STRUCT NOT NULL +CTAS_S2_0.Aggregate.GroupBy = STRUCT NOT NULL +CTAS_S2_0.Aggregate.Aggregate.Materialize = STRUCT NOT NULL +CTAS_S2_0.S2 = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 @@ -58,20 +58,17 @@ Topologies: --> Aggregate-Aggregate-ToOutputSchema <-- Aggregate-Prepare Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) - --> Aggregate-Aggregate-WindowSelect2 - <-- KSTREAM-AGGREGATE-0000000003 - Processor: Aggregate-Aggregate-WindowSelect2 (stores: []) --> Aggregate-Aggregate-WindowSelect - <-- Aggregate-Aggregate-ToOutputSchema + <-- KSTREAM-AGGREGATE-0000000003 Processor: Aggregate-Aggregate-WindowSelect (stores: []) --> Aggregate-Project - <-- Aggregate-Aggregate-WindowSelect2 + <-- Aggregate-Aggregate-ToOutputSchema Processor: Aggregate-Project (stores: []) - --> KTABLE-TOSTREAM-0000000008 + --> KTABLE-TOSTREAM-0000000007 <-- Aggregate-Aggregate-WindowSelect - Processor: KTABLE-TOSTREAM-0000000008 (stores: []) - --> KSTREAM-SINK-0000000009 + Processor: KTABLE-TOSTREAM-0000000007 (stores: []) + --> KSTREAM-SINK-0000000008 <-- Aggregate-Project - Sink: KSTREAM-SINK-0000000009 (topic: S2) - <-- KTABLE-TOSTREAM-0000000008 + Sink: KSTREAM-SINK-0000000008 (topic: S2) + <-- KTABLE-TOSTREAM-0000000007 diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json index dd6abe149b2..6d62aedc868 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -139,6 +139,28 @@ ] } }, + { + "name": "fields used in expression", + "statements": [ + "CREATE STREAM TEST (f1 INT, f2 INT) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE TABLE OUTPUT AS SELECT f1 / f2, COUNT(*) FROM TEST GROUP BY f1, f2;" + ], + "inputs": [ + {"topic": "test_topic", "value": "4,2"}, + {"topic": "test_topic", "value": "9,3"}, + {"topic": "test_topic", "value": "9,3"} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "4|+|2", "value": "2,1"}, + {"topic": "OUTPUT", "key": "9|+|3", "value": "3,1"}, + {"topic": "OUTPUT", "key": "9|+|3", "value": "3,2"} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "table", "schema": "ROWKEY STRING KEY, `KSQL_COL_0` INTEGER, `KSQL_COL_1` BIGINT"} + ] + } + }, { "name": "fields (stream->table) - format", "statements": [ diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/window-bounds.json b/ksql-functional-tests/src/test/resources/query-validation-tests/window-bounds.json index 4f9be815439..9b9102f0203 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/window-bounds.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/window-bounds.json @@ -7,7 +7,7 @@ "name": "table session", "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", - "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW SESSION (30 SECONDS) group by id;" + "CREATE TABLE S2 as SELECT id, WindowStart as wstart, WindowEnd as wend, COUNT(1) AS Count FROM test WINDOW SESSION (30 SECONDS) group by id;" ], "inputs": [ {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 0}, @@ -16,12 +16,12 @@ {"topic": "test_topic", "key": 1,"value": "1", "timestamp": 40000} ], "outputs": [ - {"topic": "S2", "key": 0,"value": "0,0,0", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, + {"topic": "S2", "key": 0,"value": "0,0,0,1", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, {"topic": "S2", "key": 0,"value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, - {"topic": "S2", "key": 0,"value": "0,0,10000", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, - {"topic": "S2", "key": 1,"value": "1,10000,10000", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, + {"topic": "S2", "key": 0,"value": "0,0,10000,2", "timestamp": 10000, "window": {"start": 0, "end": 10000, "type": "session"}}, + {"topic": "S2", "key": 1,"value": "1,10000,10000,1", "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, {"topic": "S2", "key": 1,"value": null, "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, - {"topic": "S2", "key": 1,"value": "1,10000,40000", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} + {"topic": "S2", "key": 1,"value": "1,10000,40000,2", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} ], "properties": { "ksql.windowed.session.key.legacy": "false" @@ -31,7 +31,7 @@ "name": "table tumbling", "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", - "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" + "CREATE TABLE S2 as SELECT id, WindowStart as wstart, WindowEnd as wend, COUNT(1) AS Count FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" ], "inputs": [ {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 0}, @@ -43,20 +43,20 @@ {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 70000} ], "outputs": [ - {"topic": "S2", "key": 0,"value": "0,0,30000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,0,30000", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,30000,60000", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,30000,60000", "timestamp": 45000, "window": {"start": 30000, "end": 60000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,30000,60000", "timestamp": 50000, "window": {"start": 30000, "end": 60000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,30000,60000", "timestamp": 35000, "window": {"start": 30000, "end": 60000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,60000,90000", "timestamp": 70000, "window": {"start": 60000, "end": 90000, "type": "time"}} + {"topic": "S2", "key": 0,"value": "0,0,30000,1", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0,"value": "0,0,30000,2", "timestamp": 10000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100,"value": "100,30000,60000,1", "timestamp": 30000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 100,"value": "100,30000,60000,2", "timestamp": 45000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 100,"value": "100,30000,60000,3", "timestamp": 50000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 0,"value": "0,30000,60000,1", "timestamp": 35000, "window": {"start": 30000, "end": 60000, "type": "time"}}, + {"topic": "S2", "key": 0,"value": "0,60000,90000,1", "timestamp": 70000, "window": {"start": 60000, "end": 90000, "type": "time"}} ] }, { "name": "table hopping", "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", - "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS) group by id;" + "CREATE TABLE S2 as SELECT id, WindowStart as wstart, WindowEnd as wend, COUNT(1) AS Count FROM test WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS) group by id;" ], "inputs": [ {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 0}, @@ -65,50 +65,30 @@ {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 5000} ], "outputs": [ - {"topic": "S2", "key": 0,"value": "0,0,30000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,0,30000", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,0,30000", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,0,30000", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,5000,35000", "timestamp": 5000, "window": {"start": 5000, "end": 35000, "type": "time"}} - ] - }, - { - "name": "none", - "comment" : "Without a WINDOW statement the methods will return NULL", - "statements": [ - "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", - "CREATE TABLE S2 as SELECT id, WindowStart(), WindowEnd() FROM test group by id;" - ], - "inputs": [ - {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 0}, - {"topic": "test_topic", "key": 100,"value": "100", "timestamp": 2000}, - {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 4999}, - {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 5000} - ], - "outputs": [ - {"topic": "S2", "key": 0,"value": "0,,", "timestamp": 0}, - {"topic": "S2", "key": 100,"value": "100,,", "timestamp": 2000}, - {"topic": "S2", "key": 0,"value": "0,,", "timestamp": 4999}, - {"topic": "S2", "key": 0,"value": "0,,", "timestamp": 5000} + {"topic": "S2", "key": 0,"value": "0,0,30000,1", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100,"value": "100,0,30000,1", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0,"value": "0,0,30000,2", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0,"value": "0,0,30000,3", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 0,"value": "0,5000,35000,1", "timestamp": 5000, "window": {"start": 5000, "end": 35000, "type": "time"}} ] }, { "name": "in expressions", "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", - "CREATE TABLE S2 as SELECT id, WindowStart() / 2, WindowEnd() / 2 FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" + "CREATE TABLE S2 as SELECT id, Test.WindowStart / 2, WindowEnd / id, COUNT(1) AS Count FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;" ], "inputs": [ - {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 0}, + {"topic": "test_topic", "key": 2,"value": "2", "timestamp": 0}, {"topic": "test_topic", "key": 100,"value": "100", "timestamp": 2000}, - {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 4999}, - {"topic": "test_topic", "key": 0,"value": "0", "timestamp": 5000} + {"topic": "test_topic", "key": 2,"value": "2", "timestamp": 4999}, + {"topic": "test_topic", "key": 2,"value": "2", "timestamp": 5000} ], "outputs": [ - {"topic": "S2", "key": 0,"value": "0,0,15000", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 100,"value": "100,0,15000", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,0,15000", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, - {"topic": "S2", "key": 0,"value": "0,0,15000", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}} + {"topic": "S2", "key": 2,"value": "2,0,15000,1", "timestamp": 0, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 100,"value": "100,0,300,1", "timestamp": 2000, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 2,"value": "2,0,15000,2", "timestamp": 4999, "window": {"start": 0, "end": 30000, "type": "time"}}, + {"topic": "S2", "key": 2,"value": "2,0,15000,3", "timestamp": 5000, "window": {"start": 0, "end": 30000, "type": "time"}} ] }, { @@ -150,14 +130,30 @@ } }, { - "name": "windowed group by - aggregate window bounds in SELECT", + "name": "windowed group by - window bounds used in expression with aggregate in SELECT", "statements": [ "CREATE STREAM TEST (ROWKEY INT KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE S2 as SELECT count(1) * windowend FROM test WINDOW TUMBLING (SIZE 1 SECOND) group by ROWKEY;" ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Field used in aggregate SELECT expression(s) outside of aggregate functions not part of GROUP BY: [TEST.WINDOWEND]" + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {}, "timestamp": 10345}, + {"topic": "test_topic", "key": 0, "value": {}, "timestamp": 10445}, + {"topic": "test_topic", "key": 0, "value": {}, "timestamp": 13251} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": {"KSQL_COL_0": 11000}, "timestamp": 10345, "window": {"start": 10000, "end": 11000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": {"KSQL_COL_0": 22000}, "timestamp": 10445, "window": {"start": 10000, "end": 11000, "type": "time"}}, + {"topic": "S2", "key": 0, "value": {"KSQL_COL_0": 14000}, "timestamp": 13251, "window": {"start": 13000, "end": 14000, "type": "time"}} + ], + "post": { + "sources": [ + { + "name": "S2", + "type": "table", + "keyFormat": {"format": "KAFKA", "windowType": "TUMBLING", "windowSize": 1000}, + "schema": "`ROWKEY` INTEGER KEY, `KSQL_COL_0` BIGINT" + } + ] } }, { diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index 9e29978f636..62b63427d14 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -975,28 +975,6 @@ ]} ] }, - { - "name": "windowStart and windowEnd UDAFs", - "statements": [ - "CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE TABLE TWEET_LANG AS SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WEND, LANG, COUNT(*) AS TWEET_COUNT FROM ALL_TWEETS WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY LANG;", - "SELECT WSTART, WEND, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';" - ], - "inputs": [ - {"topic": "test_topic", "timestamp": 1580223282123, "value": {"lang": "en"}}, - {"topic": "test_topic", "timestamp": 1580223282323, "value": {"lang": "en"}}, - {"topic": "test_topic", "timestamp": 1580223283123, "value": {"lang": "en"}} - ], - "responses": [ - {"admin": {"@type": "currentStatus"}}, - {"admin": {"@type": "currentStatus"}}, - {"query": [ - {"header":{"schema":"`WSTART` STRING, `WEND` STRING, `LANG` STRING, `TWEET_COUNT` BIGINT"}}, - {"row":{"columns":["2020-01-28 14:54:42 +0000", "2020-01-28 14:54:43 +0000", "en", 2]}}, - {"row":{"columns":["2020-01-28 14:54:43 +0000", "2020-01-28 14:54:44 +0000", "en", 1]}} - ]} - ] - }, { "name": "access window bounds in selection", "statements": [ diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index d24587b50b0..a2e1498595e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -694,7 +694,7 @@ private List> handleSelects( final Stacker contextStacker ) { final boolean noSystemColumns = analysis.getSelectColumnRefs().stream() - .noneMatch(ref -> SchemaUtil.isSystemColumn(ref)); + .noneMatch(SchemaUtil::isSystemColumn); final LogicalSchema intermediateSchema; final Function preSelectTransform; diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParams.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParams.java index a99c842abdf..9c8ca3f5778 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParams.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParams.java @@ -18,7 +18,6 @@ import io.confluent.ksql.execution.function.udaf.KudafAggregator; import io.confluent.ksql.execution.function.udaf.KudafInitializer; import io.confluent.ksql.execution.function.udaf.KudafUndoAggregator; -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; import io.confluent.ksql.schema.ksql.LogicalSchema; import java.util.Objects; import java.util.Optional; @@ -27,7 +26,6 @@ public final class AggregateParams { private final KudafInitializer initializer; private final KudafAggregator aggregator; private final Optional undoAggregator; - private final WindowSelectMapper windowSelectMapper; private final LogicalSchema aggregateSchema; private final LogicalSchema schema; @@ -35,14 +33,11 @@ public final class AggregateParams { final KudafInitializer initializer, final KudafAggregator aggregator, final Optional undoAggregator, - final WindowSelectMapper windowSelectMapper, final LogicalSchema aggregateSchema, final LogicalSchema schema) { this.initializer = Objects.requireNonNull(initializer, "initializer"); this.aggregator = Objects.requireNonNull(aggregator, "aggregator"); this.undoAggregator = Objects.requireNonNull(undoAggregator, "undoAggregator"); - this.windowSelectMapper - = Objects.requireNonNull(windowSelectMapper, "windowSelectMapper"); this.aggregateSchema = Objects.requireNonNull(aggregateSchema, "aggregateSchema"); this.schema = Objects.requireNonNull(schema, "schema"); } @@ -60,10 +55,6 @@ Optional getUndoAggregator() { return undoAggregator; } - WindowSelectMapper getWindowSelectMapper() { - return windowSelectMapper; - } - public LogicalSchema getSchema() { return schema; } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParamsFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParamsFactory.java index ed7a0743ea5..c84551b5b89 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParamsFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateParamsFactory.java @@ -23,7 +23,6 @@ import io.confluent.ksql.execution.function.udaf.KudafAggregator; import io.confluent.ksql.execution.function.udaf.KudafInitializer; import io.confluent.ksql.execution.function.udaf.KudafUndoAggregator; -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.name.ColumnName; @@ -110,7 +109,6 @@ private AggregateParams create( new KudafInitializer(nonAggregateColumns.size(), initialValueSuppliers), aggregatorFactory.create(nonAggregateColumns.size(), functions), undoAggregator, - new WindowSelectMapper(nonAggregateColumns.size(), functions), aggregateSchema, outputSchema ); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index 725b19c9204..b86f01f2a8a 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -29,7 +29,6 @@ import io.confluent.ksql.execution.streams.transform.KsTransformer; import io.confluent.ksql.execution.transform.KsqlProcessingContext; import io.confluent.ksql.execution.transform.KsqlTransformer; -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.KsqlWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; @@ -195,7 +194,7 @@ static KTableHolder> build( () -> new KsTransformer<>(new WindowBoundsPopulator()), Named.as(StreamsUtil.buildOpName( AggregateBuilderUtils.windowSelectContext(aggregate) - ) + "2") + )) ); materializationBuilder.map( @@ -204,20 +203,6 @@ static KTableHolder> build( AggregateBuilderUtils.windowSelectContext(aggregate) ); - final WindowSelectMapper windowSelectMapper = aggregateParams.getWindowSelectMapper(); - if (windowSelectMapper.hasSelects()) { - reduced = reduced.transformValues( - () -> new KsTransformer<>(windowSelectMapper.getTransformer()), - Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.windowSelectContext(aggregate))) - ); - - materializationBuilder.map( - pl -> (KsqlTransformer) windowSelectMapper.getTransformer(), - resultSchema, - AggregateBuilderUtils.windowSelectContext(aggregate) - ); - } - return KTableHolder.materialized( reduced, resultSchema, diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java index f99394c5b21..59c8f7419a4 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java @@ -11,7 +11,6 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.expression.tree.FunctionCall; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.function.TableAggregationFunction; @@ -21,8 +20,6 @@ import io.confluent.ksql.execution.streams.AggregateParamsFactory.KudafAggregatorFactory; import io.confluent.ksql.execution.streams.AggregateParamsFactory.KudafUndoAggregatorFactory; import io.confluent.ksql.execution.transform.KsqlProcessingContext; -import io.confluent.ksql.execution.transform.KsqlTransformer; -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.name.ColumnName; @@ -32,9 +29,6 @@ import io.confluent.ksql.util.SchemaUtil; import java.util.List; import java.util.Optional; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -206,40 +200,6 @@ public void shouldReturnUndoAggregator() { assertThat(undoAggregator, is(undoAggregator)); } - @Test - public void shouldReturnCorrectWindowSelectMapperForNonWindowSelections() { - // When: - final WindowSelectMapper windowSelectMapper = aggregateParams.getWindowSelectMapper(); - - // Then: - assertThat(windowSelectMapper.hasSelects(), is(false)); - } - - @Test - public void shouldReturnCorrectWindowSelectMapperForWindowSelections() { - // Given: - aggregateParams = new AggregateParamsFactory(udafFactory, undoUdafFactory).create( - INPUT_SCHEMA, - NON_AGG_COLUMNS, - functionRegistry, - ImmutableList.of(WINDOW_START), - false - ); - - // When: - final KsqlTransformer, GenericRow> windowSelectMapper = - aggregateParams - .getWindowSelectMapper() - .getTransformer(); - - // Then: - final Windowed window = new Windowed<>(null, new TimeWindow(10, 20)); - assertThat( - windowSelectMapper.transform(window, genericRow("fiz", "baz", null), ctx), - equalTo(genericRow("fiz", "baz", 10L)) - ); - } - @Test public void shouldReturnCorrectAggregateSchema() { // When: diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java index 3d99df5e3d3..79004eb4700 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamAggregateBuilderTest.java @@ -49,7 +49,6 @@ import io.confluent.ksql.execution.plan.StreamWindowedAggregate; import io.confluent.ksql.execution.transform.KsqlProcessingContext; import io.confluent.ksql.execution.transform.KsqlTransformer; -import io.confluent.ksql.execution.transform.window.WindowSelectMapper; import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.SessionWindowExpression; import io.confluent.ksql.execution.windows.TumblingWindowExpression; @@ -155,8 +154,6 @@ public class StreamAggregateBuilderTest { @Mock private KTable, GenericRow> windowedWithWindowBounds; @Mock - private KTable, GenericRow> windowedWithWindowUdafs; - @Mock private KsqlQueryBuilder queryBuilder; @Mock private FunctionRegistry functionRegistry; @@ -171,8 +168,6 @@ public class StreamAggregateBuilderTest { @Mock private KsqlTransformer resultMapper; @Mock - private WindowSelectMapper windowSelectMapper; - @Mock private Merger merger; @Mock private MaterializedFactory materializedFactory; @@ -210,8 +205,6 @@ public void init() { when(aggregator.getMerger()).thenReturn(merger); when(aggregator.getResultMapper()).thenReturn(resultMapper); when(aggregateParams.getInitializer()).thenReturn(initializer); - when(aggregateParams.getWindowSelectMapper()).thenReturn(windowSelectMapper); - when(windowSelectMapper.hasSelects()).thenReturn(false); planBuilder = new KSPlanBuilder( queryBuilder, @@ -439,6 +432,8 @@ public void shouldBuildTumblingWindowedAggregateCorrectly() { inOrder.verify(windowed).transformValues(any(), any(Named.class)); inOrder.verify(windowedWithResults).transformValues(any(), any(Named.class)); inOrder.verifyNoMoreInteractions(); + + assertThat(result.getTable(), is(windowedWithWindowBounds)); } @Test @@ -463,6 +458,8 @@ public void shouldBuildHoppingWindowedAggregateCorrectly() { inOrder.verify(windowed).transformValues(any(), any(Named.class)); inOrder.verify(windowedWithResults).transformValues(any(), any(Named.class)); inOrder.verifyNoMoreInteractions(); + + assertThat(result.getTable(), is(windowedWithWindowBounds)); } @Test @@ -492,6 +489,8 @@ public void shouldBuildSessionWindowedAggregateCorrectly() { inOrder.verify(windowed).transformValues(any(), any(Named.class)); inOrder.verify(windowedWithResults).transformValues(any(), any(Named.class)); inOrder.verifyNoMoreInteractions(); + + assertThat(result.getTable(), is(windowedWithWindowBounds)); } @Test @@ -637,28 +636,6 @@ public void shouldBuildAggregatorParamsCorrectlyForWindowedAggregate() { } } - @Test - @SuppressWarnings("unchecked") - public void shouldAddWindowBoundariesIfSpecified() { - for (final Runnable given : given()) { - // Given: - clearInvocations(groupedStream, timeWindowedStream, sessionWindowedStream, windowed, - windowedWithResults, windowedWithWindowBounds); - when(windowSelectMapper.hasSelects()).thenReturn(true); - when(windowedWithWindowBounds.transformValues(any(), any(Named.class))).thenReturn( - (KTable) windowedWithWindowUdafs); - given.run(); - - // When: - final KTableHolder> result = - windowedAggregate.build(planBuilder); - - // Then: - assertThat(result.getTable(), is(windowedWithWindowUdafs)); - verify(windowedWithWindowBounds).transformValues(any(), any(Named.class)); - } - } - private void assertCorrectMaterializationBuilder( final KTableHolder result, final boolean windowed