Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder #16140

Merged
merged 75 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
f82d15c
Migrated ConsumerTestBuilder
brenden20 May 24, 2024
c7a479e
Removed changes to OffsetFetchRequestState
brenden20 May 24, 2024
1fcf205
Added test for RMs
brenden20 May 24, 2024
5271bb5
Cleaning up comments
brenden20 May 28, 2024
3f037e5
Added test for wait time computation
brenden20 May 28, 2024
4a85031
Test removal
brenden20 May 28, 2024
a10a961
Changed variables names
brenden20 May 28, 2024
a1a4de0
Removed vars and methods with no usage
brenden20 May 28, 2024
372eea6
Removed unused imports
brenden20 May 28, 2024
43be21e
Updating comments
brenden20 May 28, 2024
ce3d5c5
Comments
brenden20 May 28, 2024
ad9d38b
Merge branch 'apache:trunk' into 16001
brenden20 May 28, 2024
1fccaa8
Merge branch 'apache:trunk' into 16001
brenden20 May 29, 2024
8a785e1
Updated constructor
brenden20 May 29, 2024
70ad457
Added tests back
brenden20 May 29, 2024
94f9bcc
Minor updates to tests
brenden20 May 29, 2024
99c4e15
Constructor and test updates
brenden20 May 29, 2024
9aa6e53
Updated comment
brenden20 May 29, 2024
0a502aa
Merge branch '16001' of https://github.com/brenden20/kafka into 16001
brenden20 May 29, 2024
a587124
Remove comment
brenden20 May 29, 2024
a95e0c7
Added testEnsureCloseStopsRunningThread(), updated others
brenden20 May 29, 2024
da90e1b
Cleaning up warnings
brenden20 May 29, 2024
8c9ad01
Updated testAssignmentChangeEvent()
brenden20 May 29, 2024
0d2b957
Removed testEnsureMetadataUpdateOnPoll()
brenden20 May 29, 2024
5897bed
Changed MockClient
brenden20 May 30, 2024
2402e6f
Moved all changes into original file
brenden20 May 30, 2024
91d5266
Comment removal
brenden20 May 30, 2024
88950a3
Cleaned up testConsumerNetworkThreadWaitTimeComputations
brenden20 May 30, 2024
7a6593d
Cleaned up testMaximumTimeToWait()
brenden20 May 30, 2024
15d35fc
Cleaned imports and comment removal
brenden20 May 30, 2024
10df9a4
Fix whitespace
brenden20 May 30, 2024
ed3b0d0
Merge branch 'apache:trunk' into 16001
brenden20 May 31, 2024
dcc892b
Implementing suggestions from PR
brenden20 Jun 3, 2024
255f4dd
Add comment
brenden20 Jun 3, 2024
848ab9b
Todo comment
brenden20 Jun 3, 2024
524967e
Updated testConsumerNetworkThreadWaitTimeComputations()
brenden20 Jun 3, 2024
ac2bb3c
Added new test
brenden20 Jun 3, 2024
4191c2d
Updated testConsumerNetworkThreadWaitTimeComputations()
brenden20 Jun 3, 2024
cf1b5e7
Merge branch 'apache:trunk' into 16001
brenden20 Jun 3, 2024
5fb7304
Revert "Merge branch 'apache:trunk' into 16001"
brenden20 Jun 3, 2024
879e066
Update ConsumerNetworkThread.java
brenden20 Jun 4, 2024
2435d82
testRequestManagersArePolledOnce()Updated
brenden20 Jun 4, 2024
8bac05a
Small test updates
brenden20 Jun 4, 2024
41b78e8
Reapply "Merge branch 'apache:trunk' into 16001"
brenden20 Jun 4, 2024
ee2cd44
Add comment
brenden20 Jun 4, 2024
fabd7bd
Implementing suggestions
brenden20 Jun 5, 2024
574fc13
Updated test name
brenden20 Jun 5, 2024
a695958
Condensed multiple tests into 1
brenden20 Jun 5, 2024
d443311
Fixing conflict
brenden20 Jun 5, 2024
907676a
Revert "Fixing conflict"
brenden20 Jun 5, 2024
5445ddb
Merge branch 'trunk' into 16001
brenden20 Jun 5, 2024
d25fcb3
Updated new test to work with current file
brenden20 Jun 5, 2024
0fe18c6
Implementing PR suggestions
brenden20 Jun 6, 2024
51c7658
Updated constructor to look a little better
brenden20 Jun 6, 2024
a87f461
Updated testApplicationEventIsProcessed()
brenden20 Jun 6, 2024
7605928
Fixed checkstyle violation
brenden20 Jun 6, 2024
65aa03b
Fixed reminder comment
brenden20 Jun 6, 2024
b001ead
Update test scope and testSendUnsentRequests()
brenden20 Jun 6, 2024
23caab8
Cleaned up unused imports and unused methods
brenden20 Jun 6, 2024
b3036c2
Merge branch 'trunk' into 16001
brenden20 Jun 7, 2024
1fe289f
Update testSendUnsentRequests()
brenden20 Jun 7, 2024
4718fc9
Removed unused import
brenden20 Jun 7, 2024
6dd9de6
Fixed some merge issues
brenden20 Jun 7, 2024
41750b7
Implementing suggestions
brenden20 Jun 7, 2024
7c0aa57
Implement suggestion
brenden20 Jun 7, 2024
0236a18
Updated testConsumerNetworkThreadPollTimeComputations()
brenden20 Jun 7, 2024
41948b7
Fixed error
brenden20 Jun 7, 2024
609fd0c
Merge branch 'apache:trunk' into 16001
brenden20 Jun 10, 2024
61f673f
Implemented suggestion
brenden20 Jun 10, 2024
dcf2dab
Merge branch '16001' of https://github.com/brenden20/kafka into 16001
brenden20 Jun 10, 2024
9bcc67d
Merge branch 'apache:trunk' into 16001
brenden20 Jun 10, 2024
beaa017
Merge branch 'apache:trunk' into 16001
brenden20 Jun 11, 2024
aac3cd9
Merge branch 'apache:trunk' into 16001
brenden20 Jun 11, 2024
4aad5c6
Merge branch 'apache:trunk' into 16001
brenden20 Jun 12, 2024
8b0807a
Merge branch 'apache:trunk' into 16001
brenden20 Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1290,4 +1290,4 @@ static class MemberInfo {
this.memberEpoch = Optional.empty();
}
}
}
}
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,19 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.*;
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
Expand All @@ -53,89 +39,111 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.*;
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.junit.jupiter.api.Assertions.*;
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
brenden20 marked this conversation as resolved.
Show resolved Hide resolved

public class ConsumerNetworkThreadTest {
static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;

private final Time time;
private final ConsumerMetadata metadata;
private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
private final ApplicationEventProcessor applicationEventProcessor;
private final OffsetsRequestManager offsetsRequestManager;
private final HeartbeatRequestManager heartbeatRequestManager;
private final CoordinatorRequestManager coordinatorRequestManager;
private final ConsumerNetworkThread consumerNetworkThread;
private final MockClient client;
private final NetworkClientDelegate networkClientDelegate;
private final NetworkClientDelegate networkClient;
private final RequestManagers requestManagers;
private final CompletableEventReaper applicationEventReaper;

ConsumerNetworkThreadTest() {
LogContext logContext = new LogContext();
ConsumerConfig config = mock(ConsumerConfig.class);
this.time = new MockTime();
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
this.networkClientDelegate = mock(NetworkClientDelegate.class);
this.requestManagers = mock(RequestManagers.class);
this.offsetsRequestManager = mock(OffsetsRequestManager.class);
this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
this.applicationEventsQueue = new LinkedBlockingQueue<>();
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
this.metadata = mock(ConsumerMetadata.class);
this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
this.applicationEventReaper = mock(CompletableEventReaper.class);
this.client = new MockClient(time);

this.networkClient = new NetworkClientDelegate(
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
time,
config,
logContext,
client
);

private ConsumerTestBuilder testBuilder;
private Time time;
private ConsumerMetadata metadata;
private NetworkClientDelegate networkClient;
private BlockingQueue<ApplicationEvent> applicationEventsQueue;
private ApplicationEventProcessor applicationEventProcessor;
private OffsetsRequestManager offsetsRequestManager;
private CommitRequestManager commitRequestManager;
private CoordinatorRequestManager coordinatorRequestManager;
private ConsumerNetworkThread consumerNetworkThread;
private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
private MockClient client;

@BeforeEach
public void setup() {
testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
time = testBuilder.time;
metadata = testBuilder.metadata;
networkClient = testBuilder.networkClientDelegate;
client = testBuilder.client;
applicationEventsQueue = testBuilder.applicationEventQueue;
applicationEventProcessor = testBuilder.applicationEventProcessor;
commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
offsetsRequestManager = testBuilder.offsetsRequestManager;
coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
consumerNetworkThread = new ConsumerNetworkThread(
testBuilder.logContext,
this.consumerNetworkThread = new ConsumerNetworkThread(
logContext,
time,
testBuilder.applicationEventQueue,
applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
() -> testBuilder.networkClientDelegate,
() -> testBuilder.requestManagers
() -> networkClientDelegate,
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
() -> requestManagers
);
}

@BeforeEach
public void setup() {
consumerNetworkThread.initializeResources();
}

@AfterEach
public void tearDown() {
if (testBuilder != null) {
testBuilder.close();
consumerNetworkThread.close(Duration.ZERO);
}
if (consumerNetworkThread != null)
consumerNetworkThread.close();
}

@Test
public void testEnsureCloseStopsRunningThread() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this is one of the newly added tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

// consumerNetworkThread.running is set to true in the constructor
assertTrue(consumerNetworkThread.isRunning());

// close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
consumerNetworkThread.close();
assertFalse(consumerNetworkThread.isRunning());
}

@ParameterizedTest
@ValueSource(longs = {1, 100, 1000, 4999, 5001})
public void testConsumerNetworkThreadWaitTimeComputations(long exampleTime) {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
List<Optional<? extends RequestManager>> requestManagersList = new ArrayList<>();
requestManagersList.add(Optional.of(coordinatorRequestManager));
when(requestManagers.entries()).thenReturn(requestManagersList);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved

NetworkClientDelegate.PollResult pollResult = new NetworkClientDelegate.PollResult(exampleTime);

when(coordinatorRequestManager.poll(anyLong())).thenReturn(pollResult);
when(coordinatorRequestManager.maximumTimeToWait(anyLong())).thenReturn(exampleTime);
when(networkClientDelegate.addAll(pollResult)).thenReturn(pollResult.timeUntilNextPollMs);
consumerNetworkThread.runOnce();

verify(networkClientDelegate).poll((exampleTime < 5001 ? exampleTime : 5000), time.milliseconds());
assertEquals(consumerNetworkThread.maximumTimeToWait(), exampleTime);
}

@Test
public void testStartupAndTearDown() throws InterruptedException {
// The consumer is closed in ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder.close()
// which is called from tearDown().
consumerNetworkThread.start();
TestCondition isStarted = () -> consumerNetworkThread.isRunning();
TestCondition isStarted = consumerNetworkThread::isRunning;
TestCondition isClosed = () -> !(consumerNetworkThread.isRunning() || consumerNetworkThread.isAlive());

// There's a nonzero amount of time between starting the thread and having it
Expand All @@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws InterruptedException {
"The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms");
}

@Test
void testRequestManagersArePolledOnce() {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
consumerNetworkThread.runOnce();
requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong())));
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong())));
verify(networkClientDelegate).poll(anyLong(), anyLong());
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testApplicationEvent() {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this test is rewritten as testApplicationEventIsProcessed further below? And testRequestsTransferFromManagersToClientOnThreadRun is another newly added test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this test as well as many of the other application event tests were rewritten into a single parameterized test. It gets the parameters from applicationEvents(), where it supplies a stream of different kinds of events. This was done to condense the code a bit.

And testRequestsTransferFromManagersToClientOnThreadRun() is a new test

ApplicationEvent e = new PollEvent(100);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
verify(applicationEventProcessor, times(1)).process(e);
verify(applicationEventProcessor).process(e);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testMetadataUpdateEvent() {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this test was effectively removed? Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to previous comment for all event tests that were removed

ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
verify(metadata).requestUpdateForNewTopics();
verify(applicationEventProcessor).process(e);
}

@Test
Expand Down Expand Up @@ -207,7 +223,7 @@ public void testResetPositionsProcessFailureIsIgnored() {

ResetPositionsEvent event = new ResetPositionsEvent(calculateDeadlineMs(time, 100));
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
assertDoesNotThrow(consumerNetworkThread::runOnce);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved

verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
}
Expand All @@ -231,10 +247,7 @@ public void testAssignmentChangeEvent() {

consumerNetworkThread.runOnce();
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
verify(networkClient, times(1)).poll(anyLong(), anyLong());
verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs);
// Assignment change should generate an async commit (not retried).
verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
verify(networkClientDelegate, times(1)).poll(anyLong(), anyLong());
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
Expand Down Expand Up @@ -268,32 +281,31 @@ void testPollResultTimer() {

@Test
void testMaximumTimeToWait() {
List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(Optional.of(heartbeatRequestManager));
// Initial value before runOnce has been called
assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait());

when(requestManagers.entries()).thenReturn(list);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS);

consumerNetworkThread.runOnce();
// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait());
}

@Test
void testRequestManagersArePolledOnce() {
consumerNetworkThread.runOnce();
testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong())));
testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong())));
verify(networkClient, times(1)).poll(anyLong(), anyLong());
}
void testEnsureEventsAreCompleted() {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
Cluster cluster = mock(Cluster.class);
when(metadata.fetch()).thenReturn(cluster);

@Test
void testEnsureMetadataUpdateOnPoll() {
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
client.prepareMetadataUpdate(metadataResponse);
metadata.requestUpdate(false);
consumerNetworkThread.runOnce();
verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong());
}
List<Node> list = new ArrayList<>();
list.add(new Node(0, "host", 0));
when(cluster.nodes()).thenReturn(list);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved

Queue<NetworkClientDelegate.UnsentRequest> queue = new LinkedList<>();
when(networkClientDelegate.unsentRequests()).thenReturn(queue);

@Test
void testEnsureEventsAreCompleted() {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
// Mimic the logic of CompletableEventReaper.reap(Collection):
doAnswer(__ -> {
Iterator<ApplicationEvent> i = applicationEventsQueue.iterator();
Expand All @@ -314,7 +326,7 @@ void testEnsureEventsAreCompleted() {
coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds());
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
CompletableApplicationEvent<Void> event1 = spy(new AsyncCommitEvent(Collections.emptyMap()));
CompletableApplicationEvent<Void> event1 = mock(AsyncCommitEvent.class);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
CompletableFuture<Void> future = new CompletableFuture<>();
when(event1.future()).thenReturn(future);
Expand All @@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() {
assertTrue(applicationEventsQueue.isEmpty());
}

// Look into this one
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
@Test
void testCleanupInvokesReaper() {
Queue<NetworkClientDelegate.UnsentRequest> queue = new LinkedList<>();
when(networkClientDelegate.unsentRequests()).thenReturn(queue);
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
consumerNetworkThread.cleanup();
verify(applicationEventReaper).reap(applicationEventsQueue);
}
Expand All @@ -339,6 +354,15 @@ void testRunOnceInvokesReaper() {
verify(applicationEventReaper).reap(any(Long.class));
}

private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
brenden20 marked this conversation as resolved.
Show resolved Hide resolved
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
return topicPartitionOffsets;
}

private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
final Errors error,
final boolean disconnected) {
Expand Down Expand Up @@ -379,13 +403,4 @@ private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPart
return true;
};
}

private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
return topicPartitionOffsets;
}
}
}
brenden20 marked this conversation as resolved.
Show resolved Hide resolved