Skip to content

Commit

Permalink
Move VS deletion to backup-finalizer-controller (vmware-tanzu#255)
Browse files Browse the repository at this point in the history
pass missing params during controller registration

refactor code to avoid code duplication and fix tests

remove unused function

fix via make update
  • Loading branch information
shubham-pampattiwar authored Apr 7, 2023
1 parent 410e203 commit 6c6cbf7
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 46 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
)
cmd.CheckError(err)
r := controller.NewBackupFinalizerReconciler(
s.ctx,
s.mgr.GetClient(),
clock.RealClock{},
backupper,
Expand All @@ -812,6 +813,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupStoreGetter,
s.logger,
s.metrics,
s.csiSnapshotLister,
s.csiSnapshotClient,
)
if err := r.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupFinalizer)
Expand Down
49 changes: 22 additions & 27 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}

volumeSnapshots, err = b.waitVolumeSnapshotReadyToUse(context.Background(), backup.Spec.CSISnapshotTimeout.Duration, backup.Name)
volumeSnapshots, err = waitVolumeSnapshotReadyToUse(context.Background(), backup.Spec.CSISnapshotTimeout.Duration, backup.Name, b.logger, b.volumeSnapshotLister, b.volumeSnapshotClient)
if err != nil {
backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
}
Expand Down Expand Up @@ -689,11 +689,6 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
backupLog.Error(err)
}
}

// Delete the VolumeSnapshots created in the backup, when CSI feature is enabled.
if len(volumeSnapshots) > 0 && len(volumeSnapshotContents) > 0 {
b.deleteVolumeSnapshots(volumeSnapshots, volumeSnapshotContents, backupLog, b.maxConcurrentK8SConnections)
}
}

backup.Status.VolumeSnapshotsAttempted = len(backup.VolumeSnapshots)
Expand Down Expand Up @@ -937,17 +932,17 @@ func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
// using goroutine here instead of waiting in CSI plugin, because it's not easy to make BackupItemAction
// parallel by now. After BackupItemAction parallel is implemented, this logic should be moved to CSI plugin
// as https://github.com/vmware-tanzu/velero-plugin-for-csi/pull/100
func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context,
csiSnapshotTimeout time.Duration, backupName string) ([]snapshotv1api.VolumeSnapshot, error) {
func waitVolumeSnapshotReadyToUse(ctx context.Context,
csiSnapshotTimeout time.Duration, backupName string, logger logrus.FieldLogger, volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, volumeSnapshotClient snapshotterClientSet.Interface) ([]snapshotv1api.VolumeSnapshot, error) {
eg, _ := errgroup.WithContext(ctx)
timeout := csiSnapshotTimeout
interval := 5 * time.Second
volumeSnapshots := make([]snapshotv1api.VolumeSnapshot, 0)

if b.volumeSnapshotLister != nil {
tmpVSs, err := b.volumeSnapshotLister.List(label.NewSelectorForBackup(backupName))
if volumeSnapshotLister != nil {
tmpVSs, err := volumeSnapshotLister.List(label.NewSelectorForBackup(backupName))
if err != nil {
b.logger.Error(err)
logger.Error(err)
return volumeSnapshots, err
}
for _, vs := range tmpVSs {
Expand All @@ -962,22 +957,22 @@ func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context,
volumeSnapshot := volumeSnapshots[index]
eg.Go(func() error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
tmpVS, err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(b.ctx, volumeSnapshot.Name, metav1.GetOptions{})
tmpVS, err := volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(ctx, volumeSnapshot.Name, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name))
}
if tmpVS.Status == nil || tmpVS.Status.BoundVolumeSnapshotContentName == nil || !boolptr.IsSetToTrue(tmpVS.Status.ReadyToUse) {
b.logger.Infof("Waiting for CSI driver to reconcile volumesnapshot %s/%s. Retrying in %ds", volumeSnapshot.Namespace, volumeSnapshot.Name, interval/time.Second)
logger.Infof("Waiting for CSI driver to reconcile volumesnapshot %s/%s. Retrying in %ds", volumeSnapshot.Namespace, volumeSnapshot.Name, interval/time.Second)
return false, nil
}

b.logger.Debugf("VolumeSnapshot %s/%s turned into ReadyToUse.", volumeSnapshot.Namespace, volumeSnapshot.Name)
logger.Debugf("VolumeSnapshot %s/%s turned into ReadyToUse.", volumeSnapshot.Namespace, volumeSnapshot.Name)
// Put the ReadyToUse VolumeSnapshot element in the result channel.
vsChannel <- *tmpVS
return true, nil
})
if err == wait.ErrWaitTimeout {
b.logger.Errorf("Timed out awaiting reconciliation of volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name)
logger.Errorf("Timed out awaiting reconciliation of volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name)
}
return err
})
Expand All @@ -999,9 +994,9 @@ func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context,
// which will cause snapshot deletion on cloud provider, then backup cannot restore the PV.
// If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to
// change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete.
func (b *backupReconciler) deleteVolumeSnapshots(volumeSnapshots []snapshotv1api.VolumeSnapshot,
func deleteVolumeSnapshots(volumeSnapshots []snapshotv1api.VolumeSnapshot,
volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
logger logrus.FieldLogger, maxConcurrent int) {
logger logrus.FieldLogger, maxConcurrent int, kbClient kbclient.Client, volumeSnapshotClient snapshotterClientSet.Interface, resourceTimeout time.Duration) {
var wg sync.WaitGroup
vscMap := make(map[string]snapshotv1api.VolumeSnapshotContent)
for _, vsc := range volumeSnapshotContents {
Expand All @@ -1024,7 +1019,7 @@ func (b *backupReconciler) deleteVolumeSnapshots(volumeSnapshots []snapshotv1api
wg.Done()
return
}
b.deleteVolumeSnapshot(vs, vscMap, logger)
deleteVolumeSnapshot(vs, vscMap, logger, kbClient, volumeSnapshotClient, resourceTimeout)
}
}()
}
Expand All @@ -1039,7 +1034,7 @@ func (b *backupReconciler) deleteVolumeSnapshots(volumeSnapshots []snapshotv1api

// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
// instance.
func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vscMap map[string]snapshotv1api.VolumeSnapshotContent, logger logrus.FieldLogger) {
func deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vscMap map[string]snapshotv1api.VolumeSnapshotContent, logger logrus.FieldLogger, kbClient kbclient.Client, volumeSnapshotClient snapshotterClientSet.Interface, resourceTimeout time.Duration) {
var vsc snapshotv1api.VolumeSnapshotContent
modifyVSCFlag := false
if vs.Status != nil &&
Expand All @@ -1066,14 +1061,14 @@ func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot,
logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name)
original := vsc.DeepCopy()
vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
if err := b.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil {
if err := kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil {
logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error())
return
}

defer func() {
logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name)
err := b.recreateVolumeSnapshotContent(vsc)
err := recreateVolumeSnapshotContent(vsc, resourceTimeout, kbClient)
if err != nil {
logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error())
}
Expand All @@ -1082,7 +1077,7 @@ func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot,

// Delete VolumeSnapshot from cluster
logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name)
err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
err := volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
if err != nil {
logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
}
Expand All @@ -1093,19 +1088,19 @@ func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot,
// and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS.
// Set VolumeSnapshotRef's UID to nil will let the csi-controller finds out the related VS is gone, then
// VSC can be deleted.
func (b *backupReconciler) recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent) error {
timeout := b.resourceTimeout
func recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent, resourceTimeout time.Duration, kbClient kbclient.Client) error {
timeout := resourceTimeout
interval := 1 * time.Second

err := b.kbClient.Delete(context.TODO(), &vsc)
err := kbClient.Delete(context.TODO(), &vsc)
if err != nil {
return errors.Wrapf(err, "fail to delete VolumeSnapshotContent: %s", vsc.Name)
}

// Check VolumeSnapshotContents is already deleted, before re-creating it.
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
tmpVSC := &snapshotv1api.VolumeSnapshotContent{}
err := b.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC)
err := kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC)
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
Expand Down Expand Up @@ -1134,7 +1129,7 @@ func (b *backupReconciler) recreateVolumeSnapshotContent(vsc snapshotv1api.Volum
}
// ResourceVersion shouldn't exist for new creation.
vsc.ResourceVersion = ""
err = b.kbClient.Create(context.TODO(), &vsc)
err = kbClient.Create(context.TODO(), &vsc)
if err != nil {
return errors.Wrapf(err, "fail to create VolumeSnapshotContent %s", vsc.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,7 @@ func TestDeleteVolumeSnapshots(t *testing.T) {
}
logger := logging.DefaultLogger(logrus.DebugLevel, logging.FormatText)

c.deleteVolumeSnapshots(tc.vsArray, tc.vscArray, logger, 30)
deleteVolumeSnapshots(tc.vsArray, tc.vscArray, logger, 30, c.kbClient, c.volumeSnapshotClient, 10)

vsList, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots("velero").List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)
Expand Down
76 changes: 60 additions & 16 deletions pkg/controller/backup_finalizer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"context"
"os"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,6 +32,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/pkg/features"
"github.com/vmware-tanzu/velero/pkg/label"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
"github.com/vmware-tanzu/velero/pkg/metrics"
Expand All @@ -40,18 +46,22 @@ import (

// backupFinalizerReconciler reconciles a Backup object
type backupFinalizerReconciler struct {
client kbclient.Client
clock clocks.WithTickerAndDelayedExecution
backupper pkgbackup.Backupper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
log logrus.FieldLogger
ctx context.Context
client kbclient.Client
clock clocks.WithTickerAndDelayedExecution
backupper pkgbackup.Backupper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
log logrus.FieldLogger
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
volumeSnapshotClient snapshotterClientSet.Interface
}

// NewBackupFinalizerReconciler initializes and returns backupFinalizerReconciler struct.
func NewBackupFinalizerReconciler(
ctx context.Context,
client kbclient.Client,
clock clocks.WithTickerAndDelayedExecution,
backupper pkgbackup.Backupper,
Expand All @@ -60,16 +70,21 @@ func NewBackupFinalizerReconciler(
backupStoreGetter persistence.ObjectBackupStoreGetter,
log logrus.FieldLogger,
metrics *metrics.ServerMetrics,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
volumeSnapshotClient snapshotterClientSet.Interface,
) *backupFinalizerReconciler {
return &backupFinalizerReconciler{
client: client,
clock: clock,
backupper: backupper,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
backupStoreGetter: backupStoreGetter,
log: log,
metrics: metrics,
ctx: ctx,
client: client,
clock: clock,
backupper: backupper,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
backupStoreGetter: backupStoreGetter,
log: log,
metrics: metrics,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotClient: volumeSnapshotClient,
}
}

Expand Down Expand Up @@ -201,6 +216,35 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, errors.Wrap(err, "error uploading backup final contents")
}
}

//Defer Deleting CSI Snapshots
// Empty slices here so that they can be passed in to the persistBackup call later, regardless of whether or not CSI's enabled.
// This way, we only make the Lister call if the feature flag's on.
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}

volumeSnapshots, err = waitVolumeSnapshotReadyToUse(context.Background(), backup.Spec.CSISnapshotTimeout.Duration, backup.Name, r.log, r.volumeSnapshotLister, r.volumeSnapshotClient)
if err != nil {
log.Errorf("fail to list VolumeSnapshots to be deleted: %s", err.Error())
}

err = r.client.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
log.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}

// Delete the VolumeSnapshots created in the backup, when CSI feature is enabled.
if len(volumeSnapshots) > 0 && len(volumeSnapshotContents) > 0 {
deleteVolumeSnapshots(volumeSnapshots, volumeSnapshotContents, log, 30, r.client, r.volumeSnapshotClient, 10)
}
}

return ctrl.Result{}, nil
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/controller/backup_finalizer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"testing"
"time"

snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
snapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -45,9 +50,10 @@ import (
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

func mockBackupFinalizerReconciler(fakeClient kbclient.Client, fakeClock *testclocks.FakeClock) (*backupFinalizerReconciler, *fakeBackupper) {
func mockBackupFinalizerReconciler(fakectx context.Context, fakeClient kbclient.Client, fakeClock *testclocks.FakeClock, fakevolumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, fakevolumeSnapshotClient snapshotterClientSet.Interface) (*backupFinalizerReconciler, *fakeBackupper) {
backupper := new(fakeBackupper)
return NewBackupFinalizerReconciler(
fakectx,
fakeClient,
fakeClock,
backupper,
Expand All @@ -56,6 +62,8 @@ func mockBackupFinalizerReconciler(fakeClient kbclient.Client, fakeClock *testcl
NewFakeSingleObjectBackupStoreGetter(backupStore),
logrus.StandardLogger(),
metrics.NewServerMetrics(),
fakevolumeSnapshotLister,
fakevolumeSnapshotClient,
), backupper
}
func TestBackupFinalizerReconcile(t *testing.T) {
Expand Down Expand Up @@ -160,7 +168,11 @@ func TestBackupFinalizerReconcile(t *testing.T) {
}

fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...)
reconciler, backupper := mockBackupFinalizerReconciler(fakeClient, fakeClock)
vsClient := snapshotfake.NewSimpleClientset()
sharedInformers := snapshotinformers.NewSharedInformerFactory(vsClient, 0)
fakevolumeSnapshotLister := sharedInformers.Snapshot().V1().VolumeSnapshots().Lister()
fakevolumeSnapshotClient := vsClient
reconciler, backupper := mockBackupFinalizerReconciler(context.Background(), fakeClient, fakeClock, fakevolumeSnapshotLister, fakevolumeSnapshotClient)
pluginManager.On("CleanupClients").Return(nil)
backupStore.On("GetBackupItemOperations", test.backup.Name).Return(test.backupOperations, nil)
backupStore.On("GetBackupContents", mock.Anything).Return(ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil)
Expand Down

0 comments on commit 6c6cbf7

Please sign in to comment.