Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move aggregations to plan builder #3391

Merged

Conversation

rodesai
Copy link
Contributor

@rodesai rodesai commented Sep 20, 2019

Description

This patch moves the code for building aggregations into a plan builder:

To express windowed aggregates, I've added a new execution step type
called StreamWindowedAggregate. Adding a new type ensures that when we
implement the visitor that builds the streams app, the handler for windowed
aggregations is type-safe.

I've moved the windowing pojos into KSQL execution, and split them off
from the AST. This way we can serialize these into the aggregation plan
nodes to express windows. I've also removed the code that builds
aggregations from these pojos and moved it into a visitor inside
StreamAggregateBuilder.

To implement window start/end projections, I had to move
WindowSelectMapper to ksql-execution. This also requires having
WindowSelectMapper own the start/end udaf names.

This patch also includes a refactor of AggregateNode to pass down the
aggregation function call expressions rather than the resolved aggregation
functions. The code for resolving the function call expressions against the
internal schema and building the aggregators, initializers, and undo
aggregators has been moved into a class called AggregateParams, which
the aggregate builders use to make the aggregation calls in streams.

The rest of the patch implements the actual aggregation from step builders.

How to review:

  1. look at the changes to the windowing POJOs moved from the ast to ksql-execution
  2. look at the changes to the windowstart and windowend udafs, which have been moved under ksql-execution
  3. look at the changes to AggregateNode, which has been refactored. It now computes a list of Function expressions resolved to the internal schema rather than a map of index to our internal function class.
  4. look at the new builders for converting from our plan to a streams topology for aggregates. These take the list of function expressions and convert them to the map expected by KudafAggregator. The builder for stream aggregates (StreamAggregateBuilder) also implements a visitor for computing windowed aggregates from a KsqlWindowExpression.

Testing done

Added unit tests for the new plan builders and AggregateParams.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@rodesai rodesai requested a review from a team as a code owner September 20, 2019 17:30
import java.util.Optional;

@Immutable
public abstract class KsqlWindowExpression extends Node {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this is showing up as a new class, but it's just KsqlWindowExpression from ksql-parser moved to ksql-execution and with the interface that builds the aggregated KTable removed.

}

@SuppressWarnings("deprecation") // Need to migrate away from Connect Schema use.
public static KsqlAggregateFunction resolveAggregateFunction(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just AggregateNode.getAggregateFunction.

Copy link
Contributor

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai ! LGTM with the usual batch of questions and nits inline.

private final ExecutionStepProperties properties;
private final ExecutionStep<G> source;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this used to be generic? Looks like the new StreamWindowedAggregate has the same KGroupedStream<Struct, GenericRow>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed up how the type parameters work in an earlier PR. Before, the input and output types were type parameters, and you had to specify one here (either G or T). Now we just have the step implementations implement ExecutionStep with the type parameter fully specified. For stream aggregates, both StreamAggregate and StreamWindowedAggregate take an ExecutionStep<KGroupedStream<Struct, GenericRow>> as the source. But the former implements ExecutionStep<KTable<Struct, GenericRow>> and the latter implements ExecutionStep<KTable<Windowed<Struct>, GenericRow>>.

&& Objects.equals(source, that.source)
&& Objects.equals(formats, that.formats)
&& Objects.equals(aggregations, that.aggregations)
&& nonFuncColumnCount == that.nonFuncColumnCount;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are aggregationSchema and windowExpression not part of this method (or hashCode() below)?

this.properties = Objects.requireNonNull(properties, "properties");
this.source = Objects.requireNonNull(source, "source");
this.formats = Objects.requireNonNull(formats, "formats");
this.nonFuncColumnCount = nonFuncColumnCount;
this.aggregations = Objects.requireNonNull(aggregations);
this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
this.aggregationSchema = Objects.requireNonNull(aggregationSchema, "aggregationSchema");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this not added to equals() or hashCode()? Same question for TableAggregate as well.

windowExpression.getKsqlWindowExpression()
);
step = aggregate;
table = StreamAggregateBuilder.build(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: pull this out of the if-else block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to be it there - we build a windowed vs not windowed aggregate based on the if condition.

@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.GenericRow;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this wasn't moved to ksql-execution along with WindowSelectMapper?

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(fyi: unused imports that are being caught by checkstyle in the build)

@@ -1,4 +1,4 @@
/*
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: white-space change.

// When:
final WindowSelectMapper windowSelectMapper = aggregateParams.getWindowSelectMapper();
final Windowed<?> window = new Windowed<>(null, new TimeWindow(10, 20));
assertThat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: // Then:

@vcrfxia vcrfxia requested a review from a team September 24, 2019 19:47
@rodesai rodesai force-pushed the move-aggregation-to-plan-builder-rebase branch from 08c031a to f0fead2 Compare September 24, 2019 21:23
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai,

I've not looked in detail, but LGTM. Few nits beyond what @vcrfxia called out

.map(
fc -> new FunctionCall(
fc.getName(),
internalSchema.getInternalArgsExpressionList(fc.getArguments())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally.... we should be looking to move the whole internal schema thing into the physical layer. The logical layer shouldn't need to know about such things. Though such internal names should be part of the serialized form of the physical.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it shouldn't be in the logical plan node (like the other code that builds the execution plan). But the code that converts from the physical plan to the streams app shouldn't have to worry about it either. I think when we have a visitor that traverses the logical plan to build the physical plan, that would be the right place for it.

} else {
fieldType = converter.toSqlType(aggregateFunction.getReturnType());
}
for (int i = 0; i < aggregations.size(); i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could move this building of schema, and other places we build schemas, into the QueryAnalyzer / Analyzer. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes sense to me for a follow-up.

@@ -122,54 +116,61 @@ public KeyField getKeyField() {
@SuppressWarnings("unchecked")
public SchemaKTable<?> aggregate(
final LogicalSchema outputSchema,
final Initializer initializer,
final LogicalSchema aggregateSchema,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: can we have aggregateSchema before outputSchema in the param list, given that the aggregation happens before the output?

@@ -109,24 +105,24 @@
@Override
public SchemaKTable<Struct> aggregate(
final LogicalSchema outputSchema,
final Initializer initializer,
final LogicalSchema aggregateSchema,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above, can we switch the param overs?

private final int nonAggValSize;

public KudafInitializer(final int nonAggValSize) {
public KudafInitializer(final int nonAggValSize, final List<Supplier> aggValueSuppliers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call this param and the field by what it actually is... which is the initialValuesSupplier, right?

public final class AggregateParams {
private final KudafInitializer initializer;
private final int initialUdafIndex;
private final Map<Integer, KsqlAggregateFunction> indexToFunction;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a Map I would suggest an ordered List< KsqlAggregateFunction>.

See my PR: #3409

The indexes in the map are always sequential, starting at initialUdafIndex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Was trying (and failing) to keep this diff as small as possible :)

This patch moves the code for building aggregations into a plan builder:

To express windowed aggregates, I've added a new execution step type
called StreamWindowedAggregate. Adding a new type ensures that when we
implement the visitor that builds the streams app, the handler for windowed
aggregations is type-safe.

I've moved the windowing pojos into KSQL execution, and split them off
from the AST. This way we can serialize these into the aggregation plan
nodes to express windows. I've also removed the code that builds
aggregations from these pojos and moved it into a visitor inside
StreamAggregateBuilder.

To implement window start/end projections, I had to move
WindowSelectMapper to ksql-execution. This also requires having
WindowSelectMapper own the start/end udaf names.

This patch also includes a refactor of AggregateNode to pass down the
aggregation function call expressions rather than the resolved aggregation
functions. The code for resolving the function call expressions against the
internal schema and building the aggregators, initializers, and undo
aggregators has been moved into a class called AggregateParams, which
the aggregate builders use to make the aggregation calls in streams.

The rest of the patch implements the actual aggregation from step builders.
@rodesai rodesai force-pushed the move-aggregation-to-plan-builder-rebase branch from f0fead2 to 58cae3e Compare September 25, 2019 22:30
@rodesai rodesai merged commit 3aaeb73 into confluentinc:master Sep 25, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants