Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: build materializations from the physical plan #3494

Merged

Conversation

rodesai
Copy link
Contributor

@rodesai rodesai commented Oct 8, 2019

This patch changes up how we build materializations to use the
physical plan instead of building from the logical plan. A
materialization is the data in the state store with some additional
transformations applied. These transformations should be the same
as those applied by the steps in the physical plan following the
aggregation step. This patch builds up the transformation as we
build the streams app from the physical plan.

Transformations are described in MaterializationInfo, and
accumulated by a Builder class. A transformation allows mapping
a key/row pair to an optional row. The row is optional to support
filters. We currently support 3 types of transformations:

  • aggregate result map: mapping the results of complex aggregates
    (e.g. avg)
  • projection
  • filter

Finally, the materialization code has been moved to the ksql-streams
module so that it can be used by the streams app builder.

How to review:

  1. Look at MaterializationInfo, which has been changed to support
    generic transformations, and includes the 3 types of transforms.
  2. Look at KsqlMaterializationFactory to see how the transform descriptors
    are built into a list of transforms.
  3. Look at KsqlMaterialization to see how the transforms are applied.
  4. Look at KsqlPlanBuilder to see how we build up the transforms as we
    build the streams app from the physical plan.

@rodesai rodesai requested a review from a team as a code owner October 8, 2019 08:32
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai, this is awesome.

Few nits and suggestions below.

) {
this.stateStoreName = requireNonNull(stateStoreName, "stateStoreName");
this.stateStoreSchema = requireNonNull(stateStoreSchema, "stateStoreSchema");
this.transforms = requireNonNull(transforms, "transforms");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defensive copy:

Suggested change
this.transforms = requireNonNull(transforms, "transforms");
this.transforms = ImmutableList.copyOf(requireNonNull(transforms, "transforms"));

@@ -181,18 +182,24 @@ public TransientQueryMetadata buildTransientQuery(
);
}

private Optional<MaterializationInfo> getMaterializationInfo(final Object result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:static?

* @param schema the schema of the result of applying transformations to the data in the store
* @return instance.
*/
public static MaterializationInfo of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer used.

private final List<TransformInfo> transforms;
private LogicalSchema schema;

public Builder(final String stateStoreName, final LogicalSchema stateStoreSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a private constructor and having a public Builder builder(final String stateStoreName, final LogicalSchema stateStoreSchema) method on MaterializationInfo.

this.schema = stateStoreSchema;
}

public Builder mapAggregates(final AggregatesInfo aggregatesInfo, final LogicalSchema schema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming schema param to something that doesn't clash with the schema field would make this code more readable, e.g.

Suggested change
public Builder mapAggregates(final AggregatesInfo aggregatesInfo, final LogicalSchema schema) {
public Builder mapAggregates(final AggregatesInfo aggregatesInfo, final LogicalSchema aggregatedSchema) {

Objects.requireNonNull(materializationBuilder, "materializationProvider");
}

public KTableHolder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the overloaded constructor? Such constructors can make it hard for people to know which to call. Consider just inlining this constructor.

@@ -53,6 +53,11 @@
<artifactId>connect-runtime</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.support</groupId>
<artifactId>support-metrics-common</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's worth the dependency just for a DI Clock type. Maybe just use Supplier<Long> or add a Clock type to our common module...

this.schema = requireNonNull(schema, "schema");
this.transforms = Objects.requireNonNull(transforms, "transforms");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defensive copy:

Suggested change
this.transforms = Objects.requireNonNull(transforms, "transforms");
this.transforms = ImmutableList.copyOf(requireNonNull(transforms, "transforms"));

info.getInfo(),
functionRegistry
);
return (s, g) -> Optional.of(mapper.apply(g));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s and g are the key and value, which would more normally be represented with k and v, i.e. name them after what they are, not what type they are.

return this;
}

public Builder mapValues(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider naming this inline with what it actually is, i.e. a projection. This then marries up with the project operation name used in the processing logger when handling this transform.

To put it another way ... this is specifically a projection, not a general mapValues, so the name should reflect that IMHO.

Maybe project or selectColumns or some such?

@rodesai rodesai force-pushed the build-materialization-from-plan branch from 0880f66 to 44f6d86 Compare October 9, 2019 00:55
This patch changes up how we build materializations to use the
physical plan instead of building from the logical plan. A
materialization is the data in the state store with some additional
transformations applied. These transformations should be the same
as those applied by the steps in the physical plan following the
aggregation step. This patch builds up the transformation as we
build the streams app from the physical plan.

Transformations are described in MaterializationInfo, and
accumulated by a Builder class. A transformation allows mapping
a key/row pair to an optional row. The row is optional to support
filters. We currently support 3 types of transformations:
   - aggregate result map: mapping the results of complex aggregates
     (e.g. avg)
   - projection
   - filter

Finally, the materialization code has been moved to the ksql-streams
module so that it can be used by the streams app builder.
@rodesai rodesai force-pushed the build-materialization-from-plan branch from 44f6d86 to fa156eb Compare October 9, 2019 02:25
@rodesai rodesai merged commit f45d649 into confluentinc:master Oct 9, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants