Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix node metrics #527

Merged
merged 2 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.ClusterBean;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.collector.Fetcher;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
Expand All @@ -32,26 +31,26 @@ public class NodeLatencyCost implements HasBrokerCost {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var result =
clusterBean.all().entrySet().stream()
clusterBean.all().values().stream()
.flatMap(Collection::stream)
.filter(b -> b instanceof HasProducerNodeMetrics)
.map(b -> (HasProducerNodeMetrics) b)
.filter(b -> !Double.isNaN(b.requestLatencyAvg()))
.collect(Collectors.groupingBy(HasProducerNodeMetrics::brokerId))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.filter(b -> b instanceof HasProducerNodeMetrics)
.map(b -> (HasProducerNodeMetrics) b)
.mapToDouble(HasProducerNodeMetrics::requestLatencyAvg)
.sum()));
return () -> result;
}

@Override
public Optional<Fetcher> fetcher() {
return Optional.of(
client ->
KafkaMetrics.Producer.nodes(client).values().stream()
.flatMap(Collection::stream)
.map(b -> (HasBeanObject) b)
.collect(Collectors.toUnmodifiableList()));
return Optional.of(KafkaMetrics.Producer::nodes);
}
}
19 changes: 10 additions & 9 deletions app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@ public class NodeThroughputCost implements HasBrokerCost {
@Override
public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
var result =
clusterBean.all().entrySet().stream()
clusterBean.all().values().stream()
.flatMap(Collection::stream)
.filter(b -> b instanceof HasProducerNodeMetrics)
.map(b -> (HasProducerNodeMetrics) b)
.filter(b -> !Double.isNaN(b.incomingByteRate()) && !Double.isNaN(b.outgoingByteRate()))
.collect(Collectors.groupingBy(HasProducerNodeMetrics::brokerId))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.filter(b -> b instanceof HasProducerNodeMetrics)
.map(b -> (HasProducerNodeMetrics) b)
.mapToDouble(b -> b.incomingByteRate() + b.outgoingByteRate())
.mapToDouble(m -> m.incomingByteRate() + m.outgoingByteRate())
.sum()));
return () -> result;
}

@Override
public Optional<Fetcher> fetcher() {
return Optional.of(
client ->
KafkaMetrics.Producer.nodes(client).values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableList()));
return Optional.of(KafkaMetrics.Producer::nodes);
}
}
25 changes: 10 additions & 15 deletions app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.app.metrics.broker.HasValue;
import org.astraea.app.metrics.broker.TotalTimeMs;
Expand Down Expand Up @@ -88,6 +87,10 @@ public Collection<HasBeanObject> fetch(MBeanClient mBeanClient) {

public static final class Producer {

private static int brokerId(String node) {
return Integer.parseInt(node.substring(node.indexOf("-") + 1));
}

private Producer() {}

/**
Expand All @@ -108,7 +111,9 @@ public static Map<String, HasProducerNodeMetrics> node(MBeanClient mBeanClient,
.build())
.stream()
.collect(
Collectors.toUnmodifiableMap(b -> b.properties().get("client-id"), b -> () -> b));
Collectors.toUnmodifiableMap(
b -> b.properties().get("client-id"),
b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id")))));
}

/**
Expand All @@ -117,9 +122,7 @@ public static Map<String, HasProducerNodeMetrics> node(MBeanClient mBeanClient,
* @param mBeanClient to query metrics
* @return key is broker id, and value is associated to broker metrics recorded by all producers
*/
public static Map<Integer, Collection<HasProducerNodeMetrics>> nodes(MBeanClient mBeanClient) {
Function<String, Integer> brokerId =
string -> Integer.parseInt(string.substring(string.indexOf("-") + 1));
public static Collection<HasProducerNodeMetrics> nodes(MBeanClient mBeanClient) {
return mBeanClient
.queryBeans(
BeanQuery.builder()
Expand All @@ -129,16 +132,8 @@ public static Map<Integer, Collection<HasProducerNodeMetrics>> nodes(MBeanClient
.property("client-id", "*")
.build())
.stream()
.collect(Collectors.groupingBy(b -> brokerId.apply(b.properties().get("node-id"))))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
e.getValue().stream()
.map(b -> (HasProducerNodeMetrics) (() -> b))
.collect(Collectors.toUnmodifiableList())));
.map(b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id"))))
.collect(Collectors.toUnmodifiableList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,27 @@
*/
package org.astraea.app.metrics.producer;

import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.HasBeanObject;

public interface HasProducerNodeMetrics extends HasBeanObject {

static HasProducerNodeMetrics of(BeanObject beanObject, int brokerId) {
return new HasProducerNodeMetrics() {
@Override
public int brokerId() {
return brokerId;
}

@Override
public BeanObject beanObject() {
return beanObject;
}
};
}

int brokerId();

default double incomingByteRate() {
return (double) beanObject().attributes().get("incoming-byte-rate");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public int partition(String topic, byte[] key, byte[] value, ClusterInfo cluster
.filter(r -> r.nodeInfo().id() == brokerId)
.map(ReplicaInfo::partition)
.findAny())
.orElse(0);
.orElse(partitionLeaders.get((int) (Math.random() * partitionLeaders.size())).partition());
}

void tryToUpdateRoundRobin(ClusterInfo clusterInfo) {
Expand Down
31 changes: 27 additions & 4 deletions app/src/test/java/org/astraea/app/cost/NodeLatencyCostTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.KafkaMetrics;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.producer.HasProducerNodeMetrics;
import org.astraea.app.producer.Producer;
import org.astraea.app.service.RequireBrokerCluster;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,6 +37,29 @@

public class NodeLatencyCostTest extends RequireBrokerCluster {

@Test
void testNan() {
var bean = Mockito.mock(HasProducerNodeMetrics.class);
Mockito.when(bean.brokerId()).thenReturn(1);
Mockito.when(bean.requestLatencyAvg()).thenReturn(Double.NaN);
var clusterBean = ClusterBean.of(Map.of(-1, List.of(bean)));
var function = new NodeLatencyCost();
var result = function.brokerCost(Mockito.mock(ClusterInfo.class), clusterBean);
Assertions.assertEquals(0, result.value().size());
}

@Test
void testBrokerId() {
var bean = Mockito.mock(HasProducerNodeMetrics.class);
Mockito.when(bean.brokerId()).thenReturn(1);
Mockito.when(bean.requestLatencyAvg()).thenReturn(10D);
var clusterBean = ClusterBean.of(Map.of(-1, List.of(bean)));
var function = new NodeLatencyCost();
var result = function.brokerCost(Mockito.mock(ClusterInfo.class), clusterBean);
Assertions.assertEquals(1, result.value().size());
Assertions.assertEquals(10D, result.value().get(1));
}

@Test
void testCost() {
var brokerId = brokerIds().iterator().next();
Expand All @@ -47,19 +71,18 @@ void testCost() {
producer.sender().topic(Utils.randomString(10)).value(new byte[100]).run();
producer.flush();

var beans = KafkaMetrics.Producer.node(MBeanClient.local(), brokerId);
var beans = KafkaMetrics.Producer.nodes(MBeanClient.local());
var clusterBean =
ClusterBean.of(
Map.of(
brokerId,
beans.values().stream()
-1,
beans.stream()
.map(b -> (HasBeanObject) b)
.collect(Collectors.toUnmodifiableList())));
var clusterInfo = Mockito.mock(ClusterInfo.class);
var function = new NodeLatencyCost();
var cost = function.brokerCost(clusterInfo, clusterBean);
Assertions.assertEquals(1, cost.value().size());
Assertions.assertNotNull(cost.value().get(brokerId));
}
}

Expand Down
31 changes: 29 additions & 2 deletions app/src/test/java/org/astraea/app/cost/NodeThroughputCostTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,47 @@

public class NodeThroughputCostTest {

@Test
void testNan() {
var bean = Mockito.mock(HasProducerNodeMetrics.class);
Mockito.when(bean.brokerId()).thenReturn(1);
Mockito.when(bean.incomingByteRate()).thenReturn(Double.NaN);
Mockito.when(bean.outgoingByteRate()).thenReturn(Double.NaN);
var clusterBean = ClusterBean.of(Map.of(-1, List.of(bean)));
var function = new NodeThroughputCost();
var result = function.brokerCost(Mockito.mock(ClusterInfo.class), clusterBean);
Assertions.assertEquals(0, result.value().size());
}

@Test
void testBrokerId() {
var bean = Mockito.mock(HasProducerNodeMetrics.class);
Mockito.when(bean.brokerId()).thenReturn(1);
Mockito.when(bean.incomingByteRate()).thenReturn(10D);
Mockito.when(bean.outgoingByteRate()).thenReturn(10D);
var clusterBean = ClusterBean.of(Map.of(-1, List.of(bean)));
var function = new NodeThroughputCost();
var result = function.brokerCost(Mockito.mock(ClusterInfo.class), clusterBean);
Assertions.assertEquals(1, result.value().size());
Assertions.assertEquals(20D, result.value().get(1));
}

@Test
void testCost() {
var throughputCost = new NodeThroughputCost();
var bean0 = Mockito.mock(HasProducerNodeMetrics.class);
Mockito.when(bean0.incomingByteRate()).thenReturn(10D);
Mockito.when(bean0.outgoingByteRate()).thenReturn(20D);
Mockito.when(bean0.brokerId()).thenReturn(10);
var bean1 = Mockito.mock(HasProducerNodeMetrics.class);
Mockito.when(bean1.incomingByteRate()).thenReturn(2D);
Mockito.when(bean1.outgoingByteRate()).thenReturn(3D);
Mockito.when(bean1.brokerId()).thenReturn(11);
var clusterBean = ClusterBean.of(Map.of(0, List.of(bean0), 1, List.of(bean1)));
var clusterInfo = Mockito.mock(ClusterInfo.class);
var cost = throughputCost.brokerCost(clusterInfo, clusterBean);
Assertions.assertEquals(30D, cost.value().get(0));
Assertions.assertEquals(5D, cost.value().get(1));
Assertions.assertEquals(30D, cost.value().get(10));
Assertions.assertEquals(5D, cost.value().get(11));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.astraea.app.metrics.producer;

import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.astraea.app.admin.Admin;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.common.Utils;
Expand Down Expand Up @@ -80,10 +80,12 @@ void testMultiBrokers() throws ExecutionException, InterruptedException {
.get();
var metrics = KafkaMetrics.Producer.nodes(MBeanClient.local());
Assertions.assertNotEquals(1, metrics.size());
Assertions.assertTrue(metrics.keySet().containsAll(brokerIds()));
metrics.values().stream()
.flatMap(Collection::stream)
.forEach(HasProducerNodeMetricsTest::check);
Assertions.assertTrue(
metrics.stream()
.map(HasProducerNodeMetrics::brokerId)
.collect(Collectors.toUnmodifiableList())
.containsAll(brokerIds()));
metrics.forEach(HasProducerNodeMetricsTest::check);
}
}

Expand Down