-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: don't clean up streams internal topics on service exit #4655
Conversation
@agavra @big-andy-coates can you prioritize reviewing and getting this merged ASAP? |
3 similar comments
@agavra @big-andy-coates can you prioritize reviewing and getting this merged ASAP? |
@agavra @big-andy-coates can you prioritize reviewing and getting this merged ASAP? |
@agavra @big-andy-coates can you prioritize reviewing and getting this merged ASAP? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Nice catch @rodesai
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this now not clean up internal topics for transient queries when the server stops?
We probably want different behaviour for transient vs persistent queries.
I'd be tempted to have a single close()
method on QueryMetadata
which deletes the internal state, and then a stop()
method on PersistentQueryMetadata
which stops the topology, but doesn't delete any internal state.
class QueryMetadata {
...
/**
* Stops the query and deletes any intermediate state associated with it.
*/
public void close() {
close(true);
}
void close(final boolean cleanup) {
}
}
class PersistentQueryMetadata extends QueryMetadata {
/**
* stop the query, leaving any intermediate state in-place.
*/
public void stop() {
close(false);
}
}
class KsqlEngine {
void close() {
for (query: allQueries() ) {
if (query instanceof PersistentQuery) {
((PersistentQuery)query).stop();
} else {
query.close();
}
}
}
}
Thoughts?
IMHO, this removes the boolean on the public close
method - which will make the PR more targeted and the code cleaner/clearer.
If we make this change we should revert all the other changes where lambdas have been changed to method calls etc.
I'm guessing this needs to target 5.5.x too. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, this change will leave transient query internal topics lying around.
@Test | ||
public void shouldNotCleanUpInternalTopicsOnEngineClose() { | ||
// Given: | ||
final QueryMetadata query = KsqlEngineTestUtil.executeQuery( | ||
serviceContext, | ||
ksqlEngine, | ||
"select * from test1 EMIT CHANGES;", | ||
KSQL_CONFIG, Collections.emptyMap() | ||
); | ||
query.start(); | ||
|
||
// When: | ||
ksqlEngine.close(); | ||
|
||
// Then: | ||
verifyNoMoreInteractions(topicClient); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a transient query, which should have its internal topics deleted when the engine closes. Can we update the functionality and test to reflect this?
Also, can we then add another similar test that shows persistent queries do not have their internal state deleted.
final Optional<PersistentQueryMetadata> query = ksqlEngine.getPersistentQuery(queryId.get()); | ||
query.ifPresent(PersistentQueryMetadata::close); | ||
query.ifPresent(query -> query.close(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line causes compilation failure.
query.ifPresent(query -> query.close(true)); | |
query.ifPresent(q -> q.close(true)); |
When KSQL exits, it calls KsqlEngine.close. This patch changes the engine's close to close all the queries but not clean up internal topics. Internal topics are only cleaned up when queries are explicitly closed with the clean up flag set.
83cee11
to
6997894
Compare
closing in favor of #4658 |
Description
When KSQL exits, it calls KsqlEngine.close. This patch changes the engine's
close to close all the queries but not clean up internal topics. Internal
topics are only cleaned up when queries are explicitly closed with the
clean up flag set.
part of the fix for #4654
Testing done
Reviewer checklist