Skip to content

Commit

Permalink
Refactor broker topic metrics (opensource4you#521)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Jul 29, 2022
1 parent c7fb4a9 commit 381d913
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 312 deletions.
9 changes: 4 additions & 5 deletions app/src/main/java/org/astraea/app/cost/BrokerInputCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.broker.BrokerTopicMetricsResult;
import org.astraea.app.metrics.broker.ServerMetrics;
import org.astraea.app.metrics.collector.Fetcher;

public class BrokerInputCost implements HasBrokerCost {
Expand All @@ -35,14 +34,14 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
Collectors.toMap(
Map.Entry::getKey,
entry ->
KafkaMetrics.BrokerTopic.BytesInPerSec.of(entry.getValue()).stream()
.mapToDouble(BrokerTopicMetricsResult::oneMinuteRate)
ServerMetrics.Topic.BYTES_IN_PER_SEC.of(entry.getValue()).stream()
.mapToDouble(ServerMetrics.Topic.Meter::oneMinuteRate)
.sum()));
return () -> brokerCost;
}

@Override
public Optional<Fetcher> fetcher() {
return Optional.of(client -> List.of(KafkaMetrics.BrokerTopic.BytesInPerSec.fetch(client)));
return Optional.of(client -> List.of(ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client)));
}
}
9 changes: 4 additions & 5 deletions app/src/main/java/org/astraea/app/cost/BrokerOutputCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.broker.BrokerTopicMetricsResult;
import org.astraea.app.metrics.broker.ServerMetrics;
import org.astraea.app.metrics.collector.Fetcher;

public class BrokerOutputCost implements HasBrokerCost {
Expand All @@ -36,14 +35,14 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
Collectors.toMap(
Map.Entry::getKey,
entry ->
KafkaMetrics.BrokerTopic.BytesOutPerSec.of(entry.getValue()).stream()
.mapToDouble(BrokerTopicMetricsResult::oneMinuteRate)
ServerMetrics.Topic.BYTES_OUT_PER_SEC.of(entry.getValue()).stream()
.mapToDouble(ServerMetrics.Topic.Meter::oneMinuteRate)
.sum()));
return () -> brokerCost;
}

@Override
public Optional<Fetcher> fetcher() {
return Optional.of(client -> List.of(KafkaMetrics.BrokerTopic.BytesOutPerSec.fetch(client)));
return Optional.of(client -> List.of(ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client)));
}
}
15 changes: 7 additions & 8 deletions app/src/main/java/org/astraea/app/cost/LoadCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.broker.BrokerTopicMetricsResult;
import org.astraea.app.metrics.broker.ServerMetrics;
import org.astraea.app.metrics.collector.Fetcher;
import org.astraea.app.partitioner.PartitionerUtils;

public class LoadCost implements HasBrokerCost {
private final Map<Integer, BrokerMetric> brokersMetric = new HashMap<>();
private final Map<String, Double> metricNameAndWeight =
Map.of(
KafkaMetrics.BrokerTopic.BytesInPerSec.metricName(),
ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(),
0.5,
KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName(),
ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(),
0.5);

/** Do "Poisson" and "weightPoisson" calculation on "load". And change output to double. */
Expand Down Expand Up @@ -86,8 +85,8 @@ Map<Integer, Integer> computeLoad(Map<Integer, Collection<HasBeanObject>> allBea
(brokerID, value) -> {
if (!brokersMetric.containsKey(brokerID)) brokersMetric.put(brokerID, new BrokerMetric());
value.stream()
.filter(hasBeanObject -> hasBeanObject instanceof BrokerTopicMetricsResult)
.map(hasBeanObject -> (BrokerTopicMetricsResult) hasBeanObject)
.filter(hasBeanObject -> hasBeanObject instanceof ServerMetrics.Topic.Meter)
.map(hasBeanObject -> (ServerMetrics.Topic.Meter) hasBeanObject)
.forEach(
result ->
brokersMetric
Expand Down Expand Up @@ -151,8 +150,8 @@ public Optional<Fetcher> fetcher() {
return Optional.of(
client ->
List.of(
KafkaMetrics.BrokerTopic.BytesInPerSec.fetch(client),
KafkaMetrics.BrokerTopic.BytesOutPerSec.fetch(client)));
ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client),
ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client)));
}

private static class BrokerMetric {
Expand Down
124 changes: 0 additions & 124 deletions app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.app.metrics.broker.BrokerTopicMetricsResult;
import org.astraea.app.metrics.broker.HasValue;
import org.astraea.app.metrics.broker.TotalTimeMs;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
Expand All @@ -31,129 +30,6 @@ public final class KafkaMetrics {

private KafkaMetrics() {}

/**
 * Broker-topic metrics exposed by Kafka under the JMX domain {@code kafka.server} with
 * {@code type=BrokerTopicMetrics}. Each constant carries the JMX {@code name} property used to
 * locate the corresponding MBean; see {@link #fetch(MBeanClient)}.
 */
public enum BrokerTopic {
  /** Message validation failure rate due to non-continuous offset or sequence number in batch */
  InvalidOffsetOrSequenceRecordsPerSec("InvalidOffsetOrSequenceRecordsPerSec"),

  /** Message validation failure rate due to incorrect crc checksum */
  InvalidMessageCrcRecordsPerSec("InvalidMessageCrcRecordsPerSec"),

  /** Rate of fetch-side message format conversions */
  FetchMessageConversionsPerSec("FetchMessageConversionsPerSec"),

  /** Rate of bytes rejected (e.g. records exceeding the allowed size) */
  BytesRejectedPerSec("BytesRejectedPerSec"),

  /** Message in rate */
  MessagesInPerSec("MessagesInPerSec"),

  /** Incoming byte rate of reassignment traffic */
  ReassignmentBytesInPerSec("ReassignmentBytesInPerSec"),

  /** Rate of fetch requests that failed */
  FailedFetchRequestsPerSec("FailedFetchRequestsPerSec"),

  /** Byte in rate from other brokers */
  ReplicationBytesInPerSec("ReplicationBytesInPerSec"),

  /** Message validation failure rate due to no key specified for compacted topic */
  NoKeyCompactedTopicRecordsPerSec("NoKeyCompactedTopicRecordsPerSec"),

  /** Total rate of fetch requests (failed and successful) */
  TotalFetchRequestsPerSec("TotalFetchRequestsPerSec"),

  /** Rate of produce requests that failed */
  FailedProduceRequestsPerSec("FailedProduceRequestsPerSec"),

  /** Byte in rate from clients */
  BytesInPerSec("BytesInPerSec"),

  /** Total rate of produce requests (failed and successful) */
  TotalProduceRequestsPerSec("TotalProduceRequestsPerSec"),

  /** Message validation failure rate due to invalid magic number */
  InvalidMagicNumberRecordsPerSec("InvalidMagicNumberRecordsPerSec"),

  /** Outgoing byte rate of reassignment traffic */
  ReassignmentBytesOutPerSec("ReassignmentBytesOutPerSec"),

  /**
   * Byte out rate to other brokers (replication traffic). NOTE(review): the original comment here
   * read "Bytes in rate from other brokers", which contradicts the metric name and duplicates the
   * doc on {@link #ReplicationBytesInPerSec}; it appears to have been a copy-paste error.
   */
  ReplicationBytesOutPerSec("ReplicationBytesOutPerSec"),

  /** Rate of produce-side message format conversions */
  ProduceMessageConversionsPerSec("ProduceMessageConversionsPerSec"),

  /** Byte out rate to clients. */
  BytesOutPerSec("BytesOutPerSec");

  // value of the JMX "name" property identifying this metric's MBean
  private final String metricName;

  BrokerTopic(String name) {
    this.metricName = name;
  }

  /** @return the JMX {@code name} property of this metric */
  public String metricName() {
    return metricName;
  }

  /**
   * find out the objects related to this metrics.
   *
   * @param objects to search
   * @return collection of BrokerTopicMetricsResult, or empty if all objects are not related to
   *     this metrics
   */
  public Collection<BrokerTopicMetricsResult> of(Collection<HasBeanObject> objects) {
    return objects.stream()
        // keep only beans of the expected result type whose "name" property matches this metric
        .filter(o -> o instanceof BrokerTopicMetricsResult)
        .filter(o -> metricName().equals(o.beanObject().properties().get("name")))
        .map(o -> (BrokerTopicMetricsResult) o)
        .collect(Collectors.toUnmodifiableList());
  }

  /**
   * Query the broker for this metric's MBean.
   *
   * @param mBeanClient client connected to the broker's JMX endpoint
   * @return the queried bean wrapped as a {@link BrokerTopicMetricsResult}
   */
  public BrokerTopicMetricsResult fetch(MBeanClient mBeanClient) {
    return new BrokerTopicMetricsResult(
        mBeanClient.queryBean(
            BeanQuery.builder()
                .domainName("kafka.server")
                .property("type", "BrokerTopicMetrics")
                .property("name", this.metricName())
                .build()));
  }

  /**
   * resolve specific {@link BrokerTopic} by the given metric string, compare by case insensitive
   *
   * @param metricName the metric to resolve
   * @return a {@link BrokerTopic} match to give metric name
   * @throws IllegalArgumentException if no constant's metric name matches (case-insensitively)
   */
  public static BrokerTopic of(String metricName) {
    return Arrays.stream(BrokerTopic.values())
        .filter(metric -> metric.metricName().equalsIgnoreCase(metricName))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("No such metric: " + metricName));
  }

  /**
   * Read the broker-level Linux disk read bytes gauge ({@code type=KafkaServer,
   * name=linux-disk-read-bytes}).
   *
   * @param mBeanClient client connected to the broker's JMX endpoint
   * @return the gauge's {@code Value} attribute cast to {@code long}
   */
  public static long linuxDiskReadBytes(MBeanClient mBeanClient) {
    return (long)
        mBeanClient
            .queryBean(
                BeanQuery.builder()
                    .domainName("kafka.server")
                    .property("type", "KafkaServer")
                    .property("name", "linux-disk-read-bytes")
                    .build())
            .attributes()
            .get("Value");
  }

  /**
   * Read the broker-level Linux disk write bytes gauge ({@code type=KafkaServer,
   * name=linux-disk-write-bytes}).
   *
   * @param mBeanClient client connected to the broker's JMX endpoint
   * @return the gauge's {@code Value} attribute cast to {@code long}
   */
  public static long linuxDiskWriteBytes(MBeanClient mBeanClient) {
    return (long)
        mBeanClient
            .queryBean(
                BeanQuery.builder()
                    .domainName("kafka.server")
                    .property("type", "KafkaServer")
                    .property("name", "linux-disk-write-bytes")
                    .build())
            .attributes()
            .get("Value");
  }
}

public enum Request {
Produce,
FetchConsumer,
Expand Down

This file was deleted.

Loading

0 comments on commit 381d913

Please sign in to comment.