diff --git a/pkg/clustertree/cluster-manager/controllers/pod_sync_controller.go b/pkg/clustertree/cluster-manager/controllers/pod_sync_controller.go index 95e9bec0e..e995836d3 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod_sync_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod_sync_controller.go @@ -2,10 +2,17 @@ package controllers import ( "context" + "fmt" + "reflect" + "sync" - //"github.com/pkg/errors" + pkgerror "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -19,6 +26,7 @@ import ( leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/kubenest/util" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" ) const ( @@ -52,7 +60,7 @@ func (c *PodSyncController) SetupWithManager(mgr manager.Manager) error { old := updateEvent.ObjectOld.(*corev1.Node) currReady := util.IsNodeReady(curr.Status.Conditions) oldReady := util.IsNodeReady(old.Status.Conditions) - return currReady != oldReady + return currReady == oldReady }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false @@ -64,76 +72,76 @@ func (c *PodSyncController) SetupWithManager(mgr manager.Manager) error { Complete(c) } -// func (c *PodSyncController) syncPodStatus(ctx context.Context) error { -// err := c.updatePodStatus(ctx) -// if err != nil { -// klog.Errorf(err.Error()) -// return err -// } -// return nil -// } +func (c *PodSyncController) syncPodStatus(ctx context.Context) error { + err := c.updatePodStatus(ctx) + if err != nil { + klog.Errorf(err.Error()) + return err + } + return nil +} -// func (c *PodSyncController) updatePodStatus(ctx context.Context) error { -// Selector := labels.SelectorFromSet( -// map[string]string{ -// utils.KosmosPodLabel: "true", -// }) -// pods, err := c.leafClient.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ -// LabelSelector: Selector.String(), -// }) -// if err != nil { -// klog.Errorf("Could not list pods in leaf cluster,Error: %v", err) -// return err -// } - -// var wg sync.WaitGroup -// errChan := make(chan error, len(pods.Items)) - -// for _, leafpod := range pods.Items { -// wg.Add(1) -// go func(leafpod corev1.Pod) { -// defer wg.Done() -// err := retry.RetryOnConflict(retry.DefaultRetry, func() error { -// rootpod := &corev1.Pod{} -// if err := c.root.Get(ctx, types.NamespacedName{Name: leafpod.Name, Namespace: leafpod.Namespace}, rootpod); err != nil { -// if apierrors.IsNotFound(err) { -// klog.Warningf("Pod %s in namespace %s not found in root cluster, skipping...", leafpod.Name, leafpod.Namespace) -// return nil -// } -// return err -// } -// if podutils.IsKosmosPod(rootpod) && !reflect.DeepEqual(rootpod.Status, leafpod.Status) { -// rPodCopy := rootpod.DeepCopy() -// rPodCopy.Status = leafpod.Status -// podutils.FitObjectMeta(&rPodCopy.ObjectMeta) -// if err := c.root.Status().Update(ctx, rPodCopy); err != nil && !apierrors.IsNotFound(err) { -// klog.V(4).Info(errors.Wrap(err, "error while updating pod status in kubernetes")) -// return err -// } -// } -// return nil -// }) -// if err != nil { -// errChan <- fmt.Errorf("failed to update pod %s/%s, error: %v", leafpod.Namespace, leafpod.Name, err) -// } -// }(leafpod) -// } -// wg.Wait() -// close(errChan) - -// var taskErr error -// for err := range errChan { -// if taskErr == nil { -// taskErr = err -// } else { -// taskErr = errors.Wrap(err, taskErr.Error()) -// } -// } -// if taskErr != nil { -// return taskErr -// } -// return nil -// } +func (c *PodSyncController) updatePodStatus(ctx context.Context) error { + Selector := labels.SelectorFromSet( + map[string]string{ + utils.KosmosPodLabel: "true", + }) + pods, err := c.leafClient.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ + LabelSelector: Selector.String(), + }) + if err != nil { + klog.Errorf("Could not list pods in leaf cluster,Error: %v", err) + return err + } + + var wg sync.WaitGroup + errChan := make(chan error, len(pods.Items)) + + for _, leafpod := range pods.Items { + wg.Add(1) + go func(leafpod corev1.Pod) { + defer wg.Done() + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + rootpod := &corev1.Pod{} + if err := c.root.Get(ctx, types.NamespacedName{Name: leafpod.Name, Namespace: leafpod.Namespace}, rootpod); err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("Pod %s in namespace %s not found in root cluster, skipping...", leafpod.Name, leafpod.Namespace) + return nil + } + return err + } + if podutils.IsKosmosPod(rootpod) && !reflect.DeepEqual(rootpod.Status, leafpod.Status) { + rPodCopy := rootpod.DeepCopy() + rPodCopy.Status = leafpod.Status + podutils.FitObjectMeta(&rPodCopy.ObjectMeta) + if err := c.root.Status().Update(ctx, rPodCopy); err != nil && !apierrors.IsNotFound(err) { + klog.V(4).Info(errors.Wrap(err, "error while updating pod status in kubernetes")) + return err + } + } + return nil + }) + if err != nil { + errChan <- fmt.Errorf("failed to update pod %s/%s, error: %v", leafpod.Namespace, leafpod.Name, err) + } + }(leafpod) + } + wg.Wait() + close(errChan) + + var taskErr error + for err := range errChan { + if taskErr == nil { + taskErr = err + } else { + taskErr = pkgerror.Wrap(err, taskErr.Error()) + } + } + if taskErr != nil { + return taskErr + } + return nil +} func (c *PodSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { klog.V(4).Infof("============ %s starts to reconcile %s ============", PodSyncControllerName, request.Name)