Skip to content

Commit

Permalink
add skip to skip some task
Browse files Browse the repository at this point in the history
Signed-off-by: duanmengkk <[email protected]>
  • Loading branch information
duanmengkk committed May 15, 2024
1 parent 71c767f commit 74531c2
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 15 deletions.
8 changes: 4 additions & 4 deletions cmd/clustertree/cluster-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ type Options struct {
}

type KubernetesOptions struct {
KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"`
Master string `json:"master,omitempty" yaml:"master,omitempty"`
QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"`
Burst int `json:"burst,omitempty" yaml:"burst,omitempty"`
KubeConfig string
Master string
QPS float32
Burst int
}

func NewOptions() (*Options, error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func run(ctx context.Context, opts *options.Options) error {
RootClientSet: hostKubeClient,
KosmosClient: kosmosClient,
EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName),
Options: &opts.KubeNestOptions,
}

if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil {
Expand Down
14 changes: 10 additions & 4 deletions cmd/kubenest/operator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@ import (
type Options struct {
LeaderElection componentbaseconfig.LeaderElectionConfiguration
KubernetesOptions KubernetesOptions
KubeNestOptions KubeNestOptions
AllowNodeOwnbyMulticluster bool
KosmosJoinController bool
}

type KubernetesOptions struct {
KubeConfig string `json:"kubeconfig" yaml:"kubeconfig"`
Master string `json:"master,omitempty" yaml:"master,omitempty"`
QPS float32 `json:"qps,omitempty" yaml:"qps,omitempty"`
Burst int `json:"burst,omitempty" yaml:"burst,omitempty"`
KubeConfig string
Master string
QPS float32
Burst int
}

type KubeNestOptions struct {
ForceDestroy bool
}

func NewOptions() *Options {
Expand Down Expand Up @@ -47,4 +52,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.")
flags.BoolVar(&o.AllowNodeOwnbyMulticluster, "multiowner", false, "Allow node own by multicluster or not.")
flags.BoolVar(&o.KosmosJoinController, "kosmos-join-controller", false, "Turn on or off kosmos-join-controller.")
flags.BoolVar(&o.KubeNestOptions.ForceDestroy, "kube-nest-force-destroy", false, "Force destroy the node.If it set true.If set to true, Kubernetes will not evict the existing nodes on the node when joining nodes to the tenant's control plane, but will instead force destroy.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options"
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
Expand All @@ -35,6 +36,7 @@ type NodeController struct {
RootClientSet kubernetes.Interface
EventRecorder record.EventRecorder
KosmosClient versioned.Interface
Options *options.KubeNestOptions
}

func (r *NodeController) SetupWithManager(mgr manager.Manager) error {
Expand Down Expand Up @@ -299,7 +301,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
}

clusterDNS := ""
dnssvc, err := k8sClient.CoreV1().Services((constants.SystemNs)).Get(ctx, constants.KubeDNSSVCName, metav1.GetOptions{})
dnssvc, err := k8sClient.CoreV1().Services(constants.SystemNs).Get(ctx, constants.KubeDNSSVCName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("get kube-dns service failed: %s", err)
} else {
Expand All @@ -314,6 +316,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
VirtualK8sClient: k8sClient,
Opt: r.Options,
}); err != nil {
return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options"
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
Expand All @@ -32,13 +33,16 @@ type TaskOpt struct {
HostClient client.Client
HostK8sClient kubernetes.Interface
VirtualK8sClient kubernetes.Interface

Opt *options.KubeNestOptions
}

type Task struct {
Name string
Run func(context.Context, TaskOpt, interface{}) (interface{}, error)
Retry bool
SubTasks []Task
Skip func(context.Context, TaskOpt) bool
ErrorIgnore bool
}

Expand Down Expand Up @@ -85,6 +89,9 @@ func NewDrainHostNodeTask() Task {
return Task{
Name: "drain host node",
Retry: true,
Skip: func(ctx context.Context, opt TaskOpt) bool {
return opt.Opt.ForceDestroy
},
Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) {
targetNode, err := to.HostK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -107,6 +114,9 @@ func NewDrainVirtualNodeTask() Task {
Name: "drain virtual-control-plane node",
Retry: true,
// ErrorIgnore: true,
Skip: func(ctx context.Context, opt TaskOpt) bool {
return opt.Opt.ForceDestroy
},
Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) {
targetNode, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{})
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,28 @@ func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs

func (w WorkflowData) RunTask(ctx context.Context, opt task.TaskOpt) error {
var args interface{}
for i, task := range w.Tasks {
klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i+1, len(w.Tasks), task.Name)
if len(task.SubTasks) > 0 {
for j, subTask := range task.SubTasks {
klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j+1, len(task.SubTasks), subTask.Name)
for i, t := range w.Tasks {
klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run t %s HHHHHHHHHHHH", i+1, len(w.Tasks), t.Name)
if t.Skip != nil && t.Skip(ctx, opt) {
klog.V(4).Infof("work flow skip task %s", t.Name)
continue
}
if len(t.SubTasks) > 0 {
for j, subTask := range t.SubTasks {
klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub t %s HHHHHHHHHHHH", j+1, len(t.SubTasks), subTask.Name)
if t.Skip != nil && t.Skip(ctx, opt) {
klog.V(4).Infof("work flow skip sub task %s", t.Name)
continue
}

if nextArgs, err := RunWithRetry(ctx, subTask, opt, args); err != nil {
return err
} else {
args = nextArgs
}
}
} else {
if nextArgs, err := RunWithRetry(ctx, task, opt, args); err != nil {
if nextArgs, err := RunWithRetry(ctx, t, opt, args); err != nil {
return err
} else {
args = nextArgs
Expand Down

0 comments on commit 74531c2

Please sign in to comment.