Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show real outgoing bytes #236

Merged
merged 24 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d4bc8f2
Expose metrics from KafkaProducer/Consumer
chinghongfang Feb 17, 2022
eaba454
Unit test wait for Producer#send complete
chinghongfang Feb 17, 2022
c381245
Merge 'main' into compressedSize
chinghongfang Mar 9, 2022
e401377
Wrap kafkaMetric into a self-defined interface
chinghongfang Mar 9, 2022
b483a69
Wrap kafkaMetric into self-defined interface
chinghongfang Mar 9, 2022
c12ee28
Merge 'main' into compressedSize
chinghongfang Apr 5, 2022
964f2ad
Add unit test
chinghongfang Apr 5, 2022
c281242
Merge branch 'main' into compressedSize
chinghongfang Jun 28, 2022
d806711
Fix conflict
chinghongfang Jun 28, 2022
717daa3
Merge 'main' into compressedSize
chinghongfang Aug 4, 2022
b1d5914
Merge branch 'main' into compressedSize
chinghongfang Aug 8, 2022
be15c59
Print out real throughput from local mbean
chinghongfang Aug 8, 2022
6b1c9d2
Fix conflict
chinghongfang Aug 8, 2022
fd21f7c
Merge branch 'main' into compressedSize
chinghongfang Aug 15, 2022
17a7b38
Move `Receiver` into TrackerThread
chinghongfang Aug 16, 2022
2deec7d
Merge remote-tracking branch 'upstream/main' into compressedSize
chinghongfang Aug 22, 2022
2dae423
Swallow `Exception`s when `receiver.close()`
chinghongfang Aug 22, 2022
fb1ff5c
Change to `Utils.swallowException()`
chinghongfang Aug 22, 2022
a4bd8e1
Record by rate
chinghongfang Aug 23, 2022
8317133
Fix print out message
chinghongfang Aug 23, 2022
ee67492
Sum up latest metrics
chinghongfang Aug 23, 2022
b54e862
Merge branch 'main' into compressedSize
chinghongfang Aug 23, 2022
318765d
Fetch without `BeanCollector`
chinghongfang Aug 23, 2022
e95023c
Change print out message
chinghongfang Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions app/src/main/java/org/astraea/app/performance/TrackerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
package org.astraea.app.performance;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import org.astraea.app.common.DataSize;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.astraea.app.metrics.client.producer.ProducerMetrics;

/** Print out the given metrics. */
public interface TrackerThread extends AbstractThread {
Expand All @@ -33,6 +39,7 @@ static TrackerThread create(
Supplier<List<ConsumerThread.Report>> consumerReporter,
ExeTime exeTime) {
var start = System.currentTimeMillis() - Duration.ofSeconds(1).toMillis();
var mBeanClient = MBeanClient.local();

Function<Duration, Boolean> logProducers =
duration -> {
Expand All @@ -47,6 +54,13 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n",
Utils.averageMB(
duration, producerReports.stream().mapToLong(Report::totalBytes).sum()));
System.out.printf(
" current traffic: %s/second%n",
DataSize.Byte.of(
((Double)
sumOfAttribute(
ProducerMetrics.nodes(mBeanClient), HasNodeMetrics::outgoingByteRate))
.longValue()));
producerReports.stream()
.mapToLong(Report::max)
.max()
Expand Down Expand Up @@ -87,6 +101,13 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n",
Utils.averageMB(
duration, consumerReports.stream().mapToLong(Report::totalBytes).sum()));
System.out.printf(
" current traffic: %s/second%n",
DataSize.Byte.of(
((Double)
sumOfAttribute(
ProducerMetrics.nodes(mBeanClient), HasNodeMetrics::incomingByteRate))
.longValue()));
consumerReports.stream()
.mapToLong(Report::max)
.max()
Expand Down Expand Up @@ -168,9 +189,20 @@ public boolean closed() {
public void close() {
closed.set(true);
waitForDone();
Utils.swallowException(mBeanClient::close);
}
};
}
/**
* Sum up the latest given attribute of all beans which is instance of HasNodeMetrics.
*
* @param mbeans mBeans fetched by the receivers
* @return sum of the latest given attribute of all beans which is instance of HasNodeMetrics.
*/
static double sumOfAttribute(
Collection<HasNodeMetrics> mbeans, ToDoubleFunction<HasNodeMetrics> targetAttribute) {
return mbeans.stream().mapToDouble(targetAttribute).filter(d -> !Double.isNaN(d)).sum();
}

long startTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.app.admin.Admin;
Expand All @@ -40,8 +39,6 @@ void testTransactionalProducer() {
String[] arguments1 = {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--transaction.size", "2"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertTrue(producer.transactional());
Expand All @@ -55,7 +52,6 @@ void testProducerExecutor() throws InterruptedException {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--compression", "gzip"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertFalse(producer.transactional());
Expand Down
16 changes: 16 additions & 0 deletions app/src/test/java/org/astraea/app/performance/TrackerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.time.Duration;
import java.util.List;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TrackerTest {

Expand Down Expand Up @@ -73,4 +75,18 @@ void testConsumerAndProducer() {
Utils.sleep(Duration.ofSeconds(2));
Assertions.assertTrue(tracker.closed());
}

@Test
void testSumOfAttribute() {
var hasNodeMetrics = Mockito.mock(HasNodeMetrics.class);
var hasNodeMetrics2 = Mockito.mock(HasNodeMetrics.class);
Mockito.when(hasNodeMetrics.incomingByteTotal()).thenReturn(2D);
Mockito.when(hasNodeMetrics2.incomingByteTotal()).thenReturn(3D);
Mockito.when(hasNodeMetrics.createdTimestamp()).thenReturn(System.currentTimeMillis());
Mockito.when(hasNodeMetrics2.createdTimestamp()).thenReturn(System.currentTimeMillis());
Assertions.assertEquals(
5D,
TrackerThread.sumOfAttribute(
List.of(hasNodeMetrics, hasNodeMetrics2), HasNodeMetrics::incomingByteTotal));
}
}