diff --git a/CHANGELOG/CHANGELOG-1.21.md b/CHANGELOG/CHANGELOG-1.21.md index 240204af1..4ddc03a96 100644 --- a/CHANGELOG/CHANGELOG-1.21.md +++ b/CHANGELOG/CHANGELOG-1.21.md @@ -15,4 +15,4 @@ When cutting a new release, update the `unreleased` heading to the tag being gen ## unreleased - +* [BUGFIX] [#1383](https://github.com/k8ssandra/k8ssandra-operator/issues/1383) Do not create MedusaBackup if MadusaBakupJob did not fully succeed diff --git a/Makefile b/Makefile index ef15da416..b523eee10 100644 --- a/Makefile +++ b/Makefile @@ -341,7 +341,7 @@ VECTOR ?= $(LOCALBIN)/bin/vector CERT_MANAGER_VERSION ?= v1.12.2 KUSTOMIZE_VERSION ?= v4.5.7 CONTROLLER_TOOLS_VERSION ?= v0.14.0 -GOLINT_VERSION ?= 1.55.0 +GOLINT_VERSION ?= 1.61.0 cert-manager: ## Install cert-manager to the cluster kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/$(CERT_MANAGER_VERSION)/cert-manager.yaml diff --git a/controllers/medusa/medusabackupjob_controller.go b/controllers/medusa/medusabackupjob_controller.go index 5e9399a5e..36607440f 100644 --- a/controllers/medusa/medusabackupjob_controller.go +++ b/controllers/medusa/medusabackupjob_controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -72,9 +73,9 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - backup := instance.DeepCopy() + backupJob := instance.DeepCopy() - cassdcKey := types.NamespacedName{Namespace: backup.Namespace, Name: backup.Spec.CassandraDatacenter} + cassdcKey := types.NamespacedName{Namespace: backupJob.Namespace, Name: backupJob.Spec.CassandraDatacenter} cassdc := &cassdcapi.CassandraDatacenter{} err = r.Get(ctx, cassdcKey, cassdc) if err != nil { @@ -83,12 +84,12 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ } // Set an owner reference on the backup job so that it can be cleaned up when the cassandra datacenter is deleted - if backup.OwnerReferences == nil { - if err = controllerutil.SetControllerReference(cassdc, backup, r.Scheme); err != nil { + if backupJob.OwnerReferences == nil { + if err = controllerutil.SetControllerReference(cassdc, backupJob, r.Scheme); err != nil { logger.Error(err, "failed to set controller reference", "CassandraDatacenter", cassdcKey) return ctrl.Result{}, err } - if err = r.Update(ctx, backup); err != nil { + if err = r.Update(ctx, backupJob); err != nil { logger.Error(err, "failed to update MedusaBackupJob with owner reference", "CassandraDatacenter", cassdcKey) return ctrl.Result{}, err } else { @@ -104,16 +105,16 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ } // If there is anything in progress, simply requeue the request until each pod has finished or errored - if len(backup.Status.InProgress) > 0 { + if len(backupJob.Status.InProgress) > 0 { logger.Info("There are backups in progress, checking them..") - progress := make([]string, 0, len(backup.Status.InProgress)) - patch := client.MergeFrom(backup.DeepCopy()) + progress := make([]string, 0, len(backupJob.Status.InProgress)) + patch := client.MergeFrom(backupJob.DeepCopy()) StatusCheck: - for _, podName := range backup.Status.InProgress { + for _, podName := range backupJob.Status.InProgress { for _, pod := range pods { if podName == pod.Name { - status, err := backupStatus(ctx, backup.ObjectMeta.Name, &pod, r.ClientFactory, logger) + status, err := backupStatus(ctx, backupJob.ObjectMeta.Name, &pod, r.ClientFactory, logger) if err != nil { return ctrl.Result{}, err } @@ -121,9 +122,9 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ if status == medusa.StatusType_IN_PROGRESS { progress = append(progress, podName) } else if status == medusa.StatusType_SUCCESS { - backup.Status.Finished = append(backup.Status.Finished, podName) + backupJob.Status.Finished = append(backupJob.Status.Finished, podName) } else if status == medusa.StatusType_FAILED || status == medusa.StatusType_UNKNOWN { - backup.Status.Failed = append(backup.Status.Failed, podName) + backupJob.Status.Failed = append(backupJob.Status.Failed, podName) } continue StatusCheck @@ -131,9 +132,9 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ } } - if len(backup.Status.InProgress) != len(progress) { - backup.Status.InProgress = progress - if err := r.Status().Patch(ctx, backup, patch); err != nil { + if len(backupJob.Status.InProgress) != len(progress) { + backupJob.Status.InProgress = progress + if err := r.Status().Patch(ctx, backupJob, patch); err != nil { logger.Error(err, "failed to patch status") return ctrl.Result{}, err } @@ -147,42 +148,49 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ } // If the backup is already finished, there is nothing to do. - if medusaBackupFinished(backup) { + if medusaBackupFinished(backupJob) { logger.Info("Backup operation is already finished") return ctrl.Result{Requeue: false}, nil } // First check to see if the backup is already in progress - if !backup.Status.StartTime.IsZero() { + if !backupJob.Status.StartTime.IsZero() { // If there is anything in progress, simply requeue the request - if len(backup.Status.InProgress) > 0 { + if len(backupJob.Status.InProgress) > 0 { logger.Info("Backup is still in progress") return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil } + // there is nothing in progress, so the job is finished (not yet sure if successfully) + // Regardless of the success, we set the job finish time + // Note that the time here is not accurate, but that is ok. For now we are just + // using it as a completion marker. + patch := client.MergeFrom(backupJob.DeepCopy()) + backupJob.Status.FinishTime = metav1.Now() + if err := r.Status().Patch(ctx, backupJob, patch); err != nil { + logger.Error(err, "failed to patch status with finish time") + return ctrl.Result{}, err + } + + // if there are failures, we will end here and not proceed with creating a backup object + if len(backupJob.Status.Failed) > 0 { + logger.Info("Backup failed on some nodes", "BackupName", backupJob.Name, "Failed", backupJob.Status.Failed) + return ctrl.Result{Requeue: false}, nil + } + logger.Info("backup complete") - // The MedusaBackupJob is finished and we now need to create the MedusaBackup object. - backupSummary, err := r.getBackupSummary(ctx, backup, pods, logger) + // The MedusaBackupJob is finished successfully and we now need to create the MedusaBackup object. + backupSummary, err := r.getBackupSummary(ctx, backupJob, pods, logger) if err != nil { logger.Error(err, "Failed to get backup summary") return ctrl.Result{}, err } - if err := r.createMedusaBackup(ctx, backup, backupSummary, logger); err != nil { + if err := r.createMedusaBackup(ctx, backupJob, backupSummary, logger); err != nil { logger.Error(err, "Failed to create MedusaBackup") return ctrl.Result{}, err } - // Set the finish time - // Note that the time here is not accurate, but that is ok. For now we are just - // using it as a completion marker. - patch := client.MergeFrom(backup.DeepCopy()) - backup.Status.FinishTime = metav1.Now() - if err := r.Status().Patch(ctx, backup, patch); err != nil { - logger.Error(err, "failed to patch status with finish time") - return ctrl.Result{}, err - } - return ctrl.Result{Requeue: false}, nil } @@ -195,31 +203,31 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, medusa.BackupSidecarNotFound } - patch := client.MergeFromWithOptions(backup.DeepCopy(), client.MergeFromWithOptimisticLock{}) + patch := client.MergeFromWithOptions(backupJob.DeepCopy(), client.MergeFromWithOptimisticLock{}) - backup.Status.StartTime = metav1.Now() + backupJob.Status.StartTime = metav1.Now() - if err := r.Status().Patch(ctx, backup, patch); err != nil { + if err := r.Status().Patch(ctx, backupJob, patch); err != nil { logger.Error(err, "Failed to patch status") // We received a stale object, requeue for next processing return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil } logger.Info("Starting backups") - patch = client.MergeFrom(backup.DeepCopy()) + patch = client.MergeFrom(backupJob.DeepCopy()) for _, p := range pods { logger.Info("starting backup", "CassandraPod", p.Name) - _, err := doMedusaBackup(ctx, backup.ObjectMeta.Name, backup.Spec.Type, &p, r.ClientFactory, logger) + _, err := doMedusaBackup(ctx, backupJob.ObjectMeta.Name, backupJob.Spec.Type, &p, r.ClientFactory, logger) if err != nil { logger.Error(err, "backup failed", "CassandraPod", p.Name) } - backup.Status.InProgress = append(backup.Status.InProgress, p.Name) + backupJob.Status.InProgress = append(backupJob.Status.InProgress, p.Name) } // logger.Info("finished backup operations") - if err := r.Status().Patch(context.Background(), backup, patch); err != nil { - logger.Error(err, "failed to patch status", "Backup", fmt.Sprintf("%s/%s", backup.Name, backup.Namespace)) + if err := r.Status().Patch(context.Background(), backupJob, patch); err != nil { + logger.Error(err, "failed to patch status", "Backup", fmt.Sprintf("%s/%s", backupJob.Name, backupJob.Namespace)) } return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil @@ -317,10 +325,17 @@ func backupStatus(ctx context.Context, name string, pod *corev1.Pod, clientFacto addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort)) logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr) if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil { + logger.Error(err, "Could not make a new medusa client") return medusa.StatusType_UNKNOWN, err } else { resp, err := medusaClient.BackupStatus(ctx, name) if err != nil { + // the gRPC client does not return proper NotFound error, we need to check the payload too + if errors.IsNotFound(err) || strings.Contains(err.Error(), "NotFound") { + logger.Info(fmt.Sprintf("did not find backup %s for pod %s", name, pod.Name)) + return medusa.StatusType_UNKNOWN, nil + } + logger.Error(err, fmt.Sprintf("getting backup status for backup %s and pod %s failed", name, pod.Name)) return medusa.StatusType_UNKNOWN, err } diff --git a/controllers/medusa/medusabackupjob_controller_test.go b/controllers/medusa/medusabackupjob_controller_test.go index 163382133..e20ce82f8 100644 --- a/controllers/medusa/medusabackupjob_controller_test.go +++ b/controllers/medusa/medusabackupjob_controller_test.go @@ -26,15 +26,22 @@ import ( ) const ( - medusaImageRepo = "test/medusa" - cassandraUserSecret = "medusa-secret" - defaultBackupName = "backup1" - dc1PodPrefix = "192.168.1." - dc2PodPrefix = "192.168.2." - fakeBackupFileCount = int64(13) - fakeBackupByteSize = int64(42) - fakeBackupHumanSize = "42.00 B" - fakeMaxBackupCount = 1 + medusaImageRepo = "test/medusa" + cassandraUserSecret = "medusa-secret" + successfulBackupName = "good-backup" + failingBackupName = "bad-backup" + missingBackupName = "missing-backup" + dc1PodPrefix = "192.168.1." + dc2PodPrefix = "192.168.2." + fakeBackupFileCount = int64(13) + fakeBackupByteSize = int64(42) + fakeBackupHumanSize = "42.00 B" + fakeMaxBackupCount = 1 +) + +var ( + alreadyReportedFailingBackup = false + alreadyReportedMissingBackup = false ) func testMedusaBackupDatacenter(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) { @@ -142,16 +149,24 @@ func testMedusaBackupDatacenter(t *testing.T, ctx context.Context, f *framework. }) require.NoError(err, "failed to update dc1 status to ready") - backupCreated := createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, defaultBackupName) + backupCreated := createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, successfulBackupName) require.True(backupCreated, "failed to create backup") t.Log("verify that medusa gRPC clients are invoked") require.Equal(map[string][]string{ - fmt.Sprintf("%s:%d", getPodIpAddress(0, dc1.DatacenterName()), shared.BackupSidecarPort): {defaultBackupName}, - fmt.Sprintf("%s:%d", getPodIpAddress(1, dc1.DatacenterName()), shared.BackupSidecarPort): {defaultBackupName}, - fmt.Sprintf("%s:%d", getPodIpAddress(2, dc1.DatacenterName()), shared.BackupSidecarPort): {defaultBackupName}, + fmt.Sprintf("%s:%d", getPodIpAddress(0, dc1.DatacenterName()), shared.BackupSidecarPort): {successfulBackupName}, + fmt.Sprintf("%s:%d", getPodIpAddress(1, dc1.DatacenterName()), shared.BackupSidecarPort): {successfulBackupName}, + fmt.Sprintf("%s:%d", getPodIpAddress(2, dc1.DatacenterName()), shared.BackupSidecarPort): {successfulBackupName}, }, medusaClientFactory.GetRequestedBackups(dc1.DatacenterName())) + // a failing backup is one that actually starts but fails (on one pod) + backupCreated = createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, failingBackupName) + require.False(backupCreated, "the backup object shouldn't have been created") + + // a missing backup is one that never gets to start (on one pod) + backupCreated = createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, missingBackupName) + require.False(backupCreated, "the backup object shouldn't have been created") + err = f.DeleteK8ssandraCluster(ctx, client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name}, timeout, interval) require.NoError(err, "failed to delete K8ssandraCluster") verifyObjectDoesNotExist(ctx, t, f, dc1Key, &cassdcapi.CassandraDatacenter{}) @@ -230,17 +245,23 @@ func createAndVerifyMedusaBackup(dcKey framework.ClusterKey, dc *cassdcapi.Cassa if err != nil { return false } - t.Logf("backup finish time: %v", updated.Status.FinishTime) - t.Logf("backup finished: %v", updated.Status.Finished) - t.Logf("backup in progress: %v", updated.Status.InProgress) - return !updated.Status.FinishTime.IsZero() && len(updated.Status.Finished) == 3 && len(updated.Status.InProgress) == 0 + t.Logf("backup %s finish time: %v", backupName, updated.Status.FinishTime) + t.Logf("backup %s failed: %v", backupName, updated.Status.Failed) + t.Logf("backup %s finished: %v", backupName, updated.Status.Finished) + t.Logf("backup %s in progress: %v", backupName, updated.Status.InProgress) + return !updated.Status.FinishTime.IsZero() }, timeout, interval) - t.Log("verify that the MedusaBackup is created") + t.Log("check for the MedusaBackup being created") medusaBackupKey := framework.NewClusterKey(dcKey.K8sContext, dcKey.Namespace, backupName) medusaBackup := &api.MedusaBackup{} err = f.Get(ctx, medusaBackupKey, medusaBackup) - require.NoError(err, "failed to get MedusaBackup") + if err != nil { + if errors.IsNotFound(err) { + return false + } + } + t.Log("verify the MedusaBackup is correct") require.Equal(medusaBackup.Status.TotalNodes, dc.Spec.Size, "backup total nodes doesn't match dc nodes") require.Equal(medusaBackup.Status.FinishedNodes, dc.Spec.Size, "backup finished nodes doesn't match dc nodes") require.Equal(len(medusaBackup.Status.Nodes), int(dc.Spec.Size), "backup topology doesn't match dc topology") @@ -336,6 +357,12 @@ type fakeMedusaClient struct { } func newFakeMedusaClient(dcName string) *fakeMedusaClient { + // the fake Medusa client keeps a bit of state in order to simulate different backup statuses + // more precisely, for some backups it will return not a success for some nodes + // we need to reset this state between tests + // doing it here is great since we make a new fake client for each test anyway + alreadyReportedFailingBackup = false + alreadyReportedMissingBackup = false return &fakeMedusaClient{RequestedBackups: make([]string, 0), DcName: dcName} } @@ -349,8 +376,26 @@ func (c *fakeMedusaClient) CreateBackup(ctx context.Context, name string, backup } func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSummary, error) { + backups := make([]*medusa.BackupSummary, 0) + for _, name := range c.RequestedBackups { + + // return status based on the backup name + // since we're implementing altogether different method of the Medusa client, we cannot reuse the BackupStatus logic + // but we still want to "mock" failing backups + // this does not get called per node/pod, se we don't need to track counts of what we returned + var status medusa.StatusType + if strings.HasPrefix(name, "good") { + status = *medusa.StatusType_SUCCESS.Enum() + } else if strings.HasPrefix(name, "bad") { + status = *medusa.StatusType_FAILED.Enum() + } else if strings.HasPrefix(name, "missing") { + status = *medusa.StatusType_UNKNOWN.Enum() + } else { + status = *medusa.StatusType_IN_PROGRESS.Enum() + } + backup := &medusa.BackupSummary{ BackupName: name, StartTime: 0, @@ -359,7 +404,7 @@ func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSumm FinishedNodes: 3, TotalObjects: fakeBackupFileCount, TotalSize: fakeBackupByteSize, - Status: *medusa.StatusType_SUCCESS.Enum(), + Status: status, Nodes: []*medusa.BackupNode{ { Host: "host1", @@ -387,8 +432,31 @@ func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSumm } func (c *fakeMedusaClient) BackupStatus(ctx context.Context, name string) (*medusa.BackupStatusResponse, error) { + // return different status for differently named backups + // but for each not-successful backup, return not-a-success only once + var status medusa.StatusType + if strings.HasPrefix(name, successfulBackupName) { + status = medusa.StatusType_SUCCESS + } else if strings.HasPrefix(name, failingBackupName) { + if !alreadyReportedFailingBackup { + status = medusa.StatusType_FAILED + alreadyReportedFailingBackup = true + } else { + status = medusa.StatusType_SUCCESS + } + } else if strings.HasPrefix(name, missingBackupName) { + if !alreadyReportedMissingBackup { + alreadyReportedMissingBackup = true + // reproducing what the gRPC client would send. sadly, it's not a proper NotFound error + return nil, fmt.Errorf("rpc error: code = NotFound desc = backup <%s> does not exist", name) + } else { + status = medusa.StatusType_SUCCESS + } + } else { + status = medusa.StatusType_IN_PROGRESS + } return &medusa.BackupStatusResponse{ - Status: medusa.StatusType_SUCCESS, + Status: status, }, nil } diff --git a/controllers/medusa/medusatask_controller.go b/controllers/medusa/medusatask_controller.go index 5f3303cfb..9648415ff 100644 --- a/controllers/medusa/medusatask_controller.go +++ b/controllers/medusa/medusatask_controller.go @@ -269,6 +269,10 @@ func (r *MedusaTaskReconciler) syncOperation(ctx context.Context, task *medusav1 logger.Error(err, "failed to list backups", "CassandraPod", pod.Name) } else { for _, backup := range remoteBackups { + if backup.Status != medusa.StatusType_SUCCESS { + logger.Info(fmt.Sprintf("Skipping sync of backup %s because it wasn't a success", backup.BackupName)) + continue + } logger.Info("Syncing Backup", "Backup", backup.BackupName) // Create backups that should exist but are missing backupKey := types.NamespacedName{Namespace: task.Namespace, Name: backup.BackupName} diff --git a/controllers/medusa/medusatask_controller_test.go b/controllers/medusa/medusatask_controller_test.go index 7cb8b6357..9ef9beadb 100644 --- a/controllers/medusa/medusatask_controller_test.go +++ b/controllers/medusa/medusatask_controller_test.go @@ -16,10 +16,12 @@ import ( ) const ( - backup1 = "backup1" - backup2 = "backup2" - backup3 = "backup3" - backup4 = "backup4" + backup1 = "good-backup1" + backup2 = "good-backup2" + backup3 = "good-backup3" + backup4 = "good-backup4" + backup5 = "bad-backup5" + backup6 = "missing-backup6" ) func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) { @@ -152,9 +154,18 @@ func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework, require.True(backup3Created, "failed to create backup3") backup4Created := createAndVerifyMedusaBackup(dc2Key, dc2, f, ctx, require, t, namespace, backup4) require.True(backup4Created, "failed to create backup4") + backup5Created := createAndVerifyMedusaBackup(dc2Key, dc2, f, ctx, require, t, namespace, backup5) + require.False(backup5Created, "failed to create backup5") + backup6Created := createAndVerifyMedusaBackup(dc2Key, dc2, f, ctx, require, t, namespace, backup6) + require.False(backup6Created, "failed to create backup6") - // Ensure that 4 backups and backup jobs were created - checkBackupsAndJobs(require, ctx, 4, namespace, f, []string{}) + // Ensure that 6 backups jobs, but only 4 backups were created (two jobs did not succeed on some pods) + checkBackupsAndJobs(require, ctx, 6, 4, namespace, f, []string{}) + + checkSyncTask(require, ctx, namespace, "dc2", f) + + // Ensure the sync task did not create backups for the failed jobs + checkBackupsAndJobs(require, ctx, 6, 4, namespace, f, []string{}) // Purge backups and verify that only one out of three remains t.Log("purge backups") @@ -209,7 +220,7 @@ func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework, // Ensure that 2 backups and backup jobs were deleted deletedBackups := []string{backup1, backup2} - checkBackupsAndJobs(require, ctx, 2, namespace, f, deletedBackups) + checkBackupsAndJobs(require, ctx, 4, 2, namespace, f, deletedBackups) medusaBackup4Key := framework.NewClusterKey(f.DataPlaneContexts[0], namespace, backup4) medusaBackup4 := &api.MedusaBackup{} @@ -222,19 +233,57 @@ func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework, verifyObjectDoesNotExist(ctx, t, f, dc2Key, &cassdcapi.CassandraDatacenter{}) } -func checkBackupsAndJobs(require *require.Assertions, ctx context.Context, expectedLen int, namespace string, f *framework.Framework, deleted []string) { +func checkBackupsAndJobs(require *require.Assertions, ctx context.Context, expectedJobsLen, expectedBackupsLen int, namespace string, f *framework.Framework, deleted []string) { var backups api.MedusaBackupList err := f.List(ctx, framework.NewClusterKey(f.DataPlaneContexts[0], namespace, "list-backups"), &backups) require.NoError(err, "failed to list medusabackup") - require.Len(backups.Items, expectedLen, "expected %d backups, got %d", expectedLen, len(backups.Items)) + require.Len(backups.Items, expectedBackupsLen, "expected %d backups, got %d", expectedBackupsLen, len(backups.Items)) var jobs api.MedusaBackupJobList err = f.List(ctx, framework.NewClusterKey(f.DataPlaneContexts[0], namespace, "list-backup-jobs"), &jobs) require.NoError(err, "failed to list medusabackupjobs") - require.Len(jobs.Items, expectedLen, "expected %d jobs, got %d", expectedLen, len(jobs.Items)) + require.Len(jobs.Items, expectedJobsLen, "expected %d jobs, got %d", expectedJobsLen, len(jobs.Items)) for _, d := range deleted { require.NotContains(backups.Items, d, "MedusaBackup %s to have been deleted", d) require.NotContains(jobs.Items, d, "MedusaBackupJob %s to have been deleted", d) } } + +func checkSyncTask(require *require.Assertions, ctx context.Context, namespace, dc string, f *framework.Framework) { + syncTaskName := "sync-backups" + + // create a sync task + syncTask := &api.MedusaTask{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: syncTaskName, + Labels: map[string]string{ + "app": "medusa", + }, + }, + Spec: api.MedusaTaskSpec{ + CassandraDatacenter: dc, + Operation: "sync", + }, + } + syncTaskKey := framework.NewClusterKey(f.DataPlaneContexts[0], namespace, syncTaskName) + err := f.Create(ctx, syncTaskKey, syncTask) + require.NoError(err, "failed to create sync task") + + // wait for sync task to finish + require.Eventually(func() bool { + updated := &api.MedusaTask{} + err := f.Get(ctx, syncTaskKey, updated) + if err != nil { + return false + } + + v, ok := updated.Labels["app"] + if !ok || v != "medusa" { + return false + } + + return !updated.Status.FinishTime.IsZero() + }, timeout, interval) +} diff --git a/pkg/mocks/reaper_manager.go b/pkg/mocks/reaper_manager.go index fffb5e9a0..14c346205 100644 --- a/pkg/mocks/reaper_manager.go +++ b/pkg/mocks/reaper_manager.go @@ -4,10 +4,11 @@ package mocks import ( context "context" - alpha1 "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" client "sigs.k8s.io/controller-runtime/pkg/client" + k8ssandrav1alpha1 "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" + mock "github.com/stretchr/testify/mock" v1 "k8s.io/api/core/v1" @@ -58,8 +59,8 @@ func (_m *ReaperManager) Connect(ctx context.Context, _a1 *v1alpha1.Reaper, user return r0 } -// ConnectWithReaperRef provides a mock function with given fields: ctx, reaperRef, username, password -func (_m *ReaperManager) ConnectWithReaperRef(ctx context.Context, kc *alpha1.K8ssandraCluster, username, password string) error { +// ConnectWithReaperRef provides a mock function with given fields: ctx, kc, username, password +func (_m *ReaperManager) ConnectWithReaperRef(ctx context.Context, kc *k8ssandrav1alpha1.K8ssandraCluster, username string, password string) error { ret := _m.Called(ctx, kc, username, password) if len(ret) == 0 { @@ -67,7 +68,7 @@ func (_m *ReaperManager) ConnectWithReaperRef(ctx context.Context, kc *alpha1.K8 } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *alpha1.K8ssandraCluster, string, string) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *k8ssandrav1alpha1.K8ssandraCluster, string, string) error); ok { r0 = rf(ctx, kc, username, password) } else { r0 = ret.Error(0)