Skip to content

Commit

Permalink
chore: add full window bounds support to pull queries
Browse files Browse the repository at this point in the history
fixes: confluentinc#3633

This change sees pull queries share more functionality and code around the window bounds columns `WINDOWSTART` and `WINDOWEND` introduced in confluentinc#4388 and confluentinc#4401.

* pull queries on time windowed sources, i.e. `TUMBLING` and `HOPPING`, now have a `WINDOWEND` in their schema, just like `SESSION` and the new push query functionality.
* window bound columns are now accessible within the projection of a pull query, e.g. `SELECT WINDOWSTART, WINDOWEND FROM FOO WHERE ROWKEY=1;`
  • Loading branch information
big-andy-coates committed Jan 29, 2020
1 parent acd9f27 commit 537aa56
Show file tree
Hide file tree
Showing 28 changed files with 250 additions and 271 deletions.
8 changes: 4 additions & 4 deletions docs-md/developer-guide/create-a-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ SELECT * FROM pageviews_table WHERE ROWKEY='User_9|+|Page_39';
Your output should resemble:

```
+-----------------+---------------+---------------+--------+---------+-------+
ROWKEY | WINDOWSTART | ROWTIME | USERID | PAGEID | TOTAL |
------------------+---------------+---------------+--------+---------+-------+
User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1 |
+------------------+---------------+---------------+---------------+--------+---------+-------+
| ROWKEY | WINDOWSTART | WINDOWEND | ROWTIME | USERID | PAGEID | TOTAL |
+------------------+---------------+---------------+---------------+--------+---------+-------+
| User_9|+|Page_39 | 1557183900000 | 1557183960000 | 1557183929488 | User_9 | Page_39 | 1 |
Query terminated
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,7 @@ private LogicalSchema buildProjectionSchema(
final List<SelectExpression> projection) {
final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(
schema,
functionRegistry,
true
functionRegistry
);

final Builder builder = LogicalSchema.builder();
Expand Down Expand Up @@ -414,7 +413,7 @@ private LogicalSchema buildAggregateSchema(
keyType = SqlTypes.STRING;
} else {
final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry, true);
new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry);

keyType = typeManager.getExpressionSqlType(groupByExps.get(0));
}
Expand All @@ -437,7 +436,7 @@ private LogicalSchema buildRepartitionedSchema(
final LogicalSchema sourceSchema = sourceNode.getSchema();

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry, false);
new ExpressionTypeManager(sourceSchema, functionRegistry);

final SqlType keyType = typeManager.getExpressionSqlType(partitionBy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private Optional<MaterializationProvider> buildMaterializationProvider(
kafkaStreams,
info.getStateStoreSchema(),
keySerializer,
keyFormat.getWindowType(),
keyFormat.getWindowInfo(),
streamsProperties,
ksqlConfig
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
final MaterializedWindowedTable table = materialization.windowed();

rows.forEach((k, v) -> {
final Window w = Window.of(k.window().startTime(), Optional.empty());
final Window w = Window.of(k.window().startTime(), k.window().endTime());
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart = table.get(key, Range.singleton(w.start()));
Expand Down Expand Up @@ -332,7 +332,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
final MaterializedWindowedTable table = materialization.windowed();

rows.forEach((k, v) -> {
final Window w = Window.of(k.window().startTime(), Optional.empty());
final Window w = Window.of(k.window().startTime(), k.window().endTime());
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart = table.get(key, Range.singleton(w.start()));
Expand Down Expand Up @@ -376,7 +376,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
final MaterializedWindowedTable table = materialization.windowed();

rows.forEach((k, v) -> {
final Window w = Window.of(k.window().startTime(), Optional.of(k.window().endTime()));
final Window w = Window.of(k.window().startTime(), k.window().endTime());
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart = table.get(key, Range.singleton(w.start()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public CodeGenRunner(
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry");
this.schema = requireNonNull(schema, "schema");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
}

public CodeGenSpec getCodeGenSpec(final Expression expression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static SqlToJavaVisitor of(
final Function<FunctionName, String> funNameToCodeName,
final Function<CreateStructExpression, String> structToCodeName
) {
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
this.expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
this.schema = Objects.requireNonNull(schema, "schema");
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
this.colRefToCodeName = Objects.requireNonNull(colRefToCodeName, "colRefToCodeName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private UdafUtil() {
) {
try {
final ExpressionTypeManager expressionTypeManager =
new ExpressionTypeManager(schema, functionRegistry, true);
new ExpressionTypeManager(schema, functionRegistry);

final SqlType argumentType =
expressionTypeManager.getExpressionSqlType(functionCall.getArguments().get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static KsqlTableFunction resolveTableFunction(
final LogicalSchema schema
) {
final ExpressionTypeManager expressionTypeManager =
new ExpressionTypeManager(schema, functionRegistry, true);
new ExpressionTypeManager(schema, functionRegistry);

final List<Expression> functionArgs = functionCall.getArguments();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,13 @@ public class ExpressionTypeManager {

private final LogicalSchema schema;
private final FunctionRegistry functionRegistry;
private final boolean referenceValueColumnsOnly;

public ExpressionTypeManager(
final LogicalSchema schema,
final FunctionRegistry functionRegistry,
final boolean referenceValueColumnsOnly
final FunctionRegistry functionRegistry
) {
this.schema = Objects.requireNonNull(schema, "schema");
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
this.referenceValueColumnsOnly = referenceValueColumnsOnly;
}

public SqlType getExpressionSqlType(final Expression expression) {
Expand Down Expand Up @@ -185,9 +182,7 @@ public Void visitBetweenPredicate(
public Void visitColumnReference(
final UnqualifiedColumnReferenceExp node, final ExpressionTypeContext expressionTypeContext
) {
final Optional<Column> possibleColumn = referenceValueColumnsOnly
? schema.findValueColumn(node.getReference())
: schema.findColumn(node.getReference());
final Optional<Column> possibleColumn = schema.findValueColumn(node.getReference());

final Column schemaColumn = possibleColumn.orElseThrow(() ->
new KsqlException(String.format("Invalid Expression %s.", node.toString())));
Expand Down Expand Up @@ -468,7 +463,7 @@ public Void visitFunctionCall(
final AggregateFunctionInitArguments args =
UdafUtil.createAggregateFunctionInitArgs(0, node);

final KsqlAggregateFunction aggFunc = functionRegistry
final KsqlAggregateFunction<?,?,?> aggFunc = functionRegistry
.getAggregateFunction(node.getName(), schema, args);

expressionTypeContext.setSqlType(aggFunc.returnType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class ExpressionTypeManagerTest {

@Before
public void init() {
expressionTypeManager = new ExpressionTypeManager(SCHEMA, functionRegistry, true);
expressionTypeManager = new ExpressionTypeManager(SCHEMA, functionRegistry);

final UdfFactory internalFactory = mock(UdfFactory.class);
final UdfMetadata metadata = mock(UdfMetadata.class);
Expand Down Expand Up @@ -482,7 +482,8 @@ public void shouldEvaluateTypeForStructExpression() {
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(COL0, SqlTypes.array(SqlTypes.INTEGER))
.build();
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);

expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);

final Expression exp = new CreateStructExpression(ImmutableList.of(
new Field("field1", new StringLiteral("foo")),
Expand Down Expand Up @@ -511,7 +512,7 @@ public void shouldEvaluateTypeForStructDereferenceInArray() {
.valueColumn(COL0, SqlTypes.array(inner))
.build();

expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);

final Expression expression = new DereferenceExpression(
Optional.empty(),
Expand All @@ -538,7 +539,7 @@ public void shouldEvaluateTypeForArrayReferenceInStruct() {
.valueColumn(COL0, inner)
.build();

expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry, true);
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);

final Expression structRef = new DereferenceExpression(
Optional.empty(),
Expand Down
Loading

0 comments on commit 537aa56

Please sign in to comment.