Skip to content

Commit

Permalink
add costUtils (opensource4you#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
qoo332001 authored Aug 17, 2022
1 parent e8077e7 commit d40e970
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 0 deletions.
185 changes: 185 additions & 0 deletions app/src/main/java/org/astraea/app/balancer/BalancerUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.balancer;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.ReplicaInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.cost.HasClusterCost;
import org.astraea.app.metrics.HasBeanObject;

class BalancerUtils {

/**
* Create a {@link ClusterInfo} with its log placement replaced by {@link ClusterLogAllocation}.
* Every log will be marked as online & synced. Based on the given content in {@link
* ClusterLogAllocation}, some logs might not have its data directory specified. Noted that this
* method doesn't check if the given logs is suitable & exists in the cluster info base. the beans
* alongside the based cluster info might be out-of-date or even completely meaningless.
*
* @param clusterInfo the based cluster info
* @param allocation the log allocation to replace {@link ClusterInfo}'s log placement. If the
* allocation implementation is {@link ClusterLogAllocation} then the given instance will be
* locked.
* @return a {@link ClusterInfo} with its log placement replaced.
*/
public static ClusterInfo merge(ClusterInfo clusterInfo, ClusterLogAllocation allocation) {
return new ClusterInfo() {
// TODO: maybe add a field to tell if this cluster info is mocked.

@Override
public List<NodeInfo> nodes() {
return clusterInfo.nodes();
}

@Override
public Set<String> dataDirectories(int brokerId) {
return clusterInfo.dataDirectories(brokerId);
}

@Override
public Set<String> topics() {
return allocation.topicPartitions().stream()
.map(TopicPartition::topic)
.collect(Collectors.toUnmodifiableSet());
}

@Override
public List<ReplicaInfo> availableReplicaLeaders(String topic) {
return replicas(topic).stream()
.filter(ReplicaInfo::isLeader)
.collect(Collectors.toUnmodifiableList());
}

@Override
public List<ReplicaInfo> availableReplicas(String topic) {
// there is no offline sense for a fake cluster info, so everything is online.
return replicas(topic);
}

@Override
public List<ReplicaInfo> replicas(String topic) {
Map<Integer, NodeInfo> nodeIdMap =
nodes().stream()
.collect(Collectors.toUnmodifiableMap(NodeInfo::id, Function.identity()));
var result =
allocation.topicPartitions().stream()
.filter(tp -> tp.topic().equals(topic))
.map(tp -> Map.entry(tp, allocation.logPlacements(tp)))
.flatMap(
entry -> {
var tp = entry.getKey();
var logs = entry.getValue();

return IntStream.range(0, logs.size())
.mapToObj(
i ->
ReplicaInfo.of(
tp.topic(),
tp.partition(),
nodeIdMap.get(logs.get(i).broker()),
i == 0,
true,
false,
logs.get(i).logDirectory().orElse(null)));
})
.collect(Collectors.toUnmodifiableList());

if (result.isEmpty()) throw new NoSuchElementException();

return result;
}
};
}

public static Thread progressWatch(String title, double totalTasks, Supplier<Double> accTasks) {
AtomicInteger counter = new AtomicInteger();

Supplier<String> nextProgressBar =
() -> {
int blockCount = 20;
double percentagePerBlock = 1.0 / blockCount;
double now = accTasks.get();
double currentProgress = now / totalTasks;
int fulfilled = Math.min((int) (currentProgress / percentagePerBlock), blockCount);
int rollingBlock = blockCount - fulfilled >= 1 ? 1 : 0;
int emptyBlocks = blockCount - rollingBlock - fulfilled;

String rollingText = "-\\|/";
String filled = String.join("", Collections.nCopies(fulfilled, "-"));
String rolling =
String.join(
"",
Collections.nCopies(
rollingBlock, "" + rollingText.charAt(counter.getAndIncrement() % 4)));
String empty = String.join("", Collections.nCopies(emptyBlocks, " "));
return String.format("[%s%s%s] (%.2f/%.2f)", filled, rolling, empty, now, totalTasks);
};

Runnable progressWatch =
() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.print("[" + title + "] " + nextProgressBar.get() + '\r');
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
break;
}
}
System.out.println("[" + title + "] " + nextProgressBar.get() + '\r');
System.out.println();
};

return new Thread(progressWatch);
}

static double evaluateCost(
ClusterInfo clusterInfo,
Map<HasClusterCost, Map<Integer, Collection<HasBeanObject>>> metrics) {
var scores =
metrics.keySet().stream()
.map(
cf -> {
var theMetrics = metrics.get(cf);
var clusterBean = ClusterBean.of(theMetrics);
return Map.entry(cf, cf.clusterCost(clusterInfo, clusterBean).value());
})
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
return aggregateFunction(scores);
}

/** the lower, the better. */
static double aggregateFunction(Map<HasClusterCost, Double> scores) {
// use the simple summation result, treat every cost equally.
return scores.values().stream().mapToDouble(x -> x).sum();
}
}
135 changes: 135 additions & 0 deletions app/src/test/java/org/astraea/app/balancer/BalancerUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.balancer;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.balancer.log.LogPlacement;
import org.astraea.app.cost.ReplicaDiskInCost;
import org.astraea.app.cost.ReplicaLeaderCost;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.broker.HasValue;
import org.astraea.app.metrics.broker.LogMetrics;
import org.astraea.app.metrics.broker.ServerMetrics;
import org.astraea.app.partitioner.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class BalancerUtilsTest {
private static final HasValue OLD_TP1_0 =
fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 1000, 1000L);
private static final HasValue NEW_TP1_0 =
fakePartitionBeanObject(
"Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 500000, 10000L);
private static final HasValue OLD_TP1_1 =
fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 500, 1000L);
private static final HasValue NEW_TP1_1 =
fakePartitionBeanObject(
"Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 100000000, 10000L);
private static final HasValue LEADER_BROKER1 =
fakeBrokerBeanObject(
"ReplicaManager", ServerMetrics.ReplicaManager.LEADER_COUNT.metricName(), 2, 10000L);
private static final HasValue LEADER_BROKER2 =
fakeBrokerBeanObject(
"ReplicaManager", ServerMetrics.ReplicaManager.LEADER_COUNT.metricName(), 4, 10000L);
private static final Collection<HasBeanObject> broker1 =
List.of(OLD_TP1_0, NEW_TP1_0, LEADER_BROKER1);
private static final Collection<HasBeanObject> broker2 =
List.of(OLD_TP1_1, NEW_TP1_1, LEADER_BROKER2);
private static final Map<Integer, Collection<HasBeanObject>> beanObjects =
Map.of(0, broker1, 1, broker2);

@Test
void testMockClusterInfoAllocation() {
var tp1 = TopicPartition.of("testMockCluster", 1);
var tp2 = TopicPartition.of("testMockCluster", 0);
var logPlacement1 = List.of(LogPlacement.of(0), LogPlacement.of(1));
var logPlacement2 = List.of(LogPlacement.of(1), LogPlacement.of(2));
var nodes =
new Node[] {
new Node(0, "localhost", 9092),
new Node(1, "localhost", 9092),
new Node(2, "localhost", 9092)
};
var partitionInfo = new PartitionInfo("test-1", 1, nodes[0], nodes, nodes);
var clusterInfo =
ClusterInfo.of(
new Cluster(
"",
List.of(nodes[0], nodes[1], nodes[2]),
List.of(partitionInfo),
Set.of(),
Set.of(),
Node.noNode()));
var cla = ClusterLogAllocation.of(Map.of(tp1, logPlacement1, tp2, logPlacement2));
var mockClusterInfo = BalancerUtils.merge(clusterInfo, cla);
Assertions.assertEquals(mockClusterInfo.replicas("testMockCluster").size(), 4);
Assertions.assertEquals(mockClusterInfo.nodes().size(), 3);
Assertions.assertEquals(mockClusterInfo.topics().size(), 1);
Assertions.assertTrue(mockClusterInfo.topics().contains("testMockCluster"));
}

@Test
void testEvaluateCost() {
var node1 = new Node(0, "localhost", 9092);
var node2 = new Node(1, "localhost", 9092);
var partitionInfo1 =
new PartitionInfo("test-1", 0, node1, new Node[] {node1}, new Node[] {node1});
var partitionInfo2 =
new PartitionInfo("test-1", 1, node2, new Node[] {node2}, new Node[] {node2});
var clusterInfo =
ClusterInfo.of(
new Cluster(
"",
List.of(node1, node2),
List.of(partitionInfo1, partitionInfo2),
Set.of(),
Set.of(),
Node.noNode()));

var cf1 = new ReplicaLeaderCost();
var cf2 = new ReplicaDiskInCost(Configuration.of(Map.of("metrics.duration", "5")));
var cost = BalancerUtils.evaluateCost(clusterInfo, Map.of(cf1, beanObjects, cf2, beanObjects));
Assertions.assertEquals(1.3234028368582615, cost);
}

private static LogMetrics.Log.Meter fakePartitionBeanObject(
String type, String name, String topic, String partition, long size, long time) {
return new LogMetrics.Log.Meter(
new BeanObject(
"kafka.log",
Map.of("name", name, "type", type, "topic", topic, "partition", partition),
Map.of("Value", size),
time));
}

private static ServerMetrics.ReplicaManager.Meter fakeBrokerBeanObject(
String type, String name, long value, long time) {
return new ServerMetrics.ReplicaManager.Meter(
new BeanObject(
"kafka.server", Map.of("type", type, "name", name), Map.of("Value", value), time));
}
}

0 comments on commit d40e970

Please sign in to comment.