diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 1bcb3d1365df9..1e34835317fb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -20,8 +20,6 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -41,7 +39,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -64,9 +61,6 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; -import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; @@ -94,7 +88,6 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentMatchers; import org.mockito.MockedConstruction; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.opentest4j.AssertionFailedError; @@ -326,53 +319,6 @@ private static MockedConstruction offsetFetchEventM return mockConstruction(OffsetFetchApplicationEvent.class, mockInitializer); } - private static MockedConstruction commitEventMocker(CompletableFuture future) { - Answer getInvocationAnswer = invocation -> { - Timer timer = invocation.getArgument(0); - return ConsumerUtils.getResult(future, timer); - }; - - MockedConstruction.MockInitializer mockInitializer = (mock, ctx) -> { - when(mock.get(any())).thenAnswer(getInvocationAnswer); - when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT); - when(mock.future()).thenReturn(future); - }; - - return mockConstruction(CommitApplicationEvent.class, mockInitializer); - } - - @Test - public void testAssign() { - final TopicPartition tp = new TopicPartition("foo", 3); - consumer.assign(singleton(tp)); - assertTrue(consumer.subscription().isEmpty()); - assertTrue(consumer.assignment().contains(tp)); - verify(applicationEventHandler).add(any(AssignmentChangeApplicationEvent.class)); - verify(applicationEventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class)); - } - - @Test - public void testAssignOnNullTopicPartition() { - assertThrows(IllegalArgumentException.class, () -> consumer.assign(null)); - } - - @Test - public void testAssignOnEmptyTopicPartition() { - consumer.assign(Collections.emptyList()); - assertTrue(consumer.subscription().isEmpty()); - assertTrue(consumer.assignment().isEmpty()); - } - - @Test - public void testAssignOnNullTopicInPartition() { - assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0)))); - } - - @Test - public void testAssignOnEmptyTopicInPartition() { - assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(" ", 0)))); - } - @Test public void testWakeup_committed() { consumer.wakeup(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java index d87d025728e82..74f439e659b39 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java @@ -52,10 +52,12 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent; @@ -138,6 +140,51 @@ public void testInvalidGroupId() { assertThrows(InvalidGroupIdException.class, this::setupWithEmptyGroupId); } + @Test + public void testAssign() { + consumer = setup(); + final TopicPartition tp = new TopicPartition("foo", 3); + consumer.assign(singleton(tp)); + assertTrue(consumer.subscription().isEmpty()); + assertTrue(consumer.assignment().contains(tp)); + verify(applicationEventHandler).add(any(AssignmentChangeApplicationEvent.class)); + verify(applicationEventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class)); + } + + @Test + public void testAssignOnNullTopicPartition() { + consumer = setup(); + assertThrows(IllegalArgumentException.class, () -> consumer.assign(null)); + } + + @Test + public void testAssignOnEmptyTopicPartition() { + consumer = setup(); + + Mockito.doAnswer(invocation -> { + CompletableApplicationEvent event = invocation.getArgument(0); + assertTrue(event instanceof UnsubscribeApplicationEvent); + event.future().complete(null); + return null; + }).when(applicationEventHandler).add(any()); + + consumer.assign(Collections.emptyList()); + assertTrue(consumer.subscription().isEmpty()); + assertTrue(consumer.assignment().isEmpty()); + } + + @Test + public void testAssignOnNullTopicInPartition() { + consumer = setup(); + assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0)))); + } + + @Test + public void testAssignOnEmptyTopicInPartition() { + consumer = setup(); + assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(" ", 0)))); + } + @Test public void testFencedInstanceException() { consumer = setup(); @@ -417,6 +464,7 @@ public void testSubscriptionOnEmptyTopic() { assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic))); } + @Test public void testWakeupBeforeCallingPoll() { consumer = setup();