KAFKA-4228 - make producer close on sender thread death, make consumer shutdown on failure to rebalance, and make MM die on any of the above. #1930
Conversation
@@ -434,6 +434,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
records.foreach(producer.send)
maybeFlushAndCommitOffsets()
}
if (!mirrorMakerConsumer.hasData && !shuttingDown) {
Maybe we need to add a check for !exitingOnSendFailure as well? Otherwise we may throw IllegalStateException when there is a send failure.
A send failure closes the producer, not the consumer, meaning mirrorMakerConsumer.hasData will return true and this won't be called. It's possible for there to be both a send failure and a consumer death at the same time, in which case which one kills the MM is semantics :-). Will update.
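For illustration, here is a minimal, self-contained Scala sketch of the check being discussed. The object and method names are stand-ins; only the shape of the condition (hasData vs. shuttingDown vs. exitingOnSendFailure) is taken from the thread above, so this should not be read as the final patch.

```scala
// Minimal sketch of the consumer-death check under discussion.
// All names here are stand-ins; only the shape of the condition matters.
object ConsumerDeathCheckSketch {
  @volatile var shuttingDown = false          // set by a clean shutdown request
  @volatile var exitingOnSendFailure = false  // set when a producer send fails

  trait MirrorMakerConsumer { def hasData: Boolean }

  def checkConsumerAlive(consumer: MirrorMakerConsumer): Unit = {
    // Treat "no data" as consumer death only when we are not already
    // exiting because of a clean shutdown or a send failure.
    if (!consumer.hasData && !shuttingDown && !exitingOnSendFailure)
      throw new IllegalStateException("Mirror maker consumer died with no data available")
  }
}
```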
Force-pushed from efca6e8 to 3b748d8.
@Override
public void uncaughtException(Thread t, Throwable e) {
try {
log.error("thread " + t.getName() + " died to uncaught exception", e);
Capitalize 't'. Also, "died due to".
fixed
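For reference, a hedged Scala sketch of an uncaught-exception handler along these lines, with the suggested capitalization and "died due to" wording applied. The onFatalError callback is an assumption used to show how thread death can feed into a mirror-maker shutdown; it is not the actual MirrorMaker API.

```scala
import org.slf4j.LoggerFactory

// Sketch only: logs the dead thread and then asks the enclosing process
// to shut down via an assumed onFatalError callback.
class FatalExceptionHandler(onFatalError: () => Unit) extends Thread.UncaughtExceptionHandler {
  private val log = LoggerFactory.getLogger(classOf[FatalExceptionHandler])

  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    try {
      log.error("Thread " + t.getName + " died due to uncaught exception", e)
    } finally {
      onFatalError() // e.g. ask the mirror maker to shut down
    }
  }
}
```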
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
boolean gracefullShutdown = false;
graceful
fixed
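The snippet under review embodies the "close the producer if the sender thread dies" part of this patch. A rough Scala sketch of that pattern follows; the real Sender is Java, and forceCloseProducer is a placeholder rather than its actual API.

```scala
import org.slf4j.LoggerFactory

// Sketch of an I/O loop that distinguishes a requested shutdown from
// thread death, and force-closes the producer in the latter case.
class SenderLoopSketch(forceCloseProducer: () => Unit) extends Runnable {
  private val log = LoggerFactory.getLogger(classOf[SenderLoopSketch])
  @volatile private var running = true

  def initiateShutdown(): Unit = { running = false }

  override def run(): Unit = {
    var gracefulShutdown = false
    try {
      while (running) {
        // ... one iteration of send/receive work would go here ...
      }
      gracefulShutdown = true
    } catch {
      case e: Exception => log.error("Uncaught error in kafka producer I/O thread: ", e)
    } finally {
      // Leaving the loop for any reason other than initiateShutdown()
      // means the thread died; close the producer so callers fail fast.
      if (!gracefulShutdown) forceCloseProducer()
    }
  }
}
```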
@@ -588,7 +588,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (doRebalance)
  syncedRebalance
} catch {
case t: Throwable => error("error during syncedRebalance", t)
case t: Throwable => {
  error("error during syncedRebalance", t)
Capitalize: "Error".
fixed
case t: Throwable => error("error during syncedRebalance", t)
case t: Throwable => {
  error("error during syncedRebalance", t)
  shutdown()
This is a somewhat significant change in the old consumer. Previously the consumer would actually stay alive after a series of rebalance failures and could succeed on a subsequent rebalance trigger. So, for example, with this change one consumer that has high-latency issues with ZooKeeper and is slow to release partition ownership could cause all the other (normal) consumers to shut down.
Even in the new consumer, a consumer that is problematic within the group is not shut down. It is marked as failed but it can definitely rejoin the group.
The problem is you're not guaranteed when the next rebalance will happen, if at all.
Here's a non-hypothetical scenario: a consumer has failed to rebalance, but remains "alive" (it's still registered in ZK). The rest of the consumers rebalance successfully and leave a chunk of partitions to the failed consumer (because it's still alive in ZK), and you end up with orphaned partitions for an unbounded amount of time.
The choices I can see are:
- suicide if you cannot rebalance. Your death will trigger a further rebalance and all partitions should be redistributed among the survivors.
- trigger another rebalance. The issue is that you got the exception because the user defined a fixed max number of attempts; you'd be violating that setting, as this is effectively indefinite retries.
- at least release all partitions and unregister yourself from ZK. This is almost like option 1, only you remain in some interesting zombie state?!
I agree that it is a weird state to be in and shutting down is not unreasonable. However, this is a major change in behavior, especially given the decentralized coordination in the old consumer. The main issue that I see is that one misbehaving client that is slow to release partitions can cause the good clients to shut down.
Specifically at LinkedIn, one common occurrence of rebalance failures is when a developer starts up a consumer on his development machine that joins a group in a staging environment (i.e., in a remote DC). Sure, developers should use a different group, but for various reasons this happens. This actually causes rebalance failures in the staging instances. If those instances were to shut down, the staging services would need to be bounced (and some of them may even die, depending on how they handle the consumer's lifecycle).
So it seems at the very least we should make such a change configurable and default to the old behavior.
will do.
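One possible shape for "configurable, defaulting to the old behavior", sketched in Scala. The shutdownOnRebalanceFailure flag is invented purely for illustration and is not an actual Kafka config; syncedRebalance, error, and shutdown are passed in as stand-ins for the connector's own members.

```scala
import org.slf4j.LoggerFactory

// Sketch of a configurable rebalance-failure policy: by default, keep the
// old behavior (log and stay alive); opting in shuts the consumer down so
// a further rebalance redistributes its partitions to the survivors.
class RebalanceFailurePolicySketch(shutdownOnRebalanceFailure: Boolean,
                                   syncedRebalance: () => Unit,
                                   shutdown: () => Unit) {
  private val log = LoggerFactory.getLogger(classOf[RebalanceFailurePolicySketch])

  def rebalanceOnce(): Unit = {
    try {
      syncedRebalance()
    } catch {
      case t: Throwable =>
        log.error("Error during syncedRebalance", t)
        if (shutdownOnRebalanceFailure) shutdown()
    }
  }
}
```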
Force-pushed from 382e567 to 459ad81.
@@ -588,7 +588,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (doRebalance)
  syncedRebalance
} catch {
case t: Throwable => error("error during syncedRebalance", t)
case t: Throwable => {
Nitpick: you don't need braces in case blocks in Scala.
fixed
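For reference, the brace-free form of a multi-statement case, shown here with the same body as the diff above and wrapped in a throwaway object so it compiles on its own.

```scala
object CaseWithoutBracesSketch {
  // The nitpick illustrated: multiple statements under a case need no braces.
  def onRebalanceFailure(cause: Throwable,
                         error: (String, Throwable) => Unit,
                         shutdown: () => Unit): Unit =
    cause match {
      case t: Throwable =>
        error("error during syncedRebalance", t)
        shutdown()
    }
}
```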
…r shutdown on failure to rebalance, and make MM die on any of the above. Signed-off-by: radai-rosenblatt <[email protected]>
Force-pushed from 459ad81 to 3b8ca4c.
The JIRA issue (https://issues.apache.org/jira/browse/KAFKA-4228) details a cascade of failures that resulted in an entire mirror maker cluster stalling due to an OOM death on one MM instance.
This patch makes producers and consumers close themselves on the errors encountered, and makes MM shut down if anything happens to its producers or consumers.
Signed-off-by: radai-rosenblatt [email protected]