Make data mover fail early
Signed-off-by: Ming <[email protected]>
qiuming-best committed Nov 6, 2023
1 parent 03e582c commit 841cfd0
Showing 8 changed files with 239 additions and 40 deletions.
2 changes: 2 additions & 0 deletions changelogs/unreleased/7052-qiuming-best
@@ -0,0 +1,2 @@
Make data mover fail early

5 changes: 5 additions & 0 deletions pkg/builder/pod_builder.go
@@ -101,3 +101,8 @@ func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.Container
}
return b
}

func (b *PodBuilder) Status(status corev1api.PodStatus) *PodBuilder {
b.object.Status = status
return b
}
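
For context (not part of this diff): the new Status builder method is what the updated controller tests below use to put the fake exposing pod into the Running phase. A minimal sketch, assuming the existing pkg/builder helpers and the corev1api import alias used in that package; the pod name here is hypothetical:

	// Hypothetical example: build a test pod that reports PodRunning,
	// mirroring the updated TestFindDataDownloadForPod fixture below.
	pod := builder.ForPod(velerov1api.DefaultNamespace, "example-datadownload").
		Labels(map[string]string{velerov1api.DataDownloadLabel: "example-datadownload"}).
		Status(corev1api.PodStatus{Phase: corev1api.PodRunning}). // new method added above
		Result()
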
51 changes: 30 additions & 21 deletions pkg/controller/data_download_controller.go
@@ -488,10 +488,6 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != v1.PodRunning {
return false
}

if newObj.Spec.NodeName == "" {
return false
}
@@ -513,43 +509,56 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {

func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*v1.Pod)

dd, err := findDataDownloadByPod(r.client, *pod)

log := r.logger.WithField("pod", pod.Name)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload")
log.WithError(err).Error("unable to get DataDownload")
return []reconcile.Request{}
} else if dd == nil {
r.logger.WithField("Restore pod", pod.Name).Error("get empty DataDownload")
log.Error("get empty DataDownload")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"Dataddownload": dd.Name,
})

if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 1)

r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
if pod.Status.Phase == v1.PodRunning {
log.Info("Preparing data download")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
return []reconcile.Request{}
}
} else if kube.IsPodUnrecoverable(pod, log) {
err := UpdateDataDownloadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("datadownload is marked as cancel to fail early because exposing pod %s/%s is in abnormal status", pod.Namespace, pod.Name)
})

// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
if err != nil {
log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Debug("Exposed pod is in abnormal status, and datadownload is marked as cancel")
} else {
log.Debug("Waiting for exposed pod running...")

return []reconcile.Request{}
}

requests[0] = reconcile.Request{
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: dd.Namespace,
Name: dd.Name,
},
}

return requests
return []reconcile.Request{request}
}

func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
2 changes: 1 addition & 1 deletion pkg/controller/data_download_controller_test.go
@@ -625,7 +625,7 @@ func TestFindDataDownloadForPod(t *testing.T) {
{
name: "find dataDownload for pod",
du: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataDownloadName).Labels(map[string]string{velerov1api.DataDownloadLabel: dataDownloadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
checkFunc: func(du *velerov2alpha1api.DataDownload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)
50 changes: 33 additions & 17 deletions pkg/controller/data_upload_controller.go
@@ -522,10 +522,6 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != corev1.PodRunning {
return false
}

if newObj.Spec.NodeName != r.nodeName {
return false
}
@@ -548,37 +544,57 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reconcile.Request {
pod := podObj.(*corev1.Pod)
du, err := findDataUploadByPod(r.client, *pod)
log := r.logger.WithFields(logrus.Fields{
"Backup pod": pod.Name,
})

if err != nil {
r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("unable to get dataupload")
log.WithError(err).Error("unable to get dataupload")
return []reconcile.Request{}
} else if du == nil {
r.logger.WithField("Backup pod", pod.Name).Error("get empty DataUpload")
log.Error("get empty DataUpload")
return []reconcile.Request{}
}
log = log.WithFields(logrus.Fields{
"Datadupload": du.Name,
})

if du.Status.Phase != velerov2alpha1api.DataUploadPhaseAccepted {
return []reconcile.Request{}
}

r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Dataupload": du.Name,
"Backup pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
if pod.Status.Phase == corev1.PodRunning {
log.Info("Preparing dataupload")
// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
log.WithField("updated", updated).WithError(err).Warn("failed to update dataupload, prepare will halt for this dataupload")
return []reconcile.Request{}
}
} else if kube.IsPodUnrecoverable(pod, log) { // let the abnormal backup pod fail early
err := UpdateDataUploadWithRetry(context.Background(), r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("dataupload is marked as cancel to fail early because exposing pod %s/%s is in abnormal status", pod.Namespace, pod.Name)
})

if err != nil {
log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Debug("Exposed pod is in abnormal status and dataupload is marked as cancel")
} else {
log.Debug("Waiting for exposed pod running...")

return []reconcile.Request{}
}

requests := reconcile.Request{
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: du.Namespace,
Name: du.Name,
},
}
return []reconcile.Request{requests}
return []reconcile.Request{request}
}

func (r *DataUploadReconciler) FindDataUploads(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataUpload, error) {
2 changes: 1 addition & 1 deletion pkg/controller/data_upload_controller_test.go
@@ -667,7 +667,7 @@ func TestFindDataUploadForPod(t *testing.T) {
{
name: "find dataUpload for pod",
du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}).Status(corev1.PodStatus{Phase: corev1.PodRunning}).Result(),
checkFunc: func(du *velerov2alpha1api.DataUpload, requests []reconcile.Request) {
// Assert that the function returns a single request
assert.Len(t, requests, 1)
34 changes: 34 additions & 0 deletions pkg/util/kube/pod.go
@@ -110,3 +110,37 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface

return nil
}

// IsPodUnrecoverable checks whether the pod is in an abnormal state from which it cannot recover.
// It does not cover every case, but more cases can be added in the future.
func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) bool {
// Check the Phase field
if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown {
log.Debugf("Pod is in abnormal state %s", pod.Status.Phase)
return true
} else if pod.Status.Phase == corev1api.PodPending {
// Check the conditions for Pending reason to see if it's unschedulable
for _, condition := range pod.Status.Conditions {
if condition.Reason == corev1api.PodReasonUnschedulable {
log.Debug("Pod is in unschedulable state")
return true
}
}
}

// Check the container statuses
for _, containerStatus := range pod.Status.ContainerStatuses {
// Check for container status not ready
if !containerStatus.Ready && containerStatus.State.Running != nil { // Readiness probe failure
log.Debugf("Container %s in Pod %s/%s is not ready", containerStatus.Name, pod.Namespace, pod.Name)
return true
}

// If the container's image state is ImagePullBackOff, it indicates an image pull failure
if containerStatus.State.Waiting != nil && (containerStatus.State.Waiting.Reason == "ImagePullBackOff" || containerStatus.State.Waiting.Reason == "ErrImageNeverPull") {
log.Debugf("Container %s in Pod %s/%s is in ImagePullBackOff state", containerStatus.Name, pod.Namespace, pod.Name)
return true
}
}
return false
}
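
Usage note (not part of this diff): the two controllers above call this helper from their pod event handlers to decide whether to prepare, cancel, or keep waiting on the data mover CR. A minimal sketch of that decision flow, with the CR updates elided and assuming a *corev1api.Pod and a logrus.FieldLogger named log are in scope:

	// Sketch only: mirrors how findDataUploadForPod / findSnapshotRestoreForPod
	// react to the state of the exposing pod.
	if pod.Status.Phase == corev1api.PodRunning {
		// exposing pod is up: prepare the DataUpload/DataDownload
	} else if kube.IsPodUnrecoverable(pod, log) {
		// pod cannot recover: set Spec.Cancel = true on the CR so it
		// fails early instead of waiting for the prepare timeout
	} else {
		// pod is still starting: wait for the next pod event
	}
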
133 changes: 133 additions & 0 deletions pkg/util/kube/pod_test.go
@@ -343,3 +343,136 @@ func TestDeletePodIfAny(t *testing.T) {
})
}
}

func TestIsPodUnrecoverable(t *testing.T) {
tests := []struct {
name string
pod *corev1api.Pod
want bool
}{
{
name: "pod is in failed state",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodFailed,
},
},
want: true,
},
{
name: "pod is in unknown state",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodUnknown,
},
},
want: true,
},
{
name: "pod is in pending state but schedulable",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodPending,
},
},
want: false,
},
{
name: "pod is unschedulable",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodPending,
Conditions: []corev1api.PodCondition{
{Reason: corev1api.PodReasonUnschedulable},
},
},
},
want: true,
},
{
name: "container is ready and running",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Ready: true, State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}},
},
},
},
want: false,
},
{
name: "container is not ready but running",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Ready: false, State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}},
},
},
},
want: true,
},
{
name: "container is not ready and not running",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{Ready: false, State: corev1api.ContainerState{}},
},
},
},
want: false,
},
{
name: "container image pull failure",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}},
},
},
},
want: true,
},
{
name: "container image pull failure with different reason",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ErrImageNeverPull"}}},
},
},
},
want: true,
},
{
name: "container image pull failure with different reason",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
ContainerStatuses: []corev1api.ContainerStatus{
{State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "OtherReason"}}},
},
},
},
want: false,
},
{
name: "pod is normal",
pod: &corev1api.Pod{
Status: corev1api.PodStatus{
Phase: corev1api.PodRunning,
ContainerStatuses: []corev1api.ContainerStatus{
{Ready: true, State: corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}},
},
},
},
want: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := IsPodUnrecoverable(test.pod, velerotest.NewLogger())
assert.Equal(t, test.want, got)
})
}
}
