Skip to content

Commit

Permalink
MINOR: Add proper checks to KafkaConsumer.groupMetadata (apache#9349)
Browse files Browse the repository at this point in the history
Add following checks to `KafkaConsumer.groupMetadata`:

1. null check of coordinator (replace NPE by `InvalidGroupIdException` which is same to other methods)
2. concurrent check (`groupMetadata` is not thread-safe so concurrent check is necessary)

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
chia7712 authored and javierfreire committed Oct 8, 2020
1 parent 63ba1b9 commit b16c00d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2227,10 +2227,17 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
* Return the current group metadata associated with this consumer.
*
* @return consumer group metadata
* @throws org.apache.kafka.common.errors.InvalidGroupIdException if consumer does not have a group
*/
@Override
public ConsumerGroupMetadata groupMetadata() {
return coordinator.groupMetadata();
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
return coordinator.groupMetadata();
} finally {
release();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -2002,6 +2003,35 @@ public void testGetGroupMetadata() {
assertEquals(groupInstanceId, groupMetadataAfterPoll.groupInstanceId());
}

@Test
public void testInvalidGroupMetadata() throws InterruptedException {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
new RoundRobinAssignor(), true, groupInstanceId);
consumer.subscribe(singletonList(topic));
// concurrent access is illegal
client.enableBlockingUntilWakeup(1);
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
try {
TimeUnit.SECONDS.sleep(1);
assertThrows(ConcurrentModificationException.class, consumer::groupMetadata);
client.wakeup();
consumer.wakeup();
} finally {
service.shutdown();
assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
}

// accessing closed consumer is illegal
consumer.close(Duration.ofSeconds(5));
assertThrows(IllegalStateException.class, consumer::groupMetadata);
}

private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
Expand Down

0 comments on commit b16c00d

Please sign in to comment.