diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go
index a53fc4283..56c33d4cb 100644
--- a/cmd/kubenest/operator/app/operator.go
+++ b/cmd/kubenest/operator/app/operator.go
@@ -139,13 +139,13 @@ func run(ctx context.Context, opts *options.Options) error {
 		return err
 	}
 
-	VirtualClusterNodeController := vcnodecontroller.NodeController{
-		Client:        mgr.GetClient(),
-		RootClientSet: hostKubeClient,
-		KosmosClient:  kosmosClient,
-		EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName),
-		Options:       &opts.KubeNestOptions,
-	}
+	VirtualClusterNodeController := vcnodecontroller.NewNodeController(
+		mgr.GetClient(),
+		hostKubeClient,
+		mgr.GetEventRecorderFor(constants.NodeControllerName),
+		kosmosClient,
+		&opts.KubeNestOptions,
+	)
 
 	if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil {
 		return fmt.Errorf("error starting %s: %v", constants.NodeControllerName, err)
diff --git a/hack/k8s-in-k8s/free_globalnodes.sh b/hack/k8s-in-k8s/free_globalnodes.sh
deleted file mode 100644
index dbb363507..000000000
--- a/hack/k8s-in-k8s/free_globalnodes.sh
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/bin/bash
-
-filename="nodes.txt"
-readarray -t globalnodes < "$filename"
-
-function deleteState() {
-    local nodename="$1"
-    kubectl patch globalnodes $nodename -p '{"spec": {"state": "free"}}' --type=merge
-}
-
-function updateNodeState() {
-    local nodename="$1"
-    kubectl patch node $nodename -p '{"metadata": {"labels": {"kosmos-io/state": "free"}}}'
-}
-
-# Update the state of the global nodes
-function free_globalnodes() {
-    local globalnode="$1"
-    deleteState "$globalnode"
-    updateNodeState "$globalnode"
-}
-
-
-
-# Function to display progress bar
-show_progress() {
-    local progress=$1
-    local total=$2
-    local width=$3
-
-    # Calculate percentage
-    local percent=$((progress * 100 / total))
-    local num_hashes=$((percent * width / 100))
-
-    # Generate progress bar
-    local bar="["
-    for ((i = 0; i < width; i++)); do
-        if ((i < num_hashes)); then
-            bar+="#"
-        else
-            bar+=" "
-        fi
-    done
-    bar+="]"
-
-    # Print progress bar with percentage
-    printf "\rProgress: %s %d%%" "$bar" "$percent"
-}
-
-# Total steps for the task
-total_steps=${#globalnodes[@]}
-# Width of the progress bar
-bar_width=50
-
-# Simulate a task by looping through steps
-for ((step = 1; step <= total_steps; step++)); do
-    # Simulate work with sleep
-    index=$((step - 1))
-    free_globalnodes ${globalnodes[index]}
-
-    # Update progress bar
-    show_progress $step $total_steps $bar_width
-done
-
-# Print a new line after the progress bar completes
-echo
\ No newline at end of file
diff --git a/hack/k8s-in-k8s/globalnodes_helper.sh b/hack/k8s-in-k8s/globalnodes_helper.sh
new file mode 100644
index 000000000..b349bdc6c
--- /dev/null
+++ b/hack/k8s-in-k8s/globalnodes_helper.sh
@@ -0,0 +1,115 @@
+#!/bin/bash
+
+filename="nodes.txt"
+readarray -t globalnodes < "$filename"
+
+function updateState() {
+    local nodename="$1"
+    local state="$2"
+    kubectl patch globalnodes $nodename -p '{"spec": {"state": "'$state'"}}' --type=merge
+}
+
+function updateNodeState() {
+    local nodename="$1"
+    local state="$2"
+    kubectl patch node $nodename -p '{"metadata": {"labels": {"kosmos-io/state": "'$state'"}}}'
+}
+
+function uncordon() {
+    local nodename="$1"
+    kubectl uncordon $nodename
+    kubectl taint nodes $nodename node.kosmos.io/unschedulable-
+}
+
+
+# Update the state of the global nodes
+function free_globalnodes() {
+    local globalnode="$1"
+    updateState "$globalnode" "free"
+    updateNodeState "$globalnode" "free"
+}
+
+
+
+# Update the state of the global nodes
+function reserved_globalnodes() {
+    local globalnode="$1"
+    updateState "$globalnode" "reserved"
+    updateNodeState "$globalnode" "reserved"
+    uncordon "$globalnode"
+}
+
+
+# Function to display progress bar
+show_progress() {
+    local progress=$1
+    local total=$2
+    local width=$3
+
+    # Calculate percentage
+    local percent=$((progress * 100 / total))
+    local num_hashes=$((percent * width / 100))
+
+    # Generate progress bar
+    local bar="["
+    for ((i = 0; i < width; i++)); do
+        if ((i < num_hashes)); then
+            bar+="#"
+        else
+            bar+=" "
+        fi
+    done
+    bar+="]"
+
+    # Print progress bar with percentage
+    printf "\rProgress: %s %d%%" "$bar" "$percent"
+}
+
+# Total steps for the task
+total_steps=${#globalnodes[@]}
+# Width of the progress bar
+bar_width=50
+
+function free() {
+    # Simulate a task by looping through steps
+    for ((step = 1; step <= total_steps; step++)); do
+        # Simulate work with sleep
+        index=$((step - 1))
+        free_globalnodes ${globalnodes[index]}
+
+        # Update progress bar
+        show_progress $step $total_steps $bar_width
+    done
+
+    # Print a new line after the progress bar completes
+    echo
+}
+
+function reserved() {
+    # Simulate a task by looping through steps
+    for ((step = 1; step <= total_steps; step++)); do
+        # Simulate work with sleep
+        index=$((step - 1))
+        reserved_globalnodes ${globalnodes[index]}
+
+        # Update progress bar
+        show_progress $step $total_steps $bar_width
+    done
+
+    # Print a new line after the progress bar completes
+    echo
+}
+
+
+# See how we were called.
+case "$1" in
+    free)
+        free
+        ;;
+    reserved)
+        reserved
+        ;;
+    *)
+        echo $"usage: $0 free|reserved"
+        exit 1
+esac
\ No newline at end of file
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
index 66d0a2284..0956c38a5 100644
--- a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
@@ -135,3 +135,15 @@ func GetWaitNodeReadTime() int {
 	}
 	return num
 }
+
+func GetNodeTaskMaxGoroutines() int {
+	maxGoroutines := os.Getenv("NODE_TASK_MAX_GOROUTINES")
+	if len(maxGoroutines) == 0 {
+		maxGoroutines = "10"
+	}
+	num, err := strconv.Atoi(maxGoroutines)
+	if err != nil {
+		klog.Fatalf("convert NODE_TASK_MAX_GOROUTINES failed, err: %s", err)
+	}
+	return num
+}
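Editor's note: a minimal standalone sketch of how the NODE_TASK_MAX_GOROUTINES knob introduced above behaves (unset means 10, any non-integer is rejected). The helper below is illustrative only; unlike the real GetNodeTaskMaxGoroutines it returns an error instead of calling klog.Fatalf, and the variable value used here is made up.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// getNodeTaskMaxGoroutines mirrors the new env helper: read
// NODE_TASK_MAX_GOROUTINES and fall back to "10" when it is unset,
// then parse it as an integer.
func getNodeTaskMaxGoroutines() (int, error) {
	raw := os.Getenv("NODE_TASK_MAX_GOROUTINES")
	if raw == "" {
		raw = "10"
	}
	return strconv.Atoi(raw)
}

func main() {
	_ = os.Setenv("NODE_TASK_MAX_GOROUTINES", "20") // e.g. allow 20 concurrent node tasks
	n, err := getNodeTaskMaxGoroutines()
	fmt.Println(n, err) // 20 <nil>
}
```

The controller reads this value once when the semaphore is created in NewNodeController (see the node_controller.go hunk below), so changing the variable only takes effect after the operator restarts.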
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go
index 4264746ae..964dca046 100644
--- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go
@@ -3,7 +3,9 @@ package vcnodecontroller
 import (
     "context"
     "fmt"
+    "sync"
 
+    "github.com/pkg/errors"
     v1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -25,6 +27,7 @@ import (
     "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
     "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
     "github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+    env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
     "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow"
     "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
     "github.com/kosmos.io/kosmos/pkg/kubenest/util"
@@ -37,6 +40,19 @@ type NodeController struct {
     EventRecorder record.EventRecorder
     KosmosClient  versioned.Interface
     Options       *options.KubeNestOptions
+    sem           chan struct{}
+}
+
+func NewNodeController(client client.Client, RootClientSet kubernetes.Interface, EventRecorder record.EventRecorder, KosmosClient versioned.Interface, Options *options.KubeNestOptions) *NodeController {
+    r := NodeController{
+        Client:        client,
+        RootClientSet: RootClientSet,
+        EventRecorder: EventRecorder,
+        KosmosClient:  KosmosClient,
+        Options:       Options,
+        sem:           make(chan struct{}, env.GetNodeTaskMaxGoroutines()),
+    }
+    return &r
 }
 
 func (r *NodeController) SetupWithManager(mgr manager.Manager) error {
@@ -285,20 +301,16 @@ func (r *NodeController) DoNodeClean(ctx context.Context, virtualCluster v1alpha
 }
 
 func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error {
-    for _, nodeInfo := range nodeInfos {
-        if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
+    return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
+        return workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
             NodeInfo:       nodeInfo,
             VirtualCluster: virtualCluster,
             HostClient:     r.Client,
             HostK8sClient:  r.RootClientSet,
             Opt:            r.Options,
             // VirtualK8sClient: _,
-        }); err != nil {
-            return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
-        }
-    }
-
-    return nil
+        })
+    })
 }
 
 func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
@@ -314,8 +326,8 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
         clusterDNS = dnssvc.Spec.ClusterIP
     }
 
-    for _, nodeInfo := range nodeInfos {
-        if err := workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
+    return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
+        return workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
             NodeInfo:       nodeInfo,
             VirtualCluster: virtualCluster,
             KubeDNSAddress: clusterDNS,
@@ -323,25 +335,56 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
             HostK8sClient:    r.RootClientSet,
             VirtualK8sClient: k8sClient,
             Opt:              r.Options,
-        }); err != nil {
-            return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err)
-        }
-    }
-    return nil
+        })
+    })
 }
 
 func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
-    for _, nodeInfo := range nodeInfos {
-        if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
+    return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error {
+        return workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
             NodeInfo:         nodeInfo,
             VirtualCluster:   virtualCluster,
             HostClient:       r.Client,
             HostK8sClient:    r.RootClientSet,
             VirtualK8sClient: k8sClient,
             Opt:              r.Options,
-        }); err != nil {
-            return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
+        })
+    })
+}
+
+func (r *NodeController) BatchProcessNodes(nodeInfos []v1alpha1.GlobalNode, f func(v1alpha1.GlobalNode) error) error {
+    var wg sync.WaitGroup
+    errChan := make(chan error, len(nodeInfos))
+
+    for _, nodeInfo := range nodeInfos {
+        wg.Add(1)
+        r.sem <- struct{}{}
+        go func(nodeInfo v1alpha1.GlobalNode) {
+            defer wg.Done()
+            defer func() { <-r.sem }()
+            if err := f(nodeInfo); err != nil {
+                errChan <- fmt.Errorf("[%s] batchprocessnodes failed: %s", nodeInfo.Name, err)
+            }
+        }(nodeInfo)
+    }
+
+    wg.Wait()
+    close(errChan)
+
+    var taskErr error
+    for err := range errChan {
+        if err != nil {
+            if taskErr == nil {
+                taskErr = err
+            } else {
+                taskErr = errors.Wrap(err, taskErr.Error())
+            }
+        }
+    }
+
+    if taskErr != nil {
+        return taskErr
+    }
+
     return nil
 }
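Editor's note: BatchProcessNodes above bounds the number of concurrent node workflows with a buffered channel used as a semaphore, and it collects every per-node error instead of stopping at the first failure. A self-contained sketch of the same pattern follows; the function name, the string item type, and the error aggregation into a slice are illustrative, not the controller's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// batchProcess runs f over items with at most `limit` goroutines in flight,
// collecting every error rather than aborting on the first one -- the same
// shape as NodeController.BatchProcessNodes in the hunk above.
func batchProcess(items []string, limit int, f func(string) error) []error {
	sem := make(chan struct{}, limit) // acquire on send, release on receive
	errChan := make(chan error, len(items))
	var wg sync.WaitGroup

	for _, item := range items {
		wg.Add(1)
		sem <- struct{}{} // blocks once `limit` workers are already running
		go func(item string) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := f(item); err != nil {
				errChan <- fmt.Errorf("[%s] failed: %w", item, err)
			}
		}(item)
	}

	wg.Wait()
	close(errChan)

	var errs []error
	for err := range errChan {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	errs := batchProcess([]string{"node-1", "node-2", "node-3"}, 2, func(name string) error {
		fmt.Println("processing", name)
		return nil
	})
	fmt.Println("errors:", len(errs))
}
```

One difference worth noting: the controller shares a single `sem` created once in NewNodeController, so the NODE_TASK_MAX_GOROUTINES cap applies across all join/unjoin/clean batches globally, whereas this sketch creates a fresh semaphore per call.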
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/logger.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/logger.go
new file mode 100644
index 000000000..740312bdc
--- /dev/null
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/logger.go
@@ -0,0 +1,36 @@
+package task
+
+import (
+    "fmt"
+
+    "k8s.io/klog/v2"
+)
+
+type PrefixedLogger struct {
+    level  klog.Verbose
+    prefix string
+}
+
+func NewPrefixedLogger(level klog.Verbose, prefix string) *PrefixedLogger {
+    return &PrefixedLogger{level: level, prefix: prefix}
+}
+
+func (p *PrefixedLogger) Info(args ...interface{}) {
+    if p.level.Enabled() {
+        klog.InfoDepth(1, append([]interface{}{p.prefix}, args...)...)
+    }
+}
+
+func (p *PrefixedLogger) Infof(format string, args ...interface{}) {
+    if p.level.Enabled() {
+        klog.InfoDepth(1, fmt.Sprintf(p.prefix+format, args...))
+    }
+}
+
+func (p *PrefixedLogger) Error(args ...interface{}) {
+    klog.ErrorDepth(1, append([]interface{}{p.prefix}, args...)...)
+}
+
+func (p *PrefixedLogger) Errorf(format string, args ...interface{}) {
+    klog.ErrorDepth(1, fmt.Sprintf(p.prefix+format, args...))
+}
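Editor's note: a usage sketch for the new PrefixedLogger. The import path and constructor come from the file above; the node name, verbosity level, and messages are made up for illustration. Info and Infof only emit when the supplied verbosity is enabled (here -v=4), while Error and Errorf always emit, which keeps interleaved output from concurrent node workflows attributable to a single node.

```go
package main

import (
	"flag"

	"k8s.io/klog/v2"

	"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
)

func main() {
	klog.InitFlags(nil) // register klog flags; run with -v=4 to see the Info lines
	flag.Parse()

	// Every line is prefixed with the node name.
	log := task.NewPrefixedLogger(klog.V(4), "[node-1] ")
	log.Infof("join node %s with cmd: %s", "node-1", "bash join.sh")
	log.Error("join node failed: ", "timeout waiting for node ready")
}
```

In the task code this logger is reached through TaskOpt.Loger(), which lazily builds one instance per TaskOpt with the node name as the prefix, as shown in the task.go hunk that follows.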
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go
index 00dc29d90..8bd159145 100644
--- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go
@@ -34,7 +34,15 @@ type TaskOpt struct {
     HostK8sClient    kubernetes.Interface
     VirtualK8sClient kubernetes.Interface
 
-    Opt *options.KubeNestOptions
+    Opt    *options.KubeNestOptions
+    logger *PrefixedLogger
+}
+
+func (to *TaskOpt) Loger() *PrefixedLogger {
+    if to.logger == nil {
+        to.logger = NewPrefixedLogger(klog.V(4), fmt.Sprintf("[%s] ", to.NodeInfo.Name))
+    }
+    return to.logger
 }
 
 type Task struct {
@@ -255,7 +263,7 @@ func NewRemoteNodeJoinTask() Task {
             joinCmd := &exector.CMDExector{
                 Cmd: fmt.Sprintf("bash %s join %s", env.GetExectorShellName(), to.KubeDNSAddress),
             }
-            klog.V(4).Infof("join node %s with cmd: %s", to.NodeInfo.Name, joinCmd.Cmd)
+            to.Loger().Infof("join node %s with cmd: %s", to.NodeInfo.Name, joinCmd.Cmd)
             ret := exectHelper.DoExector(ctx.Done(), joinCmd)
             if ret.Status != exector.SUCCESS {
                 return nil, fmt.Errorf("join node %s failed: %s", to.NodeInfo.Name, ret.String())
@@ -283,14 +291,14 @@ func NewWaitNodeReadyTask(isHost bool) Task {
                 node, err := client.CoreV1().Nodes().Get(waitCtx, to.NodeInfo.Name, metav1.GetOptions{})
                 if err == nil {
                     if util.IsNodeReady(node.Status.Conditions) {
-                        klog.V(4).Infof("node %s is ready", to.NodeInfo.Name)
+                        to.Loger().Infof("node %s is ready", to.NodeInfo.Name)
                         isReady = true
                         cancel()
                     } else {
-                        klog.V(4).Infof("node %s is not ready, status: %s", to.NodeInfo.Name, node.Status.Phase)
+                        to.Loger().Infof("node %s is not ready, status: %s", to.NodeInfo.Name, node.Status.Phase)
                     }
                 } else {
-                    klog.V(4).Infof("get node %s failed: %s", to.NodeInfo.Name, err)
+                    to.Loger().Infof("get node %s failed: %s", to.NodeInfo.Name, err)
                 }
             }, 10*time.Second) // Interval time
         }
@@ -302,7 +310,7 @@ func NewWaitNodeReadyTask(isHost bool) Task {
             }
 
             // try to restart containerd and kubelet
-            klog.V(4).Infof("try to restart containerd and kubelet on node: %s", to.NodeInfo.Name)
+            to.Loger().Infof("try to restart containerd and kubelet on node: %s", to.NodeInfo.Name)
             exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "")
 
             restartContainerdCmd := &exector.CMDExector{
@@ -321,7 +329,7 @@ func NewWaitNodeReadyTask(isHost bool) Task {
                 return nil, fmt.Errorf("cannot restart kubelet: %s", ret.String())
             }
 
-            klog.V(4).Infof("wait for the node to be ready again. %s", to.NodeInfo.Name)
+            to.Loger().Infof("wait for the node to be ready again. %s", to.NodeInfo.Name)
             waitFunc(time.Duration(env.GetWaitNodeReadTime()*2) * time.Second)
 
             if isReady {
@@ -342,7 +350,7 @@ func NewUpdateVirtualNodeLabelsTask() Task {
             err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
                 node, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{})
                 if err != nil {
-                    klog.V(4).Infof("get node %s failed: %s", to.NodeInfo.Name, err)
+                    to.Loger().Infof("get node %s failed: %s", to.NodeInfo.Name, err)
                     return err
                 }
 
@@ -355,7 +363,7 @@ func NewUpdateVirtualNodeLabelsTask() Task {
                 updateNode.Labels[constants.StateLabelKey] = string(v1alpha1.NodeInUse)
 
                 if _, err := to.VirtualK8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil {
-                    klog.V(4).Infof("add label to node %s failed: %s", to.NodeInfo.Name, err)
+                    to.Loger().Infof("add label to node %s failed: %s", to.NodeInfo.Name, err)
                     return err
                 }
                 return nil
@@ -375,7 +383,7 @@ func NewUpdateHostNodeLabelsTask() Task {
             err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
                 node, err := to.HostK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{})
                 if err != nil {
-                    klog.V(4).Infof("get node %s failed: %s", to.NodeInfo.Name, err)
+                    to.Loger().Infof("get node %s failed: %s", to.NodeInfo.Name, err)
                     return err
                 }
 
@@ -388,7 +396,7 @@ func NewUpdateHostNodeLabelsTask() Task {
                 updateNode.Labels[constants.StateLabelKey] = string(v1alpha1.NodeFreeState)
 
                 if _, err := to.HostK8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil {
-                    klog.V(4).Infof("add label to node %s failed: %s", to.NodeInfo.Name, err)
+                    to.Loger().Infof("add label to node %s failed: %s", to.NodeInfo.Name, err)
                     return err
                 }
                 return nil
@@ -407,7 +415,7 @@ func NewUpdateNodePoolItemStatusTask(nodeState v1alpha1.NodeState, isClean bool)
             targetGlobalNode := v1alpha1.GlobalNode{}
 
             if err := to.HostClient.Get(ctx, types.NamespacedName{Name: to.NodeInfo.Name}, &targetGlobalNode); err != nil {
-                klog.Errorf("get global node %s failed: %s", to.NodeInfo.Name, err)
+                to.Loger().Errorf("get global node %s failed: %s", to.NodeInfo.Name, err)
                 return err
             }
 
@@ -415,13 +423,13 @@ func NewUpdateNodePoolItemStatusTask(nodeState v1alpha1.NodeState, isClean bool)
             updateGlobalNode.Spec.State = nodeState
 
             if err := to.HostClient.Update(ctx, updateGlobalNode); err != nil {
-                klog.Errorf("update global node %s spec.state failed: %s", updateGlobalNode.Name, err)
+                to.Loger().Errorf("update global node %s spec.state failed: %s", updateGlobalNode.Name, err)
                 return err
             }
             if isClean {
                 updateGlobalNode.Status.VirtualCluster = ""
                 if err := to.HostClient.Status().Update(ctx, updateGlobalNode); err != nil {
-                    klog.Errorf("update global node %s status failed: %s", updateGlobalNode.Name, err)
+                    to.Loger().Errorf("update global node %s status failed: %s", updateGlobalNode.Name, err)
                     return err
                 }
             }
@@ -523,6 +531,16 @@ func NewExecJoinNodeToHostCmdTask() Task {
         Name:  "remote join node to host",
         Retry: true,
         Run: func(ctx context.Context, to TaskOpt, args interface{}) (interface{}, error) {
+            // check
+            _, err := to.HostK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{})
+            if err == nil {
+                to.Loger().Info("node already joined, skip task")
+                return nil, nil
+            }
+            if !apierrors.IsNotFound(err) {
+                return nil, fmt.Errorf("query node %s failed, the error is %s", to.NodeInfo.Name, err.Error())
+            }
+
             joinCmdStr, ok := args.(string)
             if !ok {
                 return nil, fmt.Errorf("get join cmd str failed")
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go
index 39ad3ae2b..e78bde136 100644
--- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go
@@ -4,8 +4,6 @@ import (
     "context"
     "time"
 
-    "k8s.io/klog/v2"
-
     "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
     "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
 )
@@ -29,7 +27,7 @@ func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs
                 break
             }
             waitTime := 3 * (i + 1)
-            klog.V(4).Infof("work flow retry %d after %ds, task name: %s, err: %s", i, waitTime, task.Name, err)
+            opt.Loger().Infof("work flow retry %d after %ds, task name: %s, err: %s", i, waitTime, task.Name, err)
             time.Sleep(time.Duration(waitTime) * time.Second)
         } else {
             break
@@ -37,10 +35,10 @@ func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs
     }
     if err != nil {
         if task.ErrorIgnore {
-            klog.V(4).Infof("work flow ignore err, task name: %s, err: %s", task.Name, err)
+            opt.Loger().Infof("work flow ignore err, task name: %s, err: %s", task.Name, err)
             return nil, nil
         }
-        klog.V(4).Infof("work flow interrupt, task name: %s, err: %s", task.Name, err)
+        opt.Loger().Infof("work flow interrupt, task name: %s, err: %s", task.Name, err)
         return nil, err
     }
     return args, nil
@@ -49,16 +47,16 @@ func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs
 func (w WorkflowData) RunTask(ctx context.Context, opt task.TaskOpt) error {
     var args interface{}
     for i, t := range w.Tasks {
-        klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i+1, len(w.Tasks), t.Name)
+        opt.Loger().Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i+1, len(w.Tasks), t.Name)
         if t.Skip != nil && t.Skip(ctx, opt) {
-            klog.V(4).Infof("work flow skip task %s", t.Name)
+            opt.Loger().Infof("work flow skip task %s", t.Name)
             continue
         }
 
         if len(t.SubTasks) > 0 {
             for j, subTask := range t.SubTasks {
-                klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j+1, len(t.SubTasks), subTask.Name)
+                opt.Loger().Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j+1, len(t.SubTasks), subTask.Name)
                 if t.Skip != nil && t.Skip(ctx, opt) {
-                    klog.V(4).Infof("work flow skip sub task %s", t.Name)
+                    opt.Loger().Infof("work flow skip sub task %s", t.Name)
                     continue
                 }
diff --git a/pkg/kubenest/tasks/anp.go b/pkg/kubenest/tasks/anp.go
index dca200081..99f08864c 100644
--- a/pkg/kubenest/tasks/anp.go
+++ b/pkg/kubenest/tasks/anp.go
@@ -34,18 +34,14 @@ func NewAnpTask() workflow.Task {
                 Name: "Upload-ProxyAgentCert",
                 Run:  runUploadProxyAgentCert,
             },
-            {
-                Name: "deploy-anp-server",
-                Run:  runAnpServer,
-            },
-            {
-                Name: "check-anp-health",
-                Run:  runCheckVirtualClusterAnp,
-            },
             {
                 Name: "deploy-anp-agent",
                 Run:  runAnpAgent,
             },
+            {
Name: "deploy-anp-server", + Run: runAnpServer, + }, { Name: "check-anp-health", Run: runCheckVirtualClusterAnp, @@ -221,7 +217,9 @@ func installAnpAgent(data InitData) error { } actionFunc := func(ctx context.Context, c dynamic.Interface, u *unstructured.Unstructured) error { // create the object - return util.ApplyObject(vcClient, u) + return apiclient.TryRunCommand(func() error { + return util.ApplyObject(vcClient, u) + }, 3) } return util.ForEachObjectInYAML(context.TODO(), vcClient, []byte(anpAgentManifestBytes), "", actionFunc) }