diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 196bed50286e1..3cba5418eaa64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -173,6 +173,25 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer) { + this( + config, + keyDeserializer, + valueDeserializer, + Time.SYSTEM, + ApplicationEventHandler::new, + FetchCollector::new, + ConsumerMetadata::new + ); + } + + // Visible for testing + AsyncKafkaConsumer(final ConsumerConfig config, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Time time, + final ApplicationEventHandlerSupplier applicationEventHandlerFactory, + final FetchCollectorFactory fetchCollectorFactory, + final ConsumerMetadataFactory metadataFactory) { try { GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -184,7 +203,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { log.debug("Initializing the Kafka consumer"); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); - this.time = Time.SYSTEM; + this.time = time; this.metrics = createMetrics(config, time); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); @@ -195,7 +214,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(), interceptorList, Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer)); - this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners); + this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners); final List addresses = ClientUtils.parseAndValidateAddresses(config); metadata.bootstrap(addresses); @@ -231,7 +250,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { metadata, applicationEventQueue, requestManagersSupplier); - this.applicationEventHandler = new ApplicationEventHandler(logContext, + this.applicationEventHandler = applicationEventHandlerFactory.build( + logContext, time, applicationEventQueue, applicationEventProcessorSupplier, @@ -246,7 +266,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig); // The FetchCollector is only used on the application thread. - this.fetchCollector = new FetchCollector<>(logContext, + this.fetchCollector = fetchCollectorFactory.build(logContext, metadata, subscriptions, fetchConfig, @@ -386,6 +406,47 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } + // auxiliary interface for testing + interface ApplicationEventHandlerSupplier { + + ApplicationEventHandler build( + final LogContext logContext, + final Time time, + final BlockingQueue applicationEventQueue, + final Supplier applicationEventProcessorSupplier, + final Supplier networkClientDelegateSupplier, + final Supplier requestManagersSupplier + ); + + } + + // auxiliary interface for testing + interface FetchCollectorFactory { + + FetchCollector build( + final LogContext logContext, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final Deserializers deserializers, + final FetchMetricsManager metricsManager, + final Time time + ); + + } + + // auxiliary interface for testing + interface ConsumerMetadataFactory { + + ConsumerMetadata build( + final ConsumerConfig config, + final SubscriptionState subscriptions, + final LogContext logContext, + final ClusterResourceListeners clusterResourceListeners + ); + + } + private Optional initializeGroupMetadata(final ConsumerConfig config, final GroupRebalanceConfig groupRebalanceConfig) { final Optional groupMetadata = initializeGroupMetadata( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 1e34835317fb6..be8e7a056f4a9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -19,7 +19,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -44,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -53,7 +51,6 @@ import java.util.concurrent.Future; import java.util.stream.Stream; import org.apache.kafka.clients.ClientResponse; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -77,7 +74,6 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -89,7 +85,6 @@ import org.mockito.ArgumentMatchers; import org.mockito.MockedConstruction; import org.mockito.stubbing.Answer; -import org.opentest4j.AssertionFailedError; public class AsyncKafkaConsumerTest { @@ -354,70 +349,6 @@ public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() { testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false); } - @Test - public void testGroupIdNull() { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); - props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); - final ConsumerConfig config = new ConsumerConfig(props); - - try (AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); - } catch (final Exception exception) { - throw new AssertionFailedError("The following exception was not expected:", exception); - } - } - - @Test - public void testGroupIdNotNullAndValid() { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); - props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); - final ConsumerConfig config = new ConsumerConfig(props); - - try (AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); - } catch (final Exception exception) { - throw new AssertionFailedError("The following exception was not expected:", exception); - } - } - - @Test - public void testGroupIdEmpty() { - testInvalidGroupId(""); - } - - @Test - public void testGroupIdOnlyWhitespaces() { - testInvalidGroupId(" "); - } - - private void testInvalidGroupId(final String groupId) { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - final ConsumerConfig config = new ConsumerConfig(props); - - final Exception exception = assertThrows( - KafkaException.class, - () -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()) - ); - - assertEquals("Failed to construct kafka consumer", exception.getMessage()); - } - - private Properties requiredConsumerProperties() { - final Properties props = new Properties(); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); - return props; - } - private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) { // Uncompleted future that will time out if used CompletableFuture> committedFuture = new CompletableFuture<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java index d9e39d4ce2c46..0a8bae184ea96 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerUnitTest.java @@ -19,10 +19,12 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -41,19 +43,17 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; import org.apache.kafka.clients.Metadata.LeaderAndEpoch; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; @@ -66,10 +66,9 @@ import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ListOffsetsRequest; -import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -85,12 +84,8 @@ public class AsyncKafkaConsumerUnitTest { private AsyncKafkaConsumer consumer = null; private final Time time = new MockTime(); - private final Deserializers deserializers = mock(Deserializers.class); - private final FetchBuffer fetchBuffer = mock(FetchBuffer.class); private final FetchCollector fetchCollector = mock(FetchCollector.class); - private final ConsumerInterceptors interceptors = mock(ConsumerInterceptors.class); private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class); - private final BlockingQueue backgroundEventQueue = mock(BlockingQueue.class); private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); @AfterEach @@ -103,42 +98,91 @@ public void resetAll() { } private AsyncKafkaConsumer setup() { - return setup("group-id"); + Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id"); + final ConsumerConfig config = new ConsumerConfig(props); + return setup(config); } private AsyncKafkaConsumer setupWithEmptyGroupId() { - return setup(""); + Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, ""); + final ConsumerConfig config = new ConsumerConfig(props); + return setup(config); } - private AsyncKafkaConsumer setup(String groupId) { - String clientId = ""; - long retryBackoffMs = 100; - int defaultApiTimeoutMs = 100; - LogContext logContext = new LogContext(); - SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.LATEST); + private AsyncKafkaConsumer setup(ConsumerConfig config) { return new AsyncKafkaConsumer<>( - logContext, - clientId, - deserializers, - fetchBuffer, - fetchCollector, - interceptors, + config, + new StringDeserializer(), + new StringDeserializer(), time, - applicationEventHandler, - backgroundEventQueue, - new Metrics(), - subscriptionState, - metadata, - retryBackoffMs, - defaultApiTimeoutMs, - Collections.singletonList(new RangeAssignor()), - groupId + (a,b,c,d,e,f) -> applicationEventHandler, + (a,b,c,d,e,f,g) -> fetchCollector, + (a,b,c,d) -> metadata ); } + @Test + public void testGroupIdNull() { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); + props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); + final ConsumerConfig config = new ConsumerConfig(props); + + consumer = setup(config); + assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + } + + @Test + public void testGroupIdNotNullAndValid() { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); + props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); + final ConsumerConfig config = new ConsumerConfig(props); + + consumer = setup(config); + assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + } + + @Test + public void testGroupIdEmpty() { + testInvalidGroupId(""); + } + + @Test + public void testGroupIdOnlyWhitespaces() { + testInvalidGroupId(" "); + } + + private void testInvalidGroupId(final String groupId) { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + final ConsumerConfig config = new ConsumerConfig(props); + + final Exception exception = assertThrows( + KafkaException.class, + () -> consumer = setup(config) + ); + + assertEquals("Failed to construct kafka consumer", exception.getMessage()); + } + + private Properties requiredConsumerProperties() { + final Properties props = new Properties(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); + return props; + } + @Test public void testInvalidGroupId() { - assertThrows(InvalidGroupIdException.class, this::setupWithEmptyGroupId); + KafkaException e = assertThrows(KafkaException.class, this::setupWithEmptyGroupId); + assertTrue(e.getCause() instanceof InvalidGroupIdException); } @Test