Skip to content

Commit

Permalink
add ClusterCost interface (opensource4you#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
qoo332001 authored Aug 8, 2022
1 parent 89c18f7 commit 09b2d94
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
24 changes: 24 additions & 0 deletions app/src/main/java/org/astraea/app/cost/ClusterCost.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.cost;

/** Return type of cost function, `HasMoveCost`. It returns the score of brokers. */
public interface ClusterCost {

/** @return cost of cluster */
double value();
}
31 changes: 31 additions & 0 deletions app/src/main/java/org/astraea/app/cost/HasClusterCost.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.cost;

import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;

public interface HasClusterCost extends CostFunction {
/**
* score cluster for a particular metrics according to passed beans and cluster information.
*
* @param clusterInfo cluster information
* @param clusterBean cluster metrics
* @return the score of cluster.
*/
ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean);
}
21 changes: 20 additions & 1 deletion app/src/main/java/org/astraea/app/cost/ReplicaLeaderCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@
import org.astraea.app.metrics.collector.Fetcher;

/** more replica leaders -> higher cost */
public class ReplicaLeaderCost implements HasBrokerCost {
public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost {
static double coefficientVariation(Map<Integer, Double> brokerScore) {
var dataRateMean =
brokerScore.values().stream().mapToDouble(x -> (double) x).sum() / brokerScore.size();
var dataRateSD =
Math.sqrt(
brokerScore.values().stream()
.mapToDouble(score -> Math.pow((score - dataRateMean), 2))
.sum()
/ brokerScore.size());
var cv = dataRateSD / dataRateMean;
if (cv > 1) return 1.0;
return cv;
}

@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
Expand All @@ -38,6 +51,12 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return () -> result;
}

@Override
public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var brokerScore = brokerCost(clusterInfo, clusterBean).value();
return () -> coefficientVariation(brokerScore);
}

Map<Integer, Integer> leaderCount(ClusterInfo ignored, ClusterBean clusterBean) {
return clusterBean.all().entrySet().stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ void testWithMetrics() {
var broker2 = List.of((HasBeanObject) LeaderCount2);
var broker3 = List.of((HasBeanObject) LeaderCount3);
var clusterBean = ClusterBean.of(Map.of(1, broker1, 2, broker2, 3, broker3));
var load = costFunction.brokerCost(ClusterInfo.EMPTY, clusterBean);
var brokerLoad = costFunction.brokerCost(ClusterInfo.EMPTY, clusterBean);
var clusterLoad = costFunction.clusterCost(ClusterInfo.EMPTY, clusterBean);

Assertions.assertEquals(3, load.value().size());
Assertions.assertEquals(3.0, load.value().get(1));
Assertions.assertEquals(4.0, load.value().get(2));
Assertions.assertEquals(5.0, load.value().get(3));
Assertions.assertEquals(3, brokerLoad.value().size());
Assertions.assertEquals(3.0, brokerLoad.value().get(1));
Assertions.assertEquals(4.0, brokerLoad.value().get(2));
Assertions.assertEquals(5.0, brokerLoad.value().get(3));
Assertions.assertEquals(0.2041241452319315, clusterLoad.value());
}

private ServerMetrics.ReplicaManager.Meter mockResult(String name, long count) {
Expand Down

0 comments on commit 09b2d94

Please sign in to comment.