-
Notifications
You must be signed in to change notification settings - Fork 50
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: gaoyuan <[email protected]>
- Loading branch information
Showing
23 changed files
with
5,501 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
130 changes: 130 additions & 0 deletions
130
pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
package pod | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"reflect" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/klog/v2" | ||
controllerruntime "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/builder" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/controller" | ||
"sigs.k8s.io/controller-runtime/pkg/event" | ||
"sigs.k8s.io/controller-runtime/pkg/manager" | ||
"sigs.k8s.io/controller-runtime/pkg/predicate" | ||
"sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
|
||
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" | ||
"github.com/kosmos.io/kosmos/pkg/utils" | ||
"github.com/kosmos.io/kosmos/pkg/utils/podutils" | ||
) | ||
|
||
const ( | ||
PodSyncControllerName = "pod-sync-controller" | ||
) | ||
|
||
// feature:sync rootpod status from leafpod status when rootpod status modified by others(not from leafpod) | ||
type RootPodSyncReconciler struct { | ||
RootClient client.Client | ||
GlobalLeafManager leafUtils.LeafResourceManager | ||
GlobalLeafClientManager leafUtils.LeafClientResourceManager | ||
} | ||
|
||
func (c *RootPodSyncReconciler) SetupWithManager(mgr manager.Manager) error { | ||
skipFunc := func(obj client.Object) bool { | ||
p := obj.(*corev1.Pod) | ||
return podutils.IsKosmosPod(p) | ||
} | ||
|
||
return controllerruntime.NewControllerManagedBy(mgr). | ||
Named(PodSyncControllerName). | ||
WithOptions(controller.Options{}). | ||
For(&corev1.Pod{}, builder.WithPredicates(predicate.Funcs{ | ||
CreateFunc: func(createEvent event.CreateEvent) bool { | ||
return false | ||
}, | ||
UpdateFunc: func(updateEvent event.UpdateEvent) bool { | ||
pod1 := updateEvent.ObjectOld.(*corev1.Pod) | ||
pod2 := updateEvent.ObjectNew.(*corev1.Pod) | ||
if !skipFunc(updateEvent.ObjectNew) { | ||
return false | ||
} | ||
return !reflect.DeepEqual(pod1.Status, pod2.Status) | ||
}, | ||
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { | ||
return false | ||
}, | ||
GenericFunc: func(genericEvent event.GenericEvent) bool { | ||
return false | ||
}, | ||
})). | ||
Complete(c) | ||
} | ||
|
||
func (c *RootPodSyncReconciler) leafClientResource(lr *leafUtils.LeafResource) (*leafUtils.LeafClientResource, error) { | ||
actualClusterName := leafUtils.GetActualClusterName(lr.Cluster) | ||
lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) | ||
if err != nil { | ||
return nil, fmt.Errorf("get leaf client resource err: %v", err) | ||
} | ||
return lcr, nil | ||
} | ||
|
||
func (c *RootPodSyncReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { | ||
klog.V(4).Infof("============ %s starts to reconcile %s ============", PodSyncControllerName, request.Name) | ||
|
||
var rootpod corev1.Pod | ||
if err := c.RootClient.Get(ctx, request.NamespacedName, &rootpod); err != nil { | ||
if apierrors.IsNotFound(err) { | ||
klog.V(4).Infof("pods not found, %s", request.NamespacedName) | ||
return reconcile.Result{}, nil | ||
} | ||
klog.Errorf("get %s error: %v", request.NamespacedName, err) | ||
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil | ||
} | ||
|
||
if !podutils.IsKosmosPod(&rootpod) { | ||
klog.V(4).Info("Pod is not create by kosmos tree, ignore") | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
if !c.GlobalLeafManager.HasNode(rootpod.Spec.NodeName) { | ||
klog.Errorf("Failed to get leaf Node %s", rootpod.Spec.NodeName) | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
lr, err := c.GlobalLeafManager.GetLeafResourceByNodeName(rootpod.Spec.NodeName) | ||
if err != nil { | ||
klog.Errorf("Failed to get leaf client for %s", rootpod.Spec.NodeName) | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
lcr, err := c.leafClientResource(lr) | ||
if err != nil { | ||
klog.Errorf("Failed to get leaf client resource %v", lr.Cluster.Name) | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
leafPod := &corev1.Pod{} | ||
err = lcr.Client.Get(ctx, request.NamespacedName, leafPod) | ||
if err != nil { | ||
klog.Errorf("Failed to get leaf pod %v", leafPod.Name) | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
if podutils.IsKosmosPod(leafPod) && !reflect.DeepEqual(rootpod.Status, leafPod.Status) { | ||
rPodCopy := rootpod.DeepCopy() | ||
rPodCopy.Status = leafPod.Status | ||
podutils.FitObjectMeta(&rPodCopy.ObjectMeta) | ||
if err := c.RootClient.Status().Update(ctx, rPodCopy); err != nil && !apierrors.IsNotFound(err) { | ||
klog.Errorf("error while updating rootpod status in kubernetes, %s", err) | ||
return reconcile.Result{}, nil | ||
} | ||
klog.Infof("update rootpod %s status success", leafPod.Name) | ||
} | ||
klog.V(4).Infof("============ %s has been reconciled %s ============", PodSyncControllerName, request.Name) | ||
return reconcile.Result{}, nil | ||
} |
192 changes: 192 additions & 0 deletions
192
pkg/clustertree/cluster-manager/controllers/pod/rootpodstatus_sync_controller_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
package pod | ||
|
||
import ( | ||
"context" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/types" | ||
"sigs.k8s.io/controller-runtime/pkg/client/fake" | ||
"sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
|
||
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" | ||
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" | ||
) | ||
|
||
type MockLeafResourceManager struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *MockLeafResourceManager) AddLeafResource(lr *leafUtils.LeafResource, nodes []*corev1.Node) { | ||
m.Called(lr, nodes) | ||
} | ||
|
||
func (m *MockLeafResourceManager) RemoveLeafResource(clusterName string) { | ||
m.Called(clusterName) | ||
} | ||
|
||
func (m *MockLeafResourceManager) GetLeafResource(clusterName string) (*leafUtils.LeafResource, error) { | ||
args := m.Called(clusterName) | ||
return args.Get(0).(*leafUtils.LeafResource), args.Error(1) | ||
} | ||
|
||
func (m *MockLeafResourceManager) GetLeafResourceByNodeName(nodeName string) (*leafUtils.LeafResource, error) { | ||
args := m.Called(nodeName) | ||
return args.Get(0).(*leafUtils.LeafResource), args.Error(1) | ||
} | ||
|
||
func (m *MockLeafResourceManager) HasCluster(clusterName string) bool { | ||
args := m.Called(clusterName) | ||
return args.Bool(0) | ||
} | ||
|
||
func (m *MockLeafResourceManager) HasNode(nodeName string) bool { | ||
args := m.Called(nodeName) | ||
return args.Bool(0) | ||
} | ||
|
||
func (m *MockLeafResourceManager) ListNodes() []string { | ||
args := m.Called() | ||
return args.Get(0).([]string) | ||
} | ||
|
||
func (m *MockLeafResourceManager) ListClusters() []string { | ||
args := m.Called() | ||
return args.Get(0).([]string) | ||
} | ||
|
||
func (m *MockLeafResourceManager) GetClusterNode(nodeName string) *leafUtils.ClusterNode { | ||
args := m.Called(nodeName) | ||
return args.Get(0).(*leafUtils.ClusterNode) | ||
} | ||
|
||
type MockLeafClientResourceManager struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *MockLeafClientResourceManager) AddLeafClientResource(lcr *leafUtils.LeafClientResource, cluster *kosmosv1alpha1.Cluster) { | ||
m.Called(lcr, cluster) | ||
} | ||
|
||
func (m *MockLeafClientResourceManager) RemoveLeafClientResource(actualClusterName string) { | ||
m.Called(actualClusterName) | ||
} | ||
|
||
func (m *MockLeafClientResourceManager) GetLeafResource(actualClusterName string) (*leafUtils.LeafClientResource, error) { | ||
args := m.Called(actualClusterName) | ||
return args.Get(0).(*leafUtils.LeafClientResource), args.Error(1) | ||
} | ||
|
||
func (m *MockLeafClientResourceManager) ListActualClusters() []string { | ||
args := m.Called() | ||
return args.Get(0).([]string) | ||
} | ||
|
||
// TestReconcile verifies different scenarios in Reconcile function | ||
func TestReconcile(t *testing.T) { | ||
// Set up scheme | ||
scheme := runtime.NewScheme() | ||
_ = corev1.AddToScheme(scheme) | ||
|
||
// // Initialize mocks | ||
mockLeafManager := new(MockLeafResourceManager) | ||
mockLeafClientManager := new(MockLeafClientResourceManager) | ||
|
||
// Test data: root pod | ||
rootPod := &corev1.Pod{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: "test-pod", | ||
Namespace: "test-ns", | ||
Labels: map[string]string{ | ||
"kosmos-io/pod": "true", | ||
}, | ||
}, | ||
Spec: corev1.PodSpec{ | ||
NodeName: "kosmos-leaf", | ||
}, | ||
Status: corev1.PodStatus{ | ||
Phase: corev1.PodPending, | ||
Conditions: []corev1.PodCondition{ | ||
{ | ||
Type: corev1.PodReady, | ||
Status: corev1.ConditionFalse, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
leafPod := &corev1.Pod{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: "test-pod", | ||
Namespace: "test-ns", | ||
Labels: map[string]string{ | ||
"kosmos-io/pod": "true", | ||
}, | ||
}, | ||
Spec: corev1.PodSpec{ | ||
NodeName: "leaf-worker", | ||
}, | ||
Status: corev1.PodStatus{ | ||
Phase: corev1.PodRunning, | ||
Conditions: []corev1.PodCondition{ | ||
{ | ||
Type: corev1.PodReady, | ||
Status: corev1.ConditionTrue, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
// Fake Kubernetes client with root pod | ||
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(rootPod).Build() | ||
|
||
// Mock GlobalLeafManager behavior | ||
mockLeafManager.On("HasNode", "kosmos-leaf").Return(true) | ||
mockLeafManager.On("GetLeafResourceByNodeName", "kosmos-leaf").Return(&leafUtils.LeafResource{ | ||
Cluster: &kosmosv1alpha1.Cluster{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: "test-cluster", | ||
}, | ||
}, | ||
}, nil) | ||
|
||
// Mock GlobalLeafClientManager behavior | ||
mockLeafClientManager.On("GetLeafResource", "test-cluster").Return(&leafUtils.LeafClientResource{ | ||
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(leafPod).Build(), | ||
}, nil) | ||
|
||
// Initialize reconciler | ||
reconciler := &RootPodSyncReconciler{ | ||
RootClient: fakeClient, | ||
GlobalLeafManager: mockLeafManager, | ||
GlobalLeafClientManager: mockLeafClientManager, | ||
} | ||
|
||
// Reconcile request | ||
request := reconcile.Request{ | ||
NamespacedName: types.NamespacedName{ | ||
Name: "test-pod", | ||
Namespace: "test-ns", | ||
}, | ||
} | ||
|
||
// Execute Reconcile | ||
result, err := reconciler.Reconcile(context.TODO(), request) | ||
assert.NoError(t, err) | ||
assert.Equal(t, reconcile.Result{}, result) | ||
|
||
// Verify Root Pod status update | ||
updatedPod := &corev1.Pod{} | ||
err = fakeClient.Get(context.TODO(), request.NamespacedName, updatedPod) | ||
assert.NoError(t, err) | ||
assert.True(t, reflect.DeepEqual(updatedPod.Status, leafPod.Status)) | ||
|
||
// Assertions on mock calls | ||
mockLeafManager.AssertExpectations(t) | ||
mockLeafClientManager.AssertExpectations(t) | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.