From 287d16258cbaf9e882f7ac51985c27d6d87eabbb Mon Sep 17 00:00:00 2001
From: Michael Burman
Date: Wed, 25 Sep 2024 17:15:01 +0300
Subject: [PATCH] Allow updates of a CassandraDatacenter to be propagated to
 its StatefulSets even when the Datacenter is running in an unhealthy
 Kubernetes configuration, such as failed scheduling or crashing pods

---
 CHANGELOG.md                                |   1 +
 .../cassandradatacenter_controller_test.go  |   3 +-
 pkg/reconciliation/reconcile_racks.go       | 133 ++++++++++++++++--
 pkg/reconciliation/reconcile_racks_test.go  |  29 ++--
 4 files changed, 140 insertions(+), 26 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b993ffc..21ff1dd5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
 
 ## unreleased
 
+* [FEATURE] [#583](https://github.com/k8ssandra/cass-operator/issues/583) Allow StatefulSet updates to proceed even if previous updates have not been rolled out yet, when pods are in a failed state (CrashLoopBackOff, ImagePullBackOff or ErrImagePull), when a container or initContainer has restarted repeatedly with a non-zero exit code, or when a pod has a FailedScheduling event. Also, ForceUpgradeRacks is no longer removed from the CassandraDatacenter by the operator, so that the operator does not modify its own Spec.
 * [FEATURE] [#651](https://github.com/k8ssandra/cass-operator/issues/651) Add tsreload task for DSE deployments and ability to check if sync operation is available on the mgmt-api side
 
 ## v1.22.4
diff --git a/internal/controllers/cassandra/cassandradatacenter_controller_test.go b/internal/controllers/cassandra/cassandradatacenter_controller_test.go
index 2b0fe192..f172a7c2 100644
--- a/internal/controllers/cassandra/cassandradatacenter_controller_test.go
+++ b/internal/controllers/cassandra/cassandradatacenter_controller_test.go
@@ -155,8 +155,9 @@ var _ = Describe("CassandraDatacenter tests", func() {
 			refreshDatacenter(ctx, &dc)
 
 			By("Updating the size to 3")
+			patch := client.MergeFrom(dc.DeepCopy())
 			dc.Spec.Size = 3
-			Expect(k8sClient.Update(ctx, &dc)).To(Succeed())
+			Expect(k8sClient.Patch(ctx, &dc, patch)).To(Succeed())
 
 			waitForDatacenterCondition(ctx, dcName, cassdcapi.DatacenterScalingUp, corev1.ConditionTrue).Should(Succeed())
 			waitForDatacenterProgress(ctx, dcName, cassdcapi.ProgressUpdating).Should(Succeed())
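The test change above replaces `k8sClient.Update` with `k8sClient.Patch` plus a `client.MergeFrom` patch, so only the size change is sent and a stale `resourceVersion` on the refreshed object cannot cause an update conflict. A minimal sketch of the same pattern outside the test, assuming a hypothetical `scaleDatacenter` helper and an existing controller-runtime client:

```go
package example

import (
	"context"

	cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// scaleDatacenter is an illustrative helper showing the merge-patch pattern used in
// the test: snapshot the object, mutate it, then send only the computed diff.
func scaleDatacenter(ctx context.Context, c client.Client, dc *cassdcapi.CassandraDatacenter, size int32) error {
	// Capture the current state before mutating; MergeFrom diffs against this copy.
	patch := client.MergeFrom(dc.DeepCopy())

	dc.Spec.Size = size

	// Patch sends a merge patch computed against the snapshot instead of the whole
	// object, so it does not carry an optimistic-locking resourceVersion by default.
	return c.Patch(ctx, dc, patch)
}
```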
diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go
index 2d79cb1e..042ccf63 100644
--- a/pkg/reconciliation/reconcile_racks.go
+++ b/pkg/reconciliation/reconcile_racks.go
@@ -17,6 +17,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -167,6 +168,76 @@ func (rc *ReconciliationContext) CheckRackCreation() result.ReconcileResult {
 	return result.Continue()
 }
 
+func (rc *ReconciliationContext) failureModeDetection() bool {
+	// TODO Even if these are true, we shouldn't allow an update if we have a pod starting (that hasn't crashed yet)
+
+	// First check - do we even need a force?
+	// We can check if the StatefulSet was updated, but that wouldn't tell us if there are crashing pods
+	for _, pod := range rc.dcPods {
+		if pod.Status.Phase == corev1.PodPending {
+			if pod.Status.StartTime == nil || hasBeenXMinutes(5, pod.Status.StartTime.Time) {
+				// The pod has been Pending for over 5 minutes (or was never scheduled). This can be
+				// normal, but let's see if we have some detected failure events like FailedScheduling
+
+				events := &corev1.EventList{}
+				if err := rc.Client.List(rc.Ctx, events, &client.ListOptions{Namespace: pod.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": pod.Name})}); err != nil {
+					rc.ReqLogger.Error(err, "error getting events for pod", "pod", pod.Name)
+					return false
+				}
+
+				for _, event := range events.Items {
+					if event.Reason == "FailedScheduling" {
+						// We have a failed scheduling event
+						return true
+					}
+				}
+			}
+		}
+
+		// The pod could also be running or terminated; we need to find out if any container is in a crashing state
+		// Sadly, this state is ephemeral, so it can change between reconciliations
+		for _, containerStatus := range pod.Status.ContainerStatuses {
+			if containerStatus.State.Waiting != nil {
+				waitingReason := containerStatus.State.Waiting.Reason
+				if waitingReason == "CrashLoopBackOff" ||
+					waitingReason == "ImagePullBackOff" ||
+					waitingReason == "ErrImagePull" {
+					// We have a container in a failing state
+					return true
+				}
+			}
+			if containerStatus.RestartCount > 2 {
+				if containerStatus.State.Terminated != nil {
+					if containerStatus.State.Terminated.ExitCode != 0 {
+						return true
+					}
+				}
+			}
+		}
+		// Check the same for initContainers
+		for _, containerStatus := range pod.Status.InitContainerStatuses {
+			if containerStatus.State.Waiting != nil {
+				waitingReason := containerStatus.State.Waiting.Reason
+				if waitingReason == "CrashLoopBackOff" ||
+					waitingReason == "ImagePullBackOff" ||
+					waitingReason == "ErrImagePull" {
+					// We have a container in a failing state
+					return true
+				}
+			}
+			if containerStatus.RestartCount > 2 {
+				if containerStatus.State.Terminated != nil {
+					if containerStatus.State.Terminated.ExitCode != 0 {
+						return true
+					}
+				}
+			}
+		}
+	}
+
+	return false
+}
+
 func (rc *ReconciliationContext) UpdateAllowed() bool {
 	// HasAnnotation might require also checking if it's "once / always".. or then we need to validate those allowed values in the webhook
 	return rc.Datacenter.GenerationChanged() || metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation)
@@ -301,13 +372,21 @@ func (rc *ReconciliationContext) CheckVolumeClaimSizes(statefulSet, desiredSts *
 	return result.Continue()
 }
 
-func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
+func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.ReconcileResult {
 	logger := rc.ReqLogger
 	dc := rc.Datacenter
 	logger.Info("reconcile_racks::CheckRackPodTemplate")
 
 	for idx := range rc.desiredRackInformation {
+		rackName := rc.desiredRackInformation[idx].RackName
+		if force {
+			forceRacks := dc.Spec.ForceUpgradeRacks
+			if utils.IndexOfString(forceRacks, rackName) < 0 {
+				continue
+			}
+		}
+
 		if dc.Spec.CanaryUpgrade && idx > 0 {
 			logger.
 				WithValues("rackName", rackName).
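One thing to double-check in the event lookup above: if `rc.Client` is the manager's cache-backed client, `List` with a field selector only works for fields that have a registered index, so filtering Events by `involvedObject.name` may need either an index or an uncached reader. In case an index is the chosen route, a minimal sketch of registering it at manager setup time (the function name is illustrative):

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// registerEventIndex registers a field index so the cache-backed client can serve
// List calls that filter Events by "involvedObject.name", as failureModeDetection does.
// The field key must match the selector string used in the List call.
func registerEventIndex(ctx context.Context, mgr ctrl.Manager) error {
	return mgr.GetFieldIndexer().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(obj client.Object) []string {
		event, ok := obj.(*corev1.Event)
		if !ok {
			return nil
		}
		return []string{event.InvolvedObject.Name}
	})
}
```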
@@ -324,7 +403,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 			updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
 		}
 
-		if statefulSet.Generation != status.ObservedGeneration ||
-			status.Replicas != status.ReadyReplicas ||
-			status.Replicas != updatedReplicas {
+		if !force && (statefulSet.Generation != status.ObservedGeneration ||
+			status.Replicas != status.ReadyReplicas ||
+			status.Replicas != updatedReplicas) {
 
@@ -358,7 +437,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 			return result.Error(err)
 		}
 
-		if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && !rc.UpdateAllowed() {
+		if !force && !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && !rc.UpdateAllowed() {
 			logger.
 				WithValues("rackName", rackName).
 				Info("update is blocked, but statefulset needs an update. Marking datacenter as requiring update.")
@@ -370,7 +449,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 			return result.Continue()
 		}
 
-		if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && rc.UpdateAllowed() {
+		if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && (force || rc.UpdateAllowed()) {
 			logger.
 				WithValues("rackName", rackName).
 				Info("statefulset needs an update")
@@ -398,7 +477,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 			desiredSts.DeepCopyInto(statefulSet)
 
 			rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
-				"Updating rack %s", rackName)
+				"Updating rack %s (force=%v)", rackName, force)
 
 			if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
 				return result.Error(err)
@@ -424,13 +503,17 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 				}
 			}
 
-			if err := rc.enableQuietPeriod(20); err != nil {
-				logger.Error(
-					err,
-					"Error when enabling quiet period")
-				return result.Error(err)
+			if !force {
+				if err := rc.enableQuietPeriod(20); err != nil {
+					logger.Error(
+						err,
+						"Error when enabling quiet period")
+					return result.Error(err)
+				}
 			}
 
+			// TODO Do we really want to modify the Spec here?
+
 			// we just updated k8s and pods will be knocked out of ready state, so let k8s
 			// call us back when these changes are done and the new pods are back to ready
 			return result.Done()
@@ -441,6 +524,33 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
 	return result.Continue()
 }
 
+/*
+	TODO An idea: if the startNode phase is failing because a Pod is unable to start (or become ready?), we could
+	treat that as a state in which CheckRackForceUpgrade is allowed.
+
+	TODO Also, verify this code stays close to the CheckRackPodTemplate() code, or merge the two if at all possible at this stage,
+	given that so much time has passed since the original comment.
+*/
+
+func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
+	dc := rc.Datacenter
+	logger := rc.ReqLogger
+	logger.Info("starting CheckRackForceUpgrade()")
+
+	forceRacks := dc.Spec.ForceUpgradeRacks
+	if len(forceRacks) == 0 {
+		return result.Continue()
+	}
+
+	// The Datacenter configuration isn't healthy; allow upgrades here before the pods start
+	if rc.failureModeDetection() {
+		return rc.CheckRackPodTemplate(true)
+	}
+
+	return rc.CheckRackPodTemplate(true)
+}
+
+/*
 func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
 	// This code is *very* similar to CheckRackPodTemplate(), but it's not an exact
 	// copy. Some 3 to 5 line parts could maybe be extracted into functions.
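The parenthesization in the readiness gate above reflects my reading of the intended semantics: a forced rack should be processed even when the previous rollout has not finished, not only when the generation check alone fails. A self-contained sketch of that gate, with hypothetical names, which a table-driven unit test could exercise:

```go
package example

// waitForPreviousRollout mirrors the readiness gate in CheckRackPodTemplate: under a
// forced upgrade the rack is processed even if the previous StatefulSet rollout has
// not finished; otherwise any mismatch means "wait for the rollout". Names are illustrative.
func waitForPreviousRollout(force bool, generation, observedGeneration int64, replicas, readyReplicas, updatedReplicas int32) bool {
	if force {
		return false
	}
	return generation != observedGeneration ||
		replicas != readyReplicas ||
		replicas != updatedReplicas
}
```

Without the grouping, a rack whose pods are not ready would still be skipped when forced, which is precisely the situation the force path is meant to handle.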
@@ -522,6 +632,7 @@ func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult
 	logger.Info("done CheckRackForceUpgrade()")
 	return result.Done()
 }
+*/
 
 func (rc *ReconciliationContext) deleteStatefulSet(statefulSet *appsv1.StatefulSet) error {
 	policy := metav1.DeletePropagationOrphan
@@ -2511,7 +2622,7 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) {
 		return recResult.Output()
 	}
 
-	if recResult := rc.CheckRackPodTemplate(); recResult.Completed() {
+	if recResult := rc.CheckRackPodTemplate(false); recResult.Completed() {
 		return recResult.Output()
 	}
 
diff --git a/pkg/reconciliation/reconcile_racks_test.go b/pkg/reconciliation/reconcile_racks_test.go
index bb53bd67..bb939f22 100644
--- a/pkg/reconciliation/reconcile_racks_test.go
+++ b/pkg/reconciliation/reconcile_racks_test.go
@@ -6,7 +6,6 @@ package reconciliation
 import (
 	"context"
 	"fmt"
-	"github.com/pkg/errors"
 	"io"
 	"net/http"
 	"reflect"
@@ -16,6 +15,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"k8s.io/utils/ptr"
 
 	api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
@@ -287,7 +288,7 @@ func TestCheckRackPodTemplate_SetControllerRefOnStatefulSet(t *testing.T) {
 	}
 	rc.Datacenter.Spec.PodTemplateSpec = podTemplateSpec
 
-	result = rc.CheckRackPodTemplate()
+	result = rc.CheckRackPodTemplate(false)
 	assert.True(t, result.Completed())
 	assert.Equal(t, 1, invocations)
 
@@ -314,7 +315,7 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) {
 		t.Fatalf("failed to add rack to cassandradatacenter: %s", err)
 	}
 
-	result = rc.CheckRackPodTemplate()
+	result = rc.CheckRackPodTemplate(false)
 	_, err := result.Output()
 
 	assert.True(t, result.Completed())
@@ -325,7 +326,7 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) {
 	rc.Datacenter.Spec.ServerVersion = "6.8.44"
 	partition := rc.Datacenter.Spec.CanaryUpgradeCount
 
-	result = rc.CheckRackPodTemplate()
+	result = rc.CheckRackPodTemplate(false)
 	_, err = result.Output()
 
 	assert.True(t, result.Completed())
@@ -354,7 +355,7 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) {
 
 	rc.Datacenter.Spec.CanaryUpgrade = false
 
-	result = rc.CheckRackPodTemplate()
+	result = rc.CheckRackPodTemplate(false)
 	assert.True(t, result.Completed())
 	assert.NotEqual(t, expectedStrategy, rc.statefulSets[0].Spec.UpdateStrategy)
 }
@@ -373,7 +374,7 @@ func TestCheckRackPodTemplate_GenerationCheck(t *testing.T) {
 	rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
 	rc.Datacenter.Spec.ServerVersion = "6.8.44"
 
-	res = rc.CheckRackPodTemplate()
+	res = rc.CheckRackPodTemplate(false)
 	assert.Equal(result.Continue(), res)
 	cond, found := rc.Datacenter.GetCondition(api.DatacenterRequiresUpdate)
 	assert.True(found)
@@ -390,7 +391,7 @@ func TestCheckRackPodTemplate_GenerationCheck(t *testing.T) {
 	metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation, string(api.AllowUpdateAlways))
 	rc.Datacenter.Spec.ServerVersion = "6.8.44" // This needs to be reapplied, since we call Patch in the CheckRackPodTemplate()
 
-	res = rc.CheckRackPodTemplate()
+	res = rc.CheckRackPodTemplate(false)
 	assert.True(res.Completed())
 }
 
@@ -443,7 +444,7 @@ func TestCheckRackPodTemplate_TemplateLabels(t *testing.T) {
 	rc.statefulSets = make([]*appsv1.StatefulSet, len(rackInfo))
 	rc.statefulSets[0] = desiredStatefulSet
 
-	res := rc.CheckRackPodTemplate()
+	res := rc.CheckRackPodTemplate(false)
 	require.Equal(result.Done(), res)
 
 	rc.statefulSets[0].Status.ObservedGeneration = rc.statefulSets[0].Generation
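Because the operator no longer clears `spec.forceUpgradeRacks` on its own (see the CHANGELOG entry above), whoever sets it is also responsible for removing it once the racks have converged. A sketch of doing both with a controller-runtime client; the helper names are illustrative and not part of the operator:

```go
package example

import (
	"context"

	cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// triggerForcedUpgrade asks the operator to push StatefulSet updates to the listed
// racks even though the Datacenter is not in a healthy state.
func triggerForcedUpgrade(ctx context.Context, c client.Client, dc *cassdcapi.CassandraDatacenter, racks ...string) error {
	patch := client.MergeFrom(dc.DeepCopy())
	dc.Spec.ForceUpgradeRacks = racks
	return c.Patch(ctx, dc, patch)
}

// clearForcedUpgrade removes the request after the racks have converged; the operator
// no longer does this on the caller's behalf.
func clearForcedUpgrade(ctx context.Context, c client.Client, dc *cassdcapi.CassandraDatacenter) error {
	patch := client.MergeFrom(dc.DeepCopy())
	dc.Spec.ForceUpgradeRacks = nil
	return c.Patch(ctx, dc, patch)
}
```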
@@ -454,7 +455,7 @@ func TestCheckRackPodTemplate_TemplateLabels(t *testing.T) {
 	// Now update the template and verify that the StatefulSet is updated
 	rc.Datacenter.Spec.PodTemplateSpec.ObjectMeta.Labels["foo2"] = "baz"
 	rc.Datacenter.Generation++
-	res = rc.CheckRackPodTemplate()
+	res = rc.CheckRackPodTemplate(false)
 	require.Equal(result.Done(), res)
 
 	sts = &appsv1.StatefulSet{}
@@ -2689,7 +2690,7 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) {
 	res := rc.CheckRackCreation()
 	require.False(res.Completed(), "CheckRackCreation did not complete as expected")
 
-	require.Equal(result.Continue(), rc.CheckRackPodTemplate())
+	require.Equal(result.Continue(), rc.CheckRackPodTemplate(false))
 
 	metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.AllowStorageChangesAnnotation, "true")
 	require.NoError(rc.Client.Update(rc.Ctx, rc.Datacenter))
@@ -2713,11 +2714,11 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) {
 		require.NoError(rc.Client.Create(rc.Ctx, pvc))
 	}
 
-	require.Equal(result.Continue(), rc.CheckRackPodTemplate())
+	require.Equal(result.Continue(), rc.CheckRackPodTemplate(false))
 
 	rc.Datacenter.Spec.StorageConfig.CassandraDataVolumeClaimSpec.Resources.Requests = map[corev1.ResourceName]resource.Quantity{corev1.ResourceStorage: resource.MustParse("2Gi")}
 	require.NoError(rc.Client.Update(rc.Ctx, rc.Datacenter))
-	res = rc.CheckRackPodTemplate()
+	res = rc.CheckRackPodTemplate(false)
 	_, err := res.Output()
 	require.EqualError(err, "PVC resize requested, but StorageClass standard does not support expansion", "We should have an error, storageClass does not support expansion")
 
@@ -2727,14 +2728,14 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) {
 	storageClass.AllowVolumeExpansion = ptr.To[bool](true)
 	require.NoError(rc.Client.Update(rc.Ctx, storageClass))
 
-	res = rc.CheckRackPodTemplate()
+	res = rc.CheckRackPodTemplate(false)
 	require.Equal(result.Done(), res, "Recreating StS should throw us to silence period")
 
 	require.NoError(rc.Client.Get(rc.Ctx, nsName, sts))
 	require.Equal(resource.MustParse("2Gi"), sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests[corev1.ResourceStorage])
 
 	// The fakeClient behavior does not prevent us from modifying the StS fields, so this test behaves unlike real world in that sense
-	res = rc.CheckRackPodTemplate()
+	res = rc.CheckRackPodTemplate(false)
 	require.Equal(result.Continue(), res, "Recreating StS should throw us to silence period")
 }
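For unit-testing the new detection path, a pod fixture whose container status reports `CrashLoopBackOff` is enough to trip the container-state branch of `failureModeDetection`. A minimal sketch, assuming the fixture would be appended to `rc.dcPods` the same way the existing reconcile_racks tests build their pods (the helper name and container name are illustrative):

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// crashLoopingPod builds a pod whose container is stuck in CrashLoopBackOff, one of
// the states the failure-mode detection treats as "allow the StatefulSet update anyway".
func crashLoopingPod(name, namespace string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
			ContainerStatuses: []corev1.ContainerStatus{
				{
					Name:         "cassandra",
					RestartCount: 3,
					State: corev1.ContainerState{
						Waiting: &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"},
					},
				},
			},
		},
	}
}
```

In such a test the fixture would be added to `rc.dcPods` before exercising `CheckRackForceUpgrade()` or the force path of `CheckRackPodTemplate(true)`.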