Allow update of CassandraDatacenter to be processed to StatefulSets i… #711

Open · wants to merge 4 commits into base: master · Changes from all commits

2 changes: 1 addition & 1 deletion .github/workflows/kindIntegTest.yml
@@ -168,7 +168,7 @@ jobs:
- webhook_validation
# Three worker tests:
- canary_upgrade
# - config_change_condition # config_change takes care of testing the same
- config_change_condition
#- cdc_successful # OSS only
# - delete_node_lost_readiness # DSE specific behavior
- host_network
2 changes: 1 addition & 1 deletion .github/workflows/workflow-integration-tests.yaml
@@ -183,7 +183,7 @@ jobs:
- webhook_validation
# Three worker tests:
# - canary_upgrade # See kind_40_tests job
# - config_change_condition # config_change takes care of the same testing
- config_change_condition
# - cdc_successful # CDC is OSS only , see kind_311_tests and kind_40_tests jobs
# - delete_node_lost_readiness # DSE specific behavior see kind_dse_tests job
- host_network
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti

## unreleased

* [FEATURE] [#583](https://github.com/k8ssandra/cass-operator/issues/583) If there are pods in a failed state (CrashLoopBackOff, ImagePullBackOff or ErrImagePull), a container/initContainer has a restartCount greater than zero with a termination exit code > 0, or there is a FailedScheduling event, allow StatefulSet updates even if previous ones haven't been rolled out yet. ForceUpgradeRacks is no longer removed from the CassandraDatacenter automatically, so the operator does not modify its own Spec.

## v1.23.0

* [CHANGE] [#720](https://github.com/k8ssandra/cass-operator/issues/720) Always use ObjectMeta.Name for the PodDisruptionBudget resource name, not the DatacenterName
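To make the failure states in the changelog entry concrete, here is a minimal sketch of the waiting-reason check (an illustration only, not the operator's code; the detection this PR actually adds, failureModeDetection in reconcile_racks.go below, also inspects restart counts, termination exit codes and FailedScheduling events):

```go
// isFailureWaitingReason reports whether a container waiting reason is one of
// the states treated as a failure by this feature.
func isFailureWaitingReason(reason string) bool {
	switch reason {
	case "CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull":
		return true
	default:
		return false
	}
}
```
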
4 changes: 2 additions & 2 deletions Makefile
@@ -242,12 +242,12 @@ OPM ?= $(LOCALBIN)/opm

## Tool Versions
CERT_MANAGER_VERSION ?= v1.14.7
KUSTOMIZE_VERSION ?= v5.4.2
KUSTOMIZE_VERSION ?= v5.5.0
CONTROLLER_TOOLS_VERSION ?= v0.15.0
OPERATOR_SDK_VERSION ?= 1.35.0
HELM_VERSION ?= 3.14.2
OPM_VERSION ?= 1.38.0
GOLINT_VERSION ?= 1.60.3
GOLINT_VERSION ?= 1.62.2

.PHONY: cert-manager
cert-manager: ## Install cert-manager to the cluster
17 changes: 16 additions & 1 deletion cmd/main.go
@@ -26,11 +26,13 @@ import (
"go.uber.org/zap/zapcore"
_ "k8s.io/client-go/plugin/pkg/client/auth"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

@@ -119,6 +121,8 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()

if err = (&controllers.CassandraDatacenterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("CassandraDatacenter"),
@@ -152,8 +156,19 @@ func main() {
os.Exit(1)
}

if err := mgr.GetCache().IndexField(ctx, &corev1.Event{}, "involvedObject.name", func(obj client.Object) []string {
event := obj.(*corev1.Event)
if event.InvolvedObject.Kind == "Pod" {
return []string{event.InvolvedObject.Name}
}
return []string{}
}); err != nil {
setupLog.Error(err, "unable to set up event index")
os.Exit(1)
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
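The index registered above lets the manager's cached client answer List calls that filter Events by the pod they refer to; failureModeDetection in reconcile_racks.go (below) issues exactly such a field-selector query. A self-contained sketch of that pairing, assuming a controller-runtime manager is already configured (setupEventIndex and podEvents are illustrative names, not functions from this PR):

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// setupEventIndex mirrors the cmd/main.go change: it indexes Events on
// involvedObject.name so the informer cache can serve field-selector queries.
func setupEventIndex(ctx context.Context, mgr ctrl.Manager) error {
	return mgr.GetCache().IndexField(ctx, &corev1.Event{}, "involvedObject.name",
		func(obj client.Object) []string {
			event := obj.(*corev1.Event)
			if event.InvolvedObject.Kind == "Pod" {
				return []string{event.InvolvedObject.Name}
			}
			return nil
		})
}

// podEvents lists the Events recorded against a single pod through the cache.
func podEvents(ctx context.Context, c client.Client, namespace, podName string) (*corev1.EventList, error) {
	events := &corev1.EventList{}
	err := c.List(ctx, events, &client.ListOptions{
		Namespace:     namespace,
		FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": podName}),
	})
	return events, err
}
```
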
192 changes: 108 additions & 84 deletions pkg/reconciliation/reconcile_racks.go
@@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -166,6 +167,79 @@ func (rc *ReconciliationContext) CheckRackCreation() result.ReconcileResult {
return result.Continue()
}

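// failureModeDetection reports whether any datacenter pod looks unhealthy enough
// to justify pushing StatefulSet updates through: a FailedScheduling event while
// the pod is Pending, a container or initContainer waiting in CrashLoopBackOff,
// ImagePullBackOff or ErrImagePull, or one that has restarted repeatedly and
// last terminated with a non-zero exit code.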
func (rc *ReconciliationContext) failureModeDetection() bool {
for _, pod := range rc.dcPods {
if pod == nil {
continue
}
if pod.Status.Phase == corev1.PodPending {
if pod.Status.StartTime == nil || hasBeenXMinutes(5, pod.Status.StartTime.Time) {
// The pod has been in the Pending state for over 5 minutes. This can be normal, but let's see
// if there are any detected failure events such as FailedScheduling
events := &corev1.EventList{}
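// The field selector below is served from the informer cache and relies on the
// "involvedObject.name" index registered in cmd/main.go.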
if err := rc.Client.List(rc.Ctx, events, &client.ListOptions{Namespace: pod.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"involvedObject.name": pod.Name})}); err != nil {
rc.ReqLogger.Error(err, "error getting events for pod", "pod", pod.Name)
return false
}

for _, event := range events.Items {
if event.Reason == "FailedScheduling" {
rc.ReqLogger.Info("Found FailedScheduling event for pod", "pod", pod.Name)
// We have a failed scheduling event
return true
}
}
}
}

// The pod could also be running or terminated; we need to find out if any container is in a crashing state.
// Sadly, this state is ephemeral, so it can change between reconciliations
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.State.Waiting != nil {
waitingReason := containerStatus.State.Waiting.Reason
if waitingReason == "CrashLoopBackOff" ||
waitingReason == "ImagePullBackOff" ||
waitingReason == "ErrImagePull" {
rc.ReqLogger.Info("Failing container state for pod", "pod", pod.Name, "reason", waitingReason)
// We have a container in a failing state
return true
}
}
if containerStatus.RestartCount > 2 {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode != 0 {
rc.ReqLogger.Info("Failing container state for pod", "pod", pod.Name, "exitCode", containerStatus.State.Terminated.ExitCode)
return true
}
}
}
}
// Check the same for initContainers
for _, containerStatus := range pod.Status.InitContainerStatuses {
if containerStatus.State.Waiting != nil {
waitingReason := containerStatus.State.Waiting.Reason
if waitingReason == "CrashLoopBackOff" ||
waitingReason == "ImagePullBackOff" ||
waitingReason == "ErrImagePull" {
// We have a container in a failing state
rc.ReqLogger.Info("Failing initcontainer state for pod", "pod", pod.Name, "reason", waitingReason)
return true
}
}
if containerStatus.RestartCount > 2 {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.ExitCode != 0 {
rc.ReqLogger.Info("Failing initcontainer state for pod", "pod", pod.Name, "exitCode", containerStatus.State.Terminated.ExitCode)
return true
}
}
}
}
}

return false
}

func (rc *ReconciliationContext) UpdateAllowed() bool {
// HasAnnotation might also need to check whether the value is "once" or "always", or we need to validate the allowed values in the webhook
return rc.Datacenter.GenerationChanged() || metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation)
@@ -300,13 +374,22 @@ func (rc *ReconciliationContext) CheckVolumeClaimSizes(statefulSet, desiredSts *
return result.Continue()
}

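// When force is true (set from CheckRackForceUpgrade or failure-mode detection),
// only racks listed in ForceUpgradeRacks are considered (all racks if the list is
// empty), and the update proceeds even if the previous rollout has not finished,
// UpdateAllowed() would block it, or the quiet period would normally apply.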
func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
func (rc *ReconciliationContext) CheckRackPodTemplate(force bool) result.ReconcileResult {
logger := rc.ReqLogger
dc := rc.Datacenter
logger.Info("reconcile_racks::CheckRackPodTemplate")

for idx := range rc.desiredRackInformation {
rackName := rc.desiredRackInformation[idx].RackName
if force {
forceRacks := dc.Spec.ForceUpgradeRacks
if len(forceRacks) > 0 {
if utils.IndexOfString(forceRacks, rackName) < 0 {
continue
}
}
}

if dc.Spec.CanaryUpgrade && idx > 0 {
logger.
WithValues("rackName", rackName).
@@ -323,9 +406,9 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
updatedReplicas = status.CurrentReplicas + status.UpdatedReplicas
}

if statefulSet.Generation != status.ObservedGeneration ||
if !force && (statefulSet.Generation != status.ObservedGeneration ||
status.Replicas != status.ReadyReplicas ||
status.Replicas != updatedReplicas {
status.Replicas != updatedReplicas) {

logger.Info(
"waiting for upgrade to finish on statefulset",
@@ -357,7 +440,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
return result.Error(err)
}

if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && !rc.UpdateAllowed() {
if !force && !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && !rc.UpdateAllowed() {
logger.
WithValues("rackName", rackName).
Info("update is blocked, but statefulset needs an update. Marking datacenter as requiring update.")
@@ -369,7 +452,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
return result.Continue()
}

if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && rc.UpdateAllowed() {
if !utils.ResourcesHaveSameHash(statefulSet, desiredSts) && (force || rc.UpdateAllowed()) {
logger.
WithValues("rackName", rackName).
Info("statefulset needs an update")
@@ -397,7 +480,7 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
desiredSts.DeepCopyInto(statefulSet)

rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
"Updating rack %s", rackName)
"Updating rack %s", rackName, "force", force)

if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
return result.Error(err)
@@ -423,13 +506,17 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
}
}

if err := rc.enableQuietPeriod(20); err != nil {
logger.Error(
err,
"Error when enabling quiet period")
return result.Error(err)
if !force {
if err := rc.enableQuietPeriod(20); err != nil {
logger.Error(
err,
"Error when enabling quiet period")
return result.Error(err)
}
}

// TODO Do we really want to modify spec here?

// we just updated k8s and pods will be knocked out of ready state, so let k8s
// call us back when these changes are done and the new pods are back to ready
return result.Done()
@@ -441,85 +528,22 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
}

func (rc *ReconciliationContext) CheckRackForceUpgrade() result.ReconcileResult {
// This code is *very* similar to CheckRackPodTemplate(), but it's not an exact
// copy. Some 3 to 5 line parts could maybe be extracted into functions.
logger := rc.ReqLogger
dc := rc.Datacenter
logger.Info("starting CheckRackForceUpgrade()")
logger := rc.ReqLogger
logger.Info("reconcile_racks::CheckRackForceUpgrade")

// Datacenter configuration isn't healthy, we allow upgrades here before pods start
if rc.failureModeDetection() {
logger.Info("Failure detected, forcing CheckRackPodTemplate()")
return rc.CheckRackPodTemplate(true)
}

forceRacks := dc.Spec.ForceUpgradeRacks
if len(forceRacks) == 0 {
return result.Continue()
}

for idx, nextRack := range rc.desiredRackInformation {
rackName := rc.desiredRackInformation[idx].RackName
if utils.IndexOfString(forceRacks, rackName) >= 0 {
statefulSet := rc.statefulSets[idx]

// have to use zero here, because each statefulset is created with no replicas
// in GetStatefulSetForRack()
desiredSts, err := newStatefulSetForCassandraDatacenter(statefulSet, rackName, dc, nextRack.NodeCount)
if err != nil {
logger.Error(err, "error calling newStatefulSetForCassandraDatacenter")
return result.Error(err)
}

// Set the CassandraDatacenter as the owner and controller
err = setControllerReference(
rc.Datacenter,
desiredSts,
rc.Scheme)
if err != nil {
logger.Error(err, "error calling setControllerReference for statefulset", "desiredSts.Namespace",
desiredSts.Namespace, "desireSts.Name", desiredSts.Name)
return result.Error(err)
}

// "fix" the replica count, and maintain labels and annotations the k8s admin may have set
desiredSts.Spec.Replicas = statefulSet.Spec.Replicas
desiredSts.Labels = utils.MergeMap(map[string]string{}, statefulSet.Labels, desiredSts.Labels)
desiredSts.Annotations = utils.MergeMap(map[string]string{}, statefulSet.Annotations, desiredSts.Annotations)

desiredSts.DeepCopyInto(statefulSet)

rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.UpdatingRack,
"Force updating rack %s", rackName)

if err := rc.setConditionStatus(api.DatacenterUpdating, corev1.ConditionTrue); err != nil {
return result.Error(err)
}

if err := setOperatorProgressStatus(rc, api.ProgressUpdating); err != nil {
return result.Error(err)
}

logger.Info("Force updating statefulset pod specs",
"statefulSet", statefulSet,
)

if err := rc.Client.Update(rc.Ctx, statefulSet); err != nil {
if errors.IsInvalid(err) {
if err = rc.deleteStatefulSet(statefulSet); err != nil {
return result.Error(err)
}
} else {
return result.Error(err)
}
}
}
}

dcPatch := client.MergeFrom(dc.DeepCopy())
dc.Spec.ForceUpgradeRacks = nil

if err := rc.Client.Patch(rc.Ctx, dc, dcPatch); err != nil {
logger.Error(err, "error patching datacenter to clear force upgrade")
return result.Error(err)
}

logger.Info("done CheckRackForceUpgrade()")
return result.Done()
return rc.CheckRackPodTemplate(true)
}

func (rc *ReconciliationContext) deleteStatefulSet(statefulSet *appsv1.StatefulSet) error {
Expand Down Expand Up @@ -2595,7 +2619,7 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) {
return recResult.Output()
}

if recResult := rc.CheckRackPodTemplate(); recResult.Completed() {
if recResult := rc.CheckRackPodTemplate(false); recResult.Completed() {
return recResult.Output()
}

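failureModeDetection relies on a hasBeenXMinutes helper that is not part of this diff. A sketch of what such a helper presumably looks like (an assumption, not the PR's actual implementation):

```go
import "time"

// hasBeenXMinutes reports whether at least x minutes have passed since the given
// time (assumed helper; referenced by failureModeDetection but not shown here).
func hasBeenXMinutes(x int, since time.Time) bool {
	return time.Since(since) >= time.Duration(x)*time.Minute
}
```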