diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 6ea23ba9ec85c..bb93496d3779f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -106,6 +106,8 @@ import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE; import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.TestUtils.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -134,7 +136,10 @@ public class ConsumerCoordinatorTest { private final ConsumerPartitionAssignor.RebalanceProtocol protocol; private final MockPartitionAssignor partitionAssignor; + private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor; + private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor; private final List assignors; + private final Map assignorMap; private MockClient client; private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap() { { @@ -153,8 +158,18 @@ public class ConsumerCoordinatorTest { public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) { this.protocol = protocol; + this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)); - this.assignors = Collections.singletonList(partitionAssignor); + this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), + new KafkaException("Kaboom for assignment!"), + "throw-on-assignment-assignor"); + this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), + new IllegalStateException("Illegal state for assignment!"), + "throw-fatal-error-on-assignment-assignor"); + this.assignors = Arrays.asList(partitionAssignor, throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor); + this.assignorMap = mkMap(mkEntry(partitionAssignor.name(), partitionAssignor), + mkEntry(throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor), + mkEntry(throwFatalErrorOnAssignmentAssignor.name(), throwFatalErrorOnAssignmentAssignor)); } @Parameterized.Parameters(name = "rebalance protocol = {0}") @@ -338,14 +353,11 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() { for (int i = 0; i < numRequests; i++) { Map offsets = singletonMap(tp, new OffsetAndMetadata(i)); - coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() { - @Override - public void onComplete(Map offsets, Exception exception) { - responses.incrementAndGet(); - Throwable cause = exception.getCause(); - assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), - cause instanceof DisconnectException); - } + coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> { + responses.incrementAndGet(); + Throwable cause = exception.getCause(); + assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), + cause instanceof DisconnectException); }); } @@ -465,6 +477,151 @@ public void testUnsubscribeWithValidGeneration() { assertEquals(0, rebalanceListener.revokedCount); } + @Test + public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + super.onPartitionsRevoked(partitions); + throw new KafkaException("Kaboom on revoke!"); + } + }; + + if (protocol == COOPERATIVE) { + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null); + } else { + // Eager protocol doesn't revoke partitions. + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + } + + @Test + public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + + @Test + public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + partitionAssignor.name(), "Kaboom on partition assign!", null); + } + + @Test + public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + super.onPartitionsRevoked(partitions); + throw new IllegalStateException("Illegal state on partition revoke!"); + } + }; + + if (protocol == COOPERATIVE) { + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), + "User rebalance callback throws an error", "Illegal state on partition revoke!"); + } else { + // Eager protocol doesn't revoke partitions. + verifyOnCallbackExceptions(throwOnRevokeListener, + throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null); + } + } + + @Test + public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new KafkaException("Kaboom on partition assign!"); + } + }; + verifyOnCallbackExceptions(throwOnAssignListener, + throwFatalErrorOnAssignmentAssignor.name(), + "User rebalance callback throws an error", "Illegal state for assignment!"); + } + + @Test + public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + super.onPartitionsAssigned(partitions); + throw new IllegalStateException("Illegal state on partition assign!"); + } + }; + + verifyOnCallbackExceptions(throwOnAssignListener, + partitionAssignor.name(), "User rebalance callback throws an error", + "Illegal state on partition assign!"); + } + + private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceListener, + final String assignorName, + final String exceptionMessage, + final String causeMessage) { + subscriptions.subscribe(singleton(topic1), rebalanceListener); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0]))); + subscriptions.assignFromSubscribed(singleton(t2p)); + + if (exceptionMessage != null) { + final Exception exception = assertThrows(KafkaException.class, + () -> coordinator.onJoinComplete(1, "memberId", assignorName, buffer)); + assertEquals(exceptionMessage, exception.getMessage()); + if (causeMessage != null) { + assertEquals(causeMessage, exception.getCause().getMessage()); + } + } + + // Eager doesn't trigger on partition revoke. + assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount); + assertEquals(0, rebalanceListener.lostCount); + assertEquals(1, rebalanceListener.assignedCount); + if (assignorMap.containsKey(assignorName)) { + assertEquals(1, assignorMap.get(assignorName).numAssignment()); + } else { + throw new IllegalArgumentException("Unknown assignor name: " + assignorName); + } + } + @Test public void testUnsubscribeWithInvalidGeneration() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java index ca7cb34ddd414..ef95f2ffb5594 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.common.TopicPartition; import java.util.List; @@ -25,10 +26,13 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor { private final List supportedProtocols; + private int numAssignment; + private Map> result = null; MockPartitionAssignor(final List supportedProtocols) { this.supportedProtocols = supportedProtocols; + numAssignment = 0; } @Override @@ -57,4 +61,12 @@ public void prepare(Map> result) { this.result = result; } + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + numAssignment += 1; + } + + int numAssignment() { + return numAssignment; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java new file mode 100644 index 0000000000000..782ca26006985 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ThrowOnAssignmentAssignor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; + +import java.util.List; + +/** + * A mock assignor which throws for {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#onAssignment}. + */ +public class ThrowOnAssignmentAssignor extends MockPartitionAssignor { + + private final RuntimeException bookeepedException; + private final String name; + + ThrowOnAssignmentAssignor(final List supportedProtocols, + final RuntimeException bookeepedException, + final String name) { + super(supportedProtocols); + this.bookeepedException = bookeepedException; + this.name = name; + } + + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + super.onAssignment(assignment, metadata); + throw bookeepedException; + } + + @Override + public String name() { + return name; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 791ac73c84436..aa56834a8977f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -74,11 +75,9 @@ import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class TaskManagerTest { @@ -462,13 +461,9 @@ public void commit() { task00.setCommitNeeded(); - try { - taskManager.commitAll(); - fail("should have thrown"); - } catch (final RuntimeException e) { - assertThat(e, instanceOf(RuntimeException.class)); - assertThat(e.getMessage(), equalTo("opsh.")); - } + final RuntimeException thrown = + assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThat(thrown.getMessage(), equalTo("opsh.")); } @Test @@ -493,13 +488,9 @@ public void commit() { task01.setCommitNeeded(); - try { - taskManager.commitAll(); - fail("should have thrown"); - } catch (final RuntimeException e) { - assertThat(e, instanceOf(RuntimeException.class)); - assertThat(e.getMessage(), equalTo("opsh.")); - } + final RuntimeException thrown = + assertThrows(RuntimeException.class, () -> taskManager.commitAll()); + assertThat(thrown.getMessage(), equalTo("opsh.")); } @Test @@ -741,6 +732,59 @@ public void shouldHaveRemainingPartitionsUncleared() { "detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.")); } + @Test + public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t2 close exception", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + // The task map orders tasks based on topic group id and partition, so here + // t1 should always be the first. + assertThat(thrown.getMessage(), equalTo("t1 close exception; it means all tasks belonging to this thread should be migrated.")); + } + + @Test + public void shouldThrowRuntimeExceptionWhenEncounteredDuringTaskClose() { + final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) { + @Override + public void closeClean() { + throw new TaskMigratedException("t1 close exception", new RuntimeException()); + } + }; + + final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) { + @Override + public void closeClean() { + throw new IllegalStateException("t2 illegal state exception", new RuntimeException()); + } + }; + taskManager.tasks().put(taskId01, migratedTask01); + taskManager.tasks().put(taskId02, migratedTask02); + + final RuntimeException thrown = assertThrows(RuntimeException.class, + () -> taskManager.handleAssignment(emptyMap(), emptyMap())); + // The task map orders tasks based on topic group id and partition, so here + // t1 should always be the first. + assertThat(thrown.getMessage(), equalTo("Unexpected failure to close 2 task(s) [[0_1, 0_2]]. " + + "First unexpected exception (for task 0_2) follows.")); + + assertThat(thrown.getCause().getMessage(), equalTo("t2 illegal state exception")); + } + private static void expectRestoreToBeCompleted(final Consumer consumer, final ChangelogReader changeLogReader) { final Set assignment = singleton(new TopicPartition("assignment", 0));