MINOR: Fix Deadlock in StreamThread #2791

Closed

original-brownbear
Member

@original-brownbear original-brownbear commented Apr 2, 2017

I think this may be the reason (or one of the reasons) we see Jenkins jobs time out at times. At least, I can reproduce it causing tests to time out at a certain rate.

With current trunk it is possible to run into this:

"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

In a nutshell: KafkaStreams and StreamThread end up waiting for each other, since another intermittent close (e.g. from a test) comes along that also locks on KafkaStreams:

"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:71)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

=> causing a deadlock.

Fixed this with softer locking on the state change, which guarantees atomic changes to the state but does not lock on the whole object (at least, I could not find another method besides setState that would require more than atomically locked access).
Also qualified the state listeners with their outer class to make the code flow around this more readable (having two interfaces with the same name for both the interface and the method, and then using them across their two outer classes, is crazy hard to follow imo :)).
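
As a rough illustration of the idea (not the code in this PR), the sketch below guards the state with a dedicated lock object, so that a `close()` holding the object monitor can still join a thread that reports a state change. `StateLockSketch`, `Component`, and the reduced `State` enum are hypothetical names made up for this example.

```java
import java.util.concurrent.CountDownLatch;

final class StateLockSketch {
    enum State { CREATED, PENDING_SHUTDOWN, NOT_RUNNING }

    // Hypothetical stand-in for an object whose close() holds its own monitor.
    static final class Component {
        private final Object stateLock = new Object(); // guards only the state field
        private State state = State.CREATED;

        // State changes take stateLock, never the Component monitor.
        void setState(State newState) {
            synchronized (stateLock) {
                state = newState;
            }
        }

        State state() {
            synchronized (stateLock) {
                return state;
            }
        }

        // Keeps the Component monitor for the whole shutdown, including the join.
        synchronized void close(Thread worker, CountDownLatch closing) throws InterruptedException {
            setState(State.PENDING_SHUTDOWN);
            closing.countDown(); // the worker proceeds only once the monitor is held
            worker.join();       // would hang if the worker needed the Component monitor
            setState(State.NOT_RUNNING);
        }
    }

    static State runDemo() {
        Component component = new Component();
        CountDownLatch closing = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                closing.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // A state report that arrives while close() is in progress.
            component.setState(State.PENDING_SHUTDOWN);
        });
        worker.start();
        try {
            component.close(worker, closing);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return component.state();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

If `setState` were declared `synchronized` on the component itself, the worker would block on the monitor that `close()` holds while joining it, which is the same shape of cycle as in the thread dumps above.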

Easy to reproduce yourself by running org.apache.kafka.streams.KafkaStreamsTest in a loop for a bit (save yourself some time by running 2-4 in parallel :)). Eventually it will lock up on one of the tests (for me this takes less than 1 min with 4 parallel runs).
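
For anyone who prefers a small harness over a shell loop, here is a generic sketch of "run it repeatedly, several in parallel, and stop at the first run that does not finish". It assumes the scenario under test can be wrapped in a `Runnable`; `HangReproducerSketch` and `firstHangingRound` are hypothetical names and not part of Kafka or JUnit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HangReproducerSketch {

    // Returns the 1-based round in which a run missed the timeout, or -1 if every run finished.
    static int firstHangingRound(Runnable scenario, int rounds, int parallelism, long timeoutMillis) {
        // Daemon threads, so a truly stuck run cannot keep the JVM alive.
        ExecutorService pool = Executors.newFixedThreadPool(parallelism, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        try {
            for (int round = 1; round <= rounds; round++) {
                List<Future<?>> runs = new ArrayList<>();
                for (int i = 0; i < parallelism; i++) {
                    runs.add(pool.submit(scenario));
                }
                for (Future<?> run : runs) {
                    try {
                        run.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        return round; // this run is considered hung
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    } catch (ExecutionException e) {
                        throw new IllegalStateException("scenario failed", e.getCause());
                    }
                }
            }
            return -1;
        } finally {
            pool.shutdownNow(); // interrupts runs that are still going
        }
    }
}
```

A call such as `firstHangingRound(scenario, 1000, 4, 60_000L)` would mirror the "4 parallel runs" setup described above, with the timeout picked generously enough that a healthy run never trips it.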

@asfbot

asfbot commented Apr 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2625/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2621/
Test FAILed (JDK 7 and Scala 2.10).

@original-brownbear
Member Author

@hachikuji @ijuma fyi :) We def. have an issue here in my opinion (bottom of description has a straightforward reproducer), let me know what you think about the solution :)

@ijuma
Member

ijuma commented Apr 2, 2017

Thanks for the PR, cc @dguy @guozhangwang

@asfbot

asfbot commented Apr 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2621/
Test PASSed (JDK 8 and Scala 2.12).

Member

@mjsax mjsax left a comment

Very nice finding @original-brownbear ! Just some code style nit pick. Overall LGTM.

if (stateListener != null) {
stateListener.onChange(state, oldState);
private void setState(final State newState) {
synchronized (this.stateLock) {
Member

Nit: remove this

synchronized (this.stateLock) {
final State oldState = state;
if (!state.isValidTransition(newState)) {
log.warn(
Member

Nit: fix line break

@@ -463,7 +463,7 @@ private RuntimeException performOnAllTasks(final AbstractTaskAction action,
 action.apply(task);
 } catch (RuntimeException t) {
 log.error("{} Failed while executing {} {} due to {}: ",
-StreamThread.this.logPrefix,
+this.logPrefix,
Member

Nit: also remove this

@original-brownbear
Member Author

@mjsax thanks for taking a look! Fixed the codestyle issues :)

@asfbot

asfbot commented Apr 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2628/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2624/
Test PASSed (JDK 7 and Scala 2.10).

@ijuma
Member

ijuma commented Apr 2, 2017

Since @guozhangwang is away on holiday and this is an important fix (since it fixes one cause of hung Jenkins builds), I'll review and merge it after we get a second opinion from someone familiar with the Streams code, cc @enothereska @dguy.

@asfbot

asfbot commented Apr 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2624/
Test PASSed (JDK 8 and Scala 2.12).

Contributor

@dguy dguy left a comment

LGTM

@enothereska
Contributor

LGTM thanks.

@@ -248,7 +252,7 @@ public synchronized State state() {
 return Collections.unmodifiableMap(metrics.metrics());
 }

-private class StreamStateListener implements StreamThread.StateListener {
+private final class StreamStateListener implements StreamThread.StateListener {
 @Override
 public synchronized void onChange(final StreamThread thread,
Member

@dguy @enothereska This synchronized here seems suspicious. Is it really the aim to synchronize on the listener instance when updating variables like threadState? Seems like a bug.

Contributor

It looks like there should only be a single StreamStateListener and threadState should be a member.

Member

Thanks @dguy. That makes sense. @original-brownbear, maybe you can do a follow-up that does that.

Member Author

@ijuma put it on my todos, will get to this in about ~12h :)
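
For reference, a minimal sketch of what the suggestion in this thread could look like: one listener instance shared by all threads, with the per-thread state map as a member of that listener, so that `synchronized` on the listener actually guards the map. All names here (`SharedListenerSketch`, `SharedStateListener`, the two reduced enums) are hypothetical, and the aggregation rule is an assumption for illustration, not the actual KafkaStreams logic.

```java
import java.util.HashMap;
import java.util.Map;

final class SharedListenerSketch {
    enum ThreadState { RUNNING, PARTITIONS_REVOKED }
    enum AppState { RUNNING, REBALANCING }

    // Hypothetical single listener shared by all threads; threadState is its own member.
    static final class SharedStateListener {
        private final Map<String, ThreadState> threadState = new HashMap<>();
        private AppState appState = AppState.REBALANCING;

        // The listener's monitor guards exactly the fields declared in this class.
        synchronized void onChange(String threadName, ThreadState newState) {
            threadState.put(threadName, newState);
            boolean allRunning = true;
            for (ThreadState s : threadState.values()) {
                if (s != ThreadState.RUNNING) {
                    allRunning = false;
                    break;
                }
            }
            // Assumed rule: the application is RUNNING only when every known thread is.
            appState = allRunning ? AppState.RUNNING : AppState.REBALANCING;
        }

        synchronized AppState appState() {
            return appState;
        }
    }
}
```

With several listener instances, each `synchronized` call would lock a different monitor and give no protection to state shared between them, which is what makes the original `synchronized` look suspicious.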

Member

@ijuma ijuma left a comment

Thanks for the PR, LGTM.

@asfgit asfgit closed this in 3364f12 Apr 3, 2017
@ijuma
Member

ijuma commented Apr 3, 2017

One more thing: please keep in mind that the PR description becomes the commit message. It's good to aim for a clear and concise description of the issue.

@original-brownbear original-brownbear deleted the fix-streams-deadlock branch April 3, 2017 18:56
@original-brownbear
Member Author

@ijuma got it, will be kept in mind for the next PR :)
