diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java index 1427b977cae9..35f174206f6d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java @@ -533,7 +533,7 @@ public long numberOfPersistentQueries() { @Override public void close() { for (final QueryMetadata queryMetadata : allLiveQueries) { - queryMetadata.close(); + queryMetadata.stop(); } adminClient.close(); engineMetrics.close(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index 1f4f7dbe1a28..a6960a0bee48 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -95,4 +95,8 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(id, super.hashCode()); } + + public void stop() { + doClose(false); + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java index 99e498cd7d65..f6c617c7e366 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java @@ -33,8 +33,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueryMetadata { +public abstract class QueryMetadata { + private static final Logger log = LoggerFactory.getLogger(QueryMetadata.class); + private final String statementString; private final KafkaStreams kafkaStreams; private final OutputNode outputNode; @@ -115,14 +117,35 @@ public Set getSourceNames() { return sourceNames; } + + /** + * Stops the query without cleaning up the external resources + * so that it can be resumed when we call {@link #start()}. + * + *

NOTE: {@link QueuedQueryMetadata} overrides this method + * since any time a transient query is stopped the external resources + * should be cleaned up.

+ * + * @see #close() + */ + public abstract void stop(); + + /** + * Closes the {@code QueryMetadata} and cleans up any of + * the resources associated with it (e.g. internal topics, + * schemas, etc...). + * + * @see QueryMetadata#stop() + */ public void close() { + doClose(true); + } + + protected void doClose(final boolean cleanUp) { kafkaStreams.close(); - if (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING) { + if (cleanUp) { kafkaStreams.cleanUp(); kafkaTopicClient.deleteInternalTopics(queryApplicationId); - } else { - log.error("Could not clean up the query with application id: {}. Query status is: {}", - queryApplicationId, kafkaStreams.state()); } queryStateListener.ifPresent(QueryStateListener::close); StreamsErrorCollector.notifyApplicationClose(queryApplicationId); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java index c9d7176a4426..72b7dd65607a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java @@ -75,6 +75,11 @@ public void setLimitHandler(final OutputNode.LimitHandler limitHandler) { getOutputNode().setLimitHandler(limitHandler); } + @Override + public void stop() { + close(); + } + @Override public void close() { super.close();