Skip to content

Commit

Permalink
Enable to trace the elapsed time of assigning/revoking partitions (op…
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Aug 12, 2022
1 parent 270a058 commit 5703206
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 196 deletions.
3 changes: 3 additions & 0 deletions app/src/main/java/org/astraea/app/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ default Collection<Record<Key, Value>> poll(Duration timeout) {
/** unsubscribe all partitions. */
void unsubscribe();

/** @return current partitions assigned to this consumer */
Set<TopicPartition> assignments();

/**
* Create a consumer builder by setting specific topics
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,24 @@ public interface ConsumerRebalanceListener {
*/
void onPartitionAssigned(Set<TopicPartition> partitions);

/**
* It is called when this consumer has to give up some partitions when running re-balance.
*
* @param partitions to give up
*/
default void onPartitionsRevoked(Set<TopicPartition> partitions) {}

static org.apache.kafka.clients.consumer.ConsumerRebalanceListener of(
List<ConsumerRebalanceListener> listeners) {
return new org.apache.kafka.clients.consumer.ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> ignore) {}
public void onPartitionsRevoked(
Collection<org.apache.kafka.common.TopicPartition> partitions) {
listeners.forEach(
l ->
l.onPartitionsRevoked(
partitions.stream().map(TopicPartition::from).collect(Collectors.toSet())));
}

@Override
public void onPartitionsAssigned(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.astraea.app.admin.TopicPartition;
Expand Down Expand Up @@ -130,5 +131,12 @@ public AssignedConsumerImpl(
protected void doResubscribe() {
kafkaConsumer.assign(partitions.stream().map(TopicPartition::to).collect(toList()));
}

@Override
public Set<TopicPartition> assignments() {
return kafkaConsumer.assignment().stream()
.map(TopicPartition::from)
.collect(Collectors.toUnmodifiableSet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package org.astraea.app.consumer;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.astraea.app.admin.TopicPartition;

/**
* This inherited consumer offers function related to consumer group.
Expand All @@ -45,10 +42,4 @@ public interface SubscribedConsumer<Key, Value> extends Consumer<Key, Value> {

/** @return group instance id (static member) */
Optional<String> groupInstanceId();

/**
* @return the historical subscription. key is the time of getting assignments. value is the
* assignments.
*/
Map<Long, Set<TopicPartition>> historicalSubscription();
}
50 changes: 11 additions & 39 deletions app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.astraea.app.admin.TopicPartition;
Expand All @@ -36,8 +35,6 @@ public class TopicsBuilder<Key, Value> extends Builder<Key, Value> {
private final Set<String> topics;
private ConsumerRebalanceListener listener = ignore -> {};

private boolean enableTrace = false;

TopicsBuilder(Set<String> topics) {
this.topics = requireNonNull(topics);
}
Expand Down Expand Up @@ -123,17 +120,6 @@ public TopicsBuilder<Key, Value> isolation(Isolation isolation) {
return this;
}

/**
* enable to trace the historical subscription. see {@link
* SubscribedConsumer#historicalSubscription()}
*
* @return this builder
*/
public TopicsBuilder<Key, Value> enableTrace() {
this.enableTrace = true;
return this;
}

public TopicsBuilder<Key, Value> disableAutoCommitOffsets() {
return config(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
Expand All @@ -150,22 +136,11 @@ public SubscribedConsumer<Key, Value> build() {
Deserializer.of((Deserializer<Key>) keyDeserializer),
Deserializer.of((Deserializer<Value>) valueDeserializer));

var tracker =
new ConsumerRebalanceListener() {
private final Map<Long, Set<TopicPartition>> history = new ConcurrentHashMap<>();

@Override
public void onPartitionAssigned(Set<TopicPartition> partitions) {
if (enableTrace) history.put(System.currentTimeMillis(), Set.copyOf(partitions));
}
};

if (seekStrategy != SeekStrategy.NONE) {
// make sure this consumer is assigned before seeking
var latch = new CountDownLatch(1);
kafkaConsumer.subscribe(
topics,
ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown(), tracker)));
topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
while (latch.getCount() != 0) {
// the offset will be reset, so it is fine to poll data
// TODO: should we disable auto-commit here?
Expand All @@ -174,31 +149,26 @@ public void onPartitionAssigned(Set<TopicPartition> partitions) {
}
} else {
// nothing to seek so we just subscribe topics
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener, tracker)));
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
}

seekStrategy.apply(kafkaConsumer, seekValue);

return new SubscribedConsumerImpl<>(
kafkaConsumer, topics, listener, Collections.unmodifiableMap(tracker.history));
return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener);
}

private static class SubscribedConsumerImpl<Key, Value> extends Builder.BaseConsumer<Key, Value>
implements SubscribedConsumer<Key, Value> {
private final Set<String> topics;
private final ConsumerRebalanceListener listener;

private final Map<Long, Set<TopicPartition>> history;

public SubscribedConsumerImpl(
org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer,
Set<String> topics,
ConsumerRebalanceListener listener,
Map<Long, Set<TopicPartition>> history) {
ConsumerRebalanceListener listener) {
super(kafkaConsumer);
this.topics = topics;
this.listener = listener;
this.history = history;
}

@Override
Expand All @@ -221,13 +191,15 @@ public Optional<String> groupInstanceId() {
}

@Override
public Map<Long, Set<TopicPartition>> historicalSubscription() {
return history;
protected void doResubscribe() {
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
}

@Override
protected void doResubscribe() {
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
public Set<TopicPartition> assignments() {
return kafkaConsumer.assignment().stream()
.map(TopicPartition::from)
.collect(Collectors.toUnmodifiableSet());
}
}
}
85 changes: 75 additions & 10 deletions app/src/main/java/org/astraea/app/performance/ConsumerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@

import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.errors.WakeupException;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.common.Utils;
import org.astraea.app.consumer.ConsumerRebalanceListener;
import org.astraea.app.consumer.SubscribedConsumer;

public interface ConsumerThread extends AbstractThread {

static List<ConsumerThread> create(
int consumers, Supplier<SubscribedConsumer<byte[], byte[]>> consumerSupplier) {
int consumers,
Function<ConsumerRebalanceListener, SubscribedConsumer<byte[], byte[]>> consumerSupplier) {
if (consumers == 0) return List.of();
var closeLatches =
IntStream.range(0, consumers)
Expand All @@ -53,18 +57,19 @@ static List<ConsumerThread> create(
return IntStream.range(0, consumers)
.mapToObj(
index -> {
var consumer = consumerSupplier.get();
var report = new Report();
var listener = new Listener(report);
var closeLatch = closeLatches.get(index);
var closed = new AtomicBoolean(false);
var subscribed = new AtomicBoolean(true);
executors.execute(
() -> {
try {
try (var consumer = consumerSupplier.apply(listener)) {
while (!closed.get()) {
if (subscribed.get()) consumer.resubscribe();
else {
consumer.unsubscribe();
report.assignments(Set.of());
Utils.sleep(Duration.ofSeconds(1));
continue;
}
Expand All @@ -80,15 +85,12 @@ record ->
record.offset(),
System.currentTimeMillis() - record.timestamp(),
record.serializedKeySize() + record.serializedValueSize()));
report.assignments(consumer.assignments());
}
} catch (WakeupException ignore) {
// Stop polling and being ready to clean up
} finally {
try {
consumer.close();
} finally {
closeLatch.countDown();
}
closeLatch.countDown();
}
});
return new ConsumerThread() {
Expand Down Expand Up @@ -121,7 +123,6 @@ public Report report() {
@Override
public void close() {
closed.set(true);
consumer.wakeup();
Utils.swallowException(closeLatch::await);
}
};
Expand All @@ -135,4 +136,68 @@ public void close() {

/** @return report of this thread */
Report report();

class Listener implements ConsumerRebalanceListener {
private final Report report;
private long previousCall = System.currentTimeMillis();
private long maxLatency = 0;
private long sumLatency = 0;
private long count = 0;

public Listener(Report report) {
this.report = report;
}

@Override
public void onPartitionAssigned(Set<TopicPartition> partitions) {
record();
}

@Override
public void onPartitionsRevoked(Set<TopicPartition> partitions) {
record();
}

private void record() {
count += 1;
var current = System.currentTimeMillis();
var diff = current - previousCall;
maxLatency = Math.max(maxLatency, diff);
sumLatency += diff;
previousCall = current;
report.maxSubscriptionLatency(maxLatency);
report.avgSubscriptionLatency((double) sumLatency / count);
}
}

class Report extends org.astraea.app.performance.Report.Impl {
private volatile long maxSubscriptionLatency = 0;
private volatile double avgSubscriptionLatency = 0;

private volatile Set<TopicPartition> assignments;

public long maxSubscriptionLatency() {
return maxSubscriptionLatency;
}

public void maxSubscriptionLatency(long maxSubscriptionLatency) {
this.maxSubscriptionLatency = maxSubscriptionLatency;
}

public double avgSubscriptionLatency() {
return avgSubscriptionLatency;
}

public void avgSubscriptionLatency(double avgSubscriptionLatency) {
this.avgSubscriptionLatency = avgSubscriptionLatency;
}

public void assignments(Set<TopicPartition> assignments) {
this.assignments = assignments;
}

public Set<TopicPartition> assignments() {
return assignments;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ public static String execute(final Argument param) throws InterruptedException,
var consumerThreads =
ConsumerThread.create(
param.consumers,
() ->
listener ->
Consumer.forTopics(Set.of(param.topic))
.bootstrapServers(param.bootstrapServers())
.groupId(groupId)
.configs(param.configs())
.isolation(param.isolation())
.seek(latestOffsets)
.consumerRebalanceListener(listener)
.build());

var producerReports =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ static List<ProducerThread> create(
return IntStream.range(0, producers)
.mapToObj(
index -> {
var producer = producerSupplier.get();
var report = new Report();
var closeLatch = closeLatches.get(index);
var closed = new AtomicBoolean(false);
executors.execute(
() -> {
try {
try (var producer = producerSupplier.get()) {
while (!closed.get()) {
var data =
IntStream.range(0, batchSize)
Expand Down Expand Up @@ -105,11 +104,7 @@ static List<ProducerThread> create(
m.serializedValueSize() + m.serializedKeySize())));
}
} finally {
try {
producer.close();
} finally {
closeLatch.countDown();
}
closeLatch.countDown();
}
});
return new ProducerThread() {
Expand Down Expand Up @@ -141,4 +136,6 @@ public void close() {

/** @return report of this thread */
Report report();

class Report extends org.astraea.app.performance.Report.Impl {}
}
Loading

0 comments on commit 5703206

Please sign in to comment.