From 31ddb2fc07dbb39efd3ebb60e16ab1467f01e2b6 Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Thu, 13 Jul 2023 17:19:24 +0200 Subject: [PATCH 1/2] Factor in `min.insync.replicas` when calculating min node count --- .../kafka/health/KafkaHealthIndicator.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java b/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java index 35a8e744b..c08f556a1 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java @@ -52,6 +52,7 @@ public class KafkaHealthIndicator implements HealthIndicator { private static final String ID = "kafka"; + private static final String MIN_INSYNC_REPLICAS_PROPERTY = "min.insync.replicas"; private static final String REPLICATION_PROPERTY = "offsets.topic.replication.factor"; private static final String DEFAULT_REPLICATION_PROPERTY = "default.replication.factor"; private final AdminClient adminClient; @@ -80,6 +81,19 @@ public static int getClusterReplicationFactor(Config config) { return ce != null ? Integer.parseInt(ce.value()) : Integer.MAX_VALUE; } + /** + * Retrieve the cluster "min.insync.replicas" for the given {@link Config}, falling back to + * "offsets.topic.replication.factor" or "default.replication.factor" if required, in order to + * support Confluent Cloud hosted Kafka. + * + * @param config the cluster {@link Config} + * @return the optional cluster minimum number of replicas that must acknowledge a write + */ + public static int getMinNodeCount(Config config) { + return Optional.ofNullable(config.get(MIN_INSYNC_REPLICAS_PROPERTY)).map(ConfigEntry::value).map(Integer::parseInt) + .orElseGet(() -> getClusterReplicationFactor(config)); + } + @Override public Flux getResult() { DescribeClusterResult result = adminClient.describeCluster( @@ -99,11 +113,11 @@ public Flux getResult() { Mono> configs = KafkaReactorUtil.fromKafkaFuture(configResult::all); return configs.flux().switchMap(resources -> { Config config = resources.get(configResource); - int replicationFactor = getClusterReplicationFactor(config); + int minNodeCount = getMinNodeCount(config); return nodes.flux().switchMap(nodeList -> clusterId.map(clusterIdString -> { int nodeCount = nodeList.size(); HealthResult.Builder builder; - if (nodeCount >= replicationFactor) { + if (nodeCount >= minNodeCount) { builder = HealthResult.builder(ID, HealthStatus.UP); } else { builder = HealthResult.builder(ID, HealthStatus.DOWN); From 2fb00058904989680b042392b5a44d2444e2a045 Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Thu, 13 Jul 2023 17:21:12 +0200 Subject: [PATCH 2/2] Update unit tests --- .../health/KafkaHealthIndicatorSpec.groovy | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy index d3986277b..0070d9e3a 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/health/KafkaHealthIndicatorSpec.groovy @@ -10,6 +10,7 @@ import org.apache.kafka.clients.admin.ConfigEntry import spock.lang.Unroll import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.DEFAULT_REPLICATION_PROPERTY +import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.MIN_INSYNC_REPLICAS_PROPERTY import static io.micronaut.configuration.kafka.health.KafkaHealthIndicator.REPLICATION_PROPERTY import static io.micronaut.health.HealthStatus.DOWN import static io.micronaut.health.HealthStatus.UP @@ -25,7 +26,6 @@ class KafkaHealthIndicatorSpec extends AbstractKafkaSpec { HealthResult result = healthIndicator.result.next().block() then: - // report down because the not enough nodes to meet replication factor result.status == UP result.details.nodes == 1 @@ -75,6 +75,9 @@ class KafkaHealthIndicatorSpec extends AbstractKafkaSpec { void "kafka health indicator handle missing replication factor config"() { given: Collection configEntries = [] + if (minReplicas) { + configEntries << new ConfigEntry(MIN_INSYNC_REPLICAS_PROPERTY, minReplicas) + } if (offsetFactor) { configEntries << new ConfigEntry(REPLICATION_PROPERTY, offsetFactor) } @@ -84,16 +87,20 @@ class KafkaHealthIndicatorSpec extends AbstractKafkaSpec { Config config = new Config(configEntries) when: - int replicationFactor = KafkaHealthIndicator.getClusterReplicationFactor(config) + int replicationFactor = KafkaHealthIndicator.getMinNodeCount(config) then: replicationFactor == expected where: - offsetFactor | defaultFactor | expected - "10" | null | 10 - "10" | "8" | 10 - null | "8" | 8 - null | null | Integer.MAX_VALUE + minReplicas | offsetFactor | defaultFactor | expected + "100" | "10" | null | 100 + "100" | "10" | "8" | 100 + "100" | null | "8" | 100 + "100" | null | null | 100 + null | "10" | null | 10 + null | "10" | "8" | 10 + null | null | "8" | 8 + null | null | null | Integer.MAX_VALUE } }