Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show real outgoing bytes #236

Merged
merged 24 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d4bc8f2
Expose metrics from KafkaProducer/Consumer
chinghongfang Feb 17, 2022
eaba454
Unit test wait for Producer#send complete
chinghongfang Feb 17, 2022
c381245
Merge 'main' into compressedSize
chinghongfang Mar 9, 2022
e401377
Wrap kafkaMetric into a self-defined interface
chinghongfang Mar 9, 2022
b483a69
Wrap kafkaMetric into self-defined interface
chinghongfang Mar 9, 2022
c12ee28
Merge 'main' into compressedSize
chinghongfang Apr 5, 2022
964f2ad
Add unit test
chinghongfang Apr 5, 2022
c281242
Merge branch 'main' into compressedSize
chinghongfang Jun 28, 2022
d806711
Fix conflict
chinghongfang Jun 28, 2022
717daa3
Merge 'main' into compressedSize
chinghongfang Aug 4, 2022
b1d5914
Merge branch 'main' into compressedSize
chinghongfang Aug 8, 2022
be15c59
Print out real throughput from local mbean
chinghongfang Aug 8, 2022
6b1c9d2
Fix conflict
chinghongfang Aug 8, 2022
fd21f7c
Merge branch 'main' into compressedSize
chinghongfang Aug 15, 2022
17a7b38
Move `Receiver` into TrackerThread
chinghongfang Aug 16, 2022
2deec7d
Merge remote-tracking branch 'upstream/main' into compressedSize
chinghongfang Aug 22, 2022
2dae423
Swallow `Exception`s when `receiver.close()`
chinghongfang Aug 22, 2022
fb1ff5c
Change to `Utils.swallowException()`
chinghongfang Aug 22, 2022
a4bd8e1
Record by rate
chinghongfang Aug 23, 2022
8317133
Fix print out message
chinghongfang Aug 23, 2022
ee67492
Sum up latest metrics
chinghongfang Aug 23, 2022
b54e862
Merge branch 'main' into compressedSize
chinghongfang Aug 23, 2022
318765d
Fetch without `BeanCollector`
chinghongfang Aug 23, 2022
e95023c
Change print out message
chinghongfang Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions app/src/main/java/org/astraea/consumer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;

public class Builder<Key, Value> {

Expand Down Expand Up @@ -98,6 +99,16 @@ public Collection<Record<Key, Value>> poll(Duration timeout) {
.collect(Collectors.toList());
}

/** Get a kafkaMetric by name. */
@Override
public Metric getMetric(String metricName) {
return kafkaConsumer.metrics().entrySet().stream()
.filter(e -> e.getKey().name().equals(metricName))
.findFirst()
.orElseThrow()
.getValue();
}

@Override
public void wakeup() {
kafkaConsumer.wakeup();
Expand Down
4 changes: 4 additions & 0 deletions app/src/main/java/org/astraea/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.common.Metric;

/** An interface for polling records. */
public interface Consumer<Key, Value> extends AutoCloseable {
Expand All @@ -18,6 +19,9 @@ public interface Consumer<Key, Value> extends AutoCloseable {
*/
void wakeup();

/** Get a kafkaMetric by name. */
Metric getMetric(String metricName);
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved

@Override
void close();

Expand Down
22 changes: 22 additions & 0 deletions app/src/main/java/org/astraea/performance/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.astraea.performance;

import java.util.function.BiConsumer;
import org.apache.kafka.common.Metric;

/** Used to record statistics. This is thread safe. */
public class Metrics implements BiConsumer<Long, Long> {
Expand All @@ -10,6 +11,8 @@ public class Metrics implements BiConsumer<Long, Long> {
private long min;
private long bytes;
private long currentBytes;
private Metric realBytes = null;
private long lastRealBytes = 0L;

public Metrics() {
avgLatency = 0;
Expand Down Expand Up @@ -66,4 +69,23 @@ public synchronized long clearAndGetCurrentBytes() {
currentBytes = 0;
return ans;
}

public void setRealBytesMetric(Metric realBytesMetric) {
this.realBytes = realBytesMetric;
}

/**
* Get current real bytes since last call. Warning: Real bytes is recorded in `realBytes`. Please
* #setRealBytesMetric before get #currentRealBytes
*/
public synchronized long currentRealBytes() {
if (realBytes != null && realBytes.metricValue() instanceof Double) {
var totalRealBytes = (long) Double.parseDouble(realBytes.metricValue().toString());
var ans = totalRealBytes - lastRealBytes;
lastRealBytes = totalRealBytes;
return ans;
} else {
return 0L;
}
}
}
6 changes: 6 additions & 0 deletions app/src/main/java/org/astraea/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public static Future<String> execute(final Argument param)

static ThreadPool.Executor consumerExecutor(
Consumer<byte[], byte[]> consumer, BiConsumer<Long, Long> observer, Manager manager) {
if (observer instanceof Metrics) {
((Metrics) observer).setRealBytesMetric(consumer.getMetric("incoming-byte-total"));
}
return new ThreadPool.Executor() {
@Override
public State execute() {
Expand Down Expand Up @@ -183,6 +186,9 @@ static ThreadPool.Executor producerExecutor(
BiConsumer<Long, Long> observer,
List<Integer> partitions,
Manager manager) {
if (observer instanceof Metrics) {
((Metrics) observer).setRealBytesMetric(producer.getMetric("outgoing-byte-total"));
}
return new ThreadPool.Executor() {
@Override
public State execute() throws InterruptedException {
Expand Down
16 changes: 16 additions & 0 deletions app/src/main/java/org/astraea/performance/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ private boolean logProducers(Result result) {
System.out.printf(" average throughput: %.3f MB/second%n", result.averageBytes(duration));
System.out.printf(
" current throughput: %s/second%n", DataUnit.Byte.of(result.totalCurrentBytes()));
System.out.printf(
" current throughput (real): %s/second%n",
DataUnit.Byte.of(result.totalCurrentRealBytes()));
System.out.println(" publish max latency: " + result.maxLatency + " ms");
System.out.println(" publish mim latency: " + result.minLatency + " ms");
for (int i = 0; i < result.bytes.size(); ++i) {
Expand All @@ -88,6 +91,9 @@ private boolean logConsumers(Result result) {
System.out.printf(" average throughput: %.3f MB/second%n", result.averageBytes(duration));
System.out.printf(
" current throughput: %s/second%n", DataUnit.Byte.of(result.totalCurrentBytes()));
System.out.printf(
" current throughput (real): %s/second%n",
DataUnit.Byte.of(result.totalCurrentRealBytes()));
System.out.println(" end-to-end max latency: " + result.maxLatency + " ms");
System.out.println(" end-to-end mim latency: " + result.minLatency + " ms");
for (int i = 0; i < result.bytes.size(); ++i) {
Expand All @@ -114,13 +120,15 @@ private static Result result(List<Metrics> metrics) {
var completed = 0;
var bytes = new ArrayList<Long>();
var currentBytes = new ArrayList<Long>();
var currentRealBytes = new ArrayList<Long>();
var averageLatencies = new ArrayList<Double>();
var max = 0L;
var min = Long.MAX_VALUE;
for (Metrics data : metrics) {
completed += data.num();
bytes.add(data.bytes());
currentBytes.add(data.clearAndGetCurrentBytes());
currentRealBytes.add(data.currentRealBytes());
averageLatencies.add(data.avgLatency());
max = Math.max(max, data.max());
min = Math.min(min, data.min());
Expand All @@ -129,6 +137,7 @@ private static Result result(List<Metrics> metrics) {
completed,
Collections.unmodifiableList(bytes),
Collections.unmodifiableList(currentBytes),
Collections.unmodifiableList(currentRealBytes),
Collections.unmodifiableList(averageLatencies),
min,
max);
Expand All @@ -138,6 +147,7 @@ static class Result {
public final long completedRecords;
public final List<Long> bytes;
public final List<Long> currentBytes;
public final List<Long> currentRealBytes;
public final List<Double> averageLatencies;
public final long minLatency;
public final long maxLatency;
Expand All @@ -146,12 +156,14 @@ static class Result {
long completedRecords,
List<Long> bytes,
List<Long> currentBytes,
List<Long> currentRealBytes,
List<Double> averageLatencies,
long minLatency,
long maxLatency) {
this.completedRecords = completedRecords;
this.bytes = bytes;
this.currentBytes = currentBytes;
this.currentRealBytes = currentRealBytes;
this.averageLatencies = averageLatencies;
this.minLatency = minLatency;
this.maxLatency = maxLatency;
Expand All @@ -168,5 +180,9 @@ long totalBytes() {
long totalCurrentBytes() {
return currentBytes.stream().mapToLong(i -> i).sum();
}

Long totalCurrentRealBytes() {
return currentRealBytes.stream().mapToLong(i -> i).sum();
}
}
}
10 changes: 10 additions & 0 deletions app/src/main/java/org/astraea/producer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.astraea.consumer.Header;

public class Builder<Key, Value> {
Expand Down Expand Up @@ -121,6 +122,15 @@ public void flush() {
kafkaProducer.flush();
}

@Override
public Metric getMetric(String metricName) {
return kafkaProducer.metrics().entrySet().stream()
.filter(e -> e.getKey().name().equals(metricName))
.findFirst()
.orElseThrow()
.getValue();
}

@Override
public KafkaProducer<Key, Value> kafkaProducer() {
return kafkaProducer;
Expand Down
4 changes: 4 additions & 0 deletions app/src/main/java/org/astraea/producer/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;

/** An interface for sending records. */
public interface Producer<Key, Value> extends AutoCloseable {
Expand All @@ -12,6 +13,9 @@ public interface Producer<Key, Value> extends AutoCloseable {

void close();

/** Get a kafkaMetric by name. */
Metric getMetric(String metricName);

KafkaProducer<Key, Value> kafkaProducer();

static Builder<byte[], byte[]> builder() {
Expand Down
31 changes: 31 additions & 0 deletions app/src/test/java/org/astraea/performance/PerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
import static org.astraea.performance.Performance.partition;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.astraea.Utils;
import org.astraea.concurrent.ThreadPool;
import org.astraea.consumer.Consumer;
import org.astraea.producer.Producer;
import org.astraea.service.RequireBrokerCluster;
import org.astraea.topic.TopicAdmin;
import org.astraea.utils.DataUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -151,4 +154,32 @@ void testConsumerExecutor() throws InterruptedException, ExecutionException {
Assertions.assertNotEquals(1024, metrics.bytes());
}
}

@Test
void testRealThroughput() throws InterruptedException {
Map<String, Object> prop = Map.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
Metrics producerMetrics = new Metrics();
var param = new Performance.Argument();
param.brokers = bootstrapServers();
param.topic = "testProducerExecutor-" + System.currentTimeMillis();
param.recordSize = DataUnit.KiB.of(100);
param.fixedSize = true;
param.consumers = 0;
try (ThreadPool.Executor executor =
Performance.producerExecutor(
Producer.builder().brokers(bootstrapServers()).configs(prop).build(),
new Performance.Argument(),
producerMetrics,
List.of(-1),
new Manager(param, List.of(producerMetrics), List.of()))) {

// Compressed size should be less than raw record size.
executor.execute();
Utils.waitFor(() -> producerMetrics.num() == 1);
Assertions.assertTrue(
producerMetrics.currentRealBytes() < producerMetrics.clearAndGetCurrentBytes());
Assertions.assertEquals(0, producerMetrics.currentRealBytes());
Assertions.assertEquals(0, producerMetrics.clearAndGetCurrentBytes());
}
}
}