Commit cb624ef

use most of the normal construction code for testing and migrate config tests
lucasbru committed Dec 8, 2023
1 parent 1432b31 commit cb624ef
Showing 3 changed files with 138 additions and 278 deletions.

AsyncKafkaConsumer.java
@@ -251,13 +251,25 @@ private void process(final GroupMetadataUpdateEvent event) {
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
this(config, keyDeserializer, valueDeserializer, new LinkedBlockingQueue<>());
this(
config,
keyDeserializer,
valueDeserializer,
Time.SYSTEM,
ApplicationEventHandler::new,
FetchCollector::new,
ConsumerMetadata::new
);
}

// Visible for testing
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
final Time time,
final ApplicationEventHandlerSupplier applicationEventHandlerFactory,
final FetchCollectorFactory<K, V> fetchCollectorFactory,
final ConsumerMetadataFactory metadataFactory) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -269,7 +281,7 @@ private void process(final GroupMetadataUpdateEvent event) {

log.debug("Initializing the Kafka consumer");
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.time = time;
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
@@ -283,7 +295,7 @@ private void process(final GroupMetadataUpdateEvent event) {
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
interceptorList,
Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);

@@ -320,7 +332,8 @@ private void process(final GroupMetadataUpdateEvent event) {
metadata,
applicationEventQueue,
requestManagersSupplier);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
applicationEventQueue,
applicationEventProcessorSupplier,
@@ -335,7 +348,7 @@ private void process(final GroupMetadataUpdateEvent event) {
this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);

// The FetchCollector is only used on the application thread.
this.fetchCollector = new FetchCollector<>(logContext,
this.fetchCollector = fetchCollectorFactory.build(logContext,
metadata,
subscriptions,
fetchConfig,
@@ -482,6 +495,47 @@ private void process(final GroupMetadataUpdateEvent event) {
requestManagersSupplier);
}

// auxiliary interface for testing
interface ApplicationEventHandlerSupplier {

ApplicationEventHandler build(
final LogContext logContext,
final Time time,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final Supplier<RequestManagers> requestManagersSupplier
);

}

// auxiliary interface for testing
interface FetchCollectorFactory<K,V> {

FetchCollector<K,V> build(
final LogContext logContext,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final Deserializers<K, V> deserializers,
final FetchMetricsManager metricsManager,
final Time time
);

}

// auxiliary interface for testing
interface ConsumerMetadataFactory {

ConsumerMetadata build(
final ConsumerConfig config,
final SubscriptionState subscriptions,
final LogContext logContext,
final ClusterResourceListeners clusterResourceListeners
);

}

private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
final GroupRebalanceConfig groupRebalanceConfig) {
final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
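The factory parameters exist so tests can run the real construction path while substituting mocks for the components that would otherwise spin up network and background-thread machinery. Below is a minimal sketch of how a test in the same package might use them; the class name, mock setup, and MockTime usage are illustrative assumptions, not part of this commit.

package org.apache.kafka.clients.consumer.internals;

import static org.mockito.Mockito.mock;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;

public class AsyncKafkaConsumerConstructionSketch {

    @Test
    @SuppressWarnings("unchecked")
    public void constructorAcceptsMockedCollaborators() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        final ConsumerConfig config = new ConsumerConfig(props);

        final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
        final FetchCollector<String, String> fetchCollector = mock(FetchCollector.class);
        final ApplicationEventHandler handler = mock(ApplicationEventHandler.class);

        // Each factory interface declares a single build method, so a lambda
        // that ignores the construction arguments and returns a prepared mock
        // satisfies it.
        try (AsyncKafkaConsumer<String, String> consumer = new AsyncKafkaConsumer<>(
                config,
                new StringDeserializer(),
                new StringDeserializer(),
                new MockTime(),
                (logContext, time, queue, processorSupplier, networkSupplier, managersSupplier) -> handler,
                (logContext, meta, subscriptions, fetchConfig, deserializers, metricsManager, time) -> fetchCollector,
                (cfg, subscriptions, logContext, listeners) -> metadata)) {
            // The rest of the construction path (metrics, deserializers,
            // interceptors, group metadata) runs for real against the config.
        }
    }
}

Since the mocked ApplicationEventHandler replaces the real one, no background thread should be started during such a test, which is the point of making these collaborators injectable.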

AsyncKafkaConsumerTest.java
@@ -19,7 +19,6 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -46,7 +45,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -90,7 +88,6 @@
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -102,7 +99,6 @@
import org.mockito.ArgumentMatchers;
import org.mockito.MockedConstruction;
import org.mockito.stubbing.Answer;
import org.opentest4j.AssertionFailedError;

public class AsyncKafkaConsumerTest {

@@ -367,240 +363,6 @@ public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
}

@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
final Properties props = requiredConsumerProperties();
final ConsumerConfig config = new ConsumerConfig(props);
try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {

assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
assertEquals(
"To use the group management or offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.",
exception.getMessage()
);
}
}

@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {

final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();

assertEquals(groupId, groupMetadata.groupId());
assertEquals(Optional.empty(), groupMetadata.groupInstanceId());
assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
}
}

@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
final String groupId = "consumerGroupA";
final String groupInstanceId = "groupInstanceId1";
final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
final ConsumerConfig config = new ConsumerConfig(props);
try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {

final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();

assertEquals(groupId, groupMetadata.groupId());
assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
}
}

@Test
public void testGroupMetadataUpdateSingleCall() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
final int generation = 1;
final String memberId = "newMemberId";
final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata(
groupId,
generation,
memberId,
Optional.empty()
);
final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
generation,
memberId
);
backgroundEventQueue.add(groupMetadataUpdateEvent);
consumer.assign(singletonList(new TopicPartition("topic", 0)));
consumer.poll(Duration.ZERO);

final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata();

assertEquals(expectedGroupMetadata, actualGroupMetadata);

final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata();

assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate);
}
}

@Test
public void testBackgroundError() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException);
backgroundEventQueue.add(errorBackgroundEvent);
consumer.assign(singletonList(new TopicPartition("topic", 0)));

final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));

assertEquals(expectedException.getMessage(), exception.getMessage());
}
}

@Test
public void testMultipleBackgroundErrors() {
final String groupId = "consumerGroupA";
final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1);
backgroundEventQueue.add(errorBackgroundEvent1);
final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2);
backgroundEventQueue.add(errorBackgroundEvent2);
consumer.assign(singletonList(new TopicPartition("topic", 0)));

final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));

assertEquals(expectedException1.getMessage(), exception.getMessage());
assertTrue(backgroundEventQueue.isEmpty());
}
}

@Test
public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
final ConsumerConfig config = new ConsumerConfig(props);

try (AsyncKafkaConsumer<String, String> ignored =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
}

@Test
public void testGroupRemoteAssignorUnusedInGenericProtocol() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
final ConsumerConfig config = new ConsumerConfig(props);

try (AsyncKafkaConsumer<String, String> ignored =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
}

@Test
public void testGroupRemoteAssignorUsedInConsumerProtocol() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
final ConsumerConfig config = new ConsumerConfig(props);

try (AsyncKafkaConsumer<String, String> ignored =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
}

@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);

try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
} catch (final Exception exception) {
throw new AssertionFailedError("The following exception was not expected:", exception);
}
}

@Test
public void testGroupIdNotNullAndValid() {
final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);

try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
} catch (final Exception exception) {
throw new AssertionFailedError("The following exception was not expected:", exception);
}
}

@Test
public void testGroupIdEmpty() {
testInvalidGroupId("");
}

@Test
public void testGroupIdOnlyWhitespaces() {
testInvalidGroupId(" ");
}

private void testInvalidGroupId(final String groupId) {
final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
final ConsumerConfig config = new ConsumerConfig(props);

final Exception exception = assertThrows(
KafkaException.class,
() -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())
);

assertEquals("Failed to construct kafka consumer", exception.getMessage());
}

private Properties requiredConsumerPropertiesAndGroupId(final String groupId) {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}

private Properties requiredConsumerProperties() {
final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
return props;
}

private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
// Uncompleted future that will time out if used
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
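The config tests removed above are migrated as part of this commit; rebuilt on the factory-based constructor, they can assert config usage without standing up real collaborators. A hedged sketch of one such migrated test follows, where requiredConsumerProperties mirrors the helper removed above and newConsumerWithMocks is a hypothetical wrapper around the constructor call from the previous sketch:

// Sketch, not taken from this commit.
@Test
public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
    final Properties props = requiredConsumerProperties();
    props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
    final ConsumerConfig config = new ConsumerConfig(props);

    try (AsyncKafkaConsumer<String, String> ignored = newConsumerWithMocks(config)) {
        assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
    }
}

The assertion against config.unused() stays unchanged; only the way the consumer is constructed differs.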
(diff for the third changed file not shown)
