From 1ac7f13d8d0f91e84cb2c2fda6ae551853746207 Mon Sep 17 00:00:00 2001 From: Steven Zhang <35498506+stevenpyzhang@users.noreply.github.com> Date: Thu, 5 Mar 2020 19:10:41 -0500 Subject: [PATCH] fix: backport fixes from query close (#4704) * fix: idempotent terminate that can handle hung streams (#4643) Fixes a couple issues with terminate: - don't throw if the query doesn't get into NOT_RUNNING state. This can happen when streams threads are stuck pending shutdown. - make terminate idempotent * fix: don't cleanup topics on engine close (#4658) Co-authored-by: Rohan Co-authored-by: Andy Coates Co-authored-by: Rohan Co-authored-by: Almog Gavra Co-authored-by: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> --- .../io/confluent/ksql/engine/KsqlEngine.java | 8 +-- .../ksql/util/PersistentQueryMetadata.java | 5 ++ .../io/confluent/ksql/util/QueryMetadata.java | 33 ++++++++++-- .../ksql/util/QueuedQueryMetadata.java | 5 ++ .../confluent/ksql/engine/KsqlEngineTest.java | 39 ++++++++++++++- .../ksql/util/QueryMetadataTest.java | 50 ++++++++++++++++++- .../server/computation/StatementExecutor.java | 6 +-- .../computation/StatementExecutorTest.java | 34 +++++++++++++ 8 files changed, 167 insertions(+), 13 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java index 1ed9f048aeae..aa735032c8b1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java @@ -190,7 +190,7 @@ public ExecuteResult execute( @Override public void close() { - allLiveQueries.forEach(QueryMetadata::close); + allLiveQueries.forEach(QueryMetadata::stop); engineMetrics.close(); aggregateMetricsCollector.shutdown(); } @@ -216,8 +216,10 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet final String applicationId = query.getQueryApplicationId(); if 
(!query.getState().equalsIgnoreCase("NOT_RUNNING")) { - throw new IllegalStateException("query not stopped." - + " id " + applicationId + ", state: " + query.getState()); + log.warn( + "Unregistering query that has not terminated. " + + "This may happen when streams threads are hung. State: " + query.getState() + ); } if (!allLiveQueries.remove(query)) { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index c6709915b629..1133e8c1385d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -120,4 +120,9 @@ public String getSchemasDescription() { public PersistenceSchemas getPersistenceSchemas() { return persistenceSchemas; } + + @Override + public void stop() { + doClose(false); + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 89e698b31fc5..5cf73b6ae165 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueryMetadata { +public abstract class QueryMetadata { private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class); @@ -148,14 +148,39 @@ public boolean hasEverBeenStarted() { return everStarted; } + + /** + * Stops the query without cleaning up the external resources + * so that it can be resumed when we call {@link #start()}. + * + *

<p>NOTE: {@link QueuedQueryMetadata} overrides this method + * since any time a transient query is stopped the external resources + * should be cleaned up.</p>

+ * + * @see #close() + */ + public abstract void stop(); + + /** + * Closes the {@code QueryMetadata} and cleans up any of + * the resources associated with it (e.g. internal topics, + * schemas, etc...). + * + * @see QueryMetadata#stop() + */ public void close() { + doClose(true); + closeCallback.accept(this); + } + + protected void doClose(final boolean cleanUp) { kafkaStreams.close(); - kafkaStreams.cleanUp(); + if (cleanUp) { + kafkaStreams.cleanUp(); + } queryStateListener.ifPresent(QueryStateListener::close); - - closeCallback.accept(this); } public void start() { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java index faacd5aa39de..4d2adda2fb3c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java @@ -99,6 +99,11 @@ public void setLimitHandler(final LimitHandler limitHandler) { limitHandlerSetter.accept(limitHandler); } + @Override + public void stop() { + close(); + } + @Override public void close() { super.close(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 73605d86f973..faa4ce7d4f1e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -34,6 +34,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -455,7 +456,7 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception { } @Test - public void shouldCleanUpInternalTopicsOnClose() { + public void 
shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() { // Given: final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine, "select * from test1;", @@ -470,6 +471,42 @@ public void shouldCleanUpInternalTopicsOnClose() { verify(topicClient).deleteInternalTopics(query.getQueryApplicationId()); } + @Test + public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() { + // Given: + final List query = KsqlEngineTestUtil.execute( + ksqlEngine, + "create stream persistent as select * from test1;", + KSQL_CONFIG, Collections.emptyMap() + ); + + query.get(0).start(); + + // When: + ksqlEngine.close(); + + // Then (there are no transient queries, so no internal topics should be deleted): + verify(topicClient, never()).deleteInternalTopics(any()); + } + + @Test + public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() { + // Given: + final List query = KsqlEngineTestUtil.execute( + ksqlEngine, + "create stream persistent as select * from test1;", + KSQL_CONFIG, Collections.emptyMap() + ); + + query.get(0).start(); + + // When: + query.get(0).close(); + + // Then (there are no transient queries, so no internal topics should be deleted): + verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId()); + } + @Test public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java index cf35cbd7e3a7..968e6a56ecf3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/QueryMetadataTest.java @@ -19,7 +19,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; @@ -57,9 +59,11 @@ public class QueryMetadataTest { @Mock private Consumer closeCallback; private QueryMetadata query; + private boolean cleanUp; @Before public void setup() { + cleanUp = false; query = new QueryMetadata( "foo", kafkaStreams, @@ -72,7 +76,12 @@ public void setup() { Collections.emptyMap(), Collections.emptyMap(), closeCallback - ); + ) { + @Override + public void stop() { + doClose(cleanUp); + } + }; } @Test @@ -134,6 +143,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() { inOrder.verify(closeCallback).accept(query); } + @Test + public void shouldNotCallCloseCallbackOnStop() { + // When: + query.stop(); + + // Then: + verifyNoMoreInteractions(closeCallback); + } + + @Test + public void shouldCallKafkaStreamsCloseOnStop() { + // When: + query.stop(); + + // Then: + verify(kafkaStreams).close(); + } + @Test public void shouldCleanUpKStreamsAppAfterCloseOnClose() { // When: @@ -145,6 +172,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() { inOrder.verify(kafkaStreams).cleanUp(); } + @Test + public void shouldNotCleanUpKStreamsAppOnStop() { + // When: + query.stop(); + + // Then: + verify(kafkaStreams, never()).cleanUp(); + } + + @Test + public void shouldCallCleanupOnStopIfCleanup() { + // Given: + cleanUp = true; + + // When: + query.stop(); + + // Then: + verify(kafkaStreams).cleanUp(); + } + @Test public void shouldReturnSources() { assertThat(query.getSourceNames(), is(SOME_SOURCES)); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index 406c9a1f1262..9ef8fbbb3242 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ 
b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -310,10 +310,8 @@ private KsqlConfig buildMergedConfig(final Command command) { private void terminateQuery(final PreparedStatement terminateQuery) { final QueryId queryId = terminateQuery.getStatement().getQueryId(); - ksqlEngine.getPersistentQuery(queryId) - .orElseThrow(() -> - new KsqlException(String.format("No running query with id %s was found", queryId))) - .close(); + final Optional query = ksqlEngine.getPersistentQuery(queryId); + query.ifPresent(PersistentQueryMetadata::close); } private void maybeTerminateQueryForLegacyDropCommand( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java index 8f825e2c5eb7..86c9bee02a1f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java @@ -52,6 +52,7 @@ import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatus.Status; @@ -736,6 +737,39 @@ public void shouldRestoreLegacyRunScriptCommand() { verify(mockParser, mockEngine, mockQuery); } + @Test + public void shouldDoIdempotentTerminate() { + // Given: + final String queryStatement = "a persistent query"; + + final TerminateQuery terminate = mock(TerminateQuery.class); + expect(terminate.getQueryId()).andStubReturn(new QueryId("foo")); + + expect(mockParser.parseSingleStatement(queryStatement)) + .andStubReturn(PreparedStatement.of(queryStatement, terminate)); + + final PersistentQueryMetadata 
query = mock(PersistentQueryMetadata.class); + query.close(); + expectLastCall(); + + expect(mockEngine.getPersistentQuery(new QueryId("foo"))).andReturn(Optional.of(query)).once(); + expect(mockEngine.getPersistentQuery(new QueryId("foo"))).andReturn(Optional.empty()).once(); + + replayAll(); + final QueuedCommand cmd = new QueuedCommand( + new CommandId(Type.TERMINATE, "-", Action.EXECUTE), + new Command(queryStatement, emptyMap(), emptyMap()), + Optional.empty() + ); + + // When: + statementExecutorWithMocks.handleStatement(cmd); + statementExecutorWithMocks.handleStatement(cmd); + + // Then should not throw + verify(mockParser, mockEngine); + } + private void createStreamsAndStartTwoPersistentQueries() { final Command csCommand = new Command( "CREATE STREAM pageview ("