diff --git a/pkg/controller/pod_controller.go b/pkg/controller/pod_controller.go index 3141f9400a..846f8c8476 100644 --- a/pkg/controller/pod_controller.go +++ b/pkg/controller/pod_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "fmt" "time" corev1 "k8s.io/api/core/v1" @@ -71,8 +72,8 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request } // get app pod list - var pv *corev1.PersistentVolume - var pvcNamespace string + relatedPVs := []*corev1.PersistentVolume{} + pvcNamespaces := []string{} pvs, err := m.K8sClient.ListPersistentVolumes(ctx, nil, nil) if err != nil { klog.Errorf("doReconcile ListPV: %v", err) @@ -84,16 +85,21 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request continue } if uniqueId != "" && (p.Spec.CSI.VolumeHandle == uniqueId || p.Spec.StorageClassName == uniqueId) { - pv = &p - break + relatedPVs = append(relatedPVs, &p) } } - if pv != nil { - if pv.Spec.ClaimRef != nil { - pvcNamespace = pv.Spec.ClaimRef.Namespace + if len(relatedPVs) == 0 { + return reconcile.Result{}, fmt.Errorf("can not get pv by uniqueId %s, mount pod: %s", uniqueId, mountPod.Name) + } + for _, pv := range relatedPVs { + if pv != nil { + if pv.Spec.ClaimRef != nil { + pvcNamespaces = append(pvcNamespaces, pv.Spec.ClaimRef.Namespace) + } } } - if pvcNamespace == "" { + + if len(pvcNamespaces) == 0 { klog.Errorf("can not get pvc based on mount pod %s/%s: %v", mountPod.Namespace, mountPod.Name, err) return reconcile.Result{}, err } @@ -104,10 +110,14 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request Operator: metav1.LabelSelectorOpExists, }}, } - podList, err := m.K8sClient.ListPod(ctx, pvcNamespace, &labelSelector, nil) - if err != nil { - klog.Errorf("doReconcile ListPod: %v", err) - return reconcile.Result{}, err + podLists := []corev1.Pod{} + for _, pvcNamespace := range pvcNamespaces { + podList, err := m.K8sClient.ListPod(ctx, pvcNamespace, &labelSelector, nil) + if err != nil { + klog.Errorf("doReconcile ListPod: %v", err) + return reconcile.Result{}, err + } + podLists = append(podLists, podList...) } mounter := mount.SafeFormatAndMount{ @@ -117,7 +127,7 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request podDriver := NewPodDriver(m.K8sClient, mounter) podDriver.SetMountInfo(*mit) - podDriver.mit.setPodsStatus(&corev1.PodList{Items: podList}) + podDriver.mit.setPodsStatus(&corev1.PodList{Items: podLists}) err = podDriver.Run(ctx, mountPod) if err != nil { @@ -141,7 +151,10 @@ func (m *PodController) Reconcile(ctx context.Context, request reconcile.Request RequeueAfter: requeueAfter, }, nil } - return reconcile.Result{}, nil + return reconcile.Result{ + Requeue: true, + RequeueAfter: 10 * time.Minute, + }, nil } func (m *PodController) SetupWithManager(mgr ctrl.Manager) error { diff --git a/pkg/controller/pod_driver.go b/pkg/controller/pod_driver.go index 7c8f1aa947..fc98494f4d 100644 --- a/pkg/controller/pod_driver.go +++ b/pkg/controller/pod_driver.go @@ -130,9 +130,11 @@ func (p *PodDriver) checkAnnotations(ctx context.Context, pod *corev1.Pod) error var existTargets int for k, target := range pod.Annotations { if k == util.GetReferenceKey(target) { - _, exists := p.mit.deletedPods[getPodUid(target)] + targetUid := getPodUid(target) + _, exists := p.mit.deletedPods[targetUid] if !exists { // only it is not in pod lists can be seen as deleted // target pod is deleted + klog.V(5).Infof("[PodDriver] get app pod %s deleted in annotations of mount pod, remove its ref.", targetUid) delAnnotations = append(delAnnotations, k) continue } diff --git a/pkg/controller/reconciler.go b/pkg/controller/reconciler.go index 9384e2ccc5..e9e0b31226 100644 --- a/pkg/controller/reconciler.go +++ b/pkg/controller/reconciler.go @@ -70,9 +70,14 @@ func StartReconciler() error { return nil } +type PodStatus struct { + podStatus + syncAt time.Time +} + func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) { backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod) - lastPodStatus := make(map[string]podStatus) + lastPodStatus := make(map[string]PodStatus) for { ctx := context.TODO() timeoutCtx, cancel := context.WithTimeout(context.Background(), config.ReconcileTimeout) @@ -98,12 +103,17 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) { if value, ok := pod.Labels[config.PodTypeKey]; !ok || value != config.PodTypeValue { continue } - podStatus := getPodStatus(pod) - if lastStatus, ok := lastPodStatus[pod.Name]; ok && lastStatus == podStatus { - // skipped - continue + crtPodStatus := getPodStatus(pod) + if lastStatus, ok := lastPodStatus[pod.Name]; ok { + if lastStatus.podStatus == crtPodStatus && time.Now().Before(lastStatus.syncAt.Add(10*time.Minute)) { + // skipped + continue + } + } + lastPodStatus[pod.Name] = PodStatus{ + podStatus: crtPodStatus, + syncAt: time.Now(), } - lastPodStatus[pod.Name] = podStatus backOffID := fmt.Sprintf("mountpod/%s", pod.Name) g.Go(func() error {