Skip to content

Commit

Permalink
Kafka health checks and topics replication (#774)
Browse files Browse the repository at this point in the history
* Factor in `min.insync.replicas` when calculating min node count

* Update unit tests
  • Loading branch information
guillermocalvo authored Jul 19, 2023
1 parent 536ba05 commit 872b0cf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
public class KafkaHealthIndicator implements HealthIndicator {

private static final String ID = "kafka";
private static final String MIN_INSYNC_REPLICAS_PROPERTY = "min.insync.replicas";
private static final String REPLICATION_PROPERTY = "offsets.topic.replication.factor";
private static final String DEFAULT_REPLICATION_PROPERTY = "default.replication.factor";
private final AdminClient adminClient;
Expand Down Expand Up @@ -80,6 +81,19 @@ public static int getClusterReplicationFactor(Config config) {
return ce != null ? Integer.parseInt(ce.value()) : Integer.MAX_VALUE;
}

/**
* Retrieve the cluster "min.insync.replicas" for the given {@link Config}, falling back to
* "offsets.topic.replication.factor" or "default.replication.factor" if required, in order to
* support Confluent Cloud hosted Kafka.
*
* @param config the cluster {@link Config}
* @return the optional cluster minimum number of replicas that must acknowledge a write
*/
public static int getMinNodeCount(Config config) {
return Optional.ofNullable(config.get(MIN_INSYNC_REPLICAS_PROPERTY)).map(ConfigEntry::value).map(Integer::parseInt)
.orElseGet(() -> getClusterReplicationFactor(config));
}

@Override
public Flux<HealthResult> getResult() {
DescribeClusterResult result = adminClient.describeCluster(
Expand All @@ -99,11 +113,11 @@ public Flux<HealthResult> getResult() {
Mono<Map<ConfigResource, Config>> configs = KafkaReactorUtil.fromKafkaFuture(configResult::all);
return configs.flux().switchMap(resources -> {
Config config = resources.get(configResource);
int replicationFactor = getClusterReplicationFactor(config);
int minNodeCount = getMinNodeCount(config);
return nodes.flux().switchMap(nodeList -> clusterId.map(clusterIdString -> {
int nodeCount = nodeList.size();
HealthResult.Builder builder;
if (nodeCount >= replicationFactor) {
if (nodeCount >= minNodeCount) {
builder = HealthResult.builder(ID, HealthStatus.UP);
} else {
builder = HealthResult.builder(ID, HealthStatus.DOWN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.kafka.clients.admin.ConfigEntry
import spock.lang.Unroll

import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.DEFAULT_REPLICATION_PROPERTY
import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.MIN_INSYNC_REPLICAS_PROPERTY
import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.REPLICATION_PROPERTY
import static io.micronaut.health.HealthStatus.DOWN
import static io.micronaut.health.HealthStatus.UP
Expand All @@ -25,7 +26,6 @@ class KafkaHealthIndicatorSpec extends AbstractKafkaSpec {
HealthResult result = healthIndicator.result.next().block()

then:
// report down because the not enough nodes to meet replication factor
result.status == UP
result.details.nodes == 1

Expand Down Expand Up @@ -75,6 +75,9 @@ class KafkaHealthIndicatorSpec extends AbstractKafkaSpec {
void "kafka health indicator handle missing replication factor config"() {
given:
Collection<ConfigEntry> configEntries = []
if (minReplicas) {
configEntries << new ConfigEntry(MIN_INSYNC_REPLICAS_PROPERTY, minReplicas)
}
if (offsetFactor) {
configEntries << new ConfigEntry(REPLICATION_PROPERTY, offsetFactor)
}
Expand All @@ -84,16 +87,20 @@ class KafkaHealthIndicatorSpec extends AbstractKafkaSpec {
Config config = new Config(configEntries)
when:
int replicationFactor = KafkaHealthIndicator.getClusterReplicationFactor(config)
int replicationFactor = KafkaHealthIndicator.getMinNodeCount(config)
then:
replicationFactor == expected
where:
offsetFactor | defaultFactor | expected
"10" | null | 10
"10" | "8" | 10
null | "8" | 8
null | null | Integer.MAX_VALUE
minReplicas | offsetFactor | defaultFactor | expected
"100" | "10" | null | 100
"100" | "10" | "8" | 100
"100" | null | "8" | 100
"100" | null | null | 100
null | "10" | null | 10
null | "10" | "8" | 10
null | null | "8" | 8
null | null | null | Integer.MAX_VALUE
}
}

0 comments on commit 872b0cf

Please sign in to comment.