From cb624eff340cdf60c8ada453ae7f99da6198617b Mon Sep 17 00:00:00 2001
From: Lucas Brutschy
Date: Thu, 7 Dec 2023 23:16:09 +0100
Subject: [PATCH] use most of the normal construction code for testing and
 migrate config tests

---
 .../internals/AsyncKafkaConsumer.java         |  66 ++++-
 .../internals/AsyncKafkaConsumerTest.java     | 238 ------------------
 .../internals/AsyncKafkaConsumerUnitTest.java | 112 ++++++---
 3 files changed, 138 insertions(+), 278 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 10d706ac7a77f..a80bbf00ff9d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -251,13 +251,25 @@ private void process(final GroupMetadataUpdateEvent event) {
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
-        this(config, keyDeserializer, valueDeserializer, new LinkedBlockingQueue<>());
+        this(
+            config,
+            keyDeserializer,
+            valueDeserializer,
+            Time.SYSTEM,
+            ApplicationEventHandler::new,
+            FetchCollector::new,
+            ConsumerMetadata::new
+        );
     }
 
+    // Visible for testing
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer,
-                       final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
+                       final Time time,
+                       final ApplicationEventHandlerSupplier applicationEventHandlerFactory,
+                       final FetchCollectorFactory<K, V> fetchCollectorFactory,
+                       final ConsumerMetadataFactory metadataFactory) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
                 config,
@@ -269,7 +281,7 @@ private void process(final GroupMetadataUpdateEvent event) {
 
             log.debug("Initializing the Kafka consumer");
             this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            this.time = Time.SYSTEM;
+            this.time = time;
             List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
             this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
@@ -283,7 +295,7 @@ private void process(final GroupMetadataUpdateEvent event) {
             ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
                     interceptorList,
                     Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
-            this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
+            this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
 
@@ -320,7 +332,8 @@ private void process(final GroupMetadataUpdateEvent event) {
                     metadata,
                     applicationEventQueue,
                     requestManagersSupplier);
-            this.applicationEventHandler = new ApplicationEventHandler(logContext,
+            this.applicationEventHandler = applicationEventHandlerFactory.build(
+                    logContext,
                     time,
                     applicationEventQueue,
                     applicationEventProcessorSupplier,
@@ -335,7 +348,7 @@ private void process(final GroupMetadataUpdateEvent event) {
             this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);
 
             // The FetchCollector is only used on the application thread.
-            this.fetchCollector = new FetchCollector<>(logContext,
+            this.fetchCollector = fetchCollectorFactory.build(logContext,
                     metadata,
                     subscriptions,
                     fetchConfig,
@@ -482,6 +495,47 @@ private void process(final GroupMetadataUpdateEvent event) {
                 requestManagersSupplier);
     }
 
+    // auxiliary interface for testing
+    interface ApplicationEventHandlerSupplier {
+
+        ApplicationEventHandler build(
+            final LogContext logContext,
+            final Time time,
+            final BlockingQueue<ApplicationEvent> applicationEventQueue,
+            final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
+            final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
+            final Supplier<RequestManagers> requestManagersSupplier
+        );
+
+    }
+
+    // auxiliary interface for testing
+    interface FetchCollectorFactory<K, V> {
+
+        FetchCollector<K, V> build(
+            final LogContext logContext,
+            final ConsumerMetadata metadata,
+            final SubscriptionState subscriptions,
+            final FetchConfig fetchConfig,
+            final Deserializers<K, V> deserializers,
+            final FetchMetricsManager metricsManager,
+            final Time time
+        );
+
+    }
+
+    // auxiliary interface for testing
+    interface ConsumerMetadataFactory {
+
+        ConsumerMetadata build(
+            final ConsumerConfig config,
+            final SubscriptionState subscriptions,
+            final LogContext logContext,
+            final ClusterResourceListeners clusterResourceListeners
+        );
+
+    }
+
     private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
                                                                     final GroupRebalanceConfig groupRebalanceConfig) {
         final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index e97409658ce4c..73bd4c8e01f8c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -19,7 +19,6 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -46,7 +45,6 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -90,7 +88,6 @@
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -102,7 +99,6 @@
 import org.mockito.ArgumentMatchers;
 import org.mockito.MockedConstruction;
 import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
 
 public class AsyncKafkaConsumerTest {
 
@@ -367,240 +363,6 @@ public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
         testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
     }
 
-    @Test
-    public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
-        final Properties props = requiredConsumerProperties();
-        final ConsumerConfig config = new ConsumerConfig(props);
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
-            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
-            final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
-            assertEquals(
-                "To use the group management or offset commit APIs, you must " +
-                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.",
-                exception.getMessage()
-            );
-        }
-    }
-
-    @Test
-    public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
-        final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-            final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
-
-            assertEquals(groupId, groupMetadata.groupId());
-            assertEquals(Optional.empty(), groupMetadata.groupInstanceId());
-            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-        }
-    }
-
-    @Test
-    public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-            final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
-
-            assertEquals(groupId, groupMetadata.groupId());
-            assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
-            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-        }
-    }
-
-    @Test
-    public void testGroupMetadataUpdateSingleCall() {
-        final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
-            final int generation = 1;
-            final String memberId = "newMemberId";
-            final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata(
-                groupId,
-                generation,
-                memberId,
-                Optional.empty()
-            );
-            final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
-                generation,
-                memberId
-            );
-            backgroundEventQueue.add(groupMetadataUpdateEvent);
-            consumer.assign(singletonList(new TopicPartition("topic", 0)));
-            consumer.poll(Duration.ZERO);
-
-            final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata();
-
-            assertEquals(expectedGroupMetadata, actualGroupMetadata);
-
-            final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata();
-
-            assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate);
-        }
-    }
-
-    @Test
-    public void testBackgroundError() {
"consumerGroupA"; - final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); - final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) { - final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition"); - final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException); - backgroundEventQueue.add(errorBackgroundEvent); - consumer.assign(singletonList(new TopicPartition("topic", 0))); - - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); - - assertEquals(expectedException.getMessage(), exception.getMessage()); - } - } - - @Test - public void testMultipleBackgroundErrors() { - final String groupId = "consumerGroupA"; - final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); - final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) { - final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition"); - final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1); - backgroundEventQueue.add(errorBackgroundEvent1); - final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam"); - final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2); - backgroundEventQueue.add(errorBackgroundEvent2); - consumer.assign(singletonList(new TopicPartition("topic", 0))); - - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); - - assertEquals(expectedException1.getMessage(), exception.getMessage()); - assertTrue(backgroundEventQueue.isEmpty()); - } - } - - @Test - public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); - final ConsumerConfig config = new ConsumerConfig(props); - - try (AsyncKafkaConsumer ignored = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); - } - } - - @Test - public void testGroupRemoteAssignorUnusedInGenericProtocol() { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT)); - props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); - final ConsumerConfig config = new ConsumerConfig(props); - - try (AsyncKafkaConsumer ignored = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); - } - } - - @Test - public void testGroupRemoteAssignorUsedInConsumerProtocol() { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); - 
-        props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
-        final ConsumerConfig config = new ConsumerConfig(props);
-
-        try (AsyncKafkaConsumer<String, String> ignored =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
-        }
-    }
-
-    @Test
-    public void testGroupIdNull() {
-        final Properties props = requiredConsumerProperties();
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
-        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
-        final ConsumerConfig config = new ConsumerConfig(props);
-
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
-            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
-        } catch (final Exception exception) {
-            throw new AssertionFailedError("The following exception was not expected:", exception);
-        }
-    }
-
-    @Test
-    public void testGroupIdNotNullAndValid() {
-        final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA");
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
-        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
-        final ConsumerConfig config = new ConsumerConfig(props);
-
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
-            assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
-        } catch (final Exception exception) {
-            throw new AssertionFailedError("The following exception was not expected:", exception);
-        }
-    }
-
-    @Test
-    public void testGroupIdEmpty() {
-        testInvalidGroupId("");
-    }
-
-    @Test
-    public void testGroupIdOnlyWhitespaces() {
-        testInvalidGroupId(" ");
-    }
-
-    private void testInvalidGroupId(final String groupId) {
-        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-
-        final Exception exception = assertThrows(
-            KafkaException.class,
-            () -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())
-        );
-
-        assertEquals("Failed to construct kafka consumer", exception.getMessage());
-    }
-
-    private Properties requiredConsumerPropertiesAndGroupId(final String groupId) {
-        final Properties props = requiredConsumerProperties();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return props;
-    }
-
-    private Properties requiredConsumerProperties() {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        return props;
-    }
-
     private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
         // Uncompleted future that will time out if used
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java
index 291893d5539f5..0855542a483a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java
@@ -19,10 +19,12 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -41,19 +43,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
@@ -66,10 +66,9 @@
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -85,12 +84,8 @@ public class AsyncKafkaConsumerUnitTest {
 
     private AsyncKafkaConsumer<String, String> consumer = null;
     private final Time time = new MockTime();
-    private final Deserializers<String, String> deserializers = mock(Deserializers.class);
-    private final FetchBuffer fetchBuffer = mock(FetchBuffer.class);
     private final FetchCollector<String, String> fetchCollector = mock(FetchCollector.class);
-    private final ConsumerInterceptors<String, String> interceptors = mock(ConsumerInterceptors.class);
     private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
-    private final BlockingQueue<BackgroundEvent> backgroundEventQueue = mock(BlockingQueue.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
 
     @AfterEach
@@ -103,42 +98,91 @@ public void resetAll() {
     }
 
     private AsyncKafkaConsumer<String, String> setup() {
return setup("group-id"); + Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id"); + final ConsumerConfig config = new ConsumerConfig(props); + return setup(config); } private AsyncKafkaConsumer setupWithEmptyGroupId() { - return setup(""); + Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, ""); + final ConsumerConfig config = new ConsumerConfig(props); + return setup(config); } - private AsyncKafkaConsumer setup(String groupId) { - String clientId = ""; - long retryBackoffMs = 100; - int defaultApiTimeoutMs = 100; - LogContext logContext = new LogContext(); - SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.LATEST); + private AsyncKafkaConsumer setup(ConsumerConfig config) { return new AsyncKafkaConsumer<>( - logContext, - clientId, - deserializers, - fetchBuffer, - fetchCollector, - interceptors, + config, + new StringDeserializer(), + new StringDeserializer(), time, - applicationEventHandler, - backgroundEventQueue, - new Metrics(), - subscriptionState, - metadata, - retryBackoffMs, - defaultApiTimeoutMs, - Collections.singletonList(new RangeAssignor()), - groupId + (a,b,c,d,e,f) -> applicationEventHandler, + (a,b,c,d,e,f,g) -> fetchCollector, + (a,b,c,d) -> metadata ); } + @Test + public void testGroupIdNull() { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); + props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); + final ConsumerConfig config = new ConsumerConfig(props); + + consumer = setup(config); + assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + } + + @Test + public void testGroupIdNotNullAndValid() { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); + props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); + final ConsumerConfig config = new ConsumerConfig(props); + + consumer = setup(config); + assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + } + + @Test + public void testGroupIdEmpty() { + testInvalidGroupId(""); + } + + @Test + public void testGroupIdOnlyWhitespaces() { + testInvalidGroupId(" "); + } + + private void testInvalidGroupId(final String groupId) { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + final ConsumerConfig config = new ConsumerConfig(props); + + final Exception exception = assertThrows( + KafkaException.class, + () -> consumer = setup(config) + ); + + assertEquals("Failed to construct kafka consumer", exception.getMessage()); + } + + private Properties requiredConsumerProperties() { + final Properties props = new Properties(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); + return props; + } + @Test public void testInvalidGroupId() { - assertThrows(InvalidGroupIdException.class, this::setupWithEmptyGroupId); + KafkaException e = assertThrows(KafkaException.class, this::setupWithEmptyGroupId); + 
+        assertTrue(e.getCause() instanceof InvalidGroupIdException);
     }
 
     @Test
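
Illustration (editor's sketch, not part of the patch): each factory parameter on the
new testing constructor is a single-method interface, so a test can inject a Mockito
mock with a lambda while the rest of the real construction path (config validation,
metrics, interceptors, deserializers) still executes, which is what lets the config
tests migrate to the unit test above. The class name FactoryInjectionSketch is
hypothetical, and the sketch assumes it lives in the
org.apache.kafka.clients.consumer.internals package, since the types involved are
package-private:

    package org.apache.kafka.clients.consumer.internals;

    import static org.mockito.Mockito.mock;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.utils.MockTime;

    class FactoryInjectionSketch {

        // Builds a consumer whose background machinery is mocked out. The lambda
        // arities match ApplicationEventHandlerSupplier (6 args),
        // FetchCollectorFactory (7 args) and ConsumerMetadataFactory (4 args)
        // from the patch above; every argument except the mocks is ignored.
        static AsyncKafkaConsumer<String, String> build(final ConsumerConfig config) {
            final ApplicationEventHandler handler = mock(ApplicationEventHandler.class);
            @SuppressWarnings("unchecked")
            final FetchCollector<String, String> collector = mock(FetchCollector.class);
            final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
            return new AsyncKafkaConsumer<>(
                config,                 // a real ConsumerConfig, so validation still runs
                new StringDeserializer(),
                new StringDeserializer(),
                new MockTime(),         // deterministic time source for tests
                (logContext, time, queue, processors, network, managers) -> handler,
                (logContext, md, subscriptions, fetchConfig, des, metrics, time) -> collector,
                (cfg, subscriptions, logContext, listeners) -> metadata
            );
        }
    }

    // In production the same parameters are satisfied by constructor references:
    // ApplicationEventHandler::new, FetchCollector::new, ConsumerMetadata::new.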