Skip to content

Commit

Permalink
K8OP-5 Do not create MedusaBackup if MadusaBakupJob did not fully suc…
Browse files Browse the repository at this point in the history
…ceed
  • Loading branch information
rzvoncek committed Oct 30, 2024
1 parent 992633e commit fdf5d78
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 76 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG/CHANGELOG-1.21.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ When cutting a new release, update the `unreleased` heading to the tag being gen

## unreleased


* [BUGFIX] [#1383](https://github.com/k8ssandra/k8ssandra-operator/issues/1383) Do not create MedusaBackup if MadusaBakupJob did not fully succeed
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ VECTOR ?= $(LOCALBIN)/bin/vector
CERT_MANAGER_VERSION ?= v1.12.2
KUSTOMIZE_VERSION ?= v4.5.7
CONTROLLER_TOOLS_VERSION ?= v0.14.0
GOLINT_VERSION ?= 1.55.0
GOLINT_VERSION ?= 1.61.0

cert-manager: ## Install cert-manager to the cluster
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/$(CERT_MANAGER_VERSION)/cert-manager.yaml
Expand Down
93 changes: 54 additions & 39 deletions controllers/medusa/medusabackupjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -72,9 +73,9 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

backup := instance.DeepCopy()
backupJob := instance.DeepCopy()

cassdcKey := types.NamespacedName{Namespace: backup.Namespace, Name: backup.Spec.CassandraDatacenter}
cassdcKey := types.NamespacedName{Namespace: backupJob.Namespace, Name: backupJob.Spec.CassandraDatacenter}
cassdc := &cassdcapi.CassandraDatacenter{}
err = r.Get(ctx, cassdcKey, cassdc)
if err != nil {
Expand All @@ -83,12 +84,12 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

// Set an owner reference on the backup job so that it can be cleaned up when the cassandra datacenter is deleted
if backup.OwnerReferences == nil {
if err = controllerutil.SetControllerReference(cassdc, backup, r.Scheme); err != nil {
if backupJob.OwnerReferences == nil {
if err = controllerutil.SetControllerReference(cassdc, backupJob, r.Scheme); err != nil {
logger.Error(err, "failed to set controller reference", "CassandraDatacenter", cassdcKey)
return ctrl.Result{}, err
}
if err = r.Update(ctx, backup); err != nil {
if err = r.Update(ctx, backupJob); err != nil {
logger.Error(err, "failed to update MedusaBackupJob with owner reference", "CassandraDatacenter", cassdcKey)
return ctrl.Result{}, err
} else {
Expand All @@ -104,36 +105,36 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

// If there is anything in progress, simply requeue the request until each pod has finished or errored
if len(backup.Status.InProgress) > 0 {
if len(backupJob.Status.InProgress) > 0 {
logger.Info("There are backups in progress, checking them..")
progress := make([]string, 0, len(backup.Status.InProgress))
patch := client.MergeFrom(backup.DeepCopy())
progress := make([]string, 0, len(backupJob.Status.InProgress))
patch := client.MergeFrom(backupJob.DeepCopy())

StatusCheck:
for _, podName := range backup.Status.InProgress {
for _, podName := range backupJob.Status.InProgress {
for _, pod := range pods {
if podName == pod.Name {
status, err := backupStatus(ctx, backup.ObjectMeta.Name, &pod, r.ClientFactory, logger)
status, err := backupStatus(ctx, backupJob.ObjectMeta.Name, &pod, r.ClientFactory, logger)
if err != nil {
return ctrl.Result{}, err
}

if status == medusa.StatusType_IN_PROGRESS {
progress = append(progress, podName)
} else if status == medusa.StatusType_SUCCESS {
backup.Status.Finished = append(backup.Status.Finished, podName)
backupJob.Status.Finished = append(backupJob.Status.Finished, podName)
} else if status == medusa.StatusType_FAILED || status == medusa.StatusType_UNKNOWN {
backup.Status.Failed = append(backup.Status.Failed, podName)
backupJob.Status.Failed = append(backupJob.Status.Failed, podName)
}

continue StatusCheck
}
}
}

if len(backup.Status.InProgress) != len(progress) {
backup.Status.InProgress = progress
if err := r.Status().Patch(ctx, backup, patch); err != nil {
if len(backupJob.Status.InProgress) != len(progress) {
backupJob.Status.InProgress = progress
if err := r.Status().Patch(ctx, backupJob, patch); err != nil {
logger.Error(err, "failed to patch status")
return ctrl.Result{}, err
}
Expand All @@ -147,42 +148,49 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

// If the backup is already finished, there is nothing to do.
if medusaBackupFinished(backup) {
if medusaBackupFinished(backupJob) {
logger.Info("Backup operation is already finished")
return ctrl.Result{Requeue: false}, nil
}

// First check to see if the backup is already in progress
if !backup.Status.StartTime.IsZero() {
if !backupJob.Status.StartTime.IsZero() {
// If there is anything in progress, simply requeue the request
if len(backup.Status.InProgress) > 0 {
if len(backupJob.Status.InProgress) > 0 {
logger.Info("Backup is still in progress")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}

// there is nothing in progress, so the job is finished (not yet sure if successfully)
// Regardless of the success, we set the job finish time
// Note that the time here is not accurate, but that is ok. For now we are just
// using it as a completion marker.
patch := client.MergeFrom(backupJob.DeepCopy())
backupJob.Status.FinishTime = metav1.Now()
if err := r.Status().Patch(ctx, backupJob, patch); err != nil {
logger.Error(err, "failed to patch status with finish time")
return ctrl.Result{}, err
}

// if there are failures, we will end here and not proceed with creating a backup object
if len(backupJob.Status.Failed) > 0 {
logger.Info("Backup failed on some nodes", "BackupName", backupJob.Name, "Failed", backupJob.Status.Failed)
return ctrl.Result{Requeue: false}, nil
}

logger.Info("backup complete")

// The MedusaBackupJob is finished and we now need to create the MedusaBackup object.
backupSummary, err := r.getBackupSummary(ctx, backup, pods, logger)
// The MedusaBackupJob is finished successfully and we now need to create the MedusaBackup object.
backupSummary, err := r.getBackupSummary(ctx, backupJob, pods, logger)
if err != nil {
logger.Error(err, "Failed to get backup summary")
return ctrl.Result{}, err
}
if err := r.createMedusaBackup(ctx, backup, backupSummary, logger); err != nil {
if err := r.createMedusaBackup(ctx, backupJob, backupSummary, logger); err != nil {
logger.Error(err, "Failed to create MedusaBackup")
return ctrl.Result{}, err
}

// Set the finish time
// Note that the time here is not accurate, but that is ok. For now we are just
// using it as a completion marker.
patch := client.MergeFrom(backup.DeepCopy())
backup.Status.FinishTime = metav1.Now()
if err := r.Status().Patch(ctx, backup, patch); err != nil {
logger.Error(err, "failed to patch status with finish time")
return ctrl.Result{}, err
}

return ctrl.Result{Requeue: false}, nil
}

Expand All @@ -195,31 +203,31 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, medusa.BackupSidecarNotFound
}

patch := client.MergeFromWithOptions(backup.DeepCopy(), client.MergeFromWithOptimisticLock{})
patch := client.MergeFromWithOptions(backupJob.DeepCopy(), client.MergeFromWithOptimisticLock{})

backup.Status.StartTime = metav1.Now()
backupJob.Status.StartTime = metav1.Now()

if err := r.Status().Patch(ctx, backup, patch); err != nil {
if err := r.Status().Patch(ctx, backupJob, patch); err != nil {
logger.Error(err, "Failed to patch status")
// We received a stale object, requeue for next processing
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}

logger.Info("Starting backups")
patch = client.MergeFrom(backup.DeepCopy())
patch = client.MergeFrom(backupJob.DeepCopy())

for _, p := range pods {
logger.Info("starting backup", "CassandraPod", p.Name)
_, err := doMedusaBackup(ctx, backup.ObjectMeta.Name, backup.Spec.Type, &p, r.ClientFactory, logger)
_, err := doMedusaBackup(ctx, backupJob.ObjectMeta.Name, backupJob.Spec.Type, &p, r.ClientFactory, logger)
if err != nil {
logger.Error(err, "backup failed", "CassandraPod", p.Name)
}

backup.Status.InProgress = append(backup.Status.InProgress, p.Name)
backupJob.Status.InProgress = append(backupJob.Status.InProgress, p.Name)
}
// logger.Info("finished backup operations")
if err := r.Status().Patch(context.Background(), backup, patch); err != nil {
logger.Error(err, "failed to patch status", "Backup", fmt.Sprintf("%s/%s", backup.Name, backup.Namespace))
if err := r.Status().Patch(context.Background(), backupJob, patch); err != nil {
logger.Error(err, "failed to patch status", "Backup", fmt.Sprintf("%s/%s", backupJob.Name, backupJob.Namespace))
}

return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
Expand Down Expand Up @@ -317,10 +325,17 @@ func backupStatus(ctx context.Context, name string, pod *corev1.Pod, clientFacto
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
logger.Error(err, "Could not make a new medusa client")
return medusa.StatusType_UNKNOWN, err
} else {
resp, err := medusaClient.BackupStatus(ctx, name)
if err != nil {
// the gRPC client does not return proper NotFound error, we need to check the payload too
if errors.IsNotFound(err) || strings.Contains(err.Error(), "NotFound") {
logger.Info(fmt.Sprintf("did not find backup %s for pod %s", name, pod.Name))
return medusa.StatusType_UNKNOWN, nil
}
logger.Error(err, fmt.Sprintf("getting backup status for backup %s and pod %s failed", name, pod.Name))
return medusa.StatusType_UNKNOWN, err
}

Expand Down
110 changes: 89 additions & 21 deletions controllers/medusa/medusabackupjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,22 @@ import (
)

const (
medusaImageRepo = "test/medusa"
cassandraUserSecret = "medusa-secret"
defaultBackupName = "backup1"
dc1PodPrefix = "192.168.1."
dc2PodPrefix = "192.168.2."
fakeBackupFileCount = int64(13)
fakeBackupByteSize = int64(42)
fakeBackupHumanSize = "42.00 B"
fakeMaxBackupCount = 1
medusaImageRepo = "test/medusa"
cassandraUserSecret = "medusa-secret"
successfulBackupName = "good-backup"
failingBackupName = "bad-backup"
missingBackupName = "missing-backup"
dc1PodPrefix = "192.168.1."
dc2PodPrefix = "192.168.2."
fakeBackupFileCount = int64(13)
fakeBackupByteSize = int64(42)
fakeBackupHumanSize = "42.00 B"
fakeMaxBackupCount = 1
)

var (
alreadyReportedFailingBackup = false
alreadyReportedMissingBackup = false
)

func testMedusaBackupDatacenter(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) {
Expand Down Expand Up @@ -142,16 +149,24 @@ func testMedusaBackupDatacenter(t *testing.T, ctx context.Context, f *framework.
})
require.NoError(err, "failed to update dc1 status to ready")

backupCreated := createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, defaultBackupName)
backupCreated := createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, successfulBackupName)
require.True(backupCreated, "failed to create backup")

t.Log("verify that medusa gRPC clients are invoked")
require.Equal(map[string][]string{
fmt.Sprintf("%s:%d", getPodIpAddress(0, dc1.DatacenterName()), shared.BackupSidecarPort): {defaultBackupName},
fmt.Sprintf("%s:%d", getPodIpAddress(1, dc1.DatacenterName()), shared.BackupSidecarPort): {defaultBackupName},
fmt.Sprintf("%s:%d", getPodIpAddress(2, dc1.DatacenterName()), shared.BackupSidecarPort): {defaultBackupName},
fmt.Sprintf("%s:%d", getPodIpAddress(0, dc1.DatacenterName()), shared.BackupSidecarPort): {successfulBackupName},
fmt.Sprintf("%s:%d", getPodIpAddress(1, dc1.DatacenterName()), shared.BackupSidecarPort): {successfulBackupName},
fmt.Sprintf("%s:%d", getPodIpAddress(2, dc1.DatacenterName()), shared.BackupSidecarPort): {successfulBackupName},
}, medusaClientFactory.GetRequestedBackups(dc1.DatacenterName()))

// a failing backup is one that actually starts but fails (on one pod)
backupCreated = createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, failingBackupName)
require.False(backupCreated, "the backup object shouldn't have been created")

// a missing backup is one that never gets to start (on one pod)
backupCreated = createAndVerifyMedusaBackup(dc1Key, dc1, f, ctx, require, t, namespace, missingBackupName)
require.False(backupCreated, "the backup object shouldn't have been created")

err = f.DeleteK8ssandraCluster(ctx, client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name}, timeout, interval)
require.NoError(err, "failed to delete K8ssandraCluster")
verifyObjectDoesNotExist(ctx, t, f, dc1Key, &cassdcapi.CassandraDatacenter{})
Expand Down Expand Up @@ -230,17 +245,23 @@ func createAndVerifyMedusaBackup(dcKey framework.ClusterKey, dc *cassdcapi.Cassa
if err != nil {
return false
}
t.Logf("backup finish time: %v", updated.Status.FinishTime)
t.Logf("backup finished: %v", updated.Status.Finished)
t.Logf("backup in progress: %v", updated.Status.InProgress)
return !updated.Status.FinishTime.IsZero() && len(updated.Status.Finished) == 3 && len(updated.Status.InProgress) == 0
t.Logf("backup %s finish time: %v", backupName, updated.Status.FinishTime)
t.Logf("backup %s failed: %v", backupName, updated.Status.Failed)
t.Logf("backup %s finished: %v", backupName, updated.Status.Finished)
t.Logf("backup %s in progress: %v", backupName, updated.Status.InProgress)
return !updated.Status.FinishTime.IsZero()
}, timeout, interval)

t.Log("verify that the MedusaBackup is created")
t.Log("check for the MedusaBackup being created")
medusaBackupKey := framework.NewClusterKey(dcKey.K8sContext, dcKey.Namespace, backupName)
medusaBackup := &api.MedusaBackup{}
err = f.Get(ctx, medusaBackupKey, medusaBackup)
require.NoError(err, "failed to get MedusaBackup")
if err != nil {
if errors.IsNotFound(err) {
return false
}
}
t.Log("verify the MedusaBackup is correct")
require.Equal(medusaBackup.Status.TotalNodes, dc.Spec.Size, "backup total nodes doesn't match dc nodes")
require.Equal(medusaBackup.Status.FinishedNodes, dc.Spec.Size, "backup finished nodes doesn't match dc nodes")
require.Equal(len(medusaBackup.Status.Nodes), int(dc.Spec.Size), "backup topology doesn't match dc topology")
Expand Down Expand Up @@ -336,6 +357,12 @@ type fakeMedusaClient struct {
}

func newFakeMedusaClient(dcName string) *fakeMedusaClient {
// the fake Medusa client keeps a bit of state in order to simulate different backup statuses
// more precisely, for some backups it will return not a success for some nodes
// we need to reset this state between tests
// doing it here is great since we make a new fake client for each test anyway
alreadyReportedFailingBackup = false
alreadyReportedMissingBackup = false
return &fakeMedusaClient{RequestedBackups: make([]string, 0), DcName: dcName}
}

Expand All @@ -349,8 +376,26 @@ func (c *fakeMedusaClient) CreateBackup(ctx context.Context, name string, backup
}

func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSummary, error) {

backups := make([]*medusa.BackupSummary, 0)

for _, name := range c.RequestedBackups {

// return status based on the backup name
// since we're implementing altogether different method of the Medusa client, we cannot reuse the BackupStatus logic
// but we still want to "mock" failing backups
// this does not get called per node/pod, se we don't need to track counts of what we returned
var status medusa.StatusType
if strings.HasPrefix(name, "good") {
status = *medusa.StatusType_SUCCESS.Enum()
} else if strings.HasPrefix(name, "bad") {
status = *medusa.StatusType_FAILED.Enum()
} else if strings.HasPrefix(name, "missing") {
status = *medusa.StatusType_UNKNOWN.Enum()
} else {
status = *medusa.StatusType_IN_PROGRESS.Enum()
}

backup := &medusa.BackupSummary{
BackupName: name,
StartTime: 0,
Expand All @@ -359,7 +404,7 @@ func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSumm
FinishedNodes: 3,
TotalObjects: fakeBackupFileCount,
TotalSize: fakeBackupByteSize,
Status: *medusa.StatusType_SUCCESS.Enum(),
Status: status,
Nodes: []*medusa.BackupNode{
{
Host: "host1",
Expand Down Expand Up @@ -387,8 +432,31 @@ func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSumm
}

func (c *fakeMedusaClient) BackupStatus(ctx context.Context, name string) (*medusa.BackupStatusResponse, error) {
// return different status for differently named backups
// but for each not-successful backup, return not-a-success only once
var status medusa.StatusType
if strings.HasPrefix(name, successfulBackupName) {
status = medusa.StatusType_SUCCESS
} else if strings.HasPrefix(name, failingBackupName) {
if !alreadyReportedFailingBackup {
status = medusa.StatusType_FAILED
alreadyReportedFailingBackup = true
} else {
status = medusa.StatusType_SUCCESS
}
} else if strings.HasPrefix(name, missingBackupName) {
if !alreadyReportedMissingBackup {
alreadyReportedMissingBackup = true
// reproducing what the gRPC client would send. sadly, it's not a proper NotFound error
return nil, fmt.Errorf("rpc error: code = NotFound desc = backup <%s> does not exist", name)
} else {
status = medusa.StatusType_SUCCESS
}
} else {
status = medusa.StatusType_IN_PROGRESS
}
return &medusa.BackupStatusResponse{
Status: medusa.StatusType_SUCCESS,
Status: status,
}, nil
}

Expand Down
Loading

0 comments on commit fdf5d78

Please sign in to comment.