Skip to content

Commit

Permalink
Patch newly dynamically provisioned PV with volume info to restore cu…
Browse files Browse the repository at this point in the history
…stom setting of PV

Signed-off-by: allenxu404 <[email protected]>
  • Loading branch information
allenxu404 committed Mar 18, 2024
1 parent 6ec1701 commit 67b5e82
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7504-allenxu404
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Patch newly dynamically provisioned PV with volume info to restore custom setting of PV
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,9 @@ func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelect
}
return b
}

// Phase sets the PersistentVolume's phase.
func (b *PersistentVolumeBuilder) Phase(phase corev1api.PersistentVolumePhase) *PersistentVolumeBuilder {
b.object.Status.Phase = phase
return b
}
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
newPluginManager,
backupStoreGetter,
s.metrics,
s.crClient,

Check warning on line 1000 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1000

Added line #L1000 was not covered by tests
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer)
}
Expand Down
187 changes: 184 additions & 3 deletions pkg/controller/restore_finalizer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,36 @@ package controller

import (
"context"
"fmt"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"

internalVolume "github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/restore"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/results"
)

const (
PVPatchMaximumDuration = 10 * time.Minute
)

type restoreFinalizerReconciler struct {
client.Client
namespace string
Expand All @@ -44,6 +56,7 @@ type restoreFinalizerReconciler struct {
backupStoreGetter persistence.ObjectBackupStoreGetter
metrics *metrics.ServerMetrics
clock clock.WithTickerAndDelayedExecution
crClient client.Client
}

func NewRestoreFinalizerReconciler(
Expand All @@ -53,6 +66,7 @@ func NewRestoreFinalizerReconciler(
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
crClient client.Client,
) *restoreFinalizerReconciler {
return &restoreFinalizerReconciler{
Client: client,
Expand All @@ -62,6 +76,7 @@ func NewRestoreFinalizerReconciler(
backupStoreGetter: backupStoreGetter,
metrics: metrics,
clock: &clock.RealClock{},
crClient: crClient,
}
}

Expand Down Expand Up @@ -123,7 +138,27 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, errors.Wrap(err, "error getting backup store")
}

finalizerCtx := &finalizerContext{log: log}
volumeInfo, err := backupStore.GetBackupVolumeInfos(restore.Spec.BackupName)
if err != nil {
log.WithError(err).Errorf("error getting volumeInfo for backup %s", restore.Spec.BackupName)
return ctrl.Result{}, errors.Wrap(err, "error getting volumeInfo")
}

Check warning on line 145 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L143-L145

Added lines #L143 - L145 were not covered by tests

restoredResourceList, err := backupStore.GetRestoredResourceList(restore.Name)
if err != nil {
log.WithError(err).Error("error getting restoredResourceList")
return ctrl.Result{}, errors.Wrap(err, "error getting restoredResourceList")
}

Check warning on line 151 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L149-L151

Added lines #L149 - L151 were not covered by tests

restoredPVCList := getRestoredPVCFromRestoredResourceList(restoredResourceList)

finalizerCtx := &finalizerContext{
logger: log,
restore: restore,
crClient: r.crClient,
volumeInfo: volumeInfo,
restoredPVCList: restoredPVCList,
}
warnings, errs := finalizerCtx.execute()

warningCnt := len(warnings.Velero) + len(warnings.Cluster)
Expand Down Expand Up @@ -200,14 +235,160 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R
// finalizerContext includes all the dependencies required by finalization tasks and
// a function execute() to orderly implement task logic.
type finalizerContext struct {
log logrus.FieldLogger
logger logrus.FieldLogger
restore *velerov1api.Restore
crClient client.Client
volumeInfo []*internalVolume.VolumeInfo
restoredPVCList map[string]struct{}
}

func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam)
warnings, errs := results.Result{}, results.Result{}

// implement finalization tasks
ctx.log.Debug("Starting running execute()")
pdpErrs := ctx.patchDynamicPVWithVolumeInfo()
errs.Merge(&pdpErrs)

return warnings, errs
}

// patchDynamicPV patches newly dynamically provisioned PV using volume info
// in order to restore custom settings that would otherwise be lost during dynamic PV recreation.
func (ctx *finalizerContext) patchDynamicPVWithVolumeInfo() (errs results.Result) {
ctx.logger.Info("patching newly dynamically provisioned PV starts")

var pvWaitGroup sync.WaitGroup
var resultLock sync.Mutex
maxConcurrency := 3
semaphore := make(chan struct{}, maxConcurrency)

for _, volumeItem := range ctx.volumeInfo {
if (volumeItem.BackupMethod == internalVolume.PodVolumeBackup || volumeItem.BackupMethod == internalVolume.CSISnapshot) && volumeItem.PVInfo != nil {
// Determine restored PVC namespace
restoredNamespace := volumeItem.PVCNamespace
if remapped, ok := ctx.restore.Spec.NamespaceMapping[restoredNamespace]; ok {
restoredNamespace = remapped
}

// Check if PVC was restored in previous phase
pvcKey := fmt.Sprintf("%s/%s", restoredNamespace, volumeItem.PVCName)
if _, restored := ctx.restoredPVCList[pvcKey]; !restored {
continue

Check warning on line 276 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L276

Added line #L276 was not covered by tests
}

pvWaitGroup.Add(1)
go func(volInfo internalVolume.VolumeInfo, restoredNamespace string) {
defer pvWaitGroup.Done()

semaphore <- struct{}{}

log := ctx.logger.WithField("PVC", volInfo.PVCName).WithField("PVCNamespace", restoredNamespace)
log.Debug("patching dynamic PV is in progress")

err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, PVPatchMaximumDuration, true, func(context.Context) (bool, error) {
// wait for PVC to be bound
pvc := &v1.PersistentVolumeClaim{}
err := ctx.crClient.Get(context.Background(), client.ObjectKey{Name: volInfo.PVCName, Namespace: restoredNamespace}, pvc)
if apierrors.IsNotFound(err) {
log.Debug("error not finding PVC")
return false, nil
}

Check warning on line 295 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L293-L295

Added lines #L293 - L295 were not covered by tests
if err != nil {
return false, err
}

Check warning on line 298 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L297-L298

Added lines #L297 - L298 were not covered by tests

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
log.Debugf("PVC: %s not ready", pvc.Name)
return false, nil
}

Check warning on line 303 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L301-L303

Added lines #L301 - L303 were not covered by tests

// wait for PV to be bound
pvName := pvc.Spec.VolumeName
pv := &v1.PersistentVolume{}
err = ctx.crClient.Get(context.Background(), client.ObjectKey{Name: pvName}, pv)
if apierrors.IsNotFound(err) {
log.Debugf("error not finding PV: %s", pvName)
return false, nil
}

Check warning on line 312 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L310-L312

Added lines #L310 - L312 were not covered by tests
if err != nil {
return false, err
}

Check warning on line 315 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L314-L315

Added lines #L314 - L315 were not covered by tests

if pv.Spec.ClaimRef == nil || pv.Status.Phase != v1.VolumeBound {
log.Debugf("PV: %s not ready", pvName)
return false, nil
}

Check warning on line 320 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L318-L320

Added lines #L318 - L320 were not covered by tests

// validate PV
if pv.Spec.ClaimRef.Name != pvc.Name || pv.Spec.ClaimRef.Namespace != restoredNamespace {
return false, fmt.Errorf("PV was bound by unexpected PVC, unexpected PVC: %s/%s, expected PVC: %s/%s",
pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name, restoredNamespace, pvc.Name)
}

// patch PV's reclaim policy and label using the corresponding data stored in volume info
if needPatch(pv, volInfo.PVInfo) {
updatedPV := pv.DeepCopy()
updatedPV.Labels = volInfo.PVInfo.Labels
updatedPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimPolicy(volInfo.PVInfo.ReclaimPolicy)
if err := kubeutil.PatchResource(pv, updatedPV, ctx.crClient); err != nil {
return false, err
}

Check warning on line 335 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L334-L335

Added lines #L334 - L335 were not covered by tests
log.Infof("newly dynamically provisioned PV:%s has been patched using volume info", pvName)
}

return true, nil
})

if err != nil {
err = fmt.Errorf("fail to patch dynamic PV, err: %s, PVC: %s, PV: %s", err, volInfo.PVCName, volInfo.PVName)
ctx.logger.WithError(errors.WithStack((err))).Error("err patching dynamic PV using volume info")
resultLock.Lock()
defer resultLock.Unlock()
errs.Add(restoredNamespace, err)
}

<-semaphore
}(*volumeItem, restoredNamespace)
}
}

pvWaitGroup.Wait()
ctx.logger.Info("patching newly dynamically provisioned PV ends")

return errs
}

func getRestoredPVCFromRestoredResourceList(restoredResourceList map[string][]string) map[string]struct{} {
pvcKey := "v1/PersistentVolumeClaim"
pvcList := make(map[string]struct{})

for _, pvc := range restoredResourceList[pvcKey] {
// the format of pvc string in restoredResourceList is like: "namespace/pvcName(status)"
// extract the substring before "(created)" if the status in rightmost Parenthesis is "created"
r := regexp.MustCompile(`\(([^)]+)\)`)
matches := r.FindAllStringSubmatch(pvc, -1)
if len(matches) > 0 && matches[len(matches)-1][1] == restore.ItemRestoreResultCreated {
pvcList[pvc[:len(pvc)-len("(created)")]] = struct{}{}
}
}

return pvcList
}

func needPatch(newPV *v1.PersistentVolume, pvInfo *internalVolume.PVInfo) bool {
if newPV.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimPolicy(pvInfo.ReclaimPolicy) {
return true
}

newPVLabels, pvLabels := newPV.Labels, pvInfo.Labels
for k, v := range pvLabels {
if _, ok := newPVLabels[k]; !ok {
return true
}

Check warning on line 387 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L386-L387

Added lines #L386 - L387 were not covered by tests
if newPVLabels[k] != v {
return true
}

Check warning on line 390 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L389-L390

Added lines #L389 - L390 were not covered by tests
}

return false
}
Loading

0 comments on commit 67b5e82

Please sign in to comment.