Skip to content

Commit

Permalink
Add CSI VolumeSnapshot client back.
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Jiang <[email protected]>
  • Loading branch information
Xun Jiang committed Oct 15, 2022
1 parent b5b4db2 commit 8a188fc
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5449-blackpiglet
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add VolumeSnapshot client back.
32 changes: 31 additions & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
Expand All @@ -49,6 +50,7 @@ import (
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/storage"
Expand Down Expand Up @@ -551,6 +553,32 @@ func (s *server) initRestic() error {
return nil
}

func (s *server) getCSIVolumeSnapshotListers() snapshotv1listers.VolumeSnapshotLister {
// Make empty listers that will only be populated if CSI is properly enabled.
var vsLister snapshotv1listers.VolumeSnapshotLister
var err error

// If CSI is enabled, check for the CSI groups and generate the listers
// If CSI isn't enabled, return empty listers.
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
_, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String())
switch {
case apierrors.IsNotFound(err):
// CSI is enabled, but the required CRDs aren't installed, so halt.
s.logger.Fatalf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String())
case err == nil:
// CSI is enabled, and the resources were found.
// Instantiate the listers fully
s.logger.Debug("Creating CSI listers")
// Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe.
vsLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshots().Lister()
case err != nil:
cmd.CheckError(err)
}
}
return vsLister
}

func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string) error {
s.logger.Info("Starting controllers")

Expand Down Expand Up @@ -610,8 +638,10 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
backupStoreGetter,
s.config.formatFlag.Parse(),
s.getCSIVolumeSnapshotListers(),
s.csiSnapshotClient,
s.credentialFileStore,
)

Expand Down
47 changes: 27 additions & 20 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/csi"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/storage"
Expand Down Expand Up @@ -92,6 +94,8 @@ type backupController struct {
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
formatFlag logging.Format
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
volumeSnapshotClient *snapshotterClientSet.Clientset
credentialFileStore credentials.FileStore
}

Expand All @@ -112,8 +116,10 @@ func NewBackupController(
volumeSnapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
defaultSnapshotLocations map[string]string,
metrics *metrics.ServerMetrics,
formatFlag logging.Format,
backupStoreGetter persistence.ObjectBackupStoreGetter,
formatFlag logging.Format,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
volumeSnapshotClient *snapshotterClientSet.Clientset,
credentialStore credentials.FileStore,
) Interface {
c := &backupController{
Expand All @@ -134,8 +140,10 @@ func NewBackupController(
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
backupStoreGetter: backupStoreGetter,
formatFlag: formatFlag,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotClient: volumeSnapshotClient,
credentialFileStore: credentialStore,
}

Expand Down Expand Up @@ -653,19 +661,19 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
// Listers are wrapped in a nil check out of caution, since they may not be populated based on the
// EnableCSI feature flag. This is more to guard against programmer error, as they shouldn't be nil
// when EnableCSI is on.
vsList := &snapshotv1api.VolumeSnapshotList{}
vscList := &snapshotv1api.VolumeSnapshotContentList{}

err = c.kbClient.List(context.Background(), vsList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vsList.Items) >= 0 {
volumeSnapshots = vsList.Items
if c.volumeSnapshotLister != nil {
tmpVSs, err := c.volumeSnapshotLister.List(selector)
if err != nil {
backupLog.Error(err)
}

for _, vs := range tmpVSs {
volumeSnapshots = append(volumeSnapshots, *vs)
}
}

err = c.checkVolumeSnapshotReadyToUse(context.Background(), volumeSnapshots, backup.Spec.CSISnapshotTimeout.Duration)
if err != nil {
backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
Expand All @@ -681,19 +689,19 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {
}

vsClassSet := sets.NewString()
for _, vsc := range volumeSnapshotContents {
for index := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if vsc.Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*vsc.Spec.VolumeSnapshotClassName) {
if volumeSnapshotContents[index].Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *vsc.Spec.VolumeSnapshotClassName}, vsClass); err != nil {
if err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *volumeSnapshotContents[index].Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*vsc.Spec.VolumeSnapshotClassName)
vsClassSet.Insert(*volumeSnapshotContents[index].Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}
}

if err := csi.ResetVolumeSnapshotContent(vsc); err != nil {
if err := csi.ResetVolumeSnapshotContent(&volumeSnapshotContents[index]); err != nil {
backupLog.Error(err)
}
}
Expand Down Expand Up @@ -917,8 +925,7 @@ func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, vo
volumeSnapshot := vs
eg.Go(func() error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
tmpVS := &snapshotv1api.VolumeSnapshot{}
err := c.kbClient.Get(ctx, kbclient.ObjectKey{Name: volumeSnapshot.Name, Namespace: volumeSnapshot.Namespace}, tmpVS)
tmpVS, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(ctx, volumeSnapshot.Name, metav1.GetOptions{})
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name))
}
Expand Down Expand Up @@ -995,7 +1002,7 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.

// Delete VolumeSnapshot from cluster
logger.Debugf("Deleting VolumeSnapshotContent %s", vsc.Name)
err := c.kbClient.Delete(context.TODO(), &vs)
err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
if err != nil {
logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/csi/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// It will move the snapshot Handle to the source to avoid the snapshot-controller creating a snapshot when it's
// synced by the backup sync controller.
// It will return an error if the snapshot handle is not set, which should not happen when this func is called.
func ResetVolumeSnapshotContent(snapCont snapshotv1api.VolumeSnapshotContent) error {
func ResetVolumeSnapshotContent(snapCont *snapshotv1api.VolumeSnapshotContent) error {
if snapCont.Status != nil && snapCont.Status.SnapshotHandle != nil && len(*snapCont.Status.SnapshotHandle) > 0 {
v := *snapCont.Status.SnapshotHandle
snapCont.Spec.Source = snapshotv1api.VolumeSnapshotContentSource{
Expand Down

0 comments on commit 8a188fc

Please sign in to comment.