diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java index a1061efc35a2..9e0fd4d0c73a 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/window/WindowSelectMapper.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; @@ -60,15 +61,15 @@ public boolean hasSelects() { return !windowSelects.isEmpty(); } - public KsqlTransformer, GenericRow> getTransformer() { - return new Transformer<>(); + public KsqlTransformer, GenericRow> getTransformer() { + return new Transformer(); } - private final class Transformer implements KsqlTransformer, GenericRow> { + private final class Transformer implements KsqlTransformer, GenericRow> { @Override public GenericRow transform( - final Windowed readOnlyKey, + final Windowed readOnlyKey, final GenericRow value, final KsqlProcessingContext ctx ) { diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java index b41d9dd13d26..904c908bddaf 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java @@ -31,14 +31,6 @@ */ public final class StructKeyUtil { - private static final Schema ROWKEY_STRUCT_SCHEMA = SchemaBuilder - .struct() - .field(SchemaUtil.ROWKEY_NAME.name(), Schema.OPTIONAL_STRING_SCHEMA) - .build(); - - private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD = - ROWKEY_STRUCT_SCHEMA.fields().get(0); - private StructKeyUtil() { } diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java index 9b860e1275f5..3d78278ae022 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/transform/window/WindowSelectMapperTest.java @@ -25,8 +25,11 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.transform.KsqlProcessingContext; import io.confluent.ksql.execution.transform.KsqlTransformer; +import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.name.FunctionName; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; @@ -39,6 +42,9 @@ @RunWith(MockitoJUnitRunner.class) public class WindowSelectMapperTest { + private static final Struct A_KEY = StructKeyUtil.keyBuilder(SqlTypes.STRING) + .build("key"); + @Mock private KsqlAggregateFunction windowStartFunc; @Mock @@ -76,7 +82,7 @@ public void shouldDetectWindowEndSelects() { @Test public void shouldUpdateRowWithWindowBounds() { // Given: - final KsqlTransformer, GenericRow> mapper = new WindowSelectMapper( + final KsqlTransformer, GenericRow> mapper = new WindowSelectMapper( 1, ImmutableList.of(otherFunc, windowStartFunc, windowEndFunc, windowStartFunc) ).getTransformer(); @@ -85,7 +91,7 @@ public void shouldUpdateRowWithWindowBounds() { final GenericRow row = genericRow(0, 1, 2, 3, 4, 5); // When: - final GenericRow result = mapper.transform(new Windowed<>("k", window), row, ctx); + final GenericRow result = mapper.transform(new Windowed<>(A_KEY, window), row, ctx); // Then: assertThat(result, is(sameInstance(row))); @@ -95,7 +101,7 @@ public void shouldUpdateRowWithWindowBounds() { @Test(expected = IndexOutOfBoundsException.class) public void shouldThrowIfRowNotBigEnough() { // Given: - final KsqlTransformer, GenericRow> mapper = new WindowSelectMapper( + final KsqlTransformer, GenericRow> mapper = new WindowSelectMapper( 0, ImmutableList.of(windowStartFunc) ).getTransformer(); @@ -104,6 +110,6 @@ public void shouldThrowIfRowNotBigEnough() { final GenericRow row = genericRow(); // When: - mapper.transform(new Windowed<>("k", window), row, ctx); + mapper.transform(new Windowed<>(A_KEY, window), row, ctx); } } \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index f580818fc136..188ab33b9b59 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -974,6 +974,28 @@ {"row":{"columns":["10", "2020-01-29 17:19:45 +0000", "2020-01-29 17:19:46 +0000", 1]}} ]} ] + }, + { + "name": "windowStart and windowEnd UDAFs", + "statements": [ + "CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE TWEET_LANG AS SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WEND, LANG, COUNT(*) AS TWEET_COUNT FROM ALL_TWEETS WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY LANG;", + "SELECT WSTART, WEND, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 1580223282123, "value": {"lang": "en"}}, + {"topic": "test_topic", "timestamp": 1580223282323, "value": {"lang": "en"}}, + {"topic": "test_topic", "timestamp": 1580223283123, "value": {"lang": "en"}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`WSTART` STRING, `WEND` STRING, `LANG` STRING, `TWEET_COUNT` BIGINT"}}, + {"row":{"columns":["2020-01-28 14:54:42 +0000", "2020-01-28 14:54:43 +0000", "en", 2]}}, + {"row":{"columns":["2020-01-28 14:54:43 +0000", "2020-01-28 14:54:44 +0000", "en", 1]}} + ]} + ] } ] } \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java index ac0e474c0e8d..ce13773ad4ea 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java @@ -26,7 +26,6 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.Row; import io.confluent.ksql.execution.streams.materialization.TableRow; -import io.confluent.ksql.execution.streams.materialization.Window; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; @@ -36,6 +35,8 @@ import io.confluent.ksql.util.SchemaUtil; import java.time.Instant; import java.util.List; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Test; public class TableRowsEntityFactoryTest { @@ -87,21 +88,19 @@ public void shouldAddNonWindowedRowToValues() { public void shouldAddWindowedRowToValues() { // Given: final Instant now = Instant.now(); - final Window window0 = Window.of(now, now.plusMillis(2)); - final Window window1 = Window.of(now, now.plusMillis(1)); + final TimeWindow window0 = new TimeWindow(now.toEpochMilli(), now.plusMillis(2).toEpochMilli()); + final TimeWindow window1 = new TimeWindow(now.toEpochMilli(), now.plusMillis(1).toEpochMilli()); final List input = ImmutableList.of( WindowedRow.of( SIMPLE_SCHEMA, - STRING_KEY_BUILDER.build("x"), - window0, + new Windowed<>(STRING_KEY_BUILDER.build("x"), window0), genericRow(true), ROWTIME ), WindowedRow.of( SIMPLE_SCHEMA, - STRING_KEY_BUILDER.build("y"), - window1, + new Windowed<>(STRING_KEY_BUILDER.build("y"), window1), genericRow(false), ROWTIME ) diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/LagReportingAgentTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/LagReportingAgentTest.java index fb991f63ca95..a38a72994aa8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/LagReportingAgentTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/LagReportingAgentTest.java @@ -10,7 +10,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.rest.entity.HostStatusEntity; import io.confluent.ksql.rest.entity.HostStoreLags; import io.confluent.ksql.rest.entity.KsqlHostEntity; import io.confluent.ksql.rest.entity.LagInfoEntity; diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java index c401d2747b62..b0451edf7254 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java @@ -27,6 +27,7 @@ import io.confluent.ksql.execution.plan.StreamAggregate; import io.confluent.ksql.execution.plan.StreamWindowedAggregate; import io.confluent.ksql.execution.streams.transform.KsTransformer; +import io.confluent.ksql.execution.transform.KsqlTransformer; import io.confluent.ksql.execution.transform.window.WindowSelectMapper; import io.confluent.ksql.execution.windows.HoppingWindowExpression; import io.confluent.ksql.execution.windows.KsqlWindowExpression; @@ -140,6 +141,7 @@ public static KTableHolder> build( ); } + @SuppressWarnings({"rawtypes", "unchecked"}) static KTableHolder> build( final KGroupedStreamHolder groupedStream, final StreamWindowedAggregate aggregate, @@ -191,6 +193,12 @@ static KTableHolder> build( () -> new KsTransformer<>(windowSelectMapper.getTransformer()), Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.windowSelectContext(aggregate))) ); + + materializationBuilder.map( + pl -> (KsqlTransformer) windowSelectMapper.getTransformer(), + aggregateSchema, + AggregateBuilderUtils.windowSelectContext(aggregate) + ); } return KTableHolder.materialized( diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java index 49d65b4f6735..55d7b8e3a1ce 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java @@ -105,7 +105,7 @@ public MaterializedWindowedTable windowed() { } private Optional filterAndTransform( - final Struct key, + final Object key, final GenericRow value, final long rowTime ) { @@ -157,7 +157,7 @@ public List get(final Struct key, final Range windowStart) final Builder builder = ImmutableList.builder(); for (final WindowedRow row : result) { - filterAndTransform(key, row.value(), row.rowTime()) + filterAndTransform(row.windowedKey(), row.value(), row.rowTime()) .ifPresent(v -> builder.add(row.withValue(v, schema()))); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/WindowedRow.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/WindowedRow.java index cb522fefe6b7..13f11c1fd90f 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/WindowedRow.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/WindowedRow.java @@ -24,43 +24,40 @@ import java.util.Objects; import java.util.Optional; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.Windowed; public final class WindowedRow implements TableRow { private final LogicalSchema schema; - private final Window window; - private final Struct key; + private final Windowed key; private final GenericRow value; private final long rowTime; private final Validator validator; public static WindowedRow of( final LogicalSchema schema, - final Struct key, - final Window window, + final Windowed key, final GenericRow value, final long rowTime ) { - return new WindowedRow(schema, key, window, value, rowTime, TableRowValidation::validate); + return new WindowedRow(schema, key, value, rowTime, TableRowValidation::validate); } @VisibleForTesting WindowedRow( final LogicalSchema schema, - final Struct key, - final Window window, + final Windowed key, final GenericRow value, final long rowTime, final Validator validator ) { this.schema = requireNonNull(schema, "schema"); this.key = requireNonNull(key, "key"); - this.window = requireNonNull(window, "window"); this.value = requireNonNull(value, "value"); this.rowTime = rowTime; this.validator = requireNonNull(validator, "validator"); - validator.validate(schema, key, value); + validator.validate(schema, key.key(), value); } @Override @@ -75,12 +72,19 @@ public long rowTime() { @Override public Struct key() { + return key.key(); + } + + public Windowed windowedKey() { return key; } @Override public Optional window() { - return Optional.of(window); + return Optional.of(Window.of( + key.window().startTime(), + key.window().endTime() + )); } @Override @@ -96,7 +100,6 @@ public WindowedRow withValue( return new WindowedRow( newSchema, key, - window, newValue, rowTime, validator @@ -114,21 +117,19 @@ public boolean equals(final Object o) { final WindowedRow that = (WindowedRow) o; return Objects.equals(schema, that.schema) && Objects.equals(key, that.key) - && Objects.equals(window, that.window) && Objects.equals(value, that.value) && Objects.equals(rowTime, that.rowTime); } @Override public int hashCode() { - return Objects.hash(key, window, value, schema, rowTime); + return Objects.hash(key, value, schema, rowTime); } @Override public String toString() { return "WindowedRow{" + "key=" + key - + ", window=" + window + ", value=" + value + ", rowTime=" + rowTime + ", schema=" + schema diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index e12d5381b1a0..265273eb57e2 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -21,7 +21,6 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.MaterializationException; import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; -import io.confluent.ksql.execution.streams.materialization.Window; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import java.time.Instant; import java.util.List; @@ -73,16 +72,13 @@ private List findSession( if (windowStart.contains(next.key.window().startTime())) { - final Window window = Window.of( - next.key.window().startTime(), - next.key.window().endTime() - ); + final long rowTime = next.key.window().end(); final WindowedRow row = WindowedRow.of( stateStore.schema(), - key, window, + next.key, next.value, - next.key.window().end() + rowTime ); builder.add(row); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java index fbcdee4d66e1..c2c224605d5b 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java @@ -21,7 +21,6 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.MaterializationException; import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; -import io.confluent.ksql.execution.streams.materialization.Window; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import java.time.Duration; import java.time.Instant; @@ -29,6 +28,8 @@ import java.util.Objects; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -76,12 +77,12 @@ public List get( final Instant windowEnd = windowStart.plus(windowSize); - final Window window = Window.of(windowStart, windowEnd); + final TimeWindow window = + new TimeWindow(windowStart.toEpochMilli(), windowEnd.toEpochMilli()); final WindowedRow row = WindowedRow.of( stateStore.schema(), - key, - window, + new Windowed<>(key, window), next.value.value(), next.value.timestamp() ); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java index 52eeb53572e3..d68d226aee4b 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/AggregateParamsFactoryTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.util.List; import java.util.Optional; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Before; @@ -224,13 +225,13 @@ public void shouldReturnCorrectWindowSelectMapperForWindowSelections() { ); // When: - final KsqlTransformer, GenericRow> windowSelectMapper = + final KsqlTransformer, GenericRow> windowSelectMapper = aggregateParams .getWindowSelectMapper() .getTransformer(); // Then: - final Windowed window = new Windowed<>(null, new TimeWindow(10, 20)); + final Windowed window = new Windowed<>(null, new TimeWindow(10, 20)); assertThat( windowSelectMapper.transform(window, genericRow("fiz", "baz", null), ctx), equalTo(genericRow("fiz", "baz", 10L)) diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java index 9003e12d12d6..4136240bddec 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java @@ -46,6 +46,9 @@ import java.util.List; import java.util.Optional; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -71,6 +74,10 @@ public class KsqlMaterializationTest { private static final GenericRow A_VALUE = GenericRow.genericRow("a", "b"); private static final GenericRow TRANSFORMED = GenericRow.genericRow("x", "y"); private static final Window A_WINDOW = Window.of(Instant.now(), Instant.now().plusMillis(10)); + private static final TimeWindow STREAM_WINDOW = new TimeWindow( + A_WINDOW.start().toEpochMilli(), + A_WINDOW.end().toEpochMilli() + ); private static final Row ROW = Row.of( SCHEMA, @@ -81,8 +88,7 @@ public class KsqlMaterializationTest { private static final WindowedRow WINDOWED_ROW = WindowedRow.of( SCHEMA, - A_KEY, - A_WINDOW, + new Windowed<>(A_KEY, STREAM_WINDOW), A_VALUE, A_ROWTIME ); @@ -224,7 +230,11 @@ public void shouldCallFilterWithCorrectValuesOnWindowedGet() { table.get(A_KEY, WINDOW_START_BOUNDS); // Then: - verify(filter).apply(A_KEY, A_VALUE, new PullProcessingContext(A_ROWTIME)); + verify(filter).apply( + new Windowed<>(A_KEY, STREAM_WINDOW), + A_VALUE, + new PullProcessingContext(A_ROWTIME) + ); } @Test @@ -338,7 +348,11 @@ public void shouldPipeTransformsWindowed() { table.get(A_KEY, WINDOW_START_BOUNDS); // Then: - verify(filter).apply(A_KEY, TRANSFORMED, new PullProcessingContext(A_ROWTIME)); + verify(filter).apply( + new Windowed<>(A_KEY, STREAM_WINDOW), + TRANSFORMED, + new PullProcessingContext(A_ROWTIME) + ); } @SuppressWarnings("OptionalGetWithoutIsPresent") @@ -382,14 +396,20 @@ public void shouldMaintainResultOrdering() { final MaterializedWindowedTable table = materialization.windowed(); givenNoopTransforms(); - final Window window1 = mock(Window.class); - final Window window2 = mock(Window.class); - final Window window3 = mock(Window.class); + final Instant now = Instant.now(); + final TimeWindow window1 = + new TimeWindow(now.plusMillis(10).toEpochMilli(), now.plusMillis(11).toEpochMilli()); + + final SessionWindow window2 = + new SessionWindow(now.toEpochMilli(), now.plusMillis(2).toEpochMilli()); + + final TimeWindow window3 = + new TimeWindow(now.toEpochMilli(), now.plusMillis(3).toEpochMilli()); final ImmutableList rows = ImmutableList.of( - WindowedRow.of(SCHEMA, A_KEY, window1, A_VALUE, A_ROWTIME), - WindowedRow.of(SCHEMA, A_KEY, window2, A_VALUE, A_ROWTIME), - WindowedRow.of(SCHEMA, A_KEY, window3, A_VALUE, A_ROWTIME) + WindowedRow.of(SCHEMA, new Windowed<>(A_KEY, window1), A_VALUE, A_ROWTIME), + WindowedRow.of(SCHEMA, new Windowed<>(A_KEY, window2), A_VALUE, A_ROWTIME), + WindowedRow.of(SCHEMA, new Windowed<>(A_KEY, window3), A_VALUE, A_ROWTIME) ); when(innerWindowed.get(any(), any())).thenReturn(rows); @@ -399,9 +419,9 @@ public void shouldMaintainResultOrdering() { // Then: assertThat(result, hasSize(rows.size())); - assertThat(result.get(0).window(), is(Optional.of(window1))); - assertThat(result.get(1).window(), is(Optional.of(window2))); - assertThat(result.get(2).window(), is(Optional.of(window3))); + assertThat(result.get(0).windowedKey().window(), is(window1)); + assertThat(result.get(1).windowedKey().window(), is(window2)); + assertThat(result.get(2).windowedKey().window(), is(window3)); } private void givenNoopFilter() { diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java index 42a08c0df7e1..f00ab01f3a97 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java @@ -28,10 +28,11 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; -import java.time.Instant; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -58,7 +59,9 @@ public class WindowedRowTest { .put("k0", "key") .put("k1", 11); - private static final Window A_WINDOW = Window.of(Instant.MIN, Instant.MIN.plusMillis(1)); + private static final TimeWindow A_WINDOW = new TimeWindow(10, 1000); + + private static final Windowed A_WINDOWED_KEY = new Windowed<>(A_KEY, A_WINDOW); private static final GenericRow A_VALUE = GenericRow.genericRow("v0-v", 1.0d); private static final long A_ROWTIME = 12335L; @@ -74,8 +77,7 @@ public class WindowedRowTest { public void shouldThrowNPE() { new NullPointerTester() .setDefault(LogicalSchema.class, SCHEMA) - .setDefault(Struct.class, A_KEY) - .setDefault(Window.class, A_WINDOW) + .setDefault(Windowed.class, A_WINDOWED_KEY) .setDefault(GenericRow.class, A_VALUE) .testStaticMethods(WindowedRow.class, Visibility.PROTECTED); } @@ -92,23 +94,23 @@ public void shouldImplementEquals() { new EqualsTester() .addEqualityGroup( - WindowedRow.of(SCHEMA, A_KEY, A_WINDOW, A_VALUE, A_ROWTIME), - WindowedRow.of(SCHEMA, A_KEY, A_WINDOW, A_VALUE, A_ROWTIME) + WindowedRow.of(SCHEMA, A_WINDOWED_KEY, A_VALUE, A_ROWTIME), + WindowedRow.of(SCHEMA, A_WINDOWED_KEY, A_VALUE, A_ROWTIME) ) .addEqualityGroup( - WindowedRow.of(differentSchema, A_KEY, A_WINDOW, A_VALUE, A_ROWTIME) + WindowedRow.of(differentSchema, A_WINDOWED_KEY, A_VALUE, A_ROWTIME) ) .addEqualityGroup( - WindowedRow.of(SCHEMA, new Struct(KEY_STRUCT_SCHEMA), A_WINDOW, A_VALUE, A_ROWTIME) + WindowedRow.of(SCHEMA, new Windowed<>(new Struct(KEY_STRUCT_SCHEMA), A_WINDOW), A_VALUE, A_ROWTIME) ) .addEqualityGroup( - WindowedRow.of(SCHEMA, A_KEY, mock(Window.class, "diff"), A_VALUE, A_ROWTIME) + WindowedRow.of(SCHEMA, new Windowed<>(A_KEY, mock(TimeWindow.class, "diff")), A_VALUE, A_ROWTIME) ) .addEqualityGroup( - WindowedRow.of(SCHEMA, A_KEY, A_WINDOW, GenericRow.genericRow(null, null), A_ROWTIME) + WindowedRow.of(SCHEMA, A_WINDOWED_KEY, GenericRow.genericRow(null, null), A_ROWTIME) ) .addEqualityGroup( - WindowedRow.of(SCHEMA, A_KEY, A_WINDOW, A_VALUE, -1L) + WindowedRow.of(SCHEMA, A_WINDOWED_KEY, A_VALUE, -1L) ) .testEquals(); } @@ -116,7 +118,7 @@ public void shouldImplementEquals() { @Test public void shouldValidateOnConstruction() { // When: - new WindowedRow(SCHEMA, A_KEY, A_WINDOW, A_VALUE, A_ROWTIME, validator); + new WindowedRow(SCHEMA, A_WINDOWED_KEY, A_VALUE, A_ROWTIME, validator); // Then: verify(validator).validate(SCHEMA, A_KEY, A_VALUE); @@ -126,7 +128,7 @@ public void shouldValidateOnConstruction() { @Test public void shouldValidateOnCopy() { // Given: - final WindowedRow row = new WindowedRow(SCHEMA, A_KEY, A_WINDOW, A_VALUE, A_ROWTIME, validator); + final WindowedRow row = new WindowedRow(SCHEMA, A_WINDOWED_KEY, A_VALUE, A_ROWTIME, validator); clearInvocations(validator); // When: diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java index 6c960435c28b..0e26cd18f606 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java @@ -31,7 +31,6 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.MaterializationException; import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; -import io.confluent.ksql.execution.streams.materialization.Window; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.name.ColumnName; @@ -216,8 +215,7 @@ public void shouldReturnValueIfSessionStartsAtLowerBoundIfLowerBoundClosed() { // Then: assertThat(result, contains(WindowedRow.of( SCHEMA, - A_KEY, - Window.of(LOWER_INSTANT, wend), + sessionKey(LOWER_INSTANT, wend), A_VALUE, wend.toEpochMilli() ))); @@ -257,8 +255,7 @@ public void shouldReturnValueIfSessionStartsAtUpperBoundIfUpperBoundClosed() { // Then: assertThat(result, contains(WindowedRow.of( SCHEMA, - A_KEY, - Window.of(UPPER_INSTANT, wend), + sessionKey(UPPER_INSTANT, wend), A_VALUE, wend.toEpochMilli() ))); @@ -293,8 +290,7 @@ public void shouldReturnValueIfSessionStartsBetweenBounds() { // Then: assertThat(result, contains(WindowedRow.of( SCHEMA, - A_KEY, - Window.of(LOWER_INSTANT.plusMillis(1), wend), + sessionKey(LOWER_INSTANT.plusMillis(1), wend), A_VALUE, wend.toEpochMilli() ))); @@ -317,15 +313,13 @@ public void shouldReturnMultipleSessions() { assertThat(result, contains( WindowedRow.of( SCHEMA, - A_KEY, - Window.of(LOWER_INSTANT, wend0), + sessionKey(LOWER_INSTANT, wend0), A_VALUE, wend0.toEpochMilli() ), WindowedRow.of( SCHEMA, - A_KEY, - Window.of(UPPER_INSTANT, wend1), + sessionKey(UPPER_INSTANT, wend1), A_VALUE, wend1.toEpochMilli() ) @@ -349,14 +343,16 @@ private void givenSingleSession( final Instant start, final Instant end ) { - final KeyValue, GenericRow> kv = new KeyValue<>( - new Windowed<>( - A_KEY, - new SessionWindow(start.toEpochMilli(), end.toEpochMilli()) - ), - A_VALUE - ); + sessions.add(new KeyValue<>(sessionKey(start, end), A_VALUE)); + } - sessions.add(kv); + private static Windowed sessionKey( + final Instant sessionStart, + final Instant sessionEnd + ) { + return new Windowed<>( + A_KEY, + new SessionWindow(sessionStart.toEpochMilli(), sessionEnd.toEpochMilli()) + ); } } \ No newline at end of file diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index e290935d2b65..ceb7af6e26fc 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -30,7 +30,6 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.materialization.MaterializationException; import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; -import io.confluent.ksql.execution.streams.materialization.Window; import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.name.ColumnName; @@ -41,6 +40,8 @@ import java.util.List; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -207,15 +208,13 @@ public void shouldReturnValuesForClosedBounds() { assertThat(result, contains( WindowedRow.of( SCHEMA, - A_KEY, - Window.of(bounds.lowerEndpoint(), windowEnd(bounds.lowerEndpoint())), + windowedKey(bounds.lowerEndpoint()), VALUE_1.value(), VALUE_1.timestamp() ), WindowedRow.of( SCHEMA, - A_KEY, - Window.of(bounds.upperEndpoint(), windowEnd(bounds.upperEndpoint())), + windowedKey(bounds.upperEndpoint()), VALUE_2.value(), VALUE_2.timestamp() ) @@ -251,11 +250,7 @@ public void shouldReturnValuesForOpenBounds() { assertThat(result, contains( WindowedRow.of( SCHEMA, - A_KEY, - Window.of( - bounds.lowerEndpoint().plusMillis(1), - windowEnd(bounds.lowerEndpoint().plusMillis(1)) - ), + windowedKey(bounds.lowerEndpoint().plusMillis(1)), VALUE_2.value(), VALUE_2.timestamp() ) @@ -288,22 +283,19 @@ public void shouldMaintainResultOrder() { assertThat(result, contains( WindowedRow.of( SCHEMA, - A_KEY, - Window.of(start, windowEnd(start)), + windowedKey(start), VALUE_1.value(), VALUE_1.timestamp() ), WindowedRow.of( SCHEMA, - A_KEY, - Window.of(start.plusMillis(1), windowEnd(start.plusMillis(1))), + windowedKey(start.plusMillis(1)), VALUE_2.value(), VALUE_2.timestamp() ), WindowedRow.of( SCHEMA, - A_KEY, - Window.of(start.plusMillis(2), windowEnd(start.plusMillis(2))), + windowedKey(start.plusMillis(2)), VALUE_3.value(), VALUE_3.timestamp() ) @@ -323,7 +315,10 @@ public void shouldSupportRangeAll() { ); } - private static Instant windowEnd(final Instant windowStart) { - return windowStart.plus(WINDOW_SIZE); + private static Windowed windowedKey(final Instant windowStart) { + return new Windowed<>( + A_KEY, + new TimeWindow(windowStart.toEpochMilli(), windowStart.plus(WINDOW_SIZE).toEpochMilli()) + ); } } \ No newline at end of file