[COST] add MigrateTimeCost #1665

Open. Wants to merge 38 commits into base: main
Commits
5b5d00f
add PartitionMigrateTimeCost and revise MetricSensor#fetch
qoo332001 Apr 17, 2023
05b838e
fix issues
qoo332001 Apr 21, 2023
1b1a18a
fix conflict
qoo332001 Apr 21, 2023
43b1a5f
fix bug
qoo332001 Apr 21, 2023
749b63b
fix issues
qoo332001 Apr 23, 2023
f39ad51
fix issues
qoo332001 Apr 29, 2023
262a407
update docs
qoo332001 Apr 29, 2023
33967dd
fix issues
qoo332001 Apr 30, 2023
81439fe
fix conflict
qoo332001 Apr 30, 2023
e4e5981
fix issue
qoo332001 May 1, 2023
0bd88b1
Merge branch 'main' of https://github.com/skiptests/astraea into addP…
qoo332001 May 1, 2023
efb3216
fix issues
qoo332001 May 4, 2023
7452921
update
qoo332001 May 9, 2023
23e7f3f
fix conflict
qoo332001 May 9, 2023
ea28b65
fix issue
qoo332001 May 10, 2023
c74cf56
fix conflict
qoo332001 May 10, 2023
1ad7892
fix issues
qoo332001 May 11, 2023
60a62be
Merge branch 'main' of https://github.com/skiptests/astraea into addP…
qoo332001 May 11, 2023
e4ac803
update docs
qoo332001 May 15, 2023
311a48e
fix conflict
qoo332001 May 15, 2023
f4e1297
fix issues
May 16, 2023
1eca3ea
Merge branch 'main' of github.com:skiptests/astraea into addPartition…
May 16, 2023
9257e72
move brokerMaxRate() to MigrationCost
May 16, 2023
c27fe29
fix conflict
qoo332001 May 18, 2023
6df0a15
fix conflict
qoo332001 May 18, 2023
848f4e9
fix tests
qoo332001 May 18, 2023
e3dd72e
fix issue
qoo332001 May 22, 2023
38031ee
Merge branch 'main' of https://github.com/skiptests/astraea into addP…
qoo332001 May 22, 2023
dbae033
merge main
qoo332001 May 22, 2023
ebbc82b
remove reduant code
qoo332001 May 22, 2023
85ce98d
fix bugs
qoo332001 May 25, 2023
00a6fc7
fix bugs
qoo332001 May 25, 2023
5298769
fix issues
qoo332001 May 27, 2023
3731482
fix bugs
qoo332001 May 27, 2023
c04bdb8
fix conflict
qoo332001 May 29, 2023
a7e2515
Merge branch 'main' of https://github.com/skiptests/astraea into addP…
qoo332001 May 30, 2023
3e006c6
fix bug and merge main
qoo332001 May 30, 2023
bd59c89
fix issue
qoo332001 May 30, 2023
@@ -1221,7 +1221,7 @@ public SensorAndCost(Configuration configuration) {

@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of((c, ignored) -> List.of(HostMetrics.jvmMemory(c)));
return Optional.of((ignore, c, ignored) -> List.of(HostMetrics.jvmMemory(c)));
}

@Override
@@ -50,7 +50,9 @@ static ClusterBean of(Map<Integer, MBeanClient> clients, MetricSensor sensor) {
Map.Entry::getKey,
entry ->
sensor.fetch(
MBeanClient.of(entry.getValue().beans(BeanQuery.all())), EMPTY))));
entry.getKey(),
MBeanClient.of(entry.getValue().beans(BeanQuery.all())),
EMPTY))));
}

static ClusterBean masked(ClusterBean clusterBean, Predicate<Integer> nodeFilter) {
@@ -46,7 +46,8 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of(
(client, ignored) -> List.of(ServerMetrics.BrokerTopic.BYTES_IN_PER_SEC.fetch(client)));
(ignore, client, ignored) ->
List.of(ServerMetrics.BrokerTopic.BYTES_IN_PER_SEC.fetch(client)));
}

@Override
@@ -45,7 +45,8 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of(
(client, ignored) -> List.of(ServerMetrics.BrokerTopic.BYTES_OUT_PER_SEC.fetch(client)));
(ignore, client, ignored) ->
List.of(ServerMetrics.BrokerTopic.BYTES_OUT_PER_SEC.fetch(client)));
}

@Override
2 changes: 1 addition & 1 deletion common/src/main/java/org/astraea/common/cost/CpuCost.java
@@ -57,7 +57,7 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {

@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of((client, ignored) -> List.of(HostMetrics.operatingSystem(client)));
return Optional.of((ignore, client, ignored) -> List.of(HostMetrics.operatingSystem(client)));
}

@Override
2 changes: 1 addition & 1 deletion common/src/main/java/org/astraea/common/cost/LoadCost.java
@@ -150,7 +150,7 @@ Map<Integer, Integer> computeLoad(Map<Integer, Collection<HasBeanObject>> allBea
@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of(
(client, ignored) ->
(ignore, client, ignored) ->
List.of(
ServerMetrics.BrokerTopic.BYTES_IN_PER_SEC.fetch(client),
ServerMetrics.BrokerTopic.BYTES_OUT_PER_SEC.fetch(client)));
@@ -57,7 +57,7 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {

@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of((client, ignored) -> List.of(HostMetrics.jvmMemory(client)));
return Optional.of((ignore, client, ignored) -> List.of(HostMetrics.jvmMemory(client)));
}

@Override
58 changes: 57 additions & 1 deletion common/src/main/java/org/astraea/common/cost/MigrationCost.java
@@ -16,20 +16,22 @@
*/
package org.astraea.common.cost;

import static org.astraea.common.cost.PartitionMigrateTimeCost.brokerMaxRate;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;

public class MigrationCost {

public final String name;

public final Map<Integer, Long> brokerCosts;
public static final String TO_SYNC_BYTES = "record size to sync (bytes)";
public static final String TO_FETCH_BYTES = "record size to fetch (bytes)";
@@ -69,6 +71,60 @@ static Map<Integer, Long> replicaLeaderChanged(ClusterInfo before, ClusterInfo a
return changedReplicaNumber(before, after, Replica::isLeader);
}

/**
* @param before the ClusterInfo before migrated replicas
* @param after the ClusterInfo after migrated replicas
* @param clusterBean cluster metrics
* @return estimated migrated time required by all brokers (seconds)
*/
public static Map<Integer, Long> brokerMigrationTime(
ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var brokerInRate =
brokerMaxRate(before, clusterBean, PartitionMigrateTimeCost.MaxReplicationInRateBean.class);
var brokerOutRate =
brokerMaxRate(
before, clusterBean, PartitionMigrateTimeCost.MaxReplicationOutRateBean.class);
var brokerMigrateInTime =
MigrationCost.recordSizeToFetch(before, after).entrySet().stream()
.map(
brokerSize ->
Map.entry(
brokerSize.getKey(),
brokerSize.getValue()
/ brokerInRate
.get(brokerSize.getKey())
.orElseThrow(
() ->
new RuntimeException(
"No any metric for broker" + brokerSize.getKey()))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
var brokerMigrateOutTime =
MigrationCost.recordSizeToSync(before, after).entrySet().stream()
.map(
brokerSize ->
Map.entry(
brokerSize.getKey(),
brokerSize.getValue()
/ brokerOutRate
.get(brokerSize.getKey())
.orElseThrow(
() ->
new RuntimeException(
"No any metric for broker" + brokerSize.getKey()))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Stream.concat(before.nodes().stream(), after.nodes().stream())
.distinct()
.map(
nodeInfo ->
Map.entry(
nodeInfo.id(),
(long)
Math.max(
brokerMigrateInTime.get(nodeInfo.id()),
brokerMigrateOutTime.get(nodeInfo.id()))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* @param before the ClusterInfo before migrated replicas
* @param after the ClusterInfo after migrated replicas
4 changes: 2 additions & 2 deletions common/src/main/java/org/astraea/common/cost/NetworkCost.java
@@ -196,13 +196,13 @@ public Optional<MetricSensor> metricSensor() {
// obtain the replica info, so we intentionally sample log size but never use it.
// https://github.com/skiptests/astraea/pull/1240#discussion_r1044487473
return Optional.of(
(client, clusterBean) ->
(identity, client, clusterBean) ->
Stream.of(
List.of(HostMetrics.jvmMemory(client)),
ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client),
ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client),
LogMetrics.Log.SIZE.fetch(client),
clusterInfoSensor.fetch(client, clusterBean))
clusterInfoSensor.fetch(identity, client, clusterBean))
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableList()));
}
@@ -72,6 +72,6 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {

@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of((client, clusterBean) -> ProducerMetrics.node(client));
return Optional.of((ignore, client, clusterBean) -> ProducerMetrics.node(client));
}
}
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.HasBeanObject;
import org.astraea.common.metrics.Sensor;
import org.astraea.common.metrics.broker.HasMeter;
import org.astraea.common.metrics.broker.HasRate;
import org.astraea.common.metrics.broker.ServerMetrics;
import org.astraea.common.metrics.collector.MetricSensor;
import org.astraea.common.metrics.stats.Max;

/** MoveCost: more max write rate change -> higher migrate cost. */
public class PartitionMigrateTimeCost implements HasMoveCost {
private static final String REPLICATION_IN_RATE = "replication_in_rate";
private static final String REPLICATION_OUT_RATE = "replication_out_rate";
public static final String MAX_MIGRATE_TIME_KEY = "max.migrated.time.limit";
static final Map<Integer, Sensor<Double>> maxBrokerReplicationInRate = new HashMap<>();
static final Map<Integer, Sensor<Double>> maxBrokerReplicationOutRate = new HashMap<>();
// metrics windows size
private final long maxMigrateTime;

public PartitionMigrateTimeCost(Configuration config) {
this.maxMigrateTime =
config.string(MAX_MIGRATE_TIME_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE);
}

@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of(
(identity, client, clusterBean) -> {
var metrics =
clusterBean.all().values().stream()
.flatMap(Collection::stream)
.distinct()
.collect(Collectors.toCollection(ArrayList::new));
var newInMetrics = ServerMetrics.BrokerTopic.REPLICATION_BYTES_IN_PER_SEC.fetch(client);
var newOutMetrics = ServerMetrics.BrokerTopic.REPLICATION_BYTES_OUT_PER_SEC.fetch(client);
var current = Duration.ofMillis(System.currentTimeMillis());
var maxInRateSensor =
maxBrokerReplicationInRate.computeIfAbsent(
identity,
ignore ->
Sensor.builder().addStat(REPLICATION_IN_RATE, Max.<Double>of()).build());
var maxOutRateSensor =
maxBrokerReplicationOutRate.computeIfAbsent(
identity,
ignore ->
Sensor.builder().addStat(REPLICATION_OUT_RATE, Max.<Double>of()).build());
maxInRateSensor.record(newInMetrics.oneMinuteRate());
maxOutRateSensor.record(newOutMetrics.oneMinuteRate());
var inRate = maxInRateSensor.measure(REPLICATION_IN_RATE);
var outRate = maxOutRateSensor.measure(REPLICATION_OUT_RATE);
metrics.add(
(MaxReplicationInRateBean)
() ->
new BeanObject(
newInMetrics.beanObject().domainName(),
newInMetrics.beanObject().properties(),
Map.of(HasRate.ONE_MIN_RATE_KEY, inRate),
current.toMillis()));
metrics.add(
(MaxReplicationOutRateBean)
() ->
new BeanObject(
newOutMetrics.beanObject().domainName(),
newOutMetrics.beanObject().properties(),
Map.of(HasRate.ONE_MIN_RATE_KEY, outRate),
current.toMillis()));
return metrics;
});
}

public static Map<Integer, OptionalDouble> brokerMaxRate(
ClusterInfo clusterInfo,
ClusterBean clusterBean,
Class<? extends HasBeanObject> statisticMetrics) {
return clusterInfo.brokers().stream()
.map(
broker ->
Map.entry(
broker.id(),
clusterBean.all().getOrDefault(broker.id(), List.of()).stream()
.filter(x -> statisticMetrics.isAssignableFrom(x.getClass()))
.mapToDouble(x -> ((HasMeter) x).oneMinuteRate())
.max()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<Integer, Double> brokerMigratedTime(
Map<Integer, Long> needToMigrated, Map<Integer, OptionalDouble> brokerRate) {
return needToMigrated.entrySet().stream()
.map(
brokerSize ->
Map.entry(
brokerSize.getKey(),
brokerSize.getValue()
/ brokerRate
.get(brokerSize.getKey())
.orElseThrow(
() ->
new NoSufficientMetricsException(
this,
Duration.ofSeconds(1),
"No metric for broker" + brokerSize.getKey()))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var brokerInRate = brokerMaxRate(before, clusterBean, MaxReplicationInRateBean.class);
var brokerOutRate = brokerMaxRate(before, clusterBean, MaxReplicationOutRateBean.class);
var needToMigrateIn = MigrationCost.recordSizeToFetch(before, after);
var needToMigrateOut = MigrationCost.recordSizeToSync(before, after);

var brokerMigrateInTime = brokerMigratedTime(needToMigrateIn, brokerInRate);
var brokerMigrateOutTime = brokerMigratedTime(needToMigrateOut, brokerOutRate);
var maxMigrateTime =
Stream.concat(before.nodes().stream(), after.nodes().stream())
.distinct()
.map(
nodeInfo ->
Math.max(
brokerMigrateInTime.get(nodeInfo.id()),
brokerMigrateOutTime.get(nodeInfo.id())))
.max(Comparator.comparing(Function.identity()))
.orElse(0.0);
return () -> maxMigrateTime > this.maxMigrateTime;
}

public interface MaxReplicationInRateBean extends HasMeter {}

public interface MaxReplicationOutRateBean extends HasMeter {}
}
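The estimate that `moveCost` computes above can be sketched in isolation: for each broker, divide the bytes to fetch by the maximum observed inbound replication rate, divide the bytes to sync by the maximum outbound rate, take the larger of the two, and compare the slowest broker against the configured limit. The sketch below is only an illustration with plain maps. The class `MigrationTimeSketch` and its method names are hypothetical and are not part of this PR, and it throws a plain `IllegalStateException` where the PR uses `NoSufficientMetricsException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the estimate in PartitionMigrateTimeCost#moveCost.
public class MigrationTimeSketch {

  /** Per-broker estimate in seconds: max(bytesToFetch / maxInRate, bytesToSync / maxOutRate). */
  public static Map<Integer, Double> brokerMigrationSeconds(
      Map<Integer, Long> bytesToFetch,
      Map<Integer, Long> bytesToSync,
      Map<Integer, Double> maxInRate,
      Map<Integer, Double> maxOutRate) {
    Set<Integer> brokers = new TreeSet<>(bytesToFetch.keySet());
    brokers.addAll(bytesToSync.keySet());
    Map<Integer, Double> result = new HashMap<>();
    for (int broker : brokers) {
      double in = seconds(bytesToFetch.getOrDefault(broker, 0L), maxInRate.get(broker), broker);
      double out = seconds(bytesToSync.getOrDefault(broker, 0L), maxOutRate.get(broker), broker);
      result.put(broker, Math.max(in, out));
    }
    return result;
  }

  // A missing rate means no metric has been sampled for this broker yet.
  private static double seconds(long bytes, Double bytesPerSecond, int broker) {
    if (bytesPerSecond == null) {
      throw new IllegalStateException("No metric for broker " + broker);
    }
    return bytes / bytesPerSecond;
  }

  /** True when the slowest broker needs more time than the limit allows. */
  public static boolean exceedsLimit(Map<Integer, Double> seconds, long limitSeconds) {
    double slowest =
        seconds.values().stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
    return slowest > limitSeconds;
  }

  public static void main(String[] args) {
    var seconds =
        brokerMigrationSeconds(
            Map.of(1, 1000L, 2, 0L), // bytes each broker must fetch
            Map.of(1, 0L, 2, 1000L), // bytes each broker must send
            Map.of(1, 100.0, 2, 100.0), // max inbound rate, bytes per second
            Map.of(1, 50.0, 2, 50.0)); // max outbound rate, bytes per second
    System.out.println(seconds);
    System.out.println(exceedsLimit(seconds, 15));
  }
}
```

Taking the maximum over brokers assumes that all brokers migrate in parallel, so the slowest one determines the total time.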
@@ -71,7 +71,8 @@ static Map<Integer, Integer> leaderCount(ClusterInfo clusterInfo) {
@Override
public Optional<MetricSensor> metricSensor() {
return Optional.of(
(client, ignored) -> List.of(ServerMetrics.ReplicaManager.LEADER_COUNT.fetch(client)));
(ignore, client, ignored) ->
List.of(ServerMetrics.ReplicaManager.LEADER_COUNT.fetch(client)));
}

public Configuration config() {
@@ -38,7 +38,7 @@
public class ClusterInfoSensor implements MetricSensor {

@Override
public List<? extends HasBeanObject> fetch(MBeanClient client, ClusterBean bean) {
public List<? extends HasBeanObject> fetch(int identity, MBeanClient client, ClusterBean bean) {
return Stream.of(
List.of(ServerMetrics.KafkaServer.CLUSTER_ID.fetch(client)),
LogMetrics.Log.SIZE.fetch(client),
@@ -20,6 +20,10 @@
import org.astraea.common.metrics.HasBeanObject;

public interface HasRate extends HasBeanObject {
String ONE_MIN_RATE_KEY = "OneMinuteRate";
String FIVE_MINUTE_RATE = "FiveMinuteRate";
String FIFTEEN_MINUTE_RATE = "FifteenMinuteRate";

default double meanRate() {
return (double) beanObject().attributes().get("MeanRate");
}
@@ -48,12 +48,12 @@ static Optional<MetricSensor> of(
Collection<MetricSensor> metricSensors, Consumer<Exception> exceptionHandler) {
if (metricSensors.isEmpty()) return Optional.empty();
return Optional.of(
(client, clusterBean) ->
(identity, client, clusterBean) ->
metricSensors.stream()
.flatMap(
ms -> {
try {
return ms.fetch(client, clusterBean).stream();
return ms.fetch(identity, client, clusterBean).stream();
} catch (Exception ex) {
exceptionHandler.accept(ex);
return Stream.empty();
@@ -66,9 +66,10 @@ static Optional<MetricSensor> of(
* fetch metrics from remote/local mbean server. Or the implementation can generate custom metrics
* according to existent cluster bean
*
* @param identity
* @param client mbean client (don't close it!)
* @param bean current cluster bean
* @return java metrics
*/
Collection<? extends HasBeanObject> fetch(MBeanClient client, ClusterBean bean);
Collection<? extends HasBeanObject> fetch(int identity, MBeanClient client, ClusterBean bean);
}
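The extra `identity` argument lets a sensor keep state per broker, which is how `PartitionMigrateTimeCost` tracks a running maximum rate for each broker id. The following is a minimal sketch of that idea only. `IdentitySensorSketch` and `RateSensor` are hypothetical stand-ins, not the real `MetricSensor` interface, and the sketch takes a raw rate value instead of an `MBeanClient` and a `ClusterBean`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a sensor that keys its state by broker identity.
public class IdentitySensorSketch {

  /** Simplified stand-in for a three-argument fetch: broker id in, derived metric out. */
  @FunctionalInterface
  public interface RateSensor {
    double fetch(int identity, double latestRate);
  }

  /** Returns a sensor that reports the largest rate seen so far for each broker id. */
  public static RateSensor runningMax() {
    // Not thread-safe: a sketch that assumes single-threaded fetching.
    Map<Integer, Double> maxByBroker = new HashMap<>();
    return (identity, latestRate) -> maxByBroker.merge(identity, latestRate, Double::max);
  }

  public static void main(String[] args) {
    RateSensor sensor = runningMax();
    sensor.fetch(1, 30.0);
    sensor.fetch(2, 5.0);
    System.out.println(sensor.fetch(1, 10.0)); // broker 1 keeps its earlier, larger sample
    System.out.println(sensor.fetch(2, 8.0)); // broker 2 moves up to the new sample
  }
}
```

Without the identity, samples from different brokers would land in the same accumulator and the per-broker maximum could not be recovered.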