From f60d6c5f4e2252c58f1445ed015cfab065c26c21 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 21 Nov 2019 17:32:21 +0000 Subject: [PATCH 1/2] feat: add support to terminate all running queries You can now issue `TERMINATE ALL` to terminate all running queries. BREAKING CHANGE: `ALL` is now a reserved word and can not be used for identifiers without being quoted. --- .../io/confluent/ksql/parser/SqlBase.g4 | 2 + .../io/confluent/ksql/parser/AstBuilder.java | 23 +++--- .../confluent/ksql/parser/SqlFormatter.java | 14 +--- .../ksql/parser/tree/TerminateQuery.java | 21 +++-- .../confluent/ksql/parser/KsqlParserTest.java | 10 +-- .../ksql/parser/SqlFormatterTest.java | 16 +++- .../ksql/parser/tree/TerminateQueryTest.java | 12 +-- .../server/computation/CommandIdAssigner.java | 6 +- .../InteractiveStatementExecutor.java | 11 ++- .../validation/TerminateQueryValidator.java | 14 +++- .../InteractiveStatementExecutorTest.java | 39 +++++++++ .../server/resources/KsqlResourceTest.java | 24 ++++++ .../TerminateQueryValidatorTest.java | 81 ++++++++++++++----- 13 files changed, 196 insertions(+), 77 deletions(-) diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 2c5c0ad3a0de..8488eeb0cff7 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -46,6 +46,7 @@ statement | PRINT (identifier| STRING) printClause #printTopic | (LIST | SHOW) QUERIES EXTENDED? #listQueries | TERMINATE QUERY? identifier #terminateQuery + | TERMINATE ALL #terminateQuery | SET STRING EQ STRING #setProperty | UNSET STRING #unsetProperty | CREATE STREAM (IF NOT EXISTS)? sourceName @@ -339,6 +340,7 @@ CHANGES: 'CHANGES'; SELECT: 'SELECT'; FROM: 'FROM'; AS: 'AS'; +ALL: 'ALL'; DISTINCT: 'DISTINCT'; WHERE: 'WHERE'; WITHIN: 'WITHIN'; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index b46c23562a75..40bb3ed4a140 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -126,6 +126,7 @@ import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.parser.tree.WindowExpression; import io.confluent.ksql.parser.tree.WithinExpression; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.Operator; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.SqlTypeParser; @@ -157,23 +158,21 @@ public AstBuilder(final TypeRegistry typeRegistry) { this.typeRegistry = requireNonNull(typeRegistry, "typeRegistry"); } - @SuppressWarnings("unchecked") public Statement buildStatement(final ParserRuleContext parseTree) { - return build(Optional.of(getSources(parseTree)), typeRegistry, parseTree); + return build(Optional.of(getSources(parseTree)), parseTree); } public Expression buildExpression(final ParserRuleContext parseTree) { - return build(Optional.empty(), typeRegistry, parseTree); + return build(Optional.empty(), parseTree); } public WindowExpression buildWindowExpression(final ParserRuleContext parseTree) { - return build(Optional.empty(), typeRegistry, parseTree); + return build(Optional.empty(), parseTree); } @SuppressWarnings("unchecked") private T build( final Optional> sources, - final TypeRegistry typeRegistry, final ParserRuleContext parseTree) { return (T) new Visitor(sources, typeRegistry).visit(parseTree); } @@ -632,11 +631,15 @@ public Node visitListTypes(final ListTypesContext ctx) { @Override public Node visitTerminateQuery(final SqlBaseParser.TerminateQueryContext context) { - return new TerminateQuery( - getLocation(context), - // use case sensitive parsing here to maintain backwards compatibility - ParserUtil.getIdentifierText(true, context.identifier()) - ); + final Optional location = getLocation(context); + + return context.ALL() != null + ? TerminateQuery.all(location) + : TerminateQuery.query( + location, + // use case sensitive parsing here to maintain backwards compatibility + new QueryId(ParserUtil.getIdentifierText(true, context.identifier())) + ); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index b846b07544b1..dddde01cb1d8 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -56,19 +56,18 @@ import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.UnsetProperty; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.util.IdentifierUtil; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; public final class SqlFormatter { private static final String INDENT = " "; - private static final Pattern NAME_PATTERN = Pattern.compile("[a-z_][a-z0-9_]*"); private static final FormatOptions FORMAT_OPTIONS = FormatOptions.of(IdentifierUtil::needsQuotes); private SqlFormatter() { @@ -287,13 +286,6 @@ protected Void visitCreateTableAsSelect( return null; } - private static String formatName(final String name) { - if (NAME_PATTERN.matcher(name).matches()) { - return name; - } - return "\"" + name + "\""; - } - @Override protected Void visitInsertInto(final InsertInto node, final Integer indent) { builder.append("INSERT INTO "); @@ -329,7 +321,7 @@ protected Void visitInsertValues(final InsertValues node, final Integer context) builder.append( node.getValues() .stream() - .map(exp -> ExpressionFormatterUtil.formatExpression(exp)) + .map(ExpressionFormatterUtil::formatExpression) .collect(Collectors.joining(", "))); builder.append(")"); @@ -345,7 +337,7 @@ protected Void visitDropTable(final DropTable node, final Integer context) { @Override protected Void visitTerminateQuery(final TerminateQuery node, final Integer context) { builder.append("TERMINATE "); - builder.append(node.getQueryId().getId()); + builder.append(node.getQueryId().map(QueryId::toString).orElse("ALL")); return null; } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java index 52bf610733b0..4664072056e9 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TerminateQuery.java @@ -22,20 +22,27 @@ import java.util.Optional; @Immutable -public class TerminateQuery extends Statement { +public final class TerminateQuery extends Statement { - private final QueryId queryId; + private final Optional queryId; - public TerminateQuery(final String queryId) { - this(Optional.empty(), queryId); + public static TerminateQuery all(final Optional location) { + return new TerminateQuery(location, Optional.empty()); } - public TerminateQuery(final Optional location, final String queryId) { + public static TerminateQuery query(final Optional location, final QueryId queryId) { + return new TerminateQuery(location, Optional.of(queryId)); + } + + private TerminateQuery(final Optional location, final Optional queryId) { super(location); - this.queryId = new QueryId(queryId); + this.queryId = Objects.requireNonNull(queryId, "queryId"); } - public QueryId getQueryId() { + /** + * @return the id of the query to terminate or {@code empty()} if all should be terminated. + */ + public Optional getQueryId() { return queryId; } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 77bbd3c9827b..8a443c602fc3 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -34,7 +34,6 @@ import com.google.common.collect.Iterables; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.ComparisonExpression; import io.confluent.ksql.execution.expression.tree.DereferenceExpression; import io.confluent.ksql.execution.expression.tree.Expression; @@ -54,7 +53,6 @@ import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.parser.tree.AliasedRelation; import io.confluent.ksql.parser.tree.AllColumns; -import io.confluent.ksql.parser.tree.AstNode; import io.confluent.ksql.parser.tree.CreateConnector; import io.confluent.ksql.parser.tree.CreateSource; import io.confluent.ksql.parser.tree.CreateStream; @@ -64,7 +62,6 @@ import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Join; -import io.confluent.ksql.parser.tree.JoinMatchers; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.ListQueries; import io.confluent.ksql.parser.tree.ListStreams; @@ -77,11 +74,11 @@ import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.WithinExpression; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; @@ -94,7 +91,6 @@ import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; import java.util.List; @@ -103,8 +99,6 @@ import java.util.concurrent.TimeUnit; import org.hamcrest.Description; import org.hamcrest.Matcher; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; import org.junit.Before; @@ -481,7 +475,7 @@ public void shouldAllowEscapedTerminateQuery() { .buildSingleAst("TERMINATE QUERY `CSAS-foo_2`;", metaStore); // Then: - assertThat(statement.getStatement().getQueryId().getId(), is("CSAS-foo_2")); + assertThat(statement.getStatement().getQueryId().map(QueryId::toString), is(Optional.of("CSAS-foo_2"))); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index 38236fe2eeaa..1d75e587f685 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -55,6 +55,7 @@ import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.properties.with.CreateConfigs; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlStruct; @@ -74,7 +75,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -@SuppressWarnings("OptionalGetWithoutIsPresent") public class SqlFormatterTest { @Rule @@ -584,7 +584,7 @@ public void shouldFormatDropTableStatement() { @Test public void shouldFormatTerminateQuery() { // Given: - final TerminateQuery terminateQuery = new TerminateQuery(Optional.empty(), "FOO"); + final TerminateQuery terminateQuery = TerminateQuery.query(Optional.empty(), new QueryId("FOO")); // When: final String formatted = SqlFormatter.formatSql(terminateQuery); @@ -593,6 +593,18 @@ public void shouldFormatTerminateQuery() { assertThat(formatted, is("TERMINATE FOO")); } + @Test + public void shouldFormatTerminateAllQueries() { + // Given: + final TerminateQuery terminateQuery = TerminateQuery.all(Optional.empty()); + + // When: + final String formatted = SqlFormatter.formatSql(terminateQuery); + + // Then: + assertThat(formatted, is("TERMINATE ALL")); + } + @Test public void shouldFormatShowTables() { // Given: diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java index 74ac92955ef7..5abe48d33cc0 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TerminateQueryTest.java @@ -17,6 +17,7 @@ import com.google.common.testing.EqualsTester; import io.confluent.ksql.parser.NodeLocation; +import io.confluent.ksql.query.QueryId; import java.util.Optional; import org.junit.Test; @@ -24,20 +25,19 @@ public class TerminateQueryTest { public static final NodeLocation SOME_LOCATION = new NodeLocation(0, 0); public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); - private static final String SOME_QUERY_ID = "query0"; + private static final QueryId SOME_QUERY_ID = new QueryId("query0"); + @SuppressWarnings("UnstableApiUsage") @Test public void shouldImplementHashCodeAndEqualsProperty() { new EqualsTester() .addEqualityGroup( // Note: At the moment location does not take part in equality testing - new TerminateQuery(SOME_QUERY_ID), - new TerminateQuery(SOME_QUERY_ID), - new TerminateQuery(Optional.of(SOME_LOCATION), SOME_QUERY_ID), - new TerminateQuery(Optional.of(OTHER_LOCATION), SOME_QUERY_ID) + TerminateQuery.query(Optional.of(SOME_LOCATION), SOME_QUERY_ID), + TerminateQuery.query(Optional.of(OTHER_LOCATION), SOME_QUERY_ID) ) .addEqualityGroup( - new TerminateQuery("diff") + TerminateQuery.query(Optional.empty(), new QueryId("diff")) ) .testEquals(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java index 80aadda0cdfc..d478cc14b3de 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandIdAssigner.java @@ -27,6 +27,7 @@ import io.confluent.ksql.parser.tree.RegisterType; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.TerminateQuery; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; @@ -65,9 +66,6 @@ interface CommandIdSupplier { command -> new CommandId(Type.CLUSTER, "TerminateCluster", Action.TERMINATE)) .build(); - public CommandIdAssigner() { - } - public CommandId getCommandId(final Statement command) { final CommandIdSupplier supplier = SUPPLIERS.get(command.getClass()); if (supplier == null) { @@ -112,7 +110,7 @@ private static CommandId getDropTypeCommandId(final DropType dropType) { private static CommandId getTerminateCommandId(final TerminateQuery terminateQuery) { return new CommandId( CommandId.Type.TERMINATE, - terminateQuery.getQueryId().toString(), + terminateQuery.getQueryId().map(QueryId::toString).orElse("ALL"), CommandId.Action.EXECUTE ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index a3fa31e69f9e..ef02bf2d80ef 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -226,7 +226,7 @@ private void handleStatementWithTerminatedQueries( } } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") private void executeStatement( final PreparedStatement statement, final Command command, @@ -374,9 +374,14 @@ private KsqlConfig buildMergedConfig(final Command command) { } private void terminateQuery(final PreparedStatement terminateQuery) { - final QueryId queryId = terminateQuery.getStatement().getQueryId(); + final Optional queryId = terminateQuery.getStatement().getQueryId(); - ksqlEngine.getPersistentQuery(queryId) + if (!queryId.isPresent()) { + ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::close); + return; + } + + ksqlEngine.getPersistentQuery(queryId.get()) .orElseThrow(() -> new KsqlException(String.format("No running query with id %s was found", queryId))) .close(); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java index e1f7c7ca11fc..33ab62e5843e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidator.java @@ -21,7 +21,9 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlStatementException; +import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Map; +import java.util.Optional; public final class TerminateQueryValidator { @@ -34,13 +36,17 @@ public static void validate( final ServiceContext serviceContext ) { final TerminateQuery terminateQuery = (TerminateQuery) statement.getStatement(); - final QueryId queryId = terminateQuery.getQueryId(); + final Optional queryId = terminateQuery.getQueryId(); - context.getPersistentQuery(queryId) + if (!queryId.isPresent()) { + context.getPersistentQueries().forEach(PersistentQueryMetadata::close); + return; + } + + context.getPersistentQuery(queryId.get()) .orElseThrow(() -> new KsqlStatementException( - "Unknown queryId: " + queryId, + "Unknown queryId: " + queryId.get(), statement.getStatementText())) .close(); } - } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index da3aee586aa8..86f0bf6f0859 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -19,6 +19,7 @@ import static io.confluent.ksql.parser.ParserMatchers.preparedStatement; import static java.util.Collections.emptyMap; import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; @@ -55,6 +56,7 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.query.id.HybridQueryIdGenerator; @@ -735,6 +737,43 @@ public void shouldRestoreLegacyRunScriptCommand() { verify(mockParser, mockEngine, mockQuery); } + @Test + public void shouldTerminateAll() { + // Given: + final String queryStatement = "a persistent query"; + + final TerminateQuery terminateAll = mock(TerminateQuery.class); + expect(terminateAll.getQueryId()).andReturn(Optional.empty()); + + expect(mockParser.parseSingleStatement(anyString())) + .andReturn(PreparedStatement.of(queryStatement, terminateAll)); + + final PersistentQueryMetadata query0 = mock(PersistentQueryMetadata.class); + query0.close(); + expectLastCall(); + + final PersistentQueryMetadata query1 = mock(PersistentQueryMetadata.class); + query1.close(); + expectLastCall(); + + expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of(query0, query1)); + + replayAll(); + + // When: + statementExecutorWithMocks.handleStatement( + new QueuedCommand( + new CommandId(Type.TERMINATE, "-", Action.EXECUTE), + new Command("terminate all", true, emptyMap(), emptyMap()), + Optional.empty(), + 0L + ) + ); + + // Then: + verify(query0, query1); + } + private void createStreamsAndStartTwoPersistentQueries() { final Command csCommand = new Command( "CREATE STREAM pageview (" diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index c64edd86ab35..3009b52b56ba 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1160,6 +1160,30 @@ public void shouldDistributeTerminateQuery() { assertThat(result.getStatementText(), is(terminateSql)); } + @Test + public void shouldDistributeTerminateAllQueries() { + // Given: + createQuery( + "CREATE STREAM test_explain AS SELECT * FROM test_stream;", + emptyMap()); + + final String terminateSql = "TERMINATE ALL;"; + + // When: + final CommandStatusEntity result = makeSingleRequest(terminateSql, CommandStatusEntity.class); + + // Then: + verify(commandStore) + .enqueueCommand( + argThat(is(configured(preparedStatement( + is(terminateSql), + is(TerminateQuery.all(Optional.empty())))) + )), + any(Producer.class)); + + assertThat(result.getStatementText(), is(terminateSql)); + } + @Test public void shouldThrowOnTerminateUnknownQuery() { // Then: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java index 91ec449d3614..8963ef4d6a83 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/TerminateQueryValidatorTest.java @@ -16,15 +16,18 @@ package io.confluent.ksql.rest.server.validation; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.TerminateQuery; -import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.Optional; @@ -32,13 +35,25 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class TerminateQueryValidatorTest { - @Rule public final TemporaryEngine engine = new TemporaryEngine(); - @Rule public final ExpectedException expectedException = ExpectedException.none(); + private static final KsqlConfig KSQL_CONFIG = new KsqlConfig(ImmutableMap.of()); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private PersistentQueryMetadata query0; + @Mock + private PersistentQueryMetadata query1; + @Mock + private KsqlEngine engine; + @Mock + private ServiceContext serviceContext; @Test public void shouldFailOnTerminateUnknownQueryId() { @@ -48,34 +63,56 @@ public void shouldFailOnTerminateUnknownQueryId() { // When: CustomValidators.TERMINATE_QUERY.validate( - ConfiguredStatement.of( - PreparedStatement.of("", new TerminateQuery("id")), - ImmutableMap.of(), - engine.getKsqlConfig() - ), + configuredStmt(TerminateQuery.query(Optional.empty(), new QueryId("id"))), ImmutableMap.of(), - engine.getEngine(), - engine.getServiceContext() + engine, + serviceContext ); } @Test public void shouldValidateKnownQueryId() { // Given: - final PersistentQueryMetadata metadata = mock(PersistentQueryMetadata.class); - final KsqlEngine mockEngine = mock(KsqlEngine.class); - when(mockEngine.getPersistentQuery(any())).thenReturn(Optional.ofNullable(metadata)); + when(engine.getPersistentQuery(any())).thenReturn(Optional.of(query0)); - // Expect nothing when: + // When: CustomValidators.TERMINATE_QUERY.validate( - ConfiguredStatement.of( - PreparedStatement.of("", new TerminateQuery("id")), - ImmutableMap.of(), - engine.getKsqlConfig() - ), + configuredStmt(TerminateQuery.query(Optional.empty(), new QueryId("id"))), ImmutableMap.of(), - mockEngine, - engine.getServiceContext() + engine, + serviceContext + ); + + // Then: + verify(query0).close(); + } + + @Test + public void shouldValidateTerminateAllQueries() { + // Given: + when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(query0, query1)); + + // When: + CustomValidators.TERMINATE_QUERY.validate( + configuredStmt(TerminateQuery.all(Optional.empty())), + ImmutableMap.of(), + engine, + serviceContext + ); + + // Then: + verify(query0).close(); + verify(query1).close(); + } + + private static ConfiguredStatement configuredStmt( + final TerminateQuery terminateQuery + ) { + return ConfiguredStatement.of( + PreparedStatement.of("meh", terminateQuery), + ImmutableMap.of(), + KSQL_CONFIG ); } } + From 99c39a162c069d40d06b1fa11cf514f95a29e5aa Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Sat, 23 Nov 2019 08:04:15 +0000 Subject: [PATCH 2/2] chore: changes requested by Derek --- ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 | 2 +- .../src/test/java/io/confluent/ksql/parser/KsqlParserTest.java | 2 +- .../java/io/confluent/ksql/rest/integration/RestApiTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 8488eeb0cff7..4a0ae292bbf4 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -45,7 +45,7 @@ statement | DESCRIBE CONNECTOR identifier #describeConnector | PRINT (identifier| STRING) printClause #printTopic | (LIST | SHOW) QUERIES EXTENDED? #listQueries - | TERMINATE QUERY? identifier #terminateQuery + | TERMINATE identifier #terminateQuery | TERMINATE ALL #terminateQuery | SET STRING EQ STRING #setProperty | UNSET STRING #unsetProperty diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 5fb86c397410..1757b4a4228a 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -472,7 +472,7 @@ public void shouldThrowOnNonAlphanumericSourceName() { public void shouldAllowEscapedTerminateQuery() { // When: final PreparedStatement statement = KsqlParserTestUtil - .buildSingleAst("TERMINATE QUERY `CSAS-foo_2`;", metaStore); + .buildSingleAst("TERMINATE `CSAS-foo_2`;", metaStore); // Then: assertThat(statement.getStatement().getQueryId().map(QueryId::toString), is(Optional.of("CSAS-foo_2"))); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index 2a0675503ba9..41262bc3e73d 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -361,7 +361,7 @@ public void shouldDeleteTopic() { // Given: makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); final String query = REST_APP.getPersistentQueries().iterator().next(); - makeKsqlRequest("TERMINATE QUERY " + query + ";"); + makeKsqlRequest("TERMINATE " + query + ";"); assertThat("Expected topic X to be created", topicExists("X"));