From bbdbb84b4b19eaf6323223204c66b96fc394a50c Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 27 Feb 2020 09:16:20 -0800 Subject: [PATCH 1/6] log reason --- .../kafka/streams/processor/internals/StateManagerUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java index 1e90bad58f2a9..f0c3d3676728d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java @@ -84,7 +84,7 @@ static void wipeStateStores(final Logger log, final ProcessorStateManager stateM Utils.delete(stateMgr.baseDir()); } catch (final IOException fatalException) { // since it is only called under dirty close, we always swallow the exception - log.warn("Failed to wiping state stores for task {}", stateMgr.taskId()); + log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException); } } From d6bfdb82bafed045b807da7f88aa86c496cb8be0 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 27 Feb 2020 12:38:32 -0800 Subject: [PATCH 2/6] hold exception until the finish of all callbacks --- .../internals/ConsumerCoordinator.java | 31 +++++++++++++------ .../processor/internals/TaskManager.java | 26 ++++++++++++---- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index c6a995b3fb73b..959427ba3d7ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.FencedInstanceIdException; @@ -333,7 +334,7 @@ protected void onJoinComplete(int generation, ByteBuffer assignmentBuffer) { log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId); - // only the leader is responsible for monitoring for metadata changes (i.e. partition changes) + // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes) if (!isLeader) assignmentSnapshot = null; @@ -377,12 +378,12 @@ protected void onJoinComplete(int generation, ); if (!revokedPartitions.isEmpty()) { - // revoke partitions that were previously owned but no longer assigned; + // Revoke partitions that were previously owned but no longer assigned; // note that we should only change the assignment (or update the assignor's state) // AFTER we've triggered the revoke callback firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); - // if revoked any partitions, need to re-join the group afterwards + // If revoked any partitions, need to re-join the group afterwards log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions); requestRejoin(); } @@ -392,21 +393,33 @@ protected void onJoinComplete(int generation, // were not explicitly requested, so we update the joined subscription here. maybeUpdateJoinedSubscription(assignedPartitions); - // give the assignor a chance to update internal state based on the received assignment + // Give the assignor a chance to update internal state based on the received assignment groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId); - assignor.onAssignment(assignment, groupMetadata); - // reschedule the auto commit starting from now + // Catch any exception here to make sure we could complete the user callback. + try { + assignor.onAssignment(assignment, groupMetadata); + } catch (Exception e) { + firstException.compareAndSet(null, e); + } + + // Reschedule the auto commit starting from now if (autoCommitEnabled) this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs); subscriptions.assignFromSubscribed(assignedPartitions); - // add partitions that were not previously owned but are now assigned + // Add partitions that were not previously owned but are now assigned firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions)); + Exception exceptionCaught = firstException.get(); - if (firstException.get() != null) - throw new KafkaException("User rebalance callback throws an error", firstException.get()); + if (exceptionCaught != null) { + if (exceptionCaught instanceof KafkaException) { + throw (KafkaException) exceptionCaught; + } else { + throw new KafkaException("User rebalance callback throws an error", exceptionCaught); + } + } } void maybeUpdateSubscriptionMetadata() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index b36fb36ffec17..78c6eff4cf515 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskIdFormatException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; @@ -199,11 +200,18 @@ public void handleAssignment(final Map> activeTasks, if (!taskCloseExceptions.isEmpty()) { final Map.Entry first = taskCloseExceptions.entrySet().iterator().next(); - throw new RuntimeException( - "Unexpected failure to close " + taskCloseExceptions.size() + - " task(s) [" + taskCloseExceptions.keySet() + "]. " + - "First exception (for task " + first.getKey() + ") follows.", first.getValue() - ); + for (Map.Entry entry : taskCloseExceptions.entrySet()) { + if (!(entry.getValue() instanceof TaskMigratedException)) { + throw new RuntimeException( + "Unexpected failure to close " + taskCloseExceptions.size() + + " task(s) [" + taskCloseExceptions.keySet() + "]. " + + "First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue() + ); + } + } + + // If all exceptions are task-migrated, we would just throw the first one. + throw first.getValue(); } if (!activeTasksToCreate.isEmpty()) { @@ -311,7 +319,13 @@ void handleRevocation(final Collection revokedPartitions) { for (final TaskId taskId : revokedTasks) { final Task task = tasks.get(taskId); - task.suspend(); + try { + task.suspend(); + } catch (RuntimeException e) { + log.error("Failed to suspend task {} due to {}, Closing it uncleanly.", e, task.id()); + + task.closeDirty(); + } } } From 9e5126740306723247e240ca4617091f3ee6f0b9 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 27 Feb 2020 14:41:54 -0800 Subject: [PATCH 3/6] simplify revocation --- .../internals/ConsumerCoordinator.java | 10 ++++------ .../processor/internals/TaskManager.java | 19 +++---------------- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 959427ba3d7ab..6bc2c9eb080f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.FencedInstanceIdException; @@ -411,13 +410,12 @@ protected void onJoinComplete(int generation, // Add partitions that were not previously owned but are now assigned firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions)); - Exception exceptionCaught = firstException.get(); - if (exceptionCaught != null) { - if (exceptionCaught instanceof KafkaException) { - throw (KafkaException) exceptionCaught; + if (firstException.get() != null) { + if (firstException.get() instanceof KafkaException) { + throw (KafkaException) firstException.get(); } else { - throw new KafkaException("User rebalance callback throws an error", exceptionCaught); + throw new KafkaException("User rebalance callback throws an error", firstException.get()); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 78c6eff4cf515..e203fad897294 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskIdFormatException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; @@ -199,8 +198,7 @@ public void handleAssignment(final Map> activeTasks, } if (!taskCloseExceptions.isEmpty()) { - final Map.Entry first = taskCloseExceptions.entrySet().iterator().next(); - for (Map.Entry entry : taskCloseExceptions.entrySet()) { + for (final Map.Entry entry : taskCloseExceptions.entrySet()) { if (!(entry.getValue() instanceof TaskMigratedException)) { throw new RuntimeException( "Unexpected failure to close " + taskCloseExceptions.size() + @@ -210,6 +208,7 @@ public void handleAssignment(final Map> activeTasks, } } + final Map.Entry first = taskCloseExceptions.entrySet().iterator().next(); // If all exceptions are task-migrated, we would just throw the first one. throw first.getValue(); } @@ -301,12 +300,11 @@ boolean tryToCompleteRestoration() { * @throws TaskMigratedException if the task producer got fenced (EOS only) */ void handleRevocation(final Collection revokedPartitions) { - final Set revokedTasks = new HashSet<>(); final Set remainingPartitions = new HashSet<>(revokedPartitions); for (final Task task : tasks.values()) { if (remainingPartitions.containsAll(task.inputPartitions())) { - revokedTasks.add(task.id()); + task.suspend(); } remainingPartitions.removeAll(task.inputPartitions()); } @@ -316,17 +314,6 @@ void handleRevocation(final Collection revokedPartitions) { "due to race condition of consumer detecting the heartbeat failure, or the tasks " + "have been cleaned up by the handleAssignment callback.", remainingPartitions); } - - for (final TaskId taskId : revokedTasks) { - final Task task = tasks.get(taskId); - try { - task.suspend(); - } catch (RuntimeException e) { - log.error("Failed to suspend task {} due to {}, Closing it uncleanly.", e, task.id()); - - task.closeDirty(); - } - } } /** From 8821bfa06121deb8131984d432dd27397b4cb104 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 27 Feb 2020 15:16:01 -0800 Subject: [PATCH 4/6] unit test --- .../internals/ConsumerCoordinatorTest.java | 175 +++++++++++++++++- .../internals/MockPartitionAssignor.java | 12 ++ .../internals/ThrowOnAssignmentAssignor.java | 49 +++++ .../processor/internals/TaskManagerTest.java | 76 ++++++-- 4 files changed, 287 insertions(+), 25 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 6ea23ba9ec85c..bb93496d3779f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -106,6 +106,8 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.TestUtils.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -134,7 +136,10 @@ public class ConsumerCoordinatorTest { private final ConsumerPartitionAssignor.RebalanceProtocol protocol; private final MockPartitionAssignor partitionAssignor; + private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor; + private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor; private final List assignors; + private final Map assignorMap; private MockClient client; private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap() { { @@ -153,8 +158,18 @@ public class ConsumerCoordinatorTest { public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) { this.protocol = protocol; + this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)); - this.assignors = Collections.singletonList(partitionAssignor); + this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), + new KafkaException("Kaboom for assignment!"), + "throw-on-assignment-assignor"); + this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), + new IllegalStateException("Illegal state for assignment!"), + "throw-fatal-error-on-assignment-assignor"); + this.assignors = Arrays.asList(partitionAssignor, throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor); + this.assignorMap = mkMap(mkEntry(partitionAssignor.name(), partitionAssignor), + mkEntry(throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor), + mkEntry(throwFatalErrorOnAssignmentAssignor.name(), throwFatalErrorOnAssignmentAssignor)); } @Parameterized.Parameters(name = "rebalance protocol = {0}") @@ -338,14 +353,11 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() { for (int i = 0; i < numRequests; i++) { Map offsets = singletonMap(tp, new OffsetAndMetadata(i)); - coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() { - @Override - public void onComplete(Map offsets, Exception exception) { - responses.incrementAndGet(); - Throwable cause = exception.getCause(); - assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), - cause instanceof DisconnectException); - } + coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> { + responses.incrementAndGet(); + Throwable cause = exception.getCause(); + assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), + cause instanceof DisconnectException); }); } @@ -465,6 +477,151 @@ public void testUnsubscribeWithValidGeneration() { assertEquals(0, rebalanceListener.revokedCount); } + @Test + public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + super.onPartitionsRevoked(partitions); + throw new KafkaException("Kaboom on revoke!"); + } + }; + + if (protocol == COOPERATIVE) { + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null); + } else { + // Eager protocol doesn't revoke partitions. + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + } + + @Test + public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + + @Test + public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + partitionAssignor.name(), "Kaboom on partition assign!", null); + } + + @Test + public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + super.onPartitionsRevoked(partitions); + throw new IllegalStateException("Illegal state on partition revoke!"); + } + }; + + if (protocol == COOPERATIVE) { + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), + "User rebalance callback throws an error", "Illegal state on partition revoke!"); + } else { + // Eager protocol doesn't revoke partitions. + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + } + + @Test + public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + verifyOnCallbackExceptions(throwOnAssignListener, + throwFatalErrorOnAssignmentAssignor.name(), + "User rebalance callback throws an error", "Illegal state for assignment!"); + } + + @Test + public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new IllegalStateException("Illegal state on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + partitionAssignor.name(), "User rebalance callback throws an error", + "Illegal state on partition assign!"); + } + + private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceListener, + final String assignorName, + final String exceptionMessage, + final String causeMessage) { + subscriptions.subscribe(singleton(topic1), rebalanceListener); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0]))); + subscriptions.assignFromSubscribed(singleton(t2p)); + + if (exceptionMessage != null) { + final Exception exception = assertThrows(KafkaException.class, + () -> coordinator.onJoinComplete(1, "memberId", assignorName, buffer)); + assertEquals(exceptionMessage, exception.getMessage()); + if (causeMessage != null) { + assertEquals(causeMessage, exception.getCause().getMessage()); + } + } + + // Eager doesn't trigger on partition revoke. + assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount); + assertEquals(0, rebalanceListener.lostCount); + assertEquals(1, rebalanceListener.assignedCount); + if (assignorMap.containsKey(assignorName)) { + assertEquals(1, assignorMap.get(assignorName).numAssignment()); + } else { + throw new IllegalArgumentException("Unknown assignor name: " + assignorName); + } + } + @Test public void testUnsubscribeWithInvalidGeneration() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java index ca7cb34ddd414..ef95f2ffb5594 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.common.TopicPartition; import java.util.List; @@ -25,10 +26,13 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor { private final List supportedProtocols; + private int numAssignment; + private Map> result = null; MockPartitionAssignor(final List supportedProtocols) { this.supportedProtocols = supportedProtocols; + numAssignment = 0; } @Override @@ -57,4 +61,12 @@ public void prepare(Map> result) { this.result = result; } + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + numAssignment += 1; + } + + int numAssignment() { + return numAssignment; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java new file mode 100644 index 0000000000000..782ca26006985 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; + +import java.util.List; + +/** + * A mock assignor which throws for {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#onAssignment}. + */ +public class ThrowOnAssignmentAssignor extends MockPartitionAssignor { + + private final RuntimeException bookeepedException; + private final String name; + + ThrowOnAssignmentAssignor(final List supportedProtocols, + final RuntimeException bookeepedException, + final String name) { + super(supportedProtocols); + this.bookeepedException = bookeepedException; + this.name = name; + } + + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + super.onAssignment(assignment, metadata); + throw bookeepedException; + } + + @Override + public String name() { + return name; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 791ac73c84436..aa56834a8977f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -74,11 +75,9 @@ import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class TaskManagerTest { @@ -462,13 +461,9 @@ public void commit() { task00.setCommitNeeded(); - try { - taskManager.commitAll(); - fail("should have thrown"); - } catch (final RuntimeException e) { - assertThat(e, instanceOf(RuntimeException.class)); - assertThat(e.getMessage(), equalTo("opsh.")); - } + final RuntimeException thrown = + assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThat(thrown.getMessage(), equalTo("opsh.")); } @Test @@ -493,13 +488,9 @@ public void commit() { task01.setCommitNeeded(); - try { - taskManager.commitAll(); - fail("should have thrown"); - } catch (final RuntimeException e) { - assertThat(e, instanceOf(RuntimeException.class)); - assertThat(e.getMessage(), equalTo("opsh.")); - } + final RuntimeException thrown = + assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThat(thrown.getMessage(), equalTo("opsh.")); } @Test @@ -741,6 +732,59 @@ public void shouldHaveRemainingPartitionsUncleared() { "detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.")); } + @Test + public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t2 close exception", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + // The task map orders tasks based on topic group id and partition, so here + // t1 should always be the first. + assertThat(thrown.getMessage(), equalTo("t1 close exception; it means all tasks belonging to this thread should be migrated.")); + } + + @Test + public void shouldThrowRuntimeExceptionWhenEncounteredDuringTaskClose() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new IllegalStateException("t2 illegal state exception", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final RuntimeException thrown = assertThrows(RuntimeException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + // The task map orders tasks based on topic group id and partition, so here + // t1 should always be the first. + assertThat(thrown.getMessage(), equalTo("Unexpected failure to close 2 task(s) [[0_1, 0_2]]. " + + "First unexpected exception (for task 0_2) follows.")); + + assertThat(thrown.getCause().getMessage(), equalTo("t2 illegal state exception")); + } + private static void expectRestoreToBeCompleted(final Consumer consumer, final ChangelogReader changeLogReader) { final Set assignment = singleton(new TopicPartition("assignment", 0)); From 1347ddc2c399ecddc1fa488c8188f3a74f0f5f49 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 27 Feb 2020 17:47:25 -0800 Subject: [PATCH 5/6] also check Kafka exception --- .../processor/internals/TaskManager.java | 15 ++++++--- .../processor/internals/TaskManagerTest.java | 33 +++++++++++++++++-- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index e203fad897294..c8f33ca28bd84 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -200,11 +200,16 @@ public void handleAssignment(final Map> activeTasks, if (!taskCloseExceptions.isEmpty()) { for (final Map.Entry entry : taskCloseExceptions.entrySet()) { if (!(entry.getValue() instanceof TaskMigratedException)) { - throw new RuntimeException( - "Unexpected failure to close " + taskCloseExceptions.size() + - " task(s) [" + taskCloseExceptions.keySet() + "]. " + - "First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue() - ); + if (entry.getValue() instanceof KafkaException) { + log.error("Hit Kafka exception while closing for first task {}", entry.getKey()); + throw entry.getValue(); + } else { + throw new RuntimeException( + "Unexpected failure to close " + taskCloseExceptions.size() + + " task(s) [" + taskCloseExceptions.keySet() + "]. " + + "First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue() + ); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index aa56834a8977f..3f38a11e38004 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.Metrics; @@ -758,7 +759,7 @@ public void closeClean() { } @Test - public void shouldThrowRuntimeExceptionWhenEncounteredDuringTaskClose() { + public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() { final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { @Override public void closeClean() { @@ -777,14 +778,40 @@ public void closeClean() { final RuntimeException thrown = assertThrows(RuntimeException.class, () -> taskManager.handleAssignment(emptyMap(), emptyMap())); - // The task map orders tasks based on topic group id and partition, so here - // t1 should always be the first. + // Fatal exception thrown first. assertThat(thrown.getMessage(), equalTo("Unexpected failure to close 2 task(s) [[0_1, 0_2]]. " + "First unexpected exception (for task 0_2) follows.")); assertThat(thrown.getCause().getMessage(), equalTo("t2 illegal state exception")); } + @Test + public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new KafkaException("Kaboom for t2!", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final KafkaException thrown = assertThrows(KafkaException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + + // Expecting the original Kafka exception instead of a wrapped one. + assertThat(thrown.getMessage(), equalTo("Kaboom for t2!")); + + assertThat(thrown.getCause().getMessage(), equalTo(null)); + } + private static void expectRestoreToBeCompleted(final Consumer consumer, final ChangelogReader changeLogReader) { final Set assignment = singleton(new TopicPartition("assignment", 0)); From abbb2e38df0ff7c250e3b77588c7cc62b22bb4de Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 27 Feb 2020 20:50:15 -0800 Subject: [PATCH 6/6] minor test fixes --- .../internals/ConsumerCoordinatorTest.java | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index bb93496d3779f..fa2d797c2aa34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -479,9 +479,6 @@ public void testUnsubscribeWithValidGeneration() { @Test public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { @Override public void onPartitionsRevoked(Collection partitions) { @@ -502,9 +499,6 @@ public void onPartitionsRevoked(Collection partitions) { @Test public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { @Override public void onPartitionsAssigned(Collection partitions) { @@ -519,9 +513,6 @@ public void onPartitionsAssigned(Collection partitions) { @Test public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { @Override public void onPartitionsAssigned(Collection partitions) { @@ -536,9 +527,6 @@ public void onPartitionsAssigned(Collection partitions) { @Test public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { @Override public void onPartitionsRevoked(Collection partitions) { @@ -560,9 +548,6 @@ public void onPartitionsRevoked(Collection partitions) { @Test public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { @Override public void onPartitionsAssigned(Collection partitions) { @@ -577,9 +562,6 @@ public void onPartitionsAssigned(Collection partitions) { @Test public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { @Override public void onPartitionsAssigned(Collection partitions) { @@ -597,6 +579,9 @@ private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceLis final String assignorName, final String exceptionMessage, final String causeMessage) { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + subscriptions.subscribe(singleton(topic1), rebalanceListener); ByteBuffer buffer = ConsumerProtocol.serializeAssignment( new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0]))); @@ -615,11 +600,9 @@ private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceLis assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount); assertEquals(0, rebalanceListener.lostCount); assertEquals(1, rebalanceListener.assignedCount); - if (assignorMap.containsKey(assignorName)) { - assertEquals(1, assignorMap.get(assignorName).numAssignment()); - } else { - throw new IllegalArgumentException("Unknown assignor name: " + assignorName); - } + assertTrue("Unknown assignor name: " + assignorName, + assignorMap.containsKey(assignorName)); + assertEquals(1, assignorMap.get(assignorName).numAssignment()); } @Test