Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15942: Implement ConsumerInterceptors in the async consumer #15000

Merged
merged 18 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private boolean isFenced = false;
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;

// currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
Expand Down Expand Up @@ -789,7 +788,6 @@ CompletableFuture<Void> commit(final Map<TopicPartition, OffsetAndMetadata> offs
final boolean isWakeupable,
final Optional<Long> retryTimeoutMs) {
maybeInvokeCommitCallbacks();
maybeThrowFencedInstanceException();
maybeThrowInvalidGroupIdException();

log.debug("Committing offsets: {}", offsets);
Expand Down Expand Up @@ -1643,7 +1641,6 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeInvokeCommitCallbacks();
maybeThrowFencedInstanceException();
backgroundEventProcessor.process();

// Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as
Expand Down Expand Up @@ -1897,8 +1894,9 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
return kafkaConsumerMetrics;
}

private void maybeThrowFencedInstanceException() {
if (isFenced) {
private void maybeInvokeCommitCallbacks() {
offsetCommitCallbackInvoker.executeCallbacks();
if (offsetCommitCallbackInvoker.hasFencedException()) {
String groupInstanceId = "unknown";
if (!groupMetadata.isPresent()) {
log.error("No group metadata found although a group ID was provided. This is a bug!");
Expand All @@ -1911,12 +1909,6 @@ private void maybeThrowFencedInstanceException() {
}
}

private void maybeInvokeCommitCallbacks() {
if (offsetCommitCallbackInvoker.executeCallbacks()) {
isFenced = true;
}
}

// Visible for testing
SubscriptionState subscriptions() {
return subscriptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,17 @@ public CommitRequestManager(
final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking back at the implementation - now i think it is rather unnecessary to have these finals in the parameters. I wonder if you think we should clean them up in the future...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very mixed across the clients codebase. Sometimes you put the final sometimes you don't. In the streams module there is a strict rule to do. Not sure, but as long as there is no guideline around this, and we are not completely repulsed by it, I'd suggest to just stick with whatever the existing code is doing for consistency to not mess up git blame too much.

final String groupId,
final Optional<String> groupInstanceId) {
this(time, logContext, subscriptions, config, coordinatorRequestManager,
offsetCommitCallbackInvoker, groupId,
groupInstanceId, config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), OptionalDouble.empty());
this(time,
logContext,
subscriptions,
config,
coordinatorRequestManager,
offsetCommitCallbackInvoker,
groupId,
groupInstanceId,
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
OptionalDouble.empty());
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*/
public class OffsetCommitCallbackInvoker {
private final ConsumerInterceptors<?, ?> interceptors;
private boolean hasFencedException = false;

OffsetCommitCallbackInvoker(ConsumerInterceptors<?, ?> interceptors) {
this.interceptors = interceptors;
Expand All @@ -57,23 +58,22 @@ public void submitUserCallback(final OffsetCommitCallback callback,
callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception));
}

/**
* @return true if an offset commit was fenced.
*/
public boolean executeCallbacks() {
boolean isFenced = false;
public void executeCallbacks() {
while (!callbackQueue.isEmpty()) {
OffsetCommitCallbackTask task = callbackQueue.poll();
if (task != null) {

if (task.exception instanceof FencedInstanceIdException)
isFenced = true;
hasFencedException = true;

task.callback.onComplete(task.offsets, task.exception);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
return isFenced;
}

public boolean hasFencedException() {
return hasFencedException;
}

private static class OffsetCommitCallbackTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,22 @@ public void testWakeupCommitted() {
assertNull(consumer.wakeupTrigger().getPendingTask());
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the interceptor behavior on close? if we have inflight commits before closing the consumer, should the interceptors be invoked? can we add tests around that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test. Interceptors are called.

public void testInterceptorAutoCommitOnClose() {
Properties props = requiredConsumerPropertiesAndGroupId("test-id");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

consumer = newConsumer(props);
assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
completeCommitApplicationEventSuccessfully();

consumer.close(Duration.ZERO);

assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
}

@Test
public void testInterceptorCommitSync() {
Properties props = requiredConsumerPropertiesAndGroupId("test-id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,8 +1336,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
MockProducerInterceptor.resetCounters()
}

// This is disabled for the the consumer group until KAFKA-16155 is resolved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for reporting this.

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testAutoCommitIntercept(quorum: String, groupProtocol: String): Unit = {
val topic2 = "topic2"
createTopic(topic2, 2, brokerCount)
Expand Down