diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index c6a995b3fb73b..6bc2c9eb080f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -333,7 +333,7 @@ protected void onJoinComplete(int generation, ByteBuffer assignmentBuffer) { log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId); - // only the leader is responsible for monitoring for metadata changes (i.e. partition changes) + // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes) if (!isLeader) assignmentSnapshot = null; @@ -377,12 +377,12 @@ protected void onJoinComplete(int generation, ); if (!revokedPartitions.isEmpty()) { - // revoke partitions that were previously owned but no longer assigned; + // Revoke partitions that were previously owned but no longer assigned; // note that we should only change the assignment (or update the assignor's state) // AFTER we've triggered the revoke callback firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); - // if revoked any partitions, need to re-join the group afterwards + // If revoked any partitions, need to re-join the group afterwards log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions); requestRejoin(); } @@ -392,21 +392,32 @@ protected void onJoinComplete(int generation, // were not explicitly requested, so we update the joined subscription here. maybeUpdateJoinedSubscription(assignedPartitions); - // give the assignor a chance to update internal state based on the received assignment + // Give the assignor a chance to update internal state based on the received assignment groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId); - assignor.onAssignment(assignment, groupMetadata); - // reschedule the auto commit starting from now + // Catch any exception here to make sure we could complete the user callback. + try { + assignor.onAssignment(assignment, groupMetadata); + } catch (Exception e) { + firstException.compareAndSet(null, e); + } + + // Reschedule the auto commit starting from now if (autoCommitEnabled) this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs); subscriptions.assignFromSubscribed(assignedPartitions); - // add partitions that were not previously owned but are now assigned + // Add partitions that were not previously owned but are now assigned firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions)); - if (firstException.get() != null) - throw new KafkaException("User rebalance callback throws an error", firstException.get()); + if (firstException.get() != null) { + if (firstException.get() instanceof KafkaException) { + throw (KafkaException) firstException.get(); + } else { + throw new KafkaException("User rebalance callback throws an error", firstException.get()); + } + } } void maybeUpdateSubscriptionMetadata() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 6ea23ba9ec85c..fa2d797c2aa34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -106,6 +106,8 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.TestUtils.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -134,7 +136,10 @@ public class ConsumerCoordinatorTest { private final ConsumerPartitionAssignor.RebalanceProtocol protocol; private final MockPartitionAssignor partitionAssignor; + private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor; + private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor; private final List assignors; + private final Map assignorMap; private MockClient client; private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap() { { @@ -153,8 +158,18 @@ public class ConsumerCoordinatorTest { public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) { this.protocol = protocol; + this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)); - this.assignors = Collections.singletonList(partitionAssignor); + this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), + new KafkaException("Kaboom for assignment!"), + "throw-on-assignment-assignor"); + this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), + new IllegalStateException("Illegal state for assignment!"), + "throw-fatal-error-on-assignment-assignor"); + this.assignors = Arrays.asList(partitionAssignor, throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor); + this.assignorMap = mkMap(mkEntry(partitionAssignor.name(), partitionAssignor), + mkEntry(throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor), + mkEntry(throwFatalErrorOnAssignmentAssignor.name(), throwFatalErrorOnAssignmentAssignor)); } @Parameterized.Parameters(name = "rebalance protocol = {0}") @@ -338,14 +353,11 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() { for (int i = 0; i < numRequests; i++) { Map offsets = singletonMap(tp, new OffsetAndMetadata(i)); - coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() { - @Override - public void onComplete(Map offsets, Exception exception) { - responses.incrementAndGet(); - Throwable cause = exception.getCause(); - assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), - cause instanceof DisconnectException); - } + coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> { + responses.incrementAndGet(); + Throwable cause = exception.getCause(); + assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), + cause instanceof DisconnectException); }); } @@ -465,6 +477,134 @@ public void testUnsubscribeWithValidGeneration() { assertEquals(0, rebalanceListener.revokedCount); } + @Test + public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() { + MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + super.onPartitionsRevoked(partitions); + throw new KafkaException("Kaboom on revoke!"); + } + }; + + if (protocol == COOPERATIVE) { + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null); + } else { + // Eager protocol doesn't revoke partitions. + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + } + + @Test + public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() { + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + + @Test + public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() { + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + partitionAssignor.name(), "Kaboom on partition assign!", null); + } + + @Test + public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() { + MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + super.onPartitionsRevoked(partitions); + throw new IllegalStateException("Illegal state on partition revoke!"); + } + }; + + if (protocol == COOPERATIVE) { + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), + "User rebalance callback throws an error", "Illegal state on partition revoke!"); + } else { + // Eager protocol doesn't revoke partitions. + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + } + + @Test + public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() { + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + verifyOnCallbackExceptions(throwOnAssignListener, + throwFatalErrorOnAssignmentAssignor.name(), + "User rebalance callback throws an error", "Illegal state for assignment!"); + } + + @Test + public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() { + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new IllegalStateException("Illegal state on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + partitionAssignor.name(), "User rebalance callback throws an error", + "Illegal state on partition assign!"); + } + + private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceListener, + final String assignorName, + final String exceptionMessage, + final String causeMessage) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + subscriptions.subscribe(singleton(topic1), rebalanceListener); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0]))); + subscriptions.assignFromSubscribed(singleton(t2p)); + + if (exceptionMessage != null) { + final Exception exception = assertThrows(KafkaException.class, + () -> coordinator.onJoinComplete(1, "memberId", assignorName, buffer)); + assertEquals(exceptionMessage, exception.getMessage()); + if (causeMessage != null) { + assertEquals(causeMessage, exception.getCause().getMessage()); + } + } + + // Eager doesn't trigger on partition revoke. + assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount); + assertEquals(0, rebalanceListener.lostCount); + assertEquals(1, rebalanceListener.assignedCount); + assertTrue("Unknown assignor name: " + assignorName, + assignorMap.containsKey(assignorName)); + assertEquals(1, assignorMap.get(assignorName).numAssignment()); + } + @Test public void testUnsubscribeWithInvalidGeneration() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java index ca7cb34ddd414..ef95f2ffb5594 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.common.TopicPartition; import java.util.List; @@ -25,10 +26,13 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor { private final List supportedProtocols; + private int numAssignment; + private Map> result = null; MockPartitionAssignor(final List supportedProtocols) { this.supportedProtocols = supportedProtocols; + numAssignment = 0; } @Override @@ -57,4 +61,12 @@ public void prepare(Map> result) { this.result = result; } + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + numAssignment += 1; + } + + int numAssignment() { + return numAssignment; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java new file mode 100644 index 0000000000000..782ca26006985 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; + +import java.util.List; + +/** + * A mock assignor which throws for {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#onAssignment}. + */ +public class ThrowOnAssignmentAssignor extends MockPartitionAssignor { + + private final RuntimeException bookeepedException; + private final String name; + + ThrowOnAssignmentAssignor(final List supportedProtocols, + final RuntimeException bookeepedException, + final String name) { + super(supportedProtocols); + this.bookeepedException = bookeepedException; + this.name = name; + } + + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + super.onAssignment(assignment, metadata); + throw bookeepedException; + } + + @Override + public String name() { + return name; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index 1e90bad58f2a9..f0c3d3676728d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -84,7 +84,7 @@ static void wipeStateStores(final Logger log, final ProcessorStateManager stateM Utils.delete(stateMgr.baseDir()); } catch (final IOException fatalException) { // since it is only called under dirty close, we always swallow the exception - log.warn("Failed to wiping state stores for task {}", stateMgr.taskId()); + log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index b36fb36ffec17..c8f33ca28bd84 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -198,12 +198,24 @@ public void handleAssignment(final Map> activeTasks, } if (!taskCloseExceptions.isEmpty()) { + for (final Map.Entry entry : taskCloseExceptions.entrySet()) { + if (!(entry.getValue() instanceof TaskMigratedException)) { + if (entry.getValue() instanceof KafkaException) { + log.error("Hit Kafka exception while closing for first task {}", entry.getKey()); + throw entry.getValue(); + } else { + throw new RuntimeException( + "Unexpected failure to close " + taskCloseExceptions.size() + + " task(s) [" + taskCloseExceptions.keySet() + "]. " + + "First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue() + ); + } + } + } + final Map.Entry first = taskCloseExceptions.entrySet().iterator().next(); - throw new RuntimeException( - "Unexpected failure to close " + taskCloseExceptions.size() + - " task(s) [" + taskCloseExceptions.keySet() + "]. " + - "First exception (for task " + first.getKey() + ") follows.", first.getValue() - ); + // If all exceptions are task-migrated, we would just throw the first one. + throw first.getValue(); } if (!activeTasksToCreate.isEmpty()) { @@ -293,12 +305,11 @@ boolean tryToCompleteRestoration() { * @throws TaskMigratedException if the task producer got fenced (EOS only) */ void handleRevocation(final Collection revokedPartitions) { - final Set revokedTasks = new HashSet<>(); final Set remainingPartitions = new HashSet<>(revokedPartitions); for (final Task task : tasks.values()) { if (remainingPartitions.containsAll(task.inputPartitions())) { - revokedTasks.add(task.id()); + task.suspend(); } remainingPartitions.removeAll(task.inputPartitions()); } @@ -308,11 +319,6 @@ void handleRevocation(final Collection revokedPartitions) { "due to race condition of consumer detecting the heartbeat failure, or the tasks " + "have been cleaned up by the handleAssignment callback.", remainingPartitions); } - - for (final TaskId taskId : revokedTasks) { - final Task task = tasks.get(taskId); - task.suspend(); - } } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 791ac73c84436..3f38a11e38004 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -23,10 +23,12 @@ import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -74,11 +76,9 @@ import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class TaskManagerTest { @@ -462,13 +462,9 @@ public void commit() { task00.setCommitNeeded(); - try { - taskManager.commitAll(); - fail("should have thrown"); - } catch (final RuntimeException e) { - assertThat(e, instanceOf(RuntimeException.class)); - assertThat(e.getMessage(), equalTo("opsh.")); - } + final RuntimeException thrown = + assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThat(thrown.getMessage(), equalTo("opsh.")); } @Test @@ -493,13 +489,9 @@ public void commit() { task01.setCommitNeeded(); - try { - taskManager.commitAll(); - fail("should have thrown"); - } catch (final RuntimeException e) { - assertThat(e, instanceOf(RuntimeException.class)); - assertThat(e.getMessage(), equalTo("opsh.")); - } + final RuntimeException thrown = + assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThat(thrown.getMessage(), equalTo("opsh.")); } @Test @@ -741,6 +733,85 @@ public void shouldHaveRemainingPartitionsUncleared() { "detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.")); } + @Test + public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t2 close exception", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + // The task map orders tasks based on topic group id and partition, so here + // t1 should always be the first. + assertThat(thrown.getMessage(), equalTo("t1 close exception; it means all tasks belonging to this thread should be migrated.")); + } + + @Test + public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new IllegalStateException("t2 illegal state exception", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final RuntimeException thrown = assertThrows(RuntimeException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + // Fatal exception thrown first. + assertThat(thrown.getMessage(), equalTo("Unexpected failure to close 2 task(s) [[0_1, 0_2]]. " + + "First unexpected exception (for task 0_2) follows.")); + + assertThat(thrown.getCause().getMessage(), equalTo("t2 illegal state exception")); + } + + @Test + public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new KafkaException("Kaboom for t2!", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final KafkaException thrown = assertThrows(KafkaException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + + // Expecting the original Kafka exception instead of a wrapped one. + assertThat(thrown.getMessage(), equalTo("Kaboom for t2!")); + + assertThat(thrown.getCause().getMessage(), equalTo(null)); + } + private static void expectRestoreToBeCompleted(final Consumer consumer, final ChangelogReader changeLogReader) { final Set assignment = singleton(new TopicPartition("assignment", 0));