Skip to content

Commit

Permalink
Merge pull request #770 from gao12312/pod_snyc
Browse files Browse the repository at this point in the history
sync podstatus from leaf to root when root node from ready to notready
  • Loading branch information
duanmengkk authored Dec 23, 2024
2 parents a2b1928 + f496127 commit fe3fc43
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod"
podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pv"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pvc"
Expand Down Expand Up @@ -248,6 +249,15 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("error starting root pv controller %v", err)
}

RootPodSyncReconciler := pod.RootPodSyncReconciler{
RootClient: mgr.GetClient(),
GlobalLeafManager: globalLeafResourceManager,
GlobalLeafClientManager: globalLeafClientManager,
}
if err := RootPodSyncReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting root podsync controller %v", err)
}

if len(os.Getenv("USE-ONEWAY-STORAGE")) > 0 {
onewayPVController := pv.OnewayPVController{
Root: mgr.GetClient(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package pod

import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

const (
PodSyncControllerName = "pod-sync-controller"
)

// feature:sync rootpod status from leafpod status when rootpod status modified by others(not from leafpod)
type RootPodSyncReconciler struct {
RootClient client.Client
GlobalLeafManager leafUtils.LeafResourceManager
GlobalLeafClientManager leafUtils.LeafClientResourceManager
}

func (c *RootPodSyncReconciler) SetupWithManager(mgr manager.Manager) error {
skipFunc := func(obj client.Object) bool {
p := obj.(*corev1.Pod)
return podutils.IsKosmosPod(p)
}

return controllerruntime.NewControllerManagedBy(mgr).
Named(PodSyncControllerName).
WithOptions(controller.Options{}).
For(&corev1.Pod{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return false
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldPod := updateEvent.ObjectOld.(*corev1.Pod)
newPod := updateEvent.ObjectNew.(*corev1.Pod)
if !skipFunc(updateEvent.ObjectNew) {
return false
}
return !reflect.DeepEqual(oldPod.Status, newPod.Status)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return false
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
})).
Complete(c)
}

func (c *RootPodSyncReconciler) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) {
actualClusterName := leafUtils.GetActualClusterName(lr.Cluster)
lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName)
if err != nil {
return nil, fmt.Errorf("get leaf client resource err: %v", err)
}
return lcr, nil
}

func (c *RootPodSyncReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("============ %s starts to reconcile %s ============", PodSyncControllerName, request.Name)

var rootpod corev1.Pod
if err := c.RootClient.Get(ctx, request.NamespacedName, &rootpod); err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("pods not found, %s", request.NamespacedName)
return reconcile.Result{}, nil
}
klog.Errorf("get %s error: %v", request.NamespacedName, err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
//get LeafResource
lr, err := c.GlobalLeafManager.GetLeafResourceByNodeName(rootpod.Spec.NodeName)
if err != nil {
klog.Errorf("Failed to get leaf client for %s", rootpod.Spec.NodeName)
return reconcile.Result{}, nil
}

lcr, err := c.leafClientResource(lr)
if err != nil {
klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name)
return reconcile.Result{}, nil
}

leafPod := &corev1.Pod{}
err = lcr.Client.Get(ctx, request.NamespacedName, leafPod)
if err != nil {
klog.Errorf("Failed to get leaf pod %v", leafPod.Name)
return reconcile.Result{}, nil
}

if podutils.IsKosmosPod(leafPod) && !reflect.DeepEqual(rootpod.Status, leafPod.Status) {
rPodCopy := rootpod.DeepCopy()
rPodCopy.Status = leafPod.Status
podutils.FitObjectMeta(&rPodCopy.ObjectMeta)
if err := c.RootClient.Status().Update(ctx, rPodCopy); err != nil {
klog.Errorf("error while updating rootpod status in kubernetes, %s", err)
return reconcile.Result{}, nil
}
klog.Infof("update rootpod %s status success", leafPod.Name)
}
klog.V(4).Infof("============ %s has been reconciled %s ============", PodSyncControllerName, request.Name)
return reconcile.Result{}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package pod

import (
"context"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
)

type MockLeafResourceManager struct {
mock.Mock
}

func (m *MockLeafResourceManager) AddLeafResource(lr *leafUtils.LeafResource, nodes []*corev1.Node) {
m.Called(lr, nodes)
}

func (m *MockLeafResourceManager) RemoveLeafResource(clusterName string) {
m.Called(clusterName)
}

func (m *MockLeafResourceManager) GetLeafResource(clusterName string) (*leafUtils.LeafResource, error) {
args := m.Called(clusterName)
return args.Get(0).(*leafUtils.LeafResource), args.Error(1)
}

func (m *MockLeafResourceManager) GetLeafResourceByNodeName(nodeName string) (*leafUtils.LeafResource, error) {
args := m.Called(nodeName)
return args.Get(0).(*leafUtils.LeafResource), args.Error(1)
}

func (m *MockLeafResourceManager) HasCluster(clusterName string) bool {
args := m.Called(clusterName)
return args.Bool(0)
}

func (m *MockLeafResourceManager) HasNode(nodeName string) bool {
args := m.Called(nodeName)
return args.Bool(0)
}

func (m *MockLeafResourceManager) ListNodes() []string {
args := m.Called()
return args.Get(0).([]string)
}

func (m *MockLeafResourceManager) ListClusters() []string {
args := m.Called()
return args.Get(0).([]string)
}

func (m *MockLeafResourceManager) GetClusterNode(nodeName string) *leafUtils.ClusterNode {
args := m.Called(nodeName)
return args.Get(0).(*leafUtils.ClusterNode)
}

type MockLeafClientResourceManager struct {
mock.Mock
}

func (m *MockLeafClientResourceManager) AddLeafClientResource(lcr *leafUtils.LeafClientResource, cluster *kosmosv1alpha1.Cluster) {
m.Called(lcr, cluster)
}

func (m *MockLeafClientResourceManager) RemoveLeafClientResource(actualClusterName string) {
m.Called(actualClusterName)
}

func (m *MockLeafClientResourceManager) GetLeafResource(actualClusterName string) (*leafUtils.LeafClientResource, error) {
args := m.Called(actualClusterName)
return args.Get(0).(*leafUtils.LeafClientResource), args.Error(1)
}

func (m *MockLeafClientResourceManager) ListActualClusters() []string {
args := m.Called()
return args.Get(0).([]string)
}

// TestReconcile verifies different scenarios in Reconcile function
func TestReconcile(t *testing.T) {
// Set up scheme
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)

// // Initialize mocks
mockLeafManager := new(MockLeafResourceManager)
mockLeafClientManager := new(MockLeafClientResourceManager)

// Test data: root pod
rootPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "test-ns",
Labels: map[string]string{
"kosmos-io/pod": "true",
},
},
Spec: corev1.PodSpec{
NodeName: "kosmos-leaf",
},
Status: corev1.PodStatus{
Phase: corev1.PodPending,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionFalse,
},
},
},
}

leafPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "test-ns",
Labels: map[string]string{
"kosmos-io/pod": "true",
},
},
Spec: corev1.PodSpec{
NodeName: "leaf-worker",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}

// Fake Kubernetes client with root pod
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(rootPod).Build()

// Mock GlobalLeafManager behavior
mockLeafManager.On("HasNode", "kosmos-leaf").Return(true)
mockLeafManager.On("GetLeafResourceByNodeName", "kosmos-leaf").Return(&leafUtils.LeafResource{
Cluster: &kosmosv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
},
},
}, nil)

// Mock GlobalLeafClientManager behavior
mockLeafClientManager.On("GetLeafResource", "test-cluster").Return(&leafUtils.LeafClientResource{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(leafPod).Build(),
}, nil)

// Initialize reconciler
reconciler := &RootPodSyncReconciler{
RootClient: fakeClient,
GlobalLeafManager: mockLeafManager,
GlobalLeafClientManager: mockLeafClientManager,
}

// Reconcile request
request := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: "test-pod",
Namespace: "test-ns",
},
}

// Execute Reconcile
result, err := reconciler.Reconcile(context.TODO(), request)
assert.NoError(t, err)
assert.Equal(t, reconcile.Result{}, result)

// Verify Root Pod status update
updatedPod := &corev1.Pod{}
err = fakeClient.Get(context.TODO(), request.NamespacedName, updatedPod)
assert.NoError(t, err)
assert.True(t, reflect.DeepEqual(updatedPod.Status, leafPod.Status))
}

0 comments on commit fe3fc43

Please sign in to comment.