Process commands so that WITH clause is always resolved before executing it (#2436)

agavra authored Mar 28, 2019
1 parent 9ad3764 commit a1c4bf4
Showing 34 changed files with 1,986 additions and 296 deletions.
53 changes: 40 additions & 13 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
@@ -31,6 +31,8 @@
import io.confluent.ksql.schema.inference.SchemaRegistryTopicSchemaSupplier;
import io.confluent.ksql.services.DefaultServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.topic.DefaultTopicInjector;
import io.confluent.ksql.topic.TopicInjector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
@@ -41,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -51,7 +54,8 @@ public class KsqlContext {
private final ServiceContext serviceContext;
private final KsqlConfig ksqlConfig;
private final KsqlEngine ksqlEngine;
private final SchemaInjector schemaInjector;
private final Function<ServiceContext, SchemaInjector> schemaInjectorFactory;
private final Function<KsqlExecutionContext, TopicInjector> topicInjectorFactory;

/**
* Create a KSQL context object with the given properties. A KSQL context has its own metastore
@@ -72,23 +76,30 @@ public static KsqlContext create(
functionRegistry,
serviceId);

final DefaultSchemaInjector schemaInjector = new DefaultSchemaInjector(
new SchemaRegistryTopicSchemaSupplier(serviceContext.getSchemaRegistryClient()));

return new KsqlContext(serviceContext, ksqlConfig, engine, schemaInjector);
return new KsqlContext(
serviceContext,
ksqlConfig,
engine,
sc -> new DefaultSchemaInjector(
new SchemaRegistryTopicSchemaSupplier(sc.getSchemaRegistryClient())),
DefaultTopicInjector::new);
}

@VisibleForTesting
KsqlContext(
final ServiceContext serviceContext,
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final SchemaInjector schemaInjector
final Function<ServiceContext, SchemaInjector> schemaInjectorFactory,
final Function<KsqlExecutionContext, TopicInjector> topicInjectorFactory
) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.schemaInjector = Objects.requireNonNull(schemaInjector, "schemaInjector");
this.schemaInjectorFactory = Objects
.requireNonNull(schemaInjectorFactory, "schemaInjectorFactory");
this.topicInjectorFactory = Objects
.requireNonNull(topicInjectorFactory, "topicInjectorFactory");
}

public ServiceContext getServiceContext() {
Expand All @@ -110,12 +121,25 @@ public List<QueryMetadata> sql(final String sql, final Map<String, Object> overr
final List<ParsedStatement> statements = ksqlEngine.parse(sql);

final KsqlExecutionContext sandbox = ksqlEngine.createSandbox();
final SchemaInjector sandboxSchemaInjector = schemaInjectorFactory
.apply(sandbox.getServiceContext());
final TopicInjector sandboxTopicInjector = topicInjectorFactory.apply(sandbox);

for (ParsedStatement stmt : statements) {
execute(
sandbox,
stmt,
ksqlConfig,
overriddenProperties,
sandboxSchemaInjector,
sandboxTopicInjector);
}

statements.forEach(stmt -> execute(sandbox, stmt, ksqlConfig, overriddenProperties));

final SchemaInjector schemaInjector = schemaInjectorFactory.apply(serviceContext);
final TopicInjector topicInjector = topicInjectorFactory.apply(ksqlEngine);
final List<QueryMetadata> queries = new ArrayList<>();
for (final ParsedStatement parsed : statements) {
execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties)
execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties, schemaInjector, topicInjector)
.getQuery()
.ifPresent(queries::add);
}
@@ -157,10 +181,13 @@ private ExecuteResult execute(
final KsqlExecutionContext executionContext,
final ParsedStatement stmt,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
) {
final Map<String, Object> overriddenProperties,
final SchemaInjector schemaInjector,
final TopicInjector topicInjector) {
final PreparedStatement<?> prepared = executionContext.prepare(stmt);
final PreparedStatement<?> withSchema = schemaInjector.forStatement(prepared);
return executionContext.execute(withSchema, ksqlConfig, overriddenProperties);
final PreparedStatement<?> withInferredTopic =
topicInjector.forStatement(withSchema, ksqlConfig, overriddenProperties);
return executionContext.execute(withInferredTopic, ksqlConfig, overriddenProperties);
}
}
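
Because the constructor now takes injector factories rather than instances, tests can wire in stand-in injectors without a Schema Registry or Kafka cluster behind them. A minimal sketch of such wiring, assuming the injector interfaces match the forStatement call sites in execute() above; the pass-through implementations are hypothetical:

// Hypothetical test wiring through the @VisibleForTesting constructor
// (imports elided). Signatures are modeled on the forStatement call sites
// in this diff; the generic bounds are assumptions.
final Function<ServiceContext, SchemaInjector> passThroughSchemas =
    sc -> new SchemaInjector() {
      @Override
      public <T extends Statement> PreparedStatement<T> forStatement(
          final PreparedStatement<T> statement) {
        return statement; // no schema inference
      }
    };

final Function<KsqlExecutionContext, TopicInjector> passThroughTopics =
    ctx -> new TopicInjector() {
      @Override
      public <T extends Statement> PreparedStatement<T> forStatement(
          final PreparedStatement<T> statement,
          final KsqlConfig config,
          final Map<String, Object> overriddenProperties) {
        return statement; // no topic resolution
      }
    };

final KsqlContext context = new KsqlContext(
    serviceContext, ksqlConfig, ksqlEngine, passThroughSchemas, passThroughTopics);
context.sql("CREATE STREAM s WITH (kafka_topic='t', value_format='JSON');",
    Collections.emptyMap());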
ksql-engine/src/main/java/io/confluent/ksql/KsqlExecutionContext.java
@@ -20,6 +20,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
@@ -46,6 +47,11 @@ public interface KsqlExecutionContext {
*/
MetaStore getMetaStore();

/**
* @return the service context used for this execution context
*/
ServiceContext getServiceContext();

/**
* Retrieve the details of a persistent query.
*
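
The new accessor is what lets KsqlContext.sql() above build one injector per context: validation runs against the sandbox's own service context rather than the live one. A short usage sketch of the pattern, taken directly from this diff:

// Each execution context supplies its own ServiceContext, so the schema
// injector used during sandbox validation is wired to the sandbox rather
// than to the live services.
final KsqlExecutionContext sandbox = ksqlEngine.createSandbox();
final SchemaInjector sandboxSchemaInjector =
    schemaInjectorFactory.apply(sandbox.getServiceContext());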
37 changes: 13 additions & 24 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
@@ -57,6 +57,7 @@
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.StringUtil;
import io.confluent.ksql.util.WithClauseUtil;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -136,34 +137,22 @@ private void setIntoProperties(final Sink sink) {
}

if (sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS) != null) {
try {
final int numberOfPartitions = Integer.parseInt(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS).toString()
);
analysis.getIntoProperties().put(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
numberOfPartitions
);
final int numberOfPartitions =
WithClauseUtil.parsePartitions(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS));

} catch (final NumberFormatException e) {
throw new KsqlException(
"Invalid number of partitions in WITH clause: "
+ sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_PARTITIONS).toString());
}
analysis.getIntoProperties().put(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
numberOfPartitions
);
}

if (sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS) != null) {
try {
final short numberOfReplications =
Short.parseShort(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString()
);
analysis.getIntoProperties()
.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications);
} catch (final NumberFormatException e) {
throw new KsqlException("Invalid number of replications in WITH clause: " + sink
.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS).toString());
}
final short numberOfReplications =
WithClauseUtil.parseReplicas(
sink.getProperties().get(KsqlConstants.SINK_NUMBER_OF_REPLICAS));
analysis.getIntoProperties()
.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, numberOfReplications);
}
}

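
WithClauseUtil is added elsewhere in this commit and does not appear in this excerpt. A plausible reconstruction of the two helpers from their call sites above and from the inline parsing they replace; the bodies and exact messages are assumptions:

// Hypothetical reconstruction of WithClauseUtil; the real file is part of
// this commit but is not shown in this excerpt.
package io.confluent.ksql.util;

public final class WithClauseUtil {

  private WithClauseUtil() {
  }

  // Mirrors the deleted inline parse: anything that is not an int is rejected.
  public static int parsePartitions(final Object partitions) {
    try {
      return Integer.parseInt(partitions.toString());
    } catch (final NumberFormatException e) {
      throw new KsqlException(
          "Invalid number of partitions in WITH clause: " + partitions, e);
    }
  }

  // Mirrors the deleted inline parse: replicas must fit in a short.
  public static short parseReplicas(final Object replicas) {
    try {
      return Short.parseShort(replicas.toString());
    } catch (final NumberFormatException e) {
      throw new KsqlException(
          "Invalid number of replicas in WITH clause: " + replicas, e);
    }
  }
}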
ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
@@ -131,6 +131,11 @@ public MetaStore getMetaStore() {
return primaryContext.getMetaStore();
}

@Override
public ServiceContext getServiceContext() {
return serviceContext;
}

public DdlCommandExec getDdlCommandExec() {
return primaryContext.getDdlCommandExec();
}
ksql-engine/src/main/java/io/confluent/ksql/SandboxedExecutionContext.java
@@ -21,6 +21,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.List;
@@ -44,6 +45,11 @@ public MetaStore getMetaStore() {
return engineContext.getMetaStore();
}

@Override
public ServiceContext getServiceContext() {
return engineContext.getServiceContext();
}

@Override
public KsqlExecutionContext createSandbox() {
return new SandboxedExecutionContext(engineContext);
ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java
@@ -31,6 +31,7 @@
import io.confluent.ksql.structured.QueryContext;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.topic.TopicProperties;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryIdGenerator;
@@ -43,6 +44,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.connect.data.Field;
@@ -142,19 +144,22 @@ public SchemaKStream<?> buildStream(

final KsqlStructuredDataOutputNode noRowKey = outputNodeBuilder.build();
if (doCreateInto) {
final SourceTopicProperties sourceTopicProperties = getSourceTopicProperties(
getTheSourceNode().getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
outputProperties,
serviceContext.getTopicClient(),
ksqlConfig
);
final Supplier<TopicDescription> sourceTopicDescription = () ->
getSourceTopicPropertiesFromKafka(
getTheSourceNode().getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
serviceContext.getTopicClient());

createSinkTopic(
noRowKey.getKafkaTopicName(),
serviceContext.getTopicClient(),
shouldBeCompacted(result),
sourceTopicProperties.partitions,
sourceTopicProperties.replicas);
new TopicProperties.Builder()
.withName(noRowKey.getKafkaTopicName())
.withOverrides(outputProperties)
.withKsqlConfig(ksqlConfig)
.withSource(sourceTopicDescription)
.build());
}

result.into(
noRowKey.getKafkaTopicName(),
noRowKey.getKsqlTopic().getKsqlTopicSerDe()
@@ -236,69 +241,22 @@ private void addAvroSchemaToResultTopic(final Builder builder) {
}

private static void createSinkTopic(
final String kafkaTopicName,
final KafkaTopicClient kafkaTopicClient,
final boolean isCompacted,
final int numberOfPartitions,
final short numberOfReplications
final TopicProperties topicProperties
) {
final Map<String, ?> config = isCompacted
? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
: Collections.emptyMap();

kafkaTopicClient.createTopic(kafkaTopicName,
numberOfPartitions,
numberOfReplications,
kafkaTopicClient.createTopic(
topicProperties.getTopicName(),
topicProperties.getPartitions(),
topicProperties.getReplicas(),
config
);
}

private static SourceTopicProperties getSourceTopicProperties(
final String kafkaTopicName,
final Map<String, Object> sinkProperties,
final KafkaTopicClient kafkaTopicClient,
final KsqlConfig ksqlConfig
) {
final Map ksqlProperties = ksqlConfig.values();
if (ksqlProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null
|| ksqlProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) {
return getSinkTopicPropertiesLegacyWay(sinkProperties, ksqlConfig);
}
// Don't request topic properties from Kafka if both are set in WITH clause.
if (sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null
&& sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) {
return new SourceTopicProperties(
(Integer) sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY),
(Short) sinkProperties.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)
);
}
final TopicDescription topicDescription = getSourceTopicPropertiesFromKafka(
kafkaTopicName,
kafkaTopicClient);

final int partitions = (Integer) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
topicDescription.partitions().size());
final short replicas = (Short) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY,
(short) topicDescription.partitions().get(0).replicas().size());

return new SourceTopicProperties(partitions, replicas);
}

private static SourceTopicProperties getSinkTopicPropertiesLegacyWay(
final Map<String, Object> sinkProperties,
final KsqlConfig ksqlConfig
) {
final int partitions = (Integer) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY));
final short replicas = (Short) sinkProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY,
ksqlConfig.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY));
return new SourceTopicProperties(partitions, replicas);
}

private static TopicDescription getSourceTopicPropertiesFromKafka(
final String kafkaTopicName,
final KafkaTopicClient kafkaTopicClient
@@ -318,17 +276,6 @@ private KsqlTopicSerDe getTopicSerde() {
return ksqlTopic.getKsqlTopicSerDe();
}

private static class SourceTopicProperties {

private final int partitions;
private final short replicas;

SourceTopicProperties(final int partitions, final short replicas) {
this.partitions = partitions;
this.replicas = replicas;
}
}

public static class Builder {

private PlanNodeId id;
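
The Builder shown in buildStream above takes over the precedence rules that the deleted getSourceTopicProperties and getSinkTopicPropertiesLegacyWay methods encoded: explicit WITH-clause overrides win, legacy ksqlConfig sink properties come next, and the source topic is described, lazily through the Supplier, only when a value is still missing. A condensed sketch of that resolution under those assumptions; only the builder methods and the three getters appear in this diff, so every other name here is hypothetical:

// Hypothetical condensation of TopicProperties.Builder.build(), reconstructed
// from the deleted resolution logic above.
public TopicProperties build() {
  // 1. Explicit WITH-clause overrides take precedence.
  Integer partitions =
      (Integer) overrides.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY);
  Short replicas =
      (Short) overrides.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY);

  // 2. Fall back to the legacy sink properties carried by the KsqlConfig.
  if (partitions == null) {
    partitions = ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY);
  }
  if (replicas == null) {
    replicas = ksqlConfig.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY);
  }

  // 3. Describe the source topic only if something is still unresolved; the
  //    Supplier keeps the Kafka admin call lazy, so it is skipped entirely
  //    when both values are already known.
  if (partitions == null || replicas == null) {
    final TopicDescription source = sourceSupplier.get();
    if (partitions == null) {
      partitions = source.partitions().size();
    }
    if (replicas == null) {
      replicas = (short) source.partitions().get(0).replicas().size();
    }
  }

  return new TopicProperties(name, partitions, replicas);
}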
