From a0b716ec9fd518daef4f7164d82ab515b2ad7e64 Mon Sep 17 00:00:00 2001 From: brenden20 <118419078+brenden20@users.noreply.github.com> Date: Thu, 13 Jun 2024 15:35:36 -0500 Subject: [PATCH] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder (#16140) Completely migrates ConsumerNetworkThreadTest away from ConsumerTestBuilder and removes all usages of spy objects and replaced with mocks. Removes testEnsureMetadataUpdateOnPoll() since it was doing integration testing. Also I adds new tests to get more complete test coverage of ConsumerNetworkThread. Reviewers: Kirk True , Lianet Magrans , Philip Nee , Matthias J. Sax --- .../internals/ConsumerNetworkThreadTest.java | 364 ++++++------------ 1 file changed, 123 insertions(+), 241 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 160825a308808..e02c983ed374e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -16,13 +16,11 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; -import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; @@ -32,17 +30,9 @@ import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent; -import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.message.FindCoordinatorRequestData; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.FindCoordinatorRequest; -import org.apache.kafka.common.requests.FindCoordinatorResponse; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.requests.OffsetCommitRequest; -import org.apache.kafka.common.requests.OffsetCommitResponse; -import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -50,20 +40,22 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.List; +import java.util.LinkedList; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Stream; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - - private ConsumerTestBuilder testBuilder; - private Time time; - private ConsumerMetadata metadata; - private NetworkClientDelegate networkClient; - private BlockingQueue applicationEventsQueue; - private ApplicationEventProcessor applicationEventProcessor; - private OffsetsRequestManager offsetsRequestManager; - private CommitRequestManager commitRequestManager; - private CoordinatorRequestManager coordinatorRequestManager; - private ConsumerNetworkThread consumerNetworkThread; - private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); - private MockClient client; - - @BeforeEach - public void setup() { - testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); - time = testBuilder.time; - metadata = testBuilder.metadata; - networkClient = testBuilder.networkClientDelegate; - client = testBuilder.client; - applicationEventsQueue = testBuilder.applicationEventQueue; - applicationEventProcessor = testBuilder.applicationEventProcessor; - commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); - offsetsRequestManager = testBuilder.offsetsRequestManager; - coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); - consumerNetworkThread = new ConsumerNetworkThread( - testBuilder.logContext, + private final Time time; + private final BlockingQueue applicationEventsQueue; + private final ApplicationEventProcessor applicationEventProcessor; + private final OffsetsRequestManager offsetsRequestManager; + private final HeartbeatRequestManager heartbeatRequestManager; + private final CoordinatorRequestManager coordinatorRequestManager; + private final ConsumerNetworkThread consumerNetworkThread; + private final NetworkClientDelegate networkClientDelegate; + private final RequestManagers requestManagers; + private final CompletableEventReaper applicationEventReaper; + + ConsumerNetworkThreadTest() { + this.networkClientDelegate = mock(NetworkClientDelegate.class); + this.requestManagers = mock(RequestManagers.class); + this.offsetsRequestManager = mock(OffsetsRequestManager.class); + this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); + this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); + this.applicationEventProcessor = mock(ApplicationEventProcessor.class); + this.applicationEventReaper = mock(CompletableEventReaper.class); + this.time = new MockTime(); + this.applicationEventsQueue = new LinkedBlockingQueue<>(); + LogContext logContext = new LogContext(); + + this.consumerNetworkThread = new ConsumerNetworkThread( + logContext, time, - testBuilder.applicationEventQueue, + applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, - () -> testBuilder.networkClientDelegate, - () -> testBuilder.requestManagers + () -> networkClientDelegate, + () -> requestManagers ); + } + + @BeforeEach + public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { - if (testBuilder != null) { - testBuilder.close(); - consumerNetworkThread.close(Duration.ZERO); - } + if (consumerNetworkThread != null) + consumerNetworkThread.close(); + } + + @Test + public void testEnsureCloseStopsRunningThread() { + assertTrue(consumerNetworkThread.isRunning(), + "ConsumerNetworkThread should start running when created"); + + consumerNetworkThread.close(); + assertFalse(consumerNetworkThread.isRunning(), + "close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)"); + } + + @ParameterizedTest + @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1}) + public void testConsumerNetworkThreadPollTimeComputations(long exampleTime) { + List> list = new ArrayList<>(); + list.add(Optional.of(coordinatorRequestManager)); + list.add(Optional.of(heartbeatRequestManager)); + + when(requestManagers.entries()).thenReturn(list); + + NetworkClientDelegate.PollResult pollResult = new NetworkClientDelegate.PollResult(exampleTime); + NetworkClientDelegate.PollResult pollResult1 = new NetworkClientDelegate.PollResult(exampleTime + 100); + + long t = time.milliseconds(); + when(coordinatorRequestManager.poll(t)).thenReturn(pollResult); + when(coordinatorRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime); + when(heartbeatRequestManager.poll(t)).thenReturn(pollResult1); + when(heartbeatRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime + 100); + when(networkClientDelegate.addAll(pollResult)).thenReturn(pollResult.timeUntilNextPollMs); + when(networkClientDelegate.addAll(pollResult1)).thenReturn(pollResult1.timeUntilNextPollMs); + consumerNetworkThread.runOnce(); + + verify(networkClientDelegate).poll(Math.min(exampleTime, ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS), time.milliseconds()); + assertEquals(consumerNetworkThread.maximumTimeToWait(), exampleTime); } @Test public void testStartupAndTearDown() throws InterruptedException { - // The consumer is closed in ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder.close() - // which is called from tearDown(). consumerNetworkThread.start(); - TestCondition isStarted = () -> consumerNetworkThread.isRunning(); + TestCondition isStarted = consumerNetworkThread::isRunning; TestCondition isClosed = () -> !(consumerNetworkThread.isRunning() || consumerNetworkThread.isAlive()); // There's a nonzero amount of time between starting the thread and having it @@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test - public void testApplicationEvent() { - ApplicationEvent e = new PollEvent(100); - applicationEventsQueue.add(e); + public void testRequestsTransferFromManagersToClientOnThreadRun() { + List> list = new ArrayList<>(); + list.add(Optional.of(coordinatorRequestManager)); + list.add(Optional.of(heartbeatRequestManager)); + list.add(Optional.of(offsetsRequestManager)); + + when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); - verify(applicationEventProcessor, times(1)).process(e); + requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong()))); + requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong()))); + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); + verify(networkClientDelegate).poll(anyLong(), anyLong()); } - @Test - public void testMetadataUpdateEvent() { - ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); + @ParameterizedTest + @MethodSource("applicationEvents") + public void testApplicationEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); - verify(metadata).requestUpdateForNewTopics(); - } - @Test - public void testAsyncCommitEvent() { - ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); - applicationEventsQueue.add(e); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); - } + if (e instanceof CompletableEvent) + verify(applicationEventReaper).add((CompletableEvent) e); - @Test - public void testSyncCommitEvent() { - ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100)); - applicationEventsQueue.add(e); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); + verify(applicationEventProcessor).process(any(e.getClass())); + assertTrue(applicationEventsQueue.isEmpty()); } @ParameterizedTest @@ -190,15 +208,6 @@ public void testListOffsetsEventIsProcessed(boolean requireTimestamp) { assertTrue(applicationEventsQueue.isEmpty()); } - @Test - public void testResetPositionsEventIsProcessed() { - ResetPositionsEvent e = new ResetPositionsEvent(calculateDeadlineMs(time, 100)); - applicationEventsQueue.add(e); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ResetPositionsEvent.class)); - assertTrue(applicationEventsQueue.isEmpty()); - } - @Test public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); @@ -211,178 +220,51 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test - public void testValidatePositionsEventIsProcessed() { - ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); - applicationEventsQueue.add(e); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); - assertTrue(applicationEventsQueue.isEmpty()); - } - - @Test - public void testAssignmentChangeEvent() { - HashMap offset = mockTopicPartitionOffset(); - - final long currentTimeMs = time.milliseconds(); - ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); - applicationEventsQueue.add(e); - - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); - verify(networkClient, times(1)).poll(anyLong(), anyLong()); - verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); - // Assignment change should generate an async commit (not retried). - verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); - } - - @Test - void testFetchTopicMetadata() { - applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); - consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); - } - - @Test - void testMaximumTimeToWait() { + public void testMaximumTimeToWait() { + final int defaultHeartbeatIntervalMs = 1000; // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); - consumerNetworkThread.runOnce(); - // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager - assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); - } - @Test - void testRequestManagersArePolledOnce() { - consumerNetworkThread.runOnce(); - testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong()))); - testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong()))); - verify(networkClient, times(1)).poll(anyLong(), anyLong()); - } + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) defaultHeartbeatIntervalMs); - @Test - void testEnsureMetadataUpdateOnPoll() { - MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); - client.prepareMetadataUpdate(metadataResponse); - metadata.requestUpdate(false); consumerNetworkThread.runOnce(); - verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); - } - - @Test - void testEnsureEventsAreCompleted() { - // Mimic the logic of CompletableEventReaper.reap(Collection): - doAnswer(__ -> { - Iterator i = applicationEventsQueue.iterator(); - - while (i.hasNext()) { - ApplicationEvent event = i.next(); - - if (event instanceof CompletableEvent) - ((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - - i.remove(); - } - - return null; - }).when(applicationEventReaper).reap(any(Collection.class)); - - Node node = metadata.fetch().nodes().get(0); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); - CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); - ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); - CompletableFuture future = new CompletableFuture<>(); - when(event1.future()).thenReturn(future); - applicationEventsQueue.add(event1); - applicationEventsQueue.add(event2); - assertFalse(future.isDone()); - assertFalse(applicationEventsQueue.isEmpty()); - consumerNetworkThread.cleanup(); - assertTrue(future.isCompletedExceptionally()); - assertTrue(applicationEventsQueue.isEmpty()); + // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager + assertEquals(defaultHeartbeatIntervalMs, consumerNetworkThread.maximumTimeToWait()); } @Test - void testCleanupInvokesReaper() { + public void testCleanupInvokesReaper() { + LinkedList queue = new LinkedList<>(); + when(networkClientDelegate.unsentRequests()).thenReturn(queue); consumerNetworkThread.cleanup(); verify(applicationEventReaper).reap(applicationEventsQueue); } @Test - void testRunOnceInvokesReaper() { + public void testRunOnceInvokesReaper() { consumerNetworkThread.runOnce(); verify(applicationEventReaper).reap(any(Long.class)); } @Test - void testSendUnsentRequest() { - String groupId = "group-id"; - NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( - new FindCoordinatorRequest.Builder( - new FindCoordinatorRequestData() - .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) - .setKey(groupId)), - Optional.empty()); - - networkClient.add(request); - assertTrue(networkClient.hasAnyPendingRequests()); - assertFalse(networkClient.unsentRequests().isEmpty()); - assertFalse(client.hasInFlightRequests()); + public void testSendUnsentRequests() { + when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false); consumerNetworkThread.cleanup(); - - assertTrue(networkClient.unsentRequests().isEmpty()); - assertFalse(client.hasInFlightRequests()); - assertFalse(networkClient.hasAnyPendingRequests()); - } - - private void prepareOffsetCommitRequest(final Map expectedOffsets, - final Errors error, - final boolean disconnected) { - Map errors = partitionErrors(expectedOffsets.keySet(), error); - client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected); - } - - private Map partitionErrors(final Collection partitions, - final Errors error) { - final Map errors = new HashMap<>(); - for (TopicPartition partition : partitions) { - errors.put(partition, error); - } - return errors; - } - - private OffsetCommitResponse offsetCommitResponse(final Map responseData) { - return new OffsetCommitResponse(responseData); - } - - private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map expectedOffsets) { - return body -> { - OffsetCommitRequest req = (OffsetCommitRequest) body; - Map offsets = req.offsets(); - if (offsets.size() != expectedOffsets.size()) - return false; - - for (Map.Entry expectedOffset : expectedOffsets.entrySet()) { - if (!offsets.containsKey(expectedOffset.getKey())) { - return false; - } else { - Long actualOffset = offsets.get(expectedOffset.getKey()); - if (!actualOffset.equals(expectedOffset.getValue())) { - return false; - } - } - } - return true; - }; + verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong()); } - private HashMap mockTopicPartitionOffset() { - final TopicPartition t0 = new TopicPartition("t0", 2); - final TopicPartition t1 = new TopicPartition("t0", 3); - HashMap topicPartitionOffsets = new HashMap<>(); - topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L)); - topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L)); - return topicPartitionOffsets; + private static Stream applicationEvents() { + Map offset = new HashMap<>(); + final long currentTimeMs = 12345; + return Stream.of( + Arguments.of(new PollEvent(100)), + Arguments.of(new NewTopicsMetadataUpdateRequestEvent()), + Arguments.of(new AsyncCommitEvent(new HashMap<>())), + Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)), + Arguments.of(new ResetPositionsEvent(500)), + Arguments.of(new ValidatePositionsEvent(500)), + Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)), + Arguments.of(new AssignmentChangeEvent(offset, currentTimeMs))); } }