Skip to content

Commit

Permalink
Allow update of CassandraDatacenter to be processed to StatefulSets i…
Browse files Browse the repository at this point in the history
…f the Datacenter is running in unhealthy Kubernetes configuration. These could be scheduling related or pods crashing
  • Loading branch information
burmanm committed Sep 25, 2024
1 parent e45e7e9 commit 287d162
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti

## unreleased

* [FEATURE] [#583](https://github.com/k8ssandra/cass-operator/issues/583) If there are pods in failed state (CrashLoopBackOff, ImagePullBackOff or ErrImagePull), restartCount of a container/initContainer is more than zero with termination code >0 or we have a SchedulingFailed event, allow StatefulSet updates even if previous ones haven't been rolled yet. ForceUpgradeRacks will no longer remove itself from the CassandraDatacenter to prevent self modifying Spec.
* [FEATURE] [#651](https://github.com/k8ssandra/cass-operator/issues/651) Add tsreload task for DSE deployments and ability to check if sync operation is available on the mgmt-api side

## v1.22.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ var _ = Describe("CassandraDatacenter tests", func() {
refreshDatacenter(ctx, &dc)

By("Updating the size to 3")
patch := client.MergeFrom(dc.DeepCopy())
dc.Spec.Size = 3
Expect(k8sClient.Update(ctx, &dc)).To(Succeed())
Expect(k8sClient.Patch(ctx, &dc, patch)).To(Succeed())

waitForDatacenterCondition(ctx, dcName, cassdcapi.DatacenterScalingUp, corev1.ConditionTrue).Should(Succeed())
waitForDatacenterProgress(ctx, dcName, cassdcapi.ProgressUpdating).Should(Succeed())
Expand Down
133 changes: 122 additions & 11 deletions pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -167,6 +168,76 @@ func (rc *ReconciliationContext) CheckRackCreation() result.ReconcileResult {
return result.Continue()
}

func (rc *ReconciliationContext) failureModeDetection() bool {
// TODO Even if these are true, we shouldn't allow update if we have a pod starting (that hasn't crashed yet)

// First check - do we even need a force?
// We can check if StatefulSet was updated, but that wouldn't tell us if there's crashing pods
for _, pod := range rc.dcPods {
if pod.Status.Phase == corev1.PodPending {
if hasBeenXMinutes(5, pod.Status.StartTime.Time) {
// Pod has been over 5 minutes in Pending state. This can be normal, but lets see
// if we have some detected failures events like FailedScheduling

events := &corev1.EventList{}
if err := rc.Client.List(rc.Ctx, events, &client.ListOptions{Namespace: pod.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": pod.Name})}); err != nil {
rc.ReqLogger.Error(err, "error getting events for pod", "pod", pod.Name)
return false
}

for _, event := range events.Items {
if event.Reason == "FailedScheduling" {
// We have a failed scheduling event
return true
}
}
}
}

// Pod could also be running / terminated, we need to find if any container is in crashing state
// Sadly, this state is ephemeral, so it can change between reconciliations
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.State.Waiting != nil {
waitingReason := containerStatus.State.Waiting.Reason
if waitingReason == "CrashLoopBackOff" ||
waitingReason == "ImagePullBackOff" ||
waitingReason == "ErrImagePull" {
// We have a container in a failing state
return true
}
}
if containerStatus.RestartCount > 2 {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode != 0 {
return true
}
}
}
}
// Check the same for initContainers
for _, containerStatus := range pod.Status.InitContainerStatuses {
if containerStatus.State.Waiting != nil {
waitingReason := containerStatus.State.Waiting.Reason
if waitingReason == "CrashLoopBackOff" ||
waitingReason == "ImagePullBackOff" ||
waitingReason == "ErrImagePull" {
// We have a container in a failing state
return true
}
}
if containerStatus.RestartCount > 2 {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode != 0 {
return true
}
}
}
}
}

return false
}

func (rc *ReconciliationContext) UpdateAllowed() bool {
// HasAnnotation might require also checking if it's "once / always".. or then we need to validate those allowed values in the webhook
return rc.Datacenter.GenerationChanged() || metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation)
Expand Down Expand Up @@ -301,13 +372,21 @@ func (rc *ReconciliationContext) CheckVolumeClaimSizes(statefulSet, desiredSts *
return result.Continue()
}

func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.ReconcileResult {
logger := rc.ReqLogger
dc := rc.Datacenter
logger.Info("reconcile_racks::CheckRackPodTemplate")

for idx := range rc.desiredRackInformation {

rackName := rc.desiredRackInformation[idx].RackName
if force {
forceRacks := dc.Spec.ForceUpgradeRacks
if utils.IndexOfString(forceRacks, rackName) <= 0 {
continue
}
}

if dc.Spec.CanaryUpgrade && idx > 0 {
logger.
WithValues("rackName", rackName).
Expand All @@ -324,7 +403,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
}

if statefulSet.Generation != status.ObservedGeneration ||
if !force && statefulSet.Generation != status.ObservedGeneration ||
status.Replicas != status.ReadyReplicas ||
status.Replicas != updatedReplicas {

Expand Down Expand Up @@ -358,7 +437,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
return result.Error(err)
}

if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && !rc.UpdateAllowed() {
if !force && !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && !rc.UpdateAllowed() {
logger.
WithValues("rackName", rackName).
Info("update is blocked, but statefulset needs an update. Marking datacenter as requiring update.")
Expand All @@ -370,7 +449,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
return result.Continue()
}

if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && rc.UpdateAllowed() {
if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && (force || rc.UpdateAllowed()) {
logger.
WithValues("rackName", rackName).
Info("statefulset needs an update")
Expand Down Expand Up @@ -398,7 +477,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
desiredSts.DeepCopyInto(statefulSet)

rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
"Updating rack %s", rackName)
"Updating rack %s", rackName, "force", force)

if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
return result.Error(err)
Expand All @@ -424,13 +503,17 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
}
}

if err := rc.enableQuietPeriod(20); err != nil {
logger.Error(
err,
"Error when enabling quiet period")
return result.Error(err)
if !force {
if err := rc.enableQuietPeriod(20); err != nil {
logger.Error(
err,
"Error when enabling quiet period")
return result.Error(err)
}
}

// TODO Do we really want to modify spec here?

// we just updated k8s and pods will be knocked out of ready state, so let k8s
// call us back when these changes are done and the new pods are back to ready
return result.Done()
Expand All @@ -441,6 +524,33 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
return result.Continue()
}

/*
TODO An idea.. if the startNode phase is failing due to a Pod being unable to start (or get ready?), we could
make that as a state for CheckRackForceUpgrade to be allowed.
TODO Also, verify this code is close to the CheckRackPodTemplate() code or even merge those two if at all possible at this stage,
given that so much time has passed since the original comment.
*/

func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
dc := rc.Datacenter
logger := rc.ReqLogger
logger.Info("starting CheckRackForceUpgrade()")

forceRacks := dc.Spec.ForceUpgradeRacks
if len(forceRacks) == 0 {
return result.Continue()
}

// Datacenter configuration isn't healthy, we allow upgrades here before pods start
if rc.failureModeDetection() {
return rc.CheckRackPodTemplate(true)
}

return rc.CheckRackPodTemplate(true)
}

/*
func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
// This code is *very* similar to CheckRackPodTemplate(), but it's not an exact
// copy. Some 3 to 5 line parts could maybe be extracted into functions.
Expand Down Expand Up @@ -522,6 +632,7 @@ func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult
logger.Info("done CheckRackForceUpgrade()")
return result.Done()
}
*/

func (rc *ReconciliationContext) deleteStatefulSet(statefulSet *appsv1.StatefulSet) error {
policy := metav1.DeletePropagationOrphan
Expand Down Expand Up @@ -2511,7 +2622,7 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) {
return recResult.Output()
}

if recResult := rc.CheckRackPodTemplate(); recResult.Completed() {
if recResult := rc.CheckRackPodTemplate(false); recResult.Completed() {
return recResult.Output()
}

Expand Down
29 changes: 15 additions & 14 deletions pkg/reconciliation/reconcile_racks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package reconciliation
import (
"context"
"fmt"
"github.com/pkg/errors"
"io"
"net/http"
"reflect"
Expand All @@ -16,6 +15,8 @@ import (
"testing"
"time"

"github.com/pkg/errors"

"k8s.io/utils/ptr"

api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
Expand Down Expand Up @@ -287,7 +288,7 @@ func TestCheckRackPodTemplate_SetControllerRefOnStatefulSet(t *testing.T) {
}
rc.Datacenter.Spec.PodTemplateSpec = podTemplateSpec

result = rc.CheckRackPodTemplate()
result = rc.CheckRackPodTemplate(false)
assert.True(t, result.Completed())

assert.Equal(t, 1, invocations)
Expand All @@ -314,7 +315,7 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) {
t.Fatalf("failed to add rack to cassandradatacenter: %s", err)
}

result = rc.CheckRackPodTemplate()
result = rc.CheckRackPodTemplate(false)
_, err := result.Output()

assert.True(t, result.Completed())
Expand All @@ -325,7 +326,7 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) {
rc.Datacenter.Spec.ServerVersion = "6.8.44"
partition := rc.Datacenter.Spec.CanaryUpgradeCount

result = rc.CheckRackPodTemplate()
result = rc.CheckRackPodTemplate(false)
_, err = result.Output()

assert.True(t, result.Completed())
Expand Down Expand Up @@ -354,7 +355,7 @@ func TestCheckRackPodTemplate_CanaryUpgrade(t *testing.T) {

rc.Datacenter.Spec.CanaryUpgrade = false

result = rc.CheckRackPodTemplate()
result = rc.CheckRackPodTemplate(false)
assert.True(t, result.Completed())
assert.NotEqual(t, expectedStrategy, rc.statefulSets[0].Spec.UpdateStrategy)
}
Expand All @@ -373,7 +374,7 @@ func TestCheckRackPodTemplate_GenerationCheck(t *testing.T) {
rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
rc.Datacenter.Spec.ServerVersion = "6.8.44"

res = rc.CheckRackPodTemplate()
res = rc.CheckRackPodTemplate(false)
assert.Equal(result.Continue(), res)
cond, found := rc.Datacenter.GetCondition(api.DatacenterRequiresUpdate)
assert.True(found)
Expand All @@ -390,7 +391,7 @@ func TestCheckRackPodTemplate_GenerationCheck(t *testing.T) {
metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation, string(api.AllowUpdateAlways))
rc.Datacenter.Spec.ServerVersion = "6.8.44" // This needs to be reapplied, since we call Patch in the CheckRackPodTemplate()

res = rc.CheckRackPodTemplate()
res = rc.CheckRackPodTemplate(false)
assert.True(res.Completed())
}

Expand Down Expand Up @@ -443,7 +444,7 @@ func TestCheckRackPodTemplate_TemplateLabels(t *testing.T) {
rc.statefulSets = make([]*appsv1.StatefulSet, len(rackInfo))
rc.statefulSets[0] = desiredStatefulSet

res := rc.CheckRackPodTemplate()
res := rc.CheckRackPodTemplate(false)
require.Equal(result.Done(), res)
rc.statefulSets[0].Status.ObservedGeneration = rc.statefulSets[0].Generation

Expand All @@ -454,7 +455,7 @@ func TestCheckRackPodTemplate_TemplateLabels(t *testing.T) {
// Now update the template and verify that the StatefulSet is updated
rc.Datacenter.Spec.PodTemplateSpec.ObjectMeta.Labels["foo2"] = "baz"
rc.Datacenter.Generation++
res = rc.CheckRackPodTemplate()
res = rc.CheckRackPodTemplate(false)
require.Equal(result.Done(), res)

sts = &appsv1.StatefulSet{}
Expand Down Expand Up @@ -2689,7 +2690,7 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) {
res := rc.CheckRackCreation()
require.False(res.Completed(), "CheckRackCreation did not complete as expected")

require.Equal(result.Continue(), rc.CheckRackPodTemplate())
require.Equal(result.Continue(), rc.CheckRackPodTemplate(false))

metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.AllowStorageChangesAnnotation, "true")
require.NoError(rc.Client.Update(rc.Ctx, rc.Datacenter))
Expand All @@ -2713,11 +2714,11 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) {
require.NoError(rc.Client.Create(rc.Ctx, pvc))
}

require.Equal(result.Continue(), rc.CheckRackPodTemplate())
require.Equal(result.Continue(), rc.CheckRackPodTemplate(false))

rc.Datacenter.Spec.StorageConfig.CassandraDataVolumeClaimSpec.Resources.Requests = map[corev1.ResourceName]resource.Quantity{corev1.ResourceStorage: resource.MustParse("2Gi")}
require.NoError(rc.Client.Update(rc.Ctx, rc.Datacenter))
res = rc.CheckRackPodTemplate()
res = rc.CheckRackPodTemplate(false)
_, err := res.Output()
require.EqualError(err, "PVC resize requested, but StorageClass standard does not support expansion", "We should have an error, storageClass does not support expansion")

Expand All @@ -2727,14 +2728,14 @@ func TestCheckRackPodTemplateWithVolumeExpansion(t *testing.T) {
storageClass.AllowVolumeExpansion = ptr.To[bool](true)
require.NoError(rc.Client.Update(rc.Ctx, storageClass))

res = rc.CheckRackPodTemplate()
res = rc.CheckRackPodTemplate(false)
require.Equal(result.Done(), res, "Recreating StS should throw us to silence period")

require.NoError(rc.Client.Get(rc.Ctx, nsName, sts))
require.Equal(resource.MustParse("2Gi"), sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests[corev1.ResourceStorage])

// The fakeClient behavior does not prevent us from modifying the StS fields, so this test behaves unlike real world in that sense
res = rc.CheckRackPodTemplate()
res = rc.CheckRackPodTemplate(false)
require.Equal(result.Continue(), res, "Recreating StS should throw us to silence period")
}

Expand Down

0 comments on commit 287d162

Please sign in to comment.