diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f80942dd7..377df31bd6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
   On the client `accessTokenLocation`, `clientAssertion`, `clientAssertionLocation`, `clientAssertionType`, and `saslExtensions` have been added.
 * Add support for custom Cruise Control API users
 * Update HTTP bridge to latest 0.30.0 release
+* Unregistration of KRaft nodes after scale-down
 
 ### Changes, deprecations and removals
 
diff --git a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaStatus.java b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaStatus.java
index 8897701254..038bbe25e1 100644
--- a/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaStatus.java
+++ b/api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaStatus.java
@@ -23,12 +23,13 @@
         builderPackage = Constants.FABRIC8_KUBERNETES_API
 )
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
+@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "registeredNodeIds", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
 @EqualsAndHashCode(callSuper = true)
 @ToString(callSuper = true)
 public class KafkaStatus extends Status {
     private List<ListenerStatus> listeners;
     private List<UsedNodePoolStatus> kafkaNodePools;
+    private List<Integer> registeredNodeIds;
     private String clusterId;
     private String operatorLastSuccessfulVersion;
@@ -54,6 +55,16 @@
     public void setKafkaNodePools(List<UsedNodePoolStatus> kafkaNodePools) {
         this.kafkaNodePools = kafkaNodePools;
     }
 
+    @Description("Registered node IDs used by this Kafka cluster. " +
+            "This field is used for internal purposes only and will be removed in the future.")
+    public List<Integer> getRegisteredNodeIds() {
+        return registeredNodeIds;
+    }
+
+    public void setRegisteredNodeIds(List<Integer> registeredNodeIds) {
+        this.registeredNodeIds = registeredNodeIds;
+    }
+
     @Description("Kafka cluster Id")
     public String getClusterId() {
         return clusterId;
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java
index f72f5555ad..afdeac4aed 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java
@@ -186,6 +186,12 @@ public Future<KafkaStatus> createOrUpdate(Reconciliation reconciliation, Kafka k
                 // Copy the metadata state if needed
                 status.setKafkaMetadataState(kafkaAssembly.getStatus().getKafkaMetadataState());
             }
+
+            if (status.getRegisteredNodeIds() == null
+                    && kafkaAssembly.getStatus().getRegisteredNodeIds() != null) {
+                // Copy the list of registered node IDs if needed
+                status.setRegisteredNodeIds(kafkaAssembly.getStatus().getRegisteredNodeIds());
+            }
         }
 
         if (reconcileResult.succeeded()) {
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaNodeUnregistration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaNodeUnregistration.java
new file mode 100644
index 0000000000..ccf215c78f
--- /dev/null
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaNodeUnregistration.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.cluster.operator.assembly;
+
+import io.strimzi.api.kafka.model.kafka.KafkaResources;
+import io.strimzi.operator.cluster.model.KafkaCluster;
+import io.strimzi.operator.cluster.operator.VertxUtil;
+import io.strimzi.operator.common.AdminClientProvider;
+import io.strimzi.operator.common.Reconciliation;
+import io.strimzi.operator.common.ReconciliationLogger;
+import io.strimzi.operator.common.auth.PemAuthIdentity;
+import io.strimzi.operator.common.auth.PemTrustSet;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Contains utility methods for unregistering KRaft nodes from a Kafka cluster after scale-down
+ */
+public class KafkaNodeUnregistration {
+    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaNodeUnregistration.class.getName());
+
+    /**
+     * Unregisters Kafka nodes from a KRaft-based Kafka cluster
+     *
+     * @param reconciliation        Reconciliation marker
+     * @param vertx                 Vert.x instance
+     * @param adminClientProvider   Kafka Admin API client provider
+     * @param pemTrustSet           Trust set for the admin client to connect to the Kafka cluster
+     * @param pemAuthIdentity       Key set for the admin client to connect to the Kafka cluster
+     * @param nodeIdsToUnregister   List of node IDs that should be unregistered
+     *
+     * @return  Future that completes when all nodes are unregistered
+     */
+    public static Future<Void> unregisterNodes(
+            Reconciliation reconciliation,
+            Vertx vertx,
+            AdminClientProvider adminClientProvider,
+            PemTrustSet pemTrustSet,
+            PemAuthIdentity pemAuthIdentity,
+            List<Integer> nodeIdsToUnregister
+    ) {
+        try {
+            String bootstrapHostname = KafkaResources.bootstrapServiceName(reconciliation.name()) + "." + reconciliation.namespace() + ".svc:" + KafkaCluster.REPLICATION_PORT;
+            Admin adminClient = adminClientProvider.createAdminClient(bootstrapHostname, pemTrustSet, pemAuthIdentity);
+
+            List<Future<Void>> futures = new ArrayList<>();
+            for (Integer nodeId : nodeIdsToUnregister) {
+                futures.add(unregisterNode(reconciliation, vertx, adminClient, nodeId));
+            }
+
+            return Future.all(futures)
+                    .eventually(() -> {
+                        adminClient.close();
+                        return Future.succeededFuture();
+                    })
+                    .map((Void) null);
+        } catch (KafkaException e) {
+            LOGGER.warnCr(reconciliation, "Failed to unregister nodes", e);
+            return Future.failedFuture(e);
+        }
+    }
+
+    /**
+     * Unregisters a single Kafka node using the Kafka Admin API. If the failure is caused by the node not being
+     * registered, the error is ignored.
+     *
+     * @param reconciliation        Reconciliation marker
+     * @param vertx                 Vert.x instance
+     * @param adminClient           Kafka Admin API client instance
+     * @param nodeIdToUnregister    ID of the node that should be unregistered
+     *
+     * @return  Future that completes when the node is unregistered
+     */
+    private static Future<Void> unregisterNode(Reconciliation reconciliation, Vertx vertx, Admin adminClient, Integer nodeIdToUnregister) {
+        LOGGER.debugCr(reconciliation, "Unregistering node {} from the Kafka cluster", nodeIdToUnregister);
+
+        return VertxUtil
+                .kafkaFutureToVertxFuture(reconciliation, vertx, adminClient.unregisterBroker(nodeIdToUnregister).all())
+                .recover(t -> {
+                    if (t instanceof BrokerIdNotRegisteredException) {
+                        // The broker is not registered anymore, so it does not need to be unregistered and we report
+                        // success. This can happen when the operator fails before updating the status, or when the
+                        // Kafka API call fails (e.g. due to a network issue) even though the unregistration was
+                        // completed on the Kafka cluster.
+                        LOGGER.warnCr(reconciliation, "Node {} is not registered and cannot be unregistered from the Kafka cluster", nodeIdToUnregister, t);
+                        return Future.succeededFuture();
+                    } else {
+                        LOGGER.warnCr(reconciliation, "Failed to unregister node {} from the Kafka cluster", nodeIdToUnregister, t);
+                        return Future.failedFuture(t);
+                    }
+                });
+    }
+}
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
index d362f14dd1..25b1cd991d 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java
@@ -80,6 +80,7 @@
 import io.strimzi.operator.common.model.StatusDiff;
 import io.strimzi.operator.common.operator.resource.ReconcileResult;
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaException;
@@ -120,6 +121,7 @@ public class KafkaReconciler {
     private final PlatformFeaturesAvailability pfa;
     private final ImagePullPolicy imagePullPolicy;
     private final List<LocalObjectReference> imagePullSecrets;
+    private final List<Integer> previousNodeIds;
 
     // Objects used during the reconciliation
     /* test */ final Reconciliation reconciliation;
@@ -210,6 +212,7 @@ public KafkaReconciler(
         this.pfa = pfa;
         this.imagePullPolicy = config.getImagePullPolicy();
         this.imagePullSecrets = config.getImagePullSecrets();
+        this.previousNodeIds = kafkaCr.getStatus() != null ? kafkaCr.getStatus().getRegisteredNodeIds() : null;
 
         this.stsOperator = supplier.stsOperations;
         this.strimziPodSetOperator = supplier.strimziPodSetOperator;
@@ -268,6 +271,7 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
                 .compose(i -> headlessServiceEndpointsReady())
                 .compose(i -> clusterId(kafkaStatus))
                 .compose(i -> defaultKafkaQuotas())
+                .compose(i -> nodeUnregistration(kafkaStatus))
                 .compose(i -> metadataVersion(kafkaStatus))
                 .compose(i -> deletePersistentClaims())
                 .compose(i -> sharedKafkaConfigurationCleanup())
@@ -939,9 +943,61 @@ protected Future<Void> defaultKafkaQuotas() {
         return DefaultKafkaQuotasManager.reconcileDefaultUserQuotas(reconciliation, vertx, adminClientProvider, this.coTlsPemIdentity.pemTrustSet(), this.coTlsPemIdentity.pemAuthIdentity(), kafka.quotas());
     }
 
+    /**
+     * Unregisters the KRaft nodes that were removed from the Kafka cluster
+     *
+     * @param kafkaStatus   Kafka status for updating the list of currently registered node IDs
+     *
+     * @return  Future which completes when the nodes removed from the Kafka cluster are unregistered
+     */
+    protected Future<Void> nodeUnregistration(KafkaStatus kafkaStatus) {
+        List<Integer> currentNodeIds = kafka.nodes().stream().map(NodeRef::nodeId).sorted().toList();
+
+        if (kafkaMetadataStateManager.getMetadataConfigurationState().isKRaft()
+                && previousNodeIds != null
+                && !new HashSet<>(currentNodeIds).containsAll(previousNodeIds)) {
+            // We are in KRaft mode and there are some nodes that were removed => we should unregister them
+            List<Integer> nodeIdsToUnregister = new ArrayList<>(previousNodeIds);
+            nodeIdsToUnregister.removeAll(currentNodeIds);
+
+            LOGGER.infoCr(reconciliation, "Kafka nodes {} were removed from the Kafka cluster and will be unregistered", nodeIdsToUnregister);
+
+            Promise<Void> unregistrationPromise = Promise.promise();
+            KafkaNodeUnregistration.unregisterNodes(reconciliation, vertx, adminClientProvider, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity(), nodeIdsToUnregister)
+                    .onComplete(res -> {
+                        if (res.succeeded()) {
+                            LOGGER.infoCr(reconciliation, "Kafka nodes {} were successfully unregistered from the Kafka cluster", nodeIdsToUnregister);
+                            kafkaStatus.setRegisteredNodeIds(currentNodeIds);
+                        } else {
+                            LOGGER.warnCr(reconciliation, "Failed to unregister Kafka nodes {} from the Kafka cluster", nodeIdsToUnregister);
+
+                            // When the unregistration fails, we keep the original registered node IDs so that the
+                            // unregistration is retried for them. But we merge them with the current node IDs to
+                            // make sure we do not lose track of any nodes.
+                            Set<Integer> updatedNodeIds = new HashSet<>(currentNodeIds);
+                            updatedNodeIds.addAll(previousNodeIds);
+                            kafkaStatus.setRegisteredNodeIds(updatedNodeIds.stream().sorted().toList());
+                        }
+
+                        // We complete the promise with success even if the unregistration failed as we do not want to
+                        // fail the reconciliation.
+                        unregistrationPromise.complete();
+                    });
+
+            return unregistrationPromise.future();
+        } else {
+            // We are either not in KRaft mode, or this is a cluster without any information about previous nodes,
+            // or there was no change to the nodes => we just update the status field
+            kafkaStatus.setRegisteredNodeIds(currentNodeIds);
+            return Future.succeededFuture();
+        }
+    }
+
     /**
      * Manages the KRaft metadata version
      *
+     * @param kafkaStatus   Kafka status used for updating the currently used metadata version
+     *
      * @return  Future which completes when the KRaft metadata version is set to the current version or updated.
*/ protected Future metadataVersion(KafkaStatus kafkaStatus) { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java index 55f28f8b61..16ddd68b79 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java @@ -106,6 +106,7 @@ import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.QuorumInfo; import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.admin.UnregisterBrokerResult; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -591,6 +592,11 @@ public static Admin adminClient() { when(mock.describeClientQuotas(any())).thenReturn(dcqr); + // Mock KRaft node unregistration + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + when(mock.unregisterBroker(anyInt())).thenReturn(ubr); + return mock; } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java index fcb19931be..62f25ad8ec 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorKRaftMockTest.java @@ -268,6 +268,8 @@ private void basicCheck() { assertThat(k.getStatus().getOperatorLastSuccessfulVersion(), is(KafkaAssemblyOperator.OPERATOR_VERSION)); assertThat(k.getStatus().getKafkaMetadataState().toValue(), is("KRaft")); assertThat(k.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers"))); + assertThat(k.getStatus().getRegisteredNodeIds().size(), is(6)); + assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12)); } /** @@ -562,6 +564,11 @@ public void testReconcileKafkaScaleUpAndDown(VertxTestContext context) { assertThat(brokers.getStatus().getRoles().size(), is(1)); assertThat(brokers.getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); + // Check the Kafka resource status + Kafka k = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get(); + assertThat(k.getStatus().getRegisteredNodeIds().size(), is(8)); + assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12, 13, 14)); + // Scale down again Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("brokers").edit(p -> new KafkaNodePoolBuilder(p) .editSpec() @@ -588,6 +595,11 @@ public void testReconcileKafkaScaleUpAndDown(VertxTestContext context) { assertThat(brokers.getStatus().getRoles().size(), is(1)); assertThat(brokers.getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); + // Check the Kafka resource status + Kafka k = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get(); + assertThat(k.getStatus().getRegisteredNodeIds().size(), is(6)); + assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12)); + async.flag(); }))); } @@ -642,6 +654,8 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) { Kafka kafka = 
Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get(); assertThat(kafka.getStatus().getKafkaNodePools().size(), is(3)); assertThat(kafka.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), hasItems("brokers", "controllers", "new-pool")); + assertThat(kafka.getStatus().getRegisteredNodeIds().size(), is(9)); + assertThat(kafka.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12, 13, 14, 15)); KafkaNodePool controllers = Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("controllers").get(); assertThat(controllers.getStatus().getReplicas(), is(3)); @@ -676,6 +690,8 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) { Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get(); assertThat(kafka.getStatus().getKafkaNodePools().size(), is(2)); assertThat(kafka.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), hasItems("brokers", "controllers")); + assertThat(kafka.getStatus().getRegisteredNodeIds().size(), is(6)); + assertThat(kafka.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12)); KafkaNodePool controllers = Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("controllers").get(); assertThat(controllers.getStatus().getReplicas(), is(3)); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java index 04fe886ace..dcae07ff6f 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java @@ -57,6 +57,7 @@ import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.InvalidConfigurationException; import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.auth.TlsPemIdentity; import io.strimzi.operator.common.model.ClientsCa; import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.PasswordGenerator; @@ -69,6 +70,11 @@ import io.vertx.junit5.Checkpoint; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.UnregisterBrokerResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -91,10 +97,15 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -269,7 +280,6 @@ public void testRegularReconciliation(VertxTestContext context) { CrdOperator 
mockKafkaOps = supplier.kafkaOperator; when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; @@ -382,7 +392,6 @@ public void testFirstReconciliation(VertxTestContext context) { CrdOperator mockKafkaOps = supplier.kafkaOperator; when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; @@ -538,6 +547,11 @@ public void testScaleUp(VertxTestContext context) { .withReplicas(2) .endSpec() .build(); + Kafka kafkaWithStatus = new KafkaBuilder(KAFKA) + .withNewStatus() + .withRegisteredNodeIds(0, 1, 2, 3, 4) + .endStatus() + .build(); List oldPools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, KAFKA, List.of(CONTROLLERS, oldBrokersPool), Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, true, SHARED_ENV_PROVIDER); KafkaCluster oldKafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, KAFKA, oldPools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.KRAFT, null, SHARED_ENV_PROVIDER); @@ -586,21 +600,24 @@ public void testScaleUp(VertxTestContext context) { when(mockPodOps.readiness(any(), any(), any(), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaOps = supplier.kafkaOperator; - when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); - when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kafkaWithStatus)); + ArgumentCaptor kafkaStatusCaptor = ArgumentCaptor.forClass(Kafka.class); + when(mockKafkaOps.updateStatusAsync(any(), kafkaStatusCaptor.capture())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; ArgumentCaptor kafkaNodePoolStatusCaptor = ArgumentCaptor.forClass(KafkaNodePool.class); when(mockKafkaNodePoolOps.updateStatusAsync(any(), kafkaNodePoolStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + // Used for the check that there were no interactions + Admin mockAdmin = supplier.adminClientProvider.createAdminClient(null, null, null, null); + MockKafkaReconciler kr = new MockKafkaReconciler( new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), vertx, CONFIG, supplier, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), - KAFKA, + kafkaWithStatus, List.of(CONTROLLERS, BROKERS), KAFKA_CLUSTER, CLUSTER_CA, @@ -646,6 +663,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers"))); + assertThat(kafkaStatusCaptor.getAllValues().size(), is(1)); + 
assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds().size(), is(6)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 3, 4, 5)); + + // Check node unregistrations + verify(mockAdmin, never()).unregisterBroker(anyInt()); + async.flag(); }))); } @@ -663,9 +687,14 @@ public void testScaleDown(VertxTestContext context) { .withReplicas(5) .endSpec() .build(); + Kafka kafkaWithStatus = new KafkaBuilder(KAFKA) + .withNewStatus() + .withRegisteredNodeIds(0, 1, 2, 3, 4, 5, 6, 7) + .endStatus() + .build(); - List oldPools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, KAFKA, List.of(CONTROLLERS, oldBrokersPool), Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, true, SHARED_ENV_PROVIDER); - KafkaCluster oldKafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, KAFKA, oldPools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.KRAFT, null, SHARED_ENV_PROVIDER); + List oldPools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafkaWithStatus, List.of(CONTROLLERS, oldBrokersPool), Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, true, SHARED_ENV_PROVIDER); + KafkaCluster oldKafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaWithStatus, oldPools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.KRAFT, null, SHARED_ENV_PROVIDER); List oldKafkaPodSets = oldKafkaCluster.generatePodSets(false, null, null, brokerId -> null); ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); @@ -714,21 +743,191 @@ public void testScaleDown(VertxTestContext context) { when(mockPodOps.waitFor(any(), any(), any(), any(), anyLong(), anyLong(), any())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaOps = supplier.kafkaOperator; - when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); - when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kafkaWithStatus)); + ArgumentCaptor kafkaStatusCaptor = ArgumentCaptor.forClass(Kafka.class); + when(mockKafkaOps.updateStatusAsync(any(), kafkaStatusCaptor.capture())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; ArgumentCaptor kafkaNodePoolStatusCaptor = ArgumentCaptor.forClass(KafkaNodePool.class); when(mockKafkaNodePoolOps.updateStatusAsync(any(), kafkaNodePoolStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + Admin mockAdmin = supplier.adminClientProvider.createAdminClient(null, null, null, null); + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenReturn(ubr); + MockKafkaReconciler kr = new MockKafkaReconciler( new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), vertx, CONFIG, supplier, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), - KAFKA, + kafkaWithStatus, + List.of(CONTROLLERS, BROKERS), + KAFKA_CLUSTER, + CLUSTER_CA, + CLIENTS_CA); + + 
MockKafkaAssemblyOperator kao = new MockKafkaAssemblyOperator( + vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + CERT_MANAGER, + PASSWORD_GENERATOR, + supplier, + CONFIG, + kr); + + Checkpoint async = context.checkpoint(); + kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + // Scale-down of Kafka is done in one go => we should see two invocations (first from scale-down and second from regular patching) + assertThat(kafkaPodSetCaptor.getAllValues().size(), is(1)); + assertThat(kafkaPodSetCaptor.getAllValues().get(0).getSpec().getPods().size(), is(3)); // => first capture is from kafkaScaleDown() with new replica count + assertThat(kafkaPodSetBatchCaptor.getAllValues().size(), is(1)); + assertThat(kafkaPodSetBatchCaptor.getAllValues().get(0).get(0).getSpec().getPods().size(), is(3)); // => The unchanged controllers pool + assertThat(kafkaPodSetBatchCaptor.getAllValues().get(0).get(1).getSpec().getPods().size(), is(3)); // => second capture is from kafkaPodSet() again with new replica count + + // Still one maybe-roll invocation + assertThat(kr.maybeRollKafkaInvocations, is(1)); + + // CMs for all remaining pods + the old shared config CM are reconciled + assertThat(cmReconciliationCaptor.getAllValues().size(), is(7)); + assertThat(cmReconciliationCaptor.getAllValues(), is(List.of("my-cluster-controllers-0", "my-cluster-controllers-1", "my-cluster-controllers-2", "my-cluster-brokers-3", "my-cluster-brokers-4", "my-cluster-brokers-5", "my-cluster-kafka-config"))); + + // The CMs for scaled down pods are deleted + assertThat(cmDeletionCaptor.getAllValues().size(), is(2)); + assertThat(cmDeletionCaptor.getAllValues(), is(List.of("my-cluster-brokers-6", "my-cluster-brokers-7"))); + + // Check statuses + assertThat(kafkaNodePoolStatusCaptor.getAllValues().size(), is(2)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2))); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.CONTROLLER)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(3)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4, 5))); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); + assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers"))); + + // Check Kafka status + assertThat(kafkaStatusCaptor.getAllValues().size(), is(1)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds().size(), is(6)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 3, 4, 5)); + + // Check node unregistrations + verify(mockAdmin, times(2)).unregisterBroker(anyInt()); + 
assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(6, 7)); + + async.flag(); + }))); + } + + /** + * Tests reconciliation with scale-down from 5 to 3 Kafka brokers with node unregistration failure. In such case, + * the reconciliation should proceed, but the nodes in the status should not be not updated so that the + * unregistration is retried in next reconciliation. + * + * @param context Test context + */ + @Test + @SuppressWarnings({"checkstyle:MethodLength"}) + public void testScaleDownWithUnregistrationFailure(VertxTestContext context) { + KafkaNodePool oldBrokersPool = new KafkaNodePoolBuilder(BROKERS) + .editSpec() + .withReplicas(5) + .endSpec() + .build(); + Kafka kafkaWithStatus = new KafkaBuilder(KAFKA) + .withNewStatus() + .withRegisteredNodeIds(0, 1, 2, 3, 4, 5, 6, 7) + .endStatus() + .build(); + + List oldPools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafkaWithStatus, List.of(CONTROLLERS, oldBrokersPool), Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, true, SHARED_ENV_PROVIDER); + KafkaCluster oldKafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaWithStatus, oldPools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.KRAFT, null, SHARED_ENV_PROVIDER); + List oldKafkaPodSets = oldKafkaCluster.generatePodSets(false, null, null, brokerId -> null); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + when(secretOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + when(secretOps.getAsync(any(), any())).thenReturn(Future.succeededFuture(new Secret())); + when(secretOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + + ConfigMapOperator mockCmOps = supplier.configMapOperations; + when(mockCmOps.listAsync(any(), eq(oldKafkaCluster.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaCluster.generatePerBrokerConfigurationConfigMaps(new MetricsAndLogging(null, null), ADVERTISED_HOSTNAMES, ADVERTISED_PORTS))); + ArgumentCaptor cmReconciliationCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.reconcile(any(), any(), cmReconciliationCaptor.capture(), any())).thenReturn(Future.succeededFuture()); + ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; + // Kafka + when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaPodSets)); + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> kafkaPodSetBatchCaptor = ArgumentCaptor.forClass(List.class); + when(mockPodSetOps.batchReconcile(any(), any(), kafkaPodSetBatchCaptor.capture(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenAnswer(i -> { + List podSets = i.getArgument(2); + HashMap> result = new HashMap<>(); + + for (StrimziPodSet podSet : podSets) { + result.put(podSet.getMetadata().getName(), ReconcileResult.noop(podSet)); + } + + return 
Future.succeededFuture(result); + }); + ArgumentCaptor kafkaPodSetCaptor = ArgumentCaptor.forClass(StrimziPodSet.class); + when(mockPodSetOps.reconcile(any(), any(), startsWith("my-cluster-brokers"), kafkaPodSetCaptor.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.noop(i.getArgument(3)))); + + StatefulSetOperator mockStsOps = supplier.stsOperations; + when(mockStsOps.getAsync(any(), eq(KAFKA_CLUSTER.getComponentName()))).thenReturn(Future.succeededFuture(null)); // Kafka STS is queried and deleted if it still exists + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.readiness(any(), any(), any(), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); + when(mockPodOps.waitFor(any(), any(), any(), any(), anyLong(), anyLong(), any())).thenReturn(Future.succeededFuture()); + + CrdOperator mockKafkaOps = supplier.kafkaOperator; + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kafkaWithStatus)); + ArgumentCaptor kafkaStatusCaptor = ArgumentCaptor.forClass(Kafka.class); + when(mockKafkaOps.updateStatusAsync(any(), kafkaStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + + CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; + ArgumentCaptor kafkaNodePoolStatusCaptor = ArgumentCaptor.forClass(KafkaNodePool.class); + when(mockKafkaNodePoolOps.updateStatusAsync(any(), kafkaNodePoolStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + + Admin mockAdmin = supplier.adminClientProvider.createAdminClient(null, null, null, null); + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenAnswer(i -> { + if (i.getArgument(0, Integer.class) == 6) { + KafkaFutureImpl unregistrationFuture = new KafkaFutureImpl<>(); + unregistrationFuture.completeExceptionally(new TimeoutException("Fake timeout")); + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(unregistrationFuture); + return ubr; + } else { + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + return ubr; + } + }); + + MockKafkaReconciler kr = new MockKafkaReconciler( + new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), + vertx, + CONFIG, + supplier, + new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + kafkaWithStatus, List.of(CONTROLLERS, BROKERS), KAFKA_CLUSTER, CLUSTER_CA, @@ -777,6 +976,176 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers"))); + // Check Kafka status + assertThat(kafkaStatusCaptor.getAllValues().size(), is(1)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds().size(), is(8)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 3, 4, 5, 6, 7)); + + // Check node unregistrations + verify(mockAdmin, times(2)).unregisterBroker(anyInt()); + 
assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(6, 7)); + + async.flag(); + }))); + } + + /** + * Tests reconciliation where node unregistration failed while new nodes are being added. It checks that the new + * nodes are added to the registered nodes. + * + * @param context Test context + */ + @Test + @SuppressWarnings({"checkstyle:MethodLength"}) + public void testUnregistrationFailureWithScaleUp(VertxTestContext context) { + KafkaNodePool oldBrokersPool = new KafkaNodePoolBuilder(BROKERS) + .editSpec() + .withReplicas(2) + .endSpec() + .build(); + Kafka kafkaWithStatus = new KafkaBuilder(KAFKA) + .withNewStatus() + .withRegisteredNodeIds(0, 1, 2, 3, 4, 1874, 1919) + .endStatus() + .build(); + + List oldPools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafkaWithStatus, List.of(CONTROLLERS, oldBrokersPool), Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, true, SHARED_ENV_PROVIDER); + KafkaCluster oldKafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaWithStatus, oldPools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.KRAFT, null, SHARED_ENV_PROVIDER); + List oldKafkaPodSets = oldKafkaCluster.generatePodSets(false, null, null, brokerId -> null); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + when(secretOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + when(secretOps.getAsync(any(), any())).thenReturn(Future.succeededFuture(new Secret())); + when(secretOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + + ConfigMapOperator mockCmOps = supplier.configMapOperations; + when(mockCmOps.listAsync(any(), eq(oldKafkaCluster.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaCluster.generatePerBrokerConfigurationConfigMaps(new MetricsAndLogging(null, null), ADVERTISED_HOSTNAMES, ADVERTISED_PORTS))); + ArgumentCaptor cmReconciliationCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.reconcile(any(), any(), cmReconciliationCaptor.capture(), any())).thenReturn(Future.succeededFuture()); + ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; + // Kafka + when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaPodSets)); + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> kafkaPodSetBatchCaptor = ArgumentCaptor.forClass(List.class); + when(mockPodSetOps.batchReconcile(any(), any(), kafkaPodSetBatchCaptor.capture(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenAnswer(i -> { + List podSets = i.getArgument(2); + HashMap> result = new HashMap<>(); + + for (StrimziPodSet podSet : podSets) { + result.put(podSet.getMetadata().getName(), ReconcileResult.noop(podSet)); + } + + return Future.succeededFuture(result); + }); + ArgumentCaptor kafkaPodSetCaptor = ArgumentCaptor.forClass(StrimziPodSet.class); + 
when(mockPodSetOps.reconcile(any(), any(), startsWith("my-cluster-brokers"), kafkaPodSetCaptor.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.noop(i.getArgument(3)))); + + StatefulSetOperator mockStsOps = supplier.stsOperations; + when(mockStsOps.getAsync(any(), eq(KAFKA_CLUSTER.getComponentName()))).thenReturn(Future.succeededFuture(null)); // Kafka STS is queried and deleted if it still exists + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.readiness(any(), any(), any(), anyLong(), anyLong())).thenReturn(Future.succeededFuture()); + when(mockPodOps.waitFor(any(), any(), any(), any(), anyLong(), anyLong(), any())).thenReturn(Future.succeededFuture()); + + CrdOperator mockKafkaOps = supplier.kafkaOperator; + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kafkaWithStatus)); + ArgumentCaptor kafkaStatusCaptor = ArgumentCaptor.forClass(Kafka.class); + when(mockKafkaOps.updateStatusAsync(any(), kafkaStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + + CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; + ArgumentCaptor kafkaNodePoolStatusCaptor = ArgumentCaptor.forClass(KafkaNodePool.class); + when(mockKafkaNodePoolOps.updateStatusAsync(any(), kafkaNodePoolStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + + Admin mockAdmin = supplier.adminClientProvider.createAdminClient(null, null, null, null); + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenAnswer(i -> { + if (i.getArgument(0, Integer.class) == 1919) { + KafkaFutureImpl unregistrationFuture = new KafkaFutureImpl<>(); + unregistrationFuture.completeExceptionally(new TimeoutException("Fake timeout")); + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(unregistrationFuture); + return ubr; + } else { + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + return ubr; + } + }); + + MockKafkaReconciler kr = new MockKafkaReconciler( + new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), + vertx, + CONFIG, + supplier, + new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + kafkaWithStatus, + List.of(CONTROLLERS, BROKERS), + KAFKA_CLUSTER, + CLUSTER_CA, + CLIENTS_CA); + + MockKafkaAssemblyOperator kao = new MockKafkaAssemblyOperator( + vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + CERT_MANAGER, + PASSWORD_GENERATOR, + supplier, + CONFIG, + kr); + + Checkpoint async = context.checkpoint(); + kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + // Scale-up of Kafka is done in one go => we should see two invocations from regular patching + assertThat(kafkaPodSetBatchCaptor.getAllValues().size(), is(1)); + assertThat(kafkaPodSetBatchCaptor.getAllValues().get(0).get(0).getSpec().getPods().size(), is(3)); + assertThat(kafkaPodSetBatchCaptor.getAllValues().get(0).get(1).getSpec().getPods().size(), is(3)); + + // Still one maybe-roll invocation + 
assertThat(kr.maybeRollKafkaInvocations, is(1)); + + // CMs for all pods are reconciled + assertThat(cmReconciliationCaptor.getAllValues().size(), is(7)); + assertThat(cmReconciliationCaptor.getAllValues(), is(List.of("my-cluster-controllers-0", "my-cluster-controllers-1", "my-cluster-controllers-2", "my-cluster-brokers-3", "my-cluster-brokers-4", "my-cluster-brokers-5", "my-cluster-kafka-config"))); + + // Only the shared CM is deleted + assertThat(cmDeletionCaptor.getAllValues().size(), is(0)); + + // Check statuses + assertThat(kafkaNodePoolStatusCaptor.getAllValues().size(), is(2)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2))); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.CONTROLLER)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(3)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4, 5))); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1)); + assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); + assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers"))); + + // Check Kafka status + assertThat(kafkaStatusCaptor.getAllValues().size(), is(1)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds().size(), is(8)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 3, 4, 5, 1874, 1919)); + + // Check node unregistrations + verify(mockAdmin, times(2)).unregisterBroker(anyInt()); + assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(1874, 1919)); + async.flag(); }))); } @@ -852,7 +1221,6 @@ public void testNewPool(VertxTestContext context) { CrdOperator mockKafkaOps = supplier.kafkaOperator; when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; @@ -949,6 +1317,11 @@ public void testRemovePool(VertxTestContext context) { .withRoles(ProcessRoles.BROKER) .endSpec() .build(); + Kafka kafkaWithStatus = new KafkaBuilder(KAFKA) + .withNewStatus() + .withRegisteredNodeIds(0, 1, 2, 3, 4, 5, 6, 7) + .endStatus() + .build(); List oldPools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, KAFKA, List.of(CONTROLLERS, BROKERS, newPool), Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, true, SHARED_ENV_PROVIDER); KafkaCluster oldKafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, KAFKA, oldPools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, KafkaMetadataConfigurationState.KRAFT, null, 
SHARED_ENV_PROVIDER); @@ -1000,21 +1373,27 @@ public void testRemovePool(VertxTestContext context) { when(mockPodOps.waitFor(any(), any(), any(), any(), anyLong(), anyLong(), any())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaOps = supplier.kafkaOperator; - when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); - when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(kafkaWithStatus)); + ArgumentCaptor kafkaStatusCaptor = ArgumentCaptor.forClass(Kafka.class); + when(mockKafkaOps.updateStatusAsync(any(), kafkaStatusCaptor.capture())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; ArgumentCaptor kafkaNodePoolStatusCaptor = ArgumentCaptor.forClass(KafkaNodePool.class); when(mockKafkaNodePoolOps.updateStatusAsync(any(), kafkaNodePoolStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + Admin mockAdmin = supplier.adminClientProvider.createAdminClient(null, null, null, null); + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenReturn(ubr); + MockKafkaReconciler kr = new MockKafkaReconciler( new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), vertx, CONFIG, supplier, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), - KAFKA, + kafkaWithStatus, List.of(CONTROLLERS, BROKERS), KAFKA_CLUSTER, CLUSTER_CA, @@ -1062,6 +1441,16 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER)); assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers"))); + // Check Kafka status + assertThat(kafkaStatusCaptor.getAllValues().size(), is(1)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds().size(), is(6)); + assertThat(kafkaStatusCaptor.getValue().getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 3, 4, 5)); + + // Check node unregistrations + verify(mockAdmin, times(2)).unregisterBroker(anyInt()); + assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(6, 7)); + async.flag(); }))); } @@ -1359,7 +1748,6 @@ public void testRollDueToPersistentVolumeResizing(VertxTestContext context) { CrdOperator mockKafkaOps = supplier.kafkaOperator; when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); - when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; @@ -1454,6 +1842,8 @@ static class MockKafkaReconciler extends KafkaReconciler { public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, PlatformFeaturesAvailability pfa, Kafka kafkaAssembly, List nodePools, KafkaCluster kafkaCluster, ClusterCa clusterCa, 
ClientsCa clientsCa) { super(reconciliation, kafkaAssembly, nodePools, kafkaCluster, clusterCa, clientsCa, config, supplier, pfa, vertx, new KafkaMetadataStateManager(reconciliation, kafkaAssembly)); + + this.coTlsPemIdentity = new TlsPemIdentity(null, null); } @Override @@ -1468,6 +1858,7 @@ public Future reconcile(KafkaStatus kafkaStatus, Clock clock) { .compose(i -> migrateFromStatefulSetToPodSet()) .compose(i -> podSet()) .compose(this::rollingUpdate) + .compose(i -> nodeUnregistration(kafkaStatus)) .compose(i -> sharedKafkaConfigurationCleanup()); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaNodeUnregistrationTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaNodeUnregistrationTest.java new file mode 100644 index 0000000000..76413c8f13 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaNodeUnregistrationTest.java @@ -0,0 +1,130 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.operator.assembly; + +import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.common.AdminClientProvider; +import io.strimzi.operator.common.Reconciliation; +import io.vertx.core.Vertx; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.UnregisterBrokerResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(VertxExtension.class) +public class KafkaNodeUnregistrationTest { + private static Vertx vertx; + + @BeforeAll + public static void before() { + vertx = Vertx.vertx(); + } + + @AfterAll + public static void after() { + vertx.close(); + } + + @Test + void testUnregistration(VertxTestContext context) { + Admin mockAdmin = ResourceUtils.adminClient(); + + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenReturn(ubr); + + AdminClientProvider mockProvider = ResourceUtils.adminClientProvider(mockAdmin); + + Checkpoint async = context.checkpoint(); + KafkaNodeUnregistration.unregisterNodes(Reconciliation.DUMMY_RECONCILIATION, vertx, mockProvider, null, null, List.of(1874, 1919)) + .onComplete(context.succeeding(v -> context.verify(() -> { + assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(1874, 1919)); + + async.flag(); + }))); + } + + @Test + void testUnknownNodeUnregistration(VertxTestContext context) { + 
Admin mockAdmin = ResourceUtils.adminClient(); + + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenAnswer(i -> { + if (i.getArgument(0, Integer.class) == 1919) { + KafkaFutureImpl unregistrationFuture = new KafkaFutureImpl<>(); + unregistrationFuture.completeExceptionally(new BrokerIdNotRegisteredException("Unknown node")); + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(unregistrationFuture); + return ubr; + } else { + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + return ubr; + } + }); + + AdminClientProvider mockProvider = ResourceUtils.adminClientProvider(mockAdmin); + + Checkpoint async = context.checkpoint(); + KafkaNodeUnregistration.unregisterNodes(Reconciliation.DUMMY_RECONCILIATION, vertx, mockProvider, null, null, List.of(1874, 1919)) + .onComplete(context.succeeding(v -> context.verify(() -> { + assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(1874, 1919)); + + async.flag(); + }))); + } + + @Test + void testFailingNodeUnregistration(VertxTestContext context) { + Admin mockAdmin = ResourceUtils.adminClient(); + + ArgumentCaptor unregisteredNodeIdCaptor = ArgumentCaptor.forClass(Integer.class); + when(mockAdmin.unregisterBroker(unregisteredNodeIdCaptor.capture())).thenAnswer(i -> { + if (i.getArgument(0, Integer.class) == 1919) { + KafkaFutureImpl unregistrationFuture = new KafkaFutureImpl<>(); + unregistrationFuture.completeExceptionally(new TimeoutException("Fake timeout")); + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(unregistrationFuture); + return ubr; + } else { + UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class); + when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null)); + return ubr; + } + }); + + AdminClientProvider mockProvider = ResourceUtils.adminClientProvider(mockAdmin); + + Checkpoint async = context.checkpoint(); + KafkaNodeUnregistration.unregisterNodes(Reconciliation.DUMMY_RECONCILIATION, vertx, mockProvider, null, null, List.of(1874, 1919)) + .onComplete(context.failing(v -> context.verify(() -> { + assertThat(unregisteredNodeIdCaptor.getAllValues().size(), is(2)); + assertThat(unregisteredNodeIdCaptor.getAllValues(), hasItems(1874, 1919)); + + async.flag(); + }))); + } +} diff --git a/documentation/assemblies/deploying/assembly-kraft-mode.adoc b/documentation/assemblies/deploying/assembly-kraft-mode.adoc index a309cd31a0..324dde0513 100644 --- a/documentation/assemblies/deploying/assembly-kraft-mode.adoc +++ b/documentation/assemblies/deploying/assembly-kraft-mode.adoc @@ -54,7 +54,6 @@ image::kraft-dual-role-quorum.png[KRaft cluster with nodes that combine roles] Currently, the KRaft mode in Strimzi has the following major limitations: * Scaling of KRaft controller nodes up or down is not supported. -* Unregistering Kafka nodes removed from the Kafka cluster. NOTE: If you are using JBOD storage, you can xref:ref-jbod-storage-str[change the volume that stores the metadata log]. 
diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc
index 3739db2643..9668f0b827 100644
--- a/documentation/modules/appendix_crds.adoc
+++ b/documentation/modules/appendix_crds.adoc
@@ -2188,6 +2188,9 @@ Used in: xref:type-Kafka-{context}[`Kafka`]
 |kafkaNodePools
 |xref:type-UsedNodePoolStatus-{context}[`UsedNodePoolStatus`] array
 |List of the KafkaNodePools used by this Kafka cluster.
+|registeredNodeIds
+|integer array
+|Registered node IDs used by this Kafka cluster. This field is used for internal purposes only and will be removed in the future.
 |clusterId
 |string
 |Kafka cluster Id.
diff --git a/packaging/helm-charts/helm3/strimzi-kafka-operator/crds/040-Crd-kafka.yaml b/packaging/helm-charts/helm3/strimzi-kafka-operator/crds/040-Crd-kafka.yaml
index e75b5a1743..166ebe3dbe 100644
--- a/packaging/helm-charts/helm3/strimzi-kafka-operator/crds/040-Crd-kafka.yaml
+++ b/packaging/helm-charts/helm3/strimzi-kafka-operator/crds/040-Crd-kafka.yaml
@@ -7736,6 +7736,11 @@ spec:
                     type: string
                     description: The name of the KafkaNodePool used by this Kafka resource.
                 description: List of the KafkaNodePools used by this Kafka cluster.
+              registeredNodeIds:
+                type: array
+                items:
+                  type: integer
+                description: Registered node IDs used by this Kafka cluster. This field is used for internal purposes only and will be removed in the future.
               clusterId:
                 type: string
                 description: Kafka cluster Id.
diff --git a/packaging/install/cluster-operator/040-Crd-kafka.yaml b/packaging/install/cluster-operator/040-Crd-kafka.yaml
index 6769d4d0d3..d602909936 100644
--- a/packaging/install/cluster-operator/040-Crd-kafka.yaml
+++ b/packaging/install/cluster-operator/040-Crd-kafka.yaml
@@ -7735,6 +7735,11 @@ spec:
                     type: string
                     description: The name of the KafkaNodePool used by this Kafka resource.
                 description: List of the KafkaNodePools used by this Kafka cluster.
+              registeredNodeIds:
+                type: array
+                items:
+                  type: integer
+                description: Registered node IDs used by this Kafka cluster. This field is used for internal purposes only and will be removed in the future.
               clusterId:
                 type: string
                 description: Kafka cluster Id.
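
Illustrative example (not part of the patch): with this change, the Kafka resource status tracks the node IDs currently registered in the KRaft cluster, and the operator unregisters any IDs that disappear after a scale-down. The snippet below is a hypothetical status fragment; the node IDs simply mirror the ones used in the mock tests above (0-2 for controllers, 10-12 for brokers):

    status:
      kafkaNodePools:
        - name: brokers
        - name: controllers
      registeredNodeIds:
        - 0
        - 1
        - 2
        - 10
        - 11
        - 12

On a running cluster the field can be inspected with something like `kubectl get kafka my-cluster -o jsonpath='{.status.registeredNodeIds}'`; as the field description notes, it is intended for internal bookkeeping only and may be removed in a future release.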