Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose execution plans from the ksql engine API #3482

Merged
merged 2 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
package io.confluent.ksql;

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -88,6 +92,24 @@ public interface KsqlExecutionContext {
*/
PreparedStatement<?> prepare(ParsedStatement stmt);

/**
* Executes a query using the supplied service context.
*/
TransientQueryMetadata executeQuery(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
TransientQueryMetadata executeQuery(
TransientQueryMetadata executeTransientQuery(

Since this is explicitly returning a TransientQueryMetadata object, it'd be nice to have the function name indicate that. The function naming would be fine if the return type is changed to QueryMetadata.

ServiceContext serviceContext,
ConfiguredStatement<Query> statement
);

/**
* Computes a plan for executing a DDL/DML statement using the supplied service context.
*/
KsqlPlan plan(ServiceContext serviceContext, ConfiguredStatement<?> statement);

/**
* Executes a KSQL plan using the supplied service context.
*/
ExecuteResult execute(ServiceContext serviceContext, ConfiguredKsqlPlan plan);

/**
* Execute the supplied statement, updating the meta store and registering any query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
Expand Down Expand Up @@ -197,10 +198,11 @@ private static ExecuteResult execute(
final CustomExecutor executor =
CustomExecutors.EXECUTOR_MAP.getOrDefault(
configured.getStatement().getClass(),
(serviceContext, s, props) -> executionContext.execute(serviceContext, s));
(passedExecutionContext, s, props) -> passedExecutionContext.execute(
passedExecutionContext.getServiceContext(), s));

return executor.apply(
executionContext.getServiceContext(),
executionContext,
configured,
mutableSessionPropertyOverrides
);
Expand All @@ -209,7 +211,7 @@ private static ExecuteResult execute(
@FunctionalInterface
private interface CustomExecutor {
ExecuteResult apply(
ServiceContext serviceContext,
KsqlExecutionContext executionContext,
ConfiguredStatement<?> statement,
Map<String, Object> mutableSessionPropertyOverrides
);
Expand All @@ -218,13 +220,17 @@ ExecuteResult apply(
@SuppressWarnings("unchecked")
private enum CustomExecutors {

SET_PROPERTY(SetProperty.class, (serviceContext, stmt, props) -> {
SET_PROPERTY(SetProperty.class, (executionContext, stmt, props) -> {
PropertyOverrider.set((ConfiguredStatement<SetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
}),
UNSET_PROPERTY(UnsetProperty.class, (serviceContext, stmt, props) -> {
UNSET_PROPERTY(UnsetProperty.class, (executionContext, stmt, props) -> {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
}),
QUERY(Query.class, (executionContext, stmt, props) -> {
return ExecuteResult.of(
executionContext.executeQuery(executionContext.getServiceContext(), stmt.cast()));
})
;

Expand Down Expand Up @@ -256,11 +262,12 @@ private CustomExecutor getExecutor() {
}

public ExecuteResult execute(
final ServiceContext serviceContext,
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> statement,
final Map<String, Object> mutableSessionPropertyOverrides
) {
return executor.apply(serviceContext, statement, mutableSessionPropertyOverrides);
return executor.apply(
executionContext, statement, mutableSessionPropertyOverrides);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,8 @@ static EngineExecutor create(
return new EngineExecutor(engineContext, serviceContext, ksqlConfig, overriddenProperties);
}

ExecuteResult execute(final ConfiguredStatement<?> statement) {
if (statement.getStatement() instanceof Query) {
return ExecuteResult.of(executeQuery(statement.cast()));
}
return execute(plan(statement));
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
private ExecuteResult execute(final KsqlPlan plan) {
ExecuteResult execute(final KsqlPlan plan) {
final Optional<String> ddlResult = plan.getDdlCommand()
.map(ddl -> executeDdl(ddl, plan.getStatementText()));

Expand All @@ -119,7 +112,7 @@ private ExecuteResult execute(final KsqlPlan plan) {
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
private TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> statement) {
TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> statement) {
final ExecutorPlans plans = planQuery(statement, statement.getStatement(), Optional.empty());
final OutputNode outputNode = plans.logicalPlan.getNode().get();
final QueryExecutor executor = engineContext.createQueryExecutor(
Expand All @@ -139,7 +132,7 @@ private TransientQueryMetadata executeQuery(final ConfiguredStatement<Query> sta
}

@SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
private KsqlPlan plan(final ConfiguredStatement<?> statement) {
KsqlPlan plan(final ConfiguredStatement<?> statement) {
try {
throwOnNonExecutableStatement(statement);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -160,19 +162,51 @@ public PreparedStatement<?> prepare(final ParsedStatement stmt) {
}

@Override
public ExecuteResult execute(
public KsqlPlan plan(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
final ExecuteResult result = EngineExecutor
return EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.execute(statement);
.plan(statement);
}

@Override
public ExecuteResult execute(final ServiceContext serviceContext, final ConfiguredKsqlPlan plan) {
final ExecuteResult result = EngineExecutor
.create(primaryContext, serviceContext, plan.getConfig(), plan.getOverrides())
.execute(plan.getPlan());
result.getQuery().ifPresent(this::registerQuery);

return result;
}

@Override
public ExecuteResult execute(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
return execute(
serviceContext,
ConfiguredKsqlPlan.of(
plan(serviceContext, statement),
statement.getOverrides(),
statement.getConfig()
)
);
}

@Override
public TransientQueryMetadata executeQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement
) {
final TransientQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.executeQuery(statement);
registerQuery(query);
return query;
}

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.Sandbox;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -79,14 +82,57 @@ public PreparedStatement<?> prepare(final ParsedStatement stmt) {
return engineContext.prepare(stmt);
}

@Override
public KsqlPlan plan(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
return EngineExecutor.create(
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
).plan(statement);
}

@Override
public ExecuteResult execute(
final ServiceContext serviceContext,
final ConfiguredKsqlPlan ksqlPlan
) {
return EngineExecutor.create(
engineContext,
serviceContext,
ksqlPlan.getConfig(),
ksqlPlan.getOverrides()
).execute(ksqlPlan.getPlan());
}

@Override
public ExecuteResult execute(
final ServiceContext serviceContext,
final ConfiguredStatement<?> statement
) {
final EngineExecutor executor = EngineExecutor
.create(engineContext, serviceContext, statement.getConfig(), statement.getOverrides());
return execute(
serviceContext,
ConfiguredKsqlPlan.of(
plan(serviceContext, statement),
statement.getOverrides(),
statement.getConfig()
)
);
}

return executor.execute(statement);
@Override
public TransientQueryMetadata executeQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement
) {
return EngineExecutor.create(
engineContext,
serviceContext,
statement.getConfig(),
statement.getOverrides()
).executeQuery(statement);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.planner.plan;

import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

public final class ConfiguredKsqlPlan {
private final KsqlPlan plan;
private final Map<String, Object> overrides;
private final KsqlConfig config;

public static ConfiguredKsqlPlan of(
final KsqlPlan plan,
final Map<String, Object> overrides,
final KsqlConfig config
) {
return new ConfiguredKsqlPlan(plan, overrides, config);
}

private ConfiguredKsqlPlan(
final KsqlPlan plan,
final Map<String, Object> overrides,
final KsqlConfig config
) {
this.plan = Objects.requireNonNull(plan, "plan");
this.overrides = Objects.requireNonNull(overrides, "overrides");
this.config = Objects.requireNonNull(config, "config");
}

public KsqlPlan getPlan() {
return plan;
}

public Map<String, Object> getOverrides() {
return overrides;
}

public KsqlConfig getConfig() {
return config;
}
}
Loading