KafkaConsumer should continue to poll while waiting for buffer #4023
Conversation
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
@@ -424,16 +426,25 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
bufferAccumulator.add(record);
What happens to the record that was caught in the exception? Is it lost?
I see, the consumer retries in the while loop.
No. The record is eventually put in the buffer because we are in an infinite loop here.
@@ -424,16 +426,25 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
bufferAccumulator.add(record);
break;
} catch (Exception e) {
    if (!paused) {
        paused = true;
Should the Kafka consumer be paused for all exceptions?
In all exception cases we retry forever to put the record in the buffer, right? That's why we pause for all exceptions.
@@ -424,16 +426,25 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
bufferAccumulator.add(record);
break;
} catch (Exception e) {
    if (!paused) {
I would suggest pausing only when we are approaching the max.poll.interval.ms limit, which we can also increase to 10 minutes or more. I have seen this exception many times in scale testing, but the buffer usually flushes after a few retries and the poll timeout does not expire. Pause/resume may have some performance impact; from what I have read, it flushes queued messages and stops fetching further records from the broker, which is really not necessary if it's a momentary blip.
Sure, we can pause only after a few retries of getting the needed buffer space, and that should take care of the momentary blip case.
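As a rough sketch of that idea (assuming the consumer, paused, bufferAccumulator, and topicConfig fields from this diff; the helper name and constant are illustrative, not the final implementation), the retry budget could be derived from max.poll.interval.ms and the pause applied only once it is exhausted:

// Sketch only: pause the consumer only after failed buffer writes have consumed
// roughly half of max.poll.interval.ms, so a momentary blip never triggers a pause.
private static final int RETRY_SLEEP_MS = 100;

private void addToBufferWithRetries(final Record<Event> record) throws InterruptedException {
    final long maxRetries = topicConfig.getMaxPollInterval().toMillis() / (2 * RETRY_SLEEP_MS);
    long numRetries = 0;
    while (true) {
        try {
            bufferAccumulator.add(record);
            return;
        } catch (Exception e) {
            if (!paused && numRetries++ > maxRetries) {
                paused = true;
                consumer.pause(consumer.assignment());
            }
            Thread.sleep(RETRY_SLEEP_MS);
        }
    }
}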
if (e instanceof SizeOverflowException) {
    topicMetrics.getNumberOfBufferSizeOverflows().increment();
} else {
    LOG.debug("Error while adding record to buffer, retrying ", e);
}
try {
    Thread.sleep(100);
    consumeRecords();
Instead of consumeRecords(), should we call consumer.poll() here directly? If it returns records, then we assert and maybe restart the consumer, because poll() returning records here means there is a bug somewhere. Calling consumeRecords() will re-enter this function if poll() returns messages.
That's fair. I can just do poll() and assert on records returned.
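A minimal sketch of that, assuming the paused flag and a doPoll() wrapper as in the later revision of this diff; the log message is illustrative:

if (paused) {
    // With every assigned partition paused, poll() only keeps the consumer alive in the
    // group (heartbeats, rebalances); any returned records would indicate a bug.
    final ConsumerRecords<String, ?> records = doPoll();
    if (!records.isEmpty()) {
        LOG.warn("Received {} unexpected records while the consumer is paused", records.count());
    }
}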
} catch (Exception ex) {} // ignore the exception because it only means the thread slept for shorter time
}
}
if (paused) {
    consumer.resume(consumer.assignment());
I think there may be an issue here. We may have to call resume on exactly the same partitions that we paused. For example, say the consumer initially had 4 partitions that were paused. While paused, 2 got revoked, and after reassignment we resumed only the 2 partitions still assigned. Now the same 2 partitions are assigned back: will they remain paused, since we never called resume on them? It would be good to test.
If the partitions have moved out of the consumer, I do not think they will be in the paused state. I do not think we can resume partitions that are currently not owned by the consumer. But I will try to test it.
I have tested this scenario. When the partitions are assigned back, they are not paused. I think once the partitions are revoked, they are removed from the consumer's assignment. When they are assigned back, they are assigned like any other new partition. There is no stale state from the previous assignment.
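For reference, KafkaConsumer also exposes paused(), so a defensive variant could resume only the partitions that are both still assigned and currently paused; a sketch assuming the consumer field and paused flag from this diff:

if (paused) {
    // paused() only contains partitions in the current assignment, so this resumes
    // exactly what this consumer paused and still owns after any rebalance.
    final Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.paused());
    pausedPartitions.retainAll(consumer.assignment());
    consumer.resume(pausedPartitions);
    paused = false;
}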
@@ -520,6 +534,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    ownedPartitionsEpoch.remove(topicPartition);
    partitionCommitTrackerMap.remove(topicPartition.partition());
}
if (paused) {
Is it really required in partition revocation?
Probably not. I thought about it before adding this code. I do not see any downside to doing it. Let me know if you can think of any side effects of this code.
A few test scenarios to cover:
Create a topic with 4 partitions.
Have a producer send messages to the topic at a constant rate (1k/sec).
Test 1: Run one consumer and pause all partitions, then bring up another consumer that will take 2 partitions. Data should be consumed by the new consumer from those 2 partitions. Shut down the second consumer; its partitions should get assigned back to the paused consumer and remain paused.
Test 2: Pause all partitions, bring up another consumer that will take 2 partitions, then unpause the first consumer. All partitions should be read by the 2 consumers. Shut down the second consumer; the first consumer will get all partitions and should consume from all of them after reassignment.
Test 3: Run two consumers reading from 2 partitions each. Pause the first consumer. Scale up by adding 4 more partitions. The paused consumer should pause the new partitions as well. Shut down and bring up the second consumer a couple of times. Unpause the first consumer; all partitions should be read.
Tested case 1, and it works as expected.
Tested test case 2, and it worked as expected. A question about test 3: what do you mean by "scale up partitions by increasing the number of partitions"? AFAIK, Kafka partitions cannot be increased after creating the topic.
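For what it's worth, Kafka does allow growing a topic's partition count after creation (it can only be increased, never decreased), which is presumably what test 3 relies on. A sketch using the admin client, with the broker address, topic name, and target count as placeholders:

// Assumes org.apache.kafka.clients.admin.* and java.util.* imports, a reachable broker,
// and a caller that handles InterruptedException/ExecutionException.
final Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // Grow "test-topic" from 4 to 8 partitions for test scenario 3.
    admin.createPartitions(Map.of("test-topic", NewPartitions.increaseTo(8))).all().get();
}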
Signed-off-by: Krishna Kondaka <[email protected]>
if (paused) {
    ConsumerRecords<String, T> records = doPoll();
    if (records.count() > 0) {
        LOG.debug("Unexpected records received while the consumer is paused. Resetting the partitions to retry from last read pointer");
Do we need to make it INFO so that it shows up in the logs?
It seems like a WARN level to me, but you probably have more context.
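If that path is ever hit, resetting to the last read position could look roughly like this; a sketch assuming the consumer field and the ConsumerRecords returned by the paused poll:

// Rewind each partition to the first unexpected record so nothing is skipped;
// those records will simply be fetched again after resume().
for (final TopicPartition partition : records.partitions()) {
    final long firstUnexpectedOffset = records.records(partition).get(0).offset();
    consumer.seek(partition, firstUnexpectedOffset);
}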
@@ -411,29 +418,51 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
    return new Record<Event>(event);
}

private void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
private <T> void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
If you change the code below to use ? (see my other comment), this type parameter T becomes unnecessary and you can remove it.
Thread.sleep(100);
Thread.sleep(retrySleepTimeMs);
if (paused) {
    ConsumerRecords<String, T> records = doPoll();
The code does not really do anything with the value of T, so you can have the following:
ConsumerRecords<String, ?> records = doPoll();
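Taken together with the earlier comment, a sketch of the resulting declarations (the retry loop is omitted and stays as in the diff):

// Signature without the type parameter, since nothing binds T any more:
private void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
    // ...retry loop unchanged from the diff, then:
    final ConsumerRecords<String, ?> records = doPoll();
    if (records.count() > 0) {
        LOG.warn("Unexpected records received while the consumer is paused");
    }
}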
long numRetries = 0;
final int retrySleepTimeMs = 100;
// Do not pause until half the poll interval time has expired
final long maxRetries = topicConfig.getMaxPollInterval().toMillis() / (2 * retrySleepTimeMs);
These two variables (maxRetries and retrySleepTimeMs) can be moved elsewhere in the class to clarify that they are not so dynamic. Perhaps make maxRetries a field, give it a clearer name (maxRetriesOnException), and set it in the constructor:
maxRetriesOnException = topicConfig.getMaxPollInterval().toMillis() / (2 * retrySleepTimeMs);
You can make a static field for the other:
private static final int RETRY_ON_EXCEPTION_SLEEP_MS = 100;
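Put together, a sketch of those declarations inside the consumer class; the class name, config type, and constructor shape here are placeholders, with only the relevant lines shown:

// Sleep between buffer retries never changes, so it can be a class constant.
private static final int RETRY_ON_EXCEPTION_SLEEP_MS = 100;

// Derived once from the topic configuration.
private final long maxRetriesOnException;

KafkaCustomConsumer(final TopicConfig topicConfig /* , other existing arguments */) {
    this.topicConfig = topicConfig;
    this.maxRetriesOnException = topicConfig.getMaxPollInterval().toMillis() / (2 * RETRY_ON_EXCEPTION_SLEEP_MS);
}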
while (true) {
    try {
        bufferAccumulator.add(record);
        break;
    } catch (Exception e) {
        if (!paused && numRetries++ > maxRetries) {
            paused = true;
            consumer.pause(consumer.assignment());
This is very critical logic. We should have a unit test to verify that we call pause() on these conditions.
Added a new test case to test pause/resume.
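A sketch of what such a test could look like with Mockito; the entry point, mocks, and exception type are assumptions based on the snippets quoted above, not necessarily what the PR's test does:

@Test
void pause_is_called_when_adding_to_buffer_keeps_failing() throws Exception {
    // Assumes mocked bufferAccumulator and kafkaConsumer fields, and a topic config whose
    // max poll interval is small enough that two failed adds exceed the retry budget.
    doThrow(new SizeOverflowException("buffer full"))
        .doThrow(new SizeOverflowException("buffer full"))
        .doNothing()
        .when(bufferAccumulator).add(any());
    when(kafkaConsumer.assignment()).thenReturn(Set.of(new TopicPartition("test-topic", 0)));

    objectUnderTest.processRecord(acknowledgementSet, record);

    // The critical behaviour under test: the consumer gets paused while the buffer is full.
    verify(kafkaConsumer).pause(anySet());
}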
Signed-off-by: Krishna Kondaka <[email protected]>
Description
KafkaConsumer should continue to poll while waiting for buffer.
KafkaConsumer waits in a busy loop for the buffer to become available if it fails to get a buffer. The consumer thread may be in this busy loop for a long time, and this can result in the Kafka server not getting any heartbeats from the consumer.
To avoid this, the consumer must keep calling poll() while waiting for the buffer to become available. But since poll() fetches new records from the server, the consumer should call pause() before polling until the buffer becomes available, and call resume() once it gets the buffer and resumes normal processing.
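Condensed into one place, the flow described above looks roughly like this (a sketch with simplified error handling, reusing the names quoted in the review comments above):

while (true) {
    try {
        bufferAccumulator.add(record);
        break;                                       // buffer accepted the record
    } catch (Exception e) {
        if (!paused && numRetries++ > maxRetries) {
            paused = true;
            consumer.pause(consumer.assignment());   // stop fetching, but keep the consumer alive
        }
        try {
            Thread.sleep(retrySleepTimeMs);
        } catch (InterruptedException ie) {
            // a shorter sleep is fine; keep retrying
        }
        if (paused) {
            // poll() keeps heartbeats/rebalances going; it should return nothing while paused
            ConsumerRecords<String, ?> records = doPoll();
            if (records.count() > 0) {
                LOG.warn("Unexpected records received while the consumer is paused");
            }
        }
    }
}
if (paused) {
    consumer.resume(consumer.assignment());          // buffer has room again, resume normal fetching
    paused = false;
}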
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.