Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for node unregistration in KRaft mode #10442

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
On the client `accessTokenLocation`, `clientAssertion`, `clientAssertionLocation`, `clientAssertionType`, and `saslExtensions` have been added.
* Add support for custom Cruise Control API users
* Update HTTP bridge to latest 0.30.0 release
* Unregistration of KRaft nodes after scale-down

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
@JsonPropertyOrder({ "conditions", "observedGeneration", "listeners", "kafkaNodePools", "registeredNodeIds", "clusterId", "operatorLastSuccessfulVersion", "kafkaVersion", "kafkaMetadataVersion", "kafkaMetadataState" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaStatus extends Status {
private List<ListenerStatus> listeners;
private List<UsedNodePoolStatus> kafkaNodePools;
private List<Integer> registeredNodeIds;

private String clusterId;
private String operatorLastSuccessfulVersion;
Expand All @@ -54,6 +55,16 @@ public void setKafkaNodePools(List<UsedNodePoolStatus> kafkaNodePools) {
this.kafkaNodePools = kafkaNodePools;
}

/**
 * Returns the node IDs currently registered in this Kafka cluster. The operator compares this
 * list against the desired nodes on the next reconciliation to detect scaled-down nodes that
 * still need to be unregistered from the KRaft quorum.
 *
 * @return  List of registered node IDs, or null if not yet tracked
 */
@Description("Registered node IDs used by this Kafka cluster. " +
"This field is used for internal purposes only and will be removed in the future.")
public List<Integer> getRegisteredNodeIds() {
return registeredNodeIds;
}

/**
 * Sets the list of node IDs registered in this Kafka cluster
 *
 * @param registeredNodeIds     List of registered node IDs
 */
public void setRegisteredNodeIds(List<Integer> registeredNodeIds) {
this.registeredNodeIds = registeredNodeIds;
}

@Description("Kafka cluster Id")
public String getClusterId() {
return clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ public Future<KafkaStatus> createOrUpdate(Reconciliation reconciliation, Kafka k
// Copy the metadata state if needed
status.setKafkaMetadataState(kafkaAssembly.getStatus().getKafkaMetadataState());
}

if (status.getRegisteredNodeIds() == null
&& kafkaAssembly.getStatus().getRegisteredNodeIds() != null) {
// Copy the list of registered node IDs if needed
status.setRegisteredNodeIds(kafkaAssembly.getStatus().getRegisteredNodeIds());
}
}

if (reconcileResult.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.operator.VertxUtil;
import io.strimzi.operator.common.AdminClientProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.auth.PemAuthIdentity;
import io.strimzi.operator.common.auth.PemTrustSet;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;

import java.util.ArrayList;
import java.util.List;

/**
* Contains utility methods for unregistering KRaft nodes from a Kafka cluster after scale-down
*/
public class KafkaNodeUnregistration {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaNodeUnregistration.class.getName());

/**
* Unregisters Kafka nodes from a KRaft-based Kafka cluster
*
* @param reconciliation Reconciliation marker
* @param vertx Vert.x instance
* @param adminClientProvider Kafka Admin API client provider
* @param pemTrustSet Trust set for the admin client to connect to the Kafka cluster
* @param pemAuthIdentity Key set for the admin client to connect to the Kafka cluster
* @param nodeIdsToUnregister List of node IDs that should be unregistered
*
* @return Future that completes when all nodes are unregistered
*/
public static Future<Void> unregisterNodes(
Reconciliation reconciliation,
Vertx vertx,
AdminClientProvider adminClientProvider,
PemTrustSet pemTrustSet,
PemAuthIdentity pemAuthIdentity,
List<Integer> nodeIdsToUnregister
) {
try {
String bootstrapHostname = KafkaResources.bootstrapServiceName(reconciliation.name()) + "." + reconciliation.namespace() + ".svc:" + KafkaCluster.REPLICATION_PORT;
Admin adminClient = adminClientProvider.createAdminClient(bootstrapHostname, pemTrustSet, pemAuthIdentity);

List<Future<Void>> futures = new ArrayList<>();
for (Integer nodeId : nodeIdsToUnregister) {
futures.add(unregisterNode(reconciliation, vertx, adminClient, nodeId));
}

return Future.all(futures)
.eventually(() -> {
adminClient.close();
return Future.succeededFuture();
})
.map((Void) null);
} catch (KafkaException e) {
LOGGER.warnCr(reconciliation, "Failed to unregister nodes", e);
return Future.failedFuture(e);
}
}

/**
* Unregisters a single Kafka node using the Kafka Admin API. In case the failure is caused by the node not being
* registered, the error will be ignored.
*
* @param reconciliation Reconciliation marker
* @param vertx Vert.x instance
* @param adminClient Kafka Admin API client instance
* @param nodeIdToUnregister ID of the node that should be unregistered
*
* @return Future that completes when the node is unregistered
*/
private static Future<Void> unregisterNode(Reconciliation reconciliation, Vertx vertx, Admin adminClient, Integer nodeIdToUnregister) {
LOGGER.debugCr(reconciliation, "Unregistering node {} from the Kafka cluster", nodeIdToUnregister);

return VertxUtil
.kafkaFutureToVertxFuture(reconciliation, vertx, adminClient.unregisterBroker(nodeIdToUnregister).all())
.recover(t -> {
if (t instanceof BrokerIdNotRegisteredException) {
// The broker is not registered anymore, so it does not need to be unregistered anymore and we
// report success. Situation like this might happen when the operator fails before updating the
// status, when the Kafka API call fails (e.g. due to network connection) but the unregistration
// was done on the Kafka cluster and similar.
LOGGER.warnCr(reconciliation, "Node {} is not registered and cannot be unregistered from the Kafka cluster", nodeIdToUnregister, t);
return Future.succeededFuture();
} else {
LOGGER.warnCr(reconciliation, "Failed to unregister node {} from the Kafka cluster", nodeIdToUnregister, t);
return Future.failedFuture(t);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.operator.resource.ReconcileResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class KafkaReconciler {
private final PlatformFeaturesAvailability pfa;
private final ImagePullPolicy imagePullPolicy;
private final List<LocalObjectReference> imagePullSecrets;
private final List<Integer> previousNodeIds;

// Objects used during the reconciliation
/* test */ final Reconciliation reconciliation;
Expand Down Expand Up @@ -210,6 +212,7 @@ public KafkaReconciler(
this.pfa = pfa;
this.imagePullPolicy = config.getImagePullPolicy();
this.imagePullSecrets = config.getImagePullSecrets();
this.previousNodeIds = kafkaCr.getStatus() != null ? kafkaCr.getStatus().getRegisteredNodeIds() : null;

this.stsOperator = supplier.stsOperations;
this.strimziPodSetOperator = supplier.strimziPodSetOperator;
Expand Down Expand Up @@ -268,6 +271,7 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
.compose(i -> headlessServiceEndpointsReady())
.compose(i -> clusterId(kafkaStatus))
.compose(i -> defaultKafkaQuotas())
.compose(i -> nodeUnregistration(kafkaStatus))
.compose(i -> metadataVersion(kafkaStatus))
.compose(i -> deletePersistentClaims())
.compose(i -> sharedKafkaConfigurationCleanup())
Expand Down Expand Up @@ -939,9 +943,61 @@ protected Future<Void> defaultKafkaQuotas() {
return DefaultKafkaQuotasManager.reconcileDefaultUserQuotas(reconciliation, vertx, adminClientProvider, this.coTlsPemIdentity.pemTrustSet(), this.coTlsPemIdentity.pemAuthIdentity(), kafka.quotas());
}

/**
* Unregisters the KRaft nodes that were removed from the Kafka cluster
*
* @param kafkaStatus Kafka status for updating the list of currently registered node IDs
*
* @return Future which completes when the nodes removed from the Kafka cluster are unregistered
*/
protected Future<Void> nodeUnregistration(KafkaStatus kafkaStatus) {
List<Integer> currentNodeIds = kafka.nodes().stream().map(NodeRef::nodeId).sorted().toList();

if (kafkaMetadataStateManager.getMetadataConfigurationState().isKRaft()
&& previousNodeIds != null
&& !new HashSet<>(currentNodeIds).containsAll(previousNodeIds)) {
// We are in KRaft mode and there are some nodes that were removed => we should unregister them
List<Integer> nodeIdsToUnregister = new ArrayList<>(previousNodeIds);
nodeIdsToUnregister.removeAll(currentNodeIds);

LOGGER.infoCr(reconciliation, "Kafka nodes {} were removed from the Kafka cluster and will be unregistered", nodeIdsToUnregister);

Promise<Void> unregistrationPromise = Promise.promise();
KafkaNodeUnregistration.unregisterNodes(reconciliation, vertx, adminClientProvider, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity(), nodeIdsToUnregister)
.onComplete(res -> {
if (res.succeeded()) {
LOGGER.infoCr(reconciliation, "Kafka nodes {} were successfully unregistered from the Kafka cluster", nodeIdsToUnregister);
kafkaStatus.setRegisteredNodeIds(currentNodeIds);
} else {
LOGGER.warnCr(reconciliation, "Failed to unregister Kafka nodes {} from the Kafka cluster", nodeIdsToUnregister);

// When the unregistration failed, we will keep the original registered node IDs to retry
// the unregistration for them. But we will merge it with any existing node IDs to make
// sure we do not lose track of them.
Set<Integer> updatedNodeIds = new HashSet<>(currentNodeIds);
updatedNodeIds.addAll(previousNodeIds);
kafkaStatus.setRegisteredNodeIds(updatedNodeIds.stream().sorted().toList());
}
scholzj marked this conversation as resolved.
Show resolved Hide resolved

// We complete the promise with success even if the unregistration failed as we do not want to
// fail the reconciliation.
unregistrationPromise.complete();
});

return unregistrationPromise.future();
} else {
// We are either not in KRaft mode, or at a cluster without any information about previous nodes, or without
// any change to the nodes => we just update the status field
kafkaStatus.setRegisteredNodeIds(currentNodeIds);
return Future.succeededFuture();
}
}

/**
* Manages the KRaft metadata version
*
* @param kafkaStatus Kafka status used for updating the currently used metadata version
*
* @return Future which completes when the KRaft metadata version is set to the current version or updated.
*/
protected Future<Void> metadataVersion(KafkaStatus kafkaStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.admin.UnregisterBrokerResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.KafkaFutureImpl;
Expand Down Expand Up @@ -591,6 +592,11 @@ public static Admin adminClient() {

when(mock.describeClientQuotas(any())).thenReturn(dcqr);

// Mock KRaft node unregistration
UnregisterBrokerResult ubr = mock(UnregisterBrokerResult.class);
when(ubr.all()).thenReturn(KafkaFuture.completedFuture(null));
when(mock.unregisterBroker(anyInt())).thenReturn(ubr);

return mock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ private void basicCheck() {
assertThat(k.getStatus().getOperatorLastSuccessfulVersion(), is(KafkaAssemblyOperator.OPERATOR_VERSION));
assertThat(k.getStatus().getKafkaMetadataState().toValue(), is("KRaft"));
assertThat(k.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("brokers", "controllers")));
assertThat(k.getStatus().getRegisteredNodeIds().size(), is(6));
assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12));
}

/**
Expand Down Expand Up @@ -562,6 +564,11 @@ public void testReconcileKafkaScaleUpAndDown(VertxTestContext context) {
assertThat(brokers.getStatus().getRoles().size(), is(1));
assertThat(brokers.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

// Check the Kafka resource status
Kafka k = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(k.getStatus().getRegisteredNodeIds().size(), is(8));
assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12, 13, 14));

// Scale down again
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("brokers").edit(p -> new KafkaNodePoolBuilder(p)
.editSpec()
Expand All @@ -588,6 +595,11 @@ public void testReconcileKafkaScaleUpAndDown(VertxTestContext context) {
assertThat(brokers.getStatus().getRoles().size(), is(1));
assertThat(brokers.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

// Check the Kafka resource status
Kafka k = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(k.getStatus().getRegisteredNodeIds().size(), is(6));
assertThat(k.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12));

async.flag();
})));
}
Expand Down Expand Up @@ -642,6 +654,8 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(kafka.getStatus().getKafkaNodePools().size(), is(3));
assertThat(kafka.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), hasItems("brokers", "controllers", "new-pool"));
assertThat(kafka.getStatus().getRegisteredNodeIds().size(), is(9));
assertThat(kafka.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12, 13, 14, 15));

KafkaNodePool controllers = Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("controllers").get();
assertThat(controllers.getStatus().getReplicas(), is(3));
Expand Down Expand Up @@ -676,6 +690,8 @@ public void testReconcileAddAndRemovePool(VertxTestContext context) {
Kafka kafka = Crds.kafkaOperation(client).inNamespace(namespace).withName(CLUSTER_NAME).get();
assertThat(kafka.getStatus().getKafkaNodePools().size(), is(2));
assertThat(kafka.getStatus().getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), hasItems("brokers", "controllers"));
assertThat(kafka.getStatus().getRegisteredNodeIds().size(), is(6));
assertThat(kafka.getStatus().getRegisteredNodeIds(), hasItems(0, 1, 2, 10, 11, 12));

KafkaNodePool controllers = Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName("controllers").get();
assertThat(controllers.getStatus().getReplicas(), is(3));
Expand Down
Loading
Loading