Skip to content

Commit

Permalink
feat: expose execution plans from the ksql engine API
Browse files Browse the repository at this point in the history
This patch adds interfaces to build and execute plans in the KSQL engine
API, and changes StatementExecutor to use these interfaces to run statements.
It also exposes a method executeQuery that the query endpoint can use to build
transient queries.
  • Loading branch information
rodesai committed Nov 19, 2019
1 parent cef46e5 commit 3a7f98a
Show file tree
Hide file tree
Showing 19 changed files with 362 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
package io.confluent.ksql;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -88,6 +92,24 @@ public interface KsqlExecutionContext {
*/
PreparedStatement<?> prepare(ParsedStatement stmt);

/**
* Executes a query using the supplied service context.
*/
TransientQueryMetadata executeQuery(
ServiceContext serviceContext,
ConfiguredStatement<Query> statement
);

/**
* Computes a plan for executing a DDL/DML statement using the supplied service context.
*/
KsqlPlan plan(ServiceContext serviceContext, ConfiguredStatement<?> statement);

/**
* Executes a KSQL plan using the supplied service context.
*/
ExecuteResult execute(ServiceContext serviceContext, ConfiguredKsqlPlan plan);

/**
* Execute the supplied statement, updating the meta store and registering any query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
Expand Down Expand Up @@ -197,10 +198,11 @@ private static ExecuteResult execute(
final CustomExecutor executor =
CustomExecutors.EXECUTOR_MAP.getOrDefault(
configured.getStatement().getClass(),
(serviceContext, s, props) -> executionContext.execute(serviceContext, s));
(passedExecutionContext, s, props) -> passedExecutionContext.execute(
passedExecutionContext.getServiceContext(), s));

return executor.apply(
executionContext.getServiceContext(),
executionContext,
configured,
mutableSessionPropertyOverrides
);
Expand All @@ -209,7 +211,7 @@ private static ExecuteResult execute(
@FunctionalInterface
private interface CustomExecutor {
ExecuteResult apply(
ServiceContext serviceContext,
KsqlExecutionContext executionContext,
ConfiguredStatement<?> statement,
Map<String, Object> mutableSessionPropertyOverrides
);
Expand All @@ -218,13 +220,17 @@ ExecuteResult apply(
@SuppressWarnings("unchecked")
private enum CustomExecutors {

SET_PROPERTY(SetProperty.class, (serviceContext, stmt, props) -> {
SET_PROPERTY(SetProperty.class, (executionContext, stmt, props) -> {
PropertyOverrider.set((ConfiguredStatement<SetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
}),
UNSET_PROPERTY(UnsetProperty.class, (serviceContext, stmt, props) -> {
UNSET_PROPERTY(UnsetProperty.class, (executionContext, stmt, props) -> {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
}),
QUERY(Query.class, (executionContext, stmt, props) -> {
return ExecuteResult.of(
executionContext.executeQuery(executionContext.getServiceContext(), stmt.cast()));
})
;

Expand Down Expand Up @@ -256,11 +262,12 @@ private CustomExecutor getExecutor() {
}

public ExecuteResult execute(
final ServiceContext serviceContext,
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> statement,
final Map<String, Object> mutableSessionPropertyOverrides
) {
return executor.apply(serviceContext, statement, mutableSessionPropertyOverrides);
return executor.apply(
executionContext, statement, mutableSessionPropertyOverrides);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,8 @@ static EngineExecutor create(
return new EngineExecutor(engineContext, serviceContext, ksqlConfig, overriddenProperties);
}

ExecuteResult execute(final ConfiguredStatement<?> statement) {
if (statement.getStatement() instanceof Query) {
return ExecuteResult.of(executeQuery(statement.cast()));
}
return execute(plan(statement));
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
private ExecuteResult execute(final KsqlPlan plan) {
ExecuteResult execute(final KsqlPlan plan) {
final Optional<String> ddlResult = plan.getDdlCommand()
.map(ddl -> executeDdl(ddl, plan.getStatementText()));

Expand All @@ -119,7 +112,7 @@ private ExecuteResult execute(final KsqlPlan plan) {
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
private TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> statement) {
TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> statement) {
final ExecutorPlans plans = planQuery(statement, statement.getStatement(), Optional.empty());
final OutputNode outputNode = plans.logicalPlan.getNode().get();
final QueryExecutor executor = engineContext.createQueryExecutor(
Expand All @@ -139,7 +132,7 @@ private TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> sta
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
private KsqlPlan plan(final ConfiguredStatement<?> statement) {
KsqlPlan plan(final ConfiguredStatement<?> statement) {
try {
throwOnNonExecutableStatement(statement);

Expand Down
44 changes: 38 additions & 6 deletions ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -160,19 +162,49 @@ public PreparedStatement<?> prepare(final ParsedStatement stmt) {
}

@Override
public ExecuteResult execute(
public KsqlPlan plan(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
final ExecuteResult result = EngineExecutor
final ConfiguredStatement<?> statement) {
return EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.execute(statement);
.plan(statement);
}

@Override
public ExecuteResult execute(final ServiceContext serviceContext, final ConfiguredKsqlPlan plan) {
final ExecuteResult result = EngineExecutor
.create(primaryContext, serviceContext, plan.getConfig(), plan.getOverrides())
.execute(plan.getPlan());
result.getQuery().ifPresent(this::registerQuery);

return result;
}

@Override
public ExecuteResult execute(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
return execute(
serviceContext,
ConfiguredKsqlPlan.of(
plan(serviceContext, statement),
statement.getOverrides(),
statement.getConfig()
)
);
}

@Override
public TransientQueryMetadata executeQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement) {
final TransientQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.executeQuery(statement);
registerQuery(query);
return query;
}

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.Sandbox;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -79,14 +82,57 @@ public PreparedStatement<?> prepare(final ParsedStatement stmt) {
return engineContext.prepare(stmt);
}

@Override
public KsqlPlan plan(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
return EngineExecutor.create(
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
).plan(statement);
}

@Override
public ExecuteResult execute(
final ServiceContext serviceContext,
final ConfiguredKsqlPlan ksqlPlan
) {
return EngineExecutor.create(
engineContext,
serviceContext,
ksqlPlan.getConfig(),
ksqlPlan.getOverrides()
).execute(ksqlPlan.getPlan());
}

@Override
public ExecuteResult execute(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
final EngineExecutor executor = EngineExecutor
.create(engineContext, serviceContext, statement.getConfig(), statement.getOverrides());
return execute(
serviceContext,
ConfiguredKsqlPlan.of(
plan(serviceContext, statement),
statement.getOverrides(),
statement.getConfig()
)
);
}

return executor.execute(statement);
@Override
public TransientQueryMetadata executeQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement
) {
return EngineExecutor.create(
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
).executeQuery(statement);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.planner.plan;

import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

public final class ConfiguredKsqlPlan {
private final KsqlPlan plan;
private final Map<String, Object> overrides;
private final KsqlConfig config;

public static ConfiguredKsqlPlan of(
final KsqlPlan plan,
final Map<String, Object> overrides,
final KsqlConfig config
) {
return new ConfiguredKsqlPlan(plan, overrides, config);
}

private ConfiguredKsqlPlan(
final KsqlPlan plan,
final Map<String, Object> overrides,
final KsqlConfig config
) {
this.plan = Objects.requireNonNull(plan, "plan");
this.overrides = Objects.requireNonNull(overrides, "overrides");
this.config = Objects.requireNonNull(config, "config");
}

public KsqlPlan getPlan() {
return plan;
}

public Map<String, Object> getOverrides() {
return overrides;
}

public KsqlConfig getConfig() {
return config;
}
}
Loading

0 comments on commit 3a7f98a

Please sign in to comment.