Skip to content

Commit

Permalink
fix: don't cleanup topics on engine close (confluentinc#4658)
Browse files Browse the repository at this point in the history
Co-authored-by: Rohan <[email protected]>
Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
3 people authored and stevenpyzhang committed Mar 24, 2020
1 parent 439ecbc commit 6eb85a7
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 23 deletions.
3 changes: 3 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ public long numberOfPersistentQueries() {
@Override
public void close() {
for (final QueryMetadata queryMetadata : allLiveQueries) {
if (queryMetadata instanceof PersistentQueryMetadata) {
queryMetadata.stop();
}
queryMetadata.close();
}
adminClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(id, super.hashCode());
}

public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger log = LoggerFactory.getLogger(QueryMetadata.class);

private final String statementString;
private final KafkaStreams kafkaStreams;
private final OutputNode outputNode;
Expand Down Expand Up @@ -115,14 +117,35 @@ public Set<String> getSourceNames() {
return sourceNames;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link QueuedQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close();
if (kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING) {
if (cleanUp) {
kafkaStreams.cleanUp();
kafkaTopicClient.deleteInternalTopics(queryApplicationId);
} else {
log.error("Could not clean up the query with application id: {}. Query status is: {}",
queryApplicationId, kafkaStreams.state());
}
queryStateListener.ifPresent(QueryStateListener::close);
StreamsErrorCollector.notifyApplicationClose(queryApplicationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public void setLimitHandler(final OutputNode.LimitHandler limitHandler) {
getOutputNode().setLimitHandler(limitHandler);
}

@Override
public void stop() {
close();
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -49,25 +51,19 @@ public class QueryMetadataTest {
private Metrics metrics;
private MetricName metricName;
private final String metricGroupName = "ksql-queries";
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
metrics = MetricsTestUtil.getMetrics();
metricName = metrics.metricName("query-status", metricGroupName,
"The current status of the given query.",
Collections.singletonMap("status", queryApplicationId));
outputNode = EasyMock.niceMock(OutputNode.class);
kafkaStreams = EasyMock.niceMock(KafkaStreams.class);
}

@Test
public void shouldAddandRemoveTheMetricOnClose() {
EasyMock.expect(kafkaStreams.state()).andReturn(State.RUNNING).once();
EasyMock.expect(kafkaStreams.state()).andReturn(State.NOT_RUNNING).once();
EasyMock.replay(kafkaStreams, outputNode);
final QueryStateListener queryStateListener = new QueryStateListener(metrics, kafkaStreams, queryApplicationId);
kafkaStreams.setStateListener(EasyMock.eq(queryStateListener));
final QueryMetadata queryMetadata = new QueryMetadata(
query = new QueryMetadata(
statementString,
kafkaStreams,
outputNode,
Expand All @@ -77,19 +73,68 @@ public void shouldAddandRemoveTheMetricOnClose() {
kafkaTopicClient,
topoplogy,
overriddenProperties
);
queryMetadata.registerQueryStateListener(queryStateListener);
queryMetadata.start();
) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
public void shouldAddandRemoveTheMetricOnClose() {
EasyMock.expect(kafkaStreams.state()).andReturn(State.RUNNING).once();
EasyMock.replay(kafkaStreams);
final QueryStateListener queryStateListener = new QueryStateListener(metrics, kafkaStreams, queryApplicationId);
kafkaStreams.setStateListener(EasyMock.eq(queryStateListener));
query.registerQueryStateListener(queryStateListener);
query.start();
assertThat(metrics.metric(metricName).metricName().name(), equalTo("query-status"));
assertThat(metrics.metric(metricName).metricValue().toString(), equalTo("RUNNING"));
queryStateListener.onChange(State.REBALANCING, State.RUNNING);
assertThat(metrics.metric(metricName).metricValue().toString(), equalTo("REBALANCING"));
queryStateListener.onChange(State.RUNNING, State.REBALANCING);
assertThat(metrics.metric(metricName).metricValue().toString(), equalTo("RUNNING"));
queryMetadata.close();
query.close();
EasyMock.verify(kafkaStreams);
assertThat(metrics.metric(metricName), nullValue());
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
kafkaStreams.close();
EasyMock.expectLastCall().times(1);
EasyMock.replay(kafkaStreams);

// When:
query.stop();
EasyMock.verify(kafkaStreams);
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
kafkaStreams.cleanUp();
EasyMock.expectLastCall().andAnswer(() -> {
Assert.fail();
return null;
}).anyTimes();
EasyMock.replay(kafkaStreams);

// When:
query.stop();
EasyMock.verify(kafkaStreams);
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
// Given:
cleanUp = true;
kafkaStreams.cleanUp();
EasyMock.expectLastCall().times(1);
EasyMock.replay(kafkaStreams);

// When:
query.stop();
EasyMock.verify(kafkaStreams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,7 @@ private void terminateQuery(
final TerminateQuery terminateQuery,
final Mode mode) throws Exception {
final QueryId queryId = terminateQuery.getQueryId();
if (!ksqlEngine.terminateQuery(queryId, mode == Mode.EXECUTE)) {
throw new Exception(String.format("No running query with id %s was found", queryId));
}
ksqlEngine.terminateQuery(queryId, mode == Mode.EXECUTE);
}

private void maybeTerminateQueryForLegacyDropCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.server.computation;

import static java.util.Collections.emptyMap;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
Expand All @@ -23,7 +24,6 @@
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reportMatcher;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -40,6 +40,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.DdlStatement;
import io.confluent.ksql.parser.tree.DropStream;
Expand Down Expand Up @@ -741,6 +742,39 @@ private void createStreamsAndTables() {
handleStatement(ctasCommand, ctasCommandId, Optional.empty());
}

@Test
public void shouldDoIdempotentTerminate() {
// Given:
final String queryStatement = "a persistent query";

final TerminateQuery terminate = mock(TerminateQuery.class);
expect(terminate.getQueryId()).andStubReturn(new QueryId("foo"));

expect(mockParser.parseSingleStatement(queryStatement))
.andStubReturn(terminate);

final PersistentQueryMetadata query = mock(PersistentQueryMetadata.class);
query.close();
expectLastCall();

expect(mockEngine.terminateQuery(new QueryId("foo"), true)).andReturn(true).once();
expect(mockEngine.terminateQuery(new QueryId("foo"), true)).andReturn(false).once();

replayAll();
final QueuedCommand cmd = new QueuedCommand(
new CommandId(Type.TERMINATE, "-", Action.EXECUTE),
new Command(queryStatement, emptyMap(), emptyMap()),
Optional.empty()
);

// When:
statementExecutorWithMocks.handleStatement(cmd);
statementExecutorWithMocks.handleStatement(cmd);

// Then:
verify(mockParser, mockEngine);
}

private void tryDropThatViolatesReferentialIntegrity() {
final Command dropStreamCommand1 = new Command(
"drop stream pageview;",
Expand Down

0 comments on commit 6eb85a7

Please sign in to comment.