Skip to content

Commit

Permalink
Enable to unsubscribe/resubscribe topics/partitions (opensource4you#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Aug 10, 2022
1 parent 1648e7b commit a16c6fd
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 5 deletions.
16 changes: 15 additions & 1 deletion app/src/main/java/org/astraea/app/consumer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.astraea.app.admin.TopicPartition;
Expand Down Expand Up @@ -115,8 +116,9 @@ public Builder<Key, Value> isolation(Isolation isolation) {
/** @return consumer instance. The different builders may return inherited consumer interface. */
public abstract Consumer<Key, Value> build();

protected static class BaseConsumer<Key, Value> implements Consumer<Key, Value> {
protected abstract static class BaseConsumer<Key, Value> implements Consumer<Key, Value> {
protected final org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer;
private final AtomicBoolean subscribed = new AtomicBoolean(true);

public BaseConsumer(org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
Expand All @@ -143,6 +145,18 @@ public void wakeup() {
public void close() {
kafkaConsumer.close();
}

@Override
public void resubscribe() {
if (subscribed.compareAndSet(false, true)) doResubscribe();
}

@Override
public void unsubscribe() {
if (subscribed.compareAndSet(true, false)) kafkaConsumer.unsubscribe();
}

protected abstract void doResubscribe();
}

public enum SeekStrategy {
Expand Down
6 changes: 6 additions & 0 deletions app/src/main/java/org/astraea/app/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ default Collection<Record<Key, Value>> poll(Duration timeout) {
@Override
void close();

/** resubscribe partitions or rejoin the consumer group. */
void resubscribe();

/** unsubscribe all partitions. */
void unsubscribe();

/**
* Create a consumer builder by setting specific topics
*
Expand Down
13 changes: 11 additions & 2 deletions app/src/main/java/org/astraea/app/consumer/PartitionsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,23 @@ public AssignedConsumer<Key, Value> build() {

seekStrategy.apply(kafkaConsumer, seekValue);

return new AssignedConsumerImpl<>(kafkaConsumer);
return new AssignedConsumerImpl<>(kafkaConsumer, partitions);
}

private static class AssignedConsumerImpl<Key, Value> extends Builder.BaseConsumer<Key, Value>
implements AssignedConsumer<Key, Value> {

public AssignedConsumerImpl(Consumer<Key, Value> kafkaConsumer) {
private final Set<TopicPartition> partitions;

public AssignedConsumerImpl(
Consumer<Key, Value> kafkaConsumer, Set<TopicPartition> partitions) {
super(kafkaConsumer);
this.partitions = partitions;
}

@Override
protected void doResubscribe() {
kafkaConsumer.assign(partitions.stream().map(TopicPartition::to).collect(toList()));
}
}
}
15 changes: 13 additions & 2 deletions app/src/main/java/org/astraea/app/consumer/TopicsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,21 @@ public SubscribedConsumer<Key, Value> build() {

seekStrategy.apply(kafkaConsumer, seekValue);

return new SubscribedConsumerImpl<>(kafkaConsumer);
return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener);
}

private static class SubscribedConsumerImpl<Key, Value> extends Builder.BaseConsumer<Key, Value>
implements SubscribedConsumer<Key, Value> {
private final Set<String> topics;
private final ConsumerRebalanceListener listener;

public SubscribedConsumerImpl(
org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer) {
org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer,
Set<String> topics,
ConsumerRebalanceListener listener) {
super(kafkaConsumer);
this.topics = topics;
this.listener = listener;
}

@Override
Expand All @@ -151,5 +157,10 @@ public String memberId() {
public Optional<String> groupInstanceId() {
return kafkaConsumer.groupMetadata().groupInstanceId();
}

@Override
protected void doResubscribe() {
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(listener));
}
}
}
21 changes: 21 additions & 0 deletions app/src/main/java/org/astraea/app/performance/ConsumerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,17 @@ static List<ConsumerThread> create(List<Consumer<byte[], byte[]>> consumers) {
var report = reports.get(index);
var closeLatch = closeLatches.get(index);
var closed = closeFlags.get(index);
var subscribed = new AtomicBoolean(true);
executors.execute(
() -> {
try {
while (!closed.get()) {
if (subscribed.get()) consumer.resubscribe();
else {
consumer.unsubscribe();
Utils.sleep(Duration.ofSeconds(1));
continue;
}
consumer
.poll(Duration.ofSeconds(1))
.forEach(
Expand Down Expand Up @@ -102,6 +109,16 @@ public boolean closed() {
return closeLatch.getCount() == 0;
}

@Override
public void resubscribe() {
subscribed.set(true);
}

@Override
public void unsubscribe() {
subscribed.set(false);
}

@Override
public Report report() {
return report;
Expand All @@ -118,6 +135,10 @@ public void close() {
.collect(Collectors.toUnmodifiableList());
}

void resubscribe();

void unsubscribe();

/** @return report of this thread */
Report report();
}
56 changes: 56 additions & 0 deletions app/src/test/java/org/astraea/app/consumer/ConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,60 @@ void testInvalidSeekValue() {
.build(),
"seek value should >= 0");
}

@Test
void testResubscribeTopics() {
var topic = Utils.randomString(10);
produceData(topic, 100);
try (var consumer =
Consumer.forTopics(Set.of(topic))
.bootstrapServers(bootstrapServers())
.fromBeginning()
.build()) {
Assertions.assertNotEquals(0, consumer.poll(Duration.ofSeconds(5)).size());
consumer.unsubscribe();
Assertions.assertThrows(
IllegalStateException.class, () -> consumer.poll(Duration.ofSeconds(2)));
// unsubscribe is idempotent op
consumer.unsubscribe();
consumer.unsubscribe();
consumer.unsubscribe();

consumer.resubscribe();
Assertions.assertNotEquals(0, consumer.poll(Duration.ofSeconds(5)).size());

// resubscribe is idempotent op
consumer.resubscribe();
consumer.resubscribe();
consumer.resubscribe();
}
}

@Test
void testResubscribePartitions() {
var topic = Utils.randomString(10);
produceData(topic, 100);
try (var consumer =
Consumer.forPartitions(Set.of(TopicPartition.of(topic, 0)))
.bootstrapServers(bootstrapServers())
.fromBeginning()
.build()) {
Assertions.assertNotEquals(0, consumer.poll(Duration.ofSeconds(5)).size());
consumer.unsubscribe();
Assertions.assertThrows(
IllegalStateException.class, () -> consumer.poll(Duration.ofSeconds(2)));
// unsubscribe is idempotent op
consumer.unsubscribe();
consumer.unsubscribe();
consumer.unsubscribe();

consumer.resubscribe();
Assertions.assertNotEquals(0, consumer.poll(Duration.ofSeconds(5)).size());

// resubscribe is idempotent op
consumer.resubscribe();
consumer.resubscribe();
consumer.resubscribe();
}
}
}

0 comments on commit a16c6fd

Please sign in to comment.