Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize dispatcher for large partitions #599

Merged
merged 3 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app/src/main/java/org/astraea/app/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.astraea.app.admin;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -81,7 +82,7 @@ static ClusterInfo of(org.apache.kafka.common.Cluster cluster) {
e ->
e.getValue().stream()
.filter(ReplicaInfo::isLeader)
.collect(Collectors.toUnmodifiableList())));
.collect(Collectors.toCollection(ArrayList::new))));
return new ClusterInfo() {
@Override
public List<NodeInfo> nodes() {
Expand All @@ -100,7 +101,7 @@ public Set<String> topics() {

@Override
public List<ReplicaInfo> availableReplicaLeaders(String topic) {
return availableReplicaLeaders.getOrDefault(topic, List.of());
return availableReplicaLeaders.getOrDefault(topic, new ArrayList<>(0));
}

@Override
Expand Down
9 changes: 5 additions & 4 deletions app/src/main/java/org/astraea/app/partitioner/RoundRobin.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,31 @@ static <E> RoundRobin<E> smooth(Map<E, Double> scores) {
class SmoothRoundRobin<E> implements RoundRobin<E> {

private final Map<E, Double> effectiveScores;

private final double sum;
private volatile Map<E, Double> currentScores;

/**
 * Creates a smooth round-robin over the given weighted targets.
 *
 * <p>The scores are copied into an immutable snapshot. The total score is computed only once
 * here, so the snapshot guarantees that a later change to the caller's map cannot make the
 * cached {@code sum} (or the key set of {@code currentScores}) disagree with {@code
 * effectiveScores}.
 *
 * @param scores effective score of each target; must not contain null keys or values
 * @throws NullPointerException if {@code scores} is null or holds a null key or value
 */
private SmoothRoundRobin(Map<E, Double> scores) {
  // snapshot instead of a read-only view: a view would still reflect the caller's later edits
  this.effectiveScores = Map.copyOf(scores);
  // every target starts with a current score of zero
  this.currentScores =
      effectiveScores.keySet().stream()
          .collect(Collectors.toUnmodifiableMap(Function.identity(), ignored -> 0D));
  // safe to cache because the snapshot above never changes
  this.sum = effectiveScores.values().stream().mapToDouble(d -> d).sum();
}

@Override
public Optional<E> next(Set<E> availableTargets) {
// no data no answer
if (effectiveScores.isEmpty() || availableTargets.isEmpty()) return Optional.empty();

// 1) calculate the sum of all effective scores
var sum = effectiveScores.values().stream().mapToDouble(d -> d).sum();
// 2) add effective score to each current score
// 1) add effective score to each current score
var nextScores =
currentScores.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> effectiveScores.getOrDefault(e.getKey(), 0D) + e.getValue()));
// 3) get the E which has max value
// 2) get the E which has max value
var maxObj =
nextScores.entrySet().stream()
.filter(e -> availableTargets.contains(e.getKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.ReplicaInfo;
import org.astraea.app.argument.DurationField;
import org.astraea.app.common.Utils;
import org.astraea.app.cost.CostFunction;
Expand All @@ -53,6 +54,8 @@
* `org.astraea.cost.ThroughputCost=1,org.astraea.cost.broker.BrokerOutputCost=1`.
*/
public class StrictCostDispatcher implements Dispatcher {
static final int ROUND_ROBIN_LENGTH = 400;

public static final String JMX_PORT = "jmx.port";
public static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease";

Expand All @@ -71,7 +74,10 @@ public class StrictCostDispatcher implements Dispatcher {

final Map<Integer, Receiver> receivers = new TreeMap<>();

volatile RoundRobin<Integer> roundRobin;
final int[] roundRobin = new int[ROUND_ROBIN_LENGTH];

final AtomicInteger next = new AtomicInteger(0);

volatile long timeToUpdateRoundRobin = -1;

// visible for testing
Expand All @@ -86,15 +92,14 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster
if (partitionLeaders.isEmpty()) return 0;

// just return the only one available partition
if (partitionLeaders.size() == 1) return partitionLeaders.iterator().next().partition();
if (partitionLeaders.size() == 1) return partitionLeaders.get(0).partition();

// add new receivers for new brokers
receivers.putAll(
fetcher
.map(
fetcher ->
partitionLeaders.stream()
.map(ReplicaInfo::nodeInfo)
clusterInfo.nodes().stream()
.filter(nodeInfo -> !receivers.containsKey(nodeInfo.id()))
.distinct()
.filter(nodeInfo -> jmxPortGetter.apply(nodeInfo.id()).isPresent())
Expand All @@ -110,27 +115,35 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster

tryToUpdateRoundRobin(clusterInfo);

return roundRobin
.next(partitionLeaders.stream().map(r -> r.nodeInfo().id()).collect(Collectors.toSet()))
.flatMap(
brokerId ->
// TODO: which partition is better when all of them are in same node?
partitionLeaders.stream()
.filter(r -> r.nodeInfo().id() == brokerId)
.map(ReplicaInfo::partition)
.findAny())
.orElse(partitionLeaders.get((int) (Math.random() * partitionLeaders.size())).partition());
var target =
roundRobin[
next.getAndUpdate(previous -> previous >= roundRobin.length - 1 ? 0 : previous + 1)];

// TODO: if the topic partitions are existent in fewer brokers, the target gets -1 in most cases
var candidate =
target < 0
? partitionLeaders
: partitionLeaders.stream()
.filter(r -> r.nodeInfo().id() == target)
.collect(Collectors.toUnmodifiableList());
candidate = candidate.isEmpty() ? partitionLeaders : candidate;
return candidate.get((int) (Math.random() * candidate.size())).partition();
}

void tryToUpdateRoundRobin(ClusterInfo clusterInfo) {
if (roundRobin == null || System.currentTimeMillis() >= timeToUpdateRoundRobin) {
roundRobin =
/**
 * Rebuilds the precomputed broker sequence once the current lease has expired.
 *
 * <p>When the wall clock has reached {@code timeToUpdateRoundRobin}, a new round-robin is built
 * from the configured cost functions and the current value of every receiver. Each slot of the
 * {@code roundRobin} array is then filled with the next broker id it yields, or -1 when it
 * yields nothing. Finally the deadline is pushed forward by {@code roundRobinLease}. Calls made
 * before the deadline return without touching any state.
 *
 * @param clusterInfo cluster view passed to the cost functions and used to list broker ids
 */
synchronized void tryToUpdateRoundRobin(ClusterInfo clusterInfo) {
  // lease still valid: keep the sequence that is already in place
  if (System.currentTimeMillis() < timeToUpdateRoundRobin) return;

  var smooth =
      newRoundRobin(
          functions,
          clusterInfo,
          ClusterBean.of(
              receivers.entrySet().stream()
                  .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().current()))));
  var brokerIds =
      clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet());

  // TODO: make ROUND_ROBIN_LENGTH configurable ???
  for (var slot = 0; slot < ROUND_ROBIN_LENGTH; slot++) {
    // -1 marks a slot for which the round-robin produced no broker
    roundRobin[slot] = smooth.next(brokerIds).orElse(-1);
  }

  timeToUpdateRoundRobin = System.currentTimeMillis() + roundRobinLease.toMillis();
}
Expand Down Expand Up @@ -164,13 +177,17 @@ static RoundRobin<Integer> newRoundRobin(
costFunctions.entrySet().stream()
.flatMap(
functionWeight ->
((HasBrokerCost) functionWeight.getKey())
.brokerCost(clusterInfo, clusterBean).value().entrySet().stream()
.map(
idAndCost ->
Map.entry(
idAndCost.getKey(),
idAndCost.getValue() * functionWeight.getValue())))
functionWeight
.getKey()
.brokerCost(clusterInfo, clusterBean)
.value()
.entrySet()
.stream()
.map(
idAndCost ->
Map.entry(
idAndCost.getKey(),
idAndCost.getValue() * functionWeight.getValue())))
.collect(
Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue, Double::sum, HashMap::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package org.astraea.app.partitioner;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
Expand Down Expand Up @@ -174,7 +178,6 @@ void testCostFunctionWithoutFetcher() {
dispatcher.configure(
Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));
dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo);
Assertions.assertNotNull(dispatcher.roundRobin);
Assertions.assertEquals(0, dispatcher.receivers.size());
}
}
Expand Down Expand Up @@ -210,9 +213,12 @@ Receiver receiver(String host, int port, Fetcher fetcher) {
var replicaInfo1 = ReplicaInfo.of("topic", 1, NodeInfo.of(10, "host", 11111), true, true, true);
var replicaInfo2 =
ReplicaInfo.of("topic", 1, NodeInfo.of(11, "host2", 11111), true, true, true);
var rs = List.of(replicaInfo0, replicaInfo1, replicaInfo2);
var clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
.thenReturn(List.of(replicaInfo0, replicaInfo1, replicaInfo2));
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())).thenReturn(rs);
Mockito.when(clusterInfo.nodes())
.thenReturn(
rs.stream().map(ReplicaInfo::nodeInfo).collect(Collectors.toUnmodifiableList()));
// there is one local receiver by default
Assertions.assertEquals(1, dispatcher.receivers.size());
Assertions.assertEquals(-1, dispatcher.receivers.keySet().iterator().next());
Expand All @@ -230,58 +236,65 @@ Receiver receiver(String host, int port, Fetcher fetcher) {

@Test
void testEmptyJmxPort() {
var dispatcher = new StrictCostDispatcher();
try (var dispatcher = new StrictCostDispatcher()) {

// pass due to local mbean
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));
// pass due to local mbean
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));

// pass due to default port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.of(111), Map.of(), Duration.ofSeconds(10));
// pass due to default port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.of(111), Map.of(), Duration.ofSeconds(10));

// pass due to specify port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D),
Optional.empty(),
Map.of(222, 111),
Duration.ofSeconds(10));
// pass due to specify port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D),
Optional.empty(),
Map.of(222, 111),
Duration.ofSeconds(10));
}
}

@Test
void testReturnedPartition() {
var brokerId = 22;
var partitionId = 123;
var dispatcher = new StrictCostDispatcher();
var costFunction =
new HasBrokerCost() {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(brokerId, 10D);
}
};
dispatcher.configure(
Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));

var replicaInfo0 =
ReplicaInfo.of(
"topic", partitionId, NodeInfo.of(brokerId, "host", 11111), true, true, true);
var replicaInfo1 =
ReplicaInfo.of("topic", 1, NodeInfo.of(1111, "host2", 11111), true, true, true);
var clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
.thenReturn(List.of(replicaInfo0, replicaInfo1));
try (var dispatcher = new StrictCostDispatcher()) {
var costFunction =
new HasBrokerCost() {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(brokerId, 10D);
}
};
dispatcher.configure(
Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));

Assertions.assertEquals(
partitionId, dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo));
var replicaInfo0 =
ReplicaInfo.of(
"topic", partitionId, NodeInfo.of(brokerId, "host", 11111), true, true, true);
var replicaInfo1 =
ReplicaInfo.of("topic", 1, NodeInfo.of(1111, "host2", 11111), true, true, true);
var clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
.thenReturn(List.of(replicaInfo0, replicaInfo1));
Mockito.when(clusterInfo.nodes())
.thenReturn(
Stream.of(replicaInfo0, replicaInfo1)
.map(ReplicaInfo::nodeInfo)
.collect(Collectors.toUnmodifiableList()));
Assertions.assertEquals(
partitionId, dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo));
}
}

@Test
void testDefaultFunction() {
var dispatcher = new StrictCostDispatcher();
dispatcher.configure(Configuration.of(Map.of()));
Assertions.assertEquals(1, dispatcher.functions.size());
Assertions.assertEquals(1, dispatcher.receivers.size());
try (var dispatcher = new StrictCostDispatcher()) {
dispatcher.configure(Configuration.of(Map.of()));
Assertions.assertEquals(1, dispatcher.functions.size());
Assertions.assertEquals(1, dispatcher.receivers.size());
}
}

@Test
Expand All @@ -293,22 +306,26 @@ void testCostToScore() {

@Test
void testRoundRobinLease() {
var dispatcher = new StrictCostDispatcher();
dispatcher.configure(
Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s")));
Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease);
try (var dispatcher = new StrictCostDispatcher()) {

var clusterInfo = Mockito.mock(ClusterInfo.class);
dispatcher.tryToUpdateRoundRobin(clusterInfo);
var rr = dispatcher.roundRobin;
Assertions.assertNotNull(rr);
// the rr is not updated yet
dispatcher.tryToUpdateRoundRobin(clusterInfo);
Assertions.assertEquals(rr, dispatcher.roundRobin);
dispatcher.configure(
Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s")));
Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease);

Utils.sleep(Duration.ofSeconds(3));
dispatcher.tryToUpdateRoundRobin(clusterInfo);
// rr is updated already
Assertions.assertNotEquals(rr, dispatcher.roundRobin);
var clusterInfo = Mockito.mock(ClusterInfo.class);
dispatcher.tryToUpdateRoundRobin(clusterInfo);
var t = dispatcher.timeToUpdateRoundRobin;
var rr =
Arrays.stream(dispatcher.roundRobin).boxed().collect(Collectors.toUnmodifiableList());
Assertions.assertEquals(StrictCostDispatcher.ROUND_ROBIN_LENGTH, rr.size());
// the rr is not updated yet
dispatcher.tryToUpdateRoundRobin(clusterInfo);
IntStream.range(0, rr.size())
.forEach(i -> Assertions.assertEquals(rr.get(i), dispatcher.roundRobin[i]));
Utils.sleep(Duration.ofSeconds(3));
dispatcher.tryToUpdateRoundRobin(clusterInfo);
// rr is updated already
Assertions.assertNotEquals(t, dispatcher.timeToUpdateRoundRobin);
}
}
}