From c4a4700a499faf01e5d4a7314c5a1858b99de130 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 18 Aug 2022 23:08:28 +0800 Subject: [PATCH] Optimize dispatcher for large partitions (#599) --- .../org/astraea/app/admin/ClusterInfo.java | 5 +- .../astraea/app/partitioner/RoundRobin.java | 9 +- .../app/partitioner/StrictCostDispatcher.java | 67 +++++---- .../partitioner/StrictCostDispatcherTest.java | 129 ++++++++++-------- 4 files changed, 123 insertions(+), 87 deletions(-) diff --git a/app/src/main/java/org/astraea/app/admin/ClusterInfo.java b/app/src/main/java/org/astraea/app/admin/ClusterInfo.java index 8307670424..7a910388b3 100644 --- a/app/src/main/java/org/astraea/app/admin/ClusterInfo.java +++ b/app/src/main/java/org/astraea/app/admin/ClusterInfo.java @@ -16,6 +16,7 @@ */ package org.astraea.app.admin; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -81,7 +82,7 @@ static ClusterInfo of(org.apache.kafka.common.Cluster cluster) { e -> e.getValue().stream() .filter(ReplicaInfo::isLeader) - .collect(Collectors.toUnmodifiableList()))); + .collect(Collectors.toCollection(ArrayList::new)))); return new ClusterInfo() { @Override public List nodes() { @@ -100,7 +101,7 @@ public Set topics() { @Override public List availableReplicaLeaders(String topic) { - return availableReplicaLeaders.getOrDefault(topic, List.of()); + return availableReplicaLeaders.getOrDefault(topic, new ArrayList<>(0)); } @Override diff --git a/app/src/main/java/org/astraea/app/partitioner/RoundRobin.java b/app/src/main/java/org/astraea/app/partitioner/RoundRobin.java index e007e18f88..3b7269bd68 100644 --- a/app/src/main/java/org/astraea/app/partitioner/RoundRobin.java +++ b/app/src/main/java/org/astraea/app/partitioner/RoundRobin.java @@ -45,6 +45,8 @@ static RoundRobin smooth(Map scores) { class SmoothRoundRobin implements RoundRobin { private final Map effectiveScores; + + private final double sum; private volatile Map currentScores; private SmoothRoundRobin(Map scores) { @@ -52,6 +54,7 @@ private SmoothRoundRobin(Map scores) { this.currentScores = scores.keySet().stream() .collect(Collectors.toUnmodifiableMap(Function.identity(), ignored -> 0D)); + this.sum = effectiveScores.values().stream().mapToDouble(d -> d).sum(); } @Override @@ -59,16 +62,14 @@ public Optional next(Set availableTargets) { // no data no answer if (effectiveScores.isEmpty() || availableTargets.isEmpty()) return Optional.empty(); - // 1) calculate the sum of all effective scores - var sum = effectiveScores.values().stream().mapToDouble(d -> d).sum(); - // 2) add effective score to each current score + // 1) add effective score to each current score var nextScores = currentScores.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, e -> effectiveScores.getOrDefault(e.getKey(), 0D) + e.getValue())); - // 3) get the E which has max value + // 2) get the E which has max value var maxObj = nextScores.entrySet().stream() .filter(e -> availableTargets.contains(e.getKey())) diff --git a/app/src/main/java/org/astraea/app/partitioner/StrictCostDispatcher.java b/app/src/main/java/org/astraea/app/partitioner/StrictCostDispatcher.java index ccc6ace65a..db111ba327 100644 --- a/app/src/main/java/org/astraea/app/partitioner/StrictCostDispatcher.java +++ b/app/src/main/java/org/astraea/app/partitioner/StrictCostDispatcher.java @@ -22,12 +22,13 @@ import java.util.Objects; import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.admin.NodeInfo; -import org.astraea.app.admin.ReplicaInfo; import org.astraea.app.argument.DurationField; import org.astraea.app.common.Utils; import org.astraea.app.cost.CostFunction; @@ -53,6 +54,8 @@ * `org.astraea.cost.ThroughputCost=1,org.astraea.cost.broker.BrokerOutputCost=1`. */ public class StrictCostDispatcher implements Dispatcher { + static final int ROUND_ROBIN_LENGTH = 400; + public static final String JMX_PORT = "jmx.port"; public static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease"; @@ -71,7 +74,10 @@ public class StrictCostDispatcher implements Dispatcher { final Map receivers = new TreeMap<>(); - volatile RoundRobin roundRobin; + final int[] roundRobin = new int[ROUND_ROBIN_LENGTH]; + + final AtomicInteger next = new AtomicInteger(0); + volatile long timeToUpdateRoundRobin = -1; // visible for testing @@ -86,15 +92,14 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster if (partitionLeaders.isEmpty()) return 0; // just return the only one available partition - if (partitionLeaders.size() == 1) return partitionLeaders.iterator().next().partition(); + if (partitionLeaders.size() == 1) return partitionLeaders.get(0).partition(); // add new receivers for new brokers receivers.putAll( fetcher .map( fetcher -> - partitionLeaders.stream() - .map(ReplicaInfo::nodeInfo) + clusterInfo.nodes().stream() .filter(nodeInfo -> !receivers.containsKey(nodeInfo.id())) .distinct() .filter(nodeInfo -> jmxPortGetter.apply(nodeInfo.id()).isPresent()) @@ -110,27 +115,35 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster tryToUpdateRoundRobin(clusterInfo); - return roundRobin - .next(partitionLeaders.stream().map(r -> r.nodeInfo().id()).collect(Collectors.toSet())) - .flatMap( - brokerId -> - // TODO: which partition is better when all of them are in same node? - partitionLeaders.stream() - .filter(r -> r.nodeInfo().id() == brokerId) - .map(ReplicaInfo::partition) - .findAny()) - .orElse(partitionLeaders.get((int) (Math.random() * partitionLeaders.size())).partition()); + var target = + roundRobin[ + next.getAndUpdate(previous -> previous >= roundRobin.length - 1 ? 0 : previous + 1)]; + + // TODO: if the topic partitions are existent in fewer brokers, the target gets -1 in most cases + var candidate = + target < 0 + ? partitionLeaders + : partitionLeaders.stream() + .filter(r -> r.nodeInfo().id() == target) + .collect(Collectors.toUnmodifiableList()); + candidate = candidate.isEmpty() ? partitionLeaders : candidate; + return candidate.get((int) (Math.random() * candidate.size())).partition(); } - void tryToUpdateRoundRobin(ClusterInfo clusterInfo) { - if (roundRobin == null || System.currentTimeMillis() >= timeToUpdateRoundRobin) { - roundRobin = + synchronized void tryToUpdateRoundRobin(ClusterInfo clusterInfo) { + if (System.currentTimeMillis() >= timeToUpdateRoundRobin) { + var roundRobin = newRoundRobin( functions, clusterInfo, ClusterBean.of( receivers.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().current())))); + var ids = + clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet()); + // TODO: make ROUND_ROBIN_LENGTH configurable ??? + IntStream.range(0, ROUND_ROBIN_LENGTH) + .forEach(index -> this.roundRobin[index] = roundRobin.next(ids).orElse(-1)); timeToUpdateRoundRobin = System.currentTimeMillis() + roundRobinLease.toMillis(); } } @@ -164,13 +177,17 @@ static RoundRobin newRoundRobin( costFunctions.entrySet().stream() .flatMap( functionWeight -> - ((HasBrokerCost) functionWeight.getKey()) - .brokerCost(clusterInfo, clusterBean).value().entrySet().stream() - .map( - idAndCost -> - Map.entry( - idAndCost.getKey(), - idAndCost.getValue() * functionWeight.getValue()))) + functionWeight + .getKey() + .brokerCost(clusterInfo, clusterBean) + .value() + .entrySet() + .stream() + .map( + idAndCost -> + Map.entry( + idAndCost.getKey(), + idAndCost.getValue() * functionWeight.getValue()))) .collect( Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, Double::sum, HashMap::new)); diff --git a/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java b/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java index 7be347f03b..bede667ff8 100644 --- a/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java +++ b/app/src/test/java/org/astraea/app/partitioner/StrictCostDispatcherTest.java @@ -17,10 +17,14 @@ package org.astraea.app.partitioner; import java.time.Duration; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.admin.NodeInfo; @@ -174,7 +178,6 @@ void testCostFunctionWithoutFetcher() { dispatcher.configure( Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10)); dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo); - Assertions.assertNotNull(dispatcher.roundRobin); Assertions.assertEquals(0, dispatcher.receivers.size()); } } @@ -210,9 +213,12 @@ Receiver receiver(String host, int port, Fetcher fetcher) { var replicaInfo1 = ReplicaInfo.of("topic", 1, NodeInfo.of(10, "host", 11111), true, true, true); var replicaInfo2 = ReplicaInfo.of("topic", 1, NodeInfo.of(11, "host2", 11111), true, true, true); + var rs = List.of(replicaInfo0, replicaInfo1, replicaInfo2); var clusterInfo = Mockito.mock(ClusterInfo.class); - Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())) - .thenReturn(List.of(replicaInfo0, replicaInfo1, replicaInfo2)); + Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())).thenReturn(rs); + Mockito.when(clusterInfo.nodes()) + .thenReturn( + rs.stream().map(ReplicaInfo::nodeInfo).collect(Collectors.toUnmodifiableList())); // there is one local receiver by default Assertions.assertEquals(1, dispatcher.receivers.size()); Assertions.assertEquals(-1, dispatcher.receivers.keySet().iterator().next()); @@ -230,58 +236,65 @@ Receiver receiver(String host, int port, Fetcher fetcher) { @Test void testEmptyJmxPort() { - var dispatcher = new StrictCostDispatcher(); + try (var dispatcher = new StrictCostDispatcher()) { - // pass due to local mbean - dispatcher.configure( - Map.of(new NodeThroughputCost(), 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10)); + // pass due to local mbean + dispatcher.configure( + Map.of(new NodeThroughputCost(), 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10)); - // pass due to default port - dispatcher.configure( - Map.of(new NodeThroughputCost(), 1D), Optional.of(111), Map.of(), Duration.ofSeconds(10)); + // pass due to default port + dispatcher.configure( + Map.of(new NodeThroughputCost(), 1D), Optional.of(111), Map.of(), Duration.ofSeconds(10)); - // pass due to specify port - dispatcher.configure( - Map.of(new NodeThroughputCost(), 1D), - Optional.empty(), - Map.of(222, 111), - Duration.ofSeconds(10)); + // pass due to specify port + dispatcher.configure( + Map.of(new NodeThroughputCost(), 1D), + Optional.empty(), + Map.of(222, 111), + Duration.ofSeconds(10)); + } } @Test void testReturnedPartition() { var brokerId = 22; var partitionId = 123; - var dispatcher = new StrictCostDispatcher(); - var costFunction = - new HasBrokerCost() { - @Override - public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { - return () -> Map.of(brokerId, 10D); - } - }; - dispatcher.configure( - Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10)); - - var replicaInfo0 = - ReplicaInfo.of( - "topic", partitionId, NodeInfo.of(brokerId, "host", 11111), true, true, true); - var replicaInfo1 = - ReplicaInfo.of("topic", 1, NodeInfo.of(1111, "host2", 11111), true, true, true); - var clusterInfo = Mockito.mock(ClusterInfo.class); - Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())) - .thenReturn(List.of(replicaInfo0, replicaInfo1)); + try (var dispatcher = new StrictCostDispatcher()) { + var costFunction = + new HasBrokerCost() { + @Override + public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { + return () -> Map.of(brokerId, 10D); + } + }; + dispatcher.configure( + Map.of(costFunction, 1D), Optional.empty(), Map.of(), Duration.ofSeconds(10)); - Assertions.assertEquals( - partitionId, dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo)); + var replicaInfo0 = + ReplicaInfo.of( + "topic", partitionId, NodeInfo.of(brokerId, "host", 11111), true, true, true); + var replicaInfo1 = + ReplicaInfo.of("topic", 1, NodeInfo.of(1111, "host2", 11111), true, true, true); + var clusterInfo = Mockito.mock(ClusterInfo.class); + Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString())) + .thenReturn(List.of(replicaInfo0, replicaInfo1)); + Mockito.when(clusterInfo.nodes()) + .thenReturn( + Stream.of(replicaInfo0, replicaInfo1) + .map(ReplicaInfo::nodeInfo) + .collect(Collectors.toUnmodifiableList())); + Assertions.assertEquals( + partitionId, dispatcher.partition("topic", new byte[0], new byte[0], clusterInfo)); + } } @Test void testDefaultFunction() { - var dispatcher = new StrictCostDispatcher(); - dispatcher.configure(Configuration.of(Map.of())); - Assertions.assertEquals(1, dispatcher.functions.size()); - Assertions.assertEquals(1, dispatcher.receivers.size()); + try (var dispatcher = new StrictCostDispatcher()) { + dispatcher.configure(Configuration.of(Map.of())); + Assertions.assertEquals(1, dispatcher.functions.size()); + Assertions.assertEquals(1, dispatcher.receivers.size()); + } } @Test @@ -293,22 +306,26 @@ void testCostToScore() { @Test void testRoundRobinLease() { - var dispatcher = new StrictCostDispatcher(); - dispatcher.configure( - Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s"))); - Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease); + try (var dispatcher = new StrictCostDispatcher()) { - var clusterInfo = Mockito.mock(ClusterInfo.class); - dispatcher.tryToUpdateRoundRobin(clusterInfo); - var rr = dispatcher.roundRobin; - Assertions.assertNotNull(rr); - // the rr is not updated yet - dispatcher.tryToUpdateRoundRobin(clusterInfo); - Assertions.assertEquals(rr, dispatcher.roundRobin); + dispatcher.configure( + Configuration.of(Map.of(StrictCostDispatcher.ROUND_ROBIN_LEASE_KEY, "2s"))); + Assertions.assertEquals(Duration.ofSeconds(2), dispatcher.roundRobinLease); - Utils.sleep(Duration.ofSeconds(3)); - dispatcher.tryToUpdateRoundRobin(clusterInfo); - // rr is updated already - Assertions.assertNotEquals(rr, dispatcher.roundRobin); + var clusterInfo = Mockito.mock(ClusterInfo.class); + dispatcher.tryToUpdateRoundRobin(clusterInfo); + var t = dispatcher.timeToUpdateRoundRobin; + var rr = + Arrays.stream(dispatcher.roundRobin).boxed().collect(Collectors.toUnmodifiableList()); + Assertions.assertEquals(StrictCostDispatcher.ROUND_ROBIN_LENGTH, rr.size()); + // the rr is not updated yet + dispatcher.tryToUpdateRoundRobin(clusterInfo); + IntStream.range(0, rr.size()) + .forEach(i -> Assertions.assertEquals(rr.get(i), dispatcher.roundRobin[i])); + Utils.sleep(Duration.ofSeconds(3)); + dispatcher.tryToUpdateRoundRobin(clusterInfo); + // rr is updated already + Assertions.assertNotEquals(t, dispatcher.timeToUpdateRoundRobin); + } } }