From a01050799bb25f709d1a321b69b706f6ccdd3e08 Mon Sep 17 00:00:00 2001 From: Joe Julian Date: Thu, 8 Jun 2023 10:09:01 -0700 Subject: [PATCH 1/4] fix decommission --- .../redpanda/cluster_controller_scale_test.go | 38 +++++++- src/go/k8s/pkg/resources/statefulset_scale.go | 87 +++++++++++-------- .../k8s/pkg/resources/statefulset_update.go | 5 +- 3 files changed, 89 insertions(+), 41 deletions(-) diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go index 2e64fd430933..ada6f9be526c 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go @@ -13,6 +13,7 @@ import ( "context" "time" + "github.com/fluxcd/pkg/runtime/logger" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "gopkg.in/yaml.v2" @@ -24,10 +25,9 @@ import ( "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" - "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" - vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/vectorized/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" ) var _ = Describe("Redpanda cluster scale resource", func() { @@ -39,6 +39,8 @@ var _ = Describe("Redpanda cluster scale resource", func() { intervalShort = time.Millisecond * 20 ) + ctrl.SetLogger(logger.NewLogger(logger.Options{})) + Context("When starting up a fresh Redpanda cluster", func() { It("Should wait for the first replica to come up before launching the others", func() { By("Allowing creation of a new cluster with 3 replicas") @@ -210,6 +212,38 @@ var _ = Describe("Redpanda cluster scale resource", func() { By("Deleting the cluster") Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) }) + + It("Can decommission nodes that never started", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("direct-decommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas, but have last one not registered") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + return redpandaCluster.Status.CurrentReplicas + }), timeout, interval).Should(Equal(int32(3)), "CurrentReplicas should be 3, got %d", redpandaCluster.Status.CurrentReplicas) + + By("Doing direct scale down of node 2") + Eventually(clusterUpdater(key, func(cluster *vectorizedv1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(ctrl.Log, key, redpandaCluster), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + return redpandaCluster.GetDecommissionBrokerID() + }), timeout, interval).Should(BeNil()) + + By("Deleting the cluster") + 
Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) }) }) diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index 196e66ccaec2..77de7d457771 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -68,41 +68,10 @@ const ( //nolint:nestif // for clarity func (r *StatefulSetResource) handleScaling(ctx context.Context) error { log := r.logger.WithName("handleScaling") - // decommission already in progress - if r.pandaCluster.GetDecommissionBrokerID() != nil { - if *r.pandaCluster.Spec.Replicas >= r.pandaCluster.GetCurrentReplicas() { - // Decommissioning can also be canceled and we need to recommission - err := r.handleRecommission(ctx) - if !errors.Is(err, &RecommissionFatalError{}) { - return err - } - // if it's impossible to recommission, fall through and let the decommission complete - log.WithValues("node_id", r.pandaCluster.GetDecommissionBrokerID()).Info("cannot recommission broker", "error", err) - } - // handleDecommission will return an error until the decommission is completed - if err := r.handleDecommission(ctx); err != nil { - return err - } - // Broker is now removed - targetReplicas := r.pandaCluster.GetCurrentReplicas() - 1 - log.WithValues("targetReplicas", targetReplicas).Info("broker decommission complete: scaling down StatefulSet") - - // We set status.currentReplicas accordingly to trigger scaling down of the statefulset - if err := setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger); err != nil { - return err - } - - scaledDown, err := r.verifyRunningCount(ctx, targetReplicas) - if err != nil { - return err - } - if !scaledDown { - return &RequeueAfterError{ - RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), - Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas), - } - } + // decommission already in progress + if err := r.handleDecommissionInProgress(ctx, log); err != nil { + return err } if r.pandaCluster.Status.CurrentReplicas == 0 { @@ -146,16 +115,58 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { return err } if targetBroker == nil { - return &RequeueAfterError{ - RequeueAfter: RequeueDuration, - Msg: fmt.Sprintf("cannot retrieve broker id for pod %d", targetOrdinal), - } + // The target pod isn't in the broker list. Just select a non-existing broker for decommission so the next + // reconcile loop will succeed. 
+ nonExistantBroker := int32(-1) + targetBroker = &nonExistantBroker } log.WithValues("ordinal", targetOrdinal, "node_id", targetBroker).Info("start decommission broker") r.pandaCluster.SetDecommissionBrokerID(targetBroker) return r.Status().Update(ctx, r.pandaCluster) } +func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context, logger logr.Logger) error { + log := logger.WithName("handleDecommissionInProgress") + if r.pandaCluster.GetDecommissionBrokerID() == nil { + return nil + } + + if *r.pandaCluster.Spec.Replicas >= r.pandaCluster.GetCurrentReplicas() { + // Decommissioning can also be canceled and we need to recommission + err := r.handleRecommission(ctx) + if !errors.Is(err, &RecommissionFatalError{}) { + return err + } + // if it's impossible to recommission, fall through and let the decommission complete + log.WithValues("node_id", r.pandaCluster.GetDecommissionBrokerID()).Info("cannot recommission broker", "error", err) + } + // handleDecommission will return an error until the decommission is completed + if err := r.handleDecommission(ctx); err != nil { + return err + } + + // Broker is now removed + targetReplicas := r.pandaCluster.GetCurrentReplicas() - 1 + log.WithValues("targetReplicas", targetReplicas).Info("broker decommission complete: scaling down StatefulSet") + + // We set status.currentReplicas accordingly to trigger scaling down of the statefulset + if err := setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger); err != nil { + return err + } + + scaledDown, err := r.verifyRunningCount(ctx, targetReplicas) + if err != nil { + return err + } + if !scaledDown { + return &RequeueAfterError{ + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), + Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas), + } + } + return nil +} + // handleDecommission manages the case of decommissioning of the last node of a cluster. // // When this handler is called, the `status.decommissioningNode` is populated with the diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index 605aa4c503b1..4e23ebb4e1b2 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -40,6 +40,9 @@ const ( // requeue resource reconciliation. RequeueDuration = time.Second * 10 defaultAdminAPITimeout = time.Second * 2 + + ManagedDecommissionAnnotation = "operator.redpanda.com/managed-decommission" + PodUpdateAnnotation = "operator.redpanda.com/pending-update" ) var ( @@ -329,7 +332,7 @@ func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID) if err != nil { - return fmt.Errorf("getting broker infromations: %w", err) + return fmt.Errorf("getting broker information: %w", err) } if br.Maintenance == nil { From 5e44461ed94a9687145aebfe0435cf17a756a0a0 Mon Sep 17 00:00:00 2001 From: Joe Julian Date: Wed, 7 Jun 2023 16:00:59 -0700 Subject: [PATCH 2/4] add feature to trigger managed rolling decommission This adds the ability to do planned decommissioning of StatefulSet pods. To trigger a managed decommission, set an annotation on the Cluster resource with an RFC3339 deadline. 
```
kubectl annotate cluster managed-decommission operator.redpanda.com/managed-decommission="2999-12-31T00:00:00Z"
```

The Cluster controller will decommission each pod from 0 to N, delete the PVCs and the pod, waiting for the replacement pod to join and for the cluster to become healthy between each deletion. When finished, the controller will remove the annotation.

Should the deadline pass during the managed decommission, the controller will not continue decommissioning pods. To resume the decommission, set the annotation to a timestamp in the future.
---
 .../v1alpha1/zz_generated.deepcopy.go | 3 +-
 .../k8s/config/rbac/bases/operator/role.yaml | 7 +
 .../redpanda/cluster_controller.go | 26 +-
 .../cluster_controller_configuration.go | 11 +-
 .../redpanda/cluster_controller_scale_test.go | 3 -
 src/go/k8s/controllers/redpanda/suite_test.go | 14 +-
 src/go/k8s/pkg/resources/statefulset.go | 12 +
 src/go/k8s/pkg/resources/statefulset_scale.go | 34 ++-
 src/go/k8s/pkg/resources/statefulset_test.go | 51 ++--
 .../k8s/pkg/resources/statefulset_update.go | 242 +++++++++++++++---
 .../pkg/resources/statefulset_update_test.go | 4 +-
 src/go/k8s/pkg/utils/kubernetes.go | 4 +-
 src/go/k8s/pkg/utils/kubernetes_test.go | 2 +-
 src/go/k8s/pkg/utils/podutils.go | 94 +++++++
 14 files changed, 394 insertions(+), 113 deletions(-)
 create mode 100644 src/go/k8s/pkg/utils/podutils.go

diff --git a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go
index adf4fcb222d6..d5f2e40c3862 100644
--- a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go
+++ b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go
@@ -15,11 +15,10 @@ package v1alpha1

import (
- timex "time"
-
 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
+ timex "time"
)

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
diff --git a/src/go/k8s/config/rbac/bases/operator/role.yaml b/src/go/k8s/config/rbac/bases/operator/role.yaml index 2a1a634b2c98..a343abde1c8e 100644 --- a/src/go/k8s/config/rbac/bases/operator/role.yaml +++ b/src/go/k8s/config/rbac/bases/operator/role.yaml @@ -123,6 +123,13 @@ rules: - pods/finalizers verbs: - update +- apiGroups: + - "" + resources: + - pods/status + verbs: + - patch + - update - apiGroups: - "" resources: diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 930f42232ba3..b9fc05622848 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -86,6 +86,7 @@ type ClusterReconciler struct { //+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;delete; //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;delete //+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=pods/status,verbs=update;patch //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch; //+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;update;patch; //+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch; @@ -406,27 +407,10 @@ func (r *ClusterReconciler) handlePodFinalizer( return nil } // delete the associated pvc - pvc := corev1.PersistentVolumeClaim{} - //nolint: gocritic // 248 bytes 6 times is not worth decreasing the readability over - for _, v := range pod.Spec.Volumes { - if v.PersistentVolumeClaim != nil { - key = types.NamespacedName{ - Name: v.PersistentVolumeClaim.ClaimName, - Namespace: pod.GetNamespace(), - } - err = r.Get(ctx, key, &pvc) - if err != nil { - if !apierrors.IsNotFound(err) { - return fmt.Errorf(`unable to fetch PersistentVolumeClaim "%s/%s": %w`, key.Namespace, key.Name, err) - } - continue - } - log.WithValues("persistent-volume-claim", key).Info("deleting PersistentVolumeClaim") - if err := r.Delete(ctx, &pvc, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf(`unable to delete PersistentVolumeClaim "%s/%s": %w`, key.Name, key.Namespace, err) - } - } + if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil { + return fmt.Errorf(`unable to remove VPCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err) } + // remove the finalizer if err := r.removePodFinalizer(ctx, pod, log); err != nil { return fmt.Errorf(`unable to remove finalizer from pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err) @@ -549,7 +533,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *vectorized return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err) } - adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider(), int32(ordinal)) + adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, rp, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider(), ordinal) if err != nil { return -1, fmt.Errorf("unable to create admin client: %w", err) } diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_configuration.go b/src/go/k8s/controllers/redpanda/cluster_controller_configuration.go index 7e35ff31e00f..12931c9318c7 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_configuration.go +++ 
b/src/go/k8s/controllers/redpanda/cluster_controller_configuration.go @@ -126,7 +126,7 @@ func (r *ClusterReconciler) reconcileConfiguration( } // Synchronized status with cluster, including triggering a restart if needed - conditionData, err := r.synchronizeStatusWithCluster(ctx, redpandaCluster, adminAPI, log) + conditionData, err := r.synchronizeStatusWithCluster(ctx, redpandaCluster, statefulSetResource, adminAPI, log) if err != nil { return err } @@ -309,6 +309,7 @@ func (r *ClusterReconciler) checkCentralizedConfigurationHashChange( func (r *ClusterReconciler) synchronizeStatusWithCluster( ctx context.Context, redpandaCluster *vectorizedv1alpha1.Cluster, + statefulset *resources.StatefulSetResource, adminAPI adminutils.AdminAPIClient, l logr.Logger, ) (*vectorizedv1alpha1.ClusterCondition, error) { @@ -324,6 +325,7 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster( clusterNeedsRestart := needsRestart(status, log) clusterSafeToRestart := isSafeToRestart(status, log) restartingCluster := clusterNeedsRestart && clusterSafeToRestart + isRestarting := redpandaCluster.Status.IsRestarting() log.Info("Synchronizing configuration state for cluster", "status", conditionData.Status, @@ -332,7 +334,7 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster( "needs_restart", clusterNeedsRestart, "restarting", restartingCluster, ) - if conditionChanged || (restartingCluster && !redpandaCluster.Status.IsRestarting()) { + if conditionChanged || (restartingCluster && !isRestarting) { log.Info("Updating configuration state for cluster") // Trigger restart here if needed and safe to do it if restartingCluster { @@ -343,6 +345,11 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster( return nil, errorWithContext(err, "could not update condition on cluster") } } + if restartingCluster && !isRestarting { + if err := statefulset.MarkPodsForUpdate(ctx); err != nil { + return nil, errorWithContext(err, "could not mark pods for update") + } + } return redpandaCluster.Status.GetCondition(conditionData.Type), nil } diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go index ada6f9be526c..78d024d20cba 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go @@ -13,7 +13,6 @@ import ( "context" "time" - "github.com/fluxcd/pkg/runtime/logger" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "gopkg.in/yaml.v2" @@ -39,8 +38,6 @@ var _ = Describe("Redpanda cluster scale resource", func() { intervalShort = time.Millisecond * 20 ) - ctrl.SetLogger(logger.NewLogger(logger.Options{})) - Context("When starting up a fresh Redpanda cluster", func() { It("Should wait for the first replica to come up before launching the others", func() { By("Allowing creation of a new cluster with 3 replicas") diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index d08ec8b92ac3..7d4f824583a2 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -131,7 +131,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) { ctx = ctrl.SetupSignalHandler() ctx, controllerCancel = context.WithCancel(ctx) - testAdminAPI = &adminutils.MockAdminAPI{Log: ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI")} + testAdminAPI = &adminutils.MockAdminAPI{Log: logf.Log.WithName("testAdminAPI").WithName("mockAdminAPI")} testAdminAPIFactory = func( _ context.Context, _ client.Reader, @@ -157,7 +157,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) { err = (&redpandacontrollers.ClusterReconciler{ Client: k8sManager.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), + Log: logf.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), Scheme: k8sManager.GetScheme(), AdminAPIClientFactory: testAdminAPIFactory, DecommissionWaitInterval: 100 * time.Millisecond, @@ -171,7 +171,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) { driftCheckPeriod := 500 * time.Millisecond err = (&redpandacontrollers.ClusterConfigurationDriftReconciler{ Client: k8sManager.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), + Log: logf.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), Scheme: k8sManager.GetScheme(), AdminAPIClientFactory: testAdminAPIFactory, DriftCheckPeriod: &driftCheckPeriod, @@ -181,7 +181,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) { err = (&redpandacontrollers.ConsoleReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), - Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Console"), + Log: logf.Log.WithName("controllers").WithName("redpanda").WithName("Console"), AdminAPIClientFactory: testAdminAPIFactory, Store: testStore, EventRecorder: k8sManager.GetEventRecorderFor("Console"), @@ -190,8 +190,8 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) { Expect(err).ToNot(HaveOccurred()) storageAddr := ":9090" - storageAdvAddr := redpandacontrollers.DetermineAdvStorageAddr(storageAddr, ctrl.Log.WithName("controllers").WithName("core").WithName("Redpanda")) - storage := redpandacontrollers.MustInitStorage("/tmp", storageAdvAddr, 60*time.Second, 2, ctrl.Log.WithName("controllers").WithName("core").WithName("Redpanda")) + storageAdvAddr := redpandacontrollers.DetermineAdvStorageAddr(storageAddr, logf.Log.WithName("controllers").WithName("core").WithName("Redpanda")) + storage := redpandacontrollers.MustInitStorage("/tmp", storageAdvAddr, 60*time.Second, 2, logf.Log.WithName("controllers").WithName("core").WithName("Redpanda")) metricsH := helper.MustMakeMetrics(k8sManager) // TODO fill this in with options @@ -243,7 +243,7 @@ var _ = BeforeSuite(func(suiteCtx SpecContext) { // to handle that. 
<-k8sManager.Elected() - redpandacontrollers.StartFileServer(storage.BasePath, storageAddr, ctrl.Log.WithName("controllers").WithName("core").WithName("Redpanda")) + redpandacontrollers.StartFileServer(storage.BasePath, storageAddr, logf.Log.WithName("controllers").WithName("core").WithName("Redpanda")) }() err = (&redpandacontrollers.RedpandaReconciler{ diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index b2e42fef8047..c6529ad59c26 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -921,6 +921,18 @@ func (r *StatefulSetResource) CurrentVersion(ctx context.Context) (string, error return stsVersion, nil } +func (r *StatefulSetResource) IsManagedDecommission() (bool, error) { + t, ok := r.pandaCluster.GetAnnotations()[ManagedDecommissionAnnotation] + if !ok { + return false, nil + } + deadline, err := time.Parse(time.RFC3339, t) + if err != nil { + return false, fmt.Errorf("managed decommission annotation must be a valid RFC3339 timestamp: %w", err) + } + return deadline.After(time.Now()), nil +} + func (r *StatefulSetResource) getPodByBrokerID(ctx context.Context, brokerID *int32) (*corev1.Pod, error) { if brokerID == nil { return nil, nil diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index 77de7d457771..a335b0dc9303 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -14,6 +14,7 @@ import ( "errors" "fmt" + "github.com/fluxcd/pkg/runtime/logger" "github.com/go-logr/logr" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" appsv1 "k8s.io/api/apps/v1" @@ -69,7 +70,8 @@ const ( func (r *StatefulSetResource) handleScaling(ctx context.Context) error { log := r.logger.WithName("handleScaling") - // decommission already in progress + // if a decommission is already in progress, handle it first. If it's not finished, it will return an error + // which will requeue the reconciliation. We can't do any further scaling until it's finished. if err := r.handleDecommissionInProgress(ctx, log); err != nil { return err } @@ -81,6 +83,7 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { } if *r.pandaCluster.Spec.Replicas == r.pandaCluster.Status.CurrentReplicas { + r.logger.V(logger.DebugLevel).Info("No scaling changes required", "replicas", *r.pandaCluster.Spec.Replicas) // No changes to replicas, we do nothing here return nil } @@ -109,24 +112,36 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { } // User required replicas is lower than current replicas (currentReplicas): start the decommissioning process + r.logger.Info("Downscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas) + targetOrdinal := r.pandaCluster.Status.CurrentReplicas - 1 // Always decommission last node targetBroker, err := r.getBrokerIDForPod(ctx, targetOrdinal) if err != nil { - return err + return fmt.Errorf("error getting broker ID for pod with ordinal %d when downscaling cluster: %w", targetOrdinal, err) } + nonExistantBroker := int32(-1) if targetBroker == nil { // The target pod isn't in the broker list. Just select a non-existing broker for decommission so the next // reconcile loop will succeed. 
- nonExistantBroker := int32(-1) targetBroker = &nonExistantBroker } log.WithValues("ordinal", targetOrdinal, "node_id", targetBroker).Info("start decommission broker") r.pandaCluster.SetDecommissionBrokerID(targetBroker) - return r.Status().Update(ctx, r.pandaCluster) + err = r.Status().Update(ctx, r.pandaCluster) + if err != nil { + return err + } + if *targetBroker == nonExistantBroker { + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("the broker for pod with ordinal %d is not registered with the cluster. Requeuing.", targetOrdinal), + } + } + return nil } -func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context, logger logr.Logger) error { - log := logger.WithName("handleDecommissionInProgress") +func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context, l logr.Logger) error { + log := l.WithName("handleDecommissionInProgress") if r.pandaCluster.GetDecommissionBrokerID() == nil { return nil } @@ -407,19 +422,20 @@ func setCurrentReplicas( c k8sclient.Client, pandaCluster *vectorizedv1alpha1.Cluster, replicas int32, - logger logr.Logger, + l logr.Logger, ) error { + log := l.WithName("setCurrentReplicas") if pandaCluster.Status.CurrentReplicas == replicas { // Skip if already done return nil } - logger.Info("Scaling StatefulSet", "replicas", replicas) + log.Info("Scaling StatefulSet", "replicas", replicas) pandaCluster.Status.CurrentReplicas = replicas if err := c.Status().Update(ctx, pandaCluster); err != nil { return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err) } - logger.Info("StatefulSet scaled", "replicas", replicas) + log.Info("StatefulSet scaled", "replicas", replicas) return nil } diff --git a/src/go/k8s/pkg/resources/statefulset_test.go b/src/go/k8s/pkg/resources/statefulset_test.go index 3b656910924b..b07ad778de92 100644 --- a/src/go/k8s/pkg/resources/statefulset_test.go +++ b/src/go/k8s/pkg/resources/statefulset_test.go @@ -16,11 +16,6 @@ import ( "testing" "time" - vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/vectorized/v1alpha1" - adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" - "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" - res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" - resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/stretchr/testify/assert" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -32,6 +27,12 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/vectorized/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" ) const ( @@ -87,8 +88,8 @@ func TestEnsure(t *testing.T) { {"update redpanda resources", stsResource, resourcesUpdatedRedpandaCluster, resourcesUpdatedSts, true, nil}, {"disabled sidecar", nil, noSidecarCluster, noSidecarSts, true, nil}, {"cluster without shadow index cache dir", stsResource, withoutShadowIndexCacheDirectory, stsWithoutSecondPersistentVolume, true, nil}, - {"update none healthy cluster", stsResource, unhealthyRedpandaCluster, stsResource, false, &res.RequeueAfterError{ - RequeueAfter: 
res.RequeueDuration, + {"update none healthy cluster", stsResource, unhealthyRedpandaCluster, stsResource, false, &resources.RequeueAfterError{ + RequeueAfter: resources.RequeueDuration, Msg: "wait for cluster to become healthy (cluster restarting)", }}, } @@ -110,7 +111,7 @@ func TestEnsure(t *testing.T) { err = c.Create(context.Background(), tt.pandaCluster) assert.NoError(t, err) - sts := res.NewStatefulSet( + sts := resources.NewStatefulSet( c, tt.pandaCluster, scheme.Scheme, @@ -120,7 +121,7 @@ func TestEnsure(t *testing.T) { TestStatefulsetTLSVolumeProvider{}, TestAdminTLSConfigProvider{}, "", - res.ConfiguratorSettings{ + resources.ConfiguratorSettings{ ConfiguratorBaseImage: "vectorized/configurator", ConfiguratorTag: "latest", ImagePullPolicy: "Always", @@ -256,7 +257,7 @@ func stsFromCluster(pandaCluster *vectorizedv1alpha1.Cluster) *v1.StatefulSet { func pandaCluster() *vectorizedv1alpha1.Cluster { var replicas int32 = 1 - resources := corev1.ResourceList{ + res := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("2Gi"), } @@ -295,8 +296,8 @@ func pandaCluster() *vectorizedv1alpha1.Cluster { }, Resources: vectorizedv1alpha1.RedpandaResourceRequirements{ ResourceRequirements: corev1.ResourceRequirements{ - Limits: resources, - Requests: resources, + Limits: res, + Requests: res, }, Redpanda: nil, }, @@ -304,8 +305,8 @@ func pandaCluster() *vectorizedv1alpha1.Cluster { RpkStatus: &vectorizedv1alpha1.Sidecar{ Enabled: true, Resources: &corev1.ResourceRequirements{ - Limits: resources, - Requests: resources, + Limits: res, + Requests: res, }, }, }, @@ -332,7 +333,7 @@ func TestVersion(t *testing.T) { } for _, tt := range tests { - sts := &res.StatefulSetResource{ + sts := &resources.StatefulSetResource{ LastObservedState: &v1.StatefulSet{ Spec: v1.StatefulSetSpec{ Template: corev1.PodTemplateSpec{ @@ -414,14 +415,14 @@ func TestCurrentVersion(t *testing.T) { pod.Labels = labels.ForCluster(redpanda) assert.NoError(t, c.Create(context.TODO(), &pod)) } - sts := res.NewStatefulSet(c, redpanda, scheme.Scheme, + sts := resources.NewStatefulSet(c, redpanda, scheme.Scheme, "cluster.local", "servicename", types.NamespacedName{Name: "test", Namespace: "test"}, TestStatefulsetTLSVolumeProvider{}, TestAdminTLSConfigProvider{}, "", - res.ConfiguratorSettings{ + resources.ConfiguratorSettings{ ConfiguratorBaseImage: "vectorized/configurator", ConfiguratorTag: "latest", ImagePullPolicy: "Always", @@ -460,7 +461,7 @@ func Test_GetPodByBrokerIDfromPodList(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod-0", Annotations: map[string]string{ - res.PodAnnotationNodeIDKey: "3", + resources.PodAnnotationNodeIDKey: "3", }, }, }, @@ -468,7 +469,7 @@ func Test_GetPodByBrokerIDfromPodList(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod-1", Annotations: map[string]string{ - res.PodAnnotationNodeIDKey: "5", + resources.PodAnnotationNodeIDKey: "5", }, }, }, @@ -476,7 +477,7 @@ func Test_GetPodByBrokerIDfromPodList(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod-2", Annotations: map[string]string{ - res.PodAnnotationNodeIDKey: "7", + resources.PodAnnotationNodeIDKey: "7", }, }, }, @@ -516,7 +517,7 @@ func Test_GetPodByBrokerIDfromPodList(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := res.GetPodByBrokerIDfromPodList(tt.args.brokerIDStr, tt.args.pods) + got := resources.GetPodByBrokerIDfromPodList(tt.args.brokerIDStr, tt.args.pods) if !reflect.DeepEqual(got, tt.want) { 
t.Errorf("GetPodByBrokerIDfromPodList() = %v, want %v", got, tt.want) } @@ -531,7 +532,7 @@ func Test_GetBrokerIDForPodFromPodList(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod-0", Annotations: map[string]string{ - res.PodAnnotationNodeIDKey: "", + resources.PodAnnotationNodeIDKey: "", }, }, }, @@ -539,7 +540,7 @@ func Test_GetBrokerIDForPodFromPodList(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod-1", Annotations: map[string]string{ - res.PodAnnotationNodeIDKey: "5", + resources.PodAnnotationNodeIDKey: "5", }, }, }, @@ -547,7 +548,7 @@ func Test_GetBrokerIDForPodFromPodList(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "pod-2", Annotations: map[string]string{ - res.PodAnnotationNodeIDKey: "7", + resources.PodAnnotationNodeIDKey: "7", }, }, }, @@ -605,7 +606,7 @@ func Test_GetBrokerIDForPodFromPodList(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := res.GetBrokerIDForPodFromPodList(tt.args.pods, tt.args.podName) + got, err := resources.GetBrokerIDForPodFromPodList(tt.args.pods, tt.args.podName) if (err != nil) != tt.wantErr { t.Errorf("getBrokerIDForPodFromPodList() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index 4e23ebb4e1b2..16b15f04a88c 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -23,10 +23,12 @@ import ( "github.com/cisco-open/k8s-objectmatcher/patch" "github.com/fluxcd/pkg/runtime/logger" + "github.com/go-logr/logr" "github.com/prometheus/common/expfmt" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/vectorized/v1alpha1" @@ -42,7 +44,7 @@ const ( defaultAdminAPITimeout = time.Second * 2 ManagedDecommissionAnnotation = "operator.redpanda.com/managed-decommission" - PodUpdateAnnotation = "operator.redpanda.com/pending-update" + ClusterUpdatePodCondition = corev1.PodConditionType("ClusterUpdate") ) var ( @@ -69,6 +71,7 @@ var ( func (r *StatefulSetResource) runUpdate( ctx context.Context, current, modified *appsv1.StatefulSet, ) error { + log := r.logger.WithName("runUpdate") // Keep existing central config hash annotation during standard reconciliation if ann, ok := current.Spec.Template.Annotations[CentralizedConfigurationHashAnnotationKey]; ok { if modified.Spec.Template.Annotations == nil { @@ -77,7 +80,8 @@ func (r *StatefulSetResource) runUpdate( modified.Spec.Template.Annotations[CentralizedConfigurationHashAnnotationKey] = ann } - update, err := r.shouldUpdate(r.pandaCluster.Status.IsRestarting(), current, modified) + log.V(logger.DebugLevel).Info("Checking that we should update") + update, err := r.shouldUpdate(current, modified) if err != nil { return fmt.Errorf("unable to determine the update procedure: %w", err) } @@ -86,23 +90,45 @@ func (r *StatefulSetResource) runUpdate( return nil } - if err = r.updateRestartingStatus(ctx, true); err != nil { - return fmt.Errorf("unable to turn on restarting status in cluster custom resource: %w", err) + if !r.getRestartingStatus() { + log.V(logger.DebugLevel).Info("restarting is required. 
Updating Cluster restarting status") + if err = r.updateRestartingStatus(ctx, true); err != nil { + return fmt.Errorf("unable to turn on restarting status in cluster custom resource: %w", err) + } + log.V(logger.DebugLevel).Info("updating restarting status on pods") + if err = r.MarkPodsForUpdate(ctx); err != nil { + return fmt.Errorf("unable to mark pods for update: %w", err) + } } + log.V(logger.DebugLevel).Info("updating statefulset") if err = r.updateStatefulSet(ctx, current, modified); err != nil { return err } + log.V(logger.DebugLevel).Info("checking if cluster is healthy") if err = r.isClusterHealthy(ctx); err != nil { return err } + log.V(logger.DebugLevel).Info("performing rolling update") if err = r.rollingUpdate(ctx, &modified.Spec.Template); err != nil { return err } - // Update is complete for all pods (and all are ready). Set restarting status to false. + ok, err := r.areAllPodsUpdated(ctx) + if err != nil { + return err + } + if !ok { + return &RequeueAfterError{ + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), + Msg: "waiting for all pods to be updated", + } + } + + // If update is complete for all pods (and all are ready). Set restarting status to false. + log.V(logger.DebugLevel).Info("pod update complete, set Cluster restarting status to false") if err = r.updateRestartingStatus(ctx, false); err != nil { return fmt.Errorf("unable to turn off restarting status in cluster custom resource: %w", err) } @@ -127,7 +153,7 @@ func (r *StatefulSetResource) isClusterHealthy(ctx context.Context) error { } restarting := "not restarting" - if r.pandaCluster.Status.IsRestarting() { + if r.getRestartingStatus() { restarting = "restarting" } @@ -164,32 +190,91 @@ func sortPodList(podList *corev1.PodList, cluster *vectorizedv1alpha1.Cluster) * return podList } +func (r *StatefulSetResource) MarkPodsForUpdate(ctx context.Context) error { + podList, err := r.getPodList(ctx) + if err != nil { + return fmt.Errorf("error getting pods %w", err) + } + + for i := range podList.Items { + pod := &podList.Items[i] + podPatch := k8sclient.MergeFrom(pod.DeepCopy()) + newCondition := corev1.PodCondition{ + Type: ClusterUpdatePodCondition, + Status: corev1.ConditionTrue, + Message: "Cluster update pending", + } + utils.SetStatusPodCondition(&pod.Status.Conditions, &newCondition) + if err := r.Client.Status().Patch(ctx, pod, podPatch); err != nil { + return fmt.Errorf("error setting pod update condition: %w", err) + } + } + return nil +} + +func (r *StatefulSetResource) areAllPodsUpdated(ctx context.Context) (bool, error) { + podList, err := r.getPodList(ctx) + if err != nil { + return false, fmt.Errorf("error getting pods %w", err) + } + + for i := range podList.Items { + pod := &podList.Items[i] + cond := utils.FindStatusPodCondition(pod.Status.Conditions, ClusterUpdatePodCondition) + if cond != nil && cond.Status == corev1.ConditionTrue { + return false, nil + } + } + return true, nil +} + func (r *StatefulSetResource) rollingUpdate( ctx context.Context, template *corev1.PodTemplateSpec, ) error { + log := r.logger.WithName("rollingUpdate") podList, err := r.getPodList(ctx) if err != nil { return fmt.Errorf("error getting pods %w", err) } + updateItems, maintenanceItems := r.listPodsForUpdateOrMaintenance(log, podList) + + if err = r.checkMaintenanceModeForPods(ctx, maintenanceItems); err != nil { + return fmt.Errorf("error checking maintenance mode for pods: %w", err) + } + + // There are no pods left to update, we're done. 
+ if len(updateItems) == 0 { + log.V(logger.DebugLevel).Info("no pods to roll") + return nil + } + log.V(logger.DebugLevel).Info("rolling pods", "number of pods", len(updateItems)) + + return r.updatePods(ctx, log, updateItems, template) +} + +func (r *StatefulSetResource) updatePods(ctx context.Context, l logr.Logger, updateItems []*corev1.Pod, template *corev1.PodTemplateSpec) error { + log := l.WithName("updatePods") + var artificialPod corev1.Pod artificialPod.Annotations = template.Annotations artificialPod.Spec = template.Spec volumes := make(map[string]interface{}) + for i := range template.Spec.Volumes { vol := template.Spec.Volumes[i] volumes[vol.Name] = new(interface{}) } + for i := range updateItems { + pod := updateItems[i] - for i := range podList.Items { - pod := podList.Items[i] - - if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil { + if err := r.podEviction(ctx, pod, &artificialPod, volumes); err != nil { + log.V(logger.DebugLevel).Error(err, "podEviction", "pod name", pod.Name) return err } - if !utils.IsPodReady(&pod) { + if !utils.IsPodReady(pod) { return &RequeueAfterError{ RequeueAfter: RequeueDuration, Msg: fmt.Sprintf("wait for %s pod to become ready", pod.Name), @@ -222,7 +307,7 @@ func (r *StatefulSetResource) rollingUpdate( adminURL := url.URL{ Scheme: "http", - Host: hostOverwrite(&pod, headlessServiceWithPort), + Host: hostOverwrite(pod, headlessServiceWithPort), Path: "metrics", } @@ -242,6 +327,41 @@ func (r *StatefulSetResource) rollingUpdate( } } + log.V(logger.DebugLevel).Info("rollingUpdate completed") + return nil +} + +func (r *StatefulSetResource) listPodsForUpdateOrMaintenance(l logr.Logger, podList *corev1.PodList) (updateItems, maintenanceItems []*corev1.Pod) { + log := l.WithName("listPodsForUpdateOrMaintenance") + + // only roll pods marked with the ClusterUpdate condition + for i := range podList.Items { + pod := &podList.Items[i] + if utils.IsStatusPodConditionTrue(pod.Status.Conditions, ClusterUpdatePodCondition) { + log.V(logger.DebugLevel).Info("pod needs updated", "pod name", pod.GetName()) + updateItems = append(updateItems, pod) + } else { + log.V(logger.DebugLevel).Info("pod needs maintenance check", "pod name", pod.GetName()) + maintenanceItems = append(maintenanceItems, pod) + } + } + return updateItems, maintenanceItems +} + +func (r *StatefulSetResource) checkMaintenanceModeForPods(ctx context.Context, maintenanceItems []*corev1.Pod) error { + for i := range maintenanceItems { + pod := maintenanceItems[i] + ordinal, err := utils.GetPodOrdinal(pod.GetName(), r.pandaCluster.GetName()) + if err != nil { + return fmt.Errorf("cannot convert pod name to ordinal: %w", err) + } + if err = r.checkMaintenanceMode(ctx, ordinal); err != nil { + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("checking maintenance node %q: %v", pod.GetName(), err), + } + } + } return nil } @@ -255,6 +375,7 @@ func hostOverwrite(pod *corev1.Pod, headlessServiceWithPort string) string { } func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error { + log := r.logger.WithName("podEviction").WithValues("pod", pod.GetName(), "namespace", pod.GetNamespace()) opts := []patch.CalculateOption{ patch.IgnoreStatusFields(), ignoreKubernetesTokenVolumeMounts(), @@ -267,39 +388,62 @@ func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPo return err } - var ordinal int64 + var ordinal int32 ordinal, err = 
utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name) if err != nil { return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err) } - if patchResult.IsEmpty() { - if err = r.checkMaintenanceMode(ctx, int32(ordinal)); err != nil { - return &RequeueAfterError{ - RequeueAfter: RequeueDuration, - Msg: fmt.Sprintf("checking maintenance node (%s): %v", pod.Name, err), - } - } - return nil + managedDecommission, err := r.IsManagedDecommission() + if err != nil { + log.Error(err, "not performing a managed decommission") } - if *r.pandaCluster.Spec.Replicas > 1 { - r.logger.Info("Put broker into maintenance mode", + if *r.pandaCluster.Spec.Replicas == 1 { + log.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", "pod-name", pod.Name, "patch", patchResult.Patch) - if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil { - // As maintenance mode can not be easily watched using controller runtime the requeue error - // is always returned. That way a rolling update will not finish when operator waits for - // maintenance mode finished. - return &RequeueAfterError{ - RequeueAfter: RequeueDuration, - Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), - } + + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) } + return nil } + if managedDecommission { + log.Info("managed decommission is set: decommission broker") + var id *int32 + if id, err = r.getBrokerIDForPod(ctx, ordinal); err != nil { + return fmt.Errorf("cannot get broker id for pod: %w", err) + } + r.pandaCluster.SetDecommissionBrokerID(id) - r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", - "pod-name", pod.Name, + if err = r.handleDecommissionInProgress(ctx, log); err != nil { + return err + } + + if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil { + return fmt.Errorf(`unable to remove VPCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err) + } + + log.Info("deleting pod") + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) + } + + return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} + } + + log.Info("Put broker into maintenance mode", "patch", patchResult.Patch) + if err = r.putInMaintenanceMode(ctx, ordinal); err != nil { + // As maintenance mode can not be easily watched using controller runtime the requeue error + // is always returned. That way a rolling update will not finish when operator waits for + // maintenance mode finished. + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), + } + } + log.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. 
Deleting pod", "patch", patchResult.Patch) if err = r.Delete(ctx, pod); err != nil { @@ -367,8 +511,7 @@ func (r *StatefulSetResource) checkMaintenanceMode(ctx context.Context, ordinal } if br.Maintenance != nil && br.Maintenance.Draining { - r.logger.Info("Disable broker maintenance mode as patch is empty", - "pod-ordinal", ordinal) + r.logger.Info("Disable broker maintenance", "pod-ordinal", ordinal) err = adminAPIClient.DisableMaintenanceMode(ctx, nodeConf.NodeID, false) if err != nil { return fmt.Errorf("disabling maintenance mode: %w", err) @@ -405,8 +548,12 @@ func (r *StatefulSetResource) updateStatefulSet( // shouldUpdate returns true if changes on the CR require update func (r *StatefulSetResource) shouldUpdate( - isRestarting bool, current, modified *appsv1.StatefulSet, + current, modified *appsv1.StatefulSet, ) (bool, error) { + managedDecommission, _ := r.IsManagedDecommission() // we have already error checked and logged the error where necessary + if managedDecommission || r.getRestartingStatus() { + return true, nil + } prepareResourceForPatch(current, modified) opts := []patch.CalculateOption{ patch.IgnoreStatusFields(), @@ -415,16 +562,20 @@ func (r *StatefulSetResource) shouldUpdate( utils.IgnoreAnnotation(CentralizedConfigurationHashAnnotationKey), } patchResult, err := patch.NewPatchMaker(patch.NewAnnotator(redpandaAnnotatorKey), &patch.K8sStrategicMergePatcher{}, &patch.BaseJSONMergePatcher{}).Calculate(current, modified, opts...) - if err != nil { + if err != nil || patchResult.IsEmpty() { return false, err } - return !patchResult.IsEmpty() || isRestarting, nil + return true, nil +} + +func (r *StatefulSetResource) getRestartingStatus() bool { + return r.pandaCluster.Status.IsRestarting() } func (r *StatefulSetResource) updateRestartingStatus( ctx context.Context, restarting bool, ) error { - if !reflect.DeepEqual(restarting, r.pandaCluster.Status.IsRestarting()) { + if !reflect.DeepEqual(restarting, r.getRestartingStatus()) { r.pandaCluster.Status.SetRestarting(restarting) r.logger.Info("Status updated", "restarting", restarting, @@ -437,6 +588,19 @@ func (r *StatefulSetResource) updateRestartingStatus( return nil } +func (r *StatefulSetResource) removeManagedDecommissionAnnotation(ctx context.Context) error { + p := k8sclient.MergeFrom(r.pandaCluster.DeepCopy()) + for k := range r.pandaCluster.Annotations { + if k == ManagedDecommissionAnnotation { + delete(r.pandaCluster.Annotations, k) + } + } + if err := r.Patch(ctx, r.pandaCluster, p); err != nil { + return fmt.Errorf("unable to remove managed decommission annotation from Cluster %q: %w", r.pandaCluster.Name, err) + } + return nil +} + func ignoreExistingVolumes( volumes map[string]interface{}, ) patch.CalculateOption { diff --git a/src/go/k8s/pkg/resources/statefulset_update_test.go b/src/go/k8s/pkg/resources/statefulset_update_test.go index 845ace78fdf0..25edabe0d3ac 100644 --- a/src/go/k8s/pkg/resources/statefulset_update_test.go +++ b/src/go/k8s/pkg/resources/statefulset_update_test.go @@ -72,12 +72,12 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) { stsWithAnnotation := sts.DeepCopy() stsWithAnnotation.Spec.Template.Annotations = map[string]string{"test": "test2"} ssres := StatefulSetResource{} - update, err := ssres.shouldUpdate(false, sts, stsWithAnnotation) + update, err := ssres.shouldUpdate(sts, stsWithAnnotation) require.NoError(t, err) require.True(t, update) // same statefulset with same annotation - update, err = ssres.shouldUpdate(false, stsWithAnnotation, stsWithAnnotation) + update, err 
= ssres.shouldUpdate(stsWithAnnotation, stsWithAnnotation) require.NoError(t, err) require.False(t, update) } diff --git a/src/go/k8s/pkg/utils/kubernetes.go b/src/go/k8s/pkg/utils/kubernetes.go index 66994d42747a..750aea6f8445 100644 --- a/src/go/k8s/pkg/utils/kubernetes.go +++ b/src/go/k8s/pkg/utils/kubernetes.go @@ -29,7 +29,7 @@ func IsPodReady(pod *corev1.Pod) bool { return false } -func GetPodOrdinal(podName, clusterName string) (int64, error) { +func GetPodOrdinal(podName, clusterName string) (int32, error) { // Pod name needs to have at least 2 more characters if len(podName) < len(clusterName)+2 { return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters) @@ -41,5 +41,5 @@ func GetPodOrdinal(podName, clusterName string) (int64, error) { if err != nil { return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err) } - return ordinal, nil + return int32(ordinal), nil } diff --git a/src/go/k8s/pkg/utils/kubernetes_test.go b/src/go/k8s/pkg/utils/kubernetes_test.go index 7d590ebbf313..bb1d20165e04 100644 --- a/src/go/k8s/pkg/utils/kubernetes_test.go +++ b/src/go/k8s/pkg/utils/kubernetes_test.go @@ -23,7 +23,7 @@ func TestGetPodOrdinal(t *testing.T) { podName string clusterName string expectedError bool - expectedOrdinal int64 + expectedOrdinal int32 }{ {"", "", true, -1}, {"test", "", true, -1}, diff --git a/src/go/k8s/pkg/utils/podutils.go b/src/go/k8s/pkg/utils/podutils.go new file mode 100644 index 000000000000..712ac0df4806 --- /dev/null +++ b/src/go/k8s/pkg/utils/podutils.go @@ -0,0 +1,94 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package utils + +import ( + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// SetStatusPodCondition sets the corresponding condition in conditions to newCondition. +// conditions must be non-nil. +// 1. if the condition of the specified type already exists (all fields of the existing condition are updated to +// newCondition, LastTransitionTime is set to now if the new status differs from the old status) +// 2. if a condition of the specified type does not exist (LastTransitionTime is set to now() if unset, and newCondition is appended) +func SetStatusPodCondition(conditions *[]corev1.PodCondition, newCondition *corev1.PodCondition) { + if conditions == nil { + return + } + existingCondition := FindStatusPodCondition(*conditions, newCondition.Type) + if existingCondition == nil { + if newCondition.LastTransitionTime.IsZero() { + newCondition.LastTransitionTime = metav1.NewTime(time.Now()) + } + *conditions = append(*conditions, *newCondition) + return + } + + if existingCondition.Status != newCondition.Status { + existingCondition.Status = newCondition.Status + if !newCondition.LastTransitionTime.IsZero() { + existingCondition.LastTransitionTime = newCondition.LastTransitionTime + } else { + existingCondition.LastTransitionTime = metav1.NewTime(time.Now()) + } + } + + existingCondition.Reason = newCondition.Reason + existingCondition.Message = newCondition.Message +} + +// RemoveStatusPodCondition removes the corresponding conditionType from conditions. +// conditions must be non-nil. 
+func RemoveStatusPodCondition(conditions *[]corev1.PodCondition, conditionType corev1.PodConditionType) { + if conditions == nil || len(*conditions) == 0 { + return + } + newConditions := make([]corev1.PodCondition, 0, len(*conditions)-1) + for _, condition := range *conditions { + if condition.Type != conditionType { + newConditions = append(newConditions, condition) + } + } + + *conditions = newConditions +} + +func FindStatusPodCondition(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) *corev1.PodCondition { + for i := range conditions { + if conditions[i].Type == conditionType { + return &conditions[i] + } + } + + return nil +} + +// IsStatusPodConditionTrue returns true when the conditionType is present and set to `metav1.ConditionTrue` +func IsStatusPodConditionTrue(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) bool { + return IsStatusPodConditionPresentAndEqual(conditions, conditionType, corev1.ConditionTrue) +} + +// IsStatusPodConditionFalse returns true when the conditionType is present and set to `metav1.ConditionFalse` +func IsStatusPodConditionFalse(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) bool { + return IsStatusPodConditionPresentAndEqual(conditions, conditionType, corev1.ConditionFalse) +} + +// IsStatusPodConditionPresentAndEqual returns true when conditionType is present and equal to status. +func IsStatusPodConditionPresentAndEqual(conditions []corev1.PodCondition, conditionType corev1.PodConditionType, status corev1.ConditionStatus) bool { + for _, condition := range conditions { + if condition.Type == conditionType { + return condition.Status == status + } + } + return false +} From 274393422cedf071054b6d018e420670400bdec6 Mon Sep 17 00:00:00 2001 From: Joe Julian Date: Wed, 7 Jun 2023 17:13:38 -0700 Subject: [PATCH 3/4] fix TestShouldUpdate_AnnotationChange for new requirements --- src/go/k8s/pkg/resources/statefulset_update_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/go/k8s/pkg/resources/statefulset_update_test.go b/src/go/k8s/pkg/resources/statefulset_update_test.go index 25edabe0d3ac..7ada89c486c6 100644 --- a/src/go/k8s/pkg/resources/statefulset_update_test.go +++ b/src/go/k8s/pkg/resources/statefulset_update_test.go @@ -71,7 +71,13 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) { } stsWithAnnotation := sts.DeepCopy() stsWithAnnotation.Spec.Template.Annotations = map[string]string{"test": "test2"} - ssres := StatefulSetResource{} + ssres := StatefulSetResource{ + pandaCluster: &vectorizedv1alpha1.Cluster{ + Status: vectorizedv1alpha1.ClusterStatus{ + Restarting: false, + }, + }, + } update, err := ssres.shouldUpdate(sts, stsWithAnnotation) require.NoError(t, err) require.True(t, update) From 020137bb67ed5425d7e65d5f10104084964db7f0 Mon Sep 17 00:00:00 2001 From: Joe Julian Date: Tue, 1 Aug 2023 11:20:15 -0700 Subject: [PATCH 4/4] add e2e test for managed-decommission feature --- src/go/k8s/main.go | 2 +- src/go/k8s/pkg/resources/statefulset_scale.go | 6 +- src/go/k8s/pkg/resources/statefulset_test.go | 74 +++++++++++++++++++ .../k8s/pkg/resources/statefulset_update.go | 10 ++- src/go/k8s/pkg/utils/podutils.go | 33 +++++++++ .../e2e/managed-decommission/00-assert.yaml | 23 ++++++ .../00-redpanda-cluster.yaml | 29 ++++++++ .../e2e/managed-decommission/01-assert.yaml | 23 ++++++ .../01-schedule-decommission.yaml | 6 ++ .../e2e/managed-decommission/02-assert.yaml | 18 +++++ .../e2e/managed-decommission/02-probe.yaml | 30 ++++++++ 
.../e2e/managed-decommission/03-clean.yaml | 19 +++++ .../tests/e2e/managed-decommission/README.txt | 5 ++ 13 files changed, 272 insertions(+), 6 deletions(-) create mode 100644 src/go/k8s/tests/e2e/managed-decommission/00-assert.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/00-redpanda-cluster.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/01-assert.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/01-schedule-decommission.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/02-assert.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/02-probe.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/03-clean.yaml create mode 100644 src/go/k8s/tests/e2e/managed-decommission/README.txt diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go index 14462ac0e4e8..052c56deb58a 100644 --- a/src/go/k8s/main.go +++ b/src/go/k8s/main.go @@ -122,7 +122,7 @@ func main() { flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy") flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster") flag.DurationVar(&metricsTimeout, "metrics-timeout", 8*time.Second, "Set the timeout for a checking metrics Admin API endpoint. If set to 0, then the 2 seconds default will be used") - flag.BoolVar(&vectorizedv1alpha1.AllowDownscalingInWebhook, "allow-downscaling", true, "Allow to reduce the number of replicas in existing clusters (alpha feature)") + flag.BoolVar(&vectorizedv1alpha1.AllowDownscalingInWebhook, "allow-downscaling", true, "Allow to reduce the number of replicas in existing clusters") flag.BoolVar(&allowPVCDeletion, "allow-pvc-deletion", false, "Allow the operator to delete PVCs for Pods assigned to failed or missing Nodes (alpha feature)") flag.BoolVar(&vectorizedv1alpha1.AllowConsoleAnyNamespace, "allow-console-any-ns", false, "Allow to create Console in any namespace. Allowing this copies Redpanda SchemaRegistry TLS Secret to namespace (alpha feature)") flag.StringVar(&restrictToRedpandaVersion, "restrict-redpanda-version", "", "Restrict management of clusters to those with this version") diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index a335b0dc9303..309d2203da79 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -156,7 +156,7 @@ func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context, log.WithValues("node_id", r.pandaCluster.GetDecommissionBrokerID()).Info("cannot recommission broker", "error", err) } // handleDecommission will return an error until the decommission is completed - if err := r.handleDecommission(ctx); err != nil { + if err := r.handleDecommission(ctx, log); err != nil { return err } @@ -192,12 +192,12 @@ func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context, // // Before completing the process, it double-checks if the node is still not registered, for handling cases where the node was // about to start when the decommissioning process started. If the broker is found, the process is restarted. 
-func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { +func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Logger) error { brokerID := r.pandaCluster.GetDecommissionBrokerID() if brokerID == nil { return nil } - log := r.logger.WithName("handleDecommission").WithValues("node_id", *brokerID) + log := l.WithName("handleDecommission").WithValues("node_id", *brokerID) log.Info("handling broker decommissioning") adminAPI, err := r.getAdminAPIClient(ctx) diff --git a/src/go/k8s/pkg/resources/statefulset_test.go b/src/go/k8s/pkg/resources/statefulset_test.go index b07ad778de92..14ebadccb028 100644 --- a/src/go/k8s/pkg/resources/statefulset_test.go +++ b/src/go/k8s/pkg/resources/statefulset_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -624,3 +625,76 @@ func Test_GetBrokerIDForPodFromPodList(t *testing.T) { }) } } + +func TestStatefulSetResource_IsManagedDecommission(t *testing.T) { + type fields struct { + pandaCluster *vectorizedv1alpha1.Cluster + logger logr.Logger + } + tests := []struct { + name string + fields fields + want bool + wantErr bool + }{ + { + name: "decommission annotation is in the future", + fields: fields{ + pandaCluster: &vectorizedv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + resources.ManagedDecommissionAnnotation: "2999-12-31T00:00:00Z", + }, + }, + }, + }, + want: true, + wantErr: false, + }, + { + name: "decommission annotation is in the past", + fields: fields{ + pandaCluster: &vectorizedv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + resources.ManagedDecommissionAnnotation: "1999-12-31T00:00:00Z", + }, + }, + }, + }, + want: false, + wantErr: false, + }, + { + name: "decommission annotation is not a valid timestamp", + fields: fields{ + pandaCluster: &vectorizedv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + resources.ManagedDecommissionAnnotation: "true", + }, + }, + }, + }, + want: false, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := resources.NewStatefulSet(nil, + tt.fields.pandaCluster, + nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour, + tt.fields.logger, + time.Hour) + got, err := r.IsManagedDecommission() + if (err != nil) != tt.wantErr { + t.Errorf("StatefulSetResource.IsManagedDecommission() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("StatefulSetResource.IsManagedDecommission() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index 16b15f04a88c..889face1f2d6 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -132,6 +132,9 @@ func (r *StatefulSetResource) runUpdate( if err = r.updateRestartingStatus(ctx, false); err != nil { return fmt.Errorf("unable to turn off restarting status in cluster custom resource: %w", err) } + if err = r.removeManagedDecommissionAnnotation(ctx); err != nil { + return fmt.Errorf("unable to remove managed decommission annotation: %w", err) + } return nil } @@ -417,7 +420,7 @@ func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPo } r.pandaCluster.SetDecommissionBrokerID(id) - if err = 
r.handleDecommissionInProgress(ctx, log); err != nil { + if err = r.handleDecommission(ctx, log); err != nil { return err } @@ -550,7 +553,10 @@ func (r *StatefulSetResource) updateStatefulSet( func (r *StatefulSetResource) shouldUpdate( current, modified *appsv1.StatefulSet, ) (bool, error) { - managedDecommission, _ := r.IsManagedDecommission() // we have already error checked and logged the error where necessary + managedDecommission, err := r.IsManagedDecommission() + if err != nil { + r.logger.WithName("shouldUpdate").Error(err, "isManagedDecommission") + } if managedDecommission || r.getRestartingStatus() { return true, nil } diff --git a/src/go/k8s/pkg/utils/podutils.go b/src/go/k8s/pkg/utils/podutils.go index 712ac0df4806..42f18b32799c 100644 --- a/src/go/k8s/pkg/utils/podutils.go +++ b/src/go/k8s/pkg/utils/podutils.go @@ -10,10 +10,16 @@ package utils import ( + "context" + "fmt" "time" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) // SetStatusPodCondition sets the corresponding condition in conditions to newCondition. @@ -92,3 +98,30 @@ func IsStatusPodConditionPresentAndEqual(conditions []corev1.PodCondition, condi } return false } + +func DeletePodPVCs(ctx context.Context, c client.Client, pod *corev1.Pod, l logr.Logger) error { + log := l.WithName("DeletePodPVCs") + // delete the associated pvc + pvc := corev1.PersistentVolumeClaim{} + for i := range pod.Spec.Volumes { + v := &pod.Spec.Volumes[i] + if v.PersistentVolumeClaim != nil { + key := types.NamespacedName{ + Name: v.PersistentVolumeClaim.ClaimName, + Namespace: pod.GetNamespace(), + } + err := c.Get(ctx, key, &pvc) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf(`unable to fetch PersistentVolumeClaim "%s/%s": %w`, key.Namespace, key.Name, err) + } + continue + } + log.WithValues("persistent-volume-claim", key).Info("deleting PersistentVolumeClaim") + if err := c.Delete(ctx, &pvc, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf(`unable to delete PersistentVolumeClaim "%s/%s": %w`, key.Name, key.Namespace, err) + } + } + } + return nil +} diff --git a/src/go/k8s/tests/e2e/managed-decommission/00-assert.yaml b/src/go/k8s/tests/e2e/managed-decommission/00-assert.yaml new file mode 100644 index 000000000000..cfb2fd69df22 --- /dev/null +++ b/src/go/k8s/tests/e2e/managed-decommission/00-assert.yaml @@ -0,0 +1,23 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: managed-decommission +status: + replicas: 3 + currentReplicas: 3 + readyReplicas: 3 + conditions: + - type: ClusterConfigured + status: "True" +--- +apiVersion: v1 +kind: Pod +metadata: + name: managed-decommission-0 + annotations: + operator.redpanda.com/node-id: "0" +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - command: ../../../hack/get-redpanda-info.sh diff --git a/src/go/k8s/tests/e2e/managed-decommission/00-redpanda-cluster.yaml b/src/go/k8s/tests/e2e/managed-decommission/00-redpanda-cluster.yaml new file mode 100644 index 000000000000..03adbdddb549 --- /dev/null +++ b/src/go/k8s/tests/e2e/managed-decommission/00-redpanda-cluster.yaml @@ -0,0 +1,29 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: managed-decommission +spec: + image: "localhost/redpanda" + version: "dev" + replicas: 3 + resources: + requests: + cpu: 1 + memory: 
1Gi + limits: + cpu: 1 + memory: 1Gi + configuration: + rpcServer: + port: 33145 + kafkaApi: + - port: 9092 + adminApi: + - port: 9644 + pandaproxyApi: + - port: 8082 + developerMode: true + additionalCommandlineArguments: + dump-memory-diagnostics-on-alloc-failure-kind: all + abort-on-seastar-bad-alloc: '' + reserve-memory: 100M diff --git a/src/go/k8s/tests/e2e/managed-decommission/01-assert.yaml b/src/go/k8s/tests/e2e/managed-decommission/01-assert.yaml new file mode 100644 index 000000000000..b491d8ce98ce --- /dev/null +++ b/src/go/k8s/tests/e2e/managed-decommission/01-assert.yaml @@ -0,0 +1,23 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: managed-decommission +status: + replicas: 3 + currentReplicas: 3 + readyReplicas: 3 + conditions: + - type: ClusterConfigured + status: "True" +--- +apiVersion: v1 +kind: Pod +metadata: + name: managed-decommission-2 + annotations: + operator.redpanda.com/node-id: "5" +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - command: ../../../hack/get-redpanda-info.sh diff --git a/src/go/k8s/tests/e2e/managed-decommission/01-schedule-decommission.yaml b/src/go/k8s/tests/e2e/managed-decommission/01-schedule-decommission.yaml new file mode 100644 index 000000000000..e905bce6a3ac --- /dev/null +++ b/src/go/k8s/tests/e2e/managed-decommission/01-schedule-decommission.yaml @@ -0,0 +1,6 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: managed-decommission + annotations: + operator.redpanda.com/managed-decommission: "2999-12-31T00:00:00Z" \ No newline at end of file diff --git a/src/go/k8s/tests/e2e/managed-decommission/02-assert.yaml b/src/go/k8s/tests/e2e/managed-decommission/02-assert.yaml new file mode 100644 index 000000000000..596a7294e53d --- /dev/null +++ b/src/go/k8s/tests/e2e/managed-decommission/02-assert.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + job-name: get-broker-count +status: + containerStatuses: + - name: curl + state: + terminated: + exitCode: 0 + reason: Completed + phase: Succeeded +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - command: ../../../hack/get-redpanda-info.sh diff --git a/src/go/k8s/tests/e2e/managed-decommission/02-probe.yaml b/src/go/k8s/tests/e2e/managed-decommission/02-probe.yaml new file mode 100644 index 000000000000..9a0c679e08ed --- /dev/null +++ b/src/go/k8s/tests/e2e/managed-decommission/02-probe.yaml @@ -0,0 +1,30 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: get-broker-count +spec: + backoffLimit: 10 + template: + spec: + activeDeadlineSeconds: 90 + containers: + - name: curl + image: apteno/alpine-jq:latest + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + command: + - /bin/sh + - -c + - -ex + args: + - | + url=http://managed-decommission-0.managed-decommission.$NAMESPACE.svc.cluster.local:9644/v1/brokers + res=$(curl --silent -L $url | jq '. 
| length')
+
+          if [[ "$res" != "3" ]]; then
+            exit 1;
+          fi
+      restartPolicy: Never
diff --git a/src/go/k8s/tests/e2e/managed-decommission/03-clean.yaml b/src/go/k8s/tests/e2e/managed-decommission/03-clean.yaml
new file mode 100644
index 000000000000..eb5454696bae
--- /dev/null
+++ b/src/go/k8s/tests/e2e/managed-decommission/03-clean.yaml
@@ -0,0 +1,19 @@
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+delete:
+  - apiVersion: redpanda.vectorized.io/v1alpha1
+    kind: Cluster
+    name: managed-decommission
+    namespace: redpanda-system
+  - apiVersion: v1
+    kind: PersistentVolumeClaim
+    name: datadir-managed-decommission-0
+    namespace: redpanda-system
+  - apiVersion: v1
+    kind: PersistentVolumeClaim
+    name: datadir-managed-decommission-1
+    namespace: redpanda-system
+  - apiVersion: v1
+    kind: PersistentVolumeClaim
+    name: datadir-managed-decommission-2
+    namespace: redpanda-system
diff --git a/src/go/k8s/tests/e2e/managed-decommission/README.txt b/src/go/k8s/tests/e2e/managed-decommission/README.txt
new file mode 100644
index 000000000000..c71533379fdc
--- /dev/null
+++ b/src/go/k8s/tests/e2e/managed-decommission/README.txt
@@ -0,0 +1,5 @@
+This test:
+
+0. creates a 3-node Redpanda cluster
+1. triggers a planned decommission
+2. checks that the pods were rolled
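
The table-driven test added above (TestStatefulSetResource_IsManagedDecommission) and the
01-schedule-decommission step pin down how the operator.redpanda.com/managed-decommission
annotation is read, but the IsManagedDecommission method itself is not part of this diff.
The standalone Go sketch below only approximates that contract; the helper name, the
map-based signature, and the main() driver are illustrative rather than the operator's API,
and only the annotation key and the RFC 3339 deadline semantics are taken from the patch.

    // managed_decommission_sketch.go -- reviewer sketch, not operator code.
    // Approximates the contract exercised by
    // TestStatefulSetResource_IsManagedDecommission: a future RFC 3339 deadline
    // in the managed-decommission annotation means a decommission is requested,
    // a past deadline means it has expired, and an unparsable value is an error.
    package main

    import (
        "fmt"
        "time"
    )

    const managedDecommissionAnnotation = "operator.redpanda.com/managed-decommission"

    // isManagedDecommission is a hypothetical helper; in the operator the real
    // method hangs off StatefulSetResource and reads the Cluster's annotations.
    func isManagedDecommission(annotations map[string]string) (bool, error) {
        deadline, ok := annotations[managedDecommissionAnnotation]
        if !ok {
            return false, nil
        }
        t, err := time.Parse(time.RFC3339, deadline)
        if err != nil {
            return false, fmt.Errorf("parsing %q annotation: %w", managedDecommissionAnnotation, err)
        }
        // The timestamp acts as an expiry: the annotation only triggers a
        // managed decommission while the deadline is still in the future.
        return t.After(time.Now()), nil
    }

    func main() {
        for _, v := range []string{"2999-12-31T00:00:00Z", "1999-12-31T00:00:00Z", "true"} {
            got, err := isManagedDecommission(map[string]string{managedDecommissionAnnotation: v})
            fmt.Printf("%-25s -> %v (err: %v)\n", v, got, err)
        }
    }

Run on its own, the three inputs reproduce the three test cases: true for the 2999 deadline,
false for the 1999 one, and a parse error for "true".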