Skip to content

Commit

Permalink
feat: build materializations from the physical plan (#3494)
Browse files Browse the repository at this point in the history
* feat: build materializations from the physical plan

This patch changes up how we build materializations to use the
physical plan instead of building from the logical plan. A
materialization is the data in the state store with some additional
transformations applied. These transformations should be the same
as those applied by the steps in the physical plan following the
aggregation step. This patch builds up the transformation as we
build the streams app from the physical plan.

Transformations are described in MaterializationInfo, and
accumulated by a Builder class. A transformation allows mapping
a key/row pair to an optional row. The row is optional to support
filters. We currently support 3 types of transformations:
   - aggregate result map: mapping the results of complex aggregates
     (e.g. avg)
   - projection
   - filter

Finally, the materialization code has been moved to the ksql-streams
module so that it can be used by the streams app builder.

* Review feedback
  • Loading branch information
rodesai authored Oct 9, 2019
1 parent 7260a93 commit f45d649
Show file tree
Hide file tree
Showing 66 changed files with 816 additions and 527 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ private PersistentQueryMetadata executePersistentQuery(final KsqlPlan ksqlPlan)
ksqlPlan.getStatementText(),
physicalPlan.getQueryId(),
engineContext.getMetaStore().getSource(queryPlan.getSink()),
physicalPlan.getMaterializationInfo(),
queryPlan.getSources(),
physicalPlan.getPhysicalPlan(),
physicalPlan.getPlanSummary()
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,27 @@

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.materialization.MaterializationInfo;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.query.QueryId;
import java.util.Objects;
import java.util.Optional;

@Immutable
public final class PhysicalPlan<T> {
private final QueryId queryId;
private final ExecutionStep<T> physicalPlan;
private final Optional<MaterializationInfo> materializationInfo;
private final String planSummary;
private final transient KeyField keyField;

PhysicalPlan(
final QueryId queryId,
final ExecutionStep<T> physicalPlan,
final Optional<MaterializationInfo> materializationInfo,
final String planSummary,
final KeyField keyField
) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
this.physicalPlan = Objects.requireNonNull(physicalPlan, "physicalPlan");
this.planSummary = Objects.requireNonNull(planSummary, "planSummary");
this.keyField = Objects.requireNonNull(keyField, "keyField");
this.materializationInfo = Objects.requireNonNull(materializationInfo, "materializationInfo");
}

public ExecutionStep<?> getPhysicalPlan() {
Expand All @@ -60,8 +55,4 @@ public KeyField getKeyField() {
public QueryId getQueryId() {
return queryId;
}

public Optional<MaterializationInfo> getMaterializationInfo() {
return materializationInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,17 @@

package io.confluent.ksql.physical;

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;

import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.materialization.MaterializationInfo;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.plan.AggregateNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.StreamsBuilder;

public class PhysicalPlanBuilder {
Expand Down Expand Up @@ -77,33 +70,12 @@ public PhysicalPlan<?> buildPhysicalPlan(final LogicalPlanNode logicalPlanNode)
);

final SchemaKStream<?> resultStream = outputNode.buildStream(ksqlQueryBuilder);
final DataSourceType sourceType = (resultStream instanceof SchemaKTable)
? DataSourceType.KTABLE
: DataSourceType.KSTREAM;
final Optional<MaterializationInfo> materializationInfo = sourceType == DataSourceType.KTABLE
? findMaterializationInfo(outputNode)
: Optional.empty();
return new PhysicalPlan<>(
queryId,
resultStream.getSourceStep(),
materializationInfo,
resultStream.getExecutionPlan(queryId, ""),
resultStream.getKeyField()
);
}

private static Optional<MaterializationInfo> findMaterializationInfo(
final PlanNode node
) {
if (node instanceof AggregateNode) {
return ((AggregateNode) node).getMaterializationInfo();
}

return node.getSources().stream()
.map(PhysicalPlanBuilder::findMaterializationInfo)
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.materialization.AggregatesInfo;
import io.confluent.ksql.materialization.MaterializationInfo;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.tree.WindowExpression;
Expand Down Expand Up @@ -66,7 +64,6 @@

public class AggregateNode extends PlanNode {

private static final String AGGREGATE_STATE_STORE_NAME = "Aggregate-aggregate";
private static final String INTERNAL_COLUMN_NAME_PREFIX = "KSQL_INTERNAL_COL_";

private static final String PREPARE_OP_NAME = "prepare";
Expand All @@ -85,7 +82,6 @@ public class AggregateNode extends PlanNode {
private final List<ColumnReferenceExp> requiredColumns;
private final List<Expression> finalSelectExpressions;
private final Expression havingExpressions;
private Optional<MaterializationInfo> materializationInfo = Optional.empty();

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public AggregateNode(
Expand Down Expand Up @@ -161,10 +157,6 @@ public List<ColumnReferenceExp> getRequiredColumns() {
return requiredColumns;
}

public Optional<MaterializationInfo> getMaterializationInfo() {
return materializationInfo;
}

private List<SelectExpression> getFinalSelectExpressions() {
final List<SelectExpression> finalSelectExpressionList = new ArrayList<>();
if (finalSelectExpressions.size() != schema.value().size()) {
Expand Down Expand Up @@ -285,21 +277,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
final List<SelectExpression> finalSelects = internalSchema
.updateFinalSelectExpressions(getFinalSelectExpressions());

final AggregatesInfo aggregatesInfo = AggregatesInfo.of(
requiredColumns.size(),
functionsWithInternalIdentifiers,
prepareSchema
);

materializationInfo = Optional.of(MaterializationInfo.of(
AGGREGATE_STATE_STORE_NAME,
aggregatesInfo,
outputSchema,
havingExpression,
schema,
finalSelects
));

return aggregated.select(
finalSelects,
contextStacker.push(PROJECT_OP_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.materialization.MaterializationInfo.Builder;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.plan.PlanBuilder;
import io.confluent.ksql.execution.streams.KSPlanBuilder;
import io.confluent.ksql.execution.streams.materialization.KsqlMaterializationFactory;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.execution.streams.materialization.ks.KsMaterialization;
import io.confluent.ksql.execution.streams.materialization.ks.KsMaterializationFactory;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.materialization.KsqlMaterializationFactory;
import io.confluent.ksql.materialization.MaterializationInfo;
import io.confluent.ksql.materialization.MaterializationProvider;
import io.confluent.ksql.materialization.ks.KsMaterialization;
import io.confluent.ksql.materialization.ks.KsMaterializationFactory;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
Expand Down Expand Up @@ -181,18 +182,24 @@ public TransientQueryMetadata buildTransientQuery(
);
}

private static Optional<MaterializationInfo> getMaterializationInfo(final Object result) {
if (result instanceof KTableHolder) {
return ((KTableHolder<?>) result).getMaterializationBuilder().map(Builder::build);
}
return Optional.empty();
}

public PersistentQueryMetadata buildQuery(
final String statementText,
final QueryId queryId,
final DataSource<?> sinkDataSource,
final Optional<MaterializationInfo> materializationInfo,
final Set<SourceName> sources,
final ExecutionStep<?> physicalPlan,
final String planSummary
) {
final KsqlQueryBuilder ksqlQueryBuilder = queryBuilder(queryId);
final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);
physicalPlan.build(planBuilder);
final Object result = physicalPlan.build(planBuilder);
final String persistanceQueryPrefix =
ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);
final String applicationId = getQueryApplicationId(
Expand All @@ -208,7 +215,7 @@ public PersistentQueryMetadata buildQuery(
sinkDataSource.getSchema(),
sinkDataSource.getSerdeOptions()
);
final Optional<MaterializationProvider> materializationBuilder = materializationInfo
final Optional<MaterializationProvider> materializationBuilder = getMaterializationInfo(result)
.flatMap(info -> buildMaterializationProvider(
info,
streams,
Expand Down Expand Up @@ -353,7 +360,7 @@ private Optional<MaterializationProvider> buildMaterializationProvider(
.create(
info.stateStoreName(),
kafkaStreams,
info.aggregationSchema(),
info.getStateStoreSchema(),
keySerializer,
keyFormat.getWindowType(),
streamsProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.materialization.Materialization;
import io.confluent.ksql.materialization.MaterializationProvider;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.QueryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.integration.TestKsqlContext;
import io.confluent.ksql.materialization.Materialization;
import io.confluent.ksql.materialization.MaterializedTable;
import io.confluent.ksql.materialization.MaterializedWindowedTable;
import io.confluent.ksql.materialization.Row;
import io.confluent.ksql.materialization.Window;
import io.confluent.ksql.materialization.WindowedRow;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializedTable;
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.Window;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.query.QueryId;
Expand Down
Loading

0 comments on commit f45d649

Please sign in to comment.