-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Log size partition cost #569
Changes from 5 commits
65daa67
af8c7fe
424ee1d
5cb8961
ece8f89
7ad1be3
83c04bd
744b38a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,16 +16,21 @@ | |
*/ | ||
package org.astraea.app.cost; | ||
|
||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import org.astraea.app.admin.ClusterBean; | ||
import org.astraea.app.admin.ClusterInfo; | ||
import org.astraea.app.admin.ReplicaInfo; | ||
import org.astraea.app.admin.TopicPartition; | ||
import org.astraea.app.metrics.HasBeanObject; | ||
import org.astraea.app.metrics.broker.HasValue; | ||
import org.astraea.app.metrics.broker.LogMetrics; | ||
import org.astraea.app.metrics.collector.Fetcher; | ||
|
||
public class NodeTopicSizeCost implements HasBrokerCost { | ||
|
||
public class NodeTopicSizeCost implements HasBrokerCost, HasPartitionCost { | ||
@Override | ||
public Optional<Fetcher> fetcher() { | ||
return Optional.of(LogMetrics.Log.SIZE::fetch); | ||
|
@@ -48,4 +53,55 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { | |
.sum())); | ||
return () -> result; | ||
} | ||
|
||
@Override | ||
public PartitionCost partitionCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { | ||
return new PartitionCost() { | ||
@Override | ||
public Map<TopicPartition, Double> value(String topic) { | ||
return clusterBean.mapByPartition().entrySet().stream() | ||
.filter( | ||
topicPartitionCollectionEntry -> | ||
topicPartitionCollectionEntry.getKey().topic().equals(topic)) | ||
.collect( | ||
Collectors.toMap( | ||
wycccccc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Map.Entry::getKey, | ||
topicPartitionCollectionEntry -> | ||
toDouble.apply(topicPartitionCollectionEntry))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
|
||
@Override | ||
public Map<TopicPartition, Double> value(int brokerId) { | ||
var targetPartitions = | ||
clusterInfo.topics().stream() | ||
.collect( | ||
Collectors.toMap( | ||
topic -> topic, | ||
topic -> | ||
clusterInfo.availableReplicas(topic).stream() | ||
.filter(replicaInfo -> replicaInfo.nodeInfo().id() == brokerId) | ||
.map(ReplicaInfo::partition) | ||
.collect(Collectors.toSet()))); | ||
|
||
return clusterBean.mapByPartition().entrySet().stream() | ||
.filter( | ||
topicPartitionCollectionEntry -> | ||
targetPartitions | ||
.get(topicPartitionCollectionEntry.getKey().topic()) | ||
.contains(topicPartitionCollectionEntry.getKey().partition())) | ||
.collect( | ||
Collectors.toMap( | ||
Map.Entry::getKey, | ||
topicPartitionCollectionEntry -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 這一段也是重複的程式碼: Function<Map.Entry<TopicPartition, Collection<HasBeanObject>>, Double> toDouble =
e ->
LogMetrics.Log.meters(e.getValue(), LogMetrics.Log.SIZE).stream()
.mapToDouble(HasValue::value)
.findAny()
.orElse(0.0D); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 太不敏感了,已修正。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
toDouble.apply(topicPartitionCollectionEntry))); | ||
} | ||
}; | ||
} | ||
|
||
Function<Map.Entry<TopicPartition, Collection<HasBeanObject>>, Double> toDouble = | ||
e -> | ||
LogMetrics.Log.meters(e.getValue(), LogMetrics.Log.SIZE).stream() | ||
.mapToDouble(HasValue::value) | ||
.findAny() | ||
.orElse(0.0D); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
這邊用
mapByReplica
比較適合,因為mapByReplica
才會針對所有的replica去撈資料,mapByPartition
一個partition只會撈一筆而已,可能不會考慮到replica還沒sync完成的狀態There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
因爲
mapByReplica
的存在,我一開始認爲mapByPartition
會回傳的是leader partition。但看了一下實做發現並不是。已將partitionCost
修改爲該方法應該有的樣子。另外我感覺
mapByPartition
的設計似乎缺少明確的意義,它似乎只是一個劣化版的mapByReplica
,不如把它修改成回傳leader partition 或者直接移除。There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同意,
mapByPartition
似乎可以被mapByReplica
所取代@qoo332001 你覺得呢?