diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index a0bfedab31868..675c6cf83a644 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -43,11 +43,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Random; +import java.util.Set; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -350,26 +351,22 @@ public boolean isUsable(int brokerId) { } public List chooseRandomUsable(Random random, int numBrokers) { - if (usable.size() < numBrokers) { + if (numBrokers == 0) { + return Collections.emptyList(); + } else if (usable.size() < numBrokers) { throw new InvalidReplicationFactorException("there are only " + usable.size() + " usable brokers"); } - List choices = new ArrayList<>(); - // TODO: rack-awareness - List indexes = new ArrayList<>(); - int initialIndex = random.nextInt(usable.size()); - for (int i = 0; i < numBrokers; i++) { - indexes.add((initialIndex + i) % usable.size()); - } - indexes.sort(Integer::compareTo); - Iterator iter = usable.iterator(); - for (int i = 0; choices.size() < indexes.size(); i++) { - int brokerId = iter.next(); - if (indexes.get(choices.size()) == i) { - choices.add(brokerId); + // TODO: rack awareness? + List results = new ArrayList<>(); + Set found = new HashSet<>(); + do { + int brokerId = usable.pickRandomElement(random); + if (!found.contains(brokerId)) { + results.add(brokerId); + found.add(brokerId); } - } - Collections.shuffle(choices, random); - return choices; + } while (results.size() < numBrokers); + return results; } } diff --git a/metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java index af4c447e091ec..bcd7586f45ce7 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java @@ -245,7 +245,10 @@ String baseToDebugString() { return bld.toString(); } - public T pickRandomElement(Random random) { + T pickRandomEntry(Random random) { + if (size == 0) { + throw new RuntimeException("No elements in map."); + } while (true) { int slot = random.nextInt(elements.length); T value = (T) elements[slot]; diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java index 7430352403f47..7d334cda0a960 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Objects; +import java.util.Random; import java.util.Set; /** @@ -232,6 +233,11 @@ public void clear() { } } + public T pickRandomElement(Random random) { + TimelineHashSetEntry entry = pickRandomEntry(random); + return entry.value; + } + @Override public int hashCode() { int hash = 0; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index eb797aee68a44..5411a3f25ae33 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; @@ -108,7 +109,8 @@ public void testCreateTopics() throws Exception { ControllerTestUtils.replayAll(replicationControl, result2.records()); assertEquals(new PartitionControlInfo(new int[] {1, 2, 0}, new int[] {1, 2, 0}, null, null, 1, 0), - replicationControl.getPartition(Uuid.fromString("5sxVbzQc-jzRErUhNcPf0Q"), 0)); + replicationControl.getPartition( + ((TopicRecord) result2.records().get(0).message()).topicId(), 0)); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java index 295d3ee1144b0..f8e7a85448e23 100644 --- a/metadata/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java +++ b/metadata/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.timeline; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; @@ -130,12 +131,13 @@ public void testPickRandomElement() { Set values = new HashSet( Arrays.asList(12345L, 7890L, 45345L, 2345324L, 35463456L, 467578L, 56785768L)); BaseHashTable table = new BaseHashTable<>(20); + Random random = new Random(); + assertThrows(RuntimeException.class, () -> table.pickRandomEntry(random)); for (long value : values) { assertEquals(null, table.baseAddOrReplace(value)); } - Random random = new Random(); while (!values.isEmpty()) { - long value = table.pickRandomElement(random); + long value = table.pickRandomEntry(random); values.remove(value); } }