Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show real outgoing bytes #236

Merged
merged 24 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d4bc8f2
Expose metrics from KafkaProducer/Consumer
chinghongfang Feb 17, 2022
eaba454
Unit test wait for Producer#send complete
chinghongfang Feb 17, 2022
c381245
Merge 'main' into compressedSize
chinghongfang Mar 9, 2022
e401377
Wrap kafkaMetric into a self-defined interface
chinghongfang Mar 9, 2022
b483a69
Wrap kafkaMetric into self-defined interface
chinghongfang Mar 9, 2022
c12ee28
Merge 'main' into compressedSize
chinghongfang Apr 5, 2022
964f2ad
Add unit test
chinghongfang Apr 5, 2022
c281242
Merge branch 'main' into compressedSize
chinghongfang Jun 28, 2022
d806711
Fix conflict
chinghongfang Jun 28, 2022
717daa3
Merge 'main' into compressedSize
chinghongfang Aug 4, 2022
b1d5914
Merge branch 'main' into compressedSize
chinghongfang Aug 8, 2022
be15c59
Print out real throughput from local mbean
chinghongfang Aug 8, 2022
6b1c9d2
Fix conflict
chinghongfang Aug 8, 2022
fd21f7c
Merge branch 'main' into compressedSize
chinghongfang Aug 15, 2022
17a7b38
Move `Receiver` into TrackerThread
chinghongfang Aug 16, 2022
2deec7d
Merge remote-tracking branch 'upstream/main' into compressedSize
chinghongfang Aug 22, 2022
2dae423
Swallow `Exception`s when `receiver.close()`
chinghongfang Aug 22, 2022
fb1ff5c
Change to `Utils.swallowException()`
chinghongfang Aug 22, 2022
a4bd8e1
Record by rate
chinghongfang Aug 23, 2022
8317133
Fix print out message
chinghongfang Aug 23, 2022
ee67492
Sum up latest metrics
chinghongfang Aug 23, 2022
b54e862
Merge branch 'main' into compressedSize
chinghongfang Aug 23, 2022
318765d
Fetch without `BeanCollector`
chinghongfang Aug 23, 2022
e95023c
Change print out message
chinghongfang Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions app/src/main/java/org/astraea/app/performance/Metrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.performance;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Used to record statistics. This is thread safe. */
public class Metrics implements BiConsumer<Long, Integer> {
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
private double avgLatency;
private long num;
private long max;
private long min;
private long bytes;
private long currentBytes;
private final Map<MutableMetric<Double>, Long> realBytes = new HashMap<>();

public Metrics() {
avgLatency = 0;
num = 0;
max = 0;
min = Long.MAX_VALUE;
bytes = 0;
currentBytes = 0;
}

/** Simultaneously add latency and bytes. */
@Override
public synchronized void accept(Long latency, Integer bytes) {
++num;
putLatency(latency);
addBytes(bytes);
}

/** Add a new value to latency metric. */
private synchronized void putLatency(long latency) {
min = Math.min(min, latency);
max = Math.max(max, latency);
avgLatency += (((double) latency) - avgLatency) / (double) num;
}
/** Add a new value to bytes. */
private synchronized void addBytes(long bytes) {
this.currentBytes += bytes;
this.bytes += bytes;
}

/** @return Get the number of latency put. */
public synchronized long num() {
return num;
}
/** @return Get the maximum of latency put. */
public synchronized long max() {
return max;
}
/** @return Get the minimum of latency put. */
public synchronized long min() {
return min;
}
/** @return Get the average latency. */
public synchronized double avgLatency() {
return avgLatency;
}

/** @return total send/received bytes */
public synchronized long bytes() {
return bytes;
}

public synchronized long clearAndGetCurrentBytes() {
var ans = currentBytes;
currentBytes = 0;
return ans;
}

public void putRealBytesMetric(MutableMetric<Double> realBytesMetric) {
this.realBytes.put(realBytesMetric, 0L);
}

/**
* Get current real bytes since last call. Warning: Real bytes is recorded in `realBytes`. Please
* #setRealBytesMetric before get #currentRealBytes
*/
public synchronized long currentRealBytes() {
AtomicLong all = new AtomicLong(0L);

// Get sum of all instances and update
realBytes.replaceAll(
(metric, last) -> {
var totalRealBytes = (long) metric.metricValue().doubleValue();
all.addAndGet(totalRealBytes - last);
return totalRealBytes;
});

return all.get();
}
}
58 changes: 58 additions & 0 deletions app/src/main/java/org/astraea/app/performance/MutableMetric.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.performance;

import java.util.function.Supplier;

/**
* A metric that the value may change over time. The changes may be caused by other thread, or
* change when metricValue() is called.
*/
public interface MutableMetric<T> {
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
/** Default return type is double. */
static MutableMetric<Double> of(org.apache.kafka.common.Metric kafkaMetric) {
return new MutableMetric<>() {
@Override
public String metricName() {
return kafkaMetric.metricName().name();
}

@Override
public Double metricValue() {
return Double.parseDouble(kafkaMetric.metricValue().toString());
}
};
}

static <T> MutableMetric<T> create(String name, Supplier<T> metricValue) {
return new MutableMetric<>() {
@Override
public String metricName() {
return name;
}

@Override
public T metricValue() {
return metricValue.get();
}
};
}

String metricName();

T metricValue();
}
14 changes: 13 additions & 1 deletion app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import org.astraea.app.common.Utils;
import org.astraea.app.consumer.Consumer;
import org.astraea.app.consumer.Isolation;
import org.astraea.app.metrics.client.consumer.ConsumerMetrics;
import org.astraea.app.metrics.client.producer.ProducerMetrics;
import org.astraea.app.metrics.collector.BeanCollector;
import org.astraea.app.producer.Producer;

/**
Expand Down Expand Up @@ -148,7 +151,14 @@ public static String execute(final Argument param) throws InterruptedException,
.map(ConsumerThread::report)
.collect(Collectors.toUnmodifiableList());

var tracker = TrackerThread.create(producerReports, consumerReports, param.exeTime);
// The receivers (that is, producerReceiver and consumerReceiver) will fetch mbean once per
// second. The interval should be less than 1 second. Let the receiver fetch new data.
var beanCollector = BeanCollector.builder().interval(Duration.ofMillis(500)).build();
var producerReceiver = beanCollector.register().local().fetcher(ProducerMetrics::nodes).build();
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
var consumerReceiver = beanCollector.register().local().fetcher(ConsumerMetrics::nodes).build();
var tracker =
TrackerThread.create(
producerReports, consumerReports, producerReceiver, consumerReceiver, param.exeTime);

Optional<Runnable> fileWriter =
param.CSVPath == null
Expand Down Expand Up @@ -191,6 +201,8 @@ public static String execute(final Argument param) throws InterruptedException,

consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
producerReceiver.close();
consumerReceiver.close();
return param.topic;
}

Expand Down
35 changes: 34 additions & 1 deletion app/src/main/java/org/astraea/app/performance/TrackerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,29 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;
import org.astraea.app.common.DataSize;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.client.HasNodeMetrics;
import org.astraea.app.metrics.collector.Receiver;

/** Print out the given metrics. */
public interface TrackerThread extends AbstractThread {

static TrackerThread create(
List<Report> producerReports, List<Report> consumerReports, ExeTime exeTime) {
List<Report> producerReports,
List<Report> consumerReports,
Receiver producerReceiver,
Receiver consumerReceiver,
ExeTime exeTime) {
var start = System.currentTimeMillis() - Duration.ofSeconds(1).toMillis();

Function<Result, Boolean> logProducers =
Expand All @@ -56,6 +65,11 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n", result.averageBytes(duration));
System.out.printf(
" current throughput: %s/second%n", DataSize.Byte.of(result.totalCurrentBytes()));
System.out.printf(
" total real out-going: %s/second%n",
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
DataSize.Byte.of(
sumOfAttribute(producerReceiver.current(), HasNodeMetrics::outgoingByteTotal)
.longValue()));
System.out.println(" publish max latency: " + result.maxLatency + " ms");
System.out.println(" publish mim latency: " + result.minLatency + " ms");
for (int i = 0; i < result.bytes.size(); ++i) {
Expand Down Expand Up @@ -90,6 +104,11 @@ static TrackerThread create(
" average throughput: %.3f MB/second%n", result.averageBytes(duration));
System.out.printf(
" current throughput: %s/second%n", DataSize.Byte.of(result.totalCurrentBytes()));
System.out.printf(
" total real in-coming: %s/second%n",
DataSize.Byte.of(
sumOfAttribute(consumerReceiver.current(), HasNodeMetrics::incomingByteTotal)
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
.longValue()));
System.out.println(" end-to-end max latency: " + result.maxLatency + " ms");
System.out.println(" end-to-end mim latency: " + result.minLatency + " ms");
for (int i = 0; i < result.bytes.size(); ++i) {
Expand Down Expand Up @@ -159,6 +178,20 @@ public void close() {
};
}

/**
* @param mbeans mBeans fetched by the receivers
* @return sum of given attribute of all beans which is instance of HasNodeMetrics.
*/
static Double sumOfAttribute(
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
Collection<HasBeanObject> mbeans, ToDoubleFunction<HasNodeMetrics> targetAttribute) {
return mbeans.stream()
.filter(mbean -> mbean instanceof HasNodeMetrics)
.map(mbean -> (HasNodeMetrics) mbean)
.mapToDouble(targetAttribute)
.filter(d -> !Double.isNaN(d))
.sum();
}

static double avg(Duration duration, long value) {
return duration.toSeconds() <= 0
? 0
Expand Down
12 changes: 12 additions & 0 deletions app/src/main/java/org/astraea/app/producer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.astraea.app.admin.Compression;
import org.astraea.app.performance.MutableMetric;

public class Builder<Key, Value> {
private final Map<String, Object> configs = new HashMap<>();
Expand Down Expand Up @@ -148,6 +149,17 @@ private BaseProducer(org.apache.kafka.clients.producer.Producer<Key, Value> kafk
this.kafkaProducer = kafkaProducer;
}

@Override
public MutableMetric<Double> getMetric(String metricName) {
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
var kafkaMetric =
kafkaProducer.metrics().entrySet().stream()
.filter(e -> e.getKey().name().equals(metricName))
.findFirst()
.orElseThrow()
.getValue();
return MutableMetric.of(kafkaMetric);
}

@Override
public void flush() {
kafkaProducer.flush();
Expand Down
6 changes: 6 additions & 0 deletions app/src/main/java/org/astraea/app/producer/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.astraea.app.performance.MutableMetric;

/** An interface for sending records. */
public interface Producer<Key, Value> extends AutoCloseable {
Expand All @@ -38,6 +39,11 @@ public interface Producer<Key, Value> extends AutoCloseable {

void close();

/** Get a kafkaMetric by name. */
default MutableMetric<Double> getMetric(String metricName) {
return MutableMetric.create("null", () -> 0.0);
}

/** @return true if the producer supports transactional. */
default boolean transactional() {
return transactionId().isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.beust.jcommander.ParameterException;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import org.astraea.app.argument.Argument;
import org.astraea.app.consumer.Isolation;
import org.astraea.app.service.RequireBrokerCluster;
Expand All @@ -33,8 +32,6 @@ void testTransactionalProducer() {
String[] arguments1 = {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--transaction.size", "2"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertTrue(producer.transactional());
Expand All @@ -48,7 +45,13 @@ void testProducerExecutor() throws InterruptedException {
"--bootstrap.servers", bootstrapServers(), "--topic", topic, "--compression", "gzip"
};
var latch = new CountDownLatch(1);
BiConsumer<Long, Integer> observer = (x, y) -> latch.countDown();
Metrics observer =
new Metrics() {
@Override
public void accept(Long x, Integer y) {
latch.countDown();
}
};
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
Assertions.assertFalse(producer.transactional());
Expand Down
Loading