diff --git a/docs-md/developer-guide/create-a-stream.md b/docs-md/developer-guide/create-a-stream.md index a697a800c93b..79d74805c89f 100644 --- a/docs-md/developer-guide/create-a-stream.md +++ b/docs-md/developer-guide/create-a-stream.md @@ -6,7 +6,7 @@ description: Learn how to use the CREATE STREAM statement on a Kafka topic --- In ksqlDB, you create streams from existing {{ site.aktm }} topics, create -streams that will create new {{ site.aktm }} topics, or create streams of +streams that will create new {{ site.ak }} topics, or create streams of query results from other streams. - Use the CREATE STREAM statement to create a stream from an existing Kafka @@ -187,7 +187,7 @@ Kafka topic : pageviews (partitions: 1, replication: 1) Create a Stream backed by a new Kafka Topic ------------------------------------------- -Use the CREATE STREAM statement to create a stream without a preexisting +Use the CREATE STREAM statement to create a stream without a preexisting topic by providing the PARTITIONS count, and optionally the REPLICA count, in the WITH clause. diff --git a/docs-md/developer-guide/create-a-table.md b/docs-md/developer-guide/create-a-table.md index 41fdfdce98ad..4ac8c359e341 100644 --- a/docs-md/developer-guide/create-a-table.md +++ b/docs-md/developer-guide/create-a-table.md @@ -6,7 +6,7 @@ description: Learn how to use the CREATE TABLE statement on a Kafka topic --- In ksqlDB, you create tables from existing {{ site.aktm }} topics, create -tables that will create new {{ site.ak }} topics, or create tables of +tables that will create new {{ site.ak }} topics, or create tables of query results from other tables or streams. - Use the CREATE TABLE statement to create a table from an existing Kafka topic, @@ -21,7 +21,7 @@ query results from other tables or streams. Create a Table from an existing Kafka Topic ------------------------------------------- -Use the CREATE TABLE statement to create a table from an existing +Use the CREATE TABLE statement to create a table from an existing underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster. The following examples show how to create tables from a Kafka topic, @@ -132,7 +132,7 @@ because the underlying `users` topic receives new messages continuously. Create a Table backed by a new Kafka Topic ------------------------------------------ -Use the CREATE TABLE statement to create a table without a preexisting +Use the CREATE TABLE statement to create a table without a preexisting topic by providing the PARTITIONS count, and optionally the REPLICA count, in the WITH clause. @@ -310,10 +310,10 @@ SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39'; Your output should resemble: ``` -+-----------------+---------------+---------------+--------+---------+-------+ - ROWKEY | WINDOWSTART | ROWTIME | USERID | PAGEID | TOTAL | -------------------+---------------+---------------+--------+---------+-------+ - User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1 | ++------------------+---------------+---------------+---------------+--------+---------+-------+ +| ROWKEY | WINDOWSTART | WINDOWEND | ROWTIME | USERID | PAGEID | TOTAL | ++------------------+---------------+---------------+---------------+--------+---------+-------+ +| User_9|+|Page_39 | 1557183900000 | 1557183960000 | 1557183929488 | User_9 | Page_39 | 1 | Query terminated ``` diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index c8ce24942c97..eb3fe151bb5a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -383,8 +383,7 @@ private LogicalSchema buildProjectionSchema( final List projection) { final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager( schema, - functionRegistry, - true + functionRegistry ); final Builder builder = LogicalSchema.builder(); @@ -414,7 +413,7 @@ private LogicalSchema buildAggregateSchema( keyType = SqlTypes.STRING; } else { final ExpressionTypeManager typeManager = - new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry, true); + new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry); keyType = typeManager.getExpressionSqlType(groupByExps.get(0)); } @@ -437,7 +436,7 @@ private LogicalSchema buildRepartitionedSchema( final LogicalSchema sourceSchema = sourceNode.getSchema(); final ExpressionTypeManager typeManager = - new ExpressionTypeManager(sourceSchema, functionRegistry, false); + new ExpressionTypeManager(sourceSchema, functionRegistry); final SqlType keyType = typeManager.getExpressionSqlType(partitionBy); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 61ae11b8702b..765853399d43 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -362,7 +362,7 @@ private Optional buildMaterializationProvider( kafkaStreams, info.getStateStoreSchema(), keySerializer, - keyFormat.getWindowType(), + keyFormat.getWindowInfo(), streamsProperties, ksqlConfig ); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java index cc32b451d620..0ba2925add9e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java @@ -286,7 +286,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() { final MaterializedWindowedTable table = materialization.windowed(); rows.forEach((k, v) -> { - final Window w = Window.of(k.window().startTime(), Optional.empty()); + final Window w = Window.of(k.window().startTime(), k.window().endTime()); final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema()); final List resultAtWindowStart = table.get(key, Range.singleton(w.start())); @@ -332,7 +332,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() { final MaterializedWindowedTable table = materialization.windowed(); rows.forEach((k, v) -> { - final Window w = Window.of(k.window().startTime(), Optional.empty()); + final Window w = Window.of(k.window().startTime(), k.window().endTime()); final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema()); final List resultAtWindowStart = table.get(key, Range.singleton(w.start())); @@ -376,7 +376,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() { final MaterializedWindowedTable table = materialization.windowed(); rows.forEach((k, v) -> { - final Window w = Window.of(k.window().startTime(), Optional.of(k.window().endTime())); + final Window w = Window.of(k.window().startTime(), k.window().endTime()); final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema()); final List resultAtWindowStart = table.get(key, Range.singleton(w.start())); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java index e7fdeed8a333..494aec10dcc2 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/CodeGenRunner.java @@ -82,7 +82,7 @@ public CodeGenRunner( this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry"); this.schema = requireNonNull(schema, "schema"); this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); - this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true); + this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry); } public CodeGenSpec getCodeGenSpec(final Expression expression) { diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index a7d2d6c99cda..b6f382f764ce 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -169,7 +169,7 @@ public static SqlToJavaVisitor of( final Function funNameToCodeName, final Function structToCodeName ) { - this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true); + this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry); this.schema = Objects.requireNonNull(schema, "schema"); this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); this.colRefToCodeName = Objects.requireNonNull(colRefToCodeName, "colRefToCodeName"); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java index 13b76b89d508..6fc48c944212 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdafUtil.java @@ -45,7 +45,7 @@ private UdafUtil() { ) { try { final ExpressionTypeManager expressionTypeManager = - new ExpressionTypeManager(schema, functionRegistry, true); + new ExpressionTypeManager(schema, functionRegistry); final SqlType argumentType = expressionTypeManager.getExpressionSqlType(functionCall.getArguments().get(0)); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java index 5263496e7517..68e8cfb5280a 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/UdtfUtil.java @@ -37,7 +37,7 @@ public static KsqlTableFunction resolveTableFunction( final LogicalSchema schema ) { final ExpressionTypeManager expressionTypeManager = - new ExpressionTypeManager(schema, functionRegistry, true); + new ExpressionTypeManager(schema, functionRegistry); final List functionArgs = functionCall.getArguments(); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java index 3d40d5b6aae2..3902544206ff 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/ExpressionTypeManager.java @@ -82,16 +82,13 @@ public class ExpressionTypeManager { private final LogicalSchema schema; private final FunctionRegistry functionRegistry; - private final boolean referenceValueColumnsOnly; public ExpressionTypeManager( final LogicalSchema schema, - final FunctionRegistry functionRegistry, - final boolean referenceValueColumnsOnly + final FunctionRegistry functionRegistry ) { this.schema = Objects.requireNonNull(schema, "schema"); this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry"); - this.referenceValueColumnsOnly = referenceValueColumnsOnly; } public SqlType getExpressionSqlType(final Expression expression) { @@ -185,9 +182,7 @@ public Void visitBetweenPredicate( public Void visitColumnReference( final UnqualifiedColumnReferenceExp node, final ExpressionTypeContext expressionTypeContext ) { - final Optional possibleColumn = referenceValueColumnsOnly - ? schema.findValueColumn(node.getReference()) - : schema.findColumn(node.getReference()); + final Optional possibleColumn = schema.findValueColumn(node.getReference()); final Column schemaColumn = possibleColumn.orElseThrow(() -> new KsqlException(String.format("Invalid Expression %s.", node.toString()))); @@ -468,7 +463,7 @@ public Void visitFunctionCall( final AggregateFunctionInitArguments args = UdafUtil.createAggregateFunctionInitArgs(0, node); - final KsqlAggregateFunction aggFunc = functionRegistry + final KsqlAggregateFunction aggFunc = functionRegistry .getAggregateFunction(node.getName(), schema, args); expressionTypeContext.setSqlType(aggFunc.returnType()); diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index 9ed84c86f0cd..76fa6e4e943d 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -106,7 +106,7 @@ public class ExpressionTypeManagerTest { @Before public void init() { - expressionTypeManager = new ExpressionTypeManager(SCHEMA, functionRegistry, true); + expressionTypeManager = new ExpressionTypeManager(SCHEMA, functionRegistry); final UdfFactory internalFactory = mock(UdfFactory.class); final UdfMetadata metadata = mock(UdfMetadata.class); @@ -482,7 +482,8 @@ public void shouldEvaluateTypeForStructExpression() { final LogicalSchema schema = LogicalSchema.builder() .valueColumn(COL0, SqlTypes.array(SqlTypes.INTEGER)) .build(); - expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true); + + expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry); final Expression exp = new CreateStructExpression(ImmutableList.of( new Field("field1", new StringLiteral("foo")), @@ -511,7 +512,7 @@ public void shouldEvaluateTypeForStructDereferenceInArray() { .valueColumn(COL0, SqlTypes.array(inner)) .build(); - expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true); + expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry); final Expression expression = new DereferenceExpression( Optional.empty(), @@ -538,7 +539,7 @@ public void shouldEvaluateTypeForArrayReferenceInStruct() { .valueColumn(COL0, inner) .build(); - expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true); + expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry); final Expression structRef = new DereferenceExpression( Optional.empty(), diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index 410632a8ccab..a817b59e07f3 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -133,11 +133,11 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 12000, 12345, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 12345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -157,11 +157,11 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` INTEGER KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":[10, 12000, 12345, 1]}} + {"header":{"schema":"`ROWKEY` INTEGER KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":[10, 12000, 13000, 12345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` INTEGER KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` INTEGER KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -181,11 +181,11 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` BIGINT KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":[10, 12000, 12345, 1]}} + {"header":{"schema":"`ROWKEY` BIGINT KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":[10, 12000, 13000, 12345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` BIGINT KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` BIGINT KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -205,11 +205,11 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` DOUBLE KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":[10.1, 12000, 12345, 1]}} + {"header":{"schema":"`ROWKEY` DOUBLE KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":[10.1, 12000, 13000, 12345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` DOUBLE KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` DOUBLE KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -243,11 +243,11 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 12000, 12345, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 12345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -269,15 +269,15 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 12000, 13345, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 22000, 13345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 13000, 13345, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 13000, 23000, 13345, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -331,21 +331,21 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 12000, 12345, 2]}}, - {"row":{"columns":["10", 14000, 14253, 1]}}, - {"row":{"columns":["10", 15000, 15364, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 12345, 2]}}, + {"row":{"columns":["10", 14000, 15000, 14253, 1]}}, + {"row":{"columns":["10", 15000, 16000, 15364, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 12000, 12345, 2]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 12345, 2]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 14000, 14253, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 14000, 15000, 14253, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -367,21 +367,21 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 7000, 10345, 1]}}, - {"row":{"columns":["10", 8000, 10345, 1]}}, - {"row":{"columns":["10", 9000, 13251, 2]}}, - {"row":{"columns":["10", 10000, 13251, 2]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 7000, 12000, 10345, 1]}}, + {"row":{"columns":["10", 8000, 13000, 10345, 1]}}, + {"row":{"columns":["10", 9000, 14000, 13251, 2]}}, + {"row":{"columns":["10", 10000, 15000, 13251, 2]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 8000, 10345, 1]}}, - {"row":{"columns":["10", 9000, 13251, 2]}}, - {"row":{"columns":["10", 10000, 13251, 2]}}, - {"row":{"columns":["10", 11000, 13251, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8000, 13000, 10345, 1]}}, + {"row":{"columns":["10", 9000, 14000, 13251, 2]}}, + {"row":{"columns":["10", 10000, 15000, 13251, 2]}}, + {"row":{"columns":["10", 11000, 16000, 13251, 1]}} ]}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} ]} ] }, @@ -434,9 +434,9 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 11000, 11345, 1]}}, - {"row":{"columns":["10", 12000, 12345, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 11000, 12000, 11345, 1]}}, + {"row":{"columns":["10", 12000, 13000, 12345, 1]}} ]} ] }, @@ -456,14 +456,14 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 7000, 10345, 1]}}, - {"row":{"columns":["10", 8000, 10345, 1]}}, - {"row":{"columns":["10", 9000, 10345, 1]}}, - {"row":{"columns":["10", 10000, 13345, 2]}}, - {"row":{"columns":["10", 11000, 13345, 1]}}, - {"row":{"columns":["10", 12000, 13345, 1]}}, - {"row":{"columns":["10", 13000, 13345, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 7000, 11000, 10345, 1]}}, + {"row":{"columns":["10", 8000, 12000, 10345, 1]}}, + {"row":{"columns":["10", 9000, 13000, 10345, 1]}}, + {"row":{"columns":["10", 10000, 14000, 13345, 2]}}, + {"row":{"columns":["10", 11000, 15000, 13345, 1]}}, + {"row":{"columns":["10", 12000, 16000, 13345, 1]}}, + {"row":{"columns":["10", 13000, 17000, 13345, 1]}} ]} ] }, @@ -687,8 +687,8 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 1582501512000, 1582501512456, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1582501512000, 1582501513000, 1582501512456, 1]}} ]} ] }, @@ -707,8 +707,8 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 1582501512000, 1582501512456, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1582501512000, 1582501513000, 1582501512456, 1]}} ]} ] }, @@ -730,9 +730,9 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, - {"row":{"columns":["10", 1582501512000, 1582501512456, 1]}}, - {"row":{"columns":["10", 1582501552000, 1582501552456, 1]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1582501512000, 1582501513000, 1582501512456, 1]}}, + {"row":{"columns":["10", 1582501552000, 1582501553000, 1582501552456, 1]}} ]} ] }, @@ -835,8 +835,8 @@ {"admin": {"@type": "currentStatus"}}, {"admin": {"@type": "currentStatus"}}, {"query": [ - {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT, `AVG` DOUBLE"}}, - {"row":{"columns":["10", 11000, 11346, 2, 5.0]}} + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT, `AVG` DOUBLE"}}, + {"row":{"columns":["10", 11000, 12000, 11346, 2, 5.0]}} ]} ] }, @@ -955,6 +955,25 @@ "message": "Column 'AGGREGATE.WINDOWSTART' cannot be resolved", "status": 400 } + }, + { + "name": "window bounds in projection UDF", + "statements": [ + "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", + "SELECT ROWKEY, TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS WEND, COUNT FROM AGGREGATE WHERE ROWKEY='10';" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 1580318385000, "key": "10", "value": {"val": 2}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WSTART` STRING, `WEND` STRING, `COUNT` BIGINT"}}, + {"row":{"columns":["10", "2020-01-29 17:19:45 +0000", "2020-01-29 17:19:46 +0000", 1]}} + ]} + ] } ] } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java index cb4147f2db5c..efee662051f5 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java @@ -15,17 +15,13 @@ package io.confluent.ksql.rest.entity; -import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.streams.materialization.TableRow; -import io.confluent.ksql.model.WindowType; -import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.SchemaUtil; -import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.connect.data.Struct; @@ -35,16 +31,6 @@ */ public final class TableRowsEntityFactory { - @SuppressWarnings("deprecation") - private static final List TIME_WINDOW_COLUMNS = ImmutableList - .of(Column.legacySystemWindowColumn(SchemaUtil.WINDOWSTART_NAME, SqlTypes.BIGINT)); - - @SuppressWarnings("deprecation") - private static final List SESSION_WINDOW_COLUMNS = ImmutableList.builder() - .addAll(TIME_WINDOW_COLUMNS) - .add(Column.legacySystemWindowColumn(SchemaUtil.WINDOWEND_NAME, SqlTypes.BIGINT)) - .build(); - private TableRowsEntityFactory() { } @@ -58,32 +44,19 @@ public static List> createRows( public static LogicalSchema buildSchema( final LogicalSchema schema, - final Optional windowType + final boolean windowed ) { - final LogicalSchema adjusted = LogicalSchema.builder() + final Builder builder = LogicalSchema.builder() .noImplicitColumns() - .keyColumns(schema.key()) - .valueColumns(schema.metadata()) - .valueColumns(schema.value()) - .build(); + .keyColumns(schema.key()); - return windowType - .map(wt -> addWindowFieldsIntoSchema(wt, adjusted)) - .orElse(adjusted); - } - - private static LogicalSchema addWindowFieldsIntoSchema( - final WindowType windowType, - final LogicalSchema schema - ) { - final List additionalKeyCols = windowType == WindowType.SESSION - ? SESSION_WINDOW_COLUMNS - : TIME_WINDOW_COLUMNS; + if (windowed) { + builder.keyColumn(SchemaUtil.WINDOWSTART_NAME, SqlTypes.BIGINT); + builder.keyColumn(SchemaUtil.WINDOWEND_NAME, SqlTypes.BIGINT); + } - return LogicalSchema.builder() - .noImplicitColumns() - .keyColumns(schema.key()) - .keyColumns(additionalKeyCols) + return builder + .valueColumns(schema.metadata()) .valueColumns(schema.value()) .build(); } @@ -95,7 +68,7 @@ private static List createRow(final TableRow row) { row.window().ifPresent(window -> { rowList.add(window.start().toEpochMilli()); - window.end().map(Instant::toEpochMilli).ifPresent(rowList::add); + rowList.add(window.end().toEpochMilli()); }); rowList.add(row.rowTime()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 089c667f075f..c686733a050b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -58,6 +58,7 @@ import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.model.WindowType; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.Query; @@ -96,6 +97,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.connect.data.Struct; @@ -199,10 +201,11 @@ public static TableRowsEntity execute( final LogicalSchema outputSchema; final List> rows; if (isSelectStar(statement.getStatement().getSelect())) { - outputSchema = TableRowsEntityFactory.buildSchema(result.schema, mat.windowType()); + outputSchema = TableRowsEntityFactory + .buildSchema(result.schema, mat.windowType().isPresent()); rows = TableRowsEntityFactory.createRows(result.rows); } else { - outputSchema = selectOutputSchema(result, executionContext, analysis); + outputSchema = selectOutputSchema(result, executionContext, analysis, mat.windowType()); rows = handleSelects( result, @@ -210,6 +213,7 @@ public static TableRowsEntity execute( executionContext, analysis, outputSchema, + mat.windowType(), queryId, contextStacker ); @@ -559,37 +563,43 @@ private static List> handleSelects( final KsqlExecutionContext executionContext, final ImmutableAnalysis analysis, final LogicalSchema outputSchema, + final Optional windowType, final QueryId queryId, final Stacker contextStacker ) { - final boolean referencesRowTime = analysis.getSelectColumnRefs().stream() - .anyMatch(ref -> ref.name().equals(SchemaUtil.ROWTIME_NAME)); - - final boolean referencesRowKey = analysis.getSelectColumnRefs().stream() - .anyMatch(ref -> ref.name().equals(SchemaUtil.ROWKEY_NAME)); + final boolean noSystemColumns = analysis.getSelectColumnRefs().stream() + .noneMatch(ref -> SchemaUtil.systemColumnNames().contains(ref.name())); final LogicalSchema intermediateSchema; - final PreSelectTransformer preSelectTransform; - if (!referencesRowTime && !referencesRowKey) { + final Function preSelectTransform; + if (noSystemColumns) { intermediateSchema = input.schema; - preSelectTransform = (rowTime, key, value) -> value; + preSelectTransform = TableRow::value; } else { // SelectValueMapper requires the rowTime & key fields in the value schema :( - intermediateSchema = LogicalSchema.builder() - .keyColumns(input.schema.key()) - .valueColumns(input.schema.value()) - .valueColumns(input.schema.metadata()) - .valueColumns(input.schema.key()) - .build(); - - preSelectTransform = (rowTime, key, value) -> { - value.getColumns().add(rowTime); - - key.schema().fields().forEach(f -> { - final Object keyField = key.get(f); - value.getColumns().add(keyField); + final boolean windowed = windowType.isPresent(); + + intermediateSchema = input.schema + .withMetaAndKeyColsInValue(windowed); + + preSelectTransform = row -> { + final Struct key = row.key(); + final List columns = row.value().getColumns(); + + columns.add(0, row.rowTime()); + + final List keyFields = key.schema().fields().stream() + .map(key::get) + .collect(Collectors.toList()); + + columns.addAll(1, keyFields); + + row.window().ifPresent(window -> { + columns.add(1, window.start().toEpochMilli()); + columns.add(2, window.end().toEpochMilli()); }); - return value; + + return row.value(); }; } @@ -616,7 +626,7 @@ private static List> handleSelects( final ImmutableList.Builder> output = ImmutableList.builder(); input.rows.forEach(r -> { - final GenericRow intermediate = preSelectTransform.transform(r.rowTime(), r.key(), r.value()); + final GenericRow intermediate = preSelectTransform.apply(r); final GenericRow mapped = transformer.transform( r.key(), @@ -647,22 +657,27 @@ private static void validateProjection( private static LogicalSchema selectOutputSchema( final Result input, final KsqlExecutionContext executionContext, - final ImmutableAnalysis analysis + final ImmutableAnalysis analysis, + final Optional windowType ) { final Builder schemaBuilder = LogicalSchema.builder() .noImplicitColumns(); - final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager( - input.schema, - executionContext.getMetaStore(), - false - ); + // Copy meta & key columns into the value schema as SelectValueMapper expects it: + final LogicalSchema schema = input.schema + .withMetaAndKeyColsInValue(windowType.isPresent()); + + final ExpressionTypeManager expressionTypeManager = + new ExpressionTypeManager(schema, executionContext.getMetaStore()); for (int idx = 0; idx < analysis.getSelectExpressions().size(); idx++) { final SelectExpression select = analysis.getSelectExpressions().get(idx); final SqlType type = expressionTypeManager.getExpressionSqlType(select.getExpression()); - if (input.schema.isKeyColumn(select.getAlias())) { + if (input.schema.isKeyColumn(select.getAlias()) + || select.getAlias().equals(SchemaUtil.WINDOWSTART_NAME) + || select.getAlias().equals(SchemaUtil.WINDOWEND_NAME) + ) { schemaBuilder.keyColumn(select.getAlias(), type); } else { schemaBuilder.valueColumn(select.getAlias(), type); @@ -819,15 +834,6 @@ private static Struct asKeyStruct(final Object rowKey, final PhysicalSchema phys return key; } - private interface PreSelectTransformer { - - GenericRow transform( - long rowTime, - Struct readOnlyKey, - GenericRow value - ); - } - private static final class ColumnReferenceRewriter extends VisitParentExpressionVisitor, Context> { private ColumnReferenceRewriter() { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java index 97d3f94d2211..b7e1ea9de4d9 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/TableRowsEntityFactoryTest.java @@ -29,7 +29,6 @@ import io.confluent.ksql.execution.streams.materialization.WindowedRow; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder; -import io.confluent.ksql.model.WindowType; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -37,7 +36,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import org.junit.Test; public class TableRowsEntityFactoryTest { @@ -89,8 +87,8 @@ public void shouldAddNonWindowedRowToValues() { public void shouldAddWindowedRowToValues() { // Given: final Instant now = Instant.now(); - final Window window0 = Window.of(now, Optional.empty()); - final Window window1 = Window.of(now, Optional.of(now)); + final Window window0 = Window.of(now, now.plusMillis(2)); + final Window window1 = Window.of(now, now.plusMillis(1)); final List input = ImmutableList.of( WindowedRow.of( @@ -114,9 +112,10 @@ public void shouldAddWindowedRowToValues() { // Then: assertThat(output, hasSize(2)); - assertThat(output.get(0), contains("x", now.toEpochMilli(), ROWTIME, true)); + assertThat(output.get(0), + contains("x", now.toEpochMilli(), now.plusMillis(2).toEpochMilli(), ROWTIME, true)); assertThat(output.get(1), - contains("y", now.toEpochMilli(), now.toEpochMilli(), ROWTIME, false)); + contains("y", now.toEpochMilli(), now.plusMillis(1).toEpochMilli(), ROWTIME, false)); } @Test @@ -143,7 +142,7 @@ public void shouldSupportNullColumns() { @Test public void shouldJustDuplicateRowTimeInValueIfNotWindowed() { // When: - final LogicalSchema result = TableRowsEntityFactory.buildSchema(SCHEMA, Optional.empty()); + final LogicalSchema result = TableRowsEntityFactory.buildSchema(SCHEMA, false); // Then: assertThat(result, is(LogicalSchema.builder() @@ -160,46 +159,7 @@ public void shouldJustDuplicateRowTimeInValueIfNotWindowed() { @Test public void shouldAddHoppingWindowFieldsToSchema() { // When: - final LogicalSchema result = TableRowsEntityFactory - .buildSchema(SCHEMA, Optional.of(WindowType.HOPPING)); - - // Then: - assertThat(result, is(LogicalSchema.builder() - .noImplicitColumns() - .keyColumn(ColumnName.of("k0"), SqlTypes.STRING) - .keyColumn(ColumnName.of("k1"), SqlTypes.BOOLEAN) - .keyColumn(ColumnName.of("WINDOWSTART"), SqlTypes.BIGINT) - .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT) - .valueColumn(ColumnName.of("v0"), SqlTypes.INTEGER) - .valueColumn(ColumnName.of("v1"), SqlTypes.BOOLEAN) - .build() - )); - } - - @Test - public void shouldAddTumblingWindowFieldsToSchema() { - // When: - final LogicalSchema result = TableRowsEntityFactory - .buildSchema(SCHEMA, Optional.of(WindowType.TUMBLING)); - - // Then: - assertThat(result, is(LogicalSchema.builder() - .noImplicitColumns() - .keyColumn(ColumnName.of("k0"), SqlTypes.STRING) - .keyColumn(ColumnName.of("k1"), SqlTypes.BOOLEAN) - .keyColumn(ColumnName.of("WINDOWSTART"), SqlTypes.BIGINT) - .valueColumn(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT) - .valueColumn(ColumnName.of("v0"), SqlTypes.INTEGER) - .valueColumn(ColumnName.of("v1"), SqlTypes.BOOLEAN) - .build() - )); - } - - @Test - public void shouldAddSessionWindowFieldsToSchema() { - // When: - final LogicalSchema result = TableRowsEntityFactory - .buildSchema(SCHEMA, Optional.of(WindowType.SESSION)); + final LogicalSchema result = TableRowsEntityFactory.buildSchema(SCHEMA, true); // Then: assertThat(result, is(LogicalSchema.builder() diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java index 56e69f9b19f4..2e4adc135af4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryFunctionalTest.java @@ -38,7 +38,6 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.test.util.KsqlIdentifierTestUtil; import io.confluent.ksql.test.util.TestBasicJaasConfig; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.UserDataProvider; import io.confluent.rest.RestConfig; import java.io.IOException; @@ -98,6 +97,7 @@ public class PullQueryFunctionalTest { private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); private static final int BASE_TIME = 1_000_000; + private static final int ONE_SECOND = (int)TimeUnit.SECONDS.toMillis(1); private static final PhysicalSchema AGGREGATE_SCHEMA = PhysicalSchema.from( LogicalSchema.builder() @@ -226,7 +226,13 @@ public void shouldGetSingleWindowedKeyFromBothNodes() { assertThat(rows_0, hasSize(HEADER + 1)); assertThat(rows_1, is(matchersRows(rows_0))); assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); - assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, BASE_TIME, 1))); + assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of( + key, // ROWKEY + BASE_TIME, // WINDOWSTART + BASE_TIME + ONE_SECOND, // WINDOWEND + BASE_TIME, // ROWTIME + 1 // COUNT + ))); } private static List makePullQueryRequest( diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java index 3b204b759a5e..7cd4089bf44f 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java @@ -210,7 +210,7 @@ private LogicalSchema handleSelectKey( final StreamSelectKey step ) { final ExpressionTypeManager expressionTypeManager = - new ExpressionTypeManager(sourceSchema, functionRegistry, true); + new ExpressionTypeManager(sourceSchema, functionRegistry); final SqlType keyType = expressionTypeManager .getExpressionSqlType(step.getKeyExpression()); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java index 68325a1e1b93..1baff93cdcca 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java @@ -106,8 +106,8 @@ public static LogicalSchema buildSchema( schemaBuilder.valueColumn(col); } - final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager( - inputSchema, functionRegistry, true); + final ExpressionTypeManager expressionTypeManager = + new ExpressionTypeManager(inputSchema, functionRegistry); // And add new columns representing the exploded values at the end for (int i = 0; i < tableFunctions.size(); i++) { diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Window.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Window.java index 879c094f216f..95d10ccedd0f 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Window.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Window.java @@ -17,7 +17,6 @@ import java.time.Instant; import java.util.Objects; -import java.util.Optional; /** * Pojo for storing window bounds @@ -25,27 +24,27 @@ public final class Window { private final Instant start; - private final Optional end; + private final Instant end; /** * Create instance. * * @param start the start of the window. - * @param end the end of a session window, otherwise {@link Optional#empty()}. + * @param end the end of the window. * @return the instance. */ - public static Window of(final Instant start, final Optional end) { + public static Window of(final Instant start, final Instant end) { return new Window(start, end); } - private Window(final Instant start, final Optional end) { + private Window(final Instant start, final Instant end) { this.start = Objects.requireNonNull(start, "start"); this.end = Objects.requireNonNull(end, "end"); - if (end.map(e -> e.isBefore(start)).orElse(false)) { + if (end.isBefore(start)) { throw new IllegalArgumentException("end before start." + " start: " + start - + ", end: " + end.get() + + ", end: " + end ); } } @@ -54,7 +53,7 @@ public Instant start() { return start; } - public Optional end() { + public Instant end() { return end; } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java index 056f675f4a92..8fd13a607215 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterialization.java @@ -23,6 +23,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.serde.WindowInfo; import java.util.Optional; /** @@ -30,16 +31,16 @@ */ public final class KsMaterialization implements Materialization { - private final Optional windowType; + private final Optional windowInfo; private final KsStateStore stateStore; private final Locator locator; KsMaterialization( - final Optional windowType, + final Optional windowInfo, final Locator locator, final KsStateStore stateStore ) { - this.windowType = requireNonNull(windowType, "windowType"); + this.windowInfo = requireNonNull(windowInfo, "windowInfo"); this.stateStore = requireNonNull(stateStore, "stateStore"); this.locator = requireNonNull(locator, "locator"); } @@ -56,33 +57,36 @@ public Locator locator() { @Override public Optional windowType() { - return windowType; + return windowInfo.map(WindowInfo::getType); } @Override public MaterializedTable nonWindowed() { - if (windowType.isPresent()) { + if (windowInfo.isPresent()) { throw new UnsupportedOperationException("Table has windowed key"); } return new KsMaterializedTable(stateStore); } + @SuppressWarnings("OptionalGetWithoutIsPresent") // Enforced by type @Override public MaterializedWindowedTable windowed() { - if (!windowType.isPresent()) { + if (!windowInfo.isPresent()) { throw new UnsupportedOperationException("Table has non-windowed key"); } - switch (windowType.get()) { + final WindowInfo wndInfo = windowInfo.get(); + final WindowType wndType = wndInfo.getType(); + switch (wndType) { case SESSION: return new KsMaterializedSessionTable(stateStore); case HOPPING: case TUMBLING: - return new KsMaterializedWindowTable(stateStore); + return new KsMaterializedWindowTable(stateStore, wndInfo.getSize().get()); default: - throw new UnsupportedOperationException("Unknown window type: " + windowType.get()); + throw new UnsupportedOperationException("Unknown window type: " + wndInfo); } } } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java index 81082e7da557..db1a4cb95748 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java @@ -19,8 +19,8 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.execution.streams.materialization.Locator; -import io.confluent.ksql.model.WindowType; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.util.KsqlConfig; import java.net.MalformedURLException; import java.net.URL; @@ -65,7 +65,7 @@ public KsMaterializationFactory() { * @param stateStoreName the name of the state store in the Kafka Streams instance. * @param kafkaStreams the Kafka Streams instance. * @param keySerializer the key serializer - used purely for location lookups. - * @param windowType the window type of the key. + * @param windowInfo the window type of the key. * @param streamsProperties the Kafka Streams properties. * @return the new instance if the streams props support IQ. */ @@ -74,7 +74,7 @@ public Optional create( final KafkaStreams kafkaStreams, final LogicalSchema schema, final Serializer keySerializer, - final Optional windowType, + final Optional windowInfo, final Map streamsProperties, final KsqlConfig ksqlConfig ) { @@ -100,7 +100,7 @@ public Optional create( ); final KsMaterialization materialization = materializationFactory.create( - windowType, + windowInfo, locator, stateStore ); @@ -144,7 +144,7 @@ KsStateStore create( interface MaterializationFactory { KsMaterialization create( - Optional windowType, + Optional windowInfo, Locator locator, KsStateStore stateStore ); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index 2cc2e73ba300..e12d5381b1a0 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -26,7 +26,6 @@ import java.time.Instant; import java.util.List; import java.util.Objects; -import java.util.Optional; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -76,7 +75,7 @@ private List findSession( final Window window = Window.of( next.key.window().startTime(), - Optional.of(next.key.window().endTime()) + next.key.window().endTime() ); final WindowedRow row = WindowedRow.of( diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java index 0efbe98e2131..fbcdee4d66e1 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java @@ -23,10 +23,10 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.execution.streams.materialization.Window; import io.confluent.ksql.execution.streams.materialization.WindowedRow; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Objects; -import java.util.Optional; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -40,9 +40,11 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable { private final KsStateStore stateStore; + private final Duration windowSize; - KsMaterializedWindowTable(final KsStateStore store) { + KsMaterializedWindowTable(final KsStateStore store, final Duration windowSize) { this.stateStore = Objects.requireNonNull(store, "store"); + this.windowSize = Objects.requireNonNull(windowSize, "windowSize"); } @Override @@ -71,7 +73,10 @@ public List get( final Instant windowStart = Instant.ofEpochMilli(next.key); if (windowStartBounds.contains(windowStart)) { - final Window window = Window.of(windowStart, Optional.empty()); + + final Instant windowEnd = windowStart.plus(windowSize); + + final Window window = Window.of(windowStart, windowEnd); final WindowedRow row = WindowedRow.of( stateStore.schema(), diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java index 6be413a4c56a..40320f4e2931 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java @@ -70,7 +70,7 @@ public class KsqlMaterializationTest { ); private static final GenericRow A_VALUE = new GenericRow("a", "b"); private static final GenericRow TRANSFORMED = new GenericRow("x", "y"); - private static final Window A_WINDOW = Window.of(Instant.now(), Optional.empty()); + private static final Window A_WINDOW = Window.of(Instant.now(), Instant.now().plusMillis(10)); private static final Row ROW = Row.of( SCHEMA, diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowTest.java index a5e39da5108e..f7fdffb78ebb 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowTest.java @@ -19,7 +19,6 @@ import com.google.common.testing.NullPointerTester; import com.google.common.testing.NullPointerTester.Visibility; import java.time.Instant; -import java.util.Optional; import org.junit.Test; public class WindowTest { @@ -27,30 +26,32 @@ public class WindowTest { private static final Instant INSTANT = Instant.now(); private static final Instant LATER_INSTANCE = INSTANT.plusMillis(1); + @SuppressWarnings("UnstableApiUsage") @Test public void shouldThrowNPEs() { new NullPointerTester() .testStaticMethods(Window.class, Visibility.PACKAGE); } + @SuppressWarnings("UnstableApiUsage") @Test public void shouldImplementEquals() { new EqualsTester() .addEqualityGroup( - Window.of(INSTANT, Optional.of(LATER_INSTANCE)), - Window.of(INSTANT, Optional.of(LATER_INSTANCE)) + Window.of(INSTANT, LATER_INSTANCE), + Window.of(INSTANT, LATER_INSTANCE) ) .addEqualityGroup( - Window.of(INSTANT.minusMillis(1), Optional.of(LATER_INSTANCE)) + Window.of(INSTANT.minusMillis(1), LATER_INSTANCE) ) .addEqualityGroup( - Window.of(INSTANT, Optional.empty()) + Window.of(INSTANT, LATER_INSTANCE.plusMillis(1)) ) .testEquals(); } @Test(expected = IllegalArgumentException.class) public void shouldThrowIfEndBeforeStart() { - Window.of(LATER_INSTANCE, Optional.of(INSTANT)); + Window.of(LATER_INSTANCE, INSTANT); } } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java index 3cde0851d1e6..0dec47f218f6 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/WindowedRowTest.java @@ -29,7 +29,6 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.time.Instant; -import java.util.Optional; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -59,7 +58,7 @@ public class WindowedRowTest { .put("k0", "key") .put("k1", 11); - private static final Window A_WINDOW = Window.of(Instant.MIN, Optional.empty()); + private static final Window A_WINDOW = Window.of(Instant.MIN, Instant.MIN.plusMillis(1)); private static final GenericRow A_VALUE = new GenericRow("v0-v", 1.0d); private static final long A_ROWTIME = 12335L; diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java index 32e3e64e1c38..92208723f057 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java @@ -30,6 +30,7 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.util.KsqlConfig; import java.net.MalformedURLException; import java.net.URL; @@ -151,15 +152,16 @@ public void shouldBuildStateStoreWithCorrectParams() { @Test public void shouldBuildMaterializationWithCorrectParams() { // Given: - final Optional windowType = Optional.of(WindowType.SESSION); + final Optional windowInfo = + Optional.of(WindowInfo.of(WindowType.SESSION, Optional.empty())); // When: - factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, windowType, streamsProperties, + factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, windowInfo, streamsProperties, ksqlConfig); // Then: verify(materializationFactory).create( - windowType, + windowInfo, locator, stateStore ); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationTest.java index 331887ee99ea..8a11a960a54c 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationTest.java @@ -26,6 +26,8 @@ import io.confluent.ksql.execution.streams.materialization.MaterializedTable; import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable; import io.confluent.ksql.model.WindowType; +import io.confluent.ksql.serde.WindowInfo; +import java.time.Duration; import java.util.Optional; import org.junit.Before; import org.junit.Test; @@ -47,6 +49,7 @@ public void setUp() { givenWindowType(Optional.empty()); } + @SuppressWarnings("UnstableApiUsage") @Test public void shouldThrowNPEs() { new NullPointerTester() @@ -135,6 +138,12 @@ public void shouldReturnWindowedForHopping() { } private void givenWindowType(final Optional windowType) { - materialization = new KsMaterialization(windowType, locator, stateStore); + final Optional windowInfo = windowType + .map(wt -> WindowInfo.of(wt, wt.requiresWindowSize() + ? Optional.of(Duration.ofSeconds(1)) + : Optional.empty()) + ); + + materialization = new KsMaterialization(windowInfo, locator, stateStore); } } \ No newline at end of file diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java index 34909f92b145..e59d82c2f8f4 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java @@ -40,7 +40,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -218,7 +217,7 @@ public void shouldReturnValueIfSessionStartsAtLowerBoundIfLowerBoundClosed() { assertThat(result, contains(WindowedRow.of( SCHEMA, A_KEY, - Window.of(LOWER_INSTANT, Optional.of(wend)), + Window.of(LOWER_INSTANT, wend), A_VALUE, wend.toEpochMilli() ))); @@ -259,7 +258,7 @@ public void shouldReturnValueIfSessionStartsAtUpperBoundIfUpperBoundClosed() { assertThat(result, contains(WindowedRow.of( SCHEMA, A_KEY, - Window.of(UPPER_INSTANT, Optional.of(wend)), + Window.of(UPPER_INSTANT, wend), A_VALUE, wend.toEpochMilli() ))); @@ -295,7 +294,7 @@ public void shouldReturnValueIfSessionStartsBetweenBounds() { assertThat(result, contains(WindowedRow.of( SCHEMA, A_KEY, - Window.of(LOWER_INSTANT.plusMillis(1), Optional.of(wend)), + Window.of(LOWER_INSTANT.plusMillis(1), wend), A_VALUE, wend.toEpochMilli() ))); @@ -319,14 +318,14 @@ public void shouldReturnMultipleSessions() { WindowedRow.of( SCHEMA, A_KEY, - Window.of(LOWER_INSTANT, Optional.of(wend0)), + Window.of(LOWER_INSTANT, wend0), A_VALUE, wend0.toEpochMilli() ), WindowedRow.of( SCHEMA, A_KEY, - Window.of(UPPER_INSTANT, Optional.of(wend1)), + Window.of(UPPER_INSTANT, wend1), A_VALUE, wend1.toEpochMilli() ) diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index 673d8fa86083..112fc53048a8 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -36,9 +36,9 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import java.time.Duration; import java.time.Instant; import java.util.List; -import java.util.Optional; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.QueryableStoreType; @@ -58,6 +58,8 @@ @RunWith(MockitoJUnitRunner.class) public class KsMaterializedWindowTableTest { + private static final Duration WINDOW_SIZE = Duration.ofMinutes(1); + private static final LogicalSchema SCHEMA = LogicalSchema.builder() .keyColumn(ColumnName.of("ROWKEY"), SqlTypes.STRING) .valueColumn(ColumnName.of("v0"), SqlTypes.STRING) @@ -93,7 +95,7 @@ public class KsMaterializedWindowTableTest { @Before public void setUp() { - table = new KsMaterializedWindowTable(stateStore); + table = new KsMaterializedWindowTable(stateStore, WINDOW_SIZE); when(stateStore.store(any())).thenReturn(tableStore); when(stateStore.schema()).thenReturn(SCHEMA); @@ -206,14 +208,14 @@ public void shouldReturnValuesForClosedBounds() { WindowedRow.of( SCHEMA, A_KEY, - Window.of(bounds.lowerEndpoint(), Optional.empty()), + Window.of(bounds.lowerEndpoint(), windowEnd(bounds.lowerEndpoint())), VALUE_1.value(), VALUE_1.timestamp() ), WindowedRow.of( SCHEMA, A_KEY, - Window.of(bounds.upperEndpoint(), Optional.empty()), + Window.of(bounds.upperEndpoint(), windowEnd(bounds.upperEndpoint())), VALUE_2.value(), VALUE_2.timestamp() ) @@ -250,7 +252,10 @@ public void shouldReturnValuesForOpenBounds() { WindowedRow.of( SCHEMA, A_KEY, - Window.of(bounds.lowerEndpoint().plusMillis(1), Optional.empty()), + Window.of( + bounds.lowerEndpoint().plusMillis(1), + windowEnd(bounds.lowerEndpoint().plusMillis(1)) + ), VALUE_2.value(), VALUE_2.timestamp() ) @@ -284,21 +289,21 @@ public void shouldMaintainResultOrder() { WindowedRow.of( SCHEMA, A_KEY, - Window.of(start, Optional.empty()), + Window.of(start, windowEnd(start)), VALUE_1.value(), VALUE_1.timestamp() ), WindowedRow.of( SCHEMA, A_KEY, - Window.of(start.plusMillis(1), Optional.empty()), + Window.of(start.plusMillis(1), windowEnd(start.plusMillis(1))), VALUE_2.value(), VALUE_2.timestamp() ), WindowedRow.of( SCHEMA, A_KEY, - Window.of(start.plusMillis(2), Optional.empty()), + Window.of(start.plusMillis(2), windowEnd(start.plusMillis(2))), VALUE_3.value(), VALUE_3.timestamp() ) @@ -317,4 +322,8 @@ public void shouldSupportRangeAll() { Instant.ofEpochMilli(Long.MAX_VALUE) ); } + + private static Instant windowEnd(final Instant windowStart) { + return windowStart.plus(WINDOW_SIZE); + } } \ No newline at end of file