From 338c7d83160d4a023bc68d05c42be11c00813bd1 Mon Sep 17 00:00:00 2001 From: zhouhaoA1 Date: Thu, 25 Apr 2024 15:25:19 +0800 Subject: [PATCH] fix nodes assign func bug Signed-off-by: zhouhaoA1 --- .../virtualcluster_init_controller.go | 102 ++++++++++++------ 1 file changed, 72 insertions(+), 30 deletions(-) diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index e0f04a73d..e3ecc2860 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -124,15 +124,26 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re } case v1alpha1.Completed: //update request, check if promotepolicy nodes increase or decrease. - err := c.assignWorkNodes(updatedCluster) + // only 2 scenarios matched update request with status 'completed'. + // 1. node scale request, original status is 'completed'. 2. node scale process finished by NodeController, the controller changes status from 'updating' to 'completed' + policyChanged, err := c.checkPromotePoliciesChanged(updatedCluster) if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name) + klog.Errorf("Error check promote policies changed. err: %s", err.Error()) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error checkPromotePoliciesChanged virtualcluster %s", updatedCluster.Name) } - updatedCluster.Status.Phase = v1alpha1.Updating - err = c.Update(originalCluster, updatedCluster) - if err != nil { - klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + if !policyChanged { + return reconcile.Result{}, nil + } else { + err := c.assignWorkNodes(updatedCluster) + if err != nil { + return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name) + } + updatedCluster.Status.Phase = v1alpha1.Updating + err = c.Update(originalCluster, updatedCluster) + if err != nil { + klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + } } default: klog.Warningf("Skip virtualcluster %s reconcile status: %s", originalCluster.Name, originalCluster.Status.Phase) @@ -248,32 +259,40 @@ func (c *VirtualClusterInitController) assignWorkNodes(virtualCluster *v1alpha1. return nil } -// nodesChangeCalculate calculate nodes changed when update virtualcluster. -func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) { - var matched int32 = 0 - var nodesAssignedMatchedPolicy []v1alpha1.NodeInfo - var nodesAssignedUnMatched []v1alpha1.NodeInfo - nodesAssigned := virtualCluster.Spec.PromoteResources.NodeInfos - for _, nodeInfo := range nodesAssigned { - node, ok := util.FindGlobalNode(nodeInfo.NodeName, globalNodes) - if !ok { - return nodesAssigned, errors.Errorf("Node %s doesn't find in nodes pool", nodeInfo.NodeName) +func (c *VirtualClusterInitController) checkPromotePoliciesChanged(virtualCluster *v1alpha1.VirtualCluster) (bool, error) { + globalNodeList := &v1alpha1.GlobalNodeList{} + if err := c.Client.List(context.TODO(), globalNodeList); err != nil { + return false, fmt.Errorf("list global nodes: %w", err) + } + for _, policy := range virtualCluster.Spec.PromotePolicies { + nodesAssignedMatchedPolicy, err := getAssignedNodesByPolicy(virtualCluster, policy, globalNodeList.Items) + if err != nil { + return false, errors.Wrapf(err, "Parse assigned nodes by policy %s error", policy.LabelSelector.String()) } - if mapContains(node.Labels, policy.LabelSelector.MatchLabels) { - nodesAssignedMatchedPolicy = append(nodesAssignedMatchedPolicy, nodeInfo) - matched++ - } else { - nodesAssignedUnMatched = append(nodesAssignedUnMatched, nodeInfo) + if policy.NodeCount != int32(len(nodesAssignedMatchedPolicy)) { + klog.V(2).Infof("Promote policy node count changed from %d to %d", len(nodesAssignedMatchedPolicy), policy.NodeCount) + return true, nil } } - requestNodesChanged := policy.NodeCount - matched + return false, nil +} + +// nodesChangeCalculate calculate nodes changed when update virtualcluster. +func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) { + nodesAssigned, err := getAssignedNodesByPolicy(virtualCluster, policy, globalNodes) + if err != nil { + return nil, errors.Wrapf(err, "Parse assigned nodes by policy %s error", policy.LabelSelector.String()) + } + + requestNodesChanged := policy.NodeCount - int32(len(nodesAssigned)) if requestNodesChanged == 0 { klog.V(2).Infof("Nothing to do for policy %s", policy.LabelSelector.String()) return nodesAssigned, nil } else if requestNodesChanged > 0 { // nodes needs to be increased klog.V(2).Infof("Try allocate %d nodes for policy %s", requestNodesChanged, policy.LabelSelector.String()) var cnt int32 = 0 - for _, globalNode := range globalNodes { + var updatedGlobalNodes []*v1alpha1.GlobalNode + for i, globalNode := range globalNodes { if globalNode.Spec.State == v1alpha1.NodeFreeState && mapContains(globalNode.Spec.Labels, policy.LabelSelector.MatchLabels) { nodesAssigned = append(nodesAssigned, v1alpha1.NodeInfo{ NodeName: globalNode.Name, @@ -281,9 +300,11 @@ func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alp // 更新globalNode的状态为占用状态 updated := globalNode.DeepCopy() updated.Spec.State = v1alpha1.NodeInUse - if err := c.Client.Update(context.TODO(), updated); err != nil { - return nodesAssigned, errors.Wrapf(err, "Failed to update globalNode %s to InUse", globalNode.Name) + updated.Status = v1alpha1.GlobalNodeStatus{ + VirtualCluster: virtualCluster.Name, } + globalNodes[i] = *updated + updatedGlobalNodes = append(updatedGlobalNodes, updated) cnt++ } if cnt == requestNodesChanged { @@ -291,20 +312,41 @@ func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alp } } if cnt < requestNodesChanged { - return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, matched) + return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, cnt) + } + for _, updated := range updatedGlobalNodes { + klog.V(2).Infof("Assign node %s for virtualcluster %s policy %s", updated.Name, virtualCluster.GetName(), policy.LabelSelector.String()) + if err := c.Client.Update(context.TODO(), updated); err != nil { + return nil, errors.Wrapf(err, "Failed to update globalNode %s to InUse", updated.Name) + } } + } else { // nodes needs to decrease klog.V(2).Infof("Try decrease nodes %d for policy %s", -requestNodesChanged, policy.LabelSelector.String()) decrease := int(-requestNodesChanged) - if len(nodesAssignedMatchedPolicy) < decrease { - return nodesAssigned, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssignedMatchedPolicy)) + if len(nodesAssigned) < decrease { + return nil, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssigned)) } - nodesAssigned = append(nodesAssignedUnMatched, nodesAssignedMatchedPolicy[:(len(nodesAssignedMatchedPolicy)-decrease)]...) + nodesAssigned = nodesAssigned[:len(nodesAssigned)-decrease] // note: node pool will not be modified here. NodeController will modify it when node delete success } return nodesAssigned, nil } +func getAssignedNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) { + var nodesAssignedMatchedPolicy []v1alpha1.NodeInfo + for _, nodeInfo := range virtualCluster.Spec.PromoteResources.NodeInfos { + node, ok := util.FindGlobalNode(nodeInfo.NodeName, globalNodes) + if !ok { + return nil, errors.Errorf("Node %s doesn't find in nodes pool", nodeInfo.NodeName) + } + if mapContains(node.Labels, policy.LabelSelector.MatchLabels) { + nodesAssignedMatchedPolicy = append(nodesAssignedMatchedPolicy, nodeInfo) + } + } + return nodesAssignedMatchedPolicy, nil +} + func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1alpha1.VirtualCluster, timeout time.Duration) error { secret, err := c.RootClientSet.CoreV1().Secrets(virtualCluster.GetNamespace()).Get(context.TODO(), fmt.Sprintf("%s-%s", virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{})