use most of the normal construction code for testing and migrate config tests
lucasbru committed Dec 7, 2023
1 parent 66a61d9 commit 1531135
Showing 3 changed files with 143 additions and 107 deletions.
AsyncKafkaConsumer.java
@@ -173,6 +173,25 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
this(
config,
keyDeserializer,
valueDeserializer,
Time.SYSTEM,
ApplicationEventHandler::new,
FetchCollector::new,
ConsumerMetadata::new
);
}

// Visible for testing
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Time time,
final ApplicationEventHandlerSupplier applicationEventHandlerFactory,
final FetchCollectorFactory<K, V> fetchCollectorFactory,
final ConsumerMetadataFactory metadataFactory) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -184,7 +203,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

log.debug("Initializing the Kafka consumer");
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.time = time;
this.metrics = createMetrics(config, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

@@ -195,7 +214,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
interceptorList,
Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);

@@ -231,7 +250,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
metadata,
applicationEventQueue,
requestManagersSupplier);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
applicationEventQueue,
applicationEventProcessorSupplier,
@@ -246,7 +266,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);

// The FetchCollector is only used on the application thread.
this.fetchCollector = new FetchCollector<>(logContext,
this.fetchCollector = fetchCollectorFactory.build(logContext,
metadata,
subscriptions,
fetchConfig,
@@ -386,6 +406,47 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
requestManagersSupplier);
}

// auxiliary interface for testing
interface ApplicationEventHandlerSupplier {

ApplicationEventHandler build(
final LogContext logContext,
final Time time,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final Supplier<RequestManagers> requestManagersSupplier
);

}

// auxiliary interface for testing
interface FetchCollectorFactory<K,V> {

FetchCollector<K,V> build(
final LogContext logContext,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig fetchConfig,
final Deserializers<K, V> deserializers,
final FetchMetricsManager metricsManager,
final Time time
);

}

// auxiliary interface for testing
interface ConsumerMetadataFactory {

ConsumerMetadata build(
final ConsumerConfig config,
final SubscriptionState subscriptions,
final LogContext logContext,
final ClusterResourceListeners clusterResourceListeners
);

}

private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
final GroupRebalanceConfig groupRebalanceConfig) {
final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
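Taken together, the delegating constructor and the three package-private factory interfaces above form a test seam: production code passes method references (ApplicationEventHandler::new, FetchCollector::new, ConsumerMetadata::new), while a test in the same package can substitute mocks yet still run the real construction code. A minimal sketch of the test-side usage, assuming Mockito (org.mockito.Mockito.mock) plus imports for Properties, ConsumerConfig, StringDeserializer and MockTime; none of this code is part of the commit, and it mirrors the unit-test changes further down:

// Minimal configuration; the bootstrap address is illustrative.
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
ConsumerConfig config = new ConsumerConfig(props);

ApplicationEventHandler handler = mock(ApplicationEventHandler.class);
FetchCollector<String, String> collector = mock(FetchCollector.class);
ConsumerMetadata metadata = mock(ConsumerMetadata.class);

// Each lambda matches the arity of its factory interface and returns a mock, so
// the background event handler never actually starts, while the rest of the
// constructor (config validation, metrics, group metadata) runs as in production.
try (AsyncKafkaConsumer<String, String> consumer = new AsyncKafkaConsumer<>(
        config,
        new StringDeserializer(),
        new StringDeserializer(),
        new MockTime(),
        (lc, t, queue, processor, network, managers) -> handler,
        (lc, md, subs, fetchCfg, deser, metrics, t) -> collector,
        (cfg, subs, lc, listeners) -> metadata)) {
    // exercise the consumer against the mocked collaborators here
}
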
AsyncKafkaConsumerTest.java
@@ -19,7 +19,6 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -44,7 +43,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -53,7 +51,6 @@
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -77,7 +74,6 @@
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -89,7 +85,6 @@
import org.mockito.ArgumentMatchers;
import org.mockito.MockedConstruction;
import org.mockito.stubbing.Answer;
import org.opentest4j.AssertionFailedError;

public class AsyncKafkaConsumerTest {

@@ -354,70 +349,6 @@ public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
}

@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);

try (AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
} catch (final Exception exception) {
throw new AssertionFailedError("The following exception was not expected:", exception);
}
}

@Test
public void testGroupIdNotNullAndValid() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);

try (AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
} catch (final Exception exception) {
throw new AssertionFailedError("The following exception was not expected:", exception);
}
}

@Test
public void testGroupIdEmpty() {
testInvalidGroupId("");
}

@Test
public void testGroupIdOnlyWhitespaces() {
testInvalidGroupId(" ");
}

private void testInvalidGroupId(final String groupId) {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
final ConsumerConfig config = new ConsumerConfig(props);

final Exception exception = assertThrows(
KafkaException.class,
() -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())
);

assertEquals("Failed to construct kafka consumer", exception.getMessage());
}

private Properties requiredConsumerProperties() {
final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
return props;
}

private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
// Uncompleted future that will time out if used
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
AsyncKafkaConsumerUnitTest.java
@@ -19,10 +19,12 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -41,19 +43,17 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
@@ -66,10 +66,9 @@
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -85,12 +84,8 @@ public class AsyncKafkaConsumerUnitTest {
private AsyncKafkaConsumer<String, String> consumer = null;

private final Time time = new MockTime();
private final Deserializers<String, String> deserializers = mock(Deserializers.class);
private final FetchBuffer fetchBuffer = mock(FetchBuffer.class);
private final FetchCollector<String, String> fetchCollector = mock(FetchCollector.class);
private final ConsumerInterceptors<String, String> interceptors = mock(ConsumerInterceptors.class);
private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
private final BlockingQueue<BackgroundEvent> backgroundEventQueue = mock(BlockingQueue.class);
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);

@AfterEach
@@ -103,42 +98,91 @@ public void resetAll() {
}

private AsyncKafkaConsumer<String, String> setup() {
return setup("group-id");
Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
final ConsumerConfig config = new ConsumerConfig(props);
return setup(config);
}

private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
return setup("");
Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
final ConsumerConfig config = new ConsumerConfig(props);
return setup(config);
}

private AsyncKafkaConsumer<String, String> setup(String groupId) {
String clientId = "";
long retryBackoffMs = 100;
int defaultApiTimeoutMs = 100;
LogContext logContext = new LogContext();
SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.LATEST);
private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {
return new AsyncKafkaConsumer<>(
logContext,
clientId,
deserializers,
fetchBuffer,
fetchCollector,
interceptors,
config,
new StringDeserializer(),
new StringDeserializer(),
time,
applicationEventHandler,
backgroundEventQueue,
new Metrics(),
subscriptionState,
metadata,
retryBackoffMs,
defaultApiTimeoutMs,
Collections.singletonList(new RangeAssignor()),
groupId
(a,b,c,d,e,f) -> applicationEventHandler,
(a,b,c,d,e,f,g) -> fetchCollector,
(a,b,c,d) -> metadata
);
}

@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);

consumer = setup(config);
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
}

@Test
public void testGroupIdNotNullAndValid() {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);

consumer = setup(config);
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
}

@Test
public void testGroupIdEmpty() {
testInvalidGroupId("");
}

@Test
public void testGroupIdOnlyWhitespaces() {
testInvalidGroupId(" ");
}

private void testInvalidGroupId(final String groupId) {
final Properties props = requiredConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
final ConsumerConfig config = new ConsumerConfig(props);

final Exception exception = assertThrows(
KafkaException.class,
() -> consumer = setup(config)
);

assertEquals("Failed to construct kafka consumer", exception.getMessage());
}

private Properties requiredConsumerProperties() {
final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
return props;
}

@Test
public void testInvalidGroupId() {
assertThrows(InvalidGroupIdException.class, this::setupWithEmptyGroupId);
KafkaException e = assertThrows(KafkaException.class, this::setupWithEmptyGroupId);
assertTrue(e.getCause() instanceof InvalidGroupIdException);
}

@Test
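A usage note on the new seams: the factory lambdas receive the real intermediate objects the constructor builds, so a test can also capture those arguments for later assertions instead of discarding them. A hedged sketch reusing the mocks and the requiredConsumerProperties() helper from AsyncKafkaConsumerUnitTest above (the capture pattern is illustrative, not part of this commit; AtomicReference is java.util.concurrent.atomic):

ConsumerConfig config = new ConsumerConfig(requiredConsumerProperties());
// Capture the SubscriptionState the constructor creates and hands to the metadata factory.
AtomicReference<SubscriptionState> subscriptionState = new AtomicReference<>();
consumer = new AsyncKafkaConsumer<>(
    config,
    new StringDeserializer(),
    new StringDeserializer(),
    time,
    (lc, t, q, p, n, r) -> applicationEventHandler,
    (lc, md, subs, fc, de, mm, t) -> fetchCollector,
    (cfg, subs, lc, crl) -> {
        subscriptionState.set(subs); // the real SubscriptionState built by the constructor
        return metadata;
    });
assertTrue(subscriptionState.get() != null);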
