Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show real outgoing bytes #236

Merged
merged 24 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d4bc8f2
Expose metrics from KafkaProducer/Consumer
chinghongfang Feb 17, 2022
eaba454
Unit test wait for Producer#send complete
chinghongfang Feb 17, 2022
c381245
Merge 'main' into compressedSize
chinghongfang Mar 9, 2022
e401377
Wrap kafkaMetric into a self-defined interface
chinghongfang Mar 9, 2022
b483a69
Wrap kafkaMetric into self-defined interface
chinghongfang Mar 9, 2022
c12ee28
Merge 'main' into compressedSize
chinghongfang Apr 5, 2022
964f2ad
Add unit test
chinghongfang Apr 5, 2022
c281242
Merge branch 'main' into compressedSize
chinghongfang Jun 28, 2022
d806711
Fix conflict
chinghongfang Jun 28, 2022
717daa3
Merge 'main' into compressedSize
chinghongfang Aug 4, 2022
b1d5914
Merge branch 'main' into compressedSize
chinghongfang Aug 8, 2022
be15c59
Print out real throughput from local mbean
chinghongfang Aug 8, 2022
6b1c9d2
Fix conflict
chinghongfang Aug 8, 2022
fd21f7c
Merge branch 'main' into compressedSize
chinghongfang Aug 15, 2022
17a7b38
Move `Receiver` into TrackerThread
chinghongfang Aug 16, 2022
2deec7d
Merge remote-tracking branch 'upstream/main' into compressedSize
chinghongfang Aug 22, 2022
2dae423
Swallow `Exception`s when `receiver.close()`
chinghongfang Aug 22, 2022
fb1ff5c
Change to `Utils.swallowException()`
chinghongfang Aug 22, 2022
a4bd8e1
Record by rate
chinghongfang Aug 23, 2022
8317133
Fix print out message
chinghongfang Aug 23, 2022
ee67492
Sum up latest metrics
chinghongfang Aug 23, 2022
b54e862
Merge branch 'main' into compressedSize
chinghongfang Aug 23, 2022
318765d
Fetch without `BeanCollector`
chinghongfang Aug 23, 2022
e95023c
Change print out message
chinghongfang Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import org.astraea.app.common.Utils;
import org.astraea.app.consumer.Consumer;
import org.astraea.app.consumer.Isolation;
import org.astraea.app.metrics.client.consumer.ConsumerMetrics;
import org.astraea.app.metrics.client.producer.ProducerMetrics;
import org.astraea.app.metrics.collector.BeanCollector;
import org.astraea.app.producer.Producer;

/**
Expand Down Expand Up @@ -146,7 +149,14 @@ public static String execute(final Argument param) throws InterruptedException,
.map(ConsumerThread::report)
.collect(Collectors.toUnmodifiableList());

var tracker = TrackerThread.create(producerReports, consumerReports, param.exeTime);
// The receivers (that is, producerReceiver and consumerReceiver) will fetch mbean once per
// second. The interval should be less than 1 second. Let the receiver fetch new data.
var beanCollector = BeanCollector.builder().interval(Duration.ofMillis(500)).build();
var producerReceiver = beanCollector.register().local().fetcher(ProducerMetrics::nodes).build();
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
var consumerReceiver = beanCollector.register().local().fetcher(ConsumerMetrics::nodes).build();
var tracker =
TrackerThread.create(
producerReports, consumerReports, producerReceiver, consumerReceiver, param.exeTime);

Optional<Runnable> fileWriter =
param.CSVPath == null
Expand Down Expand Up @@ -205,6 +215,8 @@ public static String execute(final Argument param) throws InterruptedException,

consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
producerReceiver.close();
consumerReceiver.close();
fileWriterFuture.join();
chaos.join();
return param.topic;
Expand Down
31 changes: 31 additions & 0 deletions app/src/main/java/org/astraea/app/performance/TrackerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@
package org.astraea.app.performance;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import org.astraea.app.common.DataSize;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.astraea.app.metrics.collector.Receiver;

/** Print out the given metrics. */
public interface TrackerThread extends AbstractThread {

static TrackerThread create(
List<ProducerThread.Report> producerReports,
List<ConsumerThread.Report> consumerReports,
Receiver producerReceiver,
Receiver consumerReceiver,
ExeTime exeTime) {
var start = System.currentTimeMillis() - Duration.ofSeconds(1).toMillis();

Expand All @@ -55,6 +63,11 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n",
Utils.averageMB(
duration, producerReports.stream().mapToLong(Report::totalBytes).sum()));
System.out.printf(
" total real out-going: %s/second%n",
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
DataSize.Byte.of(
sumOfAttribute(producerReceiver.current(), HasNodeMetrics::outgoingByteTotal)
.longValue()));
producerReports.stream()
.mapToLong(Report::max)
.max()
Expand Down Expand Up @@ -95,6 +108,11 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n",
Utils.averageMB(
duration, consumerReports.stream().mapToLong(Report::totalBytes).sum()));
System.out.printf(
" total real in-coming: %s/second%n",
DataSize.Byte.of(
sumOfAttribute(consumerReceiver.current(), HasNodeMetrics::incomingByteTotal)
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
.longValue()));
consumerReports.stream()
.mapToLong(Report::max)
.max()
Expand Down Expand Up @@ -166,6 +184,19 @@ public void close() {
}
};
}
/**
* @param mbeans mBeans fetched by the receivers
* @return sum of given attribute of all beans which is instance of HasNodeMetrics.
*/
static Double sumOfAttribute(
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
Collection<HasBeanObject> mbeans, ToDoubleFunction<HasNodeMetrics> targetAttribute) {
return mbeans.stream()
.filter(mbean -> mbean instanceof HasNodeMetrics)
.map(mbean -> (HasNodeMetrics) mbean)
.mapToDouble(targetAttribute)
.filter(d -> !Double.isNaN(d))
.sum();
}

long startTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.beust.jcommander.ParameterException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import org.astraea.app.argument.Argument;
import org.astraea.app.consumer.Isolation;
import org.astraea.app.service.RequireBrokerCluster;
Expand All @@ -34,8 +33,6 @@ void testTransactionalProducer() {
String[] arguments1 = {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--transaction.size", "2"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertTrue(producer.transactional());
Expand All @@ -49,7 +46,6 @@ void testProducerExecutor() throws InterruptedException {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--compression", "gzip"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertFalse(producer.transactional());
Expand Down
43 changes: 39 additions & 4 deletions app/src/test/java/org/astraea/app/performance/TrackerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@
import java.time.Duration;
import java.util.List;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.astraea.app.metrics.collector.Receiver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TrackerTest {

@Test
void testClose() {
var tracker = TrackerThread.create(List.of(), List.of(), ExeTime.of("1records"));
var mockReceiver = Mockito.mock(Receiver.class);
var tracker =
TrackerThread.create(
List.of(), List.of(), mockReceiver, mockReceiver, ExeTime.of("1records"));
Assertions.assertFalse(tracker.closed());
tracker.close();
Assertions.assertTrue(tracker.closed());
Expand All @@ -35,7 +42,10 @@ void testClose() {
@Test
void testZeroConsumer() {
var producerReport = new ProducerThread.Report();
var tracker = TrackerThread.create(List.of(producerReport), List.of(), ExeTime.of("1records"));
var mockReceiver = Mockito.mock(Receiver.class);
var tracker =
TrackerThread.create(
List.of(producerReport), List.of(), mockReceiver, mockReceiver, ExeTime.of("1records"));
Assertions.assertFalse(tracker.closed());
producerReport.record("topic", 1, 100, 1L, 1);
// wait to done
Expand All @@ -47,8 +57,14 @@ void testZeroConsumer() {
void testExeTime() {
var producerReport = new ProducerThread.Report();
var consumerReport = new ConsumerThread.Report();
var mockReceiver = Mockito.mock(Receiver.class);
var tracker =
TrackerThread.create(List.of(producerReport), List.of(consumerReport), ExeTime.of("2s"));
TrackerThread.create(
List.of(producerReport),
List.of(consumerReport),
mockReceiver,
mockReceiver,
ExeTime.of("2s"));
Assertions.assertFalse(tracker.closed());
producerReport.record("topic", 1, 100, 1L, 1);
consumerReport.record("topic", 1, 100, 1L, 1);
Expand All @@ -60,14 +76,33 @@ void testExeTime() {
void testConsumerAndProducer() {
var producerReport = new ProducerThread.Report();
var consumerReport = new ConsumerThread.Report();
var mockReceiver = Mockito.mock(Receiver.class);
var tracker =
TrackerThread.create(
List.of(producerReport), List.of(consumerReport), ExeTime.of("1records"));
List.of(producerReport),
List.of(consumerReport),
mockReceiver,
mockReceiver,
ExeTime.of("1records"));
Assertions.assertFalse(tracker.closed());
producerReport.record("topic", 1, 100, 1L, 1);
consumerReport.record("topic", 1, 100, 1L, 1);
// wait to done
Utils.sleep(Duration.ofSeconds(2));
Assertions.assertTrue(tracker.closed());
}

@Test
void testSumOfAttribute() {
var hasNodeMetrics = Mockito.mock(HasNodeMetrics.class);
var hasNodeMetrics2 = Mockito.mock(HasNodeMetrics.class);
var hasBeanObject = Mockito.mock(HasBeanObject.class);
Mockito.when(hasNodeMetrics.incomingByteTotal()).thenReturn(2D);
Mockito.when(hasNodeMetrics2.incomingByteTotal()).thenReturn(3D);
Assertions.assertEquals(
5D,
TrackerThread.sumOfAttribute(
List.of(hasNodeMetrics, hasNodeMetrics2, hasBeanObject),
HasNodeMetrics::incomingByteTotal));
}
}