Skip to content

Commit

Permalink
Do not perform Redpanda decommission based on annotation
Browse files Browse the repository at this point in the history
Calling decommission in the case of changing Pod annotation might be not
possible if Pod was removed along with its annotation where previous
Redpanda ID was stored. There is dedicated function to handle Ghost
brokers.

Reference

redpanda-data/redpanda#9750

redpanda-data/redpanda#13298
redpanda-data/redpanda#13132

redpanda-data/helm-charts#253
redpanda-data/redpanda#12847
  • Loading branch information
RafalKorepta committed Jul 2, 2024
1 parent d9e2a57 commit 552ba34
Showing 1 changed file with 0 additions and 38 deletions.
38 changes: 0 additions & 38 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,21 +370,6 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(
continue
}

var oldNodeID int
if annotationExist {
oldNodeID, err = strconv.Atoi(nodeIDStrAnnotation)
if err != nil {
combinedErrors = errors.Join(combinedErrors, fmt.Errorf("unable to convert node ID (%s) to int: %w", nodeIDStrAnnotation, err))
continue
}

log.WithValues("pod-name", pod.Name, "old-node-id", oldNodeID).Info("decommission old node-id")
if err = r.decommissionBroker(ctx, rp, oldNodeID, log, ar); err != nil {
combinedErrors = errors.Join(combinedErrors, fmt.Errorf("unable to decommission broker: %w", err))
continue
}
}

log.WithValues("pod-name", pod.Name, "new-node-id", nodeID).Info("setting node-id annotation")
pod.Annotations[resources.PodAnnotationNodeIDKey] = realNodeIDStr
if err := r.Update(ctx, pod, &client.UpdateOptions{}); err != nil {
Expand Down Expand Up @@ -432,29 +417,6 @@ func (r *ClusterReconciler) setPodNodeIDLabel(
return nil
}

func (r *ClusterReconciler) decommissionBroker(
ctx context.Context, rp *vectorizedv1alpha1.Cluster, nodeID int, l logr.Logger, ar *attachedResources,
) error {
log := l.WithName("decommissionBroker").WithValues("node-id", nodeID)
log.V(logger.DebugLevel).Info("decommission broker")

pki, err := ar.getPKI()
if err != nil {
return fmt.Errorf("getting pki: %w", err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider())
if err != nil {
return fmt.Errorf("unable to create admin client: %w", err)
}

err = adminClient.DecommissionBroker(ctx, nodeID)
if err != nil && !strings.Contains(err.Error(), "failed: Not Found") {
return fmt.Errorf("unable to decommission broker: %w", err)
}
return nil
}

func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *vectorizedv1alpha1.Cluster, pod *corev1.Pod, ar *attachedResources) (int32, error) {
pki, err := ar.getPKI()
if err != nil {
Expand Down

0 comments on commit 552ba34

Please sign in to comment.