Skip to content

Commit

Permalink
Use BaseHashTable#pickRandomEntry in ClusterControlManager#chooseRand…
Browse files Browse the repository at this point in the history
…omUsable
  • Loading branch information
cmccabe committed Jan 16, 2021
1 parent e710ab9 commit 0755178
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -350,26 +351,22 @@ public boolean isUsable(int brokerId) {
}

public List<Integer> chooseRandomUsable(Random random, int numBrokers) {
if (usable.size() < numBrokers) {
if (numBrokers == 0) {
return Collections.emptyList();
} else if (usable.size() < numBrokers) {
throw new InvalidReplicationFactorException("there are only " + usable.size() +
" usable brokers");
}
List<Integer> choices = new ArrayList<>();
// TODO: rack-awareness
List<Integer> indexes = new ArrayList<>();
int initialIndex = random.nextInt(usable.size());
for (int i = 0; i < numBrokers; i++) {
indexes.add((initialIndex + i) % usable.size());
}
indexes.sort(Integer::compareTo);
Iterator<Integer> iter = usable.iterator();
for (int i = 0; choices.size() < indexes.size(); i++) {
int brokerId = iter.next();
if (indexes.get(choices.size()) == i) {
choices.add(brokerId);
// TODO: rack awareness?
List<Integer> results = new ArrayList<>();
Set<Integer> found = new HashSet<>();
do {
int brokerId = usable.pickRandomElement(random);
if (!found.contains(brokerId)) {
results.add(brokerId);
found.add(brokerId);
}
}
Collections.shuffle(choices, random);
return choices;
} while (results.size() < numBrokers);
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ String baseToDebugString() {
return bld.toString();
}

public T pickRandomElement(Random random) {
T pickRandomEntry(Random random) {
if (size == 0) {
throw new RuntimeException("No elements in map.");
}
while (true) {
int slot = random.nextInt(elements.length);
T value = (T) elements[slot];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Random;
import java.util.Set;

/**
Expand Down Expand Up @@ -232,6 +233,11 @@ public void clear() {
}
}

public T pickRandomElement(Random random) {
TimelineHashSetEntry<T> entry = pickRandomEntry(random);
return entry.value;
}

@Override
public int hashCode() {
int hash = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -108,7 +109,8 @@ public void testCreateTopics() throws Exception {
ControllerTestUtils.replayAll(replicationControl, result2.records());
assertEquals(new PartitionControlInfo(new int[] {1, 2, 0},
new int[] {1, 2, 0}, null, null, 1, 0),
replicationControl.getPartition(Uuid.fromString("5sxVbzQc-jzRErUhNcPf0Q"), 0));
replicationControl.getPartition(
((TopicRecord) result2.records().get(0).message()).topicId(), 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.timeline;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
Expand Down Expand Up @@ -130,12 +131,13 @@ public void testPickRandomElement() {
Set<Long> values = new HashSet<Long>(
Arrays.asList(12345L, 7890L, 45345L, 2345324L, 35463456L, 467578L, 56785768L));
BaseHashTable<Long> table = new BaseHashTable<>(20);
Random random = new Random();
assertThrows(RuntimeException.class, () -> table.pickRandomEntry(random));
for (long value : values) {
assertEquals(null, table.baseAddOrReplace(value));
}
Random random = new Random();
while (!values.isEmpty()) {
long value = table.pickRandomElement(random);
long value = table.pickRandomEntry(random);
values.remove(value);
}
}
Expand Down

0 comments on commit 0755178

Please sign in to comment.