Skip to content

Commit

Permalink
fix: backport fixes from query close (#4704)
Browse files Browse the repository at this point in the history
* fix: idempotent terminate that can handle hung streams (#4643)

Fixes a couple issues with terminate:
- don't throw if the query doesn't get into NOT_RUNNING state. This can
  happen when streams threads are stuck pending shutdown.
- make terminate idempotent

* fix: don't cleanup topics on engine close (#4658)

Co-authored-by: Rohan <[email protected]>
Co-authored-by: Andy Coates <[email protected]>

Co-authored-by: Rohan <[email protected]>
Co-authored-by: Almog Gavra <[email protected]>
Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
4 people authored Mar 6, 2020
1 parent cf9c6b9 commit 1ac7f13
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public ExecuteResult execute(

@Override
public void close() {
  // Stop — rather than close — all live queries: an engine shutdown must
  // leave external resources (e.g. internal topics, schemas) intact so the
  // queries can be resumed by another engine instance. Full cleanup only
  // happens when a query itself is closed.
  allLiveQueries.forEach(QueryMetadata::stop);
  engineMetrics.close();
  aggregateMetricsCollector.shutdown();
}
Expand All @@ -216,8 +216,10 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet
final String applicationId = query.getQueryApplicationId();

if (!query.getState().equalsIgnoreCase("NOT_RUNNING")) {
throw new IllegalStateException("query not stopped."
+ " id " + applicationId + ", state: " + query.getState());
log.warn(
"Unregistering query that has not terminated. "
+ "This may happen when streams threads are hung. State: " + query.getState()
);
}

if (!allLiveQueries.remove(query)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,9 @@ public String getSchemasDescription() {
public PersistenceSchemas getPersistenceSchemas() {
return persistenceSchemas;
}

/**
 * Stops the query without cleaning up its external resources (internal
 * topics, schemas, etc.), so the query can later be resumed via
 * {@code start()}.
 */
@Override
public void stop() {
  doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);

Expand Down Expand Up @@ -148,14 +148,39 @@ public boolean hasEverBeenStarted() {
return everStarted;
}


/**
 * Stops the query without cleaning up the external resources
 * (e.g. internal topics, schemas) so that it can be resumed when we
 * call {@link #start()}.
 *
 * <p>NOTE: {@link QueuedQueryMetadata} overrides this method to close
 * fully instead, since any time a transient query is stopped its
 * external resources should be cleaned up immediately.</p>
 *
 * @see #close()
 */
public abstract void stop();

/**
 * Closes the {@code QueryMetadata} and cleans up any of
 * the resources associated with it (e.g. internal topics,
 * schemas, etc...).
 *
 * @see QueryMetadata#stop()
 */
public void close() {
  doClose(true);
  // Fire the callback here (not in doClose) so that only a full close —
  // never a plain stop() — unregisters the query from the engine.
  closeCallback.accept(this);
}

/**
 * Shuts down the underlying streams app.
 *
 * @param cleanUp if {@code true}, also purge the app's local state so it
 *                cannot be resumed; {@code false} for a resumable stop.
 */
protected void doClose(final boolean cleanUp) {
  kafkaStreams.close();

  // Only purge local state on a full close; stop() passes false so the
  // query can be resumed later from its existing state.
  if (cleanUp) {
    kafkaStreams.cleanUp();
  }

  queryStateListener.ifPresent(QueryStateListener::close);
  // NOTE: closeCallback is deliberately NOT invoked here — close() invokes
  // it itself, so a stop() never triggers unregistration from the engine.
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public void setLimitHandler(final LimitHandler limitHandler) {
limitHandlerSetter.accept(limitHandler);
}

/**
 * For a transient (queued) query, stopping is equivalent to closing:
 * it is never resumed, so its external resources are cleaned up
 * immediately.
 */
@Override
public void stop() {
  close();
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -455,7 +456,7 @@ public void shouldNotDeleteSchemaNorTopicForTable() throws Exception {
}

@Test
public void shouldCleanUpInternalTopicsOnClose() {
public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
// Given:
final QueryMetadata query = KsqlEngineTestUtil.execute(ksqlEngine,
"select * from test1;",
Expand All @@ -470,6 +471,42 @@ public void shouldCleanUpInternalTopicsOnClose() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
  // Given: a started persistent query
  final QueryMetadata persistentQuery = KsqlEngineTestUtil.execute(
      ksqlEngine,
      "create stream persistent as select * from test1;",
      KSQL_CONFIG,
      Collections.emptyMap()
  ).get(0);

  persistentQuery.start();

  // When: the whole engine is shut down
  ksqlEngine.close();

  // Then: no internal topics are deleted — persistent queries must survive
  // an engine close so another instance can resume them.
  verify(topicClient, never()).deleteInternalTopics(any());
}

@Test
public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
  // Given:
  final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
      ksqlEngine,
      "create stream persistent as select * from test1;",
      KSQL_CONFIG, Collections.emptyMap()
  );

  query.get(0).start();

  // When: the query itself (not the engine) is closed
  query.get(0).close();

  // Then: unlike an engine close, closing the query performs full cleanup
  // of its internal topics:
  verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -57,9 +59,11 @@ public class QueryMetadataTest {
@Mock
private Consumer<QueryMetadata> closeCallback;
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
query = new QueryMetadata(
"foo",
kafkaStreams,
Expand All @@ -72,7 +76,12 @@ public void setup() {
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback
);
) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
Expand Down Expand Up @@ -134,6 +143,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {
inOrder.verify(closeCallback).accept(query);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
  // When:
  query.stop();

  // Then: only close() fires the callback that unregisters the query from
  // the engine; stop() must leave it registered.
  verifyNoMoreInteractions(closeCallback);
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
  // When:
  query.stop();

  // Then: stop() still shuts down the underlying streams app, even though
  // it skips resource cleanup.
  verify(kafkaStreams).close();
}

@Test
public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
// When:
Expand All @@ -145,6 +172,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
inOrder.verify(kafkaStreams).cleanUp();
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
  // When: (the test subclass in setup() passes the `cleanUp` field, which
  // defaults to false, through to doClose)
  query.stop();

  // Then: local streams state is preserved so the query can be resumed.
  verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
  // Given: the test subclass's stop() forwards this flag to doClose
  cleanUp = true;

  // When:
  query.stop();

  // Then: doClose(true) purges the streams app's local state.
  verify(kafkaStreams).cleanUp();
}

@Test
public void shouldReturnSources() {
assertThat(query.getSourceNames(), is(SOME_SOURCES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,8 @@ private KsqlConfig buildMergedConfig(final Command command) {
/**
 * Terminates the persistent query named by the statement, if it is still
 * running.
 *
 * <p>Terminate is idempotent: if the query is already gone (e.g. this
 * TERMINATE is being replayed from the command topic, or the query's
 * streams threads hung and it was force-unregistered), this is a no-op
 * rather than an error, so replays cannot fail part-way through.</p>
 */
private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQuery) {
  final QueryId queryId = terminateQuery.getStatement().getQueryId();

  final Optional<PersistentQueryMetadata> query = ksqlEngine.getPersistentQuery(queryId);
  query.ifPresent(PersistentQueryMetadata::close);
}

private void maybeTerminateQueryForLegacyDropCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.RunScript;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatus.Status;
Expand Down Expand Up @@ -736,6 +737,39 @@ public void shouldRestoreLegacyRunScriptCommand() {
verify(mockParser, mockEngine, mockQuery);
}

@Test
public void shouldDoIdempotentTerminate() {
  // Given: a TERMINATE command targeting query id "foo"
  final String queryStatement = "a persistent query";

  final TerminateQuery terminate = mock(TerminateQuery.class);
  expect(terminate.getQueryId()).andStubReturn(new QueryId("foo"));

  expect(mockParser.parseSingleStatement(queryStatement))
      .andStubReturn(PreparedStatement.of(queryStatement, terminate));

  final PersistentQueryMetadata query = mock(PersistentQueryMetadata.class);
  query.close();
  expectLastCall();

  // The first lookup finds the query; the second simulates it having
  // already been terminated (e.g. a replayed command) by returning empty.
  expect(mockEngine.getPersistentQuery(new QueryId("foo"))).andReturn(Optional.of(query)).once();
  expect(mockEngine.getPersistentQuery(new QueryId("foo"))).andReturn(Optional.empty()).once();

  replayAll();
  final QueuedCommand cmd = new QueuedCommand(
      new CommandId(Type.TERMINATE, "-", Action.EXECUTE),
      new Command(queryStatement, emptyMap(), emptyMap()),
      Optional.empty()
  );

  // When: the same TERMINATE is handled twice
  statementExecutorWithMocks.handleStatement(cmd);
  statementExecutorWithMocks.handleStatement(cmd);

  // Then should not throw — and the mocks saw exactly the expected
  // interactions (close() invoked only once).
  verify(mockParser, mockEngine);
}

private void createStreamsAndStartTwoPersistentQueries() {
final Command csCommand = new Command(
"CREATE STREAM pageview ("
Expand Down

0 comments on commit 1ac7f13

Please sign in to comment.