Merge pull request #488 from zhouhaoA1/feature_components_deploy
fix nodes assign func bug
kosmos-robot authored Apr 25, 2024
2 parents a44007b + 338c7d8 commit 08051b0
Showing 1 changed file with 72 additions and 30 deletions.
102 changes: 72 additions & 30 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
@@ -124,15 +124,26 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
 		}
 	case v1alpha1.Completed:
 		//update request, check if promotepolicy nodes increase or decrease.
-		err := c.assignWorkNodes(updatedCluster)
+		// only 2 scenarios matched update request with status 'completed'.
+		// 1. node scale request, original status is 'completed'. 2. node scale process finished by NodeController, the controller changes status from 'updating' to 'completed'
+		policyChanged, err := c.checkPromotePoliciesChanged(updatedCluster)
 		if err != nil {
-			return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name)
+			klog.Errorf("Error check promote policies changed. err: %s", err.Error())
+			return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error checkPromotePoliciesChanged virtualcluster %s", updatedCluster.Name)
 		}
-		updatedCluster.Status.Phase = v1alpha1.Updating
-		err = c.Update(originalCluster, updatedCluster)
-		if err != nil {
-			klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
-			return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+		if !policyChanged {
+			return reconcile.Result{}, nil
+		} else {
+			err := c.assignWorkNodes(updatedCluster)
+			if err != nil {
+				return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name)
+			}
+			updatedCluster.Status.Phase = v1alpha1.Updating
+			err = c.Update(originalCluster, updatedCluster)
+			if err != nil {
+				klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
+				return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+			}
 		}
 	default:
 		klog.Warningf("Skip virtualcluster %s reconcile status: %s", originalCluster.Name, originalCluster.Status.Phase)
@@ -248,63 +259,94 @@ func (c *VirtualClusterInitController) assignWorkNodes(virtualCluster *v1alpha1.
 	return nil
 }
 
-// nodesChangeCalculate calculate nodes changed when update virtualcluster.
-func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) {
-	var matched int32 = 0
-	var nodesAssignedMatchedPolicy []v1alpha1.NodeInfo
-	var nodesAssignedUnMatched []v1alpha1.NodeInfo
-	nodesAssigned := virtualCluster.Spec.PromoteResources.NodeInfos
-	for _, nodeInfo := range nodesAssigned {
-		node, ok := util.FindGlobalNode(nodeInfo.NodeName, globalNodes)
-		if !ok {
-			return nodesAssigned, errors.Errorf("Node %s doesn't find in nodes pool", nodeInfo.NodeName)
+func (c *VirtualClusterInitController) checkPromotePoliciesChanged(virtualCluster *v1alpha1.VirtualCluster) (bool, error) {
+	globalNodeList := &v1alpha1.GlobalNodeList{}
+	if err := c.Client.List(context.TODO(), globalNodeList); err != nil {
+		return false, fmt.Errorf("list global nodes: %w", err)
+	}
+	for _, policy := range virtualCluster.Spec.PromotePolicies {
+		nodesAssignedMatchedPolicy, err := getAssignedNodesByPolicy(virtualCluster, policy, globalNodeList.Items)
+		if err != nil {
+			return false, errors.Wrapf(err, "Parse assigned nodes by policy %s error", policy.LabelSelector.String())
 		}
-		if mapContains(node.Labels, policy.LabelSelector.MatchLabels) {
-			nodesAssignedMatchedPolicy = append(nodesAssignedMatchedPolicy, nodeInfo)
-			matched++
-		} else {
-			nodesAssignedUnMatched = append(nodesAssignedUnMatched, nodeInfo)
+		if policy.NodeCount != int32(len(nodesAssignedMatchedPolicy)) {
+			klog.V(2).Infof("Promote policy node count changed from %d to %d", len(nodesAssignedMatchedPolicy), policy.NodeCount)
+			return true, nil
 		}
 	}
-	requestNodesChanged := policy.NodeCount - matched
+	return false, nil
+}
+
+// nodesChangeCalculate calculate nodes changed when update virtualcluster.
+func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) {
+	nodesAssigned, err := getAssignedNodesByPolicy(virtualCluster, policy, globalNodes)
+	if err != nil {
+		return nil, errors.Wrapf(err, "Parse assigned nodes by policy %s error", policy.LabelSelector.String())
+	}
+
+	requestNodesChanged := policy.NodeCount - int32(len(nodesAssigned))
 	if requestNodesChanged == 0 {
 		klog.V(2).Infof("Nothing to do for policy %s", policy.LabelSelector.String())
 		return nodesAssigned, nil
 	} else if requestNodesChanged > 0 { // nodes needs to be increased
 		klog.V(2).Infof("Try allocate %d nodes for policy %s", requestNodesChanged, policy.LabelSelector.String())
 		var cnt int32 = 0
-		for _, globalNode := range globalNodes {
+		var updatedGlobalNodes []*v1alpha1.GlobalNode
+		for i, globalNode := range globalNodes {
 			if globalNode.Spec.State == v1alpha1.NodeFreeState && mapContains(globalNode.Spec.Labels, policy.LabelSelector.MatchLabels) {
 				nodesAssigned = append(nodesAssigned, v1alpha1.NodeInfo{
 					NodeName: globalNode.Name,
 				})
+				// update the globalNode's state to occupied (in use)
 				updated := globalNode.DeepCopy()
 				updated.Spec.State = v1alpha1.NodeInUse
-				if err := c.Client.Update(context.TODO(), updated); err != nil {
-					return nodesAssigned, errors.Wrapf(err, "Failed to update globalNode %s to InUse", globalNode.Name)
+				updated.Status = v1alpha1.GlobalNodeStatus{
+					VirtualCluster: virtualCluster.Name,
 				}
+				globalNodes[i] = *updated
+				updatedGlobalNodes = append(updatedGlobalNodes, updated)
 				cnt++
 			}
 			if cnt == requestNodesChanged {
 				break
 			}
 		}
 		if cnt < requestNodesChanged {
-			return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, matched)
+			return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, cnt)
 		}
+		for _, updated := range updatedGlobalNodes {
+			klog.V(2).Infof("Assign node %s for virtualcluster %s policy %s", updated.Name, virtualCluster.GetName(), policy.LabelSelector.String())
+			if err := c.Client.Update(context.TODO(), updated); err != nil {
+				return nil, errors.Wrapf(err, "Failed to update globalNode %s to InUse", updated.Name)
+			}
+		}
 
 	} else { // nodes needs to decrease
 		klog.V(2).Infof("Try decrease nodes %d for policy %s", -requestNodesChanged, policy.LabelSelector.String())
 		decrease := int(-requestNodesChanged)
-		if len(nodesAssignedMatchedPolicy) < decrease {
-			return nodesAssigned, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssignedMatchedPolicy))
+		if len(nodesAssigned) < decrease {
+			return nil, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssigned))
 		}
-		nodesAssigned = append(nodesAssignedUnMatched, nodesAssignedMatchedPolicy[:(len(nodesAssignedMatchedPolicy)-decrease)]...)
+		nodesAssigned = nodesAssigned[:len(nodesAssigned)-decrease]
 		// note: node pool will not be modified here. NodeController will modify it when node delete success
 	}
 	return nodesAssigned, nil
 }
 
+func getAssignedNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) {
+	var nodesAssignedMatchedPolicy []v1alpha1.NodeInfo
+	for _, nodeInfo := range virtualCluster.Spec.PromoteResources.NodeInfos {
+		node, ok := util.FindGlobalNode(nodeInfo.NodeName, globalNodes)
+		if !ok {
+			return nil, errors.Errorf("Node %s doesn't find in nodes pool", nodeInfo.NodeName)
+		}
+		if mapContains(node.Labels, policy.LabelSelector.MatchLabels) {
+			nodesAssignedMatchedPolicy = append(nodesAssignedMatchedPolicy, nodeInfo)
+		}
+	}
+	return nodesAssignedMatchedPolicy, nil
+}
+
 func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1alpha1.VirtualCluster, timeout time.Duration) error {
 	secret, err := c.RootClientSet.CoreV1().Secrets(virtualCluster.GetNamespace()).Get(context.TODO(),
 		fmt.Sprintf("%s-%s", virtualCluster.GetName(), constants.AdminConfig), metav1.GetOptions{})
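
To make the intent of the new gate concrete, here is a minimal, self-contained sketch of the drift check that checkPromotePoliciesChanged performs. The types below are simplified stand-ins for the kosmos API (reduced to the fields the logic touches), so treat this as an illustration of the idea, not the controller's actual code:

	package main

	import "fmt"

	type globalNode struct {
		name   string
		labels map[string]string
	}

	type promotePolicy struct {
		matchLabels map[string]string
		nodeCount   int
	}

	// mapContains reports whether every key/value pair in sub also appears
	// in m, mirroring the label-subset check the controller uses.
	func mapContains(m, sub map[string]string) bool {
		for k, v := range sub {
			if m[k] != v {
				return false
			}
		}
		return true
	}

	// policiesChanged returns true as soon as any policy's desired node
	// count differs from the number of already-assigned nodes matching its
	// label selector.
	func policiesChanged(assigned []string, pool []globalNode, policies []promotePolicy) bool {
		byName := make(map[string]globalNode, len(pool))
		for _, n := range pool {
			byName[n.name] = n
		}
		for _, p := range policies {
			matched := 0
			for _, name := range assigned {
				if n, ok := byName[name]; ok && mapContains(n.labels, p.matchLabels) {
					matched++
				}
			}
			if matched != p.nodeCount {
				return true // a scale up or scale down is pending
			}
		}
		return false
	}

	func main() {
		pool := []globalNode{
			{name: "node-a", labels: map[string]string{"pool": "gpu"}},
			{name: "node-b", labels: map[string]string{"pool": "gpu"}},
		}
		policies := []promotePolicy{{matchLabels: map[string]string{"pool": "gpu"}, nodeCount: 2}}
		// One matching node assigned but two requested: the reconciler
		// should re-run assignment and move the cluster to Updating.
		fmt.Println(policiesChanged([]string{"node-a"}, pool, policies)) // true
	}

A policy counts as changed as soon as its desired node count differs from the number of assigned nodes matching its selector; that is exactly the condition that sends the cluster back through assignWorkNodes and into the Updating phase, while an unchanged set of policies now returns immediately instead of looping.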

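The allocation-side fix follows a collect-validate-commit shape: matching free nodes are gathered and counted first, and the GlobalNode updates are only issued once the policy's request is known to be satisfiable, so a shortfall no longer leaves some nodes already flipped to InUse. Below is a small sketch of that pattern under the same assumption of simplified types, with a hypothetical markInUse callback standing in for c.Client.Update:

	package main

	import "fmt"

	// allocate picks up to want nodes from free, validating the request
	// before committing any state change; markInUse stands in for the
	// controller's per-GlobalNode update call.
	func allocate(free []string, want int, markInUse func(string) error) ([]string, error) {
		if len(free) < want {
			// fail before mutating anything
			return nil, fmt.Errorf("not enough free nodes: want %d, have %d", want, len(free))
		}
		picked := free[:want]
		for _, name := range picked {
			if err := markInUse(name); err != nil {
				return nil, err
			}
		}
		return picked, nil
	}

	func main() {
		mark := func(name string) error { fmt.Println("mark in use:", name); return nil }
		nodes, err := allocate([]string{"node-a", "node-b", "node-c"}, 2, mark)
		fmt.Println(nodes, err) // [node-a node-b] <nil>
		_, err = allocate([]string{"node-a"}, 2, mark)
		fmt.Println(err) // not enough free nodes: want 2, have 1
	}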