Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add full window bounds support to pull queries #4404

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs-md/developer-guide/create-a-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description: Learn how to use the CREATE STREAM statement on a Kafka topic
---

In ksqlDB, you create streams from existing {{ site.aktm }} topics, create
streams that will create new {{ site.aktm }} topics, or create streams of
streams that will create new {{ site.ak }} topics, or create streams of
query results from other streams.

- Use the CREATE STREAM statement to create a stream from an existing Kafka
Expand Down Expand Up @@ -187,7 +187,7 @@ Kafka topic : pageviews (partitions: 1, replication: 1)
Create a Stream backed by a new Kafka Topic
-------------------------------------------

Use the CREATE STREAM statement to create a stream without a preexisting
Use the CREATE STREAM statement to create a stream without a preexisting
topic by providing the PARTITIONS count, and optionally the REPLICA count,
in the WITH clause.

Expand Down
14 changes: 7 additions & 7 deletions docs-md/developer-guide/create-a-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description: Learn how to use the CREATE TABLE statement on a Kafka topic
---

In ksqlDB, you create tables from existing {{ site.aktm }} topics, create
tables that will create new {{ site.ak }} topics, or create tables of
tables that will create new {{ site.ak }} topics, or create tables of
query results from other tables or streams.

- Use the CREATE TABLE statement to create a table from an existing Kafka topic,
Expand All @@ -21,7 +21,7 @@ query results from other tables or streams.
Create a Table from an existing Kafka Topic
-------------------------------------------

Use the CREATE TABLE statement to create a table from an existing
Use the CREATE TABLE statement to create a table from an existing
underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster.

The following examples show how to create tables from a Kafka topic,
Expand Down Expand Up @@ -132,7 +132,7 @@ because the underlying `users` topic receives new messages continuously.
Create a Table backed by a new Kafka Topic
------------------------------------------

Use the CREATE TABLE statement to create a table without a preexisting
Use the CREATE TABLE statement to create a table without a preexisting
topic by providing the PARTITIONS count, and optionally the REPLICA count,
in the WITH clause.

Expand Down Expand Up @@ -310,10 +310,10 @@ SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39';
Your output should resemble:

```
+-----------------+---------------+---------------+--------+---------+-------+
ROWKEY | WINDOWSTART | ROWTIME | USERID | PAGEID | TOTAL |
------------------+---------------+---------------+--------+---------+-------+
User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1 |
+------------------+---------------+---------------+---------------+--------+---------+-------+
| ROWKEY | WINDOWSTART | WINDOWEND | ROWTIME | USERID | PAGEID | TOTAL |
+------------------+---------------+---------------+---------------+--------+---------+-------+
| User_9|+|Page_39 | 1557183900000 | 1557183960000 | 1557183929488 | User_9 | Page_39 | 1 |
Query terminated
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,7 @@ private LogicalSchema buildProjectionSchema(
final List<SelectExpression> projection) {
final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(
schema,
functionRegistry,
true
functionRegistry
);

final Builder builder = LogicalSchema.builder();
Expand Down Expand Up @@ -414,7 +413,7 @@ private LogicalSchema buildAggregateSchema(
keyType = SqlTypes.STRING;
} else {
final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry, true);
new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry);

keyType = typeManager.getExpressionSqlType(groupByExps.get(0));
}
Expand All @@ -437,7 +436,7 @@ private LogicalSchema buildRepartitionedSchema(
final LogicalSchema sourceSchema = sourceNode.getSchema();

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry, false);
new ExpressionTypeManager(sourceSchema, functionRegistry);

final SqlType keyType = typeManager.getExpressionSqlType(partitionBy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private Optional<MaterializationProvider> buildMaterializationProvider(
kafkaStreams,
info.getStateStoreSchema(),
keySerializer,
keyFormat.getWindowType(),
keyFormat.getWindowInfo(),
streamsProperties,
ksqlConfig
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
final MaterializedWindowedTable table = materialization.windowed();

rows.forEach((k, v) -> {
final Window w = Window.of(k.window().startTime(), Optional.empty());
final Window w = Window.of(k.window().startTime(), k.window().endTime());
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart = table.get(key, Range.singleton(w.start()));
Expand Down Expand Up @@ -332,7 +332,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
final MaterializedWindowedTable table = materialization.windowed();

rows.forEach((k, v) -> {
final Window w = Window.of(k.window().startTime(), Optional.empty());
final Window w = Window.of(k.window().startTime(), k.window().endTime());
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart = table.get(key, Range.singleton(w.start()));
Expand Down Expand Up @@ -376,7 +376,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
final MaterializedWindowedTable table = materialization.windowed();

rows.forEach((k, v) -> {
final Window w = Window.of(k.window().startTime(), Optional.of(k.window().endTime()));
final Window w = Window.of(k.window().startTime(), k.window().endTime());
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart = table.get(key, Range.singleton(w.start()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public CodeGenRunner(
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry");
this.schema = requireNonNull(schema, "schema");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
}

public CodeGenSpec getCodeGenSpec(final Expression expression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static SqlToJavaVisitor of(
final Function<FunctionName, String> funNameToCodeName,
final Function<CreateStructExpression, String> structToCodeName
) {
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
this.schema = Objects.requireNonNull(schema, "schema");
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
this.colRefToCodeName = Objects.requireNonNull(colRefToCodeName, "colRefToCodeName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private UdafUtil() {
) {
try {
final ExpressionTypeManager expressionTypeManager =
new ExpressionTypeManager(schema, functionRegistry, true);
new ExpressionTypeManager(schema, functionRegistry);

final SqlType argumentType =
expressionTypeManager.getExpressionSqlType(functionCall.getArguments().get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static KsqlTableFunction resolveTableFunction(
final LogicalSchema schema
) {
final ExpressionTypeManager expressionTypeManager =
new ExpressionTypeManager(schema, functionRegistry, true);
new ExpressionTypeManager(schema, functionRegistry);

final List<Expression> functionArgs = functionCall.getArguments();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,13 @@ public class ExpressionTypeManager {

private final LogicalSchema schema;
private final FunctionRegistry functionRegistry;
private final boolean referenceValueColumnsOnly;

public ExpressionTypeManager(
final LogicalSchema schema,
final FunctionRegistry functionRegistry,
final boolean referenceValueColumnsOnly
final FunctionRegistry functionRegistry
) {
this.schema = Objects.requireNonNull(schema, "schema");
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
this.referenceValueColumnsOnly = referenceValueColumnsOnly;
}

public SqlType getExpressionSqlType(final Expression expression) {
Expand Down Expand Up @@ -185,9 +182,7 @@ public Void visitBetweenPredicate(
public Void visitColumnReference(
final UnqualifiedColumnReferenceExp node, final ExpressionTypeContext expressionTypeContext
) {
final Optional<Column> possibleColumn = referenceValueColumnsOnly
? schema.findValueColumn(node.getReference())
: schema.findColumn(node.getReference());
final Optional<Column> possibleColumn = schema.findValueColumn(node.getReference());

final Column schemaColumn = possibleColumn.orElseThrow(() ->
new KsqlException(String.format("Invalid Expression %s.", node.toString())));
Expand Down Expand Up @@ -468,7 +463,7 @@ public Void visitFunctionCall(
final AggregateFunctionInitArguments args =
UdafUtil.createAggregateFunctionInitArgs(0, node);

final KsqlAggregateFunction aggFunc = functionRegistry
final KsqlAggregateFunction<?,?,?> aggFunc = functionRegistry
.getAggregateFunction(node.getName(), schema, args);

expressionTypeContext.setSqlType(aggFunc.returnType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class ExpressionTypeManagerTest {

@Before
public void init() {
expressionTypeManager = new ExpressionTypeManager(SCHEMA, functionRegistry, true);
expressionTypeManager = new ExpressionTypeManager(SCHEMA, functionRegistry);

final UdfFactory internalFactory = mock(UdfFactory.class);
final UdfMetadata metadata = mock(UdfMetadata.class);
Expand Down Expand Up @@ -482,7 +482,8 @@ public void shouldEvaluateTypeForStructExpression() {
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(COL0, SqlTypes.array(SqlTypes.INTEGER))
.build();
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);

expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);

final Expression exp = new CreateStructExpression(ImmutableList.of(
new Field("field1", new StringLiteral("foo")),
Expand Down Expand Up @@ -511,7 +512,7 @@ public void shouldEvaluateTypeForStructDereferenceInArray() {
.valueColumn(COL0, SqlTypes.array(inner))
.build();

expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);

final Expression expression = new DereferenceExpression(
Optional.empty(),
Expand All @@ -538,7 +539,7 @@ public void shouldEvaluateTypeForArrayReferenceInStruct() {
.valueColumn(COL0, inner)
.build();

expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);

final Expression structRef = new DereferenceExpression(
Optional.empty(),
Expand Down
Loading