Skip to content

Commit

Permalink
Merge pull request #12167 from joejulian/planned_decommission
Browse files Browse the repository at this point in the history
Planned decommission
  • Loading branch information
RafalKorepta authored Aug 3, 2023
2 parents b050ac4 + 020137b commit a6e407a
Show file tree
Hide file tree
Showing 23 changed files with 750 additions and 149 deletions.
3 changes: 1 addition & 2 deletions src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/go/k8s/config/rbac/bases/operator/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ rules:
- pods/finalizers
verbs:
- update
- apiGroups:
- ""
resources:
- pods/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
Expand Down
26 changes: 5 additions & 21 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ClusterReconciler struct {
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;delete;
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;delete
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=pods/status,verbs=update;patch
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;
//+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;update;patch;
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;
Expand Down Expand Up @@ -406,27 +407,10 @@ func (r *ClusterReconciler) handlePodFinalizer(
return nil
}
// delete the associated pvc
pvc := corev1.PersistentVolumeClaim{}
//nolint: gocritic // 248 bytes 6 times is not worth decreasing the readability over
for _, v := range pod.Spec.Volumes {
if v.PersistentVolumeClaim != nil {
key = types.NamespacedName{
Name: v.PersistentVolumeClaim.ClaimName,
Namespace: pod.GetNamespace(),
}
err = r.Get(ctx, key, &pvc)
if err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf(`unable to fetch PersistentVolumeClaim "%s/%s": %w`, key.Namespace, key.Name, err)
}
continue
}
log.WithValues("persistent-volume-claim", key).Info("deleting PersistentVolumeClaim")
if err := r.Delete(ctx, &pvc, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf(`unable to delete PersistentVolumeClaim "%s/%s": %w`, key.Name, key.Namespace, err)
}
}
if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil {
	// Name the pod whose claims could not be removed; the underlying cause
	// stays wrapped (outside the quoted pod name) so callers can unwrap it.
	return fmt.Errorf(`unable to remove PVCs for pod "%s/%s": %w`, pod.GetNamespace(), pod.GetName(), err)
}

// remove the finalizer
if err := r.removePodFinalizer(ctx, pod, log); err != nil {
return fmt.Errorf(`unable to remove finalizer from pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
Expand Down Expand Up @@ -549,7 +533,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *vectorized
return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err)
}

adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider(), int32(ordinal))
adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider(), ordinal)
if err != nil {
return -1, fmt.Errorf("unable to create admin client: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *ClusterReconciler) reconcileConfiguration(
}

// Synchronize status with cluster, including triggering a restart if needed
conditionData, err := r.synchronizeStatusWithCluster(ctx, redpandaCluster, adminAPI, log)
conditionData, err := r.synchronizeStatusWithCluster(ctx, redpandaCluster, statefulSetResource, adminAPI, log)
if err != nil {
return err
}
Expand Down Expand Up @@ -309,6 +309,7 @@ func (r *ClusterReconciler) checkCentralizedConfigurationHashChange(
func (r *ClusterReconciler) synchronizeStatusWithCluster(
ctx context.Context,
redpandaCluster *vectorizedv1alpha1.Cluster,
statefulset *resources.StatefulSetResource,
adminAPI adminutils.AdminAPIClient,
l logr.Logger,
) (*vectorizedv1alpha1.ClusterCondition, error) {
Expand All @@ -324,6 +325,7 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster(
clusterNeedsRestart := needsRestart(status, log)
clusterSafeToRestart := isSafeToRestart(status, log)
restartingCluster := clusterNeedsRestart && clusterSafeToRestart
isRestarting := redpandaCluster.Status.IsRestarting()

log.Info("Synchronizing configuration state for cluster",
"status", conditionData.Status,
Expand All @@ -332,7 +334,7 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster(
"needs_restart", clusterNeedsRestart,
"restarting", restartingCluster,
)
if conditionChanged || (restartingCluster && !redpandaCluster.Status.IsRestarting()) {
if conditionChanged || (restartingCluster && !isRestarting) {
log.Info("Updating configuration state for cluster")
// Trigger restart here if needed and safe to do it
if restartingCluster {
Expand All @@ -343,6 +345,11 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster(
return nil, errorWithContext(err, "could not update condition on cluster")
}
}
if restartingCluster && !isRestarting {
if err := statefulset.MarkPodsForUpdate(ctx); err != nil {
return nil, errorWithContext(err, "could not mark pods for update")
}
}
return redpandaCluster.Status.GetCondition(conditionData.Type), nil
}

Expand Down
35 changes: 33 additions & 2 deletions src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/vectorized/v1alpha1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
)

var _ = Describe("Redpanda cluster scale resource", func() {
Expand Down Expand Up @@ -210,6 +209,38 @@ var _ = Describe("Redpanda cluster scale resource", func() {
By("Deleting the cluster")
Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed())
})

// Scale-down scenario in which the cluster is created with 3 replicas but the
// mock admin API only knows brokers 0 and 1, so the third replica has no
// broker entry. The spec ends by expecting GetDecommissionBrokerID to be nil.
It("Can decommission nodes that never started", func() {
By("Allowing creation of a new cluster with 3 replicas")
key, redpandaCluster := getClusterWithReplicas("direct-decommission", 3)
Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed())

By("Scaling to 3 replicas, but have last one not registered")
// Register brokers 0 and 1 only; no broker with NodeID 2 is added.
testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive})
testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive})
var sts appsv1.StatefulSet
// Wait until the StatefulSet spec asks for 3 replicas.
Eventually(resourceDataGetter(key, &sts, func() interface{} {
return *sts.Spec.Replicas
}), timeout, interval).Should(Equal(int32(3)))
// Wait until the cluster status reports 3 current replicas.
// NOTE(review): the trailing description argument is evaluated once, when
// this statement runs and before polling starts, so the value printed on
// failure may be stale.
Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} {
return redpandaCluster.Status.CurrentReplicas
}), timeout, interval).Should(Equal(int32(3)), "CurrentReplicas should be 3, got %d", redpandaCluster.Status.CurrentReplicas)

By("Doing direct scale down of node 2")
// Lower the desired replica count on the Cluster spec to 2 ...
Eventually(clusterUpdater(key, func(cluster *vectorizedv1alpha1.Cluster) {
cluster.Spec.Replicas = pointer.Int32(2)
}), timeout, interval).Should(Succeed())
// ... and wait for the StatefulSet spec to follow.
Eventually(resourceDataGetter(key, &sts, func() interface{} {
return *sts.Spec.Replicas
}), timeout, interval).Should(Equal(int32(2)))
// presumably stands in for the StatefulSet controller updating replica
// status in the test environment — TODO confirm against the helper.
Eventually(statefulSetReplicasReconciler(ctrl.Log, key, redpandaCluster), timeout, interval).Should(Succeed())
// The scale-down is considered finished once the cluster no longer
// reports a broker ID for decommissioning.
Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} {
return redpandaCluster.GetDecommissionBrokerID()
}), timeout, interval).Should(BeNil())

By("Deleting the cluster")
Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed())
})
})
})

Expand Down
14 changes: 7 additions & 7 deletions src/go/k8s/controllers/redpanda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) {
ctx = ctrl.SetupSignalHandler()
ctx, controllerCancel = context.WithCancel(ctx)

testAdminAPI = &adminutils.MockAdminAPI{Log: ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI")}
testAdminAPI = &adminutils.MockAdminAPI{Log: logf.Log.WithName("testAdminAPI").WithName("mockAdminAPI")}
testAdminAPIFactory = func(
_ context.Context,
_ client.Reader,
Expand All @@ -157,7 +157,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) {

err = (&redpandacontrollers.ClusterReconciler{
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"),
Log: logf.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"),
Scheme: k8sManager.GetScheme(),
AdminAPIClientFactory: testAdminAPIFactory,
DecommissionWaitInterval: 100 * time.Millisecond,
Expand All @@ -171,7 +171,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) {
driftCheckPeriod := 500 * time.Millisecond
err = (&redpandacontrollers.ClusterConfigurationDriftReconciler{
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"),
Log: logf.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"),
Scheme: k8sManager.GetScheme(),
AdminAPIClientFactory: testAdminAPIFactory,
DriftCheckPeriod: &driftCheckPeriod,
Expand All @@ -181,7 +181,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) {
err = (&redpandacontrollers.ConsoleReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Console"),
Log: logf.Log.WithName("controllers").WithName("redpanda").WithName("Console"),
AdminAPIClientFactory: testAdminAPIFactory,
Store: testStore,
EventRecorder: k8sManager.GetEventRecorderFor("Console"),
Expand All @@ -190,8 +190,8 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) {
Expect(err).ToNot(HaveOccurred())

storageAddr := ":9090"
storageAdvAddr := redpandacontrollers.DetermineAdvStorageAddr(storageAddr, ctrl.Log.WithName("controllers").WithName("core").WithName("Redpanda"))
storage := redpandacontrollers.MustInitStorage("/tmp", storageAdvAddr, 60*time.Second, 2, ctrl.Log.WithName("controllers").WithName("core").WithName("Redpanda"))
storageAdvAddr := redpandacontrollers.DetermineAdvStorageAddr(storageAddr, logf.Log.WithName("controllers").WithName("core").WithName("Redpanda"))
storage := redpandacontrollers.MustInitStorage("/tmp", storageAdvAddr, 60*time.Second, 2, logf.Log.WithName("controllers").WithName("core").WithName("Redpanda"))

metricsH := helper.MustMakeMetrics(k8sManager)
// TODO fill this in with options
Expand Down Expand Up @@ -243,7 +243,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) {
// to handle that.
<-k8sManager.Elected()

redpandacontrollers.StartFileServer(storage.BasePath, storageAddr, ctrl.Log.WithName("controllers").WithName("core").WithName("Redpanda"))
redpandacontrollers.StartFileServer(storage.BasePath, storageAddr, logf.Log.WithName("controllers").WithName("core").WithName("Redpanda"))
}()

err = (&redpandacontrollers.RedpandaReconciler{
Expand Down
2 changes: 1 addition & 1 deletion src/go/k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func main() {
flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy")
flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster")
flag.DurationVar(&metricsTimeout, "metrics-timeout", 8*time.Second, "Set the timeout for a checking metrics Admin API endpoint. If set to 0, then the 2 seconds default will be used")
flag.BoolVar(&vectorizedv1alpha1.AllowDownscalingInWebhook, "allow-downscaling", true, "Allow to reduce the number of replicas in existing clusters (alpha feature)")
flag.BoolVar(&vectorizedv1alpha1.AllowDownscalingInWebhook, "allow-downscaling", true, "Allow to reduce the number of replicas in existing clusters")
flag.BoolVar(&allowPVCDeletion, "allow-pvc-deletion", false, "Allow the operator to delete PVCs for Pods assigned to failed or missing Nodes (alpha feature)")
flag.BoolVar(&vectorizedv1alpha1.AllowConsoleAnyNamespace, "allow-console-any-ns", false, "Allow to create Console in any namespace. Allowing this copies Redpanda SchemaRegistry TLS Secret to namespace (alpha feature)")
flag.StringVar(&restrictToRedpandaVersion, "restrict-redpanda-version", "", "Restrict management of clusters to those with this version")
Expand Down
12 changes: 12 additions & 0 deletions src/go/k8s/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,18 @@ func (r *StatefulSetResource) CurrentVersion(ctx context.Context) (string, error
return stsVersion, nil
}

// IsManagedDecommission reports whether the cluster's
// ManagedDecommissionAnnotation holds a timestamp that is still in the future.
//
// A missing annotation yields (false, nil). A value that is not a valid
// RFC3339 timestamp yields (false, err) with the parse error wrapped.
func (r *StatefulSetResource) IsManagedDecommission() (bool, error) {
	raw, found := r.pandaCluster.GetAnnotations()[ManagedDecommissionAnnotation]
	if !found {
		return false, nil
	}

	expiry, parseErr := time.Parse(time.RFC3339, raw)
	if parseErr != nil {
		return false, fmt.Errorf("managed decommission annotation must be a valid RFC3339 timestamp: %w", parseErr)
	}

	// Active only while the current time has not yet reached the timestamp.
	now := time.Now()
	return now.Before(expiry), nil
}

func (r *StatefulSetResource) getPodByBrokerID(ctx context.Context, brokerID *int32) (*corev1.Pod, error) {
if brokerID == nil {
return nil, nil
Expand Down
Loading

0 comments on commit a6e407a

Please sign in to comment.