diff --git a/app/src/main/java/org/astraea/app/cost/HasMoveCost.java b/app/src/main/java/org/astraea/app/cost/HasMoveCost.java
new file mode 100644
index 0000000000..3fd27a852c
--- /dev/null
+++ b/app/src/main/java/org/astraea/app/cost/HasMoveCost.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.app.cost;
+
+import java.util.Map;
+import org.astraea.app.admin.ClusterBean;
+import org.astraea.app.admin.ClusterInfo;
+
+public interface HasMoveCost extends CostFunction {
+  /**
+   * Score the migration cost of moving from originClusterInfo to newClusterInfo.
+   *
+   * @param originClusterInfo the clusterInfo before migration
+   * @param newClusterInfo the mocked clusterInfo generated by the balancer
+   * @param clusterBean cluster metrics
+   * @return the score of the migration cost
+   */
+  ClusterCost clusterCost(
+      ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, ClusterBean clusterBean);
+
+  /**
+   * @param originClusterInfo the clusterInfo before migration
+   * @param newClusterInfo the mocked clusterInfo generated by the balancer
+   * @param clusterBean cluster metrics
+   * @return true if the migration plan exceeds the available hardware resources
+   */
+  boolean overflow(
+      ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, ClusterBean clusterBean);
+
+  /**
+   * @param originClusterInfo the clusterInfo before migration
+   * @param newClusterInfo the mocked clusterInfo generated by the balancer
+   * @param clusterBean cluster metrics
+   * @return the total migration size of the plan
+   */
+  Map totalMigrateSize(
+      ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, ClusterBean clusterBean);
+
+  /**
+   * @return the estimated time (in seconds) to finish the plan, bounded by each sink broker's
+   *     remaining input bandwidth
+   */
+  double estimatedMigrateTime(
+      ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, ClusterBean clusterBean);
+}
diff --git a/app/src/main/java/org/astraea/app/cost/MoveCost.java b/app/src/main/java/org/astraea/app/cost/MoveCost.java
new file mode 100644
index 0000000000..b672773a65
--- /dev/null
+++ b/app/src/main/java/org/astraea/app/cost/MoveCost.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.app.cost;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.astraea.app.admin.ClusterBean;
+import org.astraea.app.admin.ClusterInfo;
+import org.astraea.app.admin.TopicPartition;
+import org.astraea.app.admin.TopicPartitionReplica;
+import org.astraea.app.metrics.HasBeanObject;
+import org.astraea.app.metrics.broker.HasCount;
+import org.astraea.app.metrics.broker.HasValue;
+import org.astraea.app.metrics.broker.LogMetrics;
+import org.astraea.app.metrics.broker.ServerMetrics;
+import org.astraea.app.metrics.collector.Fetcher;
+import org.astraea.app.partitioner.Configuration;
+
+public class MoveCost implements HasMoveCost {
+  static final double OVERFLOW_SCORE = 9999.0;
+
+  public MoveCost(Configuration configuration) {
+    this.duration =
+        Duration.ofSeconds(
+            Integer.parseInt(configuration.string("metrics.duration").orElse("30")));
+    this.minDiskFreeSpace =
+        Double.valueOf(configuration.string("min.free.space.percentage").orElse("0.2"));
+    this.totalBrokerCapacity = sizeConvert(configuration.requireString(BROKERCAPACITY));
+    this.brokerBandwidthCap = bandwidthConvert(configuration.requireString(BROKERBANDWIDTH));
+  }
+
+  static class ReplicaMigrateInfo {
+    TopicPartition topicPartition;
+    int brokerSource;
+    int brokerSink;
+    String pathSource;
+    String pathSink;
+
+    public ReplicaMigrateInfo(
+        TopicPartition topicPartition,
+        int brokerSource,
+        int brokerSink,
+        String pathSource,
+        String pathSink) {
+      this.topicPartition = topicPartition;
+      this.brokerSource = brokerSource;
+      this.brokerSink = brokerSink;
+      this.pathSource = pathSource;
+      this.pathSink = pathSink;
+    }
+
+    TopicPartitionReplica sourceTPR() {
+      return TopicPartitionReplica.of(
+          topicPartition.topic(), topicPartition.partition(), brokerSource);
+    }
+  }
+
+  static final String BROKERBANDWIDTH = "brokerBandwidthConfig";
+  static final String BROKERCAPACITY = "brokerCapacityConfig";
+  static final String UNKNOWN = "unknown";
+  final Double minDiskFreeSpace;
+  final Duration duration;
+  Map<Integer, Map<String, Integer>> totalBrokerCapacity;
+  Map<Integer, Integer> brokerBandwidthCap;
+
+  private boolean filterBean(HasBeanObject hasBeanObject, String metricName) {
+    var beanObject = hasBeanObject.beanObject();
+    return beanObject != null
+        && beanObject.properties().containsKey("name")
+        && beanObject.properties().get("name").equals(metricName);
+  }
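+
+  /**
+   * Estimates each broker's recent data rate (MB/s) for {@code metricName} by comparing the
+   * newest sampled cumulative count with the oldest sample inside {@code sampleWindow}.
+   */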
+  public Map<Integer, Double> brokerTrafficMetrics(
+      ClusterBean clusterBean, String metricName, Duration sampleWindow) {
+    return clusterBean.all().entrySet().stream()
+        .map(
+            brokerMetrics -> {
+              var sizeTimeSeries =
+                  brokerMetrics.getValue().stream()
+                      .filter(hasBeanObject -> filterBean(hasBeanObject, metricName))
+                      .map(hasBeanObject -> (HasCount) hasBeanObject)
+                      .sorted(Comparator.comparingLong(HasBeanObject::createdTimestamp).reversed())
+                      .collect(Collectors.toUnmodifiableList());
+              var latestSize =
+                  sizeTimeSeries.stream().findFirst().orElseThrow(NoSuchElementException::new);
+              var windowSize =
+                  sizeTimeSeries.stream()
+                      .dropWhile(
+                          bean ->
+                              bean.createdTimestamp()
+                                  > latestSize.createdTimestamp() - sampleWindow.toMillis())
+                      .findFirst()
+                      .orElseThrow();
+              var dataRate =
+                  ((double) (latestSize.count() - windowSize.count()))
+                      / ((double) (latestSize.createdTimestamp() - windowSize.createdTimestamp())
+                          / 1000)
+                      / 1024.0
+                      / 1024.0;
+              return Map.entry(brokerMetrics.getKey(), dataRate);
+            })
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  boolean checkBrokerInTraffic(
+      Map<TopicPartitionReplica, Double> replicaDataRate,
+      List<ReplicaMigrateInfo> migratedReplicas,
+      ClusterBean clusterBean) {
+    // TODO: need replicaOutRate and estimated migrate traffic.
+    var brokerBandwidthCap = this.brokerBandwidthCap;
+    AtomicReference<Boolean> overflow = new AtomicReference<>(false);
+    var brokerBytesIn = brokerTrafficMetrics(clusterBean, "BytesInPerSec", duration);
+    var replicationBytesInPerSec =
+        brokerTrafficMetrics(clusterBean, "ReplicationBytesInPerSec", duration);
+    var replicationBytesOutPerSec =
+        brokerTrafficMetrics(clusterBean, "ReplicationBytesOutPerSec", duration);
+    var brokerTrafficChange = new HashMap<Integer, Double>();
+    migratedReplicas.forEach(
+        replicaMigrateInfo -> {
+          // TODO: need replicaOutRate
+          var sourceChange = brokerTrafficChange.getOrDefault(replicaMigrateInfo.brokerSource, 0.0);
+          // - replicaDataRate.getOrDefault(replicaMigrateInfo.sourceTPR(), 0.0);
+          var sinkChange =
+              brokerTrafficChange.getOrDefault(replicaMigrateInfo.brokerSink, 0.0)
+                  + replicaDataRate.getOrDefault(replicaMigrateInfo.sourceTPR(), 0.0);
+          brokerTrafficChange.put(replicaMigrateInfo.brokerSource, sourceChange);
+          brokerTrafficChange.put(replicaMigrateInfo.brokerSink, sinkChange);
+        });
+    var brokerReplicaIn =
+        replicaDataRate.entrySet().stream()
+            .collect(Collectors.groupingBy(e -> e.getKey().brokerId()))
+            .entrySet()
+            .stream()
+            .map(
+                x ->
+                    Map.entry(
+                        x.getKey(), x.getValue().stream().mapToDouble(Map.Entry::getValue).sum()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    brokerBandwidthCap.forEach(
+        (broker, bandwidth) -> {
+          if (brokerBytesIn.get(broker)
+                  + replicationBytesInPerSec.get(broker)
+                  + brokerReplicaIn.get(broker)
+                  + brokerTrafficChange.getOrDefault(broker, 0.0)
+              > bandwidth) overflow.set(true);
+        });
+    return overflow.get();
+  }
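+
+  /**
+   * Checks whether any log folder would drop below the configured minimum free-space ratio
+   * (min.free.space.percentage) after the planned moves are applied.
+   */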
+  boolean checkFolderSize(
+      Map<TopicPartitionReplica, Long> replicaSize, List<ReplicaMigrateInfo> distributionChange) {
+    AtomicReference<Boolean> overflow = new AtomicReference<>(false);
+    var pathSizeChange = new HashMap<Map.Entry<Integer, String>, Long>();
+    distributionChange.forEach(
+        replicaMigrateInfo -> {
+          var sourceSizeChange =
+              pathSizeChange.getOrDefault(
+                  Map.entry(replicaMigrateInfo.brokerSource, replicaMigrateInfo.pathSource), 0L);
+          // - replicaSize.get(replicaMigrateInfo.sourceTPR());
+          var sinkSizeChange =
+              pathSizeChange.getOrDefault(
+                      Map.entry(replicaMigrateInfo.brokerSink, replicaMigrateInfo.pathSink), 0L)
+                  + replicaSize.get(replicaMigrateInfo.sourceTPR());
+          pathSizeChange.put(
+              Map.entry(replicaMigrateInfo.brokerSource, replicaMigrateInfo.pathSource),
+              sourceSizeChange);
+          pathSizeChange.put(
+              Map.entry(replicaMigrateInfo.brokerSink, replicaMigrateInfo.pathSink),
+              sinkSizeChange);
+        });
+    totalBrokerCapacity.forEach(
+        (broker, pathSize) ->
+            pathSize.forEach(
+                (path, size) -> {
+                  if ((totalBrokerCapacity.get(broker).getOrDefault(path, 0)
+                              + pathSizeChange.getOrDefault(Map.entry(broker, path), 0L))
+                          / 1024.0
+                          / 1024.0
+                          / size
+                      > (1 - minDiskFreeSpace)) overflow.set(true);
+                }));
+    return overflow.get();
+  }
+
+  boolean checkMigrateSpeed() {
+    // TODO: Estimate the traffic available to a replica
+    AtomicReference<Boolean> overflow = new AtomicReference<>(false);
+    return overflow.get();
+  }
+
+  @Override
+  public Optional<Fetcher> fetcher() {
+    return Optional.of(
+        client ->
+            Stream.of(
+                    client12 ->
+                        List.of(
+                            ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client12),
+                            ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client12),
+                            ServerMetrics.Topic.REPLICATION_BYTES_IN_PER_SEC.fetch(client12),
+                            ServerMetrics.Topic.REPLICATION_BYTES_OUT_PER_SEC.fetch(client12)),
+                    (Fetcher) LogMetrics.Log.SIZE::fetch)
+                .flatMap(f -> f.fetch(client).stream())
+                .collect(Collectors.toUnmodifiableList()));
+  }
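+
+  /**
+   * Scores a migration plan by the standard deviation of the migrated replicas' data rates.
+   * Plans that violate the traffic or disk constraints score OVERFLOW_SCORE, and plans with an
+   * unmeasurable replica rate (-1.0) score 999.0.
+   */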
+  @Override
+  public ClusterCost clusterCost(
+      ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, ClusterBean clusterBean) {
+    if (overflow(originClusterInfo, newClusterInfo, clusterBean)) return () -> OVERFLOW_SCORE;
+    var migratedReplicas = getMigrateReplicas(originClusterInfo, newClusterInfo, true);
+    var replicaSize = getReplicaSize(clusterBean);
+    var replicaDataRate = replicaDataRate(clusterBean, duration);
+    var brokerMigrateInSize =
+        migratedReplicas.stream()
+            .map(
+                replicaMigrateInfo ->
+                    Map.entry(
+                        replicaMigrateInfo.brokerSink,
+                        (double) replicaSize.get(replicaMigrateInfo.sourceTPR())))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
+    var trafficSeries =
+        migratedReplicas.stream()
+            .map(x -> replicaDataRate.get(x.sourceTPR()))
+            .sorted()
+            .filter(x -> x != 0.0)
+            .collect(Collectors.toList());
+    var meanTrafficSeries =
+        trafficSeries.stream().mapToDouble(x -> x).sum() / trafficSeries.size();
+    var sdTrafficSeries =
+        Math.sqrt(
+            trafficSeries.stream()
+                    .mapToDouble(score -> Math.pow((score - meanTrafficSeries), 2))
+                    .sum()
+                / trafficSeries.size());
+    var totalMigrateTraffic = trafficSeries.stream().mapToDouble(x -> x).sum();
+    var totalReplicaTrafficInSink =
+        replicaDataRate.entrySet().stream()
+            .filter(x -> brokerMigrateInSize.containsKey(x.getKey().brokerId()))
+            .mapToDouble(Map.Entry::getValue)
+            .sum();
+    var meanMigrateSize = trafficSeries.stream().mapToDouble(x -> x).sum() / trafficSeries.size();
+    var sdMigrateSize =
+        Math.sqrt(
+            trafficSeries.stream()
+                    .mapToDouble(score -> Math.pow((score - meanMigrateSize), 2))
+                    .sum()
+                / trafficSeries.size());
+    var totalMigrateSize =
+        brokerMigrateInSize.values().stream().mapToDouble(x -> x / 1024.0 / 1024.0).sum();
+    var total =
+        totalBrokerCapacity.values().stream()
+                .mapToDouble(x -> x.values().stream().mapToDouble(y -> y).sum())
+                .sum()
+            / 1024.0
+            / 1024.0;
+    var totalMigrateSizeScore = totalMigrateSize / total > 1 ? 1 : totalMigrateSize / total;
+    if (replicaDataRate.containsValue(-1.0)) return () -> 999.0;
+    return () -> sdTrafficSeries;
+  }
+
+  public Map<TopicPartitionReplica, Long> getReplicaSize(ClusterBean clusterBean) {
+    return clusterBean.mapByReplica().entrySet().stream()
+        .flatMap(
+            e ->
+                e.getValue().stream()
+                    .filter(x -> x instanceof HasValue)
+                    .filter(x -> x.beanObject().domainName().equals("kafka.log"))
+                    .filter(x -> x.beanObject().properties().get("type").equals("Log"))
+                    .filter(x -> x.beanObject().properties().get("name").equals("Size"))
+                    .map(x -> (HasValue) x)
+                    .map(x -> Map.entry(e.getKey(), x.value())))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x1, x2) -> x2));
+  }
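+
+  /**
+   * Estimates the size growth rate (MB/s) of every replica over {@code sampleWindow}. A rate of
+   * -1.0 marks a replica whose log shrank (e.g. retention kicked in); 0.0 marks a replica with
+   * only one sample.
+   */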
"ReplicationBytesInPerSec", duration); + var replicationBytesOutPerSec = + brokerTrafficMetrics(clusterBean, "ReplicationBytesOutPerSec", duration); + var replicaSize = getReplicaSize(clusterBean); + var replicaDataRate = replicaDataRate(clusterBean, duration); + var brokerReplicaIn = + replicaDataRate.entrySet().stream() + .collect(Collectors.groupingBy(e -> e.getKey().brokerId())) + .entrySet() + .stream() + .map( + x -> + Map.entry( + x.getKey(), x.getValue().stream().mapToDouble(Map.Entry::getValue).sum())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var availableInBandwidth = + brokerBandwidthCap.entrySet().stream() + .map( + brokerBandWidth -> + Map.entry( + brokerBandWidth.getKey(), + brokerBandWidth.getValue() + - brokerBytesInPerSec.get(brokerBandWidth.getKey()) + - replicationBytesInPerSec.get(brokerBandWidth.getKey()) + - brokerReplicaIn.get(brokerBandWidth.getKey()))) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + return migratedReplicas.stream() + .mapToDouble( + changes -> + replicaSize.get(changes.sourceTPR()) + / 1024.0 + / 1024.0 + / availableInBandwidth.get(changes.brokerSink)) + .max() + .orElse(0); + } + + public List getMigrateReplicas( + ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, boolean fromLeader) { + var leaderReplicas = + originClusterInfo.topics().stream() + .map(originClusterInfo::availableReplicaLeaders) + .flatMap( + replicaInfos -> + replicaInfos.stream() + .map( + replicaInfo -> + Map.entry( + TopicPartition.of(replicaInfo.topic(), replicaInfo.partition()), + Map.entry( + TopicPartitionReplica.of( + replicaInfo.topic(), + replicaInfo.partition(), + replicaInfo.nodeInfo().id()), + replicaInfo.dataFolder().orElse(UNKNOWN))))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var beforeMigrate = + originClusterInfo.topics().stream() + .map(originClusterInfo::availableReplicas) + .flatMap( + replicaInfos -> + replicaInfos.stream() + .map( + replicaInfo -> + Map.entry( + TopicPartitionReplica.of( + replicaInfo.topic(), + replicaInfo.partition(), + replicaInfo.nodeInfo().id()), + replicaInfo.dataFolder().orElse(UNKNOWN)))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var afterMigrate = + newClusterInfo.topics().stream() + .map(newClusterInfo::availableReplicas) + .flatMap( + replicaInfos -> + replicaInfos.stream() + .map( + replicaInfo -> + Map.entry( + TopicPartitionReplica.of( + replicaInfo.topic(), + replicaInfo.partition(), + replicaInfo.nodeInfo().id()), + replicaInfo.dataFolder().orElse(UNKNOWN)))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if (fromLeader) + return afterMigrate.entrySet().stream() + .filter(newTPR -> !beforeMigrate.containsKey(newTPR.getKey())) + .map( + newTPR -> { + var tp = TopicPartition.of(newTPR.getKey().topic(), newTPR.getKey().partition()); + var sinkBroker = newTPR.getKey().brokerId(); + return new ReplicaMigrateInfo( + tp, + leaderReplicas.get(tp).getKey().brokerId(), + sinkBroker, + leaderReplicas.get(tp).getValue(), + newTPR.getValue()); + }) + .collect(Collectors.toList()); + else { + var sourceChange = + beforeMigrate.entrySet().stream() + .filter(oldTPR -> !afterMigrate.containsKey(oldTPR.getKey())) + .map( + oldTPR -> + Map.entry( + TopicPartition.of(oldTPR.getKey().topic(), oldTPR.getKey().partition()), + Map.of(oldTPR.getKey().brokerId(), oldTPR.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x1, x2) -> x1)); + var sinkChange = + 
+  public List<ReplicaMigrateInfo> getMigrateReplicas(
+      ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, boolean fromLeader) {
+    var leaderReplicas =
+        originClusterInfo.topics().stream()
+            .map(originClusterInfo::availableReplicaLeaders)
+            .flatMap(
+                replicaInfos ->
+                    replicaInfos.stream()
+                        .map(
+                            replicaInfo ->
+                                Map.entry(
+                                    TopicPartition.of(replicaInfo.topic(), replicaInfo.partition()),
+                                    Map.entry(
+                                        TopicPartitionReplica.of(
+                                            replicaInfo.topic(),
+                                            replicaInfo.partition(),
+                                            replicaInfo.nodeInfo().id()),
+                                        replicaInfo.dataFolder().orElse(UNKNOWN)))))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    var beforeMigrate =
+        originClusterInfo.topics().stream()
+            .map(originClusterInfo::availableReplicas)
+            .flatMap(
+                replicaInfos ->
+                    replicaInfos.stream()
+                        .map(
+                            replicaInfo ->
+                                Map.entry(
+                                    TopicPartitionReplica.of(
+                                        replicaInfo.topic(),
+                                        replicaInfo.partition(),
+                                        replicaInfo.nodeInfo().id()),
+                                    replicaInfo.dataFolder().orElse(UNKNOWN))))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    var afterMigrate =
+        newClusterInfo.topics().stream()
+            .map(newClusterInfo::availableReplicas)
+            .flatMap(
+                replicaInfos ->
+                    replicaInfos.stream()
+                        .map(
+                            replicaInfo ->
+                                Map.entry(
+                                    TopicPartitionReplica.of(
+                                        replicaInfo.topic(),
+                                        replicaInfo.partition(),
+                                        replicaInfo.nodeInfo().id()),
+                                    replicaInfo.dataFolder().orElse(UNKNOWN))))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    if (fromLeader)
+      return afterMigrate.entrySet().stream()
+          .filter(newTPR -> !beforeMigrate.containsKey(newTPR.getKey()))
+          .map(
+              newTPR -> {
+                var tp = TopicPartition.of(newTPR.getKey().topic(), newTPR.getKey().partition());
+                var sinkBroker = newTPR.getKey().brokerId();
+                return new ReplicaMigrateInfo(
+                    tp,
+                    leaderReplicas.get(tp).getKey().brokerId(),
+                    sinkBroker,
+                    leaderReplicas.get(tp).getValue(),
+                    newTPR.getValue());
+              })
+          .collect(Collectors.toList());
+    else {
+      var sourceChange =
+          beforeMigrate.entrySet().stream()
+              .filter(oldTPR -> !afterMigrate.containsKey(oldTPR.getKey()))
+              .map(
+                  oldTPR ->
+                      Map.entry(
+                          TopicPartition.of(oldTPR.getKey().topic(), oldTPR.getKey().partition()),
+                          Map.of(oldTPR.getKey().brokerId(), oldTPR.getValue())))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x1, x2) -> x1));
+      var sinkChange =
+          afterMigrate.entrySet().stream()
+              .filter(newTPR -> !beforeMigrate.containsKey(newTPR.getKey()))
+              .map(
+                  newTPR ->
+                      Map.entry(
+                          TopicPartition.of(newTPR.getKey().topic(), newTPR.getKey().partition()),
+                          Map.of(newTPR.getKey().brokerId(), newTPR.getValue())))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x1, x2) -> x1));
+      var change = new ArrayList<ReplicaMigrateInfo>();
+      if (sourceChange.keySet().containsAll(sinkChange.keySet())
+          && sourceChange.values().size() == sinkChange.values().size())
+        sourceChange.forEach(
+            (tp, brokerPathMap) -> {
+              var sinkBrokerPaths = new HashMap<>(sinkChange.get(tp));
+              brokerPathMap.forEach(
+                  (sourceBroker, sourcePath) -> {
+                    var sinkBrokerPath = sinkBrokerPaths.entrySet().iterator().next();
+                    change.add(
+                        new ReplicaMigrateInfo(
+                            tp,
+                            sourceBroker,
+                            sinkBrokerPath.getKey(),
+                            sourcePath,
+                            sinkBrokerPath.getValue()));
+                    sinkBrokerPaths.remove(sinkBrokerPath.getKey());
+                  });
+            });
+      return change;
+    }
+  }
+
+  static Map.Entry<Integer, Integer> transformEntryBandwidth(Map.Entry<String, String> entry) {
+    final Pattern serviceUrlKeyPattern = Pattern.compile("broker\\.(?<brokerId>[0-9]{1,50})");
+    final Matcher matcher = serviceUrlKeyPattern.matcher(entry.getKey());
+    if (matcher.matches()) {
+      try {
+        int brokerId = Integer.parseInt(matcher.group("brokerId"));
+        return Map.entry(brokerId, Integer.parseInt(entry.getValue()));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bad integer format for " + entry.getKey(), e);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          "Bad key format for "
+              + entry.getKey()
+              + " no match for the following format :"
+              + serviceUrlKeyPattern.pattern());
+    }
+  }
+
+  static Map.Entry<Integer, String> transformEntrySize(String entry) {
+    final Pattern serviceUrlKeyPattern =
+        Pattern.compile("broker\\.(?<brokerId>[0-9]{1,9})\\.(?<path>/.{0,50})");
+    final Matcher matcher = serviceUrlKeyPattern.matcher(entry);
+    if (matcher.matches()) {
+      try {
+        int brokerId = Integer.parseInt(matcher.group("brokerId"));
+        var path = matcher.group("path");
+        return Map.entry(brokerId, path);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bad integer format for " + entry, e);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          "Bad key format for "
+              + entry
+              + " no match for the following format :"
+              + serviceUrlKeyPattern.pattern());
+    }
+  }
+
+  static Properties readFile(String value) {
+    final Properties properties = new Properties();
+    try (var reader = Files.newBufferedReader(Path.of(value))) {
+      properties.load(reader);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return properties;
+  }
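+
+  // The two converters below parse the user-supplied config files: the bandwidth file uses keys
+  // of the form "broker.<id>" and the capacity file "broker.<id>.<path>" (sample values live in
+  // MoveCostTest).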
+  static Map<Integer, Integer> bandwidthConvert(String value) {
+    var properties = readFile(value);
+    return properties.entrySet().stream()
+        .map(entry -> Map.entry((String) entry.getKey(), (String) entry.getValue()))
+        .map(MoveCost::transformEntryBandwidth)
+        .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  static Map<Integer, Map<String, Integer>> sizeConvert(String value) {
+    var properties = readFile(value);
+    var brokerPathSize = new HashMap<Integer, Map<String, Integer>>();
+    properties.forEach(
+        (k, v) -> {
+          var brokerPath = transformEntrySize((String) k);
+          brokerPathSize
+              .computeIfAbsent(brokerPath.getKey(), ignore -> new HashMap<>())
+              .put(brokerPath.getValue(), Integer.parseInt((String) v));
+        });
+    return brokerPathSize;
+  }
+}
diff --git a/app/src/test/java/org/astraea/app/cost/MoveCostTest.java b/app/src/test/java/org/astraea/app/cost/MoveCostTest.java
new file mode 100644
index 0000000000..ecf670e6c6
--- /dev/null
+++ b/app/src/test/java/org/astraea/app/cost/MoveCostTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.app.cost;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.astraea.app.admin.ClusterBean;
+import org.astraea.app.admin.ClusterInfo;
+import org.astraea.app.admin.NodeInfo;
+import org.astraea.app.admin.ReplicaInfo;
+import org.astraea.app.admin.TopicPartition;
+import org.astraea.app.admin.TopicPartitionReplica;
+import org.astraea.app.metrics.BeanObject;
+import org.astraea.app.metrics.broker.HasCount;
+import org.astraea.app.metrics.broker.LogMetrics;
+import org.astraea.app.metrics.broker.ServerMetrics;
+import org.astraea.app.partitioner.Configuration;
+import org.astraea.app.service.RequireBrokerCluster;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class MoveCostTest extends RequireBrokerCluster {
+  private static MoveCost moveCost;
+  private static ClusterBean clusterBean;
+  private static Duration duration;
+
+  @BeforeAll
+  static void initCost() throws IOException {
+    var bandwidthConfigPath = "/tmp/testBrokerBandwidthProperties";
+    var logSizeConfigPath = "/tmp/testBrokerSizeProperties";
+    OutputStream output = new FileOutputStream(bandwidthConfigPath);
+    Properties prop = new Properties();
+    prop.setProperty("broker.0", "100");
+    prop.setProperty("broker.1", "100");
+    prop.setProperty("broker.2", "100");
+    prop.store(output, null);
+    OutputStream output2 = new FileOutputStream(logSizeConfigPath);
+    Properties prop2 = new Properties();
+    prop2.setProperty("broker.0./logPath01", "1000");
+    prop2.setProperty("broker.0./logPath02", "1000");
+    prop2.setProperty("broker.1./logPath01", "1000");
+    prop2.setProperty("broker.1./logPath02", "1000");
+    prop2.setProperty("broker.2./logPath01", "1000");
+    prop2.setProperty("broker.2./logPath02", "1000");
+    prop2.store(output2, null);
+    var configuration =
+        Configuration.of(
+            Map.of(
+                "metrics.duration",
+                "5",
+                "brokerBandwidthConfig",
+                bandwidthConfigPath,
+                "brokerCapacityConfig",
+                logSizeConfigPath));
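+
+    // Each metric below gets two fake samples (timestamps 1000 and 10000 ms), so every rate in
+    // these tests is measured over a 9-second window.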
+    var fakeBeanObjectByteIn1 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics", ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), 1000, 1000);
+    var fakeBeanObjectByteIn2 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics",
+            ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(),
+            100000000,
+            10000);
+    var fakeBeanObjectByteOut1 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics", ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(), 1000, 1000);
+    var fakeBeanObjectByteOut2 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics",
+            ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(),
+            100000000,
+            10000);
+    var fakeBeanObjectReplicationByteIn1 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics",
+            ServerMetrics.Topic.REPLICATION_BYTES_IN_PER_SEC.metricName(),
+            1000,
+            1000);
+    var fakeBeanObjectReplicationByteIn2 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics",
+            ServerMetrics.Topic.REPLICATION_BYTES_IN_PER_SEC.metricName(),
+            100000000,
+            10000);
+    var fakeBeanObjectReplicationByteOut1 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics",
+            ServerMetrics.Topic.REPLICATION_BYTES_OUT_PER_SEC.metricName(),
+            1000,
+            1000);
+    var fakeBeanObjectReplicationByteOut2 =
+        fakeBrokerBeanObject(
+            "BrokerTopicMetrics",
+            ServerMetrics.Topic.REPLICATION_BYTES_OUT_PER_SEC.metricName(),
+            100000000,
+            10000);
+    var replicaSizeBeanObject1 =
+        fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 100, 1000);
+    var replicaSizeBeanObject2 =
+        fakePartitionBeanObject(
+            "Log", LogMetrics.Log.SIZE.metricName(), "test-1", "0", 6000000, 10000);
+    var replicaSizeBeanObject3 =
+        fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 100, 1000);
+    var replicaSizeBeanObject4 =
+        fakePartitionBeanObject(
+            "Log", LogMetrics.Log.SIZE.metricName(), "test-1", "1", 700000, 10000);
+    var replicaSizeBeanObject5 =
+        fakePartitionBeanObject("Log", LogMetrics.Log.SIZE.metricName(), "test-2", "0", 100, 1000);
+    var replicaSizeBeanObject6 =
+        fakePartitionBeanObject(
+            "Log", LogMetrics.Log.SIZE.metricName(), "test-2", "0", 800000, 10000);
+
+    moveCost = new MoveCost(configuration);
+    clusterBean =
+        ClusterBean.of(
+            Map.of(
+                0,
+                List.of(
+                    fakeBeanObjectByteIn1,
+                    fakeBeanObjectByteIn2,
+                    fakeBeanObjectByteOut1,
+                    fakeBeanObjectByteOut2,
+                    fakeBeanObjectReplicationByteIn1,
+                    fakeBeanObjectReplicationByteIn2,
+                    fakeBeanObjectReplicationByteOut1,
+                    fakeBeanObjectReplicationByteOut2,
+                    replicaSizeBeanObject1,
+                    replicaSizeBeanObject2,
+                    replicaSizeBeanObject5,
+                    replicaSizeBeanObject6),
+                1,
+                List.of(
+                    fakeBeanObjectByteIn1,
+                    fakeBeanObjectByteIn2,
+                    fakeBeanObjectByteOut1,
+                    fakeBeanObjectByteOut2,
+                    fakeBeanObjectReplicationByteIn1,
+                    fakeBeanObjectReplicationByteIn2,
+                    fakeBeanObjectReplicationByteOut1,
+                    fakeBeanObjectReplicationByteOut2,
+                    replicaSizeBeanObject1,
+                    replicaSizeBeanObject2,
+                    replicaSizeBeanObject3,
+                    replicaSizeBeanObject4),
+                2,
+                List.of(
+                    fakeBeanObjectByteIn1,
+                    fakeBeanObjectByteIn2,
+                    fakeBeanObjectByteOut1,
+                    fakeBeanObjectByteOut2,
+                    fakeBeanObjectReplicationByteIn1,
+                    fakeBeanObjectReplicationByteIn2,
+                    fakeBeanObjectReplicationByteOut1,
+                    fakeBeanObjectReplicationByteOut2,
+                    replicaSizeBeanObject3,
+                    replicaSizeBeanObject4,
+                    replicaSizeBeanObject5,
+                    replicaSizeBeanObject6)));
+    duration =
+        Duration.ofSeconds(
+            Integer.parseInt(configuration.string("metrics.duration").orElse("30")));
+  }
+
+  @Test
+  void testBrokerTrafficMetrics() {
+    var brokerTraffic =
+        moveCost.brokerTrafficMetrics(
+            clusterBean, ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(), duration);
+    Assertions.assertEquals(
+        brokerTraffic.get(0), (100000000 - 1000) / 1024.0 / 1024.0 / ((10000.0 - 1000.0) / 1000));
+  }
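+
+  // The replica rates below are in MB/s; initCost caps every broker at 100, so only the huge
+  // rate in overflowReplicaDataRate should trip the traffic check.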
TopicPartitionReplica.of("test-1", 3, 1), + TopicPartitionReplica.of("test-1", 4, 2), + TopicPartitionReplica.of("test-1", 5, 2)); + var replicaDataRate = + Map.of( + tprList.get(0), + 10.0, + tprList.get(1), + 5.0, + tprList.get(2), + 8.0, + tprList.get(3), + 15.0, + tprList.get(4), + 10.0, + tprList.get(5), + 11.0); + var overflowReplicaDataRate = + Map.of( + tprList.get(0), + 1000000000000.0, + tprList.get(1), + 5.0, + tprList.get(2), + 8.0, + tprList.get(3), + 15.0, + tprList.get(4), + 10.0, + tprList.get(5), + 12.0); + var migratedReplicas = + List.of( + new MoveCost.ReplicaMigrateInfo( + TopicPartition.of("test-1", 0), 0, 1, "/logPath01", "/logPath02"), + new MoveCost.ReplicaMigrateInfo( + TopicPartition.of("test-1", 1), 0, 1, "/logPath01", "/logPath02")); + var notOverflow = moveCost.checkBrokerInTraffic(replicaDataRate, migratedReplicas, clusterBean); + var overflow = + moveCost.checkBrokerInTraffic(overflowReplicaDataRate, migratedReplicas, clusterBean); + Assertions.assertFalse(notOverflow); + Assertions.assertTrue(overflow); + } + + @Test + void testCheckFolderSize() { + var tprList = + List.of( + TopicPartitionReplica.of("test-1", 0, 0), + TopicPartitionReplica.of("test-1", 1, 0), + TopicPartitionReplica.of("test-1", 2, 1), + TopicPartitionReplica.of("test-1", 3, 1)); + var replicaLogSize = + Map.of( + tprList.get(0), + 524288000L, + tprList.get(1), + 524288000L, + tprList.get(2), + 120000L, + tprList.get(3), + 15000L); + var migratedReplicas = + List.of( + new MoveCost.ReplicaMigrateInfo( + TopicPartition.of("test-1", 0), 0, 1, "/logPath01", "/logPath01"), + new MoveCost.ReplicaMigrateInfo( + TopicPartition.of("test-1", 1), 0, 1, "/logPath02", "/logPath02")); + var overflowMigratedReplicas = + List.of( + new MoveCost.ReplicaMigrateInfo( + TopicPartition.of("test-1", 0), 0, 1, "/logPath01", "/logPath01"), + new MoveCost.ReplicaMigrateInfo( + TopicPartition.of("test-1", 1), 0, 1, "/logPath02", "/logPath01")); + + var notOverflow = moveCost.checkFolderSize(replicaLogSize, migratedReplicas); + var overflow = moveCost.checkFolderSize(replicaLogSize, overflowMigratedReplicas); + Assertions.assertFalse(notOverflow); + Assertions.assertTrue(overflow); + } + + @Test + void testEstimatedMigrateTime() { + var time = moveCost.estimatedMigrateTime(originClusterInfo(), newClusterInfo(), clusterBean); + Assertions.assertEquals(0.07275465003261834, time); + } + + @Test + void testClusterCost() { + var score = moveCost.clusterCost(originClusterInfo(), newClusterInfo(), clusterBean).value(); + Assertions.assertEquals(score, 0.2622827348460017); + } + + private static LogMetrics.Log.Meter fakePartitionBeanObject( + String type, String name, String topic, String partition, long size, long time) { + return new LogMetrics.Log.Meter( + new BeanObject( + "kafka.log", + Map.of("name", name, "type", type, "topic", topic, "partition", partition), + Map.of("Value", size), + time)); + } + + private static HasCount fakeBrokerBeanObject(String type, String name, long count, long time) { + return new ServerMetrics.Topic.Meter( + new BeanObject( + "kafka.server", Map.of("type", type, "name", name), Map.of("Count", count), time)); + } + + private ClusterInfo originClusterInfo() { + + ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class); + Mockito.when(clusterInfo.nodes()) + .thenReturn( + List.of(NodeInfo.of(1, "", -1), NodeInfo.of(2, "", -1), NodeInfo.of(3, "", -1))); + Mockito.when(clusterInfo.topics()).thenReturn(Set.of("test-1", "test-2")); + 
+  private ClusterInfo originClusterInfo() {
+    ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
+    Mockito.when(clusterInfo.nodes())
+        .thenReturn(
+            List.of(NodeInfo.of(1, "", -1), NodeInfo.of(2, "", -1), NodeInfo.of(3, "", -1)));
+    Mockito.when(clusterInfo.topics()).thenReturn(Set.of("test-1", "test-2"));
+    Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
+        .thenAnswer(
+            topic ->
+                topic.getArgument(0).equals("test-1")
+                    ? List.of(
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(0, "", -1), true, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(2, "", -1), true, true, false))
+                    : List.of(
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(2, "", -1), true, true, false)));
+    Mockito.when(clusterInfo.replicas(Mockito.anyString()))
+        .thenAnswer(
+            topic ->
+                topic.getArgument(0).equals("test-1")
+                    ? List.of(
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(0, "", -1), true, true, false),
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(1, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(1, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(2, "", -1), true, true, false))
+                    : List.of(
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(0, "", -1), false, true, false),
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(2, "", -1), true, true, false)));
+    Mockito.when(clusterInfo.availableReplicas(Mockito.anyString()))
+        .thenAnswer(
+            topic ->
+                topic.getArgument(0).equals("test-1")
+                    ? List.of(
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(0, "", -1), true, true, false),
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(1, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(1, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(2, "", -1), true, true, false))
+                    : List.of(
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(0, "", -1), false, true, false),
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(2, "", -1), true, true, false)));
+    return clusterInfo;
+  }
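+
+  // The proposed target distribution, i.e. the state the balancer would hand back.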
+  private ClusterInfo newClusterInfo() {
+    ClusterInfo clusterInfo = Mockito.mock(ClusterInfo.class);
+    Mockito.when(clusterInfo.nodes())
+        .thenReturn(
+            List.of(NodeInfo.of(1, "", -1), NodeInfo.of(2, "", -1), NodeInfo.of(3, "", -1)));
+    Mockito.when(clusterInfo.topics()).thenReturn(Set.of("test-1", "test-2"));
+    Mockito.when(clusterInfo.availableReplicaLeaders(Mockito.anyString()))
+        .thenAnswer(
+            topic ->
+                topic.getArgument(0).equals("test-1")
+                    ? List.of(
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(2, "", -1), true, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(2, "", -1), true, true, false))
+                    : List.of(
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(2, "", -1), true, true, false)));
+    Mockito.when(clusterInfo.replicas(Mockito.anyString()))
+        .thenAnswer(
+            topic ->
+                topic.getArgument(0).equals("test-1")
+                    ? List.of(
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(0, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(2, "", -1), true, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(0, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(2, "", -1), true, true, false))
+                    : List.of(
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(1, "", -1), false, true, false),
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(2, "", -1), true, true, false)));
+    Mockito.when(clusterInfo.availableReplicas(Mockito.anyString()))
+        .thenAnswer(
+            topic ->
+                topic.getArgument(0).equals("test-1")
+                    ? List.of(
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(0, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 0, NodeInfo.of(2, "", -1), true, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(0, "", -1), false, true, false),
+                        ReplicaInfo.of("test-1", 1, NodeInfo.of(2, "", -1), true, true, false))
+                    : List.of(
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(1, "", -1), false, true, false),
+                        ReplicaInfo.of("test-2", 0, NodeInfo.of(2, "", -1), true, true, false)));
+    return clusterInfo;
+  }
+}