diff --git a/common/src/main/java/org/astraea/common/cost/EstimationMethod.java b/common/src/main/java/org/astraea/common/cost/EstimationMethod.java
new file mode 100644
index 0000000000..4cb333ae72
--- /dev/null
+++ b/common/src/main/java/org/astraea/common/cost/EstimationMethod.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import org.astraea.common.EnumInfo;
+
+/** Method to estimate the partition bandwidth */
+enum EstimationMethod implements EnumInfo {
+ BROKER_TOPIC_ONE_MINUTE_RATE,
+ BROKER_TOPIC_FIVE_MINUTE_RATE,
+ BROKER_TOPIC_FIFTEEN_MINUTE_RATE;
+
+ static EstimationMethod ofAlias(String alias) {
+ return EnumInfo.ignoreCaseEnum(EstimationMethod.class, alias);
+ }
+
+ @Override
+ public String alias() {
+ return name();
+ }
+
+ @Override
+ public String toString() {
+ return alias();
+ }
+}
diff --git a/common/src/main/java/org/astraea/common/cost/NetworkCost.java b/common/src/main/java/org/astraea/common/cost/NetworkCost.java
index bc7931ad40..a4205a3f7e 100644
--- a/common/src/main/java/org/astraea/common/cost/NetworkCost.java
+++ b/common/src/main/java/org/astraea/common/cost/NetworkCost.java
@@ -26,6 +26,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.astraea.common.Configuration;
import org.astraea.common.DataRate;
import org.astraea.common.EnumInfo;
import org.astraea.common.admin.BrokerTopic;
@@ -36,7 +37,6 @@
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.cost.utils.ClusterInfoSensor;
import org.astraea.common.metrics.HasBeanObject;
-import org.astraea.common.metrics.broker.HasRate;
import org.astraea.common.metrics.broker.LogMetrics;
import org.astraea.common.metrics.broker.ServerMetrics;
import org.astraea.common.metrics.collector.MetricSensor;
@@ -60,17 +60,32 @@
* every consumer fetches data from the leader(which is the default behavior of Kafka). For
* more detail about consumer rack awareness or how consumer can fetch data from the closest
* replica, see KIP-392.
+ * NetworkCost implementation use broker-topic bandwidth rate and some other info to estimate
+ * the broker-topic-partition bandwidth rate. The implementation assume the broker-topic
+ * bandwidth is correct and steadily reflect the actual resource usage. This is generally true
+ * when the broker has reach its steady state, but to reach that state might takes awhile. And
+ * based on our observation this probably won't happen at the early broker start (see Issue #1641). We suggest use
+ * this cost with metrics from the servers in steady state.
*
*/
public abstract class NetworkCost implements HasClusterCost {
+ public static final String NETWORK_COST_ESTIMATION_METHOD = "network.cost.estimation.method";
+
+ private final EstimationMethod estimationMethod;
private final BandwidthType bandwidthType;
private final Map calculationCache;
private final ClusterInfoSensor clusterInfoSensor = new ClusterInfoSensor();
- NetworkCost(BandwidthType bandwidthType) {
+ NetworkCost(Configuration config, BandwidthType bandwidthType) {
this.bandwidthType = bandwidthType;
this.calculationCache = new ConcurrentHashMap<>();
+ this.estimationMethod =
+ config
+ .string(NETWORK_COST_ESTIMATION_METHOD)
+ .map(EstimationMethod::ofAlias)
+ .orElse(EstimationMethod.BROKER_TOPIC_ONE_MINUTE_RATE);
}
void noMetricCheck(ClusterBean clusterBean) {
@@ -221,7 +236,20 @@ Map estimateRate(
.brokerTopicMetrics(bt, ServerMetrics.Topic.Meter.class)
.filter(bean -> bean.type().equals(metric))
.max(Comparator.comparingLong(HasBeanObject::createdTimestamp))
- .map(HasRate::fifteenMinuteRate)
+ .map(
+ hasRate -> {
+ switch (estimationMethod) {
+ case BROKER_TOPIC_ONE_MINUTE_RATE:
+ return hasRate.oneMinuteRate();
+ case BROKER_TOPIC_FIVE_MINUTE_RATE:
+ return hasRate.fiveMinuteRate();
+ case BROKER_TOPIC_FIFTEEN_MINUTE_RATE:
+ return hasRate.fifteenMinuteRate();
+ default:
+ throw new IllegalStateException(
+ "Unknown estimation method: " + estimationMethod);
+ }
+ })
// no load metric for this partition, treat as zero load
.orElse(0.0);
if (Double.isNaN(totalShare) || totalShare < 0)
diff --git a/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java b/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java
index 94da5fb7b7..dfa27d86c9 100644
--- a/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java
+++ b/common/src/main/java/org/astraea/common/cost/NetworkEgressCost.java
@@ -16,13 +16,15 @@
*/
package org.astraea.common.cost;
+import org.astraea.common.Configuration;
+
/**
* A cost function to evaluate cluster load balance score in terms of message egress data rate. See
* {@link NetworkCost} for further detail.
*/
public class NetworkEgressCost extends NetworkCost {
- public NetworkEgressCost() {
- super(BandwidthType.Egress);
+ public NetworkEgressCost(Configuration config) {
+ super(config, BandwidthType.Egress);
}
@Override
diff --git a/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java b/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java
index 758e622adb..d1ddd918a2 100644
--- a/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java
+++ b/common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java
@@ -37,7 +37,7 @@ public class NetworkIngressCost extends NetworkCost implements HasPartitionCost
private static final String TRAFFIC_INTERVAL = "traffic.interval";
public NetworkIngressCost(Configuration config) {
- super(BandwidthType.Ingress);
+ super(config, BandwidthType.Ingress);
this.config = config;
}
diff --git a/common/src/main/java/org/astraea/common/metrics/broker/HasRate.java b/common/src/main/java/org/astraea/common/metrics/broker/HasRate.java
index 268607c0be..34f93751c2 100644
--- a/common/src/main/java/org/astraea/common/metrics/broker/HasRate.java
+++ b/common/src/main/java/org/astraea/common/metrics/broker/HasRate.java
@@ -21,19 +21,19 @@
public interface HasRate extends HasBeanObject {
default double meanRate() {
- return (double) beanObject().attributes().getOrDefault("MeanRate", 0);
+ return (double) beanObject().attributes().getOrDefault("MeanRate", 0.0);
}
default double oneMinuteRate() {
- return (double) beanObject().attributes().getOrDefault("OneMinuteRate", 0);
+ return (double) beanObject().attributes().getOrDefault("OneMinuteRate", 0.0);
}
default double fiveMinuteRate() {
- return (double) beanObject().attributes().getOrDefault("FiveMinuteRate", 0);
+ return (double) beanObject().attributes().getOrDefault("FiveMinuteRate", 0.0);
}
default double fifteenMinuteRate() {
- return (double) beanObject().attributes().getOrDefault("FifteenMinuteRate", 0);
+ return (double) beanObject().attributes().getOrDefault("FifteenMinuteRate", 0.0);
}
default TimeUnit rateUnit() {
diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java
index 9f621762e0..097cdaac00 100644
--- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java
+++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java
@@ -32,6 +32,7 @@
import java.util.stream.Stream;
import org.astraea.common.Configuration;
import org.astraea.common.DataRate;
+import org.astraea.common.Utils;
import org.astraea.common.admin.BrokerTopic;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
@@ -54,6 +55,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -104,7 +106,7 @@ void testEstimatedEgressRate() {
Map.entry(TopicPartition.of("Pipeline-0"), 455730337L));
Assertions.assertEquals(
expected,
- new NetworkEgressCost()
+ new NetworkEgressCost(Configuration.EMPTY)
.estimateRate(t.clusterInfo(), t.clusterBean(), ServerMetrics.Topic.BYTES_OUT_PER_SEC));
}
@@ -118,10 +120,14 @@ static Stream testcases() {
new NetworkIngressCost(Configuration.EMPTY), new LargeTestCase(10, 200, 0xcafebabe)),
Arguments.of(
new NetworkIngressCost(Configuration.EMPTY), new LargeTestCase(15, 300, 0xfee1dead)),
- Arguments.of(new NetworkEgressCost(), new LargeTestCase(5, 100, 0xfa11fa11)),
- Arguments.of(new NetworkEgressCost(), new LargeTestCase(6, 100, 0xf001f001)),
- Arguments.of(new NetworkEgressCost(), new LargeTestCase(8, 200, 0xba1aba1a)),
- Arguments.of(new NetworkEgressCost(), new LargeTestCase(14, 300, 0xdd0000bb)));
+ Arguments.of(
+ new NetworkEgressCost(Configuration.EMPTY), new LargeTestCase(5, 100, 0xfa11fa11)),
+ Arguments.of(
+ new NetworkEgressCost(Configuration.EMPTY), new LargeTestCase(6, 100, 0xf001f001)),
+ Arguments.of(
+ new NetworkEgressCost(Configuration.EMPTY), new LargeTestCase(8, 200, 0xba1aba1a)),
+ Arguments.of(
+ new NetworkEgressCost(Configuration.EMPTY), new LargeTestCase(14, 300, 0xdd0000bb)));
}
@ParameterizedTest
@@ -152,7 +158,7 @@ void testCompositeOptimization() {
HasClusterCost.of(
Map.of(
new NetworkIngressCost(Configuration.EMPTY), 1.0,
- new NetworkEgressCost(), 1.0));
+ new NetworkEgressCost(Configuration.EMPTY), 1.0));
testOptimization(costFunction, testCase);
}
@@ -237,7 +243,8 @@ void testReplicationAware() {
(expectedEgress3 - expectedEgress2) / Math.max(ingressSum, egressSum);
double ingressScore =
new NetworkIngressCost(Configuration.EMPTY).clusterCost(cluster, beans).value();
- double egressScore = new NetworkEgressCost().clusterCost(cluster, beans).value();
+ double egressScore =
+ new NetworkEgressCost(Configuration.EMPTY).clusterCost(cluster, beans).value();
Assertions.assertTrue(
around.apply(expectedIngressScore).test(ingressScore),
"Ingress score should be " + expectedIngressScore + " but it is " + ingressScore);
@@ -299,11 +306,12 @@ void testZeroBandwidth() {
() -> new NetworkIngressCost(Configuration.EMPTY).clusterCost(cluster, beans),
"Metric sampled but no load value, treat as zero load");
Assertions.assertDoesNotThrow(
- () -> new NetworkEgressCost().clusterCost(cluster, beans),
+ () -> new NetworkEgressCost(Configuration.EMPTY).clusterCost(cluster, beans),
"Metric sampled but no load value, treat as zero load");
Assertions.assertEquals(
0, new NetworkIngressCost(Configuration.EMPTY).clusterCost(cluster, beans).value());
- Assertions.assertEquals(0, new NetworkEgressCost().clusterCost(cluster, beans).value());
+ Assertions.assertEquals(
+ 0, new NetworkEgressCost(Configuration.EMPTY).clusterCost(cluster, beans).value());
Assertions.assertThrows(
NoSufficientMetricsException.class,
@@ -315,7 +323,7 @@ void testZeroBandwidth() {
Assertions.assertThrows(
NoSufficientMetricsException.class,
() ->
- new NetworkEgressCost()
+ new NetworkEgressCost(Configuration.EMPTY)
.clusterCost(
cluster, ClusterBean.of(Map.of(1, List.of(), 2, List.of(), 3, List.of()))),
"Should raise a exception since we don't know if first sample is performed or not");
@@ -334,7 +342,11 @@ void testExpectedImprovement(int seed) {
new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 31), (x) -> true);
var costFunction =
HasClusterCost.of(
- Map.of(new NetworkIngressCost(Configuration.EMPTY), 1.0, new NetworkEgressCost(), 1.0));
+ Map.of(
+ new NetworkIngressCost(Configuration.EMPTY),
+ 1.0,
+ new NetworkEgressCost(Configuration.EMPTY),
+ 1.0));
var originalCost = costFunction.clusterCost(clusterInfo, clusterBean);
Function experiment =
@@ -406,7 +418,8 @@ void testZeroReplicaBroker() {
Assertions.assertEquals(2, costI.brokerRate.size());
Assertions.assertEquals(0L, costI.brokerRate.get(node.id()));
var costE =
- (NetworkCost.NetworkClusterCost) new NetworkEgressCost().clusterCost(scaledCluster, beans);
+ (NetworkCost.NetworkClusterCost)
+ new NetworkEgressCost(Configuration.EMPTY).clusterCost(scaledCluster, beans);
Assertions.assertEquals(2, costE.brokerRate.size());
Assertions.assertEquals(0L, costE.brokerRate.get(node.id()));
}
@@ -467,6 +480,76 @@ void testPartitionCost() {
Assertions.assertEquals(ingressPartitionCost.get(TopicPartition.of("test-8")), (double) 9 / 18);
}
+ @ParameterizedTest
+ @CsvSource({
+ "BYTES_IN_PER_SEC, org.astraea.common.cost.NetworkIngressCost",
+ "BYTES_IN_PER_SEC, org.astraea.common.cost.NetworkEgressCost",
+ "BYTES_OUT_PER_SEC, org.astraea.common.cost.NetworkEgressCost",
+ "BYTES_OUT_PER_SEC, org.astraea.common.cost.NetworkEgressCost",
+ })
+ void testEstimationMethod(ServerMetrics.Topic metric, Class extends NetworkCost> clz) {
+ var cluster =
+ ClusterInfoBuilder.builder()
+ .addNode(Set.of(1))
+ .addFolders(Map.of(1, Set.of("/folder")))
+ .addTopic("Topic", 1, (short) 1)
+ .mapLog(r -> Replica.builder(r).size(100).build())
+ .build();
+ var partition = TopicPartition.of("Topic", 0);
+
+ var domainName = "kafka.server";
+ var one = (double) ThreadLocalRandom.current().nextInt(1, 10000);
+ var five = (double) ThreadLocalRandom.current().nextInt(1, 10000);
+ var fifteen = (double) ThreadLocalRandom.current().nextInt(1, 10000);
+ var beans =
+ ClusterBean.of(
+ Map.of(
+ 1,
+ List.of(
+ MetricFactory.ofPartitionMetric(partition.topic(), partition.partition(), 1),
+ new ServerMetrics.Topic.Meter(
+ new BeanObject(
+ domainName,
+ Map.of(
+ "type",
+ "BrokerTopicMetrics",
+ "topic",
+ "Topic",
+ "name",
+ metric.alias()),
+ Map.of(
+ "OneMinuteRate", one,
+ "FiveMinuteRate", five,
+ "FifteenMinuteRate", fifteen))))));
+
+ var oneCost =
+ Utils.construct(
+ clz,
+ Configuration.of(
+ Map.of(
+ NetworkCost.NETWORK_COST_ESTIMATION_METHOD, "BROKER_TOPIC_ONE_MINUTE_RATE")));
+ var fiveCost =
+ Utils.construct(
+ clz,
+ Configuration.of(
+ Map.of(
+ NetworkCost.NETWORK_COST_ESTIMATION_METHOD, "BROKER_TOPIC_FIVE_MINUTE_RATE")));
+ var fifteenCost =
+ Utils.construct(
+ clz,
+ Configuration.of(
+ Map.of(
+ NetworkCost.NETWORK_COST_ESTIMATION_METHOD,
+ "BROKER_TOPIC_FIFTEEN_MINUTE_RATE")));
+
+ Assertions.assertEquals(
+ one, oneCost.estimateRate(cluster, beans, metric).get(partition).doubleValue());
+ Assertions.assertEquals(
+ five, fiveCost.estimateRate(cluster, beans, metric).get(partition).doubleValue());
+ Assertions.assertEquals(
+ fifteen, fifteenCost.estimateRate(cluster, beans, metric).get(partition).doubleValue());
+ }
+
interface TestCase {
ClusterInfo clusterInfo();
@@ -543,6 +626,20 @@ public LargeTestCase(int brokers, int partitions, int seed) {
.builder()
.topic(topic)
.time(time.toEpochSecond(ZoneOffset.UTC))
+ .oneMinuteRate(
+ clusterInfo
+ .replicaStream(BrokerTopic.of(broker, topic))
+ .filter(Replica::isLeader)
+ .filter(Replica::isOnline)
+ .mapToDouble(r -> rate.get(r.topicPartition()))
+ .sum())
+ .fiveMinuteRate(
+ clusterInfo
+ .replicaStream(BrokerTopic.of(broker, topic))
+ .filter(Replica::isLeader)
+ .filter(Replica::isOnline)
+ .mapToDouble(r -> rate.get(r.topicPartition()))
+ .sum())
.fifteenMinuteRate(
clusterInfo
.replicaStream(BrokerTopic.of(broker, topic))
@@ -557,6 +654,26 @@ public LargeTestCase(int brokers, int partitions, int seed) {
.builder()
.topic(topic)
.time(time.toEpochSecond(ZoneOffset.UTC))
+ .oneMinuteRate(
+ clusterInfo
+ .replicaStream(BrokerTopic.of(broker, topic))
+ .filter(Replica::isLeader)
+ .filter(Replica::isOnline)
+ .mapToDouble(
+ r ->
+ rate.get(r.topicPartition())
+ * consumerFanout.get(r.topicPartition()))
+ .sum())
+ .fiveMinuteRate(
+ clusterInfo
+ .replicaStream(BrokerTopic.of(broker, topic))
+ .filter(Replica::isLeader)
+ .filter(Replica::isOnline)
+ .mapToDouble(
+ r ->
+ rate.get(r.topicPartition())
+ * consumerFanout.get(r.topicPartition()))
+ .sum())
.fifteenMinuteRate(
clusterInfo
.replicaStream(BrokerTopic.of(broker, topic))
@@ -615,12 +732,16 @@ static ServerMetrics.Topic.Meter noise(int seed) {
}
static ServerMetrics.Topic.Meter bandwidth(
- ServerMetrics.Topic metric, String topic, double fifteenRate) {
+ ServerMetrics.Topic metric, String topic, double steadyStateRate) {
if (metric == null) return noise(0);
var domainName = "kafka.server";
var properties =
- Map.of("type", "BrokerTopicMetric", "topic", topic, "name", metric.metricName());
- var attributes = Map.of("FifteenMinuteRate", fifteenRate);
+ Map.of("type", "BrokerTopicMetrics", "topic", topic, "name", metric.metricName());
+ var attributes =
+ Map.of(
+ "OneMinuteRate", steadyStateRate,
+ "FiveMinuteRate", steadyStateRate,
+ "FifteenMinuteRate", steadyStateRate);
return new ServerMetrics.Topic.Meter(new BeanObject(domainName, properties, attributes));
}