From d40e970d6a089ddd6e41babd1a2a82c4591403ec Mon Sep 17 00:00:00 2001 From: Xiang-Jun Sun Date: Thu, 18 Aug 2022 00:57:38 +0800 Subject: [PATCH] add costUtils (#575) --- .../astraea/app/balancer/BalancerUtils.java | 185 ++++++++++++++++++ .../app/balancer/BalancerUtilsTest.java | 135 +++++++++++++ 2 files changed, 320 insertions(+) create mode 100644 app/src/main/java/org/astraea/app/balancer/BalancerUtils.java create mode 100644 app/src/test/java/org/astraea/app/balancer/BalancerUtilsTest.java diff --git a/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java b/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java new file mode 100644 index 0000000000..919fa684f7 --- /dev/null +++ b/app/src/main/java/org/astraea/app/balancer/BalancerUtils.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.balancer; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.app.admin.ClusterBean; +import org.astraea.app.admin.ClusterInfo; +import org.astraea.app.admin.NodeInfo; +import org.astraea.app.admin.ReplicaInfo; +import org.astraea.app.admin.TopicPartition; +import org.astraea.app.balancer.log.ClusterLogAllocation; +import org.astraea.app.cost.HasClusterCost; +import org.astraea.app.metrics.HasBeanObject; + +class BalancerUtils { + + /** + * Create a {@link ClusterInfo} with its log placement replaced by {@link ClusterLogAllocation}. + * Every log will be marked as online & synced. Based on the given content in {@link + * ClusterLogAllocation}, some logs might not have its data directory specified. Noted that this + * method doesn't check if the given logs is suitable & exists in the cluster info base. the beans + * alongside the based cluster info might be out-of-date or even completely meaningless. + * + * @param clusterInfo the based cluster info + * @param allocation the log allocation to replace {@link ClusterInfo}'s log placement. If the + * allocation implementation is {@link ClusterLogAllocation} then the given instance will be + * locked. + * @return a {@link ClusterInfo} with its log placement replaced. + */ + public static ClusterInfo merge(ClusterInfo clusterInfo, ClusterLogAllocation allocation) { + return new ClusterInfo() { + // TODO: maybe add a field to tell if this cluster info is mocked. + + @Override + public List nodes() { + return clusterInfo.nodes(); + } + + @Override + public Set dataDirectories(int brokerId) { + return clusterInfo.dataDirectories(brokerId); + } + + @Override + public Set topics() { + return allocation.topicPartitions().stream() + .map(TopicPartition::topic) + .collect(Collectors.toUnmodifiableSet()); + } + + @Override + public List availableReplicaLeaders(String topic) { + return replicas(topic).stream() + .filter(ReplicaInfo::isLeader) + .collect(Collectors.toUnmodifiableList()); + } + + @Override + public List availableReplicas(String topic) { + // there is no offline sense for a fake cluster info, so everything is online. + return replicas(topic); + } + + @Override + public List replicas(String topic) { + Map nodeIdMap = + nodes().stream() + .collect(Collectors.toUnmodifiableMap(NodeInfo::id, Function.identity())); + var result = + allocation.topicPartitions().stream() + .filter(tp -> tp.topic().equals(topic)) + .map(tp -> Map.entry(tp, allocation.logPlacements(tp))) + .flatMap( + entry -> { + var tp = entry.getKey(); + var logs = entry.getValue(); + + return IntStream.range(0, logs.size()) + .mapToObj( + i -> + ReplicaInfo.of( + tp.topic(), + tp.partition(), + nodeIdMap.get(logs.get(i).broker()), + i == 0, + true, + false, + logs.get(i).logDirectory().orElse(null))); + }) + .collect(Collectors.toUnmodifiableList()); + + if (result.isEmpty()) throw new NoSuchElementException(); + + return result; + } + }; + } + + public static Thread progressWatch(String title, double totalTasks, Supplier accTasks) { + AtomicInteger counter = new AtomicInteger(); + + Supplier nextProgressBar = + () -> { + int blockCount = 20; + double percentagePerBlock = 1.0 / blockCount; + double now = accTasks.get(); + double currentProgress = now / totalTasks; + int fulfilled = Math.min((int) (currentProgress / percentagePerBlock), blockCount); + int rollingBlock = blockCount - fulfilled >= 1 ? 1 : 0; + int emptyBlocks = blockCount - rollingBlock - fulfilled; + + String rollingText = "-\\|/"; + String filled = String.join("", Collections.nCopies(fulfilled, "-")); + String rolling = + String.join( + "", + Collections.nCopies( + rollingBlock, "" + rollingText.charAt(counter.getAndIncrement() % 4))); + String empty = String.join("", Collections.nCopies(emptyBlocks, " ")); + return String.format("[%s%s%s] (%.2f/%.2f)", filled, rolling, empty, now, totalTasks); + }; + + Runnable progressWatch = + () -> { + while (!Thread.currentThread().isInterrupted()) { + System.out.print("[" + title + "] " + nextProgressBar.get() + '\r'); + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + break; + } + } + System.out.println("[" + title + "] " + nextProgressBar.get() + '\r'); + System.out.println(); + }; + + return new Thread(progressWatch); + } + + static double evaluateCost( + ClusterInfo clusterInfo, + Map>> metrics) { + var scores = + metrics.keySet().stream() + .map( + cf -> { + var theMetrics = metrics.get(cf); + var clusterBean = ClusterBean.of(theMetrics); + return Map.entry(cf, cf.clusterCost(clusterInfo, clusterBean).value()); + }) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + return aggregateFunction(scores); + } + + /** the lower, the better. */ + static double aggregateFunction(Map scores) { + // use the simple summation result, treat every cost equally. + return scores.values().stream().mapToDouble(x -> x).sum(); + } +} diff --git a/app/src/test/java/org/astraea/app/balancer/BalancerUtilsTest.java b/app/src/test/java/org/astraea/app/balancer/BalancerUtilsTest.java new file mode 100644 index 0000000000..7edbb13da6 --- /dev/null +++ b/app/src/test/java/org/astraea/app/balancer/BalancerUtilsTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.balancer; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.astraea.app.admin.ClusterInfo; +import org.astraea.app.admin.TopicPartition; +import org.astraea.app.balancer.log.ClusterLogAllocation; +import org.astraea.app.balancer.log.LogPlacement; +import org.astraea.app.cost.ReplicaDiskInCost; +import org.astraea.app.cost.ReplicaLeaderCost; +import org.astraea.app.metrics.BeanObject; +import org.astraea.app.metrics.HasBeanObject; +import org.astraea.app.metrics.broker.HasValue; +import org.astraea.app.metrics.broker.LogMetrics; +import org.astraea.app.metrics.broker.ServerMetrics; +import org.astraea.app.partitioner.Configuration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BalancerUtilsTest { + private static final HasValue OLD_TP1_0 = + fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 1000, 1000L); + private static final HasValue NEW_TP1_0 = + fakePartitionBeanObject( + "Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 500000, 10000L); + private static final HasValue OLD_TP1_1 = + fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 500, 1000L); + private static final HasValue NEW_TP1_1 = + fakePartitionBeanObject( + "Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 100000000, 10000L); + private static final HasValue LEADER_BROKER1 = + fakeBrokerBeanObject( + "ReplicaManager", ServerMetrics.ReplicaManager.LEADER_COUNT.metricName(), 2, 10000L); + private static final HasValue LEADER_BROKER2 = + fakeBrokerBeanObject( + "ReplicaManager", ServerMetrics.ReplicaManager.LEADER_COUNT.metricName(), 4, 10000L); + private static final Collection broker1 = + List.of(OLD_TP1_0, NEW_TP1_0, LEADER_BROKER1); + private static final Collection broker2 = + List.of(OLD_TP1_1, NEW_TP1_1, LEADER_BROKER2); + private static final Map> beanObjects = + Map.of(0, broker1, 1, broker2); + + @Test + void testMockClusterInfoAllocation() { + var tp1 = TopicPartition.of("testMockCluster", 1); + var tp2 = TopicPartition.of("testMockCluster", 0); + var logPlacement1 = List.of(LogPlacement.of(0), LogPlacement.of(1)); + var logPlacement2 = List.of(LogPlacement.of(1), LogPlacement.of(2)); + var nodes = + new Node[] { + new Node(0, "localhost", 9092), + new Node(1, "localhost", 9092), + new Node(2, "localhost", 9092) + }; + var partitionInfo = new PartitionInfo("test-1", 1, nodes[0], nodes, nodes); + var clusterInfo = + ClusterInfo.of( + new Cluster( + "", + List.of(nodes[0], nodes[1], nodes[2]), + List.of(partitionInfo), + Set.of(), + Set.of(), + Node.noNode())); + var cla = ClusterLogAllocation.of(Map.of(tp1, logPlacement1, tp2, logPlacement2)); + var mockClusterInfo = BalancerUtils.merge(clusterInfo, cla); + Assertions.assertEquals(mockClusterInfo.replicas("testMockCluster").size(), 4); + Assertions.assertEquals(mockClusterInfo.nodes().size(), 3); + Assertions.assertEquals(mockClusterInfo.topics().size(), 1); + Assertions.assertTrue(mockClusterInfo.topics().contains("testMockCluster")); + } + + @Test + void testEvaluateCost() { + var node1 = new Node(0, "localhost", 9092); + var node2 = new Node(1, "localhost", 9092); + var partitionInfo1 = + new PartitionInfo("test-1", 0, node1, new Node[] {node1}, new Node[] {node1}); + var partitionInfo2 = + new PartitionInfo("test-1", 1, node2, new Node[] {node2}, new Node[] {node2}); + var clusterInfo = + ClusterInfo.of( + new Cluster( + "", + List.of(node1, node2), + List.of(partitionInfo1, partitionInfo2), + Set.of(), + Set.of(), + Node.noNode())); + + var cf1 = new ReplicaLeaderCost(); + var cf2 = new ReplicaDiskInCost(Configuration.of(Map.of("metrics.duration", "5"))); + var cost = BalancerUtils.evaluateCost(clusterInfo, Map.of(cf1, beanObjects, cf2, beanObjects)); + Assertions.assertEquals(1.3234028368582615, cost); + } + + private static LogMetrics.Log.Meter fakePartitionBeanObject( + String type, String name, String topic, String partition, long size, long time) { + return new LogMetrics.Log.Meter( + new BeanObject( + "kafka.log", + Map.of("name", name, "type", type, "topic", topic, "partition", partition), + Map.of("Value", size), + time)); + } + + private static ServerMetrics.ReplicaManager.Meter fakeBrokerBeanObject( + String type, String name, long value, long time) { + return new ServerMetrics.ReplicaManager.Meter( + new BeanObject( + "kafka.server", Map.of("type", type, "name", name), Map.of("Value", value), time)); + } +}