From fb3ad0bea0d662c5e263fe49d4b8c8c7840aea80 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Thu, 23 May 2024 13:16:04 +0000 Subject: [PATCH] KAFKA-16828: RackAwareTaskAssignorTest failed --- .../assignment/RackAwareTaskAssignor.java | 4 +-- .../assignment/RackAwareTaskAssignorTest.java | 30 ++++++++++--------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index b10b51f7f15b9..7cb70f0bda2aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -139,7 +139,7 @@ public boolean populateTopicsToDescribe(final Set topicsToDescribe, return populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelog, topicsToDescribe, racksForPartition); } - public static boolean populateTopicsToDescribe(final Cluster fullMetadata, + public boolean populateTopicsToDescribe(final Cluster fullMetadata, final Map> changelogPartitionsForTask, final Map> partitionsForTask, final boolean changelog, @@ -188,7 +188,7 @@ private boolean validateTopicPartitionRack(final boolean changelogTopics) { * * @return whether the operation successfully completed and the rack information is valid. */ - public static boolean validateTopicPartitionRack(final Cluster fullMetadata, + public boolean validateTopicPartitionRack(final Cluster fullMetadata, final InternalTopicManager internalTopicManager, final Map> changelogPartitionsForTask, final Map> partitionsForTask, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java index 22c771771812d..c770dccbc66a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java @@ -76,6 +76,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; @@ -194,8 +196,8 @@ public void shouldDisableAssignorFromConfig() { // False since partitionWithoutInfo10 is missing in cluster metadata assertFalse(assignor.canEnableRackAwareAssignor()); - verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(false)); - verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(true)); + verify(assignor, never()).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); + verify(assignor, never()).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(true), anySet(), anyMap()); } @Test @@ -213,8 +215,8 @@ public void shouldDisableActiveWhenMissingClusterInfo() { // False since partitionWithoutInfo10 is missing in cluster metadata assertFalse(assignor.canEnableRackAwareAssignor()); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(false)); - verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(true)); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); + verify(assignor, never()).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(true), anySet(), anyMap()); assertFalse(assignor.populateTopicsToDescribe(new HashSet<>(), false)); } @@ -233,8 +235,8 @@ public void shouldDisableActiveWhenRackMissingInNode() { // False since nodeMissingRack has one node which doesn't have rack assertFalse(assignor.canEnableRackAwareAssignor()); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(false)); - verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(true)); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); + verify(assignor, never()).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(true), anySet(), anyMap()); assertFalse(assignor.populateTopicsToDescribe(new HashSet<>(), false)); } @@ -342,12 +344,12 @@ public void shouldEnableRackAwareAssignorWithCacheResult() { // partitionWithoutInfo00 has rackInfo in cluster metadata assertTrue(assignor.canEnableRackAwareAssignor()); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(false)); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); // Should use cache result Mockito.reset(assignor); assertTrue(assignor.canEnableRackAwareAssignor()); - verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(false)); + verify(assignor, never()).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); } @Test @@ -409,8 +411,8 @@ public void shouldEnableRackAwareAssignorWithStandbyDescribingTopics() { )); assertTrue(assignor.canEnableRackAwareAssignor()); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(false)); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(true)); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(true), anySet(), anyMap()); final Map> racksForPartition = assignor.racksForPartition(); final Map> expected = mkMap( @@ -447,8 +449,8 @@ public void shouldDisableRackAwareAssignorWithStandbyDescribingTopicsFailure() { )); assertFalse(assignor.canEnableRackAwareAssignor()); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(false)); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(true)); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(true), anySet(), anyMap()); } @Test @@ -469,8 +471,8 @@ public void shouldDisableRackAwareAssignorWithDescribingTopicsFailure() { )); assertFalse(assignor.canEnableRackAwareAssignor()); - verify(assignor, times(1)).populateTopicsToDescribe(anySet(), eq(false)); - verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(true)); + verify(assignor, times(1)).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(false), anySet(), anyMap()); + verify(assignor, never()).populateTopicsToDescribe(any(), anyMap(), anyMap(), eq(true), anySet(), anyMap()); assertTrue(assignor.populateTopicsToDescribe(new HashSet<>(), false)); }