Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: switch TERMINATE ALL to distribute N TERMINATE queryId stmts #3971

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.parser.tree.TerminateAllQueries;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.parser.tree.WindowExpression;
Expand Down Expand Up @@ -634,10 +635,9 @@ public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext contex
final Optional<NodeLocation> location = getLocation(context);

return context.ALL() != null
? TerminateQuery.all(location)
: TerminateQuery.query(
? new TerminateAllQueries(location)
: new TerminateQuery(
location,
// use case sensitive parsing here to maintain backwards compatibility
new QueryId(ParserUtil.getIdentifierText(true, context.identifier()))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TerminateAllQueries;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.util.IdentifierUtil;
import java.util.List;
Expand Down Expand Up @@ -337,7 +337,13 @@ protected Void visitDropTable(final DropTable node, final Integer context) {
@Override
protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) {
builder.append("TERMINATE ");
builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL"));
builder.append(node.getQueryId());
return null;
}

@Override
protected Void visitTerminateAllQueries(final TerminateAllQueries node, final Integer context) {
builder.append("TERMINATE ALL");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ protected R visitTerminateQuery(final TerminateQuery node, final C context) {
return visitStatement(node, context);
}

protected R visitTerminateAllQueries(final TerminateAllQueries node, final C context) {
return visitStatement(node, context);
}

protected R visitListStreams(final ListStreams node, final C context) {
return visitStatement(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.tree;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.NodeLocation;
import java.util.Objects;
import java.util.Optional;

@Immutable
public final class TerminateAllQueries extends Statement {

public TerminateAllQueries(final Optional<NodeLocation> location) {
super(location);
}

@Override
public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
return visitor.visitTerminateAllQueries(this, context);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}

return o != null && getClass() == o.getClass();
}

@Override
public int hashCode() {
return Objects.hash(getClass());
}

@Override
public String toString() {
return "TerminateAllQueries{}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,14 @@
@Immutable
public final class TerminateQuery extends Statement {

private final Optional<QueryId> queryId;
private final QueryId queryId;

public static TerminateQuery all(final Optional<NodeLocation> location) {
return new TerminateQuery(location, Optional.empty());
}

public static TerminateQuery query(final Optional<NodeLocation> location, final QueryId queryId) {
return new TerminateQuery(location, Optional.of(queryId));
}

private TerminateQuery(final Optional<NodeLocation> location, final Optional<QueryId> queryId) {
public TerminateQuery(final Optional<NodeLocation> location, QueryId queryId) {
super(location);
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

/**
* @return the id of the query to terminate or {@code empty()} if all should be terminated.
*/
public Optional<QueryId> getQueryId() {
public QueryId getQueryId() {
return queryId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
Expand Down Expand Up @@ -475,10 +474,9 @@ public void shouldAllowEscapedTerminateQuery() {
.buildSingleAst("TERMINATE `CSAS-foo_2`;", metaStore);

// Then:
assertThat(statement.getStatement().getQueryId().map(QueryId::toString), is(Optional.of("CSAS-foo_2")));
assertThat(statement.getStatement().getQueryId().getId(), is("CSAS-foo_2"));
}


@Test
public void testSelectAllJoin() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.parser.tree.TerminateAllQueries;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
Expand Down Expand Up @@ -585,7 +586,7 @@ public void shouldFormatDropTableStatement() {
@Test
public void shouldFormatTerminateQuery() {
// Given:
final TerminateQuery terminateQuery = TerminateQuery.query(Optional.empty(), new QueryId("FOO"));
final TerminateQuery terminateQuery = new TerminateQuery(Optional.empty(), new QueryId("FOO"));

// When:
final String formatted = SqlFormatter.formatSql(terminateQuery);
Expand All @@ -597,10 +598,10 @@ public void shouldFormatTerminateQuery() {
@Test
public void shouldFormatTerminateAllQueries() {
// Given:
final TerminateQuery terminateQuery = TerminateQuery.all(Optional.empty());
final TerminateAllQueries terminateAll = new TerminateAllQueries(Optional.empty());

// When:
final String formatted = SqlFormatter.formatSql(terminateQuery);
final String formatted = SqlFormatter.formatSql(terminateAll);

// Then:
assertThat(formatted, is("TERMINATE ALL"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public void shouldImplementHashCodeAndEqualsProperty() {
new EqualsTester()
.addEqualityGroup(
// Note: At the moment location does not take part in equality testing
TerminateQuery.query(Optional.of(SOME_LOCATION), SOME_QUERY_ID),
TerminateQuery.query(Optional.of(OTHER_LOCATION), SOME_QUERY_ID)
new TerminateQuery(Optional.of(SOME_LOCATION), SOME_QUERY_ID),
new TerminateQuery(Optional.of(OTHER_LOCATION), SOME_QUERY_ID)
)
.addEqualityGroup(
TerminateQuery.query(Optional.empty(), new QueryId("diff"))
new TerminateQuery(Optional.empty(), new QueryId("diff"))
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
import io.confluent.ksql.rest.entity.CommandId.Type;
Expand Down Expand Up @@ -110,7 +109,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) {
private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) {
return new CommandId(
CommandId.Type.TERMINATE,
terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"),
terminateQuery.getQueryId().getId(),
CommandId.Action.EXECUTE
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

package io.confluent.ksql.rest.server.computation;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.server.execution.StatementExecutor;
import io.confluent.ksql.rest.server.validation.RequestValidator;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
Expand All @@ -31,7 +32,7 @@
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.KsqlServerException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -47,7 +48,7 @@
* duration for the command to be executed remotely if configured with a
* {@code distributedCmdResponseTimeout}.
*/
public class DistributingExecutor {
public class DistributingExecutor implements StatementExecutor<Statement> {

private final CommandQueue commandQueue;
private final Duration distributedCmdResponseTimeout;
Expand Down Expand Up @@ -82,9 +83,8 @@ public DistributingExecutor(
* If a new transactional producer is initialized while the current transaction is incomplete,
* the old producer will be fenced off and unable to continue with its transaction.
*/
public Optional<KsqlEntity> execute(
public List<? extends KsqlEntity> execute(
agavra marked this conversation as resolved.
Show resolved Hide resolved
final ConfiguredStatement<Statement> statement,
final ParsedStatement parsedStatement,
final Map<String, Object> mutableScopedProperties,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
Expand All @@ -101,15 +101,15 @@ public Optional<KsqlEntity> execute(
transactionalProducer.initTransactions();
transactionalProducer.beginTransaction();
commandQueue.waitForCommandConsumer();

// Don't perform validation on Terminate Cluster statements
if (!parsedStatement.getStatementText()
if (!injected.getStatementText()
.equals(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT)) {
requestValidator.validate(
SandboxedServiceContext.create(serviceContext),
Collections.singletonList(parsedStatement),
injected,
mutableScopedProperties,
parsedStatement.getStatementText()
injected.getStatementText()
);
}

Expand All @@ -120,7 +120,7 @@ public Optional<KsqlEntity> execute(
final CommandStatus commandStatus = queuedCommandStatus
.tryWaitForFinalStatus(distributedCmdResponseTimeout);

return Optional.of(new CommandStatusEntity(
return ImmutableList.of(new CommandStatusEntity(
injected.getStatementText(),
queuedCommandStatus.getCommandId(),
commandStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,9 @@ private KsqlConfig buildMergedConfig(final Command command) {
}

private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQuery) {
final Optional<QueryId> queryId = terminateQuery.getStatement().getQueryId();
final QueryId queryId = terminateQuery.getStatement().getQueryId();

if (!queryId.isPresent()) {
ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::close);
return;
}

ksqlEngine.getPersistentQuery(queryId.get())
ksqlEngine.getPersistentQuery(queryId)
.orElseThrow(() ->
new KsqlException(String.format("No running query with id %s was found", queryId)))
.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.server.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.connect.supported.Connectors;
Expand All @@ -26,15 +27,15 @@
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

public final class ConnectExecutor {

private ConnectExecutor() { }

public static Optional<KsqlEntity> execute(
public static List<? extends KsqlEntity> execute(
final ConfiguredStatement<CreateConnector> statement,
final Map<String, ?> sessionProperties,
final KsqlExecutionContext executionContext,
Expand All @@ -51,7 +52,7 @@ public static Optional<KsqlEntity> execute(
l -> l != null ? l.getValue().toString() : null)));

if (response.datum().isPresent()) {
return Optional.of(
return ImmutableList.of(
new CreateConnectorEntity(
statement.getStatementText(),
response.datum().get()
Expand All @@ -60,6 +61,7 @@ public static Optional<KsqlEntity> execute(
}

return response.error()
.map(err -> new ErrorEntity(statement.getStatementText(), err));
.map(err -> ImmutableList.of(new ErrorEntity(statement.getStatementText(), err)))
.orElse(ImmutableList.of());
}
}
Loading