Skip to content

Commit

Permalink
Optimize dispatcher for large partitions (#599)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Aug 18, 2022
1 parent 61d6fc5 commit c4a4700
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 87 deletions.
5 changes: 3 additions & 2 deletions app/src/main/java/org/astraea/app/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.astraea.app.admin;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -81,7 +82,7 @@ static ClusterInfo of(org.apache.kafka.common.Cluster cluster) {
e ->
e.getValue().stream()
.filter(ReplicaInfo::isLeader)
.collect(Collectors.toUnmodifiableList())));
.collect(Collectors.toCollection(ArrayList::new))));
return new ClusterInfo() {
@Override
public List<NodeInfo> nodes() {
Expand All @@ -100,7 +101,7 @@ public Set<String> topics() {

@Override
public List<ReplicaInfo> availableReplicaLeaders(String topic) {
return availableReplicaLeaders.getOrDefault(topic, List.of());
return availableReplicaLeaders.getOrDefault(topic, new ArrayList<>(0));
}

@Override
Expand Down
9 changes: 5 additions & 4 deletions app/src/main/java/org/astraea/app/partitioner/RoundRobin.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,31 @@ static <E> RoundRobin<E> smooth(Map<E, Double> scores) {
class SmoothRoundRobin<E> implements RoundRobin<E> {

private final Map<E, Double> effectiveScores;

private final double sum;
private volatile Map<E, Double> currentScores;

private SmoothRoundRobin(Map<E, Double> scores) {
this.effectiveScores = Collections.unmodifiableMap(scores);
this.currentScores =
scores.keySet().stream()
.collect(Collectors.toUnmodifiableMap(Function.identity(), ignored -> 0D));
this.sum = effectiveScores.values().stream().mapToDouble(d -> d).sum();
}

@Override
public Optional<E> next(Set<E> availableTargets) {
// no data no answer
if (effectiveScores.isEmpty() || availableTargets.isEmpty()) return Optional.empty();

// 1) calculate the sum of all effective scores
var sum = effectiveScores.values().stream().mapToDouble(d -> d).sum();
// 2) add effective score to each current score
// 1) add effective score to each current score
var nextScores =
currentScores.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> effectiveScores.getOrDefault(e.getKey(), 0D) + e.getValue()));
// 3) get the E which has max value
// 2) get the E which has max value
var maxObj =
nextScores.entrySet().stream()
.filter(e -> availableTargets.contains(e.getKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.ReplicaInfo;
import org.astraea.app.argument.DurationField;
import org.astraea.app.common.Utils;
import org.astraea.app.cost.CostFunction;
Expand All @@ -53,6 +54,8 @@
* `org.astraea.cost.ThroughputCost=1,org.astraea.cost.broker.BrokerOutputCost=1`.
*/
public class StrictCostDispatcher implements Dispatcher {
static final int ROUND_ROBIN_LENGTH = 400;

public static final String JMX_PORT = "jmx.port";
public static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease";

Expand All @@ -71,7 +74,10 @@ public class StrictCostDispatcher implements Dispatcher {

final Map<Integer, Receiver> receivers = new TreeMap<>();

volatile RoundRobin<Integer> roundRobin;
final int[] roundRobin = new int[ROUND_ROBIN_LENGTH];

final AtomicInteger next = new AtomicInteger(0);

volatile long timeToUpdateRoundRobin = -1;

// visible for testing
Expand All @@ -86,15 +92,14 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster
if (partitionLeaders.isEmpty()) return 0;

// just return the only one available partition
if (partitionLeaders.size() == 1) return partitionLeaders.iterator().next().partition();
if (partitionLeaders.size() == 1) return partitionLeaders.get(0).partition();

// add new receivers for new brokers
receivers.putAll(
fetcher
.map(
fetcher ->
partitionLeaders.stream()
.map(ReplicaInfo::nodeInfo)
clusterInfo.nodes().stream()
.filter(nodeInfo -> !receivers.containsKey(nodeInfo.id()))
.distinct()
.filter(nodeInfo -> jmxPortGetter.apply(nodeInfo.id()).isPresent())
Expand All @@ -110,27 +115,35 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster

tryToUpdateRoundRobin(clusterInfo);

return roundRobin
.next(partitionLeaders.stream().map(r -> r.nodeInfo().id()).collect(Collectors.toSet()))
.flatMap(
brokerId ->
// TODO: which partition is better when all of them are in same node?
partitionLeaders.stream()
.filter(r -> r.nodeInfo().id() == brokerId)
.map(ReplicaInfo::partition)
.findAny())
.orElse(partitionLeaders.get((int) (Math.random() * partitionLeaders.size())).partition());
var target =
roundRobin[
next.getAndUpdate(previous -> previous >= roundRobin.length - 1 ? 0 : previous + 1)];

// TODO: if the topic partitions are existent in fewer brokers, the target gets -1 in most cases
var candidate =
target < 0
? partitionLeaders
: partitionLeaders.stream()
.filter(r -> r.nodeInfo().id() == target)
.collect(Collectors.toUnmodifiableList());
candidate = candidate.isEmpty() ? partitionLeaders : candidate;
return candidate.get((int) (Math.random() * candidate.size())).partition();
}

void tryToUpdateRoundRobin(ClusterInfo clusterInfo) {
if (roundRobin == null || System.currentTimeMillis() >= timeToUpdateRoundRobin) {
roundRobin =
synchronized void tryToUpdateRoundRobin(ClusterInfo clusterInfo) {
if (System.currentTimeMillis() >= timeToUpdateRoundRobin) {
var roundRobin =
newRoundRobin(
functions,
clusterInfo,
ClusterBean.of(
receivers.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().current()))));
var ids =
clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet());
// TODO: make ROUND_ROBIN_LENGTH configurable ???
IntStream.range(0, ROUND_ROBIN_LENGTH)
.forEach(index -> this.roundRobin[index] = roundRobin.next(ids).orElse(-1));
timeToUpdateRoundRobin = System.currentTimeMillis() + roundRobinLease.toMillis();
}
}
Expand Down Expand Up @@ -164,13 +177,17 @@ static RoundRobin<Integer> newRoundRobin(
costFunctions.entrySet().stream()
.flatMap(
functionWeight ->
((HasBrokerCost) functionWeight.getKey())
.brokerCost(clusterInfo, clusterBean).value().entrySet().stream()
.map(
idAndCost ->
Map.entry(
idAndCost.getKey(),
idAndCost.getValue() * functionWeight.getValue())))
functionWeight
.getKey()
.brokerCost(clusterInfo, clusterBean)
.value()
.entrySet()
.stream()
.map(
idAndCost ->
Map.entry(
idAndCost.getKey(),
idAndCost.getValue() * functionWeight.getValue())))
.collect(
Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue, Double::sum, HashMap::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package org.astraea.app.partitioner;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
Expand Down Expand Up @@ -174,7 +178,6 @@ void testCostFunctionWithoutFetcher() {
dispatcher.configure(
Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));
dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo);
Assertions.assertNotNull(dispatcher.roundRobin);
Assertions.assertEquals(0, dispatcher.receivers.size());
}
}
Expand Down Expand Up @@ -210,9 +213,12 @@ Receiver receiver(String host, int port, Fetcher fetcher) {
var replicaInfo1 = ReplicaInfo.of("topic", 1, NodeInfo.of(10, "host", 11111), true, true, true);
var replicaInfo2 =
ReplicaInfo.of("topic", 1, NodeInfo.of(11, "host2", 11111), true, true, true);
var rs = List.of(replicaInfo0, replicaInfo1, replicaInfo2);
var clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
.thenReturn(List.of(replicaInfo0, replicaInfo1, replicaInfo2));
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())).thenReturn(rs);
Mockito.when(clusterInfo.nodes())
.thenReturn(
rs.stream().map(ReplicaInfo::nodeInfo).collect(Collectors.toUnmodifiableList()));
// there is one local receiver by default
Assertions.assertEquals(1, dispatcher.receivers.size());
Assertions.assertEquals(-1, dispatcher.receivers.keySet().iterator().next());
Expand All @@ -230,58 +236,65 @@ Receiver receiver(String host, int port, Fetcher fetcher) {

@Test
void testEmptyJmxPort() {
var dispatcher = new StrictCostDispatcher();
try (var dispatcher = new StrictCostDispatcher()) {

// pass due to local mbean
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));
// pass due to local mbean
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));

// pass due to default port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.of(111), Map.of(), Duration.ofSeconds(10));
// pass due to default port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D), Optional.of(111), Map.of(), Duration.ofSeconds(10));

// pass due to specify port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D),
Optional.empty(),
Map.of(222, 111),
Duration.ofSeconds(10));
// pass due to specify port
dispatcher.configure(
Map.of(new NodeThroughputCost(), 1D),
Optional.empty(),
Map.of(222, 111),
Duration.ofSeconds(10));
}
}

@Test
void testReturnedPartition() {
var brokerId = 22;
var partitionId = 123;
var dispatcher = new StrictCostDispatcher();
var costFunction =
new HasBrokerCost() {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(brokerId, 10D);
}
};
dispatcher.configure(
Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));

var replicaInfo0 =
ReplicaInfo.of(
"topic", partitionId, NodeInfo.of(brokerId, "host", 11111), true, true, true);
var replicaInfo1 =
ReplicaInfo.of("topic", 1, NodeInfo.of(1111, "host2", 11111), true, true, true);
var clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
.thenReturn(List.of(replicaInfo0, replicaInfo1));
try (var dispatcher = new StrictCostDispatcher()) {
var costFunction =
new HasBrokerCost() {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> Map.of(brokerId, 10D);
}
};
dispatcher.configure(
Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10));

Assertions.assertEquals(
partitionId, dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo));
var replicaInfo0 =
ReplicaInfo.of(
"topic", partitionId, NodeInfo.of(brokerId, "host", 11111), true, true, true);
var replicaInfo1 =
ReplicaInfo.of("topic", 1, NodeInfo.of(1111, "host2", 11111), true, true, true);
var clusterInfo = Mockito.mock(ClusterInfo.class);
Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
.thenReturn(List.of(replicaInfo0, replicaInfo1));
Mockito.when(clusterInfo.nodes())
.thenReturn(
Stream.of(replicaInfo0, replicaInfo1)
.map(ReplicaInfo::nodeInfo)
.collect(Collectors.toUnmodifiableList()));
Assertions.assertEquals(
partitionId, dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo));
}
}

@Test
void testDefaultFunction() {
var dispatcher = new StrictCostDispatcher();
dispatcher.configure(Configuration.of(Map.of()));
Assertions.assertEquals(1, dispatcher.functions.size());
Assertions.assertEquals(1, dispatcher.receivers.size());
try (var dispatcher = new StrictCostDispatcher()) {
dispatcher.configure(Configuration.of(Map.of()));
Assertions.assertEquals(1, dispatcher.functions.size());
Assertions.assertEquals(1, dispatcher.receivers.size());
}
}

@Test
Expand All @@ -293,22 +306,26 @@ void testCostToScore() {

@Test
void testRoundRobinLease() {
var dispatcher = new StrictCostDispatcher();
dispatcher.configure(
Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s")));
Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease);
try (var dispatcher = new StrictCostDispatcher()) {

var clusterInfo = Mockito.mock(ClusterInfo.class);
dispatcher.tryToUpdateRoundRobin(clusterInfo);
var rr = dispatcher.roundRobin;
Assertions.assertNotNull(rr);
// the rr is not updated yet
dispatcher.tryToUpdateRoundRobin(clusterInfo);
Assertions.assertEquals(rr, dispatcher.roundRobin);
dispatcher.configure(
Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s")));
Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease);

Utils.sleep(Duration.ofSeconds(3));
dispatcher.tryToUpdateRoundRobin(clusterInfo);
// rr is updated already
Assertions.assertNotEquals(rr, dispatcher.roundRobin);
var clusterInfo = Mockito.mock(ClusterInfo.class);
dispatcher.tryToUpdateRoundRobin(clusterInfo);
var t = dispatcher.timeToUpdateRoundRobin;
var rr =
Arrays.stream(dispatcher.roundRobin).boxed().collect(Collectors.toUnmodifiableList());
Assertions.assertEquals(StrictCostDispatcher.ROUND_ROBIN_LENGTH, rr.size());
// the rr is not updated yet
dispatcher.tryToUpdateRoundRobin(clusterInfo);
IntStream.range(0, rr.size())
.forEach(i -> Assertions.assertEquals(rr.get(i), dispatcher.roundRobin[i]));
Utils.sleep(Duration.ofSeconds(3));
dispatcher.tryToUpdateRoundRobin(clusterInfo);
// rr is updated already
Assertions.assertNotEquals(t, dispatcher.timeToUpdateRoundRobin);
}
}
}

0 comments on commit c4a4700

Please sign in to comment.