From 74531c2f684a9cbe807939c6cb7cc41aa428d715 Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Mon, 13 May 2024 18:20:44 +0800 Subject: [PATCH] add skip to skip some task Signed-off-by: duanmengkk --- .../cluster-manager/app/options/options.go | 8 +++---- cmd/kubenest/operator/app/operator.go | 1 + cmd/kubenest/operator/app/options/options.go | 14 +++++++++---- .../node_controller.go | 5 ++++- .../workflow/task/task.go | 10 +++++++++ .../workflow/workflow.go | 21 +++++++++++++------ 6 files changed, 44 insertions(+), 15 deletions(-) diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index e55936fa1..5bc7e7d98 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -54,10 +54,10 @@ type Options struct { } type KubernetesOptions struct { - KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"` - Master string `json:"master,omitempty" yaml:"master,omitempty"` - QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"` - Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` + KubeConfig string + Master string + QPS float32 + Burst int } func NewOptions() (*Options, error) { diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go index e3054d5d8..717505d16 100644 --- a/cmd/kubenest/operator/app/operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -121,6 +121,7 @@ func run(ctx context.Context, opts *options.Options) error { RootClientSet: hostKubeClient, KosmosClient: kosmosClient, EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName), + Options: &opts.KubeNestOptions, } if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil { diff --git a/cmd/kubenest/operator/app/options/options.go b/cmd/kubenest/operator/app/options/options.go index d37ffc13e..28ce9c289 100644 --- a/cmd/kubenest/operator/app/options/options.go +++ 
b/cmd/kubenest/operator/app/options/options.go @@ -11,15 +11,20 @@ import ( type Options struct { LeaderElection componentbaseconfig.LeaderElectionConfiguration KubernetesOptions KubernetesOptions + KubeNestOptions KubeNestOptions AllowNodeOwnbyMulticluster bool KosmosJoinController bool } type KubernetesOptions struct { - KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"` - Master string `json:"master,omitempty" yaml:"master,omitempty"` - QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"` - Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` + KubeConfig string + Master string + QPS float32 + Burst int +} + +type KubeNestOptions struct { + ForceDestroy bool } func NewOptions() *Options { @@ -47,4 +52,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.") flags.BoolVar(&o.AllowNodeOwnbyMulticluster, "multiowner", false, "Allow node own by multicluster or not.") flags.BoolVar(&o.KosmosJoinController, "kosmos-join-controller", false, "Turn on or off kosmos-join-controller.") + flags.BoolVar(&o.KubeNestOptions.ForceDestroy, "kube-nest-force-destroy", false, "Force destroy the node. If set to true, Kubernetes will not evict the existing pods on the node when joining nodes to the tenant's control plane, but will instead force destroy them.") } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go index 2e2a3c232..33b588261 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go @@ -21,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options" 
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" @@ -35,6 +36,7 @@ type NodeController struct { RootClientSet kubernetes.Interface EventRecorder record.EventRecorder KosmosClient versioned.Interface + Options *options.KubeNestOptions } func (r *NodeController) SetupWithManager(mgr manager.Manager) error { @@ -299,7 +301,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob } clusterDNS := "" - dnssvc, err := k8sClient.CoreV1().Services((constants.SystemNs)).Get(ctx, constants.KubeDNSSVCName, metav1.GetOptions{}) + dnssvc, err := k8sClient.CoreV1().Services(constants.SystemNs).Get(ctx, constants.KubeDNSSVCName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("get kube-dns service failed: %s", err) } else { @@ -314,6 +316,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob HostClient: r.Client, HostK8sClient: r.RootClientSet, VirtualK8sClient: k8sClient, + Opt: r.Options, }); err != nil { return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err) } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go index 6c6a3fa57..df92569e6 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go @@ -17,6 +17,7 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" @@ -32,6 +33,8 @@ type TaskOpt struct { HostClient client.Client HostK8sClient kubernetes.Interface 
VirtualK8sClient kubernetes.Interface + + Opt *options.KubeNestOptions } type Task struct { @@ -39,6 +42,7 @@ type Task struct { Run func(context.Context, TaskOpt, interface{}) (interface{}, error) Retry bool SubTasks []Task + Skip func(context.Context, TaskOpt) bool ErrorIgnore bool } @@ -85,6 +89,9 @@ func NewDrainHostNodeTask() Task { return Task{ Name: "drain host node", Retry: true, + Skip: func(ctx context.Context, opt TaskOpt) bool { + return opt.Opt.ForceDestroy + }, Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { targetNode, err := to.HostK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) if err != nil { @@ -107,6 +114,9 @@ func NewDrainVirtualNodeTask() Task { Name: "drain virtual-control-plane node", Retry: true, // ErrorIgnore: true, + Skip: func(ctx context.Context, opt TaskOpt) bool { + return opt.Opt.ForceDestroy + }, Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { targetNode, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) if err != nil { diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go index c92911798..3c550ee0d 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go @@ -45,11 +45,20 @@ func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs func (w WorkflowData) RunTask(ctx context.Context, opt task.TaskOpt) error { var args interface{} - for i, task := range w.Tasks { - klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i+1, len(w.Tasks), task.Name) - if len(task.SubTasks) > 0 { - for j, subTask := range task.SubTasks { - klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j+1, len(task.SubTasks), subTask.Name) 
+ for i, t := range w.Tasks { + klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i+1, len(w.Tasks), t.Name) + if t.Skip != nil && t.Skip(ctx, opt) { + klog.V(4).Infof("work flow skip task %s", t.Name) + continue + } + if len(t.SubTasks) > 0 { + for j, subTask := range t.SubTasks { + klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j+1, len(t.SubTasks), subTask.Name) + if subTask.Skip != nil && subTask.Skip(ctx, opt) { + klog.V(4).Infof("work flow skip sub task %s", subTask.Name) + continue + } + + if nextArgs, err := RunWithRetry(ctx, subTask, opt, args); err != nil { return err } else { @@ -57,7 +66,7 @@ func (w WorkflowData) RunTask(ctx context.Context, opt task.TaskOpt) error { } } } else { - if nextArgs, err := RunWithRetry(ctx, task, opt, args); err != nil { + if nextArgs, err := RunWithRetry(ctx, t, opt, args); err != nil { return err } else { args = nextArgs