From 3458eb02ac6219c158e08675527bfea146a3fc77 Mon Sep 17 00:00:00 2001 From: OrangeBao Date: Sun, 28 Apr 2024 14:54:55 +0800 Subject: [PATCH] feat: delete nodes that have already joined the virtual cluster from the host cluster Signed-off-by: OrangeBao --- .../virtualcluster.node.controller/env/env.go | 16 +-- .../exector/exector.go | 26 +++-- .../exector/remote_scp.go | 3 +- .../node_controller.go | 52 ++++++++- .../workflow/{join_tasks.go => task/join.go} | 27 ++++- .../workflow/task/task.go | 26 +++++ .../{unjoin_task.go => task/unjoin.go} | 2 +- .../workflow/workflow.go | 101 ++++++++---------- 8 files changed, 176 insertions(+), 77 deletions(-) rename pkg/kubenest/controller/virtualcluster.node.controller/workflow/{join_tasks.go => task/join.go} (91%) create mode 100644 pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go rename pkg/kubenest/controller/virtualcluster.node.controller/workflow/{unjoin_task.go => task/unjoin.go} (99%) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go index 0d7c25d37..e6224aa03 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go @@ -3,6 +3,8 @@ package util import ( "fmt" "os" + + "k8s.io/klog" ) func GetExectorTmpPath() string { @@ -38,18 +40,18 @@ func GetExectorShellPath() string { } func GetExectorHostMasterNodeIP() string { - // TODO: nil - return os.Getenv("EXECTOR_HOST_MASTER_NODE_IP") + hostIP := os.Getenv("EXECTOR_HOST_MASTER_NODE_IP") + if len(hostIP) == 0 { + klog.Fatal("EXECTOR_HOST_MASTER_NODE_IP is none") + } + return hostIP } -// const username = "xxxxxxxx" -// const password = "xxxxxxxx" -// token = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password))) +// tobke = base64(`username:password`) func GetExectorToken() string { token := os.Getenv("EXECTOR_SHELL_TOKEN") if len(token) == 0 { - // nolint - token = "YWRtaW46YmljaF9vb3NoMnpvaDZPaA==" + klog.Fatal("EXECTOR_SHELL_TOKEN is none") } return token } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go b/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go index b45f9cde5..9e6cb4493 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go @@ -1,6 +1,3 @@ -// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. package exector import ( @@ -8,7 +5,6 @@ import ( "fmt" "net/http" "net/url" - "strings" "time" "github.com/gorilla/websocket" @@ -24,10 +20,15 @@ const ( FAILED ) +const ( + NotFoundText = "127" +) + type ExectorReturn struct { Status Status Reason string LastLog string + Text string } func (r *ExectorReturn) String() string { @@ -62,8 +63,7 @@ type WebSocketOption struct { func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *ExectorReturn { ret := h.DoExectorReal(stopCh, exector) - // TODO: No such file or directory - if strings.Contains(ret.LastLog, "exit status 127") { + if ret.Text == NotFoundText { // try to update shell script srcFile := env.GetExectorShellPath() klog.V(4).Infof("exector: src file path %s", srcFile) @@ -86,7 +86,7 @@ func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *Exec func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *ExectorReturn { // default is error result := &ExectorReturn{ - FAILED, "init exector return status", "", + FAILED, "init exector return status", "", "", } // nolint @@ -109,9 +109,15 @@ func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) * if err != nil { klog.V(4).Infof("read: %s", err) cerr, ok := err.(*websocket.CloseError) - if ok && cerr.Text == "0" { - result.Status = SUCCESS - result.Reason = "success" + if ok { + if cerr.Text == "0" { + result.Status = SUCCESS + result.Reason = "success" + } else if cerr.Text == NotFoundText { + result.Status = FAILED + result.Reason = "command not found" + result.Text = cerr.Text + } } else { result.Reason = err.Error() } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_scp.go b/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_scp.go index 7207d0b72..8ace685c7 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_scp.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_scp.go @@ -51,8 +51,7 @@ func (e *SCPExector) SendHandler(conn *websocket.Conn, done <-chan struct{}, int } defer file.Close() - // 指定每次读取的数据块大小 - bufferSize := 1024 // 例如每次读取 1024 字节 + bufferSize := 1024 buffer := make([]byte, bufferSize) reader := bufio.NewReader(file) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go index 6793ab624..936c2bc8f 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go @@ -25,6 +25,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task" "github.com/kosmos.io/kosmos/pkg/kubenest/util" "github.com/kosmos.io/kosmos/pkg/utils" ) @@ -234,6 +235,14 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques return reconcile.Result{}, nil } + if !virtualCluster.GetDeletionTimestamp().IsZero() && len(virtualCluster.Spec.Kubeconfig) == 0 { + if err := r.DoNodeClean(ctx, virtualCluster); err != nil { + klog.Errorf("virtualcluster %s do node clean failed: %v", virtualCluster.Name, err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + return reconcile.Result{}, nil + } + if virtualCluster.Status.Phase == v1alpha1.Preparing { klog.V(4).Infof("virtualcluster wait cluster ready, cluster name: %s", virtualCluster.Name) return reconcile.Result{}, nil @@ -247,6 +256,45 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques return reconcile.Result{}, nil } +func (r *NodeController) DoNodeClean(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error { + targetNodes := virtualCluster.Spec.PromoteResources.NodeInfos + globalNodes := &v1alpha1.GlobalNodeList{} + + if err := r.Client.List(ctx, globalNodes); err != nil { + return fmt.Errorf("failed to list global nodes: %v", err) + } + + cleanNodeInfos := []v1alpha1.GlobalNode{} + + for _, targetNode := range targetNodes { + globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items) + if !ok { + return fmt.Errorf("global node %s not found", targetNode.NodeName) + } + cleanNodeInfos = append(cleanNodeInfos, *globalNode) + } + + if err := r.cleanGlobalNode(ctx, cleanNodeInfos, virtualCluster, nil); err != nil { + return err + } + return nil +} + +func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error { + for _, nodeInfo := range nodeInfos { + if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{ + NodeInfo: nodeInfo, + VirtualCluster: virtualCluster, + HostK8sClient: r.Client, + // VirtualK8sClient: _, + }); err != nil { + return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err) + } + } + + return nil +} + func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { if len(nodeInfos) == 0 { return nil @@ -261,7 +309,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob } for _, nodeInfo := range nodeInfos { - if err := workflow.NewJoinWorkerFlow().RunTask(ctx, workflow.TaskOpt{ + if err := workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{ NodeInfo: nodeInfo, VirtualCluster: virtualCluster, KubeDNSAddress: clusterDNS, @@ -276,7 +324,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { for _, nodeInfo := range nodeInfos { - if err := workflow.NewUnjoinworkerFlow().RunTask(ctx, workflow.TaskOpt{ + if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{ NodeInfo: nodeInfo, VirtualCluster: virtualCluster, HostK8sClient: r.Client, diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/join_tasks.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/join.go similarity index 91% rename from pkg/kubenest/controller/virtualcluster.node.controller/workflow/join_tasks.go rename to pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/join.go index e440cfbd1..1ae56b0b7 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/join_tasks.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/join.go @@ -1,4 +1,4 @@ -package workflow +package task import ( "context" @@ -7,6 +7,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -58,6 +59,30 @@ func NewKubeadmResetTask() Task { } } +func NewCleanHostClusterNodeTask() Task { + return Task{ + Name: "clean host cluster node", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + targetNode := &v1.Node{} + if err := to.HostK8sClient.Get(ctx, types.NamespacedName{ + Name: to.NodeInfo.Name, + }, targetNode); err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("get target node %s failed: %s", to.NodeInfo.Name, err) + } + + if err := to.HostK8sClient.Delete(ctx, targetNode); err != nil { + return nil, err + } + + return nil, nil + }, + } +} + func NewReomteUploadCATask() Task { return Task{ Name: "remote upload ca.crt", diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go new file mode 100644 index 000000000..5376efb18 --- /dev/null +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go @@ -0,0 +1,26 @@ +package task + +import ( + "context" + + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" +) + +type TaskOpt struct { + NodeInfo v1alpha1.GlobalNode + VirtualCluster v1alpha1.VirtualCluster + KubeDNSAddress string + + HostK8sClient client.Client + VirtualK8sClient kubernetes.Interface +} + +type Task struct { + Name string + Run func(context.Context, TaskOpt, interface{}) (interface{}, error) + Retry bool + SubTasks []Task +} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/unjoin_task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/unjoin.go similarity index 99% rename from pkg/kubenest/controller/virtualcluster.node.controller/workflow/unjoin_task.go rename to pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/unjoin.go index 48cef2cd1..1370738c9 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/unjoin_task.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/unjoin.go @@ -1,4 +1,4 @@ -package workflow +package task import ( "context" diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go index 29bb895a4..6f6f2d447 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go @@ -3,70 +3,22 @@ package workflow import ( "context" - "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task" ) -func NewJoinWorkerFlow() WorkflowData { - joinTasks := []Task{ - NewCheckEnvTask(), - NewKubeadmResetTask(), - NewReomteUploadCATask(), - NewRemoteUpdateKubeletConfTask(), - NewRemoteUpdateConfigYamlTask(), - NewRemoteNodeJoinTask(), - NewWaitNodeReadyTask(), - NewUpdateNodeLabelsTask(), - NewUpdateNodePoolItemStatusTask(v1alpha1.NodeInUse, false), - } - - return WorkflowData{ - Tasks: joinTasks, - } -} - -func NewUnjoinworkerFlow() WorkflowData { - unjoinTasks := []Task{ - NewCheckEnvTask(), - NewRemoveNodeFromVirtualTask(), - NewExecShellUnjoinCmdTask(), - NewJoinNodeToHostCmd(), - NewUpdateNodePoolItemStatusTask(v1alpha1.NodeFreeState, true), - } - return WorkflowData{ - Tasks: unjoinTasks, - } -} - const ( retryCount = 0 maxRetries = 3 ) type WorkflowData struct { - Tasks []Task -} - -type TaskOpt struct { - NodeInfo v1alpha1.GlobalNode - VirtualCluster v1alpha1.VirtualCluster - KubeDNSAddress string - - HostK8sClient client.Client - VirtualK8sClient kubernetes.Interface -} - -type Task struct { - Name string - Run func(context.Context, TaskOpt, interface{}) (interface{}, error) - Retry bool - SubTasks []Task + Tasks []task.Task } -func RunWithRetry(ctx context.Context, task Task, opt TaskOpt, preArgs interface{}) (interface{}, error) { +func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs interface{}) (interface{}, error) { i := retryCount var err error var args interface{} @@ -87,13 +39,13 @@ func RunWithRetry(ctx context.Context, task Task, opt TaskOpt, preArgs interface return args, nil } -func (w WorkflowData) RunTask(ctx context.Context, opt TaskOpt) error { +func (w WorkflowData) RunTask(ctx context.Context, opt task.TaskOpt) error { var args interface{} for i, task := range w.Tasks { - klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i, len(w.Tasks), task.Name) + klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run task %s HHHHHHHHHHHH", i+1, len(w.Tasks), task.Name) if len(task.SubTasks) > 0 { for j, subTask := range task.SubTasks { - klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j, len(task.SubTasks), subTask.Name) + klog.V(4).Infof("HHHHHHHHHHHH (%d/%d) work flow run sub task %s HHHHHHHHHHHH", j+1, len(task.SubTasks), subTask.Name) if nextArgs, err := RunWithRetry(ctx, subTask, opt, args); err != nil { return err } else { @@ -110,3 +62,44 @@ func (w WorkflowData) RunTask(ctx context.Context, opt TaskOpt) error { } return nil } + +func NewJoinWorkFlow() WorkflowData { + joinTasks := []task.Task{ + task.NewCheckEnvTask(), + task.NewKubeadmResetTask(), + task.NewCleanHostClusterNodeTask(), + task.NewReomteUploadCATask(), + task.NewRemoteUpdateKubeletConfTask(), + task.NewRemoteUpdateConfigYamlTask(), + task.NewRemoteNodeJoinTask(), + task.NewWaitNodeReadyTask(), + task.NewUpdateNodeLabelsTask(), + task.NewUpdateNodePoolItemStatusTask(v1alpha1.NodeInUse, false), + } + + return WorkflowData{ + Tasks: joinTasks, + } +} + +func NewUnjoinWorkFlow() WorkflowData { + unjoinTasks := []task.Task{ + task.NewCheckEnvTask(), + task.NewRemoveNodeFromVirtualTask(), + task.NewExecShellUnjoinCmdTask(), + task.NewJoinNodeToHostCmd(), + task.NewUpdateNodePoolItemStatusTask(v1alpha1.NodeFreeState, true), + } + return WorkflowData{ + Tasks: unjoinTasks, + } +} + +func NewCleanNodeWorkFlow() WorkflowData { + cleanNodeTasks := []task.Task{ + task.NewUpdateNodePoolItemStatusTask(v1alpha1.NodeFreeState, true), + } + return WorkflowData{ + Tasks: cleanNodeTasks, + } +}