Skip to content

Commit

Permalink
Prevent MaintenanceEventTopicReader from prematurely closing the admi…
Browse files Browse the repository at this point in the history
…nClient. (apache#1362)
  • Loading branch information
efeg authored and Adem Efe Gencer committed Oct 26, 2020
1 parent 877eeee commit 1d71362
Showing 1 changed file with 5 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import java.time.Duration;
Expand Down Expand Up @@ -310,16 +309,12 @@ protected static int maintenanceEventTopicPartitionCount(Map<String, ?> config)

protected void ensureTopicCreated(Map<String, ?> config) {
AdminClient adminClient = _kafkaCruiseControl.adminClient();
try {
short replicationFactor = maintenanceEventTopicReplicationFactor(config, adminClient);
long retentionMs = maintenanceEventTopicRetentionMs(config);
int partitionCount = maintenanceEventTopicPartitionCount(config);
short replicationFactor = maintenanceEventTopicReplicationFactor(config, adminClient);
long retentionMs = maintenanceEventTopicRetentionMs(config);
int partitionCount = maintenanceEventTopicPartitionCount(config);

NewTopic maintenanceEventTopic = wrapTopic(_maintenanceEventTopic, partitionCount, replicationFactor, retentionMs);
maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic);
} finally {
KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
}
NewTopic maintenanceEventTopic = wrapTopic(_maintenanceEventTopic, partitionCount, replicationFactor, retentionMs);
maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic);
}

/**
Expand Down

0 comments on commit 1d71362

Please sign in to comment.