diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index b589fb44f0042..05ac8688931d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -248,6 +248,11 @@ public interface Consumer extends Closeable { */ ConsumerGroupMetadata groupMetadata(); + /** + * @see KafkaConsumer#enforceRebalance) + */ + void enforceRebalance(); + /** * @see KafkaConsumer#close() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f140d6a8806a1..0680e6796b161 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2247,6 +2247,39 @@ public ConsumerGroupMetadata groupMetadata() { return coordinator.groupMetadata(); } + /** + * Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces + * the consumer to trigger a new rebalance on the next {@link #poll(Duration)} call. Note that this API does not + * itself initiate the rebalance, so you must still call {@link #poll(Duration)}. If a rebalance is already in + * progress this call will be a no-op. If you wish to force an additional rebalance you must complete the current + * one by calling poll before retrying this API. + *

+ * You do not need to call this during normal processing, as the consumer group will manage itself + * automatically and rebalance when necessary. However there may be situations where the application wishes to + * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to + * the Consumer and its group changes in a way that would affect the userdata encoded in the + * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}, the Consumer + * will not be notified and no rebalance will occur. This API can be used to force the group to rebalance so that + * the assignor can perform a partition reassignment based on the latest userdata. If your assignor does not use + * this userdata, or you do not use a custom + * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not + * use this API. + * + * @throws java.lang.IllegalStateException if the consumer does not use group subscription + */ + @Override + public void enforceRebalance() { + acquireAndEnsureOpen(); + try { + if (coordinator == null) { + throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group."); + } + coordinator.requestRejoin(); + } finally { + release(); + } + } + /** * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. * If auto-commit is enabled, this will commit the current offsets if possible within the default diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 67b4e9f0ea314..b8579c45dff20 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -566,6 +566,10 @@ public ConsumerGroupMetadata groupMetadata() { return null; } + @Override + public void enforceRebalance() { + } + @Override public void close(Duration timeout) { close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index a0070178eb7ed..a1ccb61cfa80b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -426,7 +426,7 @@ boolean joinGroupIfNeeded(final Timer timer) { // Generation data maybe concurrently cleared by Heartbeat thread. // Can't use synchronized for {@code onJoinComplete}, because it can be long enough - // and shouldn't block hearbeat thread. + // and shouldn't block heartbeat thread. // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment synchronized (AbstractCoordinator.this) { generationSnapshot = this.generation; @@ -904,7 +904,7 @@ synchronized void resetGenerationOnLeaveGroup() { resetGeneration(); } - protected synchronized void requestRejoin() { + public synchronized void requestRejoin() { this.rejoinNeeded = true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 00d4db4775495..3e23883a4f182 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.Fetcher; +import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; @@ -1722,7 +1723,7 @@ public void testCommitSyncAuthenticationFailure() { } @Test(expected = AuthenticationException.class) - public void testCommittedAuthenticationFaiure() { + public void testCommittedAuthenticationFailure() { final KafkaConsumer consumer = consumerWithPendingAuthenticationError(); consumer.committed(Collections.singleton(tp0)).get(tp0); } @@ -2418,4 +2419,44 @@ public void testClosingConsumerUnregistersConsumerMetrics() { assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg")); assertFalse(consumerMetricPresent(consumer, "time-between-poll-max")); } + + @Test(expected = IllegalStateException.class) + public void testEnforceRebalanceWithManualAssignment() { + try (KafkaConsumer consumer = newConsumer((String) null)) { + consumer.assign(singleton(new TopicPartition("topic", 0))); + consumer.enforceRebalance(); + } + } + + @Test + public void testEnforceRebalanceTriggersRebalanceOnNextPoll() { + Time time = new MockTime(1L); + SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + MockRebalanceListener countingRebalanceListener = new MockRebalanceListener(); + initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); + + consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener); + Node node = metadata.fetch().nodes().get(0); + prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); + + // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group + consumer.poll(Duration.ZERO); + consumer.poll(Duration.ZERO); + + // onPartitionsRevoked is not invoked when first joining the group + assertEquals(countingRebalanceListener.revokedCount, 0); + assertEquals(countingRebalanceListener.assignedCount, 1); + + consumer.enforceRebalance(); + + // the next poll should trigger a rebalance + consumer.poll(Duration.ZERO); + + assertEquals(countingRebalanceListener.revokedCount, 1); + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index fa2d797c2aa34..c2edc08c0095c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -2902,31 +2901,4 @@ public void onComplete(Map offsets, Exception this.exception = exception; } } - - private static class MockRebalanceListener implements ConsumerRebalanceListener { - public Collection lost; - public Collection revoked; - public Collection assigned; - public int lostCount = 0; - public int revokedCount = 0; - public int assignedCount = 0; - - @Override - public void onPartitionsAssigned(Collection partitions) { - this.assigned = partitions; - assignedCount++; - } - - @Override - public void onPartitionsRevoked(Collection partitions) { - this.revoked = partitions; - revokedCount++; - } - - @Override - public void onPartitionsLost(Collection partitions) { - this.lost = partitions; - lostCount++; - } - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java new file mode 100644 index 0000000000000..be802542e26a0 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockRebalanceListener.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.Collection; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; + +public class MockRebalanceListener implements ConsumerRebalanceListener { + public Collection lost; + public Collection revoked; + public Collection assigned; + public int lostCount = 0; + public int revokedCount = 0; + public int assignedCount = 0; + + @Override + public void onPartitionsAssigned(Collection partitions) { + this.assigned = partitions; + assignedCount++; + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + this.revoked = partitions; + revokedCount++; + } + + @Override + public void onPartitionsLost(Collection partitions) { + this.lost = partitions; + lostCount++; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1ec6e7b297964..4092825368d45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -753,7 +753,7 @@ private void runLoop() { log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance."); assignmentErrorCode.set(AssignorError.NONE.code()); - enforceRebalance(); + mainConsumer.enforceRebalance(); } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks {} are corrupted. " + @@ -766,16 +766,11 @@ private void runLoop() { "Will close out all assigned tasks and rejoin the consumer group."); taskManager.handleLostAll(); - enforceRebalance(); + mainConsumer.enforceRebalance(); } } } - - private void enforceRebalance() { - mainConsumer.unsubscribe(); - subscribeConsumer(); - } - + private void subscribeConsumer() { if (builder.usesPatternSubscription()) { mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index dada19eb4b40d..505ab82933a33 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -539,7 +539,7 @@ def do_rolling_bounce(self, processor, counter, current_generation): timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 7' on" + str(second_other_node.account)) - log_monitor.wait_until("Version probing detected. Triggering new rebalance.", + log_monitor.wait_until("Version probing detected. Rejoining the consumer group to trigger a new rebalance.", timeout_sec=60, err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))