KAFKA-15942: Implement ConsumerInterceptors in the async consumer #15000
@@ -757,6 +757,14 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
try {
    CompletableFuture<Void> future = commit(offsets, false);
    future.whenComplete((r, t) -> {
        if (t == null && interceptors != null) {
            invoker.submit(new OffsetCommitCallbackTask(
Original KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors states:
onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.
Since new consumer is single-threaded, ConsumerInterceptor API will be called from a single thread. Since interceptor callbacks are called for every record, the interceptor implementation should be careful about adding performance overhead to consumer.
That's why I implemented it using the Invoker.
@philipnee could you take a look please?
hi @lucasbru - i think we might need to loosen the thread access restriction for the interceptor because i think we also need to trigger interceptors during autocommit. There are 2 ways to achieve this:
#2 is a bit annoying because it goes against the contract, as you mentioned in one of the comments. What do you think?
@philipnee Good point about auto-commits, I missed that. It's a pity that auto-commits aren't triggered from the main thread (I wonder if we could do that? Would be another architectural change). I would go for solution num 1 because
Wdyt?
hi @lucasbru - thanks. i personally prefer to make the consumer interceptor thread safe because firstly, (i think) it simplifies the code quite a bit and secondly, it is possible to miss the interceptor invocation if the user fails to poll/close after a commit is sent. the latter is the bigger problem i think. do you have an idea of how to fix this?
@philipnee some considerations:
"Make the consumer interceptor thread-safe" -- How would we even do that? Consumer interceptors are implemented by the user, so we cannot make them thread safe by just putting a lock on them. The only way would be to write a KIP, have a breaking change and a migration guide, and ask all users to upgrade their code. Not sure that is worth it, given the original consumer does not seem to provide the guarantee that you are asking for. I think the best we can do is to insert the interceptor callback into the invoker queue upon commit completion, and make sure during close to execute all commit callbacks that are enqueued, no? If the user fails to call close, we are out of luck anyway.
The javadoc for |
In terms of implementation of the autocommit-interceptor in the application thread, I see three options:
|
My take would be option 2, using the invoker in the background thread and submitting a task for the interceptor. Seems like a clean way, re-using the mechanism of the invoker already in place, and without doing any major refactoring. I would definitely leave the auto-commit logic in the background thread where it is, as it's truly an internal operation/request we want to perform without any direct relation to the app layer, and it is needed from multiple places in the background: auto-commit on the interval, but also auto-commit as part of the reconciliation process.
I agree with @lianetm that the second option seems best. The invoker mechanism already exists for |
hey @lucasbru - i assume invoker queue here you meant by my original thought was to use the background event to pass the interceptor onCommit event. i think we are all on the same page about using the invoker to invoke the events. |
@philipnee I meant that the |
I have updated the PR to run interceptors during auto-commits as well. Let me know what you think.
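For readers following the thread, the mechanism agreed on above (the background thread enqueues an interceptor invocation when a commit completes, and the application thread runs it on the next poll/commit/close) can be sketched roughly as below. This is only an illustration under assumptions, not the code in this PR: `SketchCommitInvoker`, the `String`-keyed offsets map and the `Consumer`-based interceptor type are hypothetical stand-ins for `OffsetCommitCallbackInvoker`, `Map<TopicPartition, OffsetAndMetadata>` and `ConsumerInterceptors`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the invoker discussed above; not the Kafka class.
final class SketchCommitInvoker {
    // Thread-safe queue: the background thread enqueues, the application thread drains.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    // Each Consumer stands in for one interceptor's onCommit(offsets).
    private final List<Consumer<Map<String, Long>>> interceptors;

    SketchCommitInvoker(List<Consumer<Map<String, Long>>> interceptors) {
        this.interceptors = interceptors;
    }

    // Intended to be called from the background thread after a successful commit
    // (manual or auto). It only enqueues; no user code runs on the calling thread.
    void enqueueInterceptorInvocation(Map<String, Long> offsets) {
        if (!interceptors.isEmpty()) {
            queue.add(() -> interceptors.forEach(i -> i.accept(offsets)));
        }
    }

    // Intended to be called from the application thread in poll/commit/close.
    // Returns the number of queued tasks that were executed.
    int executeCallbacks() {
        int executed = 0;
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
            executed++;
        }
        return executed;
    }
}
```

The point of the design is that user-provided interceptor code only ever runs inside `executeCallbacks()`, so it stays on a single thread even when the commit itself completes in the background. If the application never polls, commits or closes again, the queued invocation is never executed, which is the limitation acknowledged above.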
if (callback == null) {
    if (t != null) {
        log.error("Offset commit with offsets {} failed", offsets, t);
    }
    return;
}

invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t));
offsetCommitCallbackInvoker.submitUserCallback(callback, offsets, (Exception) t);
Refactored so that OffsetCommitCallbackTask can be a private class inside the invoker.
 * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the
 * future completion, and execute the callbacks when user polls/commits/closes the consumer.
 */
private class OffsetCommitCallbackInvoker {
Moved to a separate file since it's going to be shared across threads.
This PR looks about ready to merge. Please can you change PlaintextConsumerTest to run the tests for interceptors with the new consumer. They should pass once this PR is merged.
public void submitCommitInterceptors(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (!interceptors.isEmpty()) {
        callbackQueue.add(new OffsetCommitCallbackTask(
            (o, e) -> interceptors.onCommit(o),
I think the use of single-character variable names here is a little confusing.
public void submitUserCallback(final OffsetCommitCallback callback,
                               final Map<TopicPartition, OffsetAndMetadata> offsets,
                               final Exception e) {
would be more consistent with the styling - exception instead of e.
public OffsetCommitCallbackTask(final OffsetCommitCallback callback,
                                final Map<TopicPartition, OffsetAndMetadata> offsets,
                                final Exception e) {
similarly to above.
The auto-commit one still fails. I debugged the problem and created a separate ticket (the solution is clear, but we want to have this as a separate PR for potential backporting). The others are enabled.
@cadonna Can you please review this?
@cadonna The autoCommitIntercept test is flaky after this PR, but it's unrelated to this PR. I will create a separate ticket and disable it again once your comments come in.
@@ -1902,65 +1912,14 @@ private void maybeThrowFencedInstanceException() {
}

private void maybeInvokeCommitCallbacks() {
    if (callbacks() > 0) {
        invoker.executeCallbacks();
    if (offsetCommitCallbackInvoker.executeCallbacks()) {
the naming appears a bit misleading to me - if executeCallbacks() returns true, it almost reads as if the callbacks were executed correctly. I wonder if we could restructure the code like
invoker.executeCallbacks()
isFenced = invoker.hasFencedException()
Good point, done. I could actually simplify the code by moving isFenced inside the invoker.
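A rough sketch of the shape suggested here, with executing the callbacks and querying the fenced state as two separate calls. The names are hypothetical (`SketchFencedTrackingInvoker`, `SketchFencedInstanceException` standing in for `FencedInstanceIdException`), and the real invoker in the PR may differ in detail.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for FencedInstanceIdException so the sketch compiles on its own.
class SketchFencedInstanceException extends RuntimeException {
    SketchFencedInstanceException(String message) {
        super(message);
    }
}

// Hypothetical invoker; only the shape of the API mirrors the suggestion above.
final class SketchFencedTrackingInvoker {
    interface Callback {
        void onComplete(Exception exception);
    }

    private static final class Task {
        final Callback callback;
        final Exception exception;

        Task(Callback callback, Exception exception) {
            this.callback = callback;
            this.exception = exception;
        }
    }

    private final Queue<Task> queue = new ConcurrentLinkedQueue<>();
    // In this sketch the flag is written and read by the application thread only.
    private boolean hasFencedException = false;

    void enqueue(Callback callback, Exception exception) {
        queue.add(new Task(callback, exception));
    }

    // Runs every pending callback. It returns nothing, so there is no boolean
    // that could be misread as "the callbacks succeeded".
    void executeCallbacks() {
        Task task;
        while ((task = queue.poll()) != null) {
            if (task.exception instanceof SketchFencedInstanceException) {
                hasFencedException = true;
            }
            task.callback.onComplete(task.exception);
        }
    }

    // Separate query, as in: isFenced = invoker.hasFencedException()
    boolean hasFencedException() {
        return hasFencedException;
    }
}
```

Keeping the flag inside the invoker means the caller no longer has to carry its own `isFenced` state across invocations.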
@@ -93,9 +94,11 @@ public CommitRequestManager(
        final SubscriptionState subscriptions,
        final ConsumerConfig config,
        final CoordinatorRequestManager coordinatorRequestManager,
        final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
looking back at the implementation - now i think it is rather unnecessary to have these finals in the parameters. I wonder if you think we should clean them up in the future...
This is very mixed across the clients codebase. Sometimes you put the final, sometimes you don't. In the streams module there is a strict rule to do so. Not sure, but as long as there is no guideline around this, and we are not completely repulsed by it, I'd suggest just sticking with whatever the existing code is doing, for consistency and to not mess up git blame too much.
        final String groupId,
        final Optional<String> groupInstanceId) {
    this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId,
    this(time, logContext, subscriptions, config, coordinatorRequestManager,
i wonder if it would be more aesthetic to split each of them into its own line.... now it is spanning 3 lines with different widths.
Done
/**
 * @return true if an offset commit was fenced.
 */
public boolean executeCallbacks() {
see comment above.
Done
@@ -844,6 +849,54 @@ public void testWakeupCommitted() {
    assertNull(consumer.wakeupTrigger().getPendingTask());
}

@Test
what's the interceptor behavior on close? if we have inflight commits before closing the consumer, should the interceptors be invoked? can we add tests around that?
Added a test. Interceptors are called.
@@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
    assertPoll(1, commitRequestManger);
}

@Test
similar to the comment above - we autocommit on close - should the interceptor be triggered?
I think they should. Looks to me like the legacy consumer called the interceptors when closing the consumer coordinator (ConsumerCoordinator.maybeAutoCommitOffsetsSync).
Yes, they should. This is tested in the integration test PlaintextConsumer.testAutoCommitIntercept, which works in terms of interceptors, but which I have to keep disabled in this PR because of KAFKA-16155. This PR does call the interceptors after closing the network thread (I pinged you about it above). I can add a little unit test to AsyncKafkaConsumerTest. I don't think we can add a unit test for it in CommitRequestManagerTest, because the autocommit on close is triggered from the application thread, so in this class it does look very much like any normal commit.
thanks for the changes @lucasbru - I left some comments.
@@ -1336,8 +1336,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    MockProducerInterceptor.resetCounters()
}

// This is disabled for the the consumer group until KAFKA-16155 is resolved.
thanks for reporting this.
Thanks for the PR, @lucasbru!
Here is my feedback.
// Thread-safe queue to store user-defined callbacks and interceptors to be executed
private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>();

public void submitCommitInterceptors(final Map<TopicPartition, OffsetAndMetadata> offsets) {
nit:
I find the name of this method quite confusing. It says that it submits a commit interceptor, but actually it submits a call to the commit interceptor.
The same applies to submitUserCallback().
Done
(innerOffsets, exception) -> interceptors.onCommit(innerOffsets),
offsets,
nit:
That is also a bit confusing. What are innerOffsets? I think it would be better to rename innerOffsets to offsetsParam. Additionally, offsets could be renamed to actualOffsets, but that is actually not needed since after the renaming of innerOffsets it should be quite clear what offsets represents.
Done
hasFencedException = true;

task.callback.onComplete(task.offsets, task.exception);
nit:
Done
if (task.exception instanceof FencedInstanceIdException)
    hasFencedException = true;
I think this is not completely correct.
The javadocs for commitAsync() (w/o callback) say:
@throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
If no callback is passed into commitAsync(), no offset commit callback invocation is submitted. However, we only check for a FencedInstanceIdException when we execute a callback. It seems to me that with commitAsync() we would not throw at all when the consumer gets fenced.
In any case, we need a unit test that verifies that the FencedInstanceIdException is thrown for each version of commitAsync().
You are right, but this is not related to interceptors but a separate commit bug. I opened KAFKA-16169.
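To make the concern concrete, here is a small sketch that only mirrors the reasoning in the comment above: if the fenced check lives inside the callback task and no task is enqueued for a null callback, a fenced commit goes unnoticed. The class names are hypothetical stand-ins, and this says nothing about how KAFKA-16169 was or will be resolved.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the behaviour described in the review comment; not Kafka code.
final class SketchFencedGap {
    // Stand-in for FencedInstanceIdException.
    static final class FencedSketchException extends RuntimeException { }

    interface Callback {
        void onComplete(Exception exception);
    }

    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private boolean hasFencedException = false;

    // Mirrors the early return for a null callback: nothing is enqueued in that case.
    void onCommitCompleted(Callback callback, Exception exception) {
        if (callback == null) {
            return; // in this model the failure would at most be logged
        }
        queue.add(() -> {
            // The fenced check runs as part of the task, so it needs a task to exist.
            if (exception instanceof FencedSketchException) {
                hasFencedException = true;
            }
            callback.onComplete(exception);
        });
    }

    // Drains the queue on the calling (application) thread.
    void executeCallbacks() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    boolean hasFencedException() {
        return hasFencedException;
    }
}
```

In this model, calling `onCommitCompleted(null, new FencedSketchException())` followed by `executeCallbacks()` leaves `hasFencedException()` false, which is the gap described for `commitAsync()` without a callback.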
@@ -271,6 +283,7 @@ public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
return (response, throwable) -> {
    autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
    if (throwable == null) {
        offsetCommitCallbackInvoker.submitCommitInterceptors(allConsumedOffsets);
Why is this only called in the case of an auto commit? Or is the name of this method misleading and this method is called also in case of async commits?
This is not only called in the case of an auto commit, but also in commitAsync.
OK, but then the name of the method should be changed to better reflect what it does.
You asked why is "this" only called in the case of the auto commit. I think you were referring to submitCommitInterceptors. It is not only called in the case of auto-commit, but also in commitAsync.
Then you mentioned that the name of "this method" is misleading. I think you are referring to maybeAutocommitAllConsumedNow. I don't think it is, because it is only called in an auto-commit.
OK, I see it now that submitCommitInterceptors() (now enqueueInterceptorInvocation) is also called in commitAsync(). I do not know why I haven't seen this before.
public void setup() {
    offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(consumerInterceptors);
}
I think there are some edge cases missing, like empty interceptors and empty user callbacks. The case with only interceptors is also not tested.
Done
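The edge cases listed above (no interceptors, no user callback, interceptors only) can be illustrated with a small self-contained model. It is a sketch under assumptions: `SketchEdgeCaseInvoker` and the `Runnable`-based interceptors and callback are hypothetical simplifications, and the ordering (interceptor invocation enqueued before the user callback) follows the KIP-42 wording quoted at the top of this conversation.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model for reasoning about the edge cases; not the class under test in the PR.
final class SketchEdgeCaseInvoker {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    // Each Runnable stands for one interceptor's onCommit call.
    private final List<Runnable> interceptors;

    SketchEdgeCaseInvoker(List<Runnable> interceptors) {
        this.interceptors = interceptors;
    }

    // Models a successful commit: interceptors are enqueued first, then the user callback.
    void onCommitSucceeded(Runnable userCallback) {
        if (!interceptors.isEmpty()) {
            queue.add(() -> interceptors.forEach(Runnable::run));
        }
        if (userCallback != null) {
            queue.add(userCallback);
        }
    }

    // Number of tasks waiting to be executed.
    int pending() {
        return queue.size();
    }

    // Drains the queue in FIFO order on the calling thread.
    void executeCallbacks() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }
}
```

With an empty interceptor list and a null callback, `pending()` stays at 0 after `onCommitSucceeded(null)`. With one interceptor and a callback, draining the queue runs the interceptor before the callback.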
@cadonna Thanks for the review. I addressed the comments that I believe should be addressed in this PR.
Thanks @lucasbru !
I have one comment.
}

@Test
public void testNoInterceptorCommitAsyncFailed() {
Is there a reason why we do not have the corresponding test for commitSync()?
I added one.
Thanks for the update and the patience, @lucasbru !
LGTM!
Test failures are unrelated.
…ache#15000) We need to make sure to call the consumer interceptor and test its integration. This is adding the required call in commitSync and commitAsync. The calls in commitAsync are executed using the same mechanism as commit callbacks, to ensure that we are calling the interceptors from a single thread, as is intended in the original KIP. The interceptors also need to be invoked on auto-commits which are executed in the commit request manager. For this purpose, we share the OffsetCommitCallbackInvoker class with the background thread (it is already accessed implicitly from the background thread through a future lambda). This is done analogous to the RebalanceListenerInvoker. Co-authored-by: John Doe [email protected] Reviewers: Bruno Cadonna <[email protected]>, Andrew Schofield <[email protected]>, Philip Nee <[email protected]>
We need to make sure to call the consumer interceptor and test its integration.
This is adding the required call in commitSync and commitAsync. The calls in commitAsync are executed using the same mechanism as commit callbacks, to ensure that we are calling the interceptors from a single thread, as is intended in the original KIP.
The interceptors also need to be invoked on auto-commits which are executed in the commit request manager. For this purpose, we share the OffsetCommitCallbackInvoker class with the background thread (it is already accessed implicitly from the background thread through a future lambda). This is done analogous to the RebalanceListenerInvoker.
Co-authored-by: John Doe [email protected]
Committer Checklist (excluded from commit message)