Skip to content

Commit

Permalink
fix: don't cleanup topics on engine close (confluentinc#4658)
Browse files Browse the repository at this point in the history
Co-authored-by: Rohan <[email protected]>
Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
3 people authored and stevenpyzhang committed Mar 23, 2020
1 parent 62dc5d3 commit 1fd3474
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ public long numberOfPersistentQueries() {
@Override
public void close() {
for (final QueryMetadata queryMetadata : allLiveQueries) {
queryMetadata.close();
queryMetadata.stop();
}
adminClient.close();
engineMetrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(id, super.hashCode());
}

public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger log = LoggerFactory.getLogger(QueryMetadata.class);

private final String statementString;
private final KafkaStreams kafkaStreams;
private final OutputNode outputNode;
Expand Down Expand Up @@ -115,14 +117,35 @@ public Set<String> getSourceNames() {
return sourceNames;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link QueuedQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close();
if (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING) {
if (cleanUp) {
kafkaStreams.cleanUp();
kafkaTopicClient.deleteInternalTopics(queryApplicationId);
} else {
log.error("Could not clean up the query with application id: {}. Query status is: {}",
queryApplicationId, kafkaStreams.state());
}
queryStateListener.ifPresent(QueryStateListener::close);
StreamsErrorCollector.notifyApplicationClose(queryApplicationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public void setLimitHandler(final OutputNode.LimitHandler limitHandler) {
getOutputNode().setLimitHandler(limitHandler);
}

@Override
public void stop() {
close();
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public void shouldCloseAdminClientOnClose() {
// Then:
verify(adminClient);
}

@Test
public void shouldUseSerdeSupplierToBuildQueries() {
final KsqlTopicSerDe mockKsqlSerde = mock(KsqlTopicSerDe.class);
Expand Down

0 comments on commit 1fd3474

Please sign in to comment.