From db405f125fea3e56d072a58b6064d90178a69cfb Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 13 Dec 2023 13:38:04 +0100 Subject: [PATCH 01/13] implement consumer interceptors --- .../internals/AsyncKafkaConsumer.java | 11 +++++++ .../internals/AsyncKafkaConsumerTest.java | 30 ++++++++++++++++--- .../kafka/api/PlaintextConsumerTest.scala | 6 ++-- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 6f0eaef4eb5ab..7f89090b2720a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -757,6 +757,14 @@ public void commitAsync(Map offsets, OffsetCo try { CompletableFuture future = commit(offsets, false); future.whenComplete((r, t) -> { + if (t == null && interceptors != null) { + invoker.submit(new OffsetCommitCallbackTask( + (o, e) -> interceptors.onCommit(o), + offsets, + null + )); + } + if (callback == null) { if (t != null) { log.error("Offset commit with offsets {} failed", offsets, t); @@ -1333,6 +1341,9 @@ public void commitSync(Map offsets, Duration try { CompletableFuture commitFuture = commit(offsets, true); ConsumerUtils.getResult(commitFuture, time.timer(timeout)); + if (interceptors != null) { + interceptors.onCommit(offsets); + } } finally { wakeupTrigger.clearTask(); kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index ac052d72f6ffb..3b11ffa904d37 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -67,6 +68,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.test.MockConsumerInterceptor; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; @@ -154,19 +156,21 @@ public void resetAll() { private AsyncKafkaConsumer newConsumer() { final Properties props = requiredConsumerProperties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id"); - final ConsumerConfig config = new ConsumerConfig(props); - return newConsumer(config); + return newConsumer(props); } private AsyncKafkaConsumer newConsumerWithoutGroupId() { final Properties props = requiredConsumerProperties(); - final ConsumerConfig config = new ConsumerConfig(props); - return newConsumer(config); + return newConsumer(props); } @SuppressWarnings("UnusedReturnValue") private AsyncKafkaConsumer newConsumerWithEmptyGroupId() { final Properties props = requiredConsumerPropertiesAndGroupId(""); + return newConsumer(props); + } + + private AsyncKafkaConsumer newConsumer(Properties props) { final ConsumerConfig config = new ConsumerConfig(props); return newConsumer(config); } @@ -848,6 +852,24 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } + @Test + public void testInterceptorConstructorClose() { + try { + Properties props = requiredConsumerProperties(); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + + AsyncKafkaConsumer consumer = newConsumer(props); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get()); + + consumer.close(); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get()); + } finally { + // cleanup since we are using mutable static variables in MockConsumerInterceptor + MockConsumerInterceptor.resetCounters(); + } + } @Test public void testRefreshCommittedOffsetsSuccess() { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 01e9aec257878..cd326381268ae 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -1278,9 +1278,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { runMultiConsumerSessionTimeoutTest(true) } - // TODO: enable this test for the consumer group protocol when consumer interceptors are supported @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testInterceptors(quorum: String, groupProtocol: String): Unit = { val appendStr = "mock" MockConsumerInterceptor.resetCounters() @@ -1391,9 +1390,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { MockConsumerInterceptor.resetCounters() } - // TODO: enable this test for the consumer group protocol when consumer interceptors are supported @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testInterceptorsWithWrongKeyValue(quorum: String, groupProtocol: String): Unit = { val appendStr = "mock" // create producer with interceptor that has different key and value types from the producer From 8ca984fce8163f797ba09d158116a61521b7514c Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Tue, 2 Jan 2024 18:33:01 +0100 Subject: [PATCH 02/13] tests --- .../internals/AsyncKafkaConsumer.java | 6 +- .../internals/ConsumerInterceptors.java | 5 ++ .../internals/AsyncKafkaConsumerTest.java | 73 +++++++++++++------ 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 7f89090b2720a..92f7044ae3488 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -757,7 +757,7 @@ public void commitAsync(Map offsets, OffsetCo try { CompletableFuture future = commit(offsets, false); future.whenComplete((r, t) -> { - if (t == null && interceptors != null) { + if (t == null && !interceptors.isEmpty()) { invoker.submit(new OffsetCommitCallbackTask( (o, e) -> interceptors.onCommit(o), offsets, @@ -1341,9 +1341,7 @@ public void commitSync(Map offsets, Duration try { CompletableFuture commitFuture = commit(offsets, true); ConsumerUtils.getResult(commitFuture, time.timer(timeout)); - if (interceptors != null) { - interceptors.onCommit(offsets); - } + interceptors.onCommit(offsets); } finally { wakeupTrigger.clearTask(); kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java index d96d8ce82a790..1c9d17a549d5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java @@ -40,6 +40,11 @@ public ConsumerInterceptors(List> interceptors) { this.interceptors = interceptors; } + /** Returns true if no interceptors are defined. All other methods will be no-ops in this case. */ + public boolean isEmpty() { + return interceptors.isEmpty(); + } + /** * This is called when the records are about to be returned to the user. *

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 3b11ffa904d37..73d99e3f1f473 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.GroupProtocol; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -151,6 +150,7 @@ public void resetAll() { } consumer = null; Mockito.framework().clearInlineMocks(); + MockConsumerInterceptor.resetCounters(); } private AsyncKafkaConsumer newConsumer() { @@ -262,12 +262,13 @@ public void testCommitAsyncUserSuppliedCallbackNoException() { Map offsets = new HashMap<>(); offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); - completeCommitApplicationEventExceptionally(); + completeCommitApplicationEventSuccessfully(); MockCommitCallback callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); forceCommitCallbackInvocation(); + assertEquals(callback.invoked, 1); assertNull(callback.exception); } @@ -440,7 +441,7 @@ public void testEnsureCallbackExecutedByApplicationThread() { consumer = newConsumer(); final String currentThread = Thread.currentThread().getName(); MockCommitCallback callback = new MockCommitCallback(); - completeCommitApplicationEventExceptionally(); + completeCommitApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); assertEquals(1, consumer.callbacks()); @@ -482,7 +483,7 @@ public void testCommitSyncLeaderEpochUpdate() { HashMap topicPartitionOffsets = new HashMap<>(); topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), "")); topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); - completeCommitApplicationEventExceptionally(); + completeCommitApplicationEventSuccessfully(); consumer.assign(Arrays.asList(t0, t1)); @@ -531,7 +532,7 @@ public void testCommitAsyncLeaderEpochUpdate() { public void testEnsurePollExecutedCommitAsyncCallbacks() { consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); - completeCommitApplicationEventExceptionally(); + completeCommitApplicationEventSuccessfully(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); @@ -546,7 +547,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { public void testEnsureShutdownExecutedCommitAsyncCallbacks() { consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); - completeCommitApplicationEventExceptionally(); + completeCommitApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); assertMockCommitCallbackInvoked(() -> consumer.close(), callback, @@ -853,22 +854,52 @@ public void testWakeupCommitted() { } @Test - public void testInterceptorConstructorClose() { - try { - Properties props = requiredConsumerProperties(); - props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + public void testInterceptorCommitSync() { + Properties props = requiredConsumerPropertiesAndGroupId("test-id"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - AsyncKafkaConsumer consumer = newConsumer(props); - assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); - assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get()); + consumer = newConsumer(props); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + completeCommitApplicationEventSuccessfully(); - consumer.close(); - assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); - assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get()); - } finally { - // cleanup since we are using mutable static variables in MockConsumerInterceptor - MockConsumerInterceptor.resetCounters(); - } + consumer.commitSync(mockTopicPartitionOffset()); + + assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); + } + + @Test + public void testInterceptorCommitAsync() { + Properties props = requiredConsumerPropertiesAndGroupId("test-id"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + consumer = newConsumer(props); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + + completeCommitApplicationEventSuccessfully(); + consumer.commitAsync(mockTopicPartitionOffset(), new MockCommitCallback()); + assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); + + forceCommitCallbackInvocation(); + assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); + } + + @Test + public void testNoInterceptorCommitAsyncFailed() { + Properties props = requiredConsumerPropertiesAndGroupId("test-id"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + consumer = newConsumer(props); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + completeCommitApplicationEventExceptionally(new KafkaException("Test exception")); + + consumer.commitAsync(mockTopicPartitionOffset(), new MockCommitCallback()); + assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); + + forceCommitCallbackInvocation(); + assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); } @Test @@ -1432,7 +1463,7 @@ private void completeCommitApplicationEventExceptionally(Exception ex) { }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); } - private void completeCommitApplicationEventExceptionally() { + private void completeCommitApplicationEventSuccessfully() { doAnswer(invocation -> { CommitApplicationEvent event = invocation.getArgument(0); event.future().complete(null); From e1035680a880c6a239ec6b1c68480d77b6d96367 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 12 Jan 2024 16:53:30 +0100 Subject: [PATCH 03/13] autocommit logic --- .../internals/AsyncKafkaConsumer.java | 77 +++---------- .../internals/CommitRequestManager.java | 8 +- .../OffsetCommitCallbackInvoker.java | 92 ++++++++++++++++ .../consumer/internals/RequestManagers.java | 5 +- .../internals/AsyncKafkaConsumerTest.java | 2 +- .../internals/CommitRequestManagerTest.java | 42 +++++++ .../internals/ConsumerTestBuilder.java | 4 + .../OffsetCommitCallbackInvokerTest.java | 103 ++++++++++++++++++ 8 files changed, 267 insertions(+), 66 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 3d1911afaa647..fa2ee4399f874 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -283,7 +283,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private boolean isFenced = false; - private final OffsetCommitCallbackInvoker invoker = new OffsetCommitCallbackInvoker(); + private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer // and is used to prevent multithreaded access @@ -365,6 +365,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { metrics, fetchMetricsManager, clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null)); + this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); final Supplier requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventHandler, @@ -376,7 +377,8 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { apiVersions, fetchMetricsManager, networkClientDelegateSupplier, - clientTelemetryReporter); + clientTelemetryReporter, + offsetCommitCallbackInvoker); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, applicationEventQueue, @@ -485,6 +487,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); this.clientTelemetryReporter = Optional.empty(); this.autoCommitEnabled = autoCommitEnabled; + this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); } AsyncKafkaConsumer(LogContext logContext, @@ -554,6 +557,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { logContext, client ); + this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); Supplier requestManagersSupplier = RequestManagers.supplier( time, logContext, @@ -566,7 +570,8 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { apiVersions, fetchMetricsManager, networkClientDelegateSupplier, - clientTelemetryReporter + clientTelemetryReporter, + offsetCommitCallbackInvoker ); Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, @@ -760,12 +765,9 @@ public void commitAsync(Map offsets, OffsetCo // waiting for a response. CompletableFuture future = commit(offsets, false, Optional.empty()); future.whenComplete((r, t) -> { - if (t == null && !interceptors.isEmpty()) { - invoker.submit(new OffsetCommitCallbackTask( - (o, e) -> interceptors.onCommit(o), - offsets, - null - )); + + if (t == null) { + offsetCommitCallbackInvoker.submitCommitInterceptors(offsets); } if (callback == null) { @@ -775,7 +777,7 @@ public void commitAsync(Map offsets, OffsetCo return; } - invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t)); + offsetCommitCallbackInvoker.submitUserCallback(callback, offsets, (Exception) t); }); } finally { release(); @@ -1911,65 +1913,14 @@ private void maybeThrowFencedInstanceException() { } private void maybeInvokeCommitCallbacks() { - if (callbacks() > 0) { - invoker.executeCallbacks(); + if (offsetCommitCallbackInvoker.executeCallbacks()) { + isFenced = true; } } - // Visible for testing - int callbacks() { - return invoker.callbackQueue.size(); - } - // Visible for testing SubscriptionState subscriptions() { return subscriptions; } - /** - * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is - * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the - * future completion, and execute the callbacks when user polls/commits/closes the consumer. - */ - private class OffsetCommitCallbackInvoker { - // Thread-safe queue to store callbacks - private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>(); - - public void submit(final OffsetCommitCallbackTask callback) { - try { - callbackQueue.offer(callback); - } catch (Exception e) { - log.error("Unexpected error encountered when adding offset commit callback to the invocation queue", e); - } - } - - public void executeCallbacks() { - while (!callbackQueue.isEmpty()) { - OffsetCommitCallbackTask callback = callbackQueue.poll(); - if (callback != null) { - callback.invoke(); - } - } - } - } - - private class OffsetCommitCallbackTask { - private final Map offsets; - private final Exception exception; - private final OffsetCommitCallback callback; - - public OffsetCommitCallbackTask(final OffsetCommitCallback callback, - final Map offsets, - final Exception e) { - this.offsets = offsets; - this.exception = e; - this.callback = callback; - } - - public void invoke() { - if (exception instanceof FencedInstanceIdException) - isFenced = true; - callback.onComplete(offsets, exception); - } - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index a60c1345e4c66..d33b515373af6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -69,6 +69,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener private final Logger log; private final Optional autoCommitState; private final CoordinatorRequestManager coordinatorRequestManager; + private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final long retryBackoffMs; private final String groupId; private final Optional groupInstanceId; @@ -93,9 +94,11 @@ public CommitRequestManager( final SubscriptionState subscriptions, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, + final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final String groupId, final Optional groupInstanceId) { - this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId, + this(time, logContext, subscriptions, config, coordinatorRequestManager, + offsetCommitCallbackInvoker, groupId, groupInstanceId, config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG), config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), OptionalDouble.empty()); } @@ -107,6 +110,7 @@ public CommitRequestManager( final SubscriptionState subscriptions, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, + final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final String groupId, final Optional groupInstanceId, final long retryBackoffMs, @@ -132,6 +136,7 @@ public CommitRequestManager( this.jitter = jitter; this.throwOnFetchStableOffsetUnsupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); this.memberInfo = new MemberInfo(); + this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker; } /** @@ -271,6 +276,7 @@ public CompletableFuture maybeAutoCommitAllConsumedNow( return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { + offsetCommitCallbackInvoker.submitCommitInterceptors(allConsumedOffsets); log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java new file mode 100644 index 0000000000000..88c1f394a770d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.FencedInstanceIdException; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback} amd + * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}s. This is + * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. + */ +public class OffsetCommitCallbackInvoker { + private final ConsumerInterceptors interceptors; + + OffsetCommitCallbackInvoker(ConsumerInterceptors interceptors) { + this.interceptors = interceptors; + } + + // Thread-safe queue to store user-defined callbacks and interceptors to be executed + private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>(); + + public void submitCommitInterceptors(final Map offsets) { + if (!interceptors.isEmpty()) { + callbackQueue.add(new OffsetCommitCallbackTask( + (o, e) -> interceptors.onCommit(o), + offsets, + null + )); + } + } + + public void submitUserCallback(final OffsetCommitCallback callback, + final Map offsets, + final Exception e) { + callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, e)); + } + + /** + * @return true if an offset commit was fenced. + */ + public boolean executeCallbacks() { + boolean isFenced = false; + while (!callbackQueue.isEmpty()) { + OffsetCommitCallbackTask task = callbackQueue.poll(); + if (task != null) { + + if (task.exception instanceof FencedInstanceIdException) + isFenced = true; + + task.callback.onComplete(task.offsets, task.exception); + + } + } + return isFenced; + } + + private static class OffsetCommitCallbackTask { + public final Map offsets; + public final Exception exception; + public final OffsetCommitCallback callback; + + public OffsetCommitCallbackTask(final OffsetCommitCallback callback, + final Map offsets, + final Exception e) { + this.offsets = offsets; + this.exception = e; + this.callback = callback; + } + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 5305c004b955a..9534efc6d7602 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -116,7 +116,9 @@ public static Supplier supplier(final Time time, final ApiVersions apiVersions, final FetchMetricsManager fetchMetricsManager, final Supplier networkClientDelegateSupplier, - final Optional clientTelemetryReporter) { + final Optional clientTelemetryReporter, + final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker + ) { return new CachedSupplier() { @Override protected RequestManagers create() { @@ -166,6 +168,7 @@ protected RequestManagers create() { subscriptions, config, coordinator, + offsetCommitCallbackInvoker, groupRebalanceConfig.groupId, groupRebalanceConfig.groupInstanceId); membershipManager = new MembershipManagerImpl( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index f032deeb69e66..4c9a5a345c379 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -445,8 +445,8 @@ public void testEnsureCallbackExecutedByApplicationThread() { completeCommitApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - assertEquals(1, consumer.callbacks()); forceCommitCallbackInvocation(); + assertEquals(1, callback.invoked); assertEquals(currentThread, callback.completionThread); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 22d2be1a82060..288d74b5f6a28 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -47,6 +47,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; @@ -79,7 +80,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -93,6 +96,7 @@ public class CommitRequestManagerTest { private LogContext logContext; private MockTime time; private CoordinatorRequestManager coordinatorRequestManager; + private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private Properties props; private final int defaultApiTimeoutMs = 60000; @@ -104,6 +108,7 @@ public void setup() { this.time = new MockTime(0); this.subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); + this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); this.props = new Properties(); this.props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); this.props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { assertPoll(1, commitRequestManger); } + @Test + public void testAutocommitInterceptorsInvoked() { + TopicPartition t1p = new TopicPartition("topic1", 0); + subscriptionState.assignFromUser(singleton(t1p)); + subscriptionState.seek(t1p, 100); + + CommitRequestManager commitRequestManger = create(true, 100); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + List futures = assertPoll(1, commitRequestManger); + + // complete the unsent request to trigger interceptor + futures.get(0).onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>()))); + verify(offsetCommitCallbackInvoker).submitCommitInterceptors( + eq(Collections.singletonMap(t1p, new OffsetAndMetadata(100L))) + ); + } + + @Test + public void testAutocommitInterceptorsNotInvokedOnError() { + TopicPartition t1p = new TopicPartition("topic1", 0); + subscriptionState.assignFromUser(singleton(t1p)); + subscriptionState.seek(t1p, 100); + + CommitRequestManager commitRequestManger = create(true, 100); + time.sleep(100); + commitRequestManger.updateAutoCommitTimer(time.milliseconds()); + List futures = assertPoll(1, commitRequestManger); + + // complete the unsent request to trigger interceptor + futures.get(0).onComplete(buildOffsetCommitClientResponse( + new OffsetCommitResponse(0, Collections.singletonMap(t1p, Errors.NETWORK_EXCEPTION))) + ); + Mockito.verify(offsetCommitCallbackInvoker, never()).submitCommitInterceptors(any()); + } + @Test public void testOffsetFetchRequestEnsureDuplicatedRequestSucceed() { CommitRequestManager commitRequestManger = create(true, 100); @@ -940,6 +981,7 @@ private CommitRequestManager create(final boolean autoCommitEnabled, final long this.subscriptionState, new ConsumerConfig(props), this.coordinatorRequestManager, + this.offsetCommitCallbackInvoker, DEFAULT_GROUP_ID, Optional.of(DEFAULT_GROUP_INSTANCE_ID), retryBackoffMs, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 013ddca9d5431..cf246ba8ba761 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -52,6 +52,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; import static org.apache.kafka.common.utils.Utils.closeQuietly; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @SuppressWarnings("ClassDataAbstractionCoupling") @@ -96,6 +97,7 @@ public class ConsumerTestBuilder implements Closeable { public final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; final MockClient client; final Optional groupInfo; + final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; public ConsumerTestBuilder(Optional groupInfo) { this(groupInfo, true, true); @@ -107,6 +109,7 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA this.applicationEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventHandler = spy(new BackgroundEventHandler(logContext, backgroundEventQueue)); + this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( 100, DEFAULT_MAX_POLL_INTERVAL_MS, @@ -188,6 +191,7 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA subscriptions, config, coordinator, + offsetCommitCallbackInvoker, gi.groupId, gi.groupInstanceId)); MembershipManager mm = spy( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java new file mode 100644 index 0000000000000..3a60858b017bb --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class OffsetCommitCallbackInvokerTest { + + @Mock + private ConsumerInterceptors consumerInterceptors; + private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; + + @BeforeEach + public void setup() { + offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(consumerInterceptors); + } + + @Test + public void testMultipleUserCallbacksInvoked() { + final TopicPartition t0 = new TopicPartition("t0", 2); + Map offsets1 = + Collections.singletonMap(t0, new OffsetAndMetadata(10L)); + Map offsets2 = + Collections.singletonMap(t0, new OffsetAndMetadata(20L)); + OffsetCommitCallback callback1 = mock(OffsetCommitCallback.class); + OffsetCommitCallback callback2 = mock(OffsetCommitCallback.class); + + offsetCommitCallbackInvoker.submitUserCallback(callback1, offsets1, null); + offsetCommitCallbackInvoker.submitUserCallback(callback2, offsets2, null); + verify(callback1, never()).onComplete(any(), any()); + verify(callback2, never()).onComplete(any(), any()); + + offsetCommitCallbackInvoker.executeCallbacks(); + InOrder inOrder = inOrder(callback1, callback2); + inOrder.verify(callback1).onComplete(offsets1, null); + inOrder.verify(callback2).onComplete(offsets2, null); + + offsetCommitCallbackInvoker.executeCallbacks(); + inOrder.verifyNoMoreInteractions(); + } + + + @Test + public void testMixedCallbacksInterceptorsInvoked() { + final TopicPartition t0 = new TopicPartition("t0", 2); + Map offsets1 = + Collections.singletonMap(t0, new OffsetAndMetadata(10L)); + Map offsets2 = + Collections.singletonMap(t0, new OffsetAndMetadata(20L)); + OffsetCommitCallback callback1 = mock(OffsetCommitCallback.class); + when(consumerInterceptors.isEmpty()).thenReturn(false); + + offsetCommitCallbackInvoker.submitCommitInterceptors(offsets1); + offsetCommitCallbackInvoker.submitCommitInterceptors(offsets2); + offsetCommitCallbackInvoker.submitUserCallback(callback1, offsets1, null); + verify(callback1, never()).onComplete(any(), any()); + verify(consumerInterceptors, never()).onCommit(any()); + + offsetCommitCallbackInvoker.executeCallbacks(); + InOrder inOrder = inOrder(callback1, consumerInterceptors); + inOrder.verify(consumerInterceptors).onCommit(offsets1); + inOrder.verify(consumerInterceptors).onCommit(offsets2); + inOrder.verify(callback1).onComplete(offsets1, null); + + offsetCommitCallbackInvoker.executeCallbacks(); + inOrder.verifyNoMoreInteractions(); + } + + +} From 8ec95f9cbf5b1634fbdaccb27ed6802922a3ab16 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 15 Jan 2024 10:39:56 +0100 Subject: [PATCH 04/13] Address comments --- .../internals/OffsetCommitCallbackInvoker.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java index 88c1f394a770d..69600c706b369 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java @@ -44,7 +44,7 @@ public class OffsetCommitCallbackInvoker { public void submitCommitInterceptors(final Map offsets) { if (!interceptors.isEmpty()) { callbackQueue.add(new OffsetCommitCallbackTask( - (o, e) -> interceptors.onCommit(o), + (innerOffsets, exception) -> interceptors.onCommit(innerOffsets), offsets, null )); @@ -53,8 +53,8 @@ public void submitCommitInterceptors(final Map offsets, - final Exception e) { - callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, e)); + final Exception exception) { + callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception)); } /** @@ -83,9 +83,9 @@ private static class OffsetCommitCallbackTask { public OffsetCommitCallbackTask(final OffsetCommitCallback callback, final Map offsets, - final Exception e) { + final Exception exception) { this.offsets = offsets; - this.exception = e; + this.exception = exception; this.callback = callback; } } From b6be0b97c7e393c8117b7625c3d2c796caf96bb0 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 15 Jan 2024 14:07:14 +0100 Subject: [PATCH 05/13] update TODO --- .../scala/integration/kafka/api/PlaintextConsumerTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index ff925cbe12d57..698b1c4d958db 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -1337,7 +1337,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { MockProducerInterceptor.resetCounters() } - // TODO: enable this test for the consumer group protocol when consumer interceptors are supported + // TODO: enable this test for the consumer group protocol when KAFKA-16133 is fixed @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testAutoCommitIntercept(quorum: String, groupProtocol: String): Unit = { From 498546029b22ea39e97638190f6f3ad0b142b24a Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 15 Jan 2024 14:08:49 +0100 Subject: [PATCH 06/13] move commit callback invocation on close --- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index fa2ee4399f874..b147a03f79970 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1236,6 +1236,8 @@ private void close(Duration timeout, boolean swallowException) { closeTimer.update(); if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); + swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks, + firstException); closeTimer.update(); closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); @@ -1259,7 +1261,6 @@ private void close(Duration timeout, boolean swallowException) { * 1. autocommit offsets * 2. revoke all partitions * 3. if partition revocation completes successfully, send leave group - * 4. invoke all async commit callbacks if there is any */ void prepareShutdown(final Timer timer, final AtomicReference firstException) { if (!groupMetadata.isPresent()) @@ -1272,8 +1273,6 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer); }, "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); - swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks, - firstException); } // Visible for testing From 73fd928cbc26678aabd3b5271f68ebd4fd42ae26 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 15 Jan 2024 22:01:57 +0100 Subject: [PATCH 07/13] enable test --- .../scala/integration/kafka/api/PlaintextConsumerTest.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 72c43cdf13a4a..b8fe709033280 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -263,7 +263,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { ensureNoRebalance(consumer, listener) } - // TODO: enable this test for the consumer group protocol when support for committing offsets on close is implemented. @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = { @@ -1337,9 +1336,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { MockProducerInterceptor.resetCounters() } - // TODO: enable this test for the consumer group protocol when KAFKA-16133 is fixed @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testAutoCommitIntercept(quorum: String, groupProtocol: String): Unit = { val topic2 = "topic2" createTopic(topic2, 2, brokerCount) From 31cf64c4ee1e76a6f6de59e5bea6962da5911e76 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 17 Jan 2024 15:57:45 +0100 Subject: [PATCH 08/13] comments --- .../consumer/internals/AsyncKafkaConsumer.java | 14 +++----------- .../consumer/internals/CommitRequestManager.java | 15 +++++++++++---- .../internals/OffsetCommitCallbackInvoker.java | 14 +++++++------- .../internals/AsyncKafkaConsumerTest.java | 16 ++++++++++++++++ .../kafka/api/PlaintextConsumerTest.scala | 3 ++- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index b147a03f79970..a6ebcf91a5265 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -282,7 +282,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); - private boolean isFenced = false; private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer @@ -789,7 +788,6 @@ CompletableFuture commit(final Map offs final boolean isWakeupable, final Optional retryTimeoutMs) { maybeInvokeCommitCallbacks(); - maybeThrowFencedInstanceException(); maybeThrowInvalidGroupIdException(); log.debug("Committing offsets: {}", offsets); @@ -1643,7 +1641,6 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeInvokeCommitCallbacks(); - maybeThrowFencedInstanceException(); backgroundEventProcessor.process(); // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as @@ -1897,8 +1894,9 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { return kafkaConsumerMetrics; } - private void maybeThrowFencedInstanceException() { - if (isFenced) { + private void maybeInvokeCommitCallbacks() { + offsetCommitCallbackInvoker.executeCallbacks(); + if (offsetCommitCallbackInvoker.hasFencedException()) { String groupInstanceId = "unknown"; if (!groupMetadata.isPresent()) { log.error("No group metadata found although a group ID was provided. This is a bug!"); @@ -1911,12 +1909,6 @@ private void maybeThrowFencedInstanceException() { } } - private void maybeInvokeCommitCallbacks() { - if (offsetCommitCallbackInvoker.executeCallbacks()) { - isFenced = true; - } - } - // Visible for testing SubscriptionState subscriptions() { return subscriptions; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index d33b515373af6..ff90dd1893f66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -97,10 +97,17 @@ public CommitRequestManager( final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final String groupId, final Optional groupInstanceId) { - this(time, logContext, subscriptions, config, coordinatorRequestManager, - offsetCommitCallbackInvoker, groupId, - groupInstanceId, config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG), - config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), OptionalDouble.empty()); + this(time, + logContext, + subscriptions, + config, + coordinatorRequestManager, + offsetCommitCallbackInvoker, + groupId, + groupInstanceId, + config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG), + config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG), + OptionalDouble.empty()); } // Visible for testing diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java index 69600c706b369..be3a131af0fa6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java @@ -33,6 +33,7 @@ */ public class OffsetCommitCallbackInvoker { private final ConsumerInterceptors interceptors; + private boolean hasFencedException = false; OffsetCommitCallbackInvoker(ConsumerInterceptors interceptors) { this.interceptors = interceptors; @@ -57,23 +58,22 @@ public void submitUserCallback(final OffsetCommitCallback callback, callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception)); } - /** - * @return true if an offset commit was fenced. - */ - public boolean executeCallbacks() { - boolean isFenced = false; + public void executeCallbacks() { while (!callbackQueue.isEmpty()) { OffsetCommitCallbackTask task = callbackQueue.poll(); if (task != null) { if (task.exception instanceof FencedInstanceIdException) - isFenced = true; + hasFencedException = true; task.callback.onComplete(task.offsets, task.exception); } } - return isFenced; + } + + public boolean hasFencedException() { + return hasFencedException; } private static class OffsetCommitCallbackTask { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 4c9a5a345c379..b8f78218605a8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -849,6 +849,22 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } + @Test + public void testInterceptorAutoCommitOnClose() { + Properties props = requiredConsumerPropertiesAndGroupId("test-id"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + + consumer = newConsumer(props); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + completeCommitApplicationEventSuccessfully(); + + consumer.close(Duration.ZERO); + + assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); + assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get()); + } + @Test public void testInterceptorCommitSync() { Properties props = requiredConsumerPropertiesAndGroupId("test-id"); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index b8fe709033280..22d465a58c824 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -1336,8 +1336,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { MockProducerInterceptor.resetCounters() } + // This is disabled for the the consumer group until KAFKA-16155 is resolved. @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testAutoCommitIntercept(quorum: String, groupProtocol: String): Unit = { val topic2 = "topic2" createTopic(topic2, 2, brokerCount) From 68740cd483088dc7ec4223223f992f8202af2a38 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Thu, 18 Jan 2024 10:07:37 +0100 Subject: [PATCH 09/13] do not fail close if fenced --- .../clients/consumer/internals/AsyncKafkaConsumer.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index a6ebcf91a5265..8efa1f57ab859 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -788,6 +788,7 @@ CompletableFuture commit(final Map offs final boolean isWakeupable, final Optional retryTimeoutMs) { maybeInvokeCommitCallbacks(); + maybeThrowFencedInstanceException(); maybeThrowInvalidGroupIdException(); log.debug("Committing offsets: {}", offsets); @@ -1641,6 +1642,7 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeInvokeCommitCallbacks(); + maybeThrowFencedInstanceException(); backgroundEventProcessor.process(); // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as @@ -1894,8 +1896,7 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { return kafkaConsumerMetrics; } - private void maybeInvokeCommitCallbacks() { - offsetCommitCallbackInvoker.executeCallbacks(); + private void maybeThrowFencedInstanceException() { if (offsetCommitCallbackInvoker.hasFencedException()) { String groupInstanceId = "unknown"; if (!groupMetadata.isPresent()) { @@ -1909,6 +1910,10 @@ private void maybeInvokeCommitCallbacks() { } } + private void maybeInvokeCommitCallbacks() { + offsetCommitCallbackInvoker.executeCallbacks(); + } + // Visible for testing SubscriptionState subscriptions() { return subscriptions; From 4434f4ba9b182fce22df501f2a98bcd2f01fc7d7 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 19 Jan 2024 15:18:02 +0100 Subject: [PATCH 10/13] Bruno's comments --- .../internals/AsyncKafkaConsumer.java | 4 +- .../internals/CommitRequestManager.java | 2 +- .../OffsetCommitCallbackInvoker.java | 11 ++--- .../internals/CommitRequestManagerTest.java | 4 +- .../OffsetCommitCallbackInvokerTest.java | 46 +++++++++++++++++-- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 8efa1f57ab859..48713957df0a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -766,7 +766,7 @@ public void commitAsync(Map offsets, OffsetCo future.whenComplete((r, t) -> { if (t == null) { - offsetCommitCallbackInvoker.submitCommitInterceptors(offsets); + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets); } if (callback == null) { @@ -776,7 +776,7 @@ public void commitAsync(Map offsets, OffsetCo return; } - offsetCommitCallbackInvoker.submitUserCallback(callback, offsets, (Exception) t); + offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, offsets, (Exception) t); }); } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index ff90dd1893f66..d0aa738cf8d19 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -283,7 +283,7 @@ public CompletableFuture maybeAutoCommitAllConsumedNow( return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { - offsetCommitCallbackInvoker.submitCommitInterceptors(allConsumedOffsets); + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java index be3a131af0fa6..db7770cbda865 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java @@ -42,19 +42,19 @@ public class OffsetCommitCallbackInvoker { // Thread-safe queue to store user-defined callbacks and interceptors to be executed private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>(); - public void submitCommitInterceptors(final Map offsets) { + public void enqueueInterceptorInvocation(final Map offsets) { if (!interceptors.isEmpty()) { callbackQueue.add(new OffsetCommitCallbackTask( - (innerOffsets, exception) -> interceptors.onCommit(innerOffsets), + (offsetsParam, exception) -> interceptors.onCommit(offsetsParam), offsets, null )); } } - public void submitUserCallback(final OffsetCommitCallback callback, - final Map offsets, - final Exception exception) { + public void enqueueUserCallbackInvocation(final OffsetCommitCallback callback, + final Map offsets, + final Exception exception) { callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception)); } @@ -67,7 +67,6 @@ public void executeCallbacks() { hasFencedException = true; task.callback.onComplete(task.offsets, task.exception); - } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 288d74b5f6a28..e233a56947c77 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -385,7 +385,7 @@ public void testAutocommitInterceptorsInvoked() { // complete the unsent request to trigger interceptor futures.get(0).onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>()))); - verify(offsetCommitCallbackInvoker).submitCommitInterceptors( + verify(offsetCommitCallbackInvoker).enqueueInterceptorInvocation( eq(Collections.singletonMap(t1p, new OffsetAndMetadata(100L))) ); } @@ -405,7 +405,7 @@ public void testAutocommitInterceptorsNotInvokedOnError() { futures.get(0).onComplete(buildOffsetCommitClientResponse( new OffsetCommitResponse(0, Collections.singletonMap(t1p, Errors.NETWORK_EXCEPTION))) ); - Mockito.verify(offsetCommitCallbackInvoker, never()).submitCommitInterceptors(any()); + Mockito.verify(offsetCommitCallbackInvoker, never()).enqueueInterceptorInvocation(any()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java index 3a60858b017bb..0c8a6dfaf0aff 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvokerTest.java @@ -58,8 +58,8 @@ public void testMultipleUserCallbacksInvoked() { OffsetCommitCallback callback1 = mock(OffsetCommitCallback.class); OffsetCommitCallback callback2 = mock(OffsetCommitCallback.class); - offsetCommitCallbackInvoker.submitUserCallback(callback1, offsets1, null); - offsetCommitCallbackInvoker.submitUserCallback(callback2, offsets2, null); + offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback1, offsets1, null); + offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback2, offsets2, null); verify(callback1, never()).onComplete(any(), any()); verify(callback2, never()).onComplete(any(), any()); @@ -72,6 +72,42 @@ public void testMultipleUserCallbacksInvoked() { inOrder.verifyNoMoreInteractions(); } + @Test + public void testNoOnCommitOnEmptyInterceptors() { + final TopicPartition t0 = new TopicPartition("t0", 2); + Map offsets1 = + Collections.singletonMap(t0, new OffsetAndMetadata(10L)); + Map offsets2 = + Collections.singletonMap(t0, new OffsetAndMetadata(20L)); + when(consumerInterceptors.isEmpty()).thenReturn(true); + + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets1); + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets2); + offsetCommitCallbackInvoker.executeCallbacks(); + verify(consumerInterceptors, never()).onCommit(any()); + } + + @Test + public void testOnlyInterceptors() { + final TopicPartition t0 = new TopicPartition("t0", 2); + Map offsets1 = + Collections.singletonMap(t0, new OffsetAndMetadata(10L)); + Map offsets2 = + Collections.singletonMap(t0, new OffsetAndMetadata(20L)); + when(consumerInterceptors.isEmpty()).thenReturn(false); + + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets1); + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets2); + verify(consumerInterceptors, never()).onCommit(any()); + + offsetCommitCallbackInvoker.executeCallbacks(); + InOrder inOrder = inOrder(consumerInterceptors); + inOrder.verify(consumerInterceptors).onCommit(offsets1); + inOrder.verify(consumerInterceptors).onCommit(offsets2); + + offsetCommitCallbackInvoker.executeCallbacks(); + inOrder.verifyNoMoreInteractions(); + } @Test public void testMixedCallbacksInterceptorsInvoked() { @@ -83,9 +119,9 @@ public void testMixedCallbacksInterceptorsInvoked() { OffsetCommitCallback callback1 = mock(OffsetCommitCallback.class); when(consumerInterceptors.isEmpty()).thenReturn(false); - offsetCommitCallbackInvoker.submitCommitInterceptors(offsets1); - offsetCommitCallbackInvoker.submitCommitInterceptors(offsets2); - offsetCommitCallbackInvoker.submitUserCallback(callback1, offsets1, null); + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets1); + offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets2); + offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback1, offsets1, null); verify(callback1, never()).onComplete(any(), any()); verify(consumerInterceptors, never()).onCommit(any()); From 432527903f6c7b2d1b2ef9adf243b203ca857516 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 19 Jan 2024 15:22:10 +0100 Subject: [PATCH 11/13] fix --- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 079b44fccc3d1..b61202ea7b609 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -563,8 +563,8 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { fetchMetricsManager, networkClientDelegateSupplier, clientTelemetryReporter, - offsetCommitCallbackInvoker, - metrics + metrics, + offsetCommitCallbackInvoker ); Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, From fd687ddeb71e47ebe81d6a9d86ae4b945b11a611 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 19 Jan 2024 15:24:45 +0100 Subject: [PATCH 12/13] fix --- .../apache/kafka/clients/consumer/internals/RequestManagers.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 92d54fcf339b5..6ca2e398de382 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -105,6 +105,7 @@ public void close() { * Creates a {@link Supplier} for deferred creation during invocation by * {@link ConsumerNetworkThread}. */ + @SuppressWarnings({"checkstyle:ParameterNumber"}) public static Supplier supplier(final Time time, final LogContext logContext, final BackgroundEventHandler backgroundEventHandler, From 2d6f231f6994e05827617f7ab19648f4e1175146 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 29 Jan 2024 15:44:37 +0100 Subject: [PATCH 13/13] more testing --- .../internals/AsyncKafkaConsumerTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 1775dc1b6f4e6..691467d29fbd5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -884,6 +884,22 @@ public void testInterceptorCommitSync() { assertEquals(1, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); } + @Test + public void testNoInterceptorCommitSyncFailed() { + Properties props = requiredConsumerPropertiesAndGroupId("test-id"); + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + consumer = newConsumer(props); + assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); + KafkaException expected = new KafkaException("Test exception"); + completeCommitApplicationEventExceptionally(expected); + + KafkaException actual = assertThrows(KafkaException.class, () -> consumer.commitSync(mockTopicPartitionOffset())); + assertEquals(expected, actual); + assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); + } + @Test public void testInterceptorCommitAsync() { Properties props = requiredConsumerPropertiesAndGroupId("test-id");