
Make data mover fail early
Signed-off-by: Ming <[email protected]>
qiuming-best committed Nov 28, 2023
1 parent 6ac7ff1 commit e060d55
Showing 8 changed files with 172 additions and 40 deletions.
2 changes: 2 additions & 0 deletions changelogs/unreleased/7052-qiuming-best
@@ -0,0 +1,2 @@
Make data mover fail early

5 changes: 5 additions & 0 deletions pkg/builder/pod_builder.go
@@ -106,3 +106,8 @@ func (b *PodBuilder) Phase(phase corev1api.PodPhase) *PodBuilder {
b.object.Status.Phase = phase
return b
}

func (b *PodBuilder) Status(status corev1api.PodStatus) *PodBuilder {
b.object.Status = status
return b
}
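The updated controller tests later in this diff use this new Status builder to construct a pod fixture that is already in the Running phase. A minimal usage sketch follows; the import paths, fixture names, and the test wrapper are assumptions for illustration, not part of the commit:

package example_test

import (
	"testing"

	corev1api "k8s.io/api/core/v1"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/builder"
)

func TestPodFixtureWithStatus(t *testing.T) {
	// Build a pod fixture that is already Running, as the updated controller tests below do.
	pod := builder.ForPod(velerov1api.DefaultNamespace, "test-pod").
		Labels(map[string]string{velerov1api.DataDownloadLabel: "test-dd"}).
		Status(corev1api.PodStatus{Phase: corev1api.PodRunning}).
		Result()

	if pod.Status.Phase != corev1api.PodRunning {
		t.Fatalf("expected Running, got %s", pod.Status.Phase)
	}
}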
51 changes: 30 additions & 21 deletions pkg/controller/data_download_controller.go
@@ -485,10 +485,6 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != v1.PodRunning {
return false
}

if newObj.Spec.NodeName == "" {
return false
}
@@ -510,43 +506,56 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {

func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)

dd, err := findDataDownloadByPod(r.client, *pod)

log := r.logger.WithField("pod", pod.Name)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
log.WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
} else if dd == nil {
r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
log.Error("get empty DataDownload")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"Dataddownload": dd.Name,
})

if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 1)

r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
if pod.Status.Phase == v1.PodRunning {
log.Info("Preparing data download")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
return []reconcile.Request{}
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable {
err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("datadownload is marked as cancel to fail early because the exposing pod %s/%s is in abnormal status: %s", pod.Namespace, pod.Name, reason)
})

// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
if err != nil {
log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Debug("Exposed pod is in abnormal status, and datadownload is marked as cancel")
} else {
log.Debug("Waiting for exposed pod running...")

return []reconcile.Request{}
}

requests[0] = reconcile.Request{
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: dd.Namespace,
Name: dd.Name,
},
}

return requests
return []reconcile.Request{request}
}
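The interleaved old and new lines above can be hard to read in this view, so here is a simplified sketch (not the literal committed code) of the new decision flow in findSnapshotRestoreForPod; the data upload controller change later in this diff follows the same pattern. Names match the committed code, but error handling and logging are trimmed:

// Sketch: prepare when the exposing pod is running, cancel early when it is
// unrecoverable, otherwise keep waiting for a later pod event.
if pod.Status.Phase == v1.PodRunning {
	// Move the Accepted DataDownload toward Prepared.
	if updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload); err != nil || !updated {
		return []reconcile.Request{}
	}
} else if unrecoverable, _ := kube.IsPodUnrecoverable(pod, log); unrecoverable {
	// The pod cannot recover (Failed/Unknown phase or image pull failure), so request
	// cancellation and let the DataDownload fail early instead of waiting for the
	// prepare timeout.
	if err := UpdateDataDownloadWithRetry(context.Background(), r.client,
		types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log,
		func(dd *velerov2alpha1api.DataDownload) { dd.Spec.Cancel = true },
	); err != nil {
		return []reconcile.Request{}
	}
} else {
	// Pod is still pending; wait.
	return []reconcile.Request{}
}

// In either successful branch, enqueue the DataDownload for reconciliation.
return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}}}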

func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
2 changes: 1 addition & 1 deletion pkg/controller/data_download_controller_test.go
@@ -632,7 +632,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
{
name: "find dataDownload for pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)
50 changes: 33 additions & 17 deletions pkg/controller/data_upload_controller.go
@@ -521,10 +521,6 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != corev1.PodRunning {
return false
}

if newObj.Spec.NodeName != r.nodeName {
return false
}
@@ -547,37 +543,57 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*corev1.Pod)
du, err := findDataUploadByPod(r.client, *pod)
log := r.logger.WithFields(logrus.Fields{
"Backup pod": pod.Name,
})

if err != nil {
r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
log.WithError(err).Error("unable to get dataupload")
return []reconcile.Request{}
} else if du == nil {
r.logger.WithField("Backup pod", pod.Name).Error("get empty DataUpload")
log.Error("get empty DataUpload")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"Datadupload": du.Name,
})

if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
return []reconcile.Request{}
}

r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Dataupload": du.Name,
"Backup pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
if pod.Status.Phase == corev1.PodRunning {
log.Info("Preparing dataupload")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
return []reconcile.Request{}
}
} else if unrecoverable, reason := kube.IsPodUnrecoverable(pod, log); unrecoverable { // let the abnormal backup pod fail early
err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("dataupload is marked as cancel to fail early because the exposing pod %s/%s is in abnormal status: %s", pod.Namespace, pod.Name, reason)
})

if err != nil {
log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Debug("Exposed pod is in abnormal status and dataupload is marked as cancel")
} else {
log.Debug("Waiting for exposed pod running...")

return []reconcile.Request{}
}

requests := reconcile.Request{
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: du.Namespace,
Name: du.Name,
},
}
return []reconcile.Request{requests}
return []reconcile.Request{request}
}

func (r *DataUploadReconciler) FindDataUploadsByPod(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
2 changes: 1 addition & 1 deletion pkg/controller/data_upload_controller_test.go
@@ -683,7 +683,7 @@ func TestFindDataUploadForPod(t *testing.T) {
{
name: "find dataUpload for pod",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)
21 changes: 21 additions & 0 deletions pkg/util/kube/pod.go
@@ -17,6 +17,7 @@ package kube

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
@@ -110,3 +111,23 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface

return nil
}

// IsPodUnrecoverable checks whether the pod is in an abnormal state that it cannot recover from.
// It does not cover every case, but more cases can be added in the future.
func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, string) {
// Check the Phase field
if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown {
log.Warnf("Pod is in abnormal state %s", pod.Status.Phase)
return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase)
}

// Check the container statuses for image pull failures
for _, containerStatus := range pod.Status.ContainerStatuses {
// A Waiting reason of ImagePullBackOff or ErrImageNeverPull indicates an image pull failure
if containerStatus.State.Waiting != nil && (containerStatus.State.Waiting.Reason == "ImagePullBackOff" || containerStatus.State.Waiting.Reason == "ErrImageNeverPull") {
log.Warnf("Container %s in Pod %s/%s is in pull image failed with reason %s", containerStatus.Name, pod.Namespace, pod.Name, containerStatus.State.Waiting.Reason)
return true, fmt.Sprintf("Container %s in Pod %s/%s is in pull image failed with reason %s", containerStatus.Name, pod.Namespace, pod.Name, containerStatus.State.Waiting.Reason)
}
}
return false, ""
}
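A minimal standalone sketch of calling the new helper; the import paths are assumptions based on the repository layout, and the pod literal mirrors the ImagePullBackOff test case below:

package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
	corev1api "k8s.io/api/core/v1"

	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

func main() {
	// A pod stuck on an image pull failure.
	pod := &corev1api.Pod{
		Status: corev1api.PodStatus{
			ContainerStatuses: []corev1api.ContainerStatus{
				{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
			},
		},
	}

	unrecoverable, reason := kube.IsPodUnrecoverable(pod, logrus.New())
	fmt.Println(unrecoverable, reason) // true, plus the human-readable reason
}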
79 changes: 79 additions & 0 deletions pkg/util/kube/pod_test.go
@@ -343,3 +343,82 @@ func TestDeletePodIfAny(t *testing.T) {
})
}
}

func TestIsPodUnrecoverable(t *testing.T) {
tests := []struct {
name string
pod *corev1api.Pod
want bool
}{
{
name: "pod is in failed state",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodFailed,
},
},
want: true,
},
{
name: "pod is in unknown state",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodUnknown,
},
},
want: true,
},
{
name: "container image pull failure",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
},
},
},
want: true,
},
{
name: "container image pull failure with different reason",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ErrImageNeverPull"}}},
},
},
},
want: true,
},
{
name: "container image pull failure with different reason",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "OtherReason"}}},
},
},
},
want: false,
},
{
name: "pod is normal",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodRunning,
ContainerStatuses: []corev1api.ContainerStatus{
{Ready: true, State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}},
},
},
},
want: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, _ := IsPodUnrecoverable(test.pod, velerotest.NewLogger())
assert.Equal(t, test.want, got)
})
}
}
