feat: build execution plan from structured package (#3285)
This patch updates the classes in the structured package (SchemaKStream,
SchemaKTable, SchemaKGroupedStream, and SchemaKGroupedTable) to build
the query execution plan internally. The plan nodes themselves are
constructed by a factory class (ExecutionStepFactory).

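For orientation before the diff: the sketch below shows the shape of this change in miniature. It is a hypothetical, simplified model — ExecutionStep, StepFactory, and StreamWrapper are stand-ins for the real execution-plan types, ExecutionStepFactory, and SchemaKStream — not code from this patch. The point is only the data flow: the wrapper owns the plan node that produced it, and only the factory constructs plan nodes.

```java
// Minimal, self-contained sketch (hypothetical names, NOT the real ksql
// classes) of the pattern this patch introduces: each structured wrapper
// carries the execution step that produced it, and derived streams obtain
// their plan nodes from a single factory instead of having the physical
// planner assemble the plan afterwards.
import java.util.Optional;

final class ExecutionStep {
  final String type;                      // e.g. "STREAM_FILTER"
  final Optional<ExecutionStep> source;   // upstream plan node, if any

  ExecutionStep(final String type, final Optional<ExecutionStep> source) {
    this.type = type;
    this.source = source;
  }

  String describe() {
    return source.map(s -> s.describe() + " -> ").orElse("") + type;
  }
}

// Stand-in for ExecutionStepFactory: the one place plan nodes get built.
final class StepFactory {
  static ExecutionStep streamSource(final String topic) {
    return new ExecutionStep("STREAM_SOURCE(" + topic + ")", Optional.empty());
  }

  static ExecutionStep streamFilter(final ExecutionStep source) {
    return new ExecutionStep("STREAM_FILTER", Optional.of(source));
  }
}

// Stand-in for SchemaKStream: every operation returns a new wrapper whose
// execution step records how the result was computed.
final class StreamWrapper {
  final ExecutionStep step;

  StreamWrapper(final ExecutionStep step) {
    this.step = step;
  }

  StreamWrapper filter() {
    return new StreamWrapper(StepFactory.streamFilter(step));
  }
}

public class PlanSketch {
  public static void main(final String[] args) {
    final StreamWrapper s =
        new StreamWrapper(StepFactory.streamSource("orders")).filter();
    // Prints: STREAM_SOURCE(orders) -> STREAM_FILTER
    System.out.println(s.step.describe());
  }
}
```
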
Note that this currently requires some ugly passing around of redundant
parameters. For example, we have to pass around serdes _and_ value/key
formats for building those serdes. Once we move the actual streams calls
to the ksql-streams layer, this will get cleaned up.
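The redundancy reads roughly like the hypothetical signature below (stand-in types, loosely modeled on the groupBy and aggregate changes further down, not the actual ksql API): the format is kept so the plan node can rebuild the serde when the plan is later executed, while the pre-built serde feeds the Kafka Streams call that still lives in this layer.

```java
import java.util.List;

// Hypothetical stand-in types, not the real ksql classes.
final class ValueFormat {
  final String name;
  ValueFormat(final String name) { this.name = name; }
}

final class RowSerde {
  final ValueFormat builtFrom;  // a serde is derivable from the format
  RowSerde(final ValueFormat builtFrom) { this.builtFrom = builtFrom; }
}

interface GroupedStreamBuilder {
  // Both parameters travel together even though the serde was built from
  // the format: the format feeds the execution plan, while the serde feeds
  // the direct streams call that has not yet moved to ksql-streams.
  Object groupBy(ValueFormat valueFormat, RowSerde valueSerde, List<String> groupByColumns);
}
```
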
rodesai authored Sep 6, 2019
1 parent e590ae1 commit 0d0b1c3
Showing 30 changed files with 1,713 additions and 517 deletions.
@@ -39,7 +39,6 @@
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import io.confluent.ksql.services.ServiceContext;
-import io.confluent.ksql.structured.QueuedSchemaKStream;
 import io.confluent.ksql.structured.SchemaKStream;
 import io.confluent.ksql.structured.SchemaKTable;
 import io.confluent.ksql.util.KsqlConfig;
@@ -124,20 +123,11 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
     final SchemaKStream<?> resultStream = outputNode.buildStream(ksqlQueryBuilder);

     if (outputNode instanceof KsqlBareOutputNode) {
-      if (!(resultStream instanceof QueuedSchemaKStream)) {
-        throw new KsqlException(String.format(
-            "Mismatch between logical and physical output; "
-                + "expected a QueuedSchemaKStream based on logical "
-                + "KsqlBareOutputNode, found a %s instead",
-            resultStream.getClass().getCanonicalName()
-        ));
-      }
-
       final String transientQueryPrefix =
           ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);

       return buildPlanForBareQuery(
-          (QueuedSchemaKStream<?>) resultStream,
+          resultStream,
           (KsqlBareOutputNode) outputNode,
           getServiceId(),
           transientQueryPrefix,
@@ -147,15 +137,6 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
     }

     if (outputNode instanceof KsqlStructuredDataOutputNode) {
-      if (resultStream instanceof QueuedSchemaKStream) {
-        throw new KsqlException(String.format(
-            "Mismatch between logical and physical output; "
-                + "expected a SchemaKStream based on logical "
-                + "QueuedSchemaKStream, found a %s instead",
-            resultStream.getClass().getCanonicalName()
-        ));
-      }
-
       final KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
           (KsqlStructuredDataOutputNode) outputNode;

@@ -177,7 +158,7 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
   }

   private QueryMetadata buildPlanForBareQuery(
-      final QueuedSchemaKStream<?> schemaKStream,
+      final SchemaKStream<?> schemaKStream,
       final KsqlBareOutputNode bareOutputNode,
       final String serviceId,
       final String transientQueryPrefix,
@@ -16,7 +16,7 @@
 package io.confluent.ksql.physical;

 import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.structured.QueuedSchemaKStream;
+import io.confluent.ksql.structured.SchemaKStream;
 import io.confluent.ksql.util.KsqlException;
 import java.util.Objects;
 import java.util.OptionalInt;
@@ -37,7 +37,7 @@ class TransientQueryQueue<K> {
   private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue =
       new LinkedBlockingQueue<>(100);

-  TransientQueryQueue(final QueuedSchemaKStream<K> schemaKStream, final OptionalInt limit) {
+  TransientQueryQueue(final SchemaKStream<K> schemaKStream, final OptionalInt limit) {
     this.callback = limit.isPresent()
         ? new LimitedQueueCallback(limit.getAsInt())
         : new UnlimitedQueueCallback();
@@ -217,6 +217,7 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
         getGroupByExpressions());

     final SchemaKGroupedStream schemaKGroupedStream = aggregateArgExpanded.groupBy(
+        valueFormat,
         genericRowSerde,
         internalGroupByColumns,
         groupByContext
@@ -246,15 +247,20 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
         aggregationContext.getQueryContext()
     );

+    final List<FunctionCall> functionsWithInternalIdentifiers = functionList.stream()
+        .map(internalSchema::resolveToInternal)
+        .map(FunctionCall.class::cast)
+        .collect(Collectors.toList());
     SchemaKTable<?> aggregated = schemaKGroupedStream.aggregate(
         aggStageSchema,
         initializer,
         requiredColumns.size(),
+        functionsWithInternalIdentifiers,
         aggValToFunctionMap,
         getWindowExpression(),
+        valueFormat,
         aggValueGenericRowSerde,
         aggregationContext

     );

     if (havingExpressions != null) {
@@ -17,7 +17,6 @@

 import static java.util.Objects.requireNonNull;

-import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.context.QueryContext.Stacker;
@@ -37,7 +36,6 @@
 import javax.annotation.concurrent.Immutable;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.Topology.AutoOffsetReset;

@@ -134,7 +132,7 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
         builder,
         dataSource,
         schema,
-        contextStacker.push(SOURCE_OP_NAME).getQueryContext(),
+        contextStacker.push(SOURCE_OP_NAME),
         timestampIndex(),
         getAutoOffsetReset(builder.getKsqlConfig().getKsqlStreamConfigProps()),
         keyField
@@ -143,20 +141,23 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
       return schemaKStream;
     }
     final Stacker reduceContextStacker = contextStacker.push(REDUCE_OP_NAME);
-    final Serde<GenericRow> tableSerde = builder.buildValueSerde(
-        dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
-        PhysicalSchema.from(getSchema(), SerdeOption.none()),
-        reduceContextStacker.getQueryContext()
-    );
-    return schemaKStream.toTable(tableSerde, reduceContextStacker);
+    return schemaKStream.toTable(
+        dataSource.getKsqlTopic().getKeyFormat(),
+        dataSource.getKsqlTopic().getValueFormat(),
+        builder.buildValueSerde(
+            dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
+            PhysicalSchema.from(getSchema(), SerdeOption.none()),
+            reduceContextStacker.getQueryContext()
+        ),
+        reduceContextStacker);
   }

   interface SchemaKStreamFactory {
     SchemaKStream<?> create(
         KsqlQueryBuilder builder,
         DataSource<?> dataSource,
         LogicalSchemaWithMetaAndKeyFields schemaWithMetaAndKeyFields,
-        QueryContext queryContext,
+        QueryContext.Stacker contextStacker,
         int timestampIndex,
         Optional<AutoOffsetReset> offsetReset,
         KeyField keyField
@@ -19,7 +19,6 @@
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
 import io.confluent.ksql.execution.context.QueryContext;
-import io.confluent.ksql.metastore.model.DataSource;
 import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.metastore.model.KeyField.LegacyField;
@@ -280,15 +279,17 @@ static <K> SchemaKStream<K> maybeRePartitionByKey(
     return stream.selectKey(joinFieldName, true, contextStacker);
   }

+  static ValueFormat getFormatForSource(final DataSourceNode sourceNode) {
+    return sourceNode.getDataSource()
+        .getKsqlTopic()
+        .getValueFormat();
+  }
+
   Serde<GenericRow> getSerDeForSource(
       final DataSourceNode sourceNode,
       final QueryContext.Stacker contextStacker
   ) {
-    final DataSource<?> dataSource = sourceNode.getDataSource();
-
-    final ValueFormat valueFormat = dataSource
-        .getKsqlTopic()
-        .getValueFormat();
+    final ValueFormat valueFormat = getFormatForSource(sourceNode);

     final LogicalSchema logicalSchema = sourceNode.getSchema()
         .withoutAlias();
@@ -367,6 +368,8 @@ public SchemaKStream<K> join() {
             joinNode.schema,
             getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
             joinNode.withinExpression.get().joinWindow(),
+            getFormatForSource(joinNode.left),
+            getFormatForSource(joinNode.right),
             getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
             getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)),
             contextStacker);
@@ -376,6 +379,8 @@ public SchemaKStream<K> join() {
             joinNode.schema,
             getOuterJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
             joinNode.withinExpression.get().joinWindow(),
+            getFormatForSource(joinNode.left),
+            getFormatForSource(joinNode.right),
             getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
             getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)),
             contextStacker);
@@ -385,6 +390,8 @@ public SchemaKStream<K> join() {
             joinNode.schema,
             getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
             joinNode.withinExpression.get().joinWindow(),
+            getFormatForSource(joinNode.left),
+            getFormatForSource(joinNode.right),
             getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
             getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)),
             contextStacker);
@@ -424,6 +431,7 @@ public SchemaKStream<K> join() {
             rightTable,
             joinNode.schema,
             getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
+            getFormatForSource(joinNode.left),
             getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
             contextStacker);

@@ -432,6 +440,7 @@ public SchemaKStream<K> join() {
             rightTable,
             joinNode.schema,
             getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
+            getFormatForSource(joinNode.left),
             getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
             contextStacker);
       case OUTER:
@@ -19,7 +19,6 @@
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.structured.QueuedSchemaKStream;
 import io.confluent.ksql.structured.SchemaKStream;
 import io.confluent.ksql.util.QueryIdGenerator;
 import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
@@ -56,12 +55,6 @@ public KeyField getKeyField() {

   @Override
   public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
-    final SchemaKStream<?> schemaKStream = getSource()
-        .buildStream(builder);
-
-    return new QueuedSchemaKStream<>(
-        schemaKStream,
-        builder.buildNodeContext(getId().toString()).getQueryContext()
-    );
+    return getSource().buildStream(builder);
   }
 }
@@ -140,13 +140,15 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
         contextStacker.getQueryContext()
     );

-    result.into(
+    return result.into(
         getKsqlTopic().getKafkaTopicName(),
         outputRowSerde,
-        implicitAndKeyFieldIndexes
+        getSchema(),
+        getKsqlTopic().getValueFormat(),
+        serdeOptions,
+        implicitAndKeyFieldIndexes,
+        contextStacker
     );
-
-    return result;
   }

   @SuppressWarnings("unchecked")
@@ -164,7 +166,7 @@ private SchemaKStream<?> createOutputStream(
         getKeyField().legacy()
     );

-    final SchemaKStream result = schemaKStream.sink(resultKeyField, contextStacker);
+    final SchemaKStream result = schemaKStream.withKeyField(resultKeyField);

     if (!partitionByField.isPresent()) {
       return result;

This file was deleted.
