feat: build ks app from an execution plan visitor (#3418)
* feat: build ks app from an execution plan visitor

This patch implements a visitor that iterates over the execution plan
and builds the final kstreams app. In addition to defining and
implementing the visitor, this required updating the type built by
many of the plan nodes to a wrapper class that includes both a
kstream/ktable and a factory for building key serdes.
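
A rough sketch of the wrapper's shape, for orientation only: KTableHolder
and its getTable() accessor appear in the diff below, but the
KeySerdeFactory interface and the constructor shown here are assumptions
for illustration, not the actual ksqlDB API.

    import io.confluent.ksql.GenericRow;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.kstream.KTable;

    // Assumed shape of the key serde factory; not the actual ksqlDB type.
    interface KeySerdeFactory<K> {
      Serde<K> buildKeySerde();
    }

    // Hypothetical sketch of the wrapper built by table-producing plan nodes;
    // stream-producing nodes would carry a KStream the same way.
    final class KTableHolder<K> {
      private final KTable<K, GenericRow> table;
      private final KeySerdeFactory<K> keySerdeFactory;

      KTableHolder(final KTable<K, GenericRow> table, final KeySerdeFactory<K> serdeFactory) {
        this.table = table;
        this.keySerdeFactory = serdeFactory;
      }

      KTable<K, GenericRow> getTable() {
        return table;
      }

      KeySerdeFactory<K> getKeySerdeFactory() {
        return keySerdeFactory;
      }
    }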

Now that we have this visitor, we no longer need the code in SchemaKX
that makes calls into kafka streams, so that's all cleaned up.

Finally, we need to actually call the visitor to build the streams app.
For now that's happening in PhysicalPlanBuilder, but that will get moved
very soon.
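
Condensed from the PhysicalPlanBuilder hunk below (and assuming that
method's surrounding context), the call pattern is: construct the
KSPlanBuilder visitor, hand it to the root execution step, and unwrap
the physical stream from the holder it returns.

    // All of the names below come straight from the diff that follows.
    final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);

    final KStream<?, GenericRow> kStream;
    if (schemaKStream instanceof SchemaKTable) {
      // Table queries: build the table step, then stream it for the queue.
      kStream = ((SchemaKTable<?>) schemaKStream)
          .getSourceTableStep()
          .build(planBuilder)
          .getTable()
          .toStream();
    } else {
      kStream = schemaKStream.getSourceStep().build(planBuilder).getStream();
    }
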
rodesai authored Oct 2, 2019
1 parent e93c445 commit b57d194
Showing 74 changed files with 1,517 additions and 1,287 deletions.

PhysicalPlanBuilder.java

@@ -17,8 +17,11 @@
 
 import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
 
+import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
+import io.confluent.ksql.execution.plan.PlanBuilder;
+import io.confluent.ksql.execution.streams.KSPlanBuilder;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
 import io.confluent.ksql.logging.processing.ProcessingLogContext;
@@ -59,7 +62,6 @@
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.PersistentQueryMetadata;
 import io.confluent.ksql.util.QueryMetadata;
-import io.confluent.ksql.util.QuerySchemas;
 import io.confluent.ksql.util.TransientQueryMetadata;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -78,6 +80,8 @@
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 
 // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
 public class PhysicalPlanBuilder {
@@ -152,6 +156,7 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
           ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);
 
       return buildPlanForBareQuery(
+          ksqlQueryBuilder,
           resultStream,
           (KsqlBareOutputNode) outputNode,
           getServiceId(),
@@ -169,20 +174,21 @@ public QueryMetadata buildPhysicalPlan(final LogicalPlanNode logicalPlanNode) {
           ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);
 
       return buildPlanForStructuredOutputNode(
+          ksqlQueryBuilder,
           logicalPlanNode.getStatementText(),
           resultStream,
           ksqlStructuredDataOutputNode,
           getServiceId(),
           persistanceQueryPrefix,
-          queryId,
-          ksqlQueryBuilder.getSchemas()
+          queryId
       );
     }
 
     throw new KsqlException("Sink data source type unsupported: " + outputNode.getClass());
   }
 
   private QueryMetadata buildPlanForBareQuery(
+      final KsqlQueryBuilder ksqlQueryBuilder,
       final SchemaKStream<?> schemaKStream,
       final KsqlBareOutputNode bareOutputNode,
       final String serviceId,
@@ -204,8 +210,18 @@ private QueryMetadata buildPlanForBareQuery(
         processingLogContext
     );
 
+    final KStream<?, GenericRow> kStream;
+    final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);
+    if (schemaKStream instanceof SchemaKTable) {
+      final KTable<?, GenericRow> kTable =
+          ((SchemaKTable<?>) schemaKStream).getSourceTableStep().build(planBuilder).getTable();
+      kStream = kTable.toStream();
+    } else {
+      kStream = schemaKStream.getSourceStep().build(planBuilder).getStream();
+    }
+
     final TransientQueryQueue<?> queue =
-        new TransientQueryQueue<>(schemaKStream, bareOutputNode.getLimit());
+        new TransientQueryQueue<>(kStream, bareOutputNode.getLimit());
 
     final KafkaStreams streams = kafkaStreamsBuilder.buildKafkaStreams(builder, streamsProperties);
 
@@ -231,20 +247,22 @@ private QueryMetadata buildPlanForBareQuery(
   }
 
   private QueryMetadata buildPlanForStructuredOutputNode(
+      final KsqlQueryBuilder ksqlQueryBuilder,
       final String sqlExpression,
       final SchemaKStream<?> schemaKStream,
       final KsqlStructuredDataOutputNode outputNode,
       final String serviceId,
       final String persistanceQueryPrefix,
-      final QueryId queryId,
-      final QuerySchemas schemas
+      final QueryId queryId
   ) {
     final DataSourceType sourceType = (schemaKStream instanceof SchemaKTable)
         ? DataSourceType.KTABLE
        : DataSourceType.KSTREAM;
 
     final DataSource<?> sinkDataSource;
+    final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);
     if (sourceType == DataSourceType.KTABLE) {
+      ((SchemaKTable) schemaKStream).getSourceTableStep().build(planBuilder);
       sinkDataSource = new KsqlTable<>(
           sqlExpression,
           outputNode.getIntoSourceName(),
@@ -255,6 +273,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
           outputNode.getKsqlTopic()
       );
     } else {
+      schemaKStream.getSourceStep().build(planBuilder);
       sinkDataSource = new KsqlStream<>(
           sqlExpression,
           outputNode.getIntoSourceName(),
@@ -315,7 +334,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
         applicationId,
         sinkDataSource.getKsqlTopic(),
         topology,
-        schemas,
+        ksqlQueryBuilder.getSchemas(),
         streamsProperties,
         overriddenProperties,
         queryCloseCallback

TransientQueryQueue.java

@@ -16,14 +16,14 @@
 package io.confluent.ksql.physical;
 
 import io.confluent.ksql.GenericRow;
-import io.confluent.ksql.structured.SchemaKStream;
 import io.confluent.ksql.util.KsqlException;
 import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Windowed;
 
 /**
@@ -37,13 +37,12 @@ class TransientQueryQueue<K> {
   private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue =
       new LinkedBlockingQueue<>(100);
 
-  TransientQueryQueue(final SchemaKStream<K> schemaKStream, final OptionalInt limit) {
+  TransientQueryQueue(final KStream<?, GenericRow> kstream, final OptionalInt limit) {
     this.callback = limit.isPresent()
         ? new LimitedQueueCallback(limit.getAsInt())
        : new UnlimitedQueueCallback();
 
-    schemaKStream.getKstream()
-        .foreach(new TransientQueryQueue.QueuePopulator<>(rowQueue, callback));
+    kstream.foreach(new TransientQueryQueue.QueuePopulator<>(rowQueue, callback));
   }
 
   BlockingQueue<KeyValue<String, GenericRow>> getQueue() {

SchemaKGroupedStream.java

@@ -21,11 +21,11 @@
 import io.confluent.ksql.execution.expression.tree.FunctionCall;
 import io.confluent.ksql.execution.plan.ExecutionStep;
 import io.confluent.ksql.execution.plan.Formats;
+import io.confluent.ksql.execution.plan.KTableHolder;
 import io.confluent.ksql.execution.plan.StreamAggregate;
 import io.confluent.ksql.execution.plan.StreamWindowedAggregate;
 import io.confluent.ksql.execution.streams.ExecutionStepFactory;
 import io.confluent.ksql.execution.streams.MaterializedFactory;
-import io.confluent.ksql.execution.streams.StreamAggregateBuilder;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.model.WindowType;
@@ -45,12 +45,10 @@
 import java.util.Optional;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Windowed;
 
 public class SchemaKGroupedStream {
 
-  final KGroupedStream kgroupedStream;
   final ExecutionStep<KGroupedStream<Struct, GenericRow>> sourceStep;
   final KeyFormat keyFormat;
   final KeySerde<Struct> keySerde;
@@ -61,7 +59,6 @@ public class SchemaKGroupedStream {
   final MaterializedFactory materializedFactory;
 
   SchemaKGroupedStream(
-      final KGroupedStream kgroupedStream,
       final ExecutionStep<KGroupedStream<Struct, GenericRow>> sourceStep,
       final KeyFormat keyFormat,
       final KeySerde<Struct> keySerde,
@@ -71,7 +68,6 @@ public class SchemaKGroupedStream {
       final FunctionRegistry functionRegistry
   ) {
     this(
-        kgroupedStream,
         sourceStep,
         keyFormat,
         keySerde,
@@ -84,7 +80,6 @@ public class SchemaKGroupedStream {
   }
 
   SchemaKGroupedStream(
-      final KGroupedStream kgroupedStream,
       final ExecutionStep<KGroupedStream<Struct, GenericRow>> sourceStep,
       final KeyFormat keyFormat,
       final KeySerde<Struct> keySerde,
@@ -94,7 +89,6 @@ public class SchemaKGroupedStream {
       final FunctionRegistry functionRegistry,
       final MaterializedFactory materializedFactory
   ) {
-    this.kgroupedStream = kgroupedStream;
     this.sourceStep = sourceStep;
     this.keyFormat = Objects.requireNonNull(keyFormat, "keyFormat");
     this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
@@ -126,8 +120,7 @@ public SchemaKTable<?> aggregate(
   ) {
     throwOnValueFieldCountMismatch(outputSchema, nonFuncColumnCount, aggregations);
 
-    final ExecutionStep<? extends KTable<?, GenericRow>> step;
-    final KTable table;
+    final ExecutionStep<? extends KTableHolder<?>> step;
     final KeySerde<?> newKeySerde;
     final KeyFormat keyFormat;
 
@@ -145,12 +138,6 @@
           windowExpression.get().getKsqlWindowExpression()
       );
       step = aggregate;
-      table = StreamAggregateBuilder.build(
-          kgroupedStream,
-          aggregate,
-          queryBuilder,
-          materializedFactory
-      );
     } else {
       keyFormat = this.keyFormat;
       newKeySerde = keySerde;
@@ -164,16 +151,9 @@
           aggregateSchema
       );
       step = aggregate;
-      table = StreamAggregateBuilder.build(
-          kgroupedStream,
-          aggregate,
-          queryBuilder,
-          materializedFactory
-      );
     }
 
     return new SchemaKTable(
-        table,
         step,
         keyFormat,
         newKeySerde,

SchemaKGroupedTable.java

@@ -26,7 +26,6 @@
 import io.confluent.ksql.execution.plan.TableAggregate;
 import io.confluent.ksql.execution.streams.ExecutionStepFactory;
 import io.confluent.ksql.execution.streams.MaterializedFactory;
-import io.confluent.ksql.execution.streams.TableAggregateBuilder;
 import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.function.KsqlAggregateFunction;
 import io.confluent.ksql.metastore.model.KeyField;
@@ -46,11 +45,9 @@
 import org.apache.kafka.streams.kstream.KGroupedTable;
 
 public class SchemaKGroupedTable extends SchemaKGroupedStream {
-  private final KGroupedTable kgroupedTable;
   private final ExecutionStep<KGroupedTable<Struct, GenericRow>> sourceTableStep;
 
   SchemaKGroupedTable(
-      final KGroupedTable kgroupedTable,
       final ExecutionStep<KGroupedTable<Struct, GenericRow>> sourceTableStep,
       final KeyFormat keyFormat,
       final KeySerde<Struct> keySerde,
@@ -60,7 +57,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
       final FunctionRegistry functionRegistry
   ) {
     this(
-        kgroupedTable,
         sourceTableStep,
         keyFormat,
         keySerde,
@@ -72,7 +68,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
   }
 
   SchemaKGroupedTable(
-      final KGroupedTable kgroupedTable,
       final ExecutionStep<KGroupedTable<Struct, GenericRow>> sourceTableStep,
       final KeyFormat keyFormat,
       final KeySerde<Struct> keySerde,
@@ -83,7 +78,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
       final MaterializedFactory materializedFactory
   ) {
     super(
-        null,
         null,
         keyFormat,
         keySerde,
@@ -94,7 +88,6 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream {
         materializedFactory
     );
 
-    this.kgroupedTable = Objects.requireNonNull(kgroupedTable, "kgroupedTable");
     this.sourceTableStep = Objects.requireNonNull(sourceTableStep, "sourceTableStep");
   }
 
@@ -144,12 +137,6 @@ public SchemaKTable<Struct> aggregate(
     );
 
     return new SchemaKTable<>(
-        TableAggregateBuilder.build(
-            kgroupedTable,
-            step,
-            queryBuilder,
-            materializedFactory
-        ),
         step,
         keyFormat,
         keySerde,