Skip to content

Commit

Permalink
Show real outgoing bytes (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang authored Aug 24, 2022
1 parent a562cba commit e98bd3b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
32 changes: 32 additions & 0 deletions app/src/main/java/org/astraea/app/performance/TrackerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
package org.astraea.app.performance;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import org.astraea.app.common.DataSize;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.astraea.app.metrics.client.producer.ProducerMetrics;

/** Print out the given metrics. */
public interface TrackerThread extends AbstractThread {
Expand All @@ -33,6 +39,7 @@ static TrackerThread create(
Supplier<List<ConsumerThread.Report>> consumerReporter,
ExeTime exeTime) {
var start = System.currentTimeMillis() - Duration.ofSeconds(1).toMillis();
var mBeanClient = MBeanClient.local();

Function<Duration, Boolean> logProducers =
duration -> {
Expand All @@ -47,6 +54,13 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n",
Utils.averageMB(
duration, producerReports.stream().mapToLong(Report::totalBytes).sum()));
System.out.printf(
" current traffic: %s/second%n",
DataSize.Byte.of(
((Double)
sumOfAttribute(
ProducerMetrics.nodes(mBeanClient), HasNodeMetrics::outgoingByteRate))
.longValue()));
producerReports.stream()
.mapToLong(Report::max)
.max()
Expand Down Expand Up @@ -87,6 +101,13 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n",
Utils.averageMB(
duration, consumerReports.stream().mapToLong(Report::totalBytes).sum()));
System.out.printf(
" current traffic: %s/second%n",
DataSize.Byte.of(
((Double)
sumOfAttribute(
ProducerMetrics.nodes(mBeanClient), HasNodeMetrics::incomingByteRate))
.longValue()));
consumerReports.stream()
.mapToLong(Report::max)
.max()
Expand Down Expand Up @@ -168,9 +189,20 @@ public boolean closed() {
public void close() {
closed.set(true);
waitForDone();
Utils.swallowException(mBeanClient::close);
}
};
}
/**
* Sum up the latest given attribute of all beans which is instance of HasNodeMetrics.
*
* @param mbeans mBeans fetched by the receivers
* @return sum of the latest given attribute of all beans which is instance of HasNodeMetrics.
*/
static double sumOfAttribute(
Collection<HasNodeMetrics> mbeans, ToDoubleFunction<HasNodeMetrics> targetAttribute) {
return mbeans.stream().mapToDouble(targetAttribute).filter(d -> !Double.isNaN(d)).sum();
}

long startTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.Admin;
Expand All @@ -40,8 +39,6 @@ void testTransactionalProducer() {
String[] arguments1 = {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--transaction.size", "2"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertTrue(producer.transactional());
Expand All @@ -55,7 +52,6 @@ void testProducerExecutor() throws InterruptedException {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--compression", "gzip"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertFalse(producer.transactional());
Expand Down
16 changes: 16 additions & 0 deletions app/src/test/java/org/astraea/app/performance/TrackerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.time.Duration;
import java.util.List;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TrackerTest {

Expand Down Expand Up @@ -73,4 +75,18 @@ void testConsumerAndProducer() {
Utils.sleep(Duration.ofSeconds(2));
Assertions.assertTrue(tracker.closed());
}

@Test
void testSumOfAttribute() {
var hasNodeMetrics = Mockito.mock(HasNodeMetrics.class);
var hasNodeMetrics2 = Mockito.mock(HasNodeMetrics.class);
Mockito.when(hasNodeMetrics.incomingByteTotal()).thenReturn(2D);
Mockito.when(hasNodeMetrics2.incomingByteTotal()).thenReturn(3D);
Mockito.when(hasNodeMetrics.createdTimestamp()).thenReturn(System.currentTimeMillis());
Mockito.when(hasNodeMetrics2.createdTimestamp()).thenReturn(System.currentTimeMillis());
Assertions.assertEquals(
5D,
TrackerThread.sumOfAttribute(
List.of(hasNodeMetrics, hasNodeMetrics2), HasNodeMetrics::incomingByteTotal));
}
}

0 comments on commit e98bd3b

Please sign in to comment.