Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
qoo332001 committed Aug 18, 2022
1 parent 05e6623 commit 23d95f0
Showing 1 changed file with 2 additions and 41 deletions.
43 changes: 2 additions & 41 deletions app/src/main/java/org/astraea/app/cost/MoveCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,54 +238,15 @@ public ClusterCost clusterCost(
ClusterInfo originClusterInfo, ClusterInfo newClusterInfo, ClusterBean clusterBean) {
if (overflow(originClusterInfo, newClusterInfo, clusterBean)) return () -> OVERFLOW_SCORE;
var migratedReplicas = getMigrateReplicas(originClusterInfo, newClusterInfo, true);
var replicaSize = getReplicaSize(clusterBean);
var replicaDataRate = replicaDataRate(clusterBean, duration);
var brokerMigrateInSize =
migratedReplicas.stream()
.map(
replicaMigrateInfo ->
Map.entry(
replicaMigrateInfo.brokerSink,
(double) replicaSize.get(replicaMigrateInfo.sourceTPR())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
var trafficSeries =
migratedReplicas.stream()
.map(x -> replicaDataRate.get(x.sourceTPR()))
.sorted()
.filter(x -> x != 0.0)
.collect(Collectors.toList());
var meanTrafficSeries = trafficSeries.stream().mapToDouble(x -> x).sum() / trafficSeries.size();
var SDTrafficSeries =
Math.sqrt(
trafficSeries.stream()
.mapToDouble(score -> Math.pow((score - meanTrafficSeries), 2))
.sum()
/ trafficSeries.size());

var totalMigrateTraffic = trafficSeries.stream().mapToDouble(x -> x).sum();
var totalReplicaTrafficInSink =
replicaDataRate.entrySet().stream()
.filter(x -> brokerMigrateInSize.containsKey(x.getKey().brokerId()))
.mapToDouble(Map.Entry::getValue)
.sum();
var meanMigrateSize = trafficSeries.stream().mapToDouble(x -> x).sum() / trafficSeries.size();
var sdMigrateSize =
Math.sqrt(
trafficSeries.stream()
.mapToDouble(score -> Math.pow((score - meanMigrateSize), 2))
.sum()
/ trafficSeries.size());
var totalMigrateSize =
brokerMigrateInSize.values().stream().mapToDouble(x -> x / 1024.0 / 1024.0).sum();
var total =
totalBrokerCapacity.values().stream()
.mapToDouble(x -> x.values().stream().mapToDouble(y -> y).sum())
.sum()
/ 1024.0
/ 1024.0;
var totalMigrateSizeScore = totalMigrateSize / total > 1 ? 1 : totalMigrateSize / total;
if (replicaDataRate.containsValue(-1.0)) return () -> 999.0;
return () -> SDTrafficSeries;
if (replicaDataRate.containsValue(-1.0)) return () -> OVERFLOW_SCORE;
return () -> Dispersion.correlationCoefficient().calculate(trafficSeries);
}

public Map<TopicPartitionReplica, Long> getReplicaSize(ClusterBean clusterBean) {
Expand Down

0 comments on commit 23d95f0

Please sign in to comment.