Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COST] Implement feedback for network ingress cost #1637

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/src/main/java/org/astraea/common/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ default Optional<Duration> duration(String key) {
return string(key).map(Utils::toDuration);
}

/**
* Looks up the string stored under {@code key} and converts it with {@code DataSize::of}.
*
* @param key the key whose associated value is to be returned
* @return the {@link DataSize} parsed from the value, or an empty Optional when
*     {@code string(key)} yields no value
*/
default Optional<DataSize> dataSize(String key) {
return string(key).map(DataSize::of);
}

/**
* Returns the integer stored under {@code key}, failing when no value is available.
*
* @param key the key whose associated value is to be returned
* @return the value produced by {@code integer(key)}
* @throws NoSuchElementException if {@code integer(key)} is empty; the message names the key
*/
default int requireInteger(String key) {
return integer(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent"));
}
Expand Down
156 changes: 134 additions & 22 deletions common/src/main/java/org/astraea/common/cost/NetworkIngressCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.astraea.common.cost;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
Expand All @@ -35,6 +39,8 @@ public class NetworkIngressCost extends NetworkCost implements HasPartitionCost
private final Configuration config;

// Configuration keys read by this cost function.
private static final String UPPER_BOUND = "upper.bound";
private static final String TRAFFIC_INTERVAL = "traffic.interval";

// Fallback values used when the corresponding key is not configured. They are true constants,
// so they are shared by all instances instead of being re-created per object.
private static final DataSize DEFAULT_UPPER_BOUND_BYTES = DataSize.MB.of(30);
private static final DataSize DEFAULT_TRAFFIC_INTERVAL_BYTES = DataSize.MB.of(10);

public NetworkIngressCost(Configuration config) {
super(BandwidthType.Ingress);
Expand All @@ -45,45 +51,57 @@ public NetworkIngressCost(Configuration config) {
public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
noMetricCheck(clusterBean);

var partitionCost =
var partitionTraffic =
estimateRate(clusterInfo, clusterBean, ServerMetrics.Topic.BYTES_IN_PER_SEC)
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> (double) e.getValue()));
var partitionTrafficPerBroker = wrappedByNode(partitionTraffic, clusterInfo);

var partitionPerBroker = new HashMap<Integer, Map<TopicPartition, Double>>();

clusterInfo.nodes().forEach(node -> partitionPerBroker.put(node.id(), new HashMap<>()));

clusterInfo
.replicaStream()
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.forEach(
replica -> {
var tp = replica.topicPartition();
var id = replica.nodeInfo().id();
partitionPerBroker.get(id).put(tp, partitionCost.get(tp));
});

var result =
partitionPerBroker.values().stream()
var partitionCost =
partitionTrafficPerBroker.values().stream()
.map(
topicPartitionDoubleMap ->
Normalizer.proportion().normalize(topicPartitionDoubleMap))
.flatMap(cost -> cost.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
var partitionCostPerBroker = wrappedByNode(partitionCost, clusterInfo);

return new PartitionCost() {
@Override
public Map<TopicPartition, Double> value() {
return result;
return partitionCost;
}

@Override
public Map<TopicPartition, Set<TopicPartition>> incompatibility() {
// TODO: Impl feedback logic, use Map.of() instead of incompatible partitions temporary
return Map.of();
Map<TopicPartition, Set<TopicPartition>> incompatible =
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
partitionCost.keySet().stream()
.map(tp -> Map.entry(tp, new HashSet<TopicPartition>()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

partitionCostPerBroker.forEach(
(brokerId, partitionCost) -> {
var partitionTraffic = partitionTrafficPerBroker.get(brokerId);
var trafficInterval = trafficToCostInterval(partitionTraffic, partitionCost);
var partitionSet =
partitionCost.entrySet().stream()
.collect(
Collectors.groupingBy(
e -> intervalOrder(trafficInterval, e.getValue()),
Collectors.mapping(
Map.Entry::getKey, Collectors.toUnmodifiableSet())));

partitionCost.forEach(
(tp, cost) -> {
for (var intervals : partitionSet.entrySet()) {
if (!intervals.getValue().contains(tp))
incompatible.get(tp).addAll(intervals.getValue());
}
});
});

return incompatible;
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
}
};
}
Expand All @@ -92,4 +110,98 @@ public Map<TopicPartition, Set<TopicPartition>> incompatibility() {
public String toString() {
  // The simple class name of the runtime type identifies this cost function.
  return getClass().getSimpleName();
}

// --------------------[helper]--------------------

/**
 * Finds the interval a partition cost belongs to.
 *
 * @param interval the interval boundaries, scanned in list order
 * @param value the partition cost
 * @return the first boundary strictly greater than {@code value}, or {@code Double.MAX_VALUE}
 *     when no boundary exceeds it
 */
private double intervalOrder(List<Double> interval, double value) {
  return interval.stream()
      .filter(boundary -> value < boundary)
      .findFirst()
      .orElse(Double.MAX_VALUE);
}

/**
 * Builds the cost boundaries used to classify the partitions of one node into traffic intervals.
 *
 * <p>The configured upper bound and interval width (or their defaults) are first converted from
 * traffic into cost. The result holds the multiples of the interval width, each capped at the
 * upper bound, followed by {@code Double.MAX_VALUE} as the bucket for everything above the upper
 * bound.
 *
 * @param partitionTraffic the traffic of partition in the same node
 * @param partitionCost the cost of partition in the same node
 * @return an unmodifiable list of interval boundaries expressed as cost
 */
private List<Double> trafficToCostInterval(
    Map<TopicPartition, Double> partitionTraffic, Map<TopicPartition, Double> partitionCost) {
  var upperBoundSize = config.dataSize(UPPER_BOUND).orElse(DEFAULT_UPPER_BOUND_BYTES);
  var upperBoundCost = convertTrafficToCost(partitionTraffic, partitionCost, upperBoundSize);

  var stepSize = config.dataSize(TRAFFIC_INTERVAL).orElse(DEFAULT_TRAFFIC_INTERVAL_BYTES);
  var stepCost = convertTrafficToCost(partitionTraffic, partitionCost, stepSize);

  // Number of regular intervals needed to reach the upper bound.
  var steps = (int) Math.ceil(upperBoundCost / stepCost);
  // One extra slot for costs larger than the upper bound.
  var lastSlot = steps + 1;

  return IntStream.rangeClosed(1, lastSlot)
      .mapToDouble(
          slot -> slot == lastSlot ? Double.MAX_VALUE : Math.min(stepCost * slot, upperBoundCost))
      .boxed()
      .collect(Collectors.toUnmodifiableList());
}

/**
 * Expresses an amount of traffic in the cost scale of one node.
 *
 * <p>The first partition with positive traffic serves as the reference: its traffic/cost pair
 * fixes the ratio that is then applied to {@code traffic}.
 *
 * @param partitionTraffic partition traffic in the same node
 * @param partitionCost partition cost in the same node
 * @param traffic the traffic would be converted
 * @return the cost derived from traffic conversion
 * @throws NoSuchElementException if no partition has traffic greater than zero
 */
private double convertTrafficToCost(
    Map<TopicPartition, Double> partitionTraffic,
    Map<TopicPartition, Double> partitionCost,
    DataSize traffic) {
  for (var sample : partitionTraffic.entrySet()) {
    if (sample.getValue() > 0.0) {
      double referenceTraffic = sample.getValue();
      double referenceCost = partitionCost.get(sample.getKey());
      return traffic.bytes() / referenceTraffic * referenceCost;
    }
  }
  throw new NoSuchElementException(
      "There is no available traffic, please confirm if the MBean has been retrieved");
}

/**
 * Regroups per-partition values by the broker that hosts the partition's online leader.
 *
 * @param partitions all partitions in the cluster
 * @param clusterInfo clusterInfo
 * @return a map keyed by broker id; each value maps the broker's leader partitions to the value
 *     found in {@code partitions}
 */
private Map<Integer, Map<TopicPartition, Double>> wrappedByNode(
    Map<TopicPartition, Double> partitions, ClusterInfo clusterInfo) {
  // Only online leaders are considered.
  var onlineLeaders =
      clusterInfo.replicaStream().filter(replica -> replica.isLeader() && replica.isOnline());

  return onlineLeaders.collect(
      Collectors.groupingBy(
          leader -> leader.nodeInfo().id(),
          Collectors.toMap(
              Replica::topicPartition, leader -> partitions.get(leader.topicPartition()))));
}
}
14 changes: 14 additions & 0 deletions common/src/test/java/org/astraea/common/ConfigurationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,18 @@ void testDuration() {
Assertions.assertEquals(Utils.toDuration("3s"), response.orElseThrow());
Assertions.assertTrue(empty.isEmpty());
}

// Verifies that dataSize parses both binary (MiB) and decimal (MB) units and is empty for an
// unknown key.
@Test
void testDataSize() {
  var settings = Configuration.of(Map.of("upper.bound", "30MiB", "traffic.interval", "5MB"));

  var expectedUpper = DataRate.MiB.of(30).perSecond().dataSize().bytes();
  var actualUpper = settings.dataSize("upper.bound").orElseThrow().bytes();
  Assertions.assertEquals(expectedUpper, actualUpper);

  var expectedInterval = DataRate.MB.of(5).perSecond().dataSize().bytes();
  var actualInterval = settings.dataSize("traffic.interval").orElseThrow().bytes();
  Assertions.assertEquals(expectedInterval, actualInterval);

  Assertions.assertTrue(settings.dataSize("kekw").isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.DataRate;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfoBuilder;
import org.astraea.common.admin.Replica;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.broker.ServerMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class NetworkIngressCostTest {

// One topic spread over two brokers: every partition must have at least one incompatible
// partition, and all of them must live on the same broker as the partition itself.
@Test
void testTwoNodesWithSameTopic() {
  // Replica size per partition; any partition not listed gets 60.
  var sizeByPartition = Map.of(0, 10, 1, 40, 2, 90);
  var clusterInfo =
      ClusterInfoBuilder.builder()
          .addNode(Set.of(1, 2))
          .addFolders(
              Map.of(1, Set.of("/folder0", "/folder1"), 2, Set.of("/folder0", "/folder1")))
          .addTopic(
              "a",
              4,
              (short) 1,
              replica -> {
                int size = sizeByPartition.getOrDefault(replica.partition(), 60);
                return Replica.builder(replica).size(size).build();
              })
          .build();

  // Both brokers report the same ingress rate for topic "a".
  var rate = DataRate.MB.of(60).perSecond().byteRate();
  var clusterBean =
      ClusterBean.of(
          Map.of(
              1,
              List.of(bandwidth(ServerMetrics.Topic.BYTES_IN_PER_SEC, "a", rate)),
              2,
              List.of(bandwidth(ServerMetrics.Topic.BYTES_IN_PER_SEC, "a", rate))));

  var incompatibility =
      new NetworkIngressCost(Configuration.EMPTY)
          .partitionCost(clusterInfo, clusterBean)
          .incompatibility();
  var brokerOf =
      clusterInfo
          .replicaStream()
          .collect(Collectors.toMap(Replica::topicPartition, r -> r.nodeInfo().id()));

  incompatibility.forEach(
      (tp, others) -> {
        Assertions.assertFalse(others.isEmpty());
        for (var other : others) {
          Assertions.assertEquals(brokerOf.get(tp), brokerOf.get(other));
        }
      });
}

// Two topics on a single broker. With the default configuration, partition a-0 is expected to
// be the only partition outside the top interval, so it is incompatible with the three other
// partitions while each of those is incompatible with a-0 only.
@Test
void testOneNodeWithMultipleTopics() {
  var clusterInfo =
      ClusterInfoBuilder.builder()
          .addNode(Set.of(1))
          .addFolders(Map.of(1, Set.of("/folder0", "/folder1")))
          .addTopic(
              "a",
              2,
              (short) 1,
              replica -> {
                var size = 0;
                if (replica.partition() == 0) size = 20;
                else size = 80;
                return Replica.builder(replica).size(size).build();
              })
          .addTopic(
              "b",
              2,
              (short) 1,
              replica -> {
                var size = 0;
                if (replica.partition() == 0) size = 40;
                else size = 60;
                return Replica.builder(replica).size(size).build();
              })
          .build();
  var clusterBean =
      ClusterBean.of(
          Map.of(
              1,
              List.of(
                  bandwidth(
                      ServerMetrics.Topic.BYTES_IN_PER_SEC,
                      "a",
                      DataRate.MB.of(100).perSecond().byteRate()),
                  bandwidth(
                      ServerMetrics.Topic.BYTES_IN_PER_SEC,
                      "b",
                      DataRate.MB.of(100).perSecond().byteRate()))));

  var networkCost = new NetworkIngressCost(Configuration.EMPTY);
  var partitionCost = networkCost.partitionCost(clusterInfo, clusterBean);

  // No debug printing here: the assertions below are the only observable output of the test.
  var incompatible = partitionCost.incompatibility();
  incompatible.forEach(
      (tp, set) -> {
        if (tp.topic().equals("a") && tp.partition() == 0) Assertions.assertEquals(3, set.size());
        else Assertions.assertEquals(1, set.size());
      });
}

/**
 * Creates a topic meter backed by a hand-made bean.
 *
 * @param metric the metric whose name is stored in the bean's "name" property
 * @param topic the topic stored in the bean's "topic" property
 * @param fifteenRate the value exposed as the bean's "FifteenMinuteRate" attribute
 * @return a meter wrapping the fabricated bean
 */
static ServerMetrics.Topic.Meter bandwidth(
    ServerMetrics.Topic metric, String topic, double fifteenRate) {
  Map<String, Object> attributes = Map.of("FifteenMinuteRate", fifteenRate);
  Map<String, String> properties =
      Map.of("type", "BrokerTopicMetric", "topic", topic, "name", metric.metricName());
  var bean = new BeanObject("kafka.server", properties, attributes);
  return new ServerMetrics.Topic.Meter(bean);
}
}