From b16c00d0c7a749511658edd2fa4495ea62ad95c1 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 6 Oct 2020 05:14:51 +0800 Subject: [PATCH] MINOR: Add proper checks to KafkaConsumer.groupMetadata (#9349) Add following checks to `KafkaConsumer.groupMetadata`: 1. null check of coordinator (replace NPE by `InvalidGroupIdException` which is same to other methods) 2. concurrent check (`groupMetadata` is not thread-safe so concurrent check is necessary) Reviewers: Jason Gustafson --- .../kafka/clients/consumer/KafkaConsumer.java | 9 +++++- .../clients/consumer/KafkaConsumerTest.java | 30 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index ac996cd69a177..2f633b6b0f559 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2227,10 +2227,17 @@ public Map endOffsets(Collection partition * Return the current group metadata associated with this consumer. * * @return consumer group metadata + * @throws org.apache.kafka.common.errors.InvalidGroupIdException if consumer does not have a group */ @Override public ConsumerGroupMetadata groupMetadata() { - return coordinator.groupMetadata(); + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + return coordinator.groupMetadata(); + } finally { + release(); + } } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index e86331c36b7ae..420a226351401 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -103,6 +103,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -2002,6 +2003,35 @@ public void testGetGroupMetadata() { assertEquals(groupInstanceId, groupMetadataAfterPoll.groupInstanceId()); } + @Test + public void testInvalidGroupMetadata() throws InterruptedException { + Time time = new MockTime(); + SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + initMetadata(client, Collections.singletonMap(topic, 1)); + KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, + new RoundRobinAssignor(), true, groupInstanceId); + consumer.subscribe(singletonList(topic)); + // concurrent access is illegal + client.enableBlockingUntilWakeup(1); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(() -> consumer.poll(Duration.ofSeconds(5))); + try { + TimeUnit.SECONDS.sleep(1); + assertThrows(ConcurrentModificationException.class, consumer::groupMetadata); + client.wakeup(); + consumer.wakeup(); + } finally { + service.shutdown(); + assertTrue(service.awaitTermination(10, TimeUnit.SECONDS)); + } + + // accessing closed consumer is illegal + consumer.close(Duration.ofSeconds(5)); + assertThrows(IllegalStateException.class, consumer::groupMetadata); + } + private KafkaConsumer consumerWithPendingAuthenticationError() { Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);