Skip to content

Commit

Permalink
Refactor producer metrics (opensource4you#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Jul 31, 2022
1 parent c68fbdd commit 5ff8628
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 88 deletions.
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.collector.Fetcher;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
import org.astraea.app.metrics.producer.ProducerMetrics;

public class NodeLatencyCost implements HasBrokerCost {

Expand Down Expand Up @@ -56,6 +56,6 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {

@Override
public Optional<Fetcher> fetcher() {
return Optional.of(KafkaMetrics.Producer::nodes);
return Optional.of(ProducerMetrics::nodes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.collector.Fetcher;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
import org.astraea.app.metrics.producer.ProducerMetrics;

public class NodeThroughputCost implements HasBrokerCost {

Expand Down Expand Up @@ -56,6 +56,6 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {

@Override
public Optional<Fetcher> fetcher() {
return Optional.of(KafkaMetrics.Producer::nodes);
return Optional.of(ProducerMetrics::nodes);
}
}
77 changes: 0 additions & 77 deletions app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import org.astraea.app.metrics.broker.HasValue;
import org.astraea.app.metrics.broker.TotalTimeMs;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
import org.astraea.app.metrics.producer.HasProducerTopicMetrics;

public final class KafkaMetrics {

Expand Down Expand Up @@ -84,78 +81,4 @@ public Collection<HasBeanObject> fetch(MBeanClient mBeanClient) {
.collect(Collectors.toUnmodifiableList());
}
}

public static final class Producer {

private static int brokerId(String node) {
return Integer.parseInt(node.substring(node.indexOf("-") + 1));
}

private Producer() {}

/**
* node metrics traced by producer
*
* @param mBeanClient to query beans
* @param brokerId broker ids
* @return key is client id used by producer, and value is node metrics traced by each producer
*/
public static Map<String, HasProducerNodeMetrics> node(MBeanClient mBeanClient, int brokerId) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.producer")
.property("type", "producer-node-metrics")
.property("node-id", "node-" + brokerId)
.property("client-id", "*")
.build())
.stream()
.collect(
Collectors.toUnmodifiableMap(
b -> b.properties().get("client-id"),
b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id")))));
}

/**
* collect HasProducerNodeMetrics from all producers.
*
* @param mBeanClient to query metrics
* @return key is broker id, and value is associated to broker metrics recorded by all producers
*/
public static Collection<HasProducerNodeMetrics> nodes(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.producer")
.property("type", "producer-node-metrics")
.property("node-id", "*")
.property("client-id", "*")
.build())
.stream()
.map(b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id"))))
.collect(Collectors.toUnmodifiableList());
}

/**
* topic metrics traced by producer
*
* @param mBeanClient to query beans
* @param topic topic name
* @return key is client id used by producer, and value is topic metrics traced by each producer
*/
public static Map<String, HasProducerTopicMetrics> topic(
MBeanClient mBeanClient, String topic) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.producer")
.property("type", "producer-topic-metrics")
.property("client-id", "*")
.property("topic", topic)
.build())
.stream()
.collect(
Collectors.toUnmodifiableMap(b -> b.properties().get("client-id"), b -> () -> b));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics.producer;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import org.astraea.app.metrics.BeanQuery;
import org.astraea.app.metrics.MBeanClient;

public final class ProducerMetrics {

private static int brokerId(String node) {
return Integer.parseInt(node.substring(node.indexOf("-") + 1));
}

/**
* node metrics traced by producer
*
* @param mBeanClient to query beans
* @param brokerId broker ids
* @return key is client id used by producer, and value is node metrics traced by each producer
*/
public static Map<String, HasProducerNodeMetrics> node(MBeanClient mBeanClient, int brokerId) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.producer")
.property("type", "producer-node-metrics")
.property("node-id", "node-" + brokerId)
.property("client-id", "*")
.build())
.stream()
.collect(
Collectors.toUnmodifiableMap(
b -> b.properties().get("client-id"),
b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id")))));
}

/**
* collect HasProducerNodeMetrics from all producers.
*
* @param mBeanClient to query metrics
* @return key is broker id, and value is associated to broker metrics recorded by all producers
*/
public static Collection<HasProducerNodeMetrics> nodes(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.producer")
.property("type", "producer-node-metrics")
.property("node-id", "*")
.property("client-id", "*")
.build())
.stream()
.map(b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id"))))
.collect(Collectors.toUnmodifiableList());
}

/**
* topic metrics traced by producer
*
* @param mBeanClient to query beans
* @param topic topic name
* @return key is client id used by producer, and value is topic metrics traced by each producer
*/
public static Map<String, HasProducerTopicMetrics> topic(MBeanClient mBeanClient, String topic) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
.domainName("kafka.producer")
.property("type", "producer-topic-metrics")
.property("client-id", "*")
.property("topic", topic)
.build())
.stream()
.collect(Collectors.toUnmodifiableMap(b -> b.properties().get("client-id"), b -> () -> b));
}

private ProducerMetrics() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
import org.astraea.app.metrics.producer.ProducerMetrics;
import org.astraea.app.producer.Producer;
import org.astraea.app.service.RequireBrokerCluster;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -71,7 +71,7 @@ void testCost() {
producer.sender().topic(Utils.randomString(10)).value(new byte[100]).run();
producer.flush();

var beans = KafkaMetrics.Producer.nodes(MBeanClient.local());
var beans = ProducerMetrics.nodes(MBeanClient.local());
var clusterBean =
ClusterBean.of(
Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.astraea.app.admin.Admin;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.producer.Producer;
import org.astraea.app.service.RequireBrokerCluster;
Expand All @@ -41,7 +40,7 @@ void testSingleBroker() throws ExecutionException, InterruptedException {
Utils.sleep(Duration.ofSeconds(3));
var owner = admin.replicas(Set.of(topic)).get(TopicPartition.of(topic, 0)).get(0).broker();
producer.sender().topic(topic).run().toCompletableFuture().get();
var metrics = KafkaMetrics.Producer.node(MBeanClient.local(), owner);
var metrics = ProducerMetrics.node(MBeanClient.local(), owner);
Assertions.assertEquals(1, metrics.size());
check(metrics.get("producer-1"));
}
Expand Down Expand Up @@ -78,7 +77,7 @@ void testMultiBrokers() throws ExecutionException, InterruptedException {
.run()
.toCompletableFuture()
.get();
var metrics = KafkaMetrics.Producer.nodes(MBeanClient.local());
var metrics = ProducerMetrics.nodes(MBeanClient.local());
Assertions.assertNotEquals(1, metrics.size());
Assertions.assertTrue(
metrics.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.concurrent.ExecutionException;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.producer.Producer;
import org.astraea.app.service.RequireSingleBrokerCluster;
Expand All @@ -32,7 +31,7 @@ void testAttributes() throws ExecutionException, InterruptedException {
var topic = Utils.randomString(10);
try (var producer = Producer.of(bootstrapServers())) {
producer.sender().topic(topic).run().toCompletableFuture().get();
var metrics = KafkaMetrics.Producer.topic(MBeanClient.local(), topic);
var metrics = ProducerMetrics.topic(MBeanClient.local(), topic);
Assertions.assertEquals(1, metrics.size());
var producerTopicMetrics = metrics.get("producer-1");
Assertions.assertNotEquals(0D, producerTopicMetrics.byteRate());
Expand Down

0 comments on commit 5ff8628

Please sign in to comment.