Skip to content

Commit

Permalink
feature: support one-to-many model for the leaf node feature
Browse files Browse the repository at this point in the history
Signed-off-by: qiuming520 <[email protected]>
  • Loading branch information
qiuming520 committed Dec 7, 2023
1 parent 66f5d46 commit a76c36c
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 279 deletions.
20 changes: 11 additions & 9 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, fmt.Errorf("new manager with err %v, cluster %s", err, cluster.Name)
}

leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.Root, mgr.GetClient(), c.RootClientset, leafClient)
leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.RootClientset, leafClient)
c.LeafModelHandler = leafModelHandler

nodes, err := c.createNode(ctx, cluster, leafClient)
nodes, leafNodeSelectors, err := c.createNode(ctx, cluster, leafClient)
if err != nil {
return reconcile.Result{RequeueAfter: RequeueTime}, fmt.Errorf("create node with err %v, cluster %s", err, cluster.Name)
}
Expand All @@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
c.ManagerCancelFuncs[cluster.Name] = &cancel
c.ControllerManagersLock.Unlock()

if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafClient, kosmosClient, config); err != nil {
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
}

Expand Down Expand Up @@ -240,6 +240,7 @@ func (c *ClusterController) setupControllers(
cluster *kosmosv1alpha1.Cluster,
nodes []*corev1.Node,
clientDynamic *dynamic.DynamicClient,
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
leafClientset kubernetes.Interface,
kosmosClient kosmosversioned.Interface,
leafRestConfig *rest.Config) error {
Expand All @@ -262,14 +263,15 @@ func (c *ClusterController) setupControllers(
Root: c.Root,
RootClientset: c.RootClientset,
Nodes: nodes,
LeafNodeSelectors: leafNodeSelector,
LeafModelHandler: c.LeafModelHandler,
Cluster: cluster,
}
if err := nodeResourcesController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", controllers.NodeResourcesControllerName, err)
}

nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, c.RootClientset, c.LeafModelHandler)
nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler)
if err := mgr.Add(nodeLeaseController); err != nil {
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
}
Expand Down Expand Up @@ -333,19 +335,19 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, isOne2O
return nil
}

func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, error) {
func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
serverVersion, err := leafClient.Discovery().ServerVersion()
if err != nil {
klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name)
return nil, err
return nil, nil, err
}

nodes, err := c.LeafModelHandler.CreateNodeInRoot(ctx, cluster, c.Options.ListenPort, serverVersion.GitVersion)
nodes, leafNodeSelectors, err := c.LeafModelHandler.CreateRootNode(ctx, c.Options.ListenPort, serverVersion.GitVersion)
if err != nil {
klog.Errorf("create node for cluster %s failed, err: %v", cluster.Name, err)
return nil, err
return nil, nil, err
}
return nodes, nil
return nodes, leafNodeSelectors, nil
}

func (c *ClusterController) deleteNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
)

Expand All @@ -38,19 +39,21 @@ type NodeLeaseController struct {
leaseInterval time.Duration
statusInterval time.Duration

nodes []*corev1.Node
nodeLock sync.Mutex
nodes []*corev1.Node
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
nodeLock sync.Mutex
}

func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
c := &NodeLeaseController{
leafClient: leafClient,
rootClient: rootClient,
root: root,
nodes: nodes,
LeafModelHandler: LeafModelHandler,
leaseInterval: getRenewInterval(),
statusInterval: DefaultNodeStatusUpdateInterval,
leafClient: leafClient,
rootClient: rootClient,
root: root,
nodes: nodes,
LeafModelHandler: LeafModelHandler,
LeafNodeSelectors: LeafNodeSelectors,
leaseInterval: getRenewInterval(),
statusInterval: DefaultNodeStatusUpdateInterval,
}
return c
}
Expand All @@ -71,15 +74,15 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) {
}
c.nodeLock.Unlock()

err := c.updateNodeStatus(ctx, nodes)
err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors)
if err != nil {
klog.Errorf(err.Error())
}
}

// nolint
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node) error {
err := c.LeafModelHandler.UpdateNodeStatus(ctx, n)
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
err := c.LeafModelHandler.UpdateRootNodeStatus(ctx, n, leafNodeSelector)
if err != nil {
klog.Errorf("Could not update node status in root cluster,Error: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type NodeResourcesController struct {
GlobalLeafManager leafUtils.LeafResourceManager
RootClientset kubernetes.Interface

Nodes []*corev1.Node
LeafModelHandler leafUtils.LeafModelHandler
Cluster *kosmosv1alpha1.Cluster
EventRecorder record.EventRecorder
Nodes []*corev1.Node
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
LeafModelHandler leafUtils.LeafModelHandler
Cluster *kosmosv1alpha1.Cluster
EventRecorder record.EventRecorder
}

var predicatesFunc = predicate.Funcs{
Expand Down Expand Up @@ -110,15 +111,15 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
}, fmt.Errorf("cannot get node while update nodeInRoot resources %s, err: %v", rootNode.Name, err)
}

nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode)
nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
if err != nil {
klog.Errorf("Could not get node in leaf cluster %s,Error: %v", c.Cluster.Name, err)
return controllerruntime.Result{
RequeueAfter: RequeueTime,
}, err
}

pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode)
pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
if err != nil {
klog.Errorf("Could not list pod in leaf cluster %s,Error: %v", c.Cluster.Name, err)
return controllerruntime.Result{
Expand All @@ -130,7 +131,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
clone.Status.Conditions = utils.NodeConditions()

// Node2Node mode should sync leaf node's labels and annotations to root nodeInRoot
if c.LeafModelHandler.GetLeafModelType() == leafUtils.DispersionModel {
if c.LeafModelHandler.GetLeafMode() == leafUtils.Node {
getNode := func(nodes *corev1.NodeList) *corev1.Node {
for _, nodeInLeaf := range nodes.Items {
if nodeInLeaf.Name == rootNode.Name {
Expand All @@ -156,7 +157,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
}
}
}

// TODO ggregation Labels and Annotations for classificationModel
clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods)
clone.Status.Allocatable = clusterResources
clone.Status.Capacity = clusterResources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
"github.com/kosmos.io/kosmos/pkg/utils"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
// create pod in leaf
if err != nil {
if errors.IsNotFound(err) {
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod); err != nil {
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
klog.Errorf("create pod inleaf error, err: %s", err)
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
} else {
Expand All @@ -212,7 +213,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req

// update pod in leaf
if podutils.ShouldEnqueue(leafPod, &rootpod) {
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod); err != nil {
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
}
}
Expand Down Expand Up @@ -700,7 +701,7 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
return nil
}

func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error {
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil {
// span.SetStatus(err)
return err
Expand All @@ -711,7 +712,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
return fmt.Errorf("clusternode info is nil , name: %s", pod.Spec.NodeName)
}

basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL)
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name)

// create ns
Expand Down Expand Up @@ -765,24 +766,28 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
return nil
}

func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootpod *corev1.Pod, leafpod *corev1.Pod) error {
func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootPod *corev1.Pod, leafPod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
// TODO: update env
// TODO: update config secret pv pvc ...
klog.V(4).Infof("Updating pod %v/%+v", rootpod.Namespace, rootpod.Name)
klog.V(4).Infof("Updating pod %v/%+v", rootPod.Namespace, rootPod.Name)

if !podutils.IsKosmosPod(leafpod) {
if !podutils.IsKosmosPod(leafPod) {
klog.V(4).Info("Pod is not created by kosmos tree, ignore")
return nil
}
// not used
podutils.FitLabels(leafpod.ObjectMeta.Labels, lr.IgnoreLabels)
podCopy := leafpod.DeepCopy()
podutils.FitLabels(leafPod.ObjectMeta.Labels, lr.IgnoreLabels)
podCopy := leafPod.DeepCopy()
// util.GetUpdatedPod update PodCopy container image, annotations, labels.
// recover toleration, affinity, tripped ignore labels.
podutils.GetUpdatedPod(podCopy, rootpod, lr.IgnoreLabels)
if reflect.DeepEqual(leafpod.Spec, podCopy.Spec) &&
reflect.DeepEqual(leafpod.Annotations, podCopy.Annotations) &&
reflect.DeepEqual(leafpod.Labels, podCopy.Labels) {
clusterNodeInfo := r.GlobalLeafManager.GetClusterNode(rootPod.Spec.NodeName)
if clusterNodeInfo == nil {
return fmt.Errorf("clusternode info is nil , name: %s", rootPod.Spec.NodeName)
}
podutils.GetUpdatedPod(podCopy, rootPod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
if reflect.DeepEqual(leafPod.Spec, podCopy.Spec) &&
reflect.DeepEqual(leafPod.Annotations, podCopy.Annotations) &&
reflect.DeepEqual(leafPod.Labels, podCopy.Labels) {
return nil
}

Expand All @@ -798,7 +803,7 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf
if err != nil {
return fmt.Errorf("could not update pod: %v", err)
}
klog.V(4).Infof("Update pod %v/%+v success ", rootpod.Namespace, rootpod.Name)
klog.V(4).Infof("Update pod %v/%+v success ", rootPod.Namespace, rootPod.Name)
return nil
}

Expand Down
Loading

0 comments on commit a76c36c

Please sign in to comment.