Skip to content

Commit

Permalink
add:globalnode mod
Browse files Browse the repository at this point in the history
Signed-off-by: luoyuanze <[email protected]>
  • Loading branch information
lyzuiui committed Dec 30, 2024
1 parent 97c791a commit 24bd431
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package globalnodecontroller

import (
"context"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -11,6 +10,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -55,6 +55,31 @@ func compareMaps(map1, map2 map[string]string) bool {
return true
}

// CustomPredicateForGlobalNode 用于 GlobalNode 资源的事件过滤
var CustomPredicateForGlobalNode = predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return true
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldObj, okOld := updateEvent.ObjectOld.(*v1alpha1.GlobalNode)
newObj, okNew := updateEvent.ObjectNew.(*v1alpha1.GlobalNode)

if !okOld || !okNew {
return true
}

specChanged := !reflect.DeepEqual(oldObj.Spec, newObj.Spec)

return specChanged
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return true
},
}

func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
if r.Client == nil {
r.Client = mgr.GetClient()
Expand All @@ -70,6 +95,7 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldObj := updateEvent.ObjectOld.(*v1.Node)
newObj := updateEvent.ObjectNew.(*v1.Node)

return !compareMaps(oldObj.Labels, newObj.Labels)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
Expand All @@ -81,12 +107,13 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
})).
Watches(&source.Kind{Type: &v1alpha1.GlobalNode{}}, handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
gn := a.(*v1alpha1.GlobalNode)

return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: gn.Name,
}},
}
})).
}), builder.WithPredicates(CustomPredicateForGlobalNode)).
// Watches(&source.Kind{Type: &v1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.newNodeMapFunc())).
Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(r.newVirtualClusterMapFunc())).
Complete(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package globalnodecontroller

import (
"context"
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -20,13 +22,21 @@ const (
GlobalNodeStatusControllerName = "global-node-status-controller"
DefaultStatusUpdateInterval = 15 * time.Second
ClientHeartbeatThreshold = 10 * time.Second
nodeUpdateWorkerSize = 8
RequiredNotReadyCount = 5
)

type nodeHealthData struct {
notReadyCount int
lastProbeTime time.Time
}

type GlobalNodeStatusController struct {
root client.Client
statusInterval time.Duration

kosmosClient versioned.Interface
kosmosClient versioned.Interface
nodeHealthMap sync.Map // map[string]*nodeHealthData
}

func NewGlobalNodeStatusController(
Expand All @@ -37,6 +47,7 @@ func NewGlobalNodeStatusController(
root: root,
statusInterval: DefaultStatusUpdateInterval,
kosmosClient: kosmosClient,
nodeHealthMap: sync.Map{},
}
}
func (c *GlobalNodeStatusController) Start(ctx context.Context) error {
Expand All @@ -46,9 +57,8 @@ func (c *GlobalNodeStatusController) Start(ctx context.Context) error {
return nil
}
func (c *GlobalNodeStatusController) syncGlobalNodeStatus(ctx context.Context) {

globalNodes := make([]*v1alpha1.GlobalNode, 0)
//c.globalNodeLock.Lock()
//defer c.globalNodeLock.Unlock()

nodeList, err := c.kosmosClient.KosmosV1alpha1().GlobalNodes().List(ctx, metav1.ListOptions{})
if err != nil {
Expand All @@ -70,14 +80,23 @@ func (c *GlobalNodeStatusController) updateGlobalNodeStatus(
ctx context.Context,
globalNodes []*v1alpha1.GlobalNode,
) error {
for _, globalNode := range globalNodes {
err := c.updateStatusForGlobalNode(ctx, globalNode)
if err != nil {
klog.Errorf("Failed to update status for global node %s: %v", globalNode.Name, err)
return err
errChan := make(chan error, len(globalNodes))

workqueue.ParallelizeUntil(ctx, nodeUpdateWorkerSize, len(globalNodes), func(piece int) {
node := globalNodes[piece]
if err := c.updateStatusForGlobalNode(ctx, node); err != nil {
klog.Errorf("Failed to update status for global node %s: %v", node.Name, err)
errChan <- err
}
})

close(errChan)

var retErr error
for err := range errChan {
retErr = err
}
return nil
return retErr
}

func (c *GlobalNodeStatusController) updateStatusForGlobalNode(
Expand Down Expand Up @@ -105,6 +124,25 @@ func (c *GlobalNodeStatusController) updateStatusForGlobalNode(
statusType = "NotReady"
}

dataRaw, _ := c.nodeHealthMap.LoadOrStore(globalNode.Name, &nodeHealthData{})
nh := dataRaw.(*nodeHealthData)

if statusType == "NotReady" {
nh.notReadyCount++
if condition.Type == v1.NodeConditionType("Ready") {
klog.V(2).Infof("GlobalNode %s: notReadyCount=%d, newStatus=%s", globalNode.Name, nh.notReadyCount, statusType)
}
} else {
nh.notReadyCount = 0
}

if nh.notReadyCount > 0 && nh.notReadyCount < RequiredNotReadyCount {

c.nodeHealthMap.Store(globalNode.Name, nh)
return nil
}

//update
if string(condition.Type) != statusType {
condition.Type = v1.NodeConditionType(statusType)
condition.LastTransitionTime = metav1.NewTime(time.Now())
Expand All @@ -122,10 +160,9 @@ func (c *GlobalNodeStatusController) updateStatusForGlobalNode(
}

klog.Infof("Successfully updated status for GlobalNode %s to %s", globalNode.Name, statusType)
} else {
klog.Infof("No status update required for GlobalNode %s, current status: %s", globalNode.Name, condition.Type)
nh.notReadyCount = 0
c.nodeHealthMap.Store(globalNode.Name, nh)
}

return nil
})
}

0 comments on commit 24bd431

Please sign in to comment.