Skip to content

Commit

Permalink
refactor backup snapshot status updates into UpdateBackupSnapshotsSta…
Browse files Browse the repository at this point in the history
…tus() and run in backup_finalizer_controller

Signed-off-by: Tiger Kaovilai <[email protected]>
  • Loading branch information
kaovilai committed Nov 2, 2023
1 parent 886e074 commit 60a0232
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 77 deletions.
3 changes: 0 additions & 3 deletions pkg/backup/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"fmt"
"sort"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/hook"
"github.com/vmware-tanzu/velero/internal/resourcepolicies"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
Expand Down Expand Up @@ -51,7 +49,6 @@ type Request struct {
VolumeSnapshots []*volume.Snapshot
PodVolumeBackups []*velerov1api.PodVolumeBackup
BackedUpItems map[itemKey]struct{}
CSISnapshots []snapshotv1api.VolumeSnapshot
itemOperationsList *[]*itemoperation.BackupOperation
ResPolicies *resourcepolicies.Policies
SkippedPVTracker *skipPVTracker
Expand Down
64 changes: 64 additions & 0 deletions pkg/backup/snapshots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package backup

import (
"context"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
)

// Common function to update the status of CSI snapshots
// returns VolumeSnapshot, VolumeSnapshotContent, VolumeSnapshotClasses referenced
func UpdateBackupSnapshotsStatus(client kbclient.Client, volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, backup *velerov1api.Backup, backupLog logrus.FieldLogger) ([]snapshotv1api.VolumeSnapshot, []snapshotv1api.VolumeSnapshotContent, []snapshotv1api.VolumeSnapshotClass) {
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}

if volumeSnapshotLister != nil {
tmpVSs, err := volumeSnapshotLister.List(label.NewSelectorForBackup(backup.Name))
if err != nil {
backupLog.Error(err)
}
for _, vs := range tmpVSs {
volumeSnapshots = append(volumeSnapshots, *vs)
}

Check warning on line 33 in pkg/backup/snapshots.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/snapshots.go#L19-L33

Added lines #L19 - L33 were not covered by tests
}

err := client.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}

Check warning on line 42 in pkg/backup/snapshots.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/snapshots.go#L36-L42

Added lines #L36 - L42 were not covered by tests

vsClassSet := sets.NewString()
for index := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if volumeSnapshotContents[index].Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := client.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}

Check warning on line 54 in pkg/backup/snapshots.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/snapshots.go#L44-L54

Added lines #L44 - L54 were not covered by tests
}
}
backup.Status.CSIVolumeSnapshotsAttempted = len(volumeSnapshots)
for _, vs := range volumeSnapshots {
if vs.Status != nil && boolptr.IsSetToTrue(vs.Status.ReadyToUse) {
backup.Status.CSIVolumeSnapshotsCompleted++
}

Check warning on line 61 in pkg/backup/snapshots.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/snapshots.go#L57-L61

Added lines #L57 - L61 were not covered by tests
}
return volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses

Check warning on line 63 in pkg/backup/snapshots.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/snapshots.go#L63

Added line #L63 was not covered by tests
}
2 changes: 1 addition & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
backupStoreGetter,
s.config.formatFlag.Parse(),
s.csiSnapshotLister,
s.csiSnapshotClient,
s.credentialFileStore,
s.config.maxConcurrentK8SConnections,
s.config.defaultSnapshotMoveData,
Expand Down Expand Up @@ -832,6 +831,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
cmd.CheckError(err)
r := controller.NewBackupFinalizerReconciler(
s.mgr.GetClient(),
s.csiSnapshotLister,

Check warning on line 834 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L834

Added line #L834 was not covered by tests
clock.RealClock{},
backupper,
newPluginManager,
Expand Down
59 changes: 3 additions & 56 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -112,7 +111,6 @@ func NewBackupReconciler(
backupStoreGetter persistence.ObjectBackupStoreGetter,
formatFlag logging.Format,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
volumeSnapshotClient snapshotterClientSet.Interface,
credentialStore credentials.FileStore,
maxConcurrentK8SConnections int,
defaultSnapshotMoveData bool,
Expand All @@ -138,7 +136,6 @@ func NewBackupReconciler(
backupStoreGetter: backupStoreGetter,
formatFlag: formatFlag,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotClient: volumeSnapshotClient,
credentialFileStore: credentialStore,
maxConcurrentK8SConnections: maxConcurrentK8SConnections,
defaultSnapshotMoveData: defaultSnapshotMoveData,
Expand Down Expand Up @@ -660,65 +657,15 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
fatalErrs = append(fatalErrs, err)
}

// Empty slices here so that they can be passed in to the persistBackup call later, regardless of whether or not CSI's enabled.
// This way, we only make the Lister call if the feature flag's on.
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
if boolptr.IsSetToTrue(backup.Spec.SnapshotMoveData) {
backupLog.Info("backup SnapshotMoveData is set to true, skip VolumeSnapshot resource persistence.")
} else if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}

if b.volumeSnapshotLister != nil {
tmpVSs, err := b.volumeSnapshotLister.List(label.NewSelectorForBackup(backup.Name))
if err != nil {
backupLog.Error(err)
}
for _, vs := range tmpVSs {
volumeSnapshots = append(volumeSnapshots, *vs)
}
}

backup.CSISnapshots = volumeSnapshots

err = b.kbClient.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}

vsClassSet := sets.NewString()
for index := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if volumeSnapshotContents[index].Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := b.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}
}
}
}

// native snapshots phase will either be failed or completed right away
// https://github.com/kaovilai/velero/blob/de3ea52f0cc478e99efa7b9524c7f353514261a4/pkg/backup/item_backupper.go#L632-L639
backup.Status.VolumeSnapshotsAttempted = len(backup.VolumeSnapshots)
for _, snap := range backup.VolumeSnapshots {
if snap.Status.Phase == volume.SnapshotPhaseCompleted {
backup.Status.VolumeSnapshotsCompleted++
}
}

backup.Status.CSIVolumeSnapshotsAttempted = len(backup.CSISnapshots)
for _, vs := range backup.CSISnapshots {
if vs.Status != nil && boolptr.IsSetToTrue(vs.Status.ReadyToUse) {
backup.Status.CSIVolumeSnapshotsCompleted++
}
}
volumeSnapshots, volumeSnapshotContents, volumeSnapshotClasses := pkgbackup.UpdateBackupSnapshotsStatus(b.kbClient, b.volumeSnapshotLister, backup.Backup, backupLog)

// Iterate over backup item operations and update progress.
// Any errors on operations at this point should be added to backup errors.
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
Expand Down Expand Up @@ -1059,7 +1058,7 @@ func TestProcessBackupCompletions(t *testing.T) {
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
Expand Down Expand Up @@ -1181,7 +1180,7 @@ func TestProcessBackupCompletions(t *testing.T) {
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
Expand Down Expand Up @@ -1263,7 +1262,7 @@ func TestProcessBackupCompletions(t *testing.T) {
FormatVersion: "1.1.0",
StartTimestamp: &timestamp,
Expiration: &timestamp,
CSIVolumeSnapshotsAttempted: 0,
CSIVolumeSnapshotsAttempted: 1,
CSIVolumeSnapshotsCompleted: 0,
},
},
Expand Down Expand Up @@ -1710,7 +1709,7 @@ func TestPatchResourceWorksWithStatus(t *testing.T) {
},
}
// check original exists
if err := fakeClient.Get(context.Background(), client.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
if err := fakeClient.Get(context.Background(), kbclient.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
t.Errorf("PatchResource() error = %v", err)
}
// ignore resourceVersion
Expand All @@ -1720,7 +1719,7 @@ func TestPatchResourceWorksWithStatus(t *testing.T) {
t.Errorf("PatchResource() error = %v, wantErr %v", err, tt.wantErr)
}
// check updated exists
if err := fakeClient.Get(context.Background(), client.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
if err := fakeClient.Get(context.Background(), kbclient.ObjectKeyFromObject(tt.args.updated), fromCluster); err != nil {
t.Errorf("PatchResource() error = %v", err)
}

Expand Down
32 changes: 23 additions & 9 deletions pkg/controller/backup_finalizer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
"github.com/vmware-tanzu/velero/pkg/metrics"
Expand All @@ -40,19 +43,21 @@ import (

// backupFinalizerReconciler reconciles a Backup object
type backupFinalizerReconciler struct {
client kbclient.Client
clock clocks.WithTickerAndDelayedExecution
backupper pkgbackup.Backupper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
log logrus.FieldLogger
client kbclient.Client
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
clock clocks.WithTickerAndDelayedExecution
backupper pkgbackup.Backupper
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
log logrus.FieldLogger
}

// NewBackupFinalizerReconciler initializes and returns backupFinalizerReconciler struct.
func NewBackupFinalizerReconciler(
client kbclient.Client,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
clock clocks.WithTickerAndDelayedExecution,
backupper pkgbackup.Backupper,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
Expand Down Expand Up @@ -186,7 +191,16 @@ func (r *backupFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
backup.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
recordBackupMetrics(log, backup, outBackupFile, r.metrics, true)

CSISnapshotsNotReady := errors.New("CSI snapshots not ready")
retry.OnError(retry.DefaultBackoff, func(err error) bool {
return err == CSISnapshotsNotReady

Check warning on line 196 in pkg/controller/backup_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_finalizer_controller.go#L196

Added line #L196 was not covered by tests
}, func() error {
pkgbackup.UpdateBackupSnapshotsStatus(r.client, r.volumeSnapshotLister, backup, log)
if backup.Status.CSIVolumeSnapshotsCompleted < backup.Status.CSIVolumeSnapshotsAttempted {
return CSISnapshotsNotReady
}

Check warning on line 201 in pkg/controller/backup_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/backup_finalizer_controller.go#L200-L201

Added lines #L200 - L201 were not covered by tests
return nil
})
// update backup metadata in object store
backupJSON := new(bytes.Buffer)
if err := encode.To(backup, "json", backupJSON); err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/backup_finalizer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -43,12 +44,14 @@ import (
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
velerotestmocks "github.com/vmware-tanzu/velero/pkg/test/mocks"
)

func mockBackupFinalizerReconciler(fakeClient kbclient.Client, fakeClock *testclocks.FakeClock) (*backupFinalizerReconciler, *fakeBackupper) {
func mockBackupFinalizerReconciler(fakeClient kbclient.Client, fakeVolumeSnapshotLister snapshotv1listers.VolumeSnapshotLister, fakeClock *testclocks.FakeClock) (*backupFinalizerReconciler, *fakeBackupper) {
backupper := new(fakeBackupper)
return NewBackupFinalizerReconciler(
fakeClient,
fakeVolumeSnapshotLister,
fakeClock,
backupper,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
Expand Down Expand Up @@ -160,7 +163,10 @@ func TestBackupFinalizerReconcile(t *testing.T) {
}

fakeClient := velerotest.NewFakeControllerRuntimeClient(t, initObjs...)
reconciler, backupper := mockBackupFinalizerReconciler(fakeClient, fakeClock)

fakeVolumeSnapshotLister := velerotestmocks.NewVolumeSnapshotLister(t)

reconciler, backupper := mockBackupFinalizerReconciler(fakeClient, fakeVolumeSnapshotLister, fakeClock)
pluginManager.On("CleanupClients").Return(nil)
backupStore.On("GetBackupItemOperations", test.backup.Name).Return(test.backupOperations, nil)
backupStore.On("GetBackupContents", mock.Anything).Return(io.NopCloser(bytes.NewReader([]byte("hello world"))), nil)
Expand Down
20 changes: 20 additions & 0 deletions pkg/test/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package test

import (
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"k8s.io/apimachinery/pkg/labels"
)

// VolumeSnapshotLister helps list VolumeSnapshots.
// All objects returned here must be treated as read-only.
//
//go:generate mockery --name VolumeSnapshotLister
type VolumeSnapshotLister interface {
// List lists all VolumeSnapshots in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*snapshotv1.VolumeSnapshot, err error)
// VolumeSnapshots returns an object that can list and get VolumeSnapshots.
VolumeSnapshots(namespace string) snapshotv1listers.VolumeSnapshotNamespaceLister
snapshotv1listers.VolumeSnapshotListerExpansion
}
Loading

0 comments on commit 60a0232

Please sign in to comment.