Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add costUtils #575

Merged
merged 7 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions app/src/main/java/org/astraea/app/balancer/BalancerUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.balancer;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.ReplicaInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.cost.HasClusterCost;
import org.astraea.app.metrics.HasBeanObject;

class BalancerUtils {

/**
* Create a {@link ClusterInfo} with its log placement replaced by {@link ClusterLogAllocation}.
* Every log will be marked as online & synced. Based on the given content in {@link
* ClusterLogAllocation}, some logs might not have its data directory specified. Noted that this
* method doesn't check if the given logs is suitable & exists in the cluster info base. the beans
* alongside the based cluster info might be out-of-date or even completely meaningless.
*
* @param clusterInfo the based cluster info
* @param allocation the log allocation to replace {@link ClusterInfo}'s log placement. If the
* allocation implementation is {@link ClusterLogAllocation} then the given instance will be
* locked.
* @return a {@link ClusterInfo} with its log placement replaced.
*/
public static ClusterInfo merge(ClusterInfo clusterInfo, ClusterLogAllocation allocation) {
return new ClusterInfo() {
// TODO: maybe add a field to tell if this cluster info is mocked.

@Override
public List<NodeInfo> nodes() {
return clusterInfo.nodes();
}

@Override
public Set<String> dataDirectories(int brokerId) {
return clusterInfo.dataDirectories(brokerId);
}

@Override
public Set<String> topics() {
return allocation.topicPartitions().stream()
.map(TopicPartition::topic)
.collect(Collectors.toUnmodifiableSet());
}

@Override
public List<ReplicaInfo> availableReplicaLeaders(String topic) {
return replicas(topic).stream()
.filter(ReplicaInfo::isLeader)
.collect(Collectors.toUnmodifiableList());
}

@Override
public List<ReplicaInfo> availableReplicas(String topic) {
// there is no offline sense for a fake cluster info, so everything is online.
return replicas(topic);
}

@Override
public List<ReplicaInfo> replicas(String topic) {
Map<Integer, NodeInfo> nodeIdMap =
nodes().stream()
.collect(Collectors.toUnmodifiableMap(NodeInfo::id, Function.identity()));
var result =
allocation.topicPartitions().stream()
.filter(tp -> tp.topic().equals(topic))
.map(tp -> Map.entry(tp, allocation.logPlacements(tp)))
.flatMap(
entry -> {
var tp = entry.getKey();
var logs = entry.getValue();

return IntStream.range(0, logs.size())
.mapToObj(
i ->
ReplicaInfo.of(
tp.topic(),
tp.partition(),
nodeIdMap.get(logs.get(i).broker()),
i == 0,
true,
false,
logs.get(i).logDirectory().orElse(null)));
})
.collect(Collectors.toUnmodifiableList());

if (result.isEmpty()) throw new NoSuchElementException();

return result;
}
};
}

public static Thread progressWatch(String title, double totalTasks, Supplier<Double> accTasks) {
AtomicInteger counter = new AtomicInteger();

Supplier<String> nextProgressBar =
() -> {
int blockCount = 20;
double percentagePerBlock = 1.0 / blockCount;
double now = accTasks.get();
double currentProgress = now / totalTasks;
int fulfilled = Math.min((int) (currentProgress / percentagePerBlock), blockCount);
int rollingBlock = blockCount - fulfilled >= 1 ? 1 : 0;
int emptyBlocks = blockCount - rollingBlock - fulfilled;

String rollingText = "-\\|/";
String filled = String.join("", Collections.nCopies(fulfilled, "-"));
String rolling =
String.join(
"",
Collections.nCopies(
rollingBlock, "" + rollingText.charAt(counter.getAndIncrement() % 4)));
String empty = String.join("", Collections.nCopies(emptyBlocks, " "));
return String.format("[%s%s%s] (%.2f/%.2f)", filled, rolling, empty, now, totalTasks);
};

Runnable progressWatch =
() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.print("[" + title + "] " + nextProgressBar.get() + '\r');
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
break;
}
}
System.out.println("[" + title + "] " + nextProgressBar.get() + '\r');
System.out.println();
};

return new Thread(progressWatch);
}

static double evaluateCost(
ClusterInfo clusterInfo,
Map<HasClusterCost, Map<Integer, Collection<HasBeanObject>>> metrics) {
var scores =
metrics.keySet().stream()
.map(
cf -> {
var theMetrics = metrics.get(cf);
var clusterBean = ClusterBean.of(theMetrics);
return Map.entry(cf, cf.clusterCost(clusterInfo, clusterBean).value());
})
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
return aggregateFunction(scores);
}

/** the lower, the better. */
static double aggregateFunction(Map<HasClusterCost, Double> scores) {
// use the simple summation result, treat every cost equally.
return scores.values().stream().mapToDouble(x -> x).sum();
}
}
135 changes: 135 additions & 0 deletions app/src/test/java/org/astraea/app/balancer/BalancerUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.balancer;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.balancer.log.LogPlacement;
import org.astraea.app.cost.ReplicaDiskInCost;
import org.astraea.app.cost.ReplicaLeaderCost;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.broker.HasValue;
import org.astraea.app.metrics.broker.LogMetrics;
import org.astraea.app.metrics.broker.ServerMetrics;
import org.astraea.app.partitioner.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class BalancerUtilsTest {
private static final HasValue OLD_TP1_0 =
fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 1000, 1000L);
private static final HasValue NEW_TP1_0 =
fakePartitionBeanObject(
"Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 500000, 10000L);
private static final HasValue OLD_TP1_1 =
fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 500, 1000L);
private static final HasValue NEW_TP1_1 =
fakePartitionBeanObject(
"Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 100000000, 10000L);
private static final HasValue LEADER_BROKER1 =
fakeBrokerBeanObject(
"ReplicaManager", ServerMetrics.ReplicaManager.LEADER_COUNT.metricName(), 2, 10000L);
private static final HasValue LEADER_BROKER2 =
fakeBrokerBeanObject(
"ReplicaManager", ServerMetrics.ReplicaManager.LEADER_COUNT.metricName(), 4, 10000L);
private static final Collection<HasBeanObject> broker1 =
List.of(OLD_TP1_0, NEW_TP1_0, LEADER_BROKER1);
private static final Collection<HasBeanObject> broker2 =
List.of(OLD_TP1_1, NEW_TP1_1, LEADER_BROKER2);
private static final Map<Integer, Collection<HasBeanObject>> beanObjects =
Map.of(0, broker1, 1, broker2);

@Test
void testMockClusterInfoAllocation() {
var tp1 = TopicPartition.of("testMockCluster", 1);
var tp2 = TopicPartition.of("testMockCluster", 0);
var logPlacement1 = List.of(LogPlacement.of(0), LogPlacement.of(1));
var logPlacement2 = List.of(LogPlacement.of(1), LogPlacement.of(2));
var nodes =
new Node[] {
new Node(0, "localhost", 9092),
new Node(1, "localhost", 9092),
new Node(2, "localhost", 9092)
};
var partitionInfo = new PartitionInfo("test-1", 1, nodes[0], nodes, nodes);
var clusterInfo =
ClusterInfo.of(
new Cluster(
"",
List.of(nodes[0], nodes[1], nodes[2]),
List.of(partitionInfo),
Set.of(),
Set.of(),
Node.noNode()));
var cla = ClusterLogAllocation.of(Map.of(tp1, logPlacement1, tp2, logPlacement2));
var mockClusterInfo = BalancerUtils.merge(clusterInfo, cla);
Assertions.assertEquals(mockClusterInfo.replicas("testMockCluster").size(), 4);
Assertions.assertEquals(mockClusterInfo.nodes().size(), 3);
Assertions.assertEquals(mockClusterInfo.topics().size(), 1);
Assertions.assertTrue(mockClusterInfo.topics().contains("testMockCluster"));
}

@Test
void testEvaluateCost() {
var node1 = new Node(0, "localhost", 9092);
var node2 = new Node(1, "localhost", 9092);
var partitionInfo1 =
new PartitionInfo("test-1", 0, node1, new Node[] {node1}, new Node[] {node1});
var partitionInfo2 =
new PartitionInfo("test-1", 1, node2, new Node[] {node2}, new Node[] {node2});
var clusterInfo =
ClusterInfo.of(
new Cluster(
"",
List.of(node1, node2),
List.of(partitionInfo1, partitionInfo2),
Set.of(),
Set.of(),
Node.noNode()));

var cf1 = new ReplicaLeaderCost();
var cf2 = new ReplicaDiskInCost(Configuration.of(Map.of("metrics.duration", "5")));
var cost = BalancerUtils.evaluateCost(clusterInfo, Map.of(cf1, beanObjects, cf2, beanObjects));
Assertions.assertEquals(1.3234028368582615, cost);
}

private static LogMetrics.Log.Meter fakePartitionBeanObject(
String type, String name, String topic, String partition, long size, long time) {
return new LogMetrics.Log.Meter(
new BeanObject(
"kafka.log",
Map.of("name", name, "type", type, "topic", topic, "partition", partition),
Map.of("Value", size),
time));
}

private static ServerMetrics.ReplicaManager.Meter fakeBrokerBeanObject(
String type, String name, long value, long time) {
return new ServerMetrics.ReplicaManager.Meter(
new BeanObject(
"kafka.server", Map.of("type", type, "name", name), Map.of("Value", value), time));
}
}