Skip to content

Commit

Permalink
Merge pull request #711 from DependencyTrack/ensure-topics-timeout
Browse files Browse the repository at this point in the history
Add timeout for Kafka API describeTopics commands
  • Loading branch information
nscuro authored Jun 11, 2024
2 parents 67f45f0 + e867543 commit 19e4f91
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.github.resilience4j.core.IntervalFunction;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -195,9 +196,9 @@ public void close() {

private void ensureTopicsExist() {
final List<String> topicNames = managedProcessors.values().stream().map(ManagedProcessor::topic).toList();
LOGGER.debug("Ensuring existence of topics: %s".formatted(topicNames));
LOGGER.info("Verifying existence of subscribed topics: %s".formatted(topicNames));

final DescribeTopicsResult topicsResult = adminClient.describeTopics(topicNames);
final DescribeTopicsResult topicsResult = adminClient.describeTopics(topicNames, new DescribeTopicsOptions().timeoutMs(3_000));
final var exceptionsByTopicName = new HashMap<String, Throwable>();
for (final Map.Entry<String, KafkaFuture<TopicDescription>> entry : topicsResult.topicNameValues().entrySet()) {
final String topicName = entry.getKey();
Expand All @@ -224,7 +225,7 @@ private void ensureTopicsExist() {

private int getTopicPartitionCount(final String topicName) {
LOGGER.debug("Determining partition count of topic %s".formatted(topicName));
final DescribeTopicsResult topicsResult = adminClient.describeTopics(List.of(topicName));
final DescribeTopicsResult topicsResult = adminClient.describeTopics(List.of(topicName), new DescribeTopicsOptions().timeoutMs(3_000));
final KafkaFuture<TopicDescription> topicDescriptionFuture = topicsResult.topicNameValues().get(topicName);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ProcessorManagerTest {

@Rule
public RedpandaContainer kafkaContainer = new RedpandaContainer(DockerImageName
.parse("docker.redpanda.com/vectorized/redpanda:v23.3.13"));
.parse("docker.redpanda.com/vectorized/redpanda:v24.1.7"));

private ExternalKafkaCluster kafka;
private Config configMock;
Expand Down

0 comments on commit 19e4f91

Please sign in to comment.