From 28bdc52925fd86708ad331b87dda0c1fe7f7928f Mon Sep 17 00:00:00 2001
From: OrangeBao
Date: Fri, 10 May 2024 15:19:28 +0800
Subject: [PATCH] feat: add taint for node

Signed-off-by: OrangeBao
---
 deploy/virtual-cluster-operator.yml           | 46 ++++++---
 .../operator => hack}/kubelet_node_helper.sh  | 52 ++++++----
 hack/node_agent_backup.sh                     | 17 ++++
 .../global_node_controller.go                 | 95 +++++++++++++++++--
 .../virtualcluster.node.controller/env/env.go | 10 +-
 .../workflow/task/task.go                     | 38 +++++---
 6 files changed, 208 insertions(+), 50 deletions(-)
 rename {cmd/kubenest/operator => hack}/kubelet_node_helper.sh (70%)
 create mode 100755 hack/node_agent_backup.sh

diff --git a/deploy/virtual-cluster-operator.yml b/deploy/virtual-cluster-operator.yml
index 760a0a6a9..0e1926f12 100644
--- a/deploy/virtual-cluster-operator.yml
+++ b/deploy/virtual-cluster-operator.yml
@@ -61,6 +61,9 @@ data:
     # path for kubeadm
     PATH_KUBEADM=/usr/bin/kubeadm
     ##################################################
+    # path for kubeadm config
+    PATH_KUBEADM_CONFIG=/etc/kubeadm
+    ##################################################
     # path for kubernetes
     PATH_KUBERNETES=/etc/kubernetes/
     PATH_KUBERNETES_PKI="$PATH_KUBERNETES/pki"
@@ -75,16 +78,33 @@ data:
     # args
     DNS_ADDRESS=${2:-10.237.0.10}
     LOG_NAME=${2:-kubelet}
+    JOIN_TOKEN=$2
 
     function unjoin() {
         # before unjoin, you need delete node by kubectl
-        echo "exec(1/2): kubeadm reset...."
+        echo "exec(1/1): kubeadm reset...."
         echo "y" | ${PATH_KUBEADM} reset
         if [ $? -ne 0 ]; then
             exit 1
         fi
-        echo "exec(2/2): reset kubeadm-flags.env...."
-        cp "$PATH_FILE_TMP/kubeadm-flags.env.back" "$PATH_KUBELET_LIB/kubeadm-flags.env"
+    }
+
+    function revert() {
+        echo "exec(1/3): update kubeadm.cfg..."
+        sed -e "s|token: .*$|token: $JOIN_TOKEN|g" -e "w $PATH_FILE_TMP/kubeadm.cfg.current" "$PATH_KUBEADM_CONFIG/kubeadm.cfg"
+        if [ $? -ne 0 ]; then
+            exit 1
+        fi
+
+        # add taints
+        echo "exec(2/3): update kubeadm.cfg taints..."
+        sed -i "/kubeletExtraArgs/a \    register-with-taints: node.kosmos.io/unschedulable:NoSchedule" "$PATH_FILE_TMP/kubeadm.cfg.current"
+        if [ $? -ne 0 ]; then
+            exit 1
+        fi
+
+        echo "exec(3/3): execute join cmd...."
+        kubeadm join --config "$PATH_FILE_TMP/kubeadm.cfg.current"
         if [ $? -ne 0 ]; then
             exit 1
         fi
@@ -153,23 +173,22 @@ data:
     # check the environments
     function check() {
-        if [ ! -d "$PATH_FILE_TMP" ]; then
-            echo "check(1/3): try to create $PATH_FILE_TMP"
+        echo "check(1/2): try to create $PATH_FILE_TMP"
+        if [ ! -d "$PATH_FILE_TMP" ]; then
             mkdir -p "$PATH_FILE_TMP"
             if [ $? -ne 0 ]; then
                 exit 1
             fi
-            echo "check(2/3): copy kubeadm-flags.env to create $PATH_FILE_TMP and remove args[cloud-provider] "
+        fi
+
+        echo "check(2/2): copy kubeadm-flags.env to create $PATH_FILE_TMP and remove args[cloud-provider] "
+        if [ ! -f "${PATH_FILE_TMP}/kubeadm-flags.env" ]; then
             sed -e "s| --cloud-provider=external | |g" -e "w ${PATH_FILE_TMP}/kubeadm-flags.env" "$PATH_KUBELET_LIB/kubeadm-flags.env"
             if [ $? -ne 0 ]; then
                 exit 1
             fi
-            echo "check(3/3): backup kubeadm-flags.env"
-            cp "$PATH_KUBELET_LIB/kubeadm-flags.env" "$PATH_FILE_TMP/kubeadm-flags.env.back"
-            if [ $? -ne 0 ]; then
-                exit 1
-            fi
         fi
+        echo "environment is ok"
     }
@@ -194,11 +213,14 @@ data:
         log)
             log
         ;;
+        revert)
+            revert
+        ;;
         version)
             version
         ;;
         *)
-            echo $"usage: $0 unjoin|join|health|log|check|version"
+            echo $"usage: $0 unjoin|join|health|log|check|version|revert"
             exit 1
     esac
   config.yaml: |
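For reference, the sketch below replays the two sed edits from the new revert() path against a minimal, made-up kubeadm.cfg (the real file under $PATH_KUBEADM_CONFIG is generated elsewhere, so its exact layout and field values here are assumptions). It shows the bootstrap token being refreshed and the register-with-taints kubelet flag landing under kubeletExtraArgs, which is what makes a node rejoined through revert come back tainted node.kosmos.io/unschedulable:NoSchedule.

```bash
# Illustration only: a minimal, made-up kubeadm.cfg of the shape revert() expects.
cat > /tmp/kubeadm.cfg <<'EOF'
apiVersion: kubeadm.k8s.io/v1beta3
kind: JoinConfiguration
discovery:
  bootstrapToken:
    apiServerEndpoint: 10.237.0.1:6443
    token: abcdef.0123456789abcdef
    unsafeSkipCAVerification: true
nodeRegistration:
  kubeletExtraArgs:
    node-ip: 192.168.0.2
EOF

JOIN_TOKEN=abcdef.9876543210fedcba   # made-up fresh token ($2 of the script)

# exec(1/3): swap in the new token and write a working copy
sed -e "s|token: .*$|token: $JOIN_TOKEN|g" -e "w /tmp/kubeadm.cfg.current" /tmp/kubeadm.cfg >/dev/null

# exec(2/3): register the kosmos taint via the kubelet's --register-with-taints flag;
# the indentation after the backslash keeps the appended line inside kubeletExtraArgs
sed -i "/kubeletExtraArgs/a \    register-with-taints: node.kosmos.io/unschedulable:NoSchedule" /tmp/kubeadm.cfg.current

grep -A2 kubeletExtraArgs /tmp/kubeadm.cfg.current
#   kubeletExtraArgs:
#     register-with-taints: node.kosmos.io/unschedulable:NoSchedule
#     node-ip: 192.168.0.2
```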
diff --git a/cmd/kubenest/operator/kubelet_node_helper.sh b/hack/kubelet_node_helper.sh
similarity index 70%
rename from cmd/kubenest/operator/kubelet_node_helper.sh
rename to hack/kubelet_node_helper.sh
index fee15d79e..b155f69ae 100755
--- a/cmd/kubenest/operator/kubelet_node_helper.sh
+++ b/hack/kubelet_node_helper.sh
@@ -7,6 +7,9 @@ PATH_FILE_TMP=/apps/conf/kosmos/tmp
 # path for kubeadm
 PATH_KUBEADM=/usr/bin/kubeadm
 ##################################################
+# path for kubeadm config
+PATH_KUBEADM_CONFIG=/etc/kubeadm
+##################################################
 # path for kubernetes
 PATH_KUBERNETES=/etc/kubernetes/
 PATH_KUBERNETES_PKI="$PATH_KUBERNETES/pki"
@@ -21,16 +24,33 @@ KUBELET_CONFIG_NAME=config.yaml
 # args
 DNS_ADDRESS=${2:-10.237.0.10}
 LOG_NAME=${2:-kubelet}
+JOIN_TOKEN=$2
 
 function unjoin() {
     # before unjoin, you need delete node by kubectl
-    echo "exec(1/2): kubeadm reset...."
+    echo "exec(1/1): kubeadm reset...."
     echo "y" | ${PATH_KUBEADM} reset
     if [ $? -ne 0 ]; then
         exit 1
     fi
-    echo "exec(2/2): reset kubeadm-flags.env...."
-    cp "$PATH_FILE_TMP/kubeadm-flags.env.back" "$PATH_KUBELET_LIB/kubeadm-flags.env"
+}
+
+function revert() {
+    echo "exec(1/3): update kubeadm.cfg..."
+    sed -e "s|token: .*$|token: $JOIN_TOKEN|g" -e "w $PATH_FILE_TMP/kubeadm.cfg.current" "$PATH_KUBEADM_CONFIG/kubeadm.cfg"
+    if [ $? -ne 0 ]; then
+        exit 1
+    fi
+
+    # add taints
+    echo "exec(2/3): update kubeadm.cfg taints..."
+    sed -i "/kubeletExtraArgs/a \    register-with-taints: node.kosmos.io/unschedulable:NoSchedule" "$PATH_FILE_TMP/kubeadm.cfg.current"
+    if [ $? -ne 0 ]; then
+        exit 1
+    fi
+
+    echo "exec(3/3): execute join cmd...."
+    kubeadm join --config "$PATH_FILE_TMP/kubeadm.cfg.current"
     if [ $? -ne 0 ]; then
         exit 1
     fi
@@ -99,23 +119,20 @@ function log() {
 
 # check the environments
 function check() {
-    if [ ! -d "$PATH_FILE_TMP" ]; then
-        echo "check(1/3): try to create $PATH_FILE_TMP"
+    echo "check(1/2): try to create $PATH_FILE_TMP"
+    if [ ! -d "$PATH_FILE_TMP" ]; then
         mkdir -p "$PATH_FILE_TMP"
         if [ $? -ne 0 ]; then
             exit 1
         fi
-        echo "check(2/3): copy kubeadm-flags.env to create $PATH_FILE_TMP and remove args[cloud-provider] "
-        sed -e "s| --cloud-provider=external | |g" -e "w ${PATH_FILE_TMP}/kubeadm-flags.env" "$PATH_KUBELET_LIB/kubeadm-flags.env"
-        if [ $? -ne 0 ]; then
-            exit 1
-        fi
-        echo "check(3/3): backup kubeadm-flags.env"
-        cp "$PATH_KUBELET_LIB/kubeadm-flags.env" "$PATH_FILE_TMP/kubeadm-flags.env.back"
-        if [ $? -ne 0 ]; then
-            exit 1
-        fi
     fi
+
+    echo "check(2/2): copy kubeadm-flags.env to create $PATH_FILE_TMP and remove args[cloud-provider] "
+    sed -e "s| --cloud-provider=external | |g" -e "w ${PATH_FILE_TMP}/kubeadm-flags.env" "$PATH_KUBELET_LIB/kubeadm-flags.env"
+    if [ $? -ne 0 ]; then
+        exit 1
+    fi
+
+    echo "environment is ok"
 }
@@ -140,10 +157,13 @@ case "$1" in
     log)
         log
     ;;
+    revert)
+        revert
+    ;;
     version)
         version
     ;;
     *)
-        echo $"usage: $0 unjoin|join|health|log|check|version"
+        echo $"usage: $0 unjoin|join|health|log|check|version|revert"
         exit 1
 esac
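The renamed helper keeps its positional dispatch: $1 selects the verb and $2 is reused per verb (DNS address for join, log name for log, and now the join token for revert). A few hypothetical hand-run invocations, with a made-up token:

```bash
bash kubelet_node_helper.sh check
bash kubelet_node_helper.sh unjoin
# rejoin the host cluster with a fresh token; the node registers itself with
# the node.kosmos.io/unschedulable:NoSchedule taint
bash kubelet_node_helper.sh revert abcdef.0123456789abcdef
```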
diff --git a/hack/node_agent_backup.sh b/hack/node_agent_backup.sh
new file mode 100755
index 000000000..0c1317f71
--- /dev/null
+++ b/hack/node_agent_backup.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+echo "(1/2) Try to backup tmp dir"
+mv /apps/conf/kosmos/tmp /apps/conf/kosmos/tmp.bk
+if [ ! $? -eq 0 ]; then
+    echo "backup tmp dir failed"
+    exit 1
+fi
+
+echo "(2/2) Try to backup kubelet_node_helper"
+mv /srv/node-agent/kubelet_node_helper.sh '/srv/node-agent/kubelet_node_helper.sh.'`date +%Y_%m_%d_%H_%M_%S`
+if [ ! $? -eq 0 ]; then
+    echo "backup kubelet_node_helper.sh failed"
+    exit 1
+fi
+
+echo "backup succeeded"
\ No newline at end of file
diff --git a/pkg/kubenest/controller/global.node.controller/global_node_controller.go b/pkg/kubenest/controller/global.node.controller/global_node_controller.go
index 6e4757734..17cf46146 100644
--- a/pkg/kubenest/controller/global.node.controller/global_node_controller.go
+++ b/pkg/kubenest/controller/global.node.controller/global_node_controller.go
@@ -16,13 +16,16 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
 	"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
 	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+	env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
 	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
 	"github.com/kosmos.io/kosmos/pkg/utils"
 )
@@ -41,7 +44,7 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
 
 	return ctrl.NewControllerManagedBy(mgr).
 		Named(constants.GlobalNodeControllerName).
-		WithOptions(controller.Options{}).
+		WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
 		For(&v1alpha1.GlobalNode{}, builder.WithPredicates(predicate.Funcs{
 			CreateFunc: func(createEvent event.CreateEvent) bool {
 				return true
@@ -56,9 +59,71 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
 				return true
 			},
 		})).
+		Watches(&source.Kind{Type: &v1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.newNodeMapFunc())).
+		Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(r.newVirtualClusterMapFunc())).
 		Complete(r)
 }
 
+func (r *GlobalNodeController) newVirtualClusterMapFunc() handler.MapFunc {
+	return func(a client.Object) []reconcile.Request {
+		var requests []reconcile.Request
+		vcluster := a.(*v1alpha1.VirtualCluster)
+		if vcluster.Status.Phase != v1alpha1.Completed {
+			return requests
+		}
+		klog.V(4).Infof("global-node-controller: virtualcluster change to completed: %s", vcluster.Name)
+		for _, nodeInfo := range vcluster.Spec.PromoteResources.NodeInfos {
+			requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
+				Name: nodeInfo.NodeName,
+			}})
+		}
+		return requests
+	}
+}
+
+func (r *GlobalNodeController) newNodeMapFunc() handler.MapFunc {
+	return func(a client.Object) []reconcile.Request {
+		var requests []reconcile.Request
+		node := a.(*v1.Node)
+		klog.V(4).Infof("global-node-controller: node change: %s", node.Name)
+		requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
+			Name: node.Name,
+		}})
+		return requests
+	}
+}
+
+func (r *GlobalNodeController) SyncTaint(ctx context.Context, globalNode *v1alpha1.GlobalNode) error {
+	if globalNode.Spec.State == v1alpha1.NodeFreeState {
+		err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			var targetNode v1.Node
+			if err := r.Get(ctx, types.NamespacedName{Name: globalNode.Name}, &targetNode); err != nil {
+				klog.Errorf("global-node-controller: SyncTaints: can not get global node, err: %s", globalNode.Name)
+				return err
+			}
+
+			if targetNode.Spec.Unschedulable {
+				klog.V(4).Infof("global-node-controller: SyncTaints: node is unschedulable %s, skip", globalNode.Name)
+				return nil
+			}
+
+			if _, ok := targetNode.Labels[env.GetControlPlaneLabel()]; ok {
+				klog.V(4).Infof("global-node-controller: SyncTaints: control-plane node %s, skip", globalNode.Name)
+				return nil
+			}
+
+			if err := util.DrainNode(ctx, targetNode.Name, r.RootClientSet, &targetNode, env.GetDrainWaitSeconds()); err != nil {
+				return err
+			}
+			return nil
+		})
+		return err
+	} else {
+		klog.V(4).Infof("global-node-controller: SyncTaints: node %s status is %s, skip", globalNode.Name, globalNode.Spec.State)
+		return nil
+	}
+}
+
 func (r *GlobalNodeController) SyncState(ctx context.Context, globalNode *v1alpha1.GlobalNode) error {
 	if globalNode.Spec.State == v1alpha1.NodeInUse {
 		klog.V(4).Infof("global-node-controller: SyncState: node is in use %s, skip", globalNode.Name)
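SyncTaint only acts on a GlobalNode whose state is free: it skips nodes that are already unschedulable or that carry the control-plane label, and otherwise drains the node through util.DrainNode. Below is roughly the manual equivalent with kubectl (node name is made up; the controller does this via client-go rather than kubectl):

```bash
# cordon + drain, roughly what util.DrainNode automates for a freed node
kubectl cordon node-free-01
kubectl drain node-free-01 --ignore-daemonsets --delete-emptydir-data --timeout=300s

# a node that later rejoins through the script's revert verb should carry the kosmos taint
kubectl get node node-free-01 -o jsonpath='{.spec.taints}'
# e.g. [{"effect":"NoSchedule","key":"node.kosmos.io/unschedulable"}]
```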
@@ -96,7 +161,7 @@ func (r *GlobalNodeController) SyncLabel(ctx context.Context, globalNode *v1alph
 	} else {
 		vclist, err := r.KosmosClient.KosmosV1alpha1().VirtualClusters("").List(ctx, metav1.ListOptions{})
 		if err != nil {
-			klog.Errorf("global-node-controller: SyncState: cannot list virtual cluster, err: %s", globalNode.Name)
+			klog.Warningf("global-node-controller: SyncState: cannot list virtual cluster, err: %s", globalNode.Name)
 			return err
 		}
 		var targetVirtualCluster v1alpha1.VirtualCluster
@@ -106,9 +171,13 @@ func (r *GlobalNodeController) SyncLabel(ctx context.Context, globalNode *v1alph
 				break
 			}
 		}
+		if targetVirtualCluster.Status.Phase != v1alpha1.Completed && targetVirtualCluster.Status.Phase != v1alpha1.AllNodeReady {
+			klog.Warningf("global-node-controller: SyncState: virtual cluster is not completed, err: %s", globalNode.Name)
+			return nil
+		}
 
 		virtualClient, err := util.GenerateKubeclient(&targetVirtualCluster)
 		if err != nil {
-			klog.Errorf("global-node-controller: SyncState: cannot generate kubeclient, err: %s", globalNode.Name)
+			klog.Warningf("global-node-controller: SyncState: cannot generate kubeclient, err: %s", globalNode.Name)
 			return err
 		}
@@ -156,19 +225,25 @@ func (r *GlobalNodeController) Reconcile(ctx context.Context, request reconcile.
 		klog.Errorf("get global-node %s error: %v", request.NamespacedName, err)
 		return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
 	}
-	if err := r.SyncState(ctx, &globalNode); err != nil {
-		klog.Errorf("sync State %s error: %v", request.NamespacedName, err)
-		return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
-	} else {
-		klog.V(4).Infof("sync state successed, %s", request.NamespacedName)
-	}
+	// if err := r.SyncState(ctx, &globalNode); err != nil {
+	// 	klog.Errorf("sync State %s error: %v", request.NamespacedName, err)
+	// 	return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
+	// } else {
+	// 	klog.V(4).Infof("sync state successed, %s", request.NamespacedName)
+	// }
 
 	if err := r.SyncLabel(ctx, &globalNode); err != nil {
-		klog.Errorf("sync label %s error: %v", request.NamespacedName, err)
+		klog.Warningf("sync label %s error: %v", request.NamespacedName, err)
 		return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
 	} else {
 		klog.V(4).Infof("sync label successed, %s", request.NamespacedName)
 	}
 
+	if err := r.SyncTaint(ctx, &globalNode); err != nil {
+		klog.Errorf("sync taint %s error: %v", request.NamespacedName, err)
+		return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
+	} else {
+		klog.V(4).Infof("sync taint succeeded, %s", request.NamespacedName)
+	}
 	return reconcile.Result{}, nil
 }
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
index 7a5ca317b..c493dea59 100644
--- a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
@@ -26,7 +26,7 @@ func GetExectorWorkerDir() string {
 }
 
 func GetExectorShellName() string {
-	shellName := os.Getenv("EXECTOR_SHELL_VERSION")
+	shellName := os.Getenv("EXECTOR_SHELL_NAME")
 
 	if len(shellName) == 0 {
 		shellName = "kubelet_node_helper.sh"
@@ -81,3 +81,11 @@ func GetDrainWaitSeconds() int {
 
 	return num
 }
+
+func GetControlPlaneLabel() string {
+	controlPlaneLabel := os.Getenv("CONTROL_PLANE_LABEL")
+	if len(controlPlaneLabel) == 0 {
+		controlPlaneLabel = "node-role.kubernetes.io/control-plane"
+	}
+	return controlPlaneLabel
+}
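Both lookups in env.go fall back to a default when the environment variable is unset, so the operator only needs to set them to override behaviour. Shown with their default values for illustration:

```bash
# name of the helper script the exector invokes on the node (default shown)
export EXECTOR_SHELL_NAME=kubelet_node_helper.sh
# nodes carrying this label are skipped by SyncTaint (default shown)
export CONTROL_PLANE_LABEL=node-role.kubernetes.io/control-plane
```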
diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go
index 0c71ec860..6c6a3fa57 100644
--- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go
+++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go
@@ -50,7 +50,7 @@ func NewCheckEnvTask() Task {
 			exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "")
 			// check
 			checkCmd := &exector.CMDExector{
-				Cmd: fmt.Sprintf("sh %s check", env.GetExectorShellName()),
+				Cmd: fmt.Sprintf("bash %s check", env.GetExectorShellName()),
 			}
 			ret := exectHelper.DoExector(ctx.Done(), checkCmd)
 			if ret.Status != exector.SUCCESS {
@@ -69,7 +69,7 @@ func NewKubeadmResetTask() Task {
 			exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "")
 
 			resetCmd := &exector.CMDExector{
-				Cmd: fmt.Sprintf("sh %s unjoin", env.GetExectorShellName()),
+				Cmd: fmt.Sprintf("bash %s unjoin", env.GetExectorShellName()),
 			}
 
 			ret := exectHelper.DoExector(ctx.Done(), resetCmd)
@@ -235,7 +235,7 @@ func NewRemoteNodeJoinTask() Task {
 			exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "")
 
 			joinCmd := &exector.CMDExector{
-				Cmd: fmt.Sprintf("sh %s join %s", env.GetExectorShellName(), to.KubeDNSAddress),
+				Cmd: fmt.Sprintf("bash %s join %s", env.GetExectorShellName(), to.KubeDNSAddress),
 			}
 			ret := exectHelper.DoExector(ctx.Done(), joinCmd)
 			if ret.Status != exector.SUCCESS {
@@ -293,12 +293,12 @@ func NewUpdateVirtualNodeLabelsTask() Task {
 			}
 
 			updateNode := node.DeepCopy()
-			for k, v := range to.NodeInfo.Labels {
-				node.Labels[k] = v
+			for k, v := range to.NodeInfo.Spec.Labels {
+				updateNode.Labels[k] = v
 			}
 
 			// add free label
-			node.Labels[constants.StateLabelKey] = string(v1alpha1.NodeInUse)
+			updateNode.Labels[constants.StateLabelKey] = string(v1alpha1.NodeInUse)
 
 			if _, err := to.VirtualK8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil {
 				klog.V(4).Infof("add label to node %s failed: %s", to.NodeInfo.Name, err)
@@ -326,12 +326,12 @@ func NewUpdateHostNodeLabelsTask() Task {
 			}
 
 			updateNode := node.DeepCopy()
-			for k, v := range to.NodeInfo.Labels {
-				node.Labels[k] = v
+			for k, v := range to.NodeInfo.Spec.Labels {
+				updateNode.Labels[k] = v
 			}
 
 			// add free label
-			node.Labels[constants.StateLabelKey] = string(v1alpha1.NodeFreeState)
+			updateNode.Labels[constants.StateLabelKey] = string(v1alpha1.NodeFreeState)
 
 			if _, err := to.HostK8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil {
 				klog.V(4).Infof("add label to node %s failed: %s", to.NodeInfo.Name, err)
@@ -400,7 +400,7 @@ func NewExecShellUnjoinCmdTask() Task {
 			exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "")
 
 			resetCmd := &exector.CMDExector{
-				Cmd: fmt.Sprintf("sh %s unjoin", env.GetExectorShellName()),
+				Cmd: fmt.Sprintf("bash %s unjoin", env.GetExectorShellName()),
 			}
 
 			ret := exectHelper.DoExector(ctx.Done(), resetCmd)
@@ -421,6 +421,18 @@ func getJoinCmdStr(log string) (string, error) {
 	return fmt.Sprintf("kubeadm join %s", strs[1]), nil
 }
 
+func getJoinCmdToken(joinCmdStr string) (string, error) {
+	strs := strings.Split(joinCmdStr, " --token ")
+	if len(strs) != 2 {
+		return "", fmt.Errorf("get join cmd token failed")
+	}
+	strs = strings.Split(strs[1], " ")
+	if len(strs) < 2 {
+		return "", fmt.Errorf("get join cmd token failed")
+	}
+	return strings.TrimSpace(strs[0]), nil
+}
+
 func NewJoinNodeToHostCmd() Task {
 	return Task{
 		Name: "join node to host",
@@ -464,8 +476,12 @@ func NewExecJoinNodeToHostCmdTask() Task {
 			if !ok {
 				return nil, fmt.Errorf("get join cmd str failed")
 			}
+			token, err := getJoinCmdToken(joinCmdStr)
+			if err != nil {
+				return nil, err
+			}
 			joinCmd := &exector.CMDExector{
-				Cmd: joinCmdStr,
+				Cmd: fmt.Sprintf("bash %s revert %s", env.GetExectorShellName(), token),
 			}
 
 			exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "")
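With these changes, joining a freed node back to the host cluster goes through the script's revert verb instead of running the captured join command directly: getJoinCmdToken pulls the --token value out of that command and the task passes it as the script's second argument. A shell sketch of the same extraction, using a made-up join command of the usual shape:

```bash
JOIN_CMD="kubeadm join 10.237.0.1:6443 --token abcdef.0123456789abcdef --discovery-token-ca-cert-hash sha256:0123..."

# mirrors getJoinCmdToken: take the first space-delimited field after " --token "
TOKEN=$(echo "$JOIN_CMD" | sed -n 's/.* --token \([^ ]*\) .*/\1/p')
echo "$TOKEN"    # abcdef.0123456789abcdef

# the task then runs this on the target node via the exector:
bash kubelet_node_helper.sh revert "$TOKEN"
```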