Skip to content

Commit

Permalink
fix: don't cleanup topics on engine close (confluentinc#4658)
Browse files Browse the repository at this point in the history
Co-authored-by: Rohan <[email protected]>
Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
3 people authored and stevenpyzhang committed Mar 24, 2020
1 parent 439ecbc commit f29b9eb
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ public long numberOfPersistentQueries() {
@Override
public void close() {
for (final QueryMetadata queryMetadata : allLiveQueries) {
queryMetadata.close();
queryMetadata.stop();
}
adminClient.close();
engineMetrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(id, super.hashCode());
}

public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger log = LoggerFactory.getLogger(QueryMetadata.class);

private final String statementString;
private final KafkaStreams kafkaStreams;
private final OutputNode outputNode;
Expand Down Expand Up @@ -115,14 +117,35 @@ public Set<String> getSourceNames() {
return sourceNames;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link QueuedQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close();
if (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING) {
if (cleanUp) {
kafkaStreams.cleanUp();
kafkaTopicClient.deleteInternalTopics(queryApplicationId);
} else {
log.error("Could not clean up the query with application id: {}. Query status is: {}",
queryApplicationId, kafkaStreams.state());
}
queryStateListener.ifPresent(QueryStateListener::close);
StreamsErrorCollector.notifyApplicationClose(queryApplicationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public void setLimitHandler(final OutputNode.LimitHandler limitHandler) {
getOutputNode().setLimitHandler(limitHandler);
}

@Override
public void stop() {
close();
}

@Override
public void close() {
super.close();
Expand Down

0 comments on commit f29b9eb

Please sign in to comment.