diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 96284698a4..f1ac840f34 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -17,13 +17,19 @@ package org.astraea.app.performance; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import org.astraea.app.common.DataSize; import org.astraea.app.common.Utils; +import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.client.HasNodeMetrics; +import org.astraea.app.metrics.client.producer.ProducerMetrics; /** Print out the given metrics. */ public interface TrackerThread extends AbstractThread { @@ -33,6 +39,7 @@ static TrackerThread create( Supplier> consumerReporter, ExeTime exeTime) { var start = System.currentTimeMillis() - Duration.ofSeconds(1).toMillis(); + var mBeanClient = MBeanClient.local(); Function logProducers = duration -> { @@ -47,6 +54,13 @@ static TrackerThread create( " average throughput: %.3f MB/second%n", Utils.averageMB( duration, producerReports.stream().mapToLong(Report::totalBytes).sum())); + System.out.printf( + " current traffic: %s/second%n", + DataSize.Byte.of( + ((Double) + sumOfAttribute( + ProducerMetrics.nodes(mBeanClient), HasNodeMetrics::outgoingByteRate)) + .longValue())); producerReports.stream() .mapToLong(Report::max) .max() @@ -87,6 +101,13 @@ static TrackerThread create( " average throughput: %.3f MB/second%n", Utils.averageMB( duration, consumerReports.stream().mapToLong(Report::totalBytes).sum())); + System.out.printf( + " current traffic: %s/second%n", + DataSize.Byte.of( + ((Double) + sumOfAttribute( + ProducerMetrics.nodes(mBeanClient), HasNodeMetrics::incomingByteRate)) + .longValue())); consumerReports.stream() .mapToLong(Report::max) .max() @@ -168,9 +189,20 @@ public boolean closed() { public void close() { closed.set(true); waitForDone(); + Utils.swallowException(mBeanClient::close); } }; } + /** + * Sum up the latest given attribute of all beans which is instance of HasNodeMetrics. + * + * @param mbeans mBeans fetched by the receivers + * @return sum of the latest given attribute of all beans which is instance of HasNodeMetrics. + */ + static double sumOfAttribute( + Collection mbeans, ToDoubleFunction targetAttribute) { + return mbeans.stream().mapToDouble(targetAttribute).filter(d -> !Double.isNaN(d)).sum(); + } long startTime(); } diff --git a/app/src/test/java/org/astraea/app/performance/PerformanceTest.java b/app/src/test/java/org/astraea/app/performance/PerformanceTest.java index efceb3337d..d4d219aa02 100644 --- a/app/src/test/java/org/astraea/app/performance/PerformanceTest.java +++ b/app/src/test/java/org/astraea/app/performance/PerformanceTest.java @@ -20,7 +20,6 @@ import java.time.Duration; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.app.admin.Admin; @@ -40,8 +39,6 @@ void testTransactionalProducer() { String[] arguments1 = { "--bootstrap.servers", bootstrapServers(), "--topic", topic, "--transaction.size", "2" }; - var latch = new CountDownLatch(1); - BiConsumer observer = (x, y) -> latch.countDown(); var argument = Argument.parse(new Performance.Argument(), arguments1); try (var producer = argument.createProducer()) { Assertions.assertTrue(producer.transactional()); @@ -55,7 +52,6 @@ void testProducerExecutor() throws InterruptedException { "--bootstrap.servers", bootstrapServers(), "--topic", topic, "--compression", "gzip" }; var latch = new CountDownLatch(1); - BiConsumer observer = (x, y) -> latch.countDown(); var argument = Argument.parse(new Performance.Argument(), arguments1); try (var producer = argument.createProducer()) { Assertions.assertFalse(producer.transactional()); diff --git a/app/src/test/java/org/astraea/app/performance/TrackerTest.java b/app/src/test/java/org/astraea/app/performance/TrackerTest.java index aca253c7c3..0403092033 100644 --- a/app/src/test/java/org/astraea/app/performance/TrackerTest.java +++ b/app/src/test/java/org/astraea/app/performance/TrackerTest.java @@ -19,8 +19,10 @@ import java.time.Duration; import java.util.List; import org.astraea.app.common.Utils; +import org.astraea.app.metrics.client.HasNodeMetrics; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class TrackerTest { @@ -73,4 +75,18 @@ void testConsumerAndProducer() { Utils.sleep(Duration.ofSeconds(2)); Assertions.assertTrue(tracker.closed()); } + + @Test + void testSumOfAttribute() { + var hasNodeMetrics = Mockito.mock(HasNodeMetrics.class); + var hasNodeMetrics2 = Mockito.mock(HasNodeMetrics.class); + Mockito.when(hasNodeMetrics.incomingByteTotal()).thenReturn(2D); + Mockito.when(hasNodeMetrics2.incomingByteTotal()).thenReturn(3D); + Mockito.when(hasNodeMetrics.createdTimestamp()).thenReturn(System.currentTimeMillis()); + Mockito.when(hasNodeMetrics2.createdTimestamp()).thenReturn(System.currentTimeMillis()); + Assertions.assertEquals( + 5D, + TrackerThread.sumOfAttribute( + List.of(hasNodeMetrics, hasNodeMetrics2), HasNodeMetrics::incomingByteTotal)); + } }