Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log size partition cost #569

Merged
merged 8 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion app/src/main/java/org/astraea/app/cost/NodeTopicSizeCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.ReplicaInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.metrics.broker.LogMetrics;
import org.astraea.app.metrics.collector.Fetcher;

public class NodeTopicSizeCost implements HasBrokerCost {
public class NodeTopicSizeCost implements HasBrokerCost, HasPartitionCost {

@Override
public Optional<Fetcher> fetcher() {
Expand All @@ -48,4 +50,60 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
.sum()));
return () -> result;
}

@Override
public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return new PartitionCost() {
@Override
public Map<TopicPartition, Double> value(String topic) {
return clusterBean.mapByPartition().entrySet().stream()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊用mapByReplica比較適合,因為mapByReplica才會針對所有的replica去撈資料,mapByPartition一個partition只會撈一筆而已,可能不會考慮到replica還沒sync完成的狀態

Copy link
Collaborator Author

@wycccccc wycccccc Aug 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因爲mapByReplica的存在,我一開始認爲mapByPartition會回傳的是leader partition。但看了一下實做發現並不是。已將partitionCost修改爲該方法應該有的樣子。
另外我感覺mapByPartition的設計似乎缺少明確的意義,它似乎只是一個劣化版的mapByReplica,不如把它修改成回傳leader partition 或者直接移除。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

另外我感覺mapByPartition的設計似乎缺少明確的意義,它似乎只是一個劣化版的mapByReplica,不如把它修改成回傳leader partition 或者直接移除。

同意,mapByPartition似乎可以被mapByReplica所取代

@qoo332001 你覺得呢?

.filter(
topicPartitionReplicaCollectionEntry ->
topicPartitionReplicaCollectionEntry.getKey().topic().equals(topic))
.collect(
Collectors.toMap(
wycccccc marked this conversation as resolved.
Show resolved Hide resolved
Map.Entry::getKey,
topicPartitionCollectionEntry -> {
var meter =
(LogMetrics.Log.Meter)
topicPartitionCollectionEntry.getValue().stream()
.findAny()
.orElse(null);
return meter != null ? (double) meter.value() : 0.0;
}));
}

@Override
public Map<TopicPartition, Double> value(int brokerId) {
var targetPartitions =
clusterInfo.topics().stream()
.collect(
Collectors.toMap(
topic -> topic,
topic ->
clusterInfo.availableReplicas(topic).stream()
.filter(replicaInfo -> replicaInfo.nodeInfo().id() == brokerId)
.map(ReplicaInfo::partition)
.collect(Collectors.toSet())));

return clusterBean.mapByPartition().entrySet().stream()
.filter(
topicPartitionCollectionEntry ->
targetPartitions
.get(topicPartitionCollectionEntry.getKey().topic())
.contains(topicPartitionCollectionEntry.getKey().partition()))
.collect(
Collectors.toMap(
Map.Entry::getKey,
topicPartitionCollectionEntry -> {
var meter =
(LogMetrics.Log.Meter)
topicPartitionCollectionEntry.getValue().stream()
.findAny()
.orElse(null);
return meter != null ? (double) meter.value() : 0.0;
}));
}
};
}
}
31 changes: 24 additions & 7 deletions app/src/test/java/org/astraea/app/cost/NodeTopicSizeCostTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,47 @@
*/
package org.astraea.app.cost;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.ReplicaInfo;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.broker.LogMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class NodeTopicSizeCostTest {
private final BeanObject bean =
new BeanObject(
"domain", Map.of("topic", "t", "partition", "10", "name", "SIZE"), Map.of("Value", 777));

@Test
void testCost() {
var bean =
new BeanObject(
"domain",
Map.of("topic", "t", "partition", "10", "name", "SIZE"),
Map.of("Value", 777));
void testBrokerCost() {
var meter = new LogMetrics.Log.Meter(bean);
var cost = new NodeTopicSizeCost();
var result =
cost.brokerCost(Mockito.mock(ClusterInfo.class), ClusterBean.of(Map.of(1, List.of(meter))));
cost.brokerCost(mock(ClusterInfo.class), ClusterBean.of(Map.of(1, List.of(meter))));
Assertions.assertEquals(1, result.value().size());
Assertions.assertEquals(777, result.value().entrySet().iterator().next().getValue());
}

@Test
void testPartitionCost() {
var meter = new LogMetrics.Log.Meter(bean);
var cost = new NodeTopicSizeCost();
var clusterInfo = Mockito.mock(ClusterInfo.class);
when(clusterInfo.topics()).thenReturn(Set.of("t"));
when(clusterInfo.availableReplicas("t"))
.thenReturn(List.of(ReplicaInfo.of("t", 10, NodeInfo.of(0, "0", 0), true, false, false)));
var result = cost.partitionCost(clusterInfo, ClusterBean.of(Map.of(1, List.of(meter))));
Assertions.assertEquals(1, result.value(0).size());
Assertions.assertEquals(777, result.value(0).entrySet().iterator().next().getValue());
}
}