From 0acd15958d057577909ee737cdbab33787e1e7cd Mon Sep 17 00:00:00 2001 From: gaoyuan Date: Thu, 19 Dec 2024 17:57:31 +0800 Subject: [PATCH] sync root podstatus from leaf --- .../cluster-manager/app/manager.go | 10 + .../pod/rootpodstatus_sync_controller.go | 124 +++++++++++ .../pod/rootpodstatus_sync_controller_test.go | 192 ++++++++++++++++++ 3 files changed, 326 insertions(+) create mode 100644 pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller.go create mode 100644 pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller_test.go diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index 0e7f4651f..1e0307bb9 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -21,6 +21,7 @@ import ( clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod" podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pv" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pvc" @@ -247,6 +248,15 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("error starting root pv controller %v", err) } + RootPodSyncReconciler := pod.RootPodSyncReconciler{ + RootClient: mgr.GetClient(), + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, + } + if err := RootPodSyncReconciler.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting root podsync controller %v", err) + } + if len(os.Getenv("USE-ONEWAY-STORAGE")) > 0 { onewayPVController := pv.OnewayPVController{ Root: mgr.GetClient(), diff --git a/pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller.go new file mode 100644 index 000000000..67b1dc498 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller.go @@ -0,0 +1,124 @@ +package pod + +import ( + "context" + "fmt" + "reflect" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" + "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/podutils" +) + +const ( + PodSyncControllerName = "pod-sync-controller" +) + +// feature:sync rootpod status from leafpod status when rootpod status modified by others(not from leafpod) +type RootPodSyncReconciler struct { + RootClient client.Client + GlobalLeafManager leafUtils.LeafResourceManager + GlobalLeafClientManager leafUtils.LeafClientResourceManager +} + +func (c *RootPodSyncReconciler) SetupWithManager(mgr manager.Manager) error { + skipFunc := func(obj client.Object) bool { + p := obj.(*corev1.Pod) + return podutils.IsKosmosPod(p) + } + + return controllerruntime.NewControllerManagedBy(mgr). + Named(PodSyncControllerName). + WithOptions(controller.Options{}). + For(&corev1.Pod{}, builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return false + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + old := updateEvent.ObjectOld.(*corev1.Pod) + curr := updateEvent.ObjectNew.(*corev1.Pod) + if !skipFunc(updateEvent.ObjectNew) { + return false + } + return !reflect.DeepEqual(old.Status, curr.Status) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + return false + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + })). + Complete(c) +} + +func (c *RootPodSyncReconciler) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { + actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) + lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} + +func (c *RootPodSyncReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + klog.V(4).Infof("============ %s starts to reconcile %s ============", PodSyncControllerName, request.Name) + + var rootpod corev1.Pod + if err := c.RootClient.Get(ctx, request.NamespacedName, &rootpod); err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("pods not found, %s", request.NamespacedName) + return reconcile.Result{}, nil + } + klog.Errorf("get %s error: %v", request.NamespacedName, err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + //get LeafResource + lr, err := c.GlobalLeafManager.GetLeafResourceByNodeName(rootpod.Spec.NodeName) + if err != nil { + klog.Errorf("Failed to get leaf client for %s", rootpod.Spec.NodeName) + return reconcile.Result{}, nil + } + + lcr, err := c.leafClientResource(lr) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) + return reconcile.Result{}, nil + } + + leafPod := &corev1.Pod{} + err = lcr.Client.Get(ctx, request.NamespacedName, leafPod) + if err != nil { + klog.Errorf("Failed to get leaf pod %v", leafPod.Name) + return reconcile.Result{}, nil + } + + if podutils.IsKosmosPod(leafPod) && !reflect.DeepEqual(rootpod.Status, leafPod.Status) { + rPodCopy := rootpod.DeepCopy() + rPodCopy.Status = leafPod.Status + podutils.FitObjectMeta(&rPodCopy.ObjectMeta) + if err := c.RootClient.Status().Update(ctx, rPodCopy); err != nil { + if apierrors.IsNotFound(err) { + klog.Errorf("rootpod %s not found while updating status", rootpod.Name) + } else { + klog.Errorf("error while updating rootpod status in kubernetes, %s", err) + } + return reconcile.Result{}, nil + } + klog.Infof("update rootpod %s status success", leafPod.Name) + } + klog.V(4).Infof("============ %s has been reconciled %s ============", PodSyncControllerName, request.Name) + return reconcile.Result{}, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller_test.go b/pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller_test.go new file mode 100644 index 000000000..767d348a0 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller_test.go @@ -0,0 +1,192 @@ +package pod + +import ( + "context" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" +) + +type MockLeafResourceManager struct { + mock.Mock +} + +func (m *MockLeafResourceManager) AddLeafResource(lr *leafUtils.LeafResource, nodes []*corev1.Node) { + m.Called(lr, nodes) +} + +func (m *MockLeafResourceManager) RemoveLeafResource(clusterName string) { + m.Called(clusterName) +} + +func (m *MockLeafResourceManager) GetLeafResource(clusterName string) (*leafUtils.LeafResource, error) { + args := m.Called(clusterName) + return args.Get(0).(*leafUtils.LeafResource), args.Error(1) +} + +func (m *MockLeafResourceManager) GetLeafResourceByNodeName(nodeName string) (*leafUtils.LeafResource, error) { + args := m.Called(nodeName) + return args.Get(0).(*leafUtils.LeafResource), args.Error(1) +} + +func (m *MockLeafResourceManager) HasCluster(clusterName string) bool { + args := m.Called(clusterName) + return args.Bool(0) +} + +func (m *MockLeafResourceManager) HasNode(nodeName string) bool { + args := m.Called(nodeName) + return args.Bool(0) +} + +func (m *MockLeafResourceManager) ListNodes() []string { + args := m.Called() + return args.Get(0).([]string) +} + +func (m *MockLeafResourceManager) ListClusters() []string { + args := m.Called() + return args.Get(0).([]string) +} + +func (m *MockLeafResourceManager) GetClusterNode(nodeName string) *leafUtils.ClusterNode { + args := m.Called(nodeName) + return args.Get(0).(*leafUtils.ClusterNode) +} + +type MockLeafClientResourceManager struct { + mock.Mock +} + +func (m *MockLeafClientResourceManager) AddLeafClientResource(lcr *leafUtils.LeafClientResource, cluster *kosmosv1alpha1.Cluster) { + m.Called(lcr, cluster) +} + +func (m *MockLeafClientResourceManager) RemoveLeafClientResource(actualClusterName string) { + m.Called(actualClusterName) +} + +func (m *MockLeafClientResourceManager) GetLeafResource(actualClusterName string) (*leafUtils.LeafClientResource, error) { + args := m.Called(actualClusterName) + return args.Get(0).(*leafUtils.LeafClientResource), args.Error(1) +} + +func (m *MockLeafClientResourceManager) ListActualClusters() []string { + args := m.Called() + return args.Get(0).([]string) +} + +// TestReconcile verifies different scenarios in Reconcile function +func TestReconcile(t *testing.T) { + // Set up scheme + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + + // // Initialize mocks + mockLeafManager := new(MockLeafResourceManager) + mockLeafClientManager := new(MockLeafClientResourceManager) + + // Test data: root pod + rootPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "test-ns", + Labels: map[string]string{ + "kosmos-io/pod": "true", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "kosmos-leaf", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + } + + leafPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "test-ns", + Labels: map[string]string{ + "kosmos-io/pod": "true", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "leaf-worker", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + // Fake Kubernetes client with root pod + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(rootPod).Build() + + // Mock GlobalLeafManager behavior + mockLeafManager.On("HasNode", "kosmos-leaf").Return(true) + mockLeafManager.On("GetLeafResourceByNodeName", "kosmos-leaf").Return(&leafUtils.LeafResource{ + Cluster: &kosmosv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + }, + }, + }, nil) + + // Mock GlobalLeafClientManager behavior + mockLeafClientManager.On("GetLeafResource", "test-cluster").Return(&leafUtils.LeafClientResource{ + Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(leafPod).Build(), + }, nil) + + // Initialize reconciler + reconciler := &RootPodSyncReconciler{ + RootClient: fakeClient, + GlobalLeafManager: mockLeafManager, + GlobalLeafClientManager: mockLeafClientManager, + } + + // Reconcile request + request := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-pod", + Namespace: "test-ns", + }, + } + + // Execute Reconcile + result, err := reconciler.Reconcile(context.TODO(), request) + assert.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) + + // Verify Root Pod status update + updatedPod := &corev1.Pod{} + err = fakeClient.Get(context.TODO(), request.NamespacedName, updatedPod) + assert.NoError(t, err) + assert.True(t, reflect.DeepEqual(updatedPod.Status, leafPod.Status)) + + // Assertions on mock calls + mockLeafManager.AssertExpectations(t) + mockLeafClientManager.AssertExpectations(t) +}