Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support UDAFs with different intermediate schema #3412

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Sep 25, 2019

Description

@vpapavas recently enhanced UDAFs to support a different intermediate type, e.g. the AVG UDAF might take an DOUBLE input and output, but it's intermediate type is STRUCT<COUNT BIGINT, SUM DOUBLE>.

This enhancement did not include the necessary parallel changes needed to support the same for static queries.

The rocksdb state stores store the intermediate type. Therefore, static queries, which read data from the rocksdb state stores, must now perform an additional step to convert the intermediate aggregator state to the output type of the aggregator.

How to review

In the future, static queries will also generate physical query plans. But for now they are hacked together in StaticQueryExecutor. So please refrain from commenting on the architecture of this PR. I'm well aware it is less than ideal. Once @rodesai has finished decoupling the KS implementation from the physical plan I can start to address this.

  • KsqlMaterialization and it's factory KsqlMaterializationFactory contain the main change, which is to add another step to convert aggregate internal state to final state.
  • AggregatesInfo was added to capture the data needed for this new step
  • And AggregateNode enhanced to capture it and pass it to MaterializationInfo .

Also fixed some bad import order issues along the way, (or rather my IDE did)

Testing done

materialized-aggregate-static-queries.json has new tests to cover this.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@big-andy-coates big-andy-coates requested a review from a team as a code owner September 25, 2019 21:04
# Conflicts:
#	ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
@vpapavas
Copy link
Member

LGTM +1.

This is what I understand: We use the KsqlMaterializationFactory to create a new Materialization which is done in the Physical Plan builder and represents materialized state that we store in RocksDB. These materializations are needed to store intermediate state between streams steps. Like for example, when aggregating where intermediate data is stored in RocksDB. You added the map functions of UDAFs to be called so that we can go from the intermediate materialized state creating during aggregation to the final materialized state after aggregation (whose schemas might be different).

@vpapavas vpapavas requested a review from a team September 27, 2019 18:32
Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM w/ 1 nit inline

@big-andy-coates big-andy-coates merged commit 70e10e9 into confluentinc:master Sep 27, 2019
@big-andy-coates big-andy-coates deleted the static_query_intermediate_aggs branch September 27, 2019 20:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants