Skip to content

Commit

Permalink
fix: get pod by pv volumeHandle (#801)
Browse files Browse the repository at this point in the history
* fix: get pod by pv volumeHandle

* add error handling when mount pod cannot find pv

---------

Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls authored Nov 24, 2023
1 parent 65cd7a8 commit e6d6413
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
41 changes: 27 additions & 14 deletions pkg/controller/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -71,8 +72,8 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request
}

// get app pod list
var pv *corev1.PersistentVolume
var pvcNamespace string
relatedPVs := []*corev1.PersistentVolume{}
pvcNamespaces := []string{}
pvs, err := m.K8sClient.ListPersistentVolumes(ctx, nil, nil)
if err != nil {
klog.Errorf("doReconcile ListPV: %v", err)
Expand All @@ -84,16 +85,21 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request
continue
}
if uniqueId != "" && (p.Spec.CSI.VolumeHandle == uniqueId || p.Spec.StorageClassName == uniqueId) {
pv = &p
break
relatedPVs = append(relatedPVs, &p)
}
}
if pv != nil {
if pv.Spec.ClaimRef != nil {
pvcNamespace = pv.Spec.ClaimRef.Namespace
if len(relatedPVs) == 0 {
return reconcile.Result{}, fmt.Errorf("can not get pv by uniqueId %s, mount pod: %s", uniqueId, mountPod.Name)
}
for _, pv := range relatedPVs {
if pv != nil {
if pv.Spec.ClaimRef != nil {
pvcNamespaces = append(pvcNamespaces, pv.Spec.ClaimRef.Namespace)
}
}
}
if pvcNamespace == "" {

if len(pvcNamespaces) == 0 {
klog.Errorf("can not get pvc based on mount pod %s/%s: %v", mountPod.Namespace, mountPod.Name, err)
return reconcile.Result{}, err
}
Expand All @@ -104,10 +110,14 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request
Operator: metav1.LabelSelectorOpExists,
}},
}
podList, err := m.K8sClient.ListPod(ctx, pvcNamespace, &labelSelector, nil)
if err != nil {
klog.Errorf("doReconcile ListPod: %v", err)
return reconcile.Result{}, err
podLists := []corev1.Pod{}
for _, pvcNamespace := range pvcNamespaces {
podList, err := m.K8sClient.ListPod(ctx, pvcNamespace, &labelSelector, nil)
if err != nil {
klog.Errorf("doReconcile ListPod: %v", err)
return reconcile.Result{}, err
}
podLists = append(podLists, podList...)
}

mounter := mount.SafeFormatAndMount{
Expand All @@ -117,7 +127,7 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request

podDriver := NewPodDriver(m.K8sClient, mounter)
podDriver.SetMountInfo(*mit)
podDriver.mit.setPodsStatus(&corev1.PodList{Items: podList})
podDriver.mit.setPodsStatus(&corev1.PodList{Items: podLists})

err = podDriver.Run(ctx, mountPod)
if err != nil {
Expand All @@ -141,7 +151,10 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request
RequeueAfter: requeueAfter,
}, nil
}
return reconcile.Result{}, nil
return reconcile.Result{
Requeue: true,
RequeueAfter: 10 * time.Minute,
}, nil
}

func (m *PodController) SetupWithManager(mgr ctrl.Manager) error {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/pod_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ func (p *PodDriver) checkAnnotations(ctx context.Context, pod *corev1.Pod) error
var existTargets int
for k, target := range pod.Annotations {
if k == util.GetReferenceKey(target) {
_, exists := p.mit.deletedPods[getPodUid(target)]
targetUid := getPodUid(target)
_, exists := p.mit.deletedPods[targetUid]
if !exists { // only it is not in pod lists can be seen as deleted
// target pod is deleted
klog.V(5).Infof("[PodDriver] get app pod %s deleted in annotations of mount pod, remove its ref.", targetUid)
delAnnotations = append(delAnnotations, k)
continue
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,14 @@ func StartReconciler() error {
return nil
}

type PodStatus struct {
podStatus
syncAt time.Time
}

func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
lastPodStatus := make(map[string]podStatus)
lastPodStatus := make(map[string]PodStatus)
for {
ctx := context.TODO()
timeoutCtx, cancel := context.WithTimeout(context.Background(), config.ReconcileTimeout)
Expand All @@ -98,12 +103,17 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
if value, ok := pod.Labels[config.PodTypeKey]; !ok || value != config.PodTypeValue {
continue
}
podStatus := getPodStatus(pod)
if lastStatus, ok := lastPodStatus[pod.Name]; ok && lastStatus == podStatus {
// skipped
continue
crtPodStatus := getPodStatus(pod)
if lastStatus, ok := lastPodStatus[pod.Name]; ok {
if lastStatus.podStatus == crtPodStatus && time.Now().Before(lastStatus.syncAt.Add(10*time.Minute)) {
// skipped
continue
}
}
lastPodStatus[pod.Name] = PodStatus{
podStatus: crtPodStatus,
syncAt: time.Now(),
}
lastPodStatus[pod.Name] = podStatus

backOffID := fmt.Sprintf("mountpod/%s", pod.Name)
g.Go(func() error {
Expand Down

0 comments on commit e6d6413

Please sign in to comment.