
KAFKA-15974: Enforce that event processing respects user-provided timeout #15640

Merged

160 commits merged into apache:trunk from the KAFKA-15974-enforce-timeout-in-events branch on May 22, 2024

Conversation

Collaborator

@kirktrue commented Apr 1, 2024

The intention of the CompletableApplicationEvent is for a Consumer to enqueue the event and then block, waiting for it to complete. The application thread will block for up to the user-provided timeout. This change introduces a consistent manner in which events are expired by checking their timeout values.

The CompletableEventReaper is a new class that tracks CompletableEvents that are enqueued. Both the application thread and the network I/O thread maintain their own reaper instances. The application thread will track any CompletableBackgroundEvents that it receives and the network I/O thread will do the same with any CompletableApplicationEvents it receives. The application and network I/O threads will check their tracked events, and if any are expired, the reaper will invoke each event's CompletableFuture.completeExceptionally() method with a TimeoutException.

On closing the AsyncKafkaConsumer, both threads will invoke their respective reapers to cancel any unprocessed events in their queues. In this case, the reaper will invoke each event's CompletableFuture.completeExceptionally() method with a CancellationException instead of a TimeoutException to differentiate the two cases.
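
As an illustration of that lifecycle, here is a minimal sketch of the reap-versus-cancel behavior described above. It is not the actual CompletableEventReaper code; the stub event type and the method names are simplifications.

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.CancellationException;
  import java.util.concurrent.CompletableFuture;

  import org.apache.kafka.common.errors.TimeoutException;

  // Minimal stand-in for a CompletableEvent: a future plus a deadline.
  class CompletableEventStub {
      final CompletableFuture<Void> future = new CompletableFuture<>();
      final long deadlineMs;

      CompletableEventStub(long deadlineMs) {
          this.deadlineMs = deadlineMs;
      }
  }

  class EventReaperSketch {
      private final List<CompletableEventStub> tracked = new ArrayList<>();

      void add(CompletableEventStub event) {
          tracked.add(event);
      }

      // Called periodically by the owning thread: expire events whose
      // deadline has been reached, and stop tracking anything that is done.
      void reap(long currentTimeMs) {
          tracked.removeIf(event -> {
              if (!event.future.isDone() && currentTimeMs >= event.deadlineMs) {
                  event.future.completeExceptionally(
                          new TimeoutException("Event did not complete before its deadline"));
                  return true;
              }
              return event.future.isDone();
          });
      }

      // Called on close: cancel whatever is still incomplete so callers
      // can distinguish shutdown from an ordinary timeout.
      void reapAll() {
          tracked.forEach(event -> {
              if (!event.future.isDone())
                  event.future.completeExceptionally(
                          new CancellationException("The consumer is closing"));
          });
          tracked.clear();
      }
  }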

The overall design for the expiration mechanism is captured on the Apache wiki and the original issue (KAFKA-15848) has more background on the cause.

Note: this change only handles the event expiration and does not cover the network request expiration. That is handled in a follow-up Jira (KAFKA-16200) that builds atop this change.

This change also includes some minor refactoring of the EventProcessor and its implementations. This allows the event processor logic to focus on processing individual events rather than also handling batches of events.
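
As a rough sketch, the narrowed contract looks something like the following (the exact interface shape in the PR may differ):

  // Illustrative only: each implementation handles one event at a time;
  // draining and iterating over a batch of events is now the caller's job.
  public interface EventProcessor<T> {
      void process(T event);
  }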

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@kirktrue
Collaborator Author

Hey @cadonna, the tricky bit is that, for some events, the request managers do expire requests too, so in this flow you described:

The event is processed in the ApplicationEventHandler and a request is added to the commit request manager. Then the commit request manager is polled, the requests are added to the network client, and the network client is polled

When the manager is polled, if the event had timeout 0, it will be expired/cancelled before making it to the network thread. Currently we have 2 managers that do this (that I can remember): TopicMetadataManager and CommitRequestManager. So for those events, even with this PR, if they have timeout 0, they won't have a chance to complete.

My point is not to bring more changes into this PR, only to have the whole situation in mind so we can address it properly (with multiple PRs). This other PR attempts to address this situation I described, but only in the CommitRequestManager for instance. We still have to align on the approach there, and also handle it in the TopicMetadataManager I would say. I would expect that a combination of this PR and those others would allow us to get to a better point (now, even with this PR, we cannot make basic progress with a consumer being continuously polled with timeout 0 because FetchCommittedOffsets is always expired by the manager, for instance). I can easily repro it with the following integration test + poll(ZERO) (that I was surprised we have not covered, because TestUtils always polls with a non-zero timeout)

  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
    val numMessages = 100
    val producer = createProducer()
    sendRecords(producer, numMessages, tp)

    val consumer = createConsumer()
    consumer.subscribe(Set(topic).asJava)
    val records = awaitNonEmptyRecords(consumer, tp)
    assertEquals(numMessages, records.count())
  }

Makes sense?

Yes, the network layer changes are captured in KAFKA-16200 and build on top of this PR.

@kirktrue
Collaborator Author

@lianetm @cadonna—I believe I have addressed all the actionable feedback. Are there additional concerns about this PR that prevent it from being merged? Thanks.

Member

@cadonna left a comment

Thanks for the updates, @kirktrue !

As far as I understand from the discussion, @lianetm and you have a plan for the follow-up PRs. So this PR can be merged.

I just have two minor questions about the tests. You know best if you want to consider my comments.

@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
}

@Test
void testEnsureEventsAreCompleted() {
Member

Why did you remove this test without replacement?

Collaborator Author

Reinstated.

Member

@lianetm commented May 21, 2024

Actually, it seems to me that we shouldn't have this test here (and maybe this is why @kirktrue removed it before?). As I see it, this unit test is testing something that is not the ConsumerNetworkThread's responsibility (and that's why it ends up being complicated, having to mimic the reaper behaviour and spying). It is testing that events are completed, and that's the reaper.reap responsibility, so it seems to me we need to:

  1. test that the ConsumerNetworkThread calls the reaper with the full list of events -> done already in the testCleanupInvokesReaper
  2. test that the CompletableEventReaper.reap(Collection<?> events) completes the events -> done in CompletableEventReaperTest (testIncompleteQueue and testIncompleteTracked)

In the end, as it is, we end up asserting a behaviour we're mocking ourselves in the doAnswer, so not much value I would say? Agree with @cadonna that we need coverage, but I would say that we have it, on my points 1 and 2, and this should be removed. Makes sense?

Collaborator Author

Yes, the test was a little suspect in terms of its value-add, so I'd removed it.

I was planning to file a Jira to move several of the tests (including this one) from ConsumerNetworkThreadTest to ApplicationEventProcessorTest. Then we could fix up some of the funkiness in this test as a separate task.

Member

That is all fine! I was not arguing that we need to keep the test, but if I see a test removed without replacement, I suspect a mistake, which apparently did not happen in this case. Next time, please comment on the PR why you removed the test.

consumer = newConsumer();
completeUnsubscribeApplicationEventSuccessfully();
consumer.unsubscribe();
verify(backgroundEventReaper).reap(any(Long.class));
Member

You control the time here. Why do you not verify that reap() is called with the correct time?

Collaborator Author

Good call. Done!
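
For reference, the tightened verification might look like this, assuming the test drives a controlled MockTime clock named time (the names here are illustrative):

  // Assert the exact timestamp taken from the controlled clock
  // rather than matching any(Long.class).
  verify(backgroundEventReaper).reap(time.milliseconds());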

@Test
void testRunOnceInvokesReaper() {
consumerNetworkThread.runOnce();
verify(applicationEventReaper).reap(any(Long.class));
Member

You control the time here. Why do you not verify that reap() is called with the correct time?

Collaborator Author

And done here, too.

Member

@cadonna commented May 22, 2024

Do you still have the change only locally? Because here it still does not verify the correct time.

@cadonna
Member

cadonna commented May 21, 2024

> Hey @cadonna, the tricky bit is that, for some events, the request managers do expire requests too […]

@lianetm Thanks for the explanation!

Comment on lines 338 to 347
// Close the consumer here as we know it will cause a FencedInstanceIdException to be thrown.
// If we get an error other than the FencedInstanceIdException, we'll raise a ruckus.
try {
consumer.close();
} catch (KafkaException e) {
assertNotNull(e.getCause());
assertInstanceOf(FencedInstanceIdException.class, e.getCause());
} finally {
consumer = null;
}
Member

@lianetm commented May 21, 2024

Do we expect the close to throw? If so, we should verify that (at the moment our test will just complete successfully if the close does not throw). If that's the expectation, maybe this simpler snippet would cover it all:

Suggested change:
Throwable e = assertThrows(KafkaException.class, () -> consumer.close());
assertInstanceOf(FencedInstanceIdException.class, e.getCause());
consumer = null;

Member

how did we resolve this? I see the section got completely removed, verification not needed?

Collaborator Author

Yes, it turns out that changes made elsewhere have obviated the need for this check.

-                             final Timer timer) {
-        if (!shouldAutoCommit)
-            return;
+    void maybeAutoCommitSync(final Timer timer) {
Member

This is not a "maybe" anymore, so what about autoCommitSyncAllConsumed?

Collaborator Author

Changed to just autoCommitSync(). Is that OK?

// First, complete (exceptionally) any events that have passed their deadline AND aren't already complete.
tracked.stream()
.filter(e -> !e.future().isDone())
.filter(e -> currentTimeMs > e.deadlineMs())
Member

Don't we want >= here when identifying expired events? I would expect so (that's the semantic applied in the Timer class isExpired for instance)

Collaborator Author

This is an interesting point 🤔

If a user provides a timeout of 1000 milliseconds, is it expired at 1000 milliseconds or at 1001 milliseconds?

Regardless, I will change it to >= to be consistent.

Collaborator Author

Done.
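
With that change, the expiration check from the excerpt above would read as follows. The terminal operation shown here is a sketch based on the PR description of the reaper, not necessarily the exact code:

  // An event is expired once its deadline has been reached, matching the
  // >= semantics of Timer.isExpired(). TimeoutException here is
  // org.apache.kafka.common.errors.TimeoutException.
  tracked.stream()
      .filter(e -> !e.future().isDone())
      .filter(e -> currentTimeMs >= e.deadlineMs())
      .forEach(e -> e.future().completeExceptionally(
              new TimeoutException("Event did not complete before its deadline")));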

* could occur when processing the events. In such cases, the processor will take a reference to the first
* error, continue to process the remaining events, and then throw the first error that occurred.
*/
private boolean processBackgroundEvents(EventProcessor<BackgroundEvent> processor) {
Member

@lianetm commented May 21, 2024

This processor passed as an argument is in the end always a reference to the backgroundEventProcessor, so could we simplify this, remove the arg, and directly reference the var? It caught my attention when I saw how this is used: it seems a bit redundant that every call has to provide the same processBackgroundEvents(backgroundEventProcessor, ... which feels like an internal that processBackgroundEvents could know about.

Collaborator Author

There is a unit test that passes in a mocked event processor. Let me look at refactoring this.

Collaborator Author

Done. That's much better 😄
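
A rough sketch of the simplified shape, assuming the consumer owns a backgroundEventProcessor field and drains a backgroundEventQueue (the field names and error handling are illustrative, based on the javadoc excerpt above):

  private boolean processBackgroundEvents() {
      List<BackgroundEvent> events = new ArrayList<>();
      backgroundEventQueue.drainTo(events);

      // Take a reference to the first error, continue processing the
      // remaining events, and then throw the first error that occurred.
      KafkaException firstError = null;

      for (BackgroundEvent event : events) {
          try {
              backgroundEventProcessor.process(event);
          } catch (KafkaException e) {
              if (firstError == null)
                  firstError = e;
          }
      }

      if (firstError != null)
          throw firstError;

      return !events.isEmpty();
  }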

@kirktrue
Collaborator Author

@lianetm @cadonna—The latest batch of feedback has been addressed. Thanks!

@kirktrue kirktrue requested review from lianetm and cadonna May 21, 2024 18:04
Member

@cadonna left a comment

Thanks for the updates, @kirktrue !

Once @lianetm approves, I will merge the PR.

Member

@lianetm left a comment

Thanks for your patience and great effort here @kirktrue, LGTM to merge and move on with the follow-ups. Just to recap, this is what I see should be addressed next related to timeout enforcement:

  1. https://issues.apache.org/jira/browse/KAFKA-16637
  2. https://issues.apache.org/jira/browse/KAFKA-16200
  3. https://issues.apache.org/jira/browse/KAFKA-16792

Also, please let's have a Jira to address this comment and remove the test we agreed brings no value.

Thanks again!
cc. @cadonna

@cadonna merged commit a98c9be into apache:trunk May 22, 2024
1 check failed
@kirktrue
Collaborator Author

I added KAFKA-16818 to cover the cases to refactor/migrate/remove tests.

Thanks @cadonna & @lianetm for your reviews!

@kirktrue deleted the KAFKA-15974-enforce-timeout-in-events branch May 22, 2024 16:54
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request May 24, 2024
KAFKA-15974: Enforce that event processing respects user-provided timeout (apache#15640)

Reviewers: Lianet Magrans <[email protected]>, Philip Nee <[email protected]>, Bruno Cadonna <[email protected]>

chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Jun 1, 2024
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024