Skip to content

Commit

Permalink
issue 7036: fail early by peek expose
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Feb 18, 2024
1 parent 9a907a2 commit 920f88b
Show file tree
Hide file tree
Showing 12 changed files with 325 additions and 16 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7437-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #7036. Add the implementation of node selection for data mover backups
8 changes: 6 additions & 2 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.TryCancelDataDownload(ctx, dd)
r.TryCancelDataDownload(ctx, dd, "")

Check warning on line 218 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L218

Added line #L218 was not covered by tests
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, dd)
Expand Down Expand Up @@ -419,7 +422,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
}

func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Async fs backup data path canceled")

Expand All @@ -429,6 +432,7 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
dataDownload.Status.Message = message
})

if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func TestDataDownloadReconcile(t *testing.T) {
needCreateFSBR bool
isExposeErr bool
isGetExposeErr bool
isPeekExposeErr bool
isNilExposer bool
isFSBRInitErr bool
isFSBRRestoreErr bool
Expand Down Expand Up @@ -302,6 +303,12 @@ func TestDataDownloadReconcile(t *testing.T) {
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
},
{
name: "peek error",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
isPeekExposeErr: true,
expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseCanceled).Result(),
},
{
name: "dataDownload with enabled cancel",
dd: func() *velerov2alpha1api.DataDownload {
Expand Down Expand Up @@ -369,7 +376,7 @@ func TestDataDownloadReconcile(t *testing.T) {
return fsBR
}

if test.isExposeErr || test.isGetExposeErr || test.isNilExposer || test.notNilExpose {
if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose {
if test.isNilExposer {
r.restoreExposer = nil
} else {
Expand All @@ -383,6 +390,8 @@ func TestDataDownloadReconcile(t *testing.T) {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: hostingPod, VolumeName: "test-pvc"}}, nil)
} else if test.isGetExposeErr {
ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer"))
} else if test.isPeekExposeErr {
ep.On("PeekExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("fake-peek-error"))
}

if !test.notMockCleanUp {
Expand Down Expand Up @@ -802,7 +811,7 @@ func TestTryCancelDataDownload(t *testing.T) {
err = r.client.Create(ctx, test.dd)
require.NoError(t, err)

r.TryCancelDataDownload(ctx, test.dd)
r.TryCancelDataDownload(ctx, test.dd, "")

if test.expectedErr == "" {
assert.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action
// we could retry when the CR requeue in periodcally
log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase)
r.TryCancelDataUpload(ctx, du)
r.TryCancelDataUpload(ctx, du, "")

Check warning on line 231 in pkg/controller/data_upload_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_upload_controller.go#L231

Added line #L231 was not covered by tests
} else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil {
r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr))
log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr)
} else if du.Status.StartTimestamp != nil {
if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, du)
Expand Down Expand Up @@ -444,7 +447,7 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
}

// TryCancelDataUpload clear up resources only when update success
func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) {
func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) {
log := r.logger.WithField("dataupload", du.Name)
log.Warn("Async fs backup data path canceled")
succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) {
Expand All @@ -453,6 +456,7 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele
dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
dataUpload.Status.Message = message
})

if err != nil {
Expand Down
17 changes: 15 additions & 2 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func dataUploadBuilder() *builder.DataUploadBuilder {
type fakeSnapshotExposer struct {
kubeClient kbclient.Client
clock clock.WithTickerAndDelayedExecution
peekErr error
}

func (f *fakeSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, param interface{}) error {
Expand Down Expand Up @@ -283,6 +284,10 @@ func (f *fakeSnapshotExposer) GetExposed(ctx context.Context, du corev1.ObjectRe
return &exposer.ExposeResult{ByPod: exposer.ExposeByPod{HostingPod: pod, VolumeName: dataUploadName}}, nil
}

func (f *fakeSnapshotExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error {
return f.peekErr
}

func (f *fakeSnapshotExposer) CleanUp(context.Context, corev1.ObjectReference, string, string) {
}

Expand Down Expand Up @@ -330,6 +335,7 @@ func TestReconcile(t *testing.T) {
expectedRequeue ctrl.Result
expectedErrMsg string
needErrs []bool
peekErr error
}{
{
name: "Dataupload is not initialized",
Expand Down Expand Up @@ -420,6 +426,13 @@ func TestReconcile(t *testing.T) {
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
},
{
name: "peek error",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result(),
peekErr: errors.New("fake-peek-error"),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceled).Result(),
},
{
name: "Dataupload with enabled cancel",
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
Expand Down Expand Up @@ -486,7 +499,7 @@ func TestReconcile(t *testing.T) {
}

if test.du.Spec.SnapshotType == fakeSnapshotType {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock}}
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{fakeSnapshotType: &fakeSnapshotExposer{r.client, r.Clock, test.peekErr}}
} else if test.du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())}
}
Expand Down Expand Up @@ -866,7 +879,7 @@ func TestTryCancelDataUpload(t *testing.T) {
err = r.client.Create(ctx, test.dd)
require.NoError(t, err)

r.TryCancelDataUpload(ctx, test.dd)
r.TryCancelDataUpload(ctx, test.dd, "")

if test.expectedErr == "" {
assert.NoError(t, err)
Expand Down
24 changes: 24 additions & 0 deletions pkg/exposer/csi_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,30 @@ func (e *csiSnapshotExposer) GetExposed(ctx context.Context, ownerObject corev1.
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, VolumeName: volumeName}}, nil
}

func (e *csiSnapshotExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error {
backupPodName := ownerObject.Name

curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
})

pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, backupPodName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil
}

if err != nil {
curLog.WithError(err).Warnf("error to peek backup pod %s", backupPodName)
return nil
}

Check warning on line 277 in pkg/exposer/csi_snapshot.go

View check run for this annotation

Codecov / codecov/patch

pkg/exposer/csi_snapshot.go#L275-L277

Added lines #L275 - L277 were not covered by tests

if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed {
return errors.New(message)
}

return nil
}

func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference, vsName string, sourceNamespace string) {
backupPodName := ownerObject.Name
backupPVCName := ownerObject.Name
Expand Down
97 changes: 97 additions & 0 deletions pkg/exposer/csi_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,3 +621,100 @@ func TestGetExpose(t *testing.T) {
})
}
}

func TestPeekExpose(t *testing.T) {
backup := &velerov1.Backup{
TypeMeta: metav1.TypeMeta{
APIVersion: velerov1.SchemeGroupVersion.String(),
Kind: "Backup",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: velerov1.DefaultNamespace,
Name: "fake-backup",
UID: "fake-uid",
},
}

backupPodUrecoverable := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: backup.Name,
},
Status: corev1.PodStatus{
Phase: corev1.PodPending,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodScheduled,
Reason: "Unschedulable",
Message: "unrecoverable",
},
},
},
}

backupPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: backup.Name,
},
}

scheme := runtime.NewScheme()
corev1.AddToScheme(scheme)

tests := []struct {
name string
kubeClientObj []runtime.Object
ownerBackup *velerov1.Backup
err string
}{
{
name: "backup pod is not found",
ownerBackup: backup,
},
{
name: "pod is unrecoverable",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
backupPodUrecoverable,
},
err: "Pod is unschedulable: unrecoverable",
},
{
name: "succeed",
ownerBackup: backup,
kubeClientObj: []runtime.Object{
backupPod,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)

exposer := csiSnapshotExposer{
kubeClient: fakeKubeClient,
log: velerotest.NewLogger(),
}

var ownerObject corev1.ObjectReference
if test.ownerBackup != nil {
ownerObject = corev1.ObjectReference{
Kind: test.ownerBackup.Kind,
Namespace: test.ownerBackup.Namespace,
Name: test.ownerBackup.Name,
UID: test.ownerBackup.UID,
APIVersion: test.ownerBackup.APIVersion,
}
}

err := exposer.PeekExposed(context.Background(), ownerObject)
if test.err == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, test.err)
}
})
}
}
29 changes: 29 additions & 0 deletions pkg/exposer/generic_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type GenericRestoreExposer interface {
// Otherwise, it returns nil as the expose result without an error.
GetExposed(context.Context, corev1.ObjectReference, client.Client, string, time.Duration) (*ExposeResult, error)

// PeekExposed tests the status of the expose.
// If the expose is incomplete but not recoverable, it returns an error.
// Otherwise, it returns nil immediately.
PeekExposed(context.Context, corev1.ObjectReference) error

// RebindVolume unexposes the restored PV and rebind it to the target PVC
RebindVolume(context.Context, corev1.ObjectReference, string, string, time.Duration) error

Expand Down Expand Up @@ -160,6 +165,30 @@ func (e *genericRestoreExposer) GetExposed(ctx context.Context, ownerObject core
return &ExposeResult{ByPod: ExposeByPod{HostingPod: pod, VolumeName: volumeName}}, nil
}

func (e *genericRestoreExposer) PeekExposed(ctx context.Context, ownerObject corev1.ObjectReference) error {
restorePodName := ownerObject.Name

curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
})

pod, err := e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Get(ctx, restorePodName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil
}

Check warning on line 178 in pkg/exposer/generic_restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/exposer/generic_restore.go#L168-L178

Added lines #L168 - L178 were not covered by tests

if err != nil {
curLog.WithError(err).Warnf("error to peek restore pod %s", restorePodName)
return nil
}

Check warning on line 183 in pkg/exposer/generic_restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/exposer/generic_restore.go#L180-L183

Added lines #L180 - L183 were not covered by tests

if podFailed, message := kube.IsPodUnrecoverable(pod, curLog); podFailed {
return errors.New(message)
}

Check warning on line 187 in pkg/exposer/generic_restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/exposer/generic_restore.go#L185-L187

Added lines #L185 - L187 were not covered by tests

return nil

Check warning on line 189 in pkg/exposer/generic_restore.go

View check run for this annotation

Codecov / codecov/patch

pkg/exposer/generic_restore.go#L189

Added line #L189 was not covered by tests
}

func (e *genericRestoreExposer) CleanUp(ctx context.Context, ownerObject corev1.ObjectReference) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name
Expand Down
Loading

0 comments on commit 920f88b

Please sign in to comment.