diff --git a/controllers/k8ssandra/medusa_reconciler.go b/controllers/k8ssandra/medusa_reconciler.go index e55f7ca5e..357c1b52f 100644 --- a/controllers/k8ssandra/medusa_reconciler.go +++ b/controllers/k8ssandra/medusa_reconciler.go @@ -109,11 +109,7 @@ func (r *K8ssandraClusterReconciler) reconcileMedusa( return result.Error(err) } purgeCronJob.SetLabels(labels.CleanedUpByLabels(kcKey)) - recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *purgeCronJob) - switch { - case recRes.IsError(): - return recRes - case recRes.IsRequeue(): + if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *purgeCronJob); recRes.Completed() { return recRes } } else { @@ -173,12 +169,7 @@ func (r *K8ssandraClusterReconciler) reconcileMedusaSecrets( return result.Error(err) } - res := r.reconcileRemoteBucketSecretsDeprecated(ctx, r.ClientCache.GetLocalClient(), kc, logger) - switch { - case res.IsError(): - logger.Error(res.GetError(), "Failed to reconcile Medusa bucket secrets") - return res - case res.IsRequeue(): + if res := r.reconcileRemoteBucketSecretsDeprecated(ctx, r.ClientCache.GetLocalClient(), kc, logger); res.Completed() { return res } } @@ -202,11 +193,7 @@ func (r *K8ssandraClusterReconciler) reconcileMedusaConfigMap( desiredConfigMap := medusa.CreateMedusaConfigMap(namespace, kc.SanitizedName(), medusaIni) kcKey := utils.GetKey(kc) desiredConfigMap.SetLabels(labels.CleanedUpByLabels(kcKey)) - recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredConfigMap) - switch { - case recRes.IsError(): - return recRes - case recRes.IsRequeue(): + if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredConfigMap); recRes.Completed() { return recRes } } diff --git a/controllers/k8ssandra/vector.go b/controllers/k8ssandra/vector.go index 730d4a2ea..69e44ee65 100644 --- a/controllers/k8ssandra/vector.go +++ b/controllers/k8ssandra/vector.go @@ -41,11 +41,7 @@ func (r *K8ssandraClusterReconciler) reconcileVector( // Check if the vector config map already exists desiredVectorConfigMap.SetLabels(labels.CleanedUpByLabels(kcKey)) - recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap) - switch { - case recRes.IsError(): - return recRes - case recRes.IsRequeue(): + if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap); recRes.Completed() { return recRes } } else { diff --git a/controllers/reaper/vector.go b/controllers/reaper/vector.go index c0a6f572f..c8518821a 100644 --- a/controllers/reaper/vector.go +++ b/controllers/reaper/vector.go @@ -42,14 +42,9 @@ func (r *ReaperReconciler) reconcileVectorConfigMap( dcLogger.Error(err, "Failed to set controller reference on new Reaper Vector ConfigMap", "ConfigMap", configMapKey) return ctrl.Result{}, err } - recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap) - switch { - case recRes.IsError(): - return ctrl.Result{}, recRes.GetError() - case recRes.IsRequeue(): - return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil + if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap); recRes.Completed() { + return recRes.Output() } - } else { if err := shared.DeleteConfigMapIfExists(ctx, remoteClient, configMapKey, dcLogger); err != nil { return ctrl.Result{}, err diff --git a/controllers/stargate/stargate_controller.go b/controllers/stargate/stargate_controller.go index 6dfe7dfa8..a9c77fcf3 100644 --- a/controllers/stargate/stargate_controller.go +++ b/controllers/stargate/stargate_controller.go @@ -160,7 +160,7 @@ func (r *StargateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if stargateConfigResult, err := r.reconcileStargateConfigMap(ctx, stargate, dcConfig, userStargateCassandraYaml, userStargateCqlYaml, req.Namespace, *actualDc, logger); err != nil { return ctrl.Result{}, err } else { - if stargateConfigResult.Requeue { + if stargateConfigResult.Requeue || stargateConfigResult.RequeueAfter > 0 { return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil } } @@ -198,7 +198,7 @@ func (r *StargateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // reconcile Vector configmap if vectorReconcileResult, err := r.reconcileVectorConfigMap(ctx, *stargate, actualDc, r.Client, logger); err != nil { return vectorReconcileResult, err - } else if vectorReconcileResult.Requeue { + } else if vectorReconcileResult.Requeue || vectorReconcileResult.RequeueAfter > 0 { return vectorReconcileResult, nil } @@ -446,12 +446,8 @@ func (r *StargateReconciler) reconcileStargateConfigMap( return ctrl.Result{}, err } - recRes := reconciliation.ReconcileObject(ctx, r.Client, r.DefaultDelay, *desiredConfigMap) - switch { - case recRes.IsError(): - return ctrl.Result{}, recRes.GetError() - case recRes.IsRequeue(): - return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil + if recRes := reconciliation.ReconcileObject(ctx, r.Client, r.DefaultDelay, *desiredConfigMap); recRes.Completed() { + return recRes.Output() } logger.Info("Stargate ConfigMap successfully reconciled") return ctrl.Result{}, nil diff --git a/controllers/stargate/stargate_controller_test.go b/controllers/stargate/stargate_controller_test.go index 105584e38..d4c386074 100644 --- a/controllers/stargate/stargate_controller_test.go +++ b/controllers/stargate/stargate_controller_test.go @@ -3,6 +3,7 @@ package stargate import ( "context" "encoding/json" + "os" "testing" "time" @@ -39,6 +40,10 @@ const ( var managementApiFactory = &testutils.FakeManagementApiFactory{} func TestStargate(t *testing.T) { + + os.Setenv("REQUEUE_DEFAULT_DELAY", "10ms") + os.Setenv("REQUEUE_LONG_DELAY", "10ms") + ctx := testutils.TestSetup(t) ctx, cancel := context.WithCancel(ctx) testEnv := &testutils.TestEnv{} diff --git a/controllers/stargate/vector.go b/controllers/stargate/vector.go index 0884c378c..1ec9bb299 100644 --- a/controllers/stargate/vector.go +++ b/controllers/stargate/vector.go @@ -43,12 +43,8 @@ func (r *StargateReconciler) reconcileVectorConfigMap( dcLogger.Error(err, "Failed to set controller reference on new Stargate Vector ConfigMap", "ConfigMap", configMapKey) return ctrl.Result{}, err } - recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap) - switch { - case recRes.IsError(): - return ctrl.Result{}, recRes.GetError() - case recRes.IsRequeue(): - return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil + if recRes := reconciliation.ReconcileObject(ctx, remoteClient, r.DefaultDelay, *desiredVectorConfigMap); recRes.Completed() { + return recRes.Output() } } else { if err := deleteConfigMapIfExists(ctx, remoteClient, configMapKey, dcLogger); err != nil { diff --git a/pkg/medusa/refresh_secrets.go b/pkg/medusa/refresh_secrets.go index 659b24aee..132ca43ad 100644 --- a/pkg/medusa/refresh_secrets.go +++ b/pkg/medusa/refresh_secrets.go @@ -52,7 +52,7 @@ func RefreshSecrets(dc *cassdcapi.CassandraDatacenter, ctx context.Context, clie case recRes.IsRequeue(): requeues++ continue - case recRes.IsDone(): + case !recRes.Completed(): continue } if requeues > 0 { diff --git a/pkg/reconciliation/generic.go b/pkg/reconciliation/generic.go index 2d3c9cbb7..2f9f32f7d 100644 --- a/pkg/reconciliation/generic.go +++ b/pkg/reconciliation/generic.go @@ -18,8 +18,18 @@ type Reconcileable[T any] interface { *T } -// Try with U, a type of any whose POINTER still fulfils Reoncilable... +// ReconcileObject ensures that desiredObject exists in the given state, either by creating it, or updating it if it +// already exists. func ReconcileObject[U any, T Reconcileable[U]](ctx context.Context, kClient client.Client, requeueDelay time.Duration, desiredObject U) result.ReconcileResult { + recResult, _ := ReconcileAndGetObject[U, T](ctx, kClient, requeueDelay, desiredObject) + return recResult +} + +// ReconcileAndGetObject ensures that desiredObject exists in the given state, either by creating it, or updating it if +// it already exists. It returns the current state of the object on the server after the reconciliation. +func ReconcileAndGetObject[U any, T Reconcileable[U]]( + ctx context.Context, kClient client.Client, requeueDelay time.Duration, desiredObject U, +) (result.ReconcileResult, *U) { objectKey := types.NamespacedName{ Name: T(&desiredObject).GetName(), Namespace: T(&desiredObject).GetNamespace(), @@ -34,14 +44,13 @@ func ReconcileObject[U any, T Reconcileable[U]](ctx context.Context, kClient cli if errors.IsNotFound(err) { if err := kClient.Create(ctx, T(&desiredObject)); err != nil { if errors.IsAlreadyExists(err) { - return result.RequeueSoon(requeueDelay) + return result.RequeueSoon(requeueDelay), nil } - return result.Error(err) + return result.Error(err), nil } - return result.RequeueSoon(requeueDelay) - } else { - return result.Error(err) + return result.Continue(), &desiredObject } + return result.Error(err), nil } if !annotations.CompareHashAnnotations(T(currentCm), T(&desiredObject)) { @@ -49,9 +58,9 @@ func ReconcileObject[U any, T Reconcileable[U]](ctx context.Context, kClient cli T(&desiredObject).DeepCopyInto(currentCm) T(currentCm).SetResourceVersion(resourceVersion) if err := kClient.Update(ctx, T(currentCm)); err != nil { - return result.Error(err) + return result.Error(err), nil } - return result.RequeueSoon(requeueDelay) + return result.Continue(), currentCm } - return result.Done() + return result.Continue(), currentCm } diff --git a/pkg/reconciliation/generic_test.go b/pkg/reconciliation/generic_test.go index a2bc5abfa..45c9f7df8 100644 --- a/pkg/reconciliation/generic_test.go +++ b/pkg/reconciliation/generic_test.go @@ -32,7 +32,8 @@ func Test_ReconcileObject_UpdateDone(t *testing.T) { kClient := testutils.NewFakeClientWRestMapper() // Reset the Client // Launch reconciliation. recRes := ReconcileObject(ctx, kClient, requeueDelay, desiredObject) - assert.True(t, recRes.IsRequeue()) + // Should update immediately and signal we can continue + assert.False(t, recRes.Completed()) // After the update we should see the expected ConfigMap afterUpdateCM := &corev1.ConfigMap{} err := kClient.Get(ctx, @@ -42,15 +43,13 @@ func Test_ReconcileObject_UpdateDone(t *testing.T) { }, afterUpdateCM) assert.NoError(t, err) - // If we reconcile again, we should move into the Done state. - recRes = ReconcileObject(ctx, kClient, requeueDelay, desiredObject) - assert.True(t, recRes.IsDone()) } func Test_ReconcileObject_CreateSuccess(t *testing.T) { kClient := testutils.NewFakeClientWRestMapper() // Reset the Client recRes := ReconcileObject(ctx, kClient, requeueDelay, desiredObject) - assert.True(t, recRes.IsRequeue()) + // Should create immediately and signal we can continue + assert.False(t, recRes.Completed()) actualCm := &corev1.ConfigMap{} err := kClient.Get(ctx, types.NamespacedName{Name: desiredObject.Name, Namespace: desiredObject.Namespace}, actualCm) assert.NoError(t, err) @@ -75,7 +74,7 @@ func Test_ReconcileObject_UpdateSuccess(t *testing.T) { } // Launch reconciliation. recRes := ReconcileObject(ctx, kClient, requeueDelay, desiredObject) - assert.True(t, recRes.IsRequeue()) + assert.False(t, recRes.Completed()) annotations.AddHashAnnotation(&desiredObject) // After the update we should see the expected ConfigMap afterUpdateCM := &corev1.ConfigMap{} diff --git a/pkg/result/result_helper.go b/pkg/result/result_helper.go index 0133074f1..533c7f063 100644 --- a/pkg/result/result_helper.go +++ b/pkg/result/result_helper.go @@ -1,20 +1,56 @@ package result import ( + "sigs.k8s.io/controller-runtime/pkg/reconcile" "time" ctrl "sigs.k8s.io/controller-runtime" ) -// Copyright DataStax, Inc. -// Please see the included license file for details. - +// This is just so that we can reference TerminalError in the Godoc of [Error] +var _ error = reconcile.TerminalError(nil) + +// ReconcileResult represents the result of a step in the reconciliation process. +// +// We typically split the top-level Reconcile() method of a controller into multiple sub-functions. Each of these +// functions uses ReconcileResult to communicate to its caller how the current iteration should proceed. There are 4 +// possible implementations: [Continue], [Done], [RequeueSoon], and [Error]. +// +// The idiomatic way to handle a ReconcileResult in an intermediary sub-function is: +// +// if recResult := callStep1(); recResult.Completed() { +// // Global success, requeue or error: stop what we're doing and propagate up +// return recResult +// } +// // Otherwise, proceed with the next step(s) +// if recResult := callStep2(); recResult.Completed() { +// // etc... +// +// The idiomatic way to handle a ReconcileResult in the top-level Reconcile() method is: +// +// recResult := callSomeSubmethod() +// // Possibly inspect the result (e.g. to set an error field in the status) +// return recResult.Output() type ReconcileResult interface { + // Completed indicates that the current iteration of the reconciliation loop is complete, and the top-level + // Reconcile() method should return [ReconcileResult.Output] to the controller runtime. + // + // This returns true for a [Done] or terminal [Error] (where the output will stop the entire reconciliation loop); + // and for a [RequeueSoon] or regular [Error] (where the output will trigger a retry). + // + // This returns false for a [Continue]. Completed() bool + // Output converts this result into a format that the main Reconcile() method can return to the controller runtime. + // + // Calling this method on a [Continue] will panic. Output() (ctrl.Result, error) + // IsError indicates whether this result is an [Error]. IsError() bool + // IsRequeue indicates whether this result is a [RequeueSoon]. IsRequeue() bool + // IsDone indicates whether this result is a [Done]. IsDone() bool + // GetError returns the wrapped error if the result is an [Error], otherwise it returns nil. GetError() error } @@ -121,18 +157,31 @@ func (r errorOut) GetError() error { return r.err } +// Continue indicates that the current step in the reconciliation is done. The caller should proceed with the next step. func Continue() ReconcileResult { return continueReconcile{} } +// Done indicates that the entire reconciliation loop was successful. +// The caller should skip the next steps (if any), and propagate the result up the stack. This will eventually reach the +// top-level Reconcile() method, which should stop the reconciliation. func Done() ReconcileResult { return done{} } +// RequeueSoon indicates that the current step in the reconciliation requires a requeue after a certain amount of time. +// The caller should skip the next steps (if any), and propagate the result up the stack. This will eventually reach the +// top-level Reconcile() method, which should schedule a requeue with the given delay. func RequeueSoon(after time.Duration) ReconcileResult { return callBackSoon{after: after} } +// Error indicates that the current step in the reconciliation has failed. +// The caller should skip the next steps (if any), and propagate the result up the stack. This will eventually reach the +// top-level Reconcile() method, which should return the error to the controller runtime. +// +// If the argument is wrapped with [reconcile.TerminalError], the reconciliation loop will stop; otherwise, it will be +// retried with exponential backoff. func Error(e error) ReconcileResult { return errorOut{err: e} } diff --git a/pkg/telemetry/cassandra_agent/cassandra_agent_config.go b/pkg/telemetry/cassandra_agent/cassandra_agent_config.go index c69062d80..fa75ae21b 100644 --- a/pkg/telemetry/cassandra_agent/cassandra_agent_config.go +++ b/pkg/telemetry/cassandra_agent/cassandra_agent_config.go @@ -158,11 +158,7 @@ func (c Configurator) ReconcileTelemetryAgentConfig(dc *cassdcapi.CassandraDatac } desiredCm.SetLabels(labels.CleanedUpByLabels(KlKey)) - recRes := reconciliation.ReconcileObject(c.Ctx, c.RemoteClient, c.RequeueDelay, *desiredCm) - switch { - case recRes.IsError(): - return recRes - case recRes.IsRequeue(): + if recRes := reconciliation.ReconcileObject(c.Ctx, c.RemoteClient, c.RequeueDelay, *desiredCm); recRes.Completed() { return recRes } diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 5a9b98d98..3d0ea9ca2 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -842,7 +842,7 @@ func createSingleDatacenterCluster(t *testing.T, ctx context.Context, namespace require.NoError(err, "failed to patch K8ssandraCluster in namespace %s", namespace) checkStargateReady(t, f, ctx, stargateKey) checkStargateK8cStatusReady(t, f, ctx, kcKey, dcKey) - checkContainerPresence(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName) + checkContainerPresenceEventually(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName) checkVectorAgentConfigMapPresence(t, ctx, f, dcKey, stargate.VectorAgentConfigMapName) t.Log("check that if Stargate is deleted directly it gets re-created") @@ -853,7 +853,7 @@ func createSingleDatacenterCluster(t *testing.T, ctx context.Context, namespace require.NoError(err, "failed to delete Stargate in namespace %s", namespace) checkStargateReady(t, f, ctx, stargateKey) - checkContainerPresence(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName) + checkContainerPresenceEventually(t, ctx, f, stargateDeploymentKey, k8ssandra, getPodTemplateSpec, stargate.VectorContainerName) checkVectorAgentConfigMapPresence(t, ctx, f, dcKey, stargate.VectorAgentConfigMapName) t.Log("delete Stargate in k8ssandracluster resource") @@ -2324,6 +2324,15 @@ func checkContainerPresence(t *testing.T, ctx context.Context, f *framework.E2eF require.True(t, containerFound, "cannot find Container in pod template spec") } +func checkContainerPresenceEventually(t *testing.T, ctx context.Context, f *framework.E2eFramework, podKey framework.ClusterKey, kc *api.K8ssandraCluster, specFunction func(t *testing.T, ctx context.Context, f *framework.E2eFramework, dcKey framework.ClusterKey, kc *api.K8ssandraCluster) *corev1.PodTemplateSpec, containerName string) { + t.Logf("check that %s contains Container named %s", podKey.Name, containerName) + require.Eventually(t, func() bool { + podTempSpec := specFunction(t, ctx, f, podKey, kc) + _, containerFound := cassandra.FindContainer(podTempSpec, containerName) + return containerFound + }, polling.stargateReady.timeout, polling.stargateReady.interval, "cannot find Container in pod template spec") +} + func checkContainerDeleted(t *testing.T, ctx context.Context, f *framework.E2eFramework, podKey framework.ClusterKey, kc *api.K8ssandraCluster, specFunction func(t *testing.T, ctx context.Context, f *framework.E2eFramework, dcKey framework.ClusterKey, kc *api.K8ssandraCluster) *corev1.PodTemplateSpec, containerName string) { t.Logf("check that %s does not have a Container named %s", podKey.Name, containerName) podTempSpec := specFunction(t, ctx, f, podKey, kc)