Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7477: Improve Streams close timeout semantics #5747

Merged
merged 13 commits into from
Oct 9, 2018

Conversation

nizhikov
Copy link
Collaborator

@nizhikov nizhikov commented Oct 5, 2018

Second part of KIP-358.

This changes based on previous PR discussion.

Default close timeout is 30 seconds.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@nizhikov nizhikov changed the title KAFKA-7477 KAFKA-7477: Improve Streams close timeout semantics Oct 5, 2018
@nizhikov
Copy link
Collaborator Author

nizhikov commented Oct 5, 2018

Hello, @vvcephei.

Can you take a look?

@nizhikov
Copy link
Collaborator Author

nizhikov commented Oct 5, 2018

Tests passed.

@mjsax Can you take a look?

@mjsax mjsax added the streams label Oct 5, 2018
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. We need to preserve existing semantics. I guess, the simplest way will be, to introduce a private synchronized boolean close(final long timeout, final TimeUnit timeUnit, boolean newSemantics) and call it with true/false from new and old close(). The newSemantics flag can be passed into waitOnState() (or we duplicate the old one as waitOnStateBlocking that we only call for newSemantics==false?

If there is another more elegant way, we can do it differently, too. Just an idea.

@@ -126,7 +126,7 @@
public class KafkaStreams {

private static final String JMX_PREFIX = "kafka.streams";
private static final int DEFAULT_CLOSE_TIMEOUT = 0;
private static final int DEFAULT_CLOSE_TIMEOUT = 30;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

@mjsax
Copy link
Member

mjsax commented Oct 5, 2018

We should also add a test KafkaStreamsTest to test that the new method throws for negative values and does not block for zero (ie, please add two tests)

@nizhikov
Copy link
Collaborator Author

nizhikov commented Oct 7, 2018

Hello, @mjsax

Looks like a didn't understand initial task properly :)
Thank you for the clarification!

  1. waitOnState reworked.
  2. Tests added.
  3. Current methods behaviour preserved.

Please, take a look.

@nizhikov
Copy link
Collaborator Author

nizhikov commented Oct 7, 2018

Failure unrelated.

Retest this, please.

final long begin = time.milliseconds();
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != targetState) {
if (waitMs == 0) {
if (newSemantics)
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: every conditional needs to have braces (per the code style)

not nit: I find this logic a little difficult to follow. Contrary to what @mjsax suggested, wouldn't it be pretty straightforward to map the old semantics on to the new ones like this:

  • negative numbers => 0
  • 0 => Long.MAX_VALUE
  • all other arguments stay the same
    ?

Then, the old close method could just transform its arguments and call the new method, with no need to have this "new semantics" flag and an early return in the middle of the loop.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei I like your proposal. Thanks!
Changed PR according to it.

Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update; this seems easier to follow to me. I had one more question...

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the PR. Some more comments.

@nizhikov
Copy link
Collaborator Author

nizhikov commented Oct 8, 2018

@mjsax @vvcephei

Thanks for the review.
I addressed all your comments.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Waiting for second +1 before merging.

@nizhikov
Copy link
Collaborator Author

nizhikov commented Oct 8, 2018

Hello @guozhangwang @bbejeck

I've got +1 from @mjsax and need one more :)
Can you take a look at my PR?

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@guozhangwang
Copy link
Contributor

LGTM!

@mjsax mjsax merged commit 6d16879 into apache:trunk Oct 9, 2018
mjsax pushed a commit that referenced this pull request Oct 9, 2018
@mjsax
Copy link
Member

mjsax commented Oct 9, 2018

Merged to trunk and cherry-picked to 2.1 branch.

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants