Performance tool hangs when the number of consumers is smaller tha…
chia7712 authored Aug 10, 2022
1 parent ae0147a commit 69303f1
Showing 8 changed files with 132 additions and 49 deletions.
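For context: before this commit, every seek strategy in Builder.java polled until the consumer's assignment was non-empty (see the removed loops below). When a consumer group has more members than the topic has partitions, the surplus member never receives an assignment, so that loop never exits and the performance tool hangs. A minimal sketch of the problem against the raw Kafka client (broker address and topic names are illustrative):

// sketch: reproduce the pre-fix hang with a plain KafkaConsumer
// assumption: the group already has at least as many members as the topic has partitions
var consumer =
    new KafkaConsumer<byte[], byte[]>(
        Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "perf-group",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()));
consumer.subscribe(Set.of("topic-with-fewer-partitions-than-members"));
// the surplus member's assignment stays empty forever, so this loop never terminates
while (consumer.assignment().isEmpty()) {
  consumer.poll(Duration.ofMillis(500));
}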
20 changes: 7 additions & 13 deletions app/src/main/java/org/astraea/app/consumer/Builder.java
@@ -164,23 +164,16 @@ public enum SeekStrategy {
     DISTANCE_FROM_LATEST(
         (kafkaConsumer, distanceFromLatest) -> {
           // this mode is not supported by kafka, so we have to calculate the offset first
-          // 1) poll data until the assignment is completed
-          while (kafkaConsumer.assignment().isEmpty()) {
-            kafkaConsumer.poll(Duration.ofMillis(500));
-          }
           var partitions = kafkaConsumer.assignment();
-          // 2) get the end offsets from all subscribed partitions
+          // 1) get the end offsets from all subscribed partitions
           var endOffsets = kafkaConsumer.endOffsets(partitions);
-          // 3) calculate and then seek to the correct offset (end offset - recent offset)
+          // 2) calculate and then seek to the correct offset (end offset - recent offset)
           endOffsets.forEach(
               (tp, latest) ->
                   kafkaConsumer.seek(tp, Math.max(0, latest - (long) distanceFromLatest)));
         }),
     DISTANCE_FROM_BEGINNING(
         (kafkaConsumer, distanceFromBeginning) -> {
-          while (kafkaConsumer.assignment().isEmpty()) {
-            kafkaConsumer.poll(Duration.ofMillis(500));
-          }
           var partitions = kafkaConsumer.assignment();
           var beginningOffsets = kafkaConsumer.beginningOffsets(partitions);
           beginningOffsets.forEach(
@@ -189,17 +182,18 @@ public enum SeekStrategy {
     @SuppressWarnings("unchecked")
     SEEK_TO(
         (kafkaConsumer, seekTo) -> {
-          while (kafkaConsumer.assignment().isEmpty()) {
-            kafkaConsumer.poll(Duration.ofMillis(500));
-          }
           if (seekTo instanceof Long) {
             var partitions = kafkaConsumer.assignment();
             partitions.forEach(tp -> kafkaConsumer.seek(tp, (long) seekTo));
             return;
           }
           if (seekTo instanceof Map) {
+            var partitions = kafkaConsumer.assignment();
             ((Map<TopicPartition, Long>) seekTo)
-                .forEach((tp, offset) -> kafkaConsumer.seek(TopicPartition.to(tp), offset));
+                .entrySet().stream()
+                    // don't seek partitions that do not belong to this consumer
+                    .filter(e -> partitions.contains(TopicPartition.to(e.getKey())))
+                    .forEach(e -> kafkaConsumer.seek(TopicPartition.to(e.getKey()), e.getValue()));
             return;
           }
           throw new IllegalArgumentException(
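A hedged usage sketch of the strategies above via the project's builder (topic, servers, and the distance value are illustrative):

// DISTANCE_FROM_LATEST seeks each assigned partition to max(0, endOffset - distance),
// i.e. replay roughly the last 100 records per partition
try (var consumer =
    Consumer.forTopics(Set.of("my-topic"))
        .bootstrapServers("localhost:9092")
        .seek(SeekStrategy.DISTANCE_FROM_LATEST, 100)
        .build()) {
  var records = consumer.poll(Duration.ofSeconds(5));
}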
9 changes: 6 additions & 3 deletions app/src/main/java/org/astraea/app/consumer/ConsumerRebalanceListener.java
@@ -17,6 +17,7 @@
 package org.astraea.app.consumer;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.astraea.app.admin.TopicPartition;
@@ -33,16 +34,18 @@ public interface ConsumerRebalanceListener {
   void onPartitionAssigned(Set<TopicPartition> partitions);
 
   static org.apache.kafka.clients.consumer.ConsumerRebalanceListener of(
-      ConsumerRebalanceListener listener) {
+      List<ConsumerRebalanceListener> listeners) {
     return new org.apache.kafka.clients.consumer.ConsumerRebalanceListener() {
       @Override
       public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> ignore) {}
 
       @Override
       public void onPartitionsAssigned(
           Collection<org.apache.kafka.common.TopicPartition> partitions) {
-        listener.onPartitionAssigned(
-            partitions.stream().map(TopicPartition::from).collect(Collectors.toSet()));
+        listeners.forEach(
+            l ->
+                l.onPartitionAssigned(
+                    partitions.stream().map(TopicPartition::from).collect(Collectors.toSet())));
       }
     };
   }
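Why a List: the same Kafka callback can now drive both the caller's listener and an internal one, which is what TopicsBuilder uses below to detect assignment with a latch. A sketch (the listener body and topic are illustrative):

// fan out one Kafka rebalance callback to several astraea listeners
var latch = new CountDownLatch(1);
ConsumerRebalanceListener userListener = ps -> System.out.println("assigned " + ps.size());
kafkaConsumer.subscribe(
    Set.of("my-topic"),
    ConsumerRebalanceListener.of(List.of(userListener, ignored -> latch.countDown())));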
12 changes: 12 additions & 0 deletions app/src/main/java/org/astraea/app/consumer/PartitionsBuilder.java
@@ -88,6 +88,18 @@ public PartitionsBuilder<Key, Value> isolation(Isolation isolation) {
     return this;
   }
 
+  @Override
+  public PartitionsBuilder<Key, Value> seek(SeekStrategy seekStrategy, long value) {
+    super.seek(seekStrategy, value);
+    return this;
+  }
+
+  @Override
+  public PartitionsBuilder<Key, Value> seek(Map<TopicPartition, Long> offsets) {
+    super.seek(offsets);
+    return this;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public AssignedConsumer<Key, Value> build() {
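These overrides add no behavior; they exist so that seek(...) returns PartitionsBuilder rather than the base Builder, keeping fluent chains type-safe. A minimal sketch of the idiom (class names are illustrative):

class Base {
  Base seek(long offset) { /* remember the offset */ return this; }
}

class Child extends Base {
  @Override
  Child seek(long offset) { // covariant return type
    super.seek(offset);
    return this;
  }

  Child childOnly() { return this; }
}

// new Child().seek(42).childOnly() compiles only because seek(...) is overridden;
// without the override, seek(...) would return Base, which has no childOnly()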
35 changes: 33 additions & 2 deletions app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java
@@ -19,12 +19,16 @@
 import static java.util.Objects.requireNonNull;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.astraea.app.admin.TopicPartition;
+import org.astraea.app.common.Utils;
 
 public class TopicsBuilder<Key, Value> extends Builder<Key, Value> {
   private final Set<String> topics;
@@ -47,6 +51,18 @@ public TopicsBuilder<Key, Value> consumerRebalanceListener(ConsumerRebalanceList
     return this;
   }
 
+  @Override
+  public TopicsBuilder<Key, Value> seek(SeekStrategy seekStrategy, long value) {
+    super.seek(seekStrategy, value);
+    return this;
+  }
+
+  @Override
+  public TopicsBuilder<Key, Value> seek(Map<TopicPartition, Long> offsets) {
+    super.seek(offsets);
+    return this;
+  }
+
   /**
    * make the consumer read data from the beginning. By default, it reads the latest data.
    *
@@ -118,7 +134,22 @@ public SubscribedConsumer<Key, Value> build() {
             configs,
             Deserializer.of((Deserializer<Key>) keyDeserializer),
             Deserializer.of((Deserializer<Value>) valueDeserializer));
-    kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(listener));
+
+    if (seekStrategy != SeekStrategy.NONE) {
+      // make sure this consumer is assigned before seeking
+      var latch = new CountDownLatch(1);
+      kafkaConsumer.subscribe(
+          topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
+      while (latch.getCount() != 0) {
+        // the offset will be reset, so it is fine to poll data
+        // TODO: should we disable auto-commit here?
+        kafkaConsumer.poll(Duration.ofMillis(500));
+        Utils.sleep(Duration.ofSeconds(1));
+      }
+    } else {
+      // nothing to seek, so we just subscribe to the topics
+      kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
+    }
 
     seekStrategy.apply(kafkaConsumer, seekValue);

@@ -160,7 +191,7 @@ public Optional<String> groupInstanceId() {
 
     @Override
     protected void doResubscribe() {
-      kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(listener));
+      kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
    }
  }
}
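The latch works where the removed assignment().isEmpty() loop hung because Kafka invokes onPartitionsAssigned on every member after a rebalance, even when that member is assigned zero partitions. A standalone sketch of the wait against the raw client (topic and timeout are illustrative):

var latch = new CountDownLatch(1);
kafkaConsumer.subscribe(
    Set.of("my-topic"),
    new org.apache.kafka.clients.consumer.ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(
          Collection<org.apache.kafka.common.TopicPartition> partitions) {}

      @Override
      public void onPartitionsAssigned(
          Collection<org.apache.kafka.common.TopicPartition> partitions) {
        // fires even with an empty assignment, so an idle member does not hang
        latch.countDown();
      }
    });
while (latch.getCount() != 0) kafkaConsumer.poll(Duration.ofMillis(500));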
18 changes: 10 additions & 8 deletions app/src/main/java/org/astraea/app/performance/ConsumerThread.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.kafka.common.errors.WakeupException;
@@ -31,21 +32,22 @@
 
 public interface ConsumerThread extends AbstractThread {
 
-  static List<ConsumerThread> create(List<Consumer<byte[], byte[]>> consumers) {
-    if (consumers.isEmpty()) return List.of();
+  static List<ConsumerThread> create(
+      int consumers, Supplier<Consumer<byte[], byte[]>> consumerSupplier) {
+    if (consumers == 0) return List.of();
     var reports =
-        IntStream.range(0, consumers.size())
+        IntStream.range(0, consumers)
             .mapToObj(ignored -> new Report())
             .collect(Collectors.toUnmodifiableList());
     var closeLatches =
-        IntStream.range(0, consumers.size())
+        IntStream.range(0, consumers)
             .mapToObj(ignored -> new CountDownLatch(1))
             .collect(Collectors.toUnmodifiableList());
     var closeFlags =
-        IntStream.range(0, consumers.size())
+        IntStream.range(0, consumers)
             .mapToObj(ignored -> new AtomicBoolean(false))
             .collect(Collectors.toUnmodifiableList());
-    var executors = Executors.newFixedThreadPool(consumers.size());
+    var executors = Executors.newFixedThreadPool(consumers);
     // monitor
     CompletableFuture.runAsync(
         () -> {
@@ -56,10 +58,10 @@ static List<ConsumerThread> create(List<Consumer<byte[], byte[]>> consumers) {
                 Utils.swallowException(() -> executors.awaitTermination(30, TimeUnit.SECONDS));
               }
             });
-    return IntStream.range(0, consumers.size())
+    return IntStream.range(0, consumers)
         .mapToObj(
             index -> {
-              var consumer = consumers.get(index);
+              var consumer = consumerSupplier.get();
               var report = reports.get(index);
               var closeLatch = closeLatches.get(index);
               var closed = closeFlags.get(index);
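With a count plus a Supplier, consumer construction is deferred into create(...) instead of requiring a pre-built list. A hedged usage sketch mirroring the Performance call below (topic, servers, and group are illustrative):

var threads =
    ConsumerThread.create(
        4,
        () ->
            Consumer.forTopics(Set.of("my-topic"))
                .bootstrapServers("localhost:9092")
                .groupId("perf-group")
                .build());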
26 changes: 11 additions & 15 deletions app/src/main/java/org/astraea/app/performance/Performance.java
@@ -31,7 +31,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.astraea.app.admin.Admin;
 import org.astraea.app.admin.Compression;
 import org.astraea.app.admin.TopicPartition;
@@ -122,22 +121,19 @@ public static String execute(final Argument param) throws InterruptedException,
             param.transactionSize,
             dataSupplier(param),
             partitionSupplier,
-            IntStream.range(0, param.producers)
-                .mapToObj(ignored -> param.createProducer())
-                .collect(Collectors.toUnmodifiableList()));
+            param.producers,
+            param::createProducer);
     var consumerThreads =
         ConsumerThread.create(
-            IntStream.range(0, param.consumers)
-                .mapToObj(
-                    ignored ->
-                        Consumer.forTopics(Set.of(param.topic))
-                            .bootstrapServers(param.bootstrapServers())
-                            .groupId(groupId)
-                            .configs(param.configs())
-                            .isolation(param.isolation())
-                            .seek(latestOffsets)
-                            .build())
-                .collect(Collectors.toUnmodifiableList()));
+            param.consumers,
+            () ->
+                Consumer.forTopics(Set.of(param.topic))
+                    .bootstrapServers(param.bootstrapServers())
+                    .groupId(groupId)
+                    .configs(param.configs())
+                    .isolation(param.isolation())
+                    .seek(latestOffsets)
+                    .build());
 
     var producerReports =
         producerThreads.stream()
17 changes: 9 additions & 8 deletions app/src/main/java/org/astraea/app/performance/ProducerThread.java
@@ -36,21 +36,22 @@ static List<ProducerThread> create(
       int batchSize,
       DataSupplier dataSupplier,
       Supplier<Integer> partitionSupplier,
-      List<Producer<byte[], byte[]>> producers) {
-    if (producers.isEmpty()) return List.of();
+      int producers,
+      Supplier<Producer<byte[], byte[]>> producerSupplier) {
+    if (producers <= 0) return List.of();
     var reports =
-        IntStream.range(0, producers.size())
+        IntStream.range(0, producers)
             .mapToObj(ignored -> new Report())
             .collect(Collectors.toUnmodifiableList());
     var closeLatches =
-        IntStream.range(0, producers.size())
+        IntStream.range(0, producers)
             .mapToObj(ignored -> new CountDownLatch(1))
             .collect(Collectors.toUnmodifiableList());
     var closedFlags =
-        IntStream.range(0, producers.size())
+        IntStream.range(0, producers)
             .mapToObj(ignored -> new AtomicBoolean(false))
             .collect(Collectors.toUnmodifiableList());
-    var executors = Executors.newFixedThreadPool(producers.size());
+    var executors = Executors.newFixedThreadPool(producers);
     // monitor
     CompletableFuture.runAsync(
         () -> {
@@ -61,10 +62,10 @@
                 Utils.swallowException(() -> executors.awaitTermination(30, TimeUnit.SECONDS));
               }
             });
-    return IntStream.range(0, producers.size())
+    return IntStream.range(0, producers)
         .mapToObj(
             index -> {
-              var producer = producers.get(index);
+              var producer = producerSupplier.get();
               var report = reports.get(index);
               var closeLatch = closeLatches.get(index);
               var closed = closedFlags.get(index);
44 changes: 44 additions & 0 deletions app/src/test/java/org/astraea/app/consumer/ConsumerTest.java
@@ -26,9 +26,14 @@
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.kafka.common.errors.WakeupException;
@@ -403,4 +408,43 @@ void testResubscribePartitions() {
       consumer.resubscribe();
     }
   }
+
+  @Test
+  void testCreateConsumersConcurrent() throws ExecutionException, InterruptedException {
+    var partitions = 3;
+    var topic = Utils.randomString(10);
+    try (var admin = Admin.of(bootstrapServers())) {
+      admin.creator().topic(topic).numberOfPartitions(partitions).create();
+      Utils.sleep(Duration.ofSeconds(3));
+    }
+
+    // one consumer is idle since there are more consumers than partitions
+    var groupId = Utils.randomString(10);
+    var consumers = partitions + 1;
+    var log = new ConcurrentHashMap<Integer, Integer>();
+    var closed = new AtomicBoolean(false);
+    var fs =
+        Utils.sequence(
+            IntStream.range(0, consumers)
+                .mapToObj(
+                    index ->
+                        CompletableFuture.runAsync(
+                            () -> {
+                              try (var consumer =
+                                  Consumer.forTopics(Set.of(topic))
+                                      .groupId(groupId)
+                                      .bootstrapServers(bootstrapServers())
+                                      .seek(SEEK_TO, 0)
+                                      .consumerRebalanceListener(ps -> log.put(index, ps.size()))
+                                      .build()) {
+                                while (!closed.get()) consumer.poll(Duration.ofSeconds(2));
+                              }
+                            }))
+                .collect(Collectors.toUnmodifiableList()));
+    Utils.waitFor(() -> log.size() == consumers, Duration.ofSeconds(15));
+    Utils.waitFor(
+        () -> log.values().stream().filter(ps -> ps == 0).count() == 1, Duration.ofSeconds(15));
+    closed.set(true);
+    fs.get();
+  }
 }
