Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support to terminate all running queries #3944

Merged
merged 4 commits into from
Nov 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ statement
| PRINT (identifier| STRING) printClause #printTopic
| (LIST | SHOW) QUERIES EXTENDED? #listQueries
| TERMINATE QUERY? identifier #terminateQuery
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
| TERMINATE ALL #terminateQuery
Copy link
Contributor

@agavra agavra Nov 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about changing the semantics of TERMINATE <foo> to use regex instead? That seems much more powerful and we don't need to introduce any new reserved words. Should also be backwards compatible I think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of feel like that could be a slippery slope. e.g. do we introduce regex semantics elsewhere in the grammar? How do users know where they can use regexes and where they can't?

In terms of selectively performing an operation on a number of objects based on a condition, other databases often use catalogs. e.g.,

SELECT terminate(query_id) FROM query_catalog WHERE <condition>

Which I think is a great pattern but obviously not possible for us.

Personally my vote here would be to completely do away with TERMINATE and just make DROP drop the object and its underlying query. It doesn't seem like TERMINATE is useful for users, because as far as I know a terminated query can't be restarted anyways. So it seems to serve as a workaround for the problem of not being able to DROP CASCADE a stream or table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slippery slope is fair... that's why I'd like to see these things discussed before adding it 😂

In terms of selectively performing an operation on a number of objects based on a condition, other databases often use catalogs. e.g.,

SELECT terminate(query_id) FROM query_catalog WHERE <condition>

Which I think is a great pattern but obviously not possible for us.

(off topic) The wheels in my mind are turning 😈 I think we should expose a query_catalog as a KSQL table! It's basically a materialization on the command topic. I'm about to create a ticket for this because I think we need to change the command topic to have table semantics anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proper catalogs would be amazing and would open up all kinds of other operational possibilities. Catalogs also really empower tools, automation etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unresolving so that this thread is more visible when people refer back to it.

| SET STRING EQ STRING #setProperty
| UNSET STRING #unsetProperty
| CREATE STREAM (IF NOT EXISTS)? sourceName
Expand Down Expand Up @@ -339,6 +340,7 @@ CHANGES: 'CHANGES';
SELECT: 'SELECT';
FROM: 'FROM';
AS: 'AS';
ALL: 'ALL';
DISTINCT: 'DISTINCT';
WHERE: 'WHERE';
WITHIN: 'WITHIN';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.Operator;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.SqlTypeParser;
Expand Down Expand Up @@ -157,23 +158,21 @@ public AstBuilder(final TypeRegistry typeRegistry) {
this.typeRegistry = requireNonNull(typeRegistry, "typeRegistry");
}

@SuppressWarnings("unchecked")
public Statement buildStatement(final ParserRuleContext parseTree) {
return build(Optional.of(getSources(parseTree)), typeRegistry, parseTree);
return build(Optional.of(getSources(parseTree)), parseTree);
}

public Expression buildExpression(final ParserRuleContext parseTree) {
return build(Optional.empty(), typeRegistry, parseTree);
return build(Optional.empty(), parseTree);
}

public WindowExpression buildWindowExpression(final ParserRuleContext parseTree) {
return build(Optional.empty(), typeRegistry, parseTree);
return build(Optional.empty(), parseTree);
}

@SuppressWarnings("unchecked")
private <T extends Node> T build(
final Optional<Set<SourceName>> sources,
final TypeRegistry typeRegistry,
final ParserRuleContext parseTree) {
return (T) new Visitor(sources, typeRegistry).visit(parseTree);
}
Expand Down Expand Up @@ -632,11 +631,15 @@ public Node visitListTypes(final ListTypesContext ctx) {

@Override
public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) {
return new TerminateQuery(
getLocation(context),
// use case sensitive parsing here to maintain backwards compatibility
ParserUtil.getIdentifierText(true, context.identifier())
);
final Optional<NodeLocation> location = getLocation(context);

return context.ALL() != null
? TerminateQuery.all(location)
: TerminateQuery.query(
location,
// use case sensitive parsing here to maintain backwards compatibility
new QueryId(ParserUtil.getIdentifierText(true, context.identifier()))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,18 @@
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.util.IdentifierUtil;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public final class SqlFormatter {

private static final String INDENT = " ";
private static final Pattern NAME_PATTERN = Pattern.compile("[a-z_][a-z0-9_]*");
private static final FormatOptions FORMAT_OPTIONS = FormatOptions.of(IdentifierUtil::needsQuotes);

private SqlFormatter() {
Expand Down Expand Up @@ -287,13 +286,6 @@ protected Void visitCreateTableAsSelect(
return null;
}

private static String formatName(final String name) {
if (NAME_PATTERN.matcher(name).matches()) {
return name;
}
return "\"" + name + "\"";
}

@Override
protected Void visitInsertInto(final InsertInto node, final Integer indent) {
builder.append("INSERT INTO ");
Expand Down Expand Up @@ -329,7 +321,7 @@ protected Void visitInsertValues(final InsertValues node, final Integer context)
builder.append(
node.getValues()
.stream()
.map(exp -> ExpressionFormatterUtil.formatExpression(exp))
.map(ExpressionFormatterUtil::formatExpression)
.collect(Collectors.joining(", ")));
builder.append(")");

Expand All @@ -345,7 +337,7 @@ protected Void visitDropTable(final DropTable node, final Integer context) {
@Override
protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) {
builder.append("TERMINATE ");
builder.append(node.getQueryId().getId());
builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL"));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@
import java.util.Optional;

@Immutable
public class TerminateQuery extends Statement {
public final class TerminateQuery extends Statement {

private final QueryId queryId;
private final Optional<QueryId> queryId;

public TerminateQuery(final String queryId) {
this(Optional.empty(), queryId);
public static TerminateQuery all(final Optional<NodeLocation> location) {
return new TerminateQuery(location, Optional.empty());
}

public TerminateQuery(final Optional<NodeLocation> location, final String queryId) {
public static TerminateQuery query(final Optional<NodeLocation> location, final QueryId queryId) {
return new TerminateQuery(location, Optional.of(queryId));
}

private TerminateQuery(final Optional<NodeLocation> location, final Optional<QueryId> queryId) {
super(location);
this.queryId = new QueryId(queryId);
this.queryId = Objects.requireNonNull(queryId, "queryId");
}

public QueryId getQueryId() {
/**
* @return the id of the query to terminate or {@code empty()} if all should be terminated.
*/
public Optional<QueryId> getQueryId() {
return queryId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.common.collect.Iterables;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.DereferenceExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
Expand All @@ -54,7 +53,6 @@
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.AstNode;
import io.confluent.ksql.parser.tree.CreateConnector;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStream;
Expand All @@ -64,7 +62,6 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinMatchers;
import io.confluent.ksql.parser.tree.ListProperties;
import io.confluent.ksql.parser.tree.ListQueries;
import io.confluent.ksql.parser.tree.ListStreams;
Expand All @@ -77,11 +74,11 @@
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
Expand All @@ -94,7 +91,6 @@
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import java.util.List;
Expand All @@ -103,8 +99,6 @@
import java.util.concurrent.TimeUnit;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -481,7 +475,7 @@ public void shouldAllowEscapedTerminateQuery() {
.buildSingleAst("TERMINATE QUERY `CSAS-foo_2`;", metaStore);

// Then:
assertThat(statement.getStatement().getQueryId().getId(), is("CSAS-foo_2"));
assertThat(statement.getStatement().getQueryId().map(QueryId::toString), is(Optional.of("CSAS-foo_2")));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
Expand All @@ -74,7 +75,6 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

@SuppressWarnings("OptionalGetWithoutIsPresent")
public class SqlFormatterTest {

@Rule
Expand Down Expand Up @@ -584,7 +584,7 @@ public void shouldFormatDropTableStatement() {
@Test
public void shouldFormatTerminateQuery() {
// Given:
final TerminateQuery terminateQuery = new TerminateQuery(Optional.empty(), "FOO");
final TerminateQuery terminateQuery = TerminateQuery.query(Optional.empty(), new QueryId("FOO"));

// When:
final String formatted = SqlFormatter.formatSql(terminateQuery);
Expand All @@ -593,6 +593,18 @@ public void shouldFormatTerminateQuery() {
assertThat(formatted, is("TERMINATE FOO"));
}

@Test
public void shouldFormatTerminateAllQueries() {
// Given:
final TerminateQuery terminateQuery = TerminateQuery.all(Optional.empty());

// When:
final String formatted = SqlFormatter.formatSql(terminateQuery);

// Then:
assertThat(formatted, is("TERMINATE ALL"));
}

@Test
public void shouldFormatShowTables() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@

import com.google.common.testing.EqualsTester;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.query.QueryId;
import java.util.Optional;
import org.junit.Test;

public class TerminateQueryTest {

public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0);
public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0);
private static final String SOME_QUERY_ID = "query0";
private static final QueryId SOME_QUERY_ID = new QueryId("query0");

@SuppressWarnings("UnstableApiUsage")
@Test
public void shouldImplementHashCodeAndEqualsProperty() {
new EqualsTester()
.addEqualityGroup(
// Note: At the moment location does not take part in equality testing
new TerminateQuery(SOME_QUERY_ID),
new TerminateQuery(SOME_QUERY_ID),
new TerminateQuery(Optional.of(SOME_LOCATION), SOME_QUERY_ID),
new TerminateQuery(Optional.of(OTHER_LOCATION), SOME_QUERY_ID)
TerminateQuery.query(Optional.of(SOME_LOCATION), SOME_QUERY_ID),
TerminateQuery.query(Optional.of(OTHER_LOCATION), SOME_QUERY_ID)
)
.addEqualityGroup(
new TerminateQuery("diff")
TerminateQuery.query(Optional.empty(), new QueryId("diff"))
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandId.Action;
import io.confluent.ksql.rest.entity.CommandId.Type;
Expand Down Expand Up @@ -65,9 +66,6 @@ interface CommandIdSupplier {
command -> new CommandId(Type.CLUSTER, "TerminateCluster", Action.TERMINATE))
.build();

public CommandIdAssigner() {
}

public CommandId getCommandId(final Statement command) {
final CommandIdSupplier supplier = SUPPLIERS.get(command.getClass());
if (supplier == null) {
Expand Down Expand Up @@ -112,7 +110,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) {
private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) {
return new CommandId(
CommandId.Type.TERMINATE,
terminateQuery.getQueryId().toString(),
terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"),
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
CommandId.Action.EXECUTE
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private void handleStatementWithTerminatedQueries(
}
}

@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings("unchecked")
private void executeStatement(
final PreparedStatement<?> statement,
final Command command,
Expand Down Expand Up @@ -374,9 +374,14 @@ private KsqlConfig buildMergedConfig(final Command command) {
}

private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQuery) {
final QueryId queryId = terminateQuery.getStatement().getQueryId();
final Optional<QueryId> queryId = terminateQuery.getStatement().getQueryId();

ksqlEngine.getPersistentQuery(queryId)
if (!queryId.isPresent()) {
ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::close);
return;
}

ksqlEngine.getPersistentQuery(queryId.get())
.orElseThrow(() ->
new KsqlException(String.format("No running query with id %s was found", queryId)))
.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.util.Map;
import java.util.Optional;

public final class TerminateQueryValidator {

Expand All @@ -34,13 +36,17 @@ public static void validate(
final ServiceContext serviceContext
) {
final TerminateQuery terminateQuery = (TerminateQuery) statement.getStatement();
final QueryId queryId = terminateQuery.getQueryId();
final Optional<QueryId> queryId = terminateQuery.getQueryId();

context.getPersistentQuery(queryId)
if (!queryId.isPresent()) {
context.getPersistentQueries().forEach(PersistentQueryMetadata::close);
return;
}

context.getPersistentQuery(queryId.get())
.orElseThrow(() -> new KsqlStatementException(
"Unknown queryId: " + queryId,
"Unknown queryId: " + queryId.get(),
statement.getStatementText()))
.close();
}

}
Loading