Skip to content

Commit

Permalink
feat: delete nodes that have already joined the virtual cluster from …
Browse files Browse the repository at this point in the history
…the host cluster

Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Apr 28, 2024
1 parent 0cabcf9 commit 3458eb0
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package util
import (
"fmt"
"os"

"k8s.io/klog"
)

func GetExectorTmpPath() string {
Expand Down Expand Up @@ -38,18 +40,18 @@ func GetExectorShellPath() string {
}

// GetExectorHostMasterNodeIP returns the host master node IP configured
// through the EXECTOR_HOST_MASTER_NODE_IP environment variable.
//
// The variable is mandatory: when it is unset or empty the process is
// terminated via klog.Fatal rather than handing callers an empty address.
func GetExectorHostMasterNodeIP() string {
	hostIP := os.Getenv("EXECTOR_HOST_MASTER_NODE_IP")
	if len(hostIP) == 0 {
		// Fail fast; an empty value must never reach callers.
		klog.Fatal("EXECTOR_HOST_MASTER_NODE_IP is none")
	}
	return hostIP
}

// GetExectorToken returns the exector shell token configured through the
// EXECTOR_SHELL_TOKEN environment variable.
//
// The token is the base64 encoding of `username:password`, i.e.
//
//	token = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password)))
//
// The variable is mandatory: when it is unset or empty the process is
// terminated via klog.Fatal. No built-in default credential is provided.
func GetExectorToken() string {
	token := os.Getenv("EXECTOR_SHELL_TOKEN")
	if len(token) == 0 {
		// Fail fast instead of falling back to a hard-coded credential.
		klog.Fatal("EXECTOR_SHELL_TOKEN is none")
	}
	return token
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package exector

import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/gorilla/websocket"
Expand All @@ -24,10 +20,15 @@ const (
FAILED
)

const (
// NotFoundText is the websocket close-error text that DoExectorReal reports
// as "command not found"; DoExector compares ExectorReturn.Text against it
// to decide whether to try updating the shell script.
NotFoundText = "127"
)

// ExectorReturn describes the outcome of running an exector.
type ExectorReturn struct {
// Status is the overall result (for example SUCCESS or FAILED).
Status Status
// Reason is a short human-readable explanation of Status.
Reason string
// LastLog presumably holds the last log output received from the remote
// side — TODO confirm against the receive handler.
LastLog string
// Text carries the text of the websocket close error when one is recorded
// (for example NotFoundText).
Text string
}

func (r *ExectorReturn) String() string {
Expand Down Expand Up @@ -62,8 +63,7 @@ type WebSocketOption struct {

func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *ExectorReturn {
ret := h.DoExectorReal(stopCh, exector)
// TODO: No such file or directory
if strings.Contains(ret.LastLog, "exit status 127") {
if ret.Text == NotFoundText {
// try to update shell script
srcFile := env.GetExectorShellPath()
klog.V(4).Infof("exector: src file path %s", srcFile)
Expand All @@ -86,7 +86,7 @@ func (h *ExectorHelper) DoExector(stopCh <-chan struct{}, exector Exector) *Exec
func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *ExectorReturn {
// default is error
result := &ExectorReturn{
FAILED, "init exector return status", "",
FAILED, "init exector return status", "", "",
}

// nolint
Expand All @@ -109,9 +109,15 @@ func (h *ExectorHelper) DoExectorReal(stopCh <-chan struct{}, exector Exector) *
if err != nil {
klog.V(4).Infof("read: %s", err)
cerr, ok := err.(*websocket.CloseError)
if ok && cerr.Text == "0" {
result.Status = SUCCESS
result.Reason = "success"
if ok {
if cerr.Text == "0" {
result.Status = SUCCESS
result.Reason = "success"
} else if cerr.Text == NotFoundText {
result.Status = FAILED
result.Reason = "command not found"
result.Text = cerr.Text
}
} else {
result.Reason = err.Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (e *SCPExector) SendHandler(conn *websocket.Conn, done <-chan struct{}, int
}
defer file.Close()

// 指定每次读取的数据块大小
bufferSize := 1024 // 例如每次读取 1024 字节
bufferSize := 1024
buffer := make([]byte, bufferSize)

reader := bufio.NewReader(file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
"github.com/kosmos.io/kosmos/pkg/utils"
)
Expand Down Expand Up @@ -234,6 +235,14 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, nil
}

if !virtualCluster.GetDeletionTimestamp().IsZero() && len(virtualCluster.Spec.Kubeconfig) == 0 {
if err := r.DoNodeClean(ctx, virtualCluster); err != nil {
klog.Errorf("virtualcluster %s do node clean failed: %v", virtualCluster.Name, err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
return reconcile.Result{}, nil
}

if virtualCluster.Status.Phase == v1alpha1.Preparing {
klog.V(4).Infof("virtualcluster wait cluster ready, cluster name: %s", virtualCluster.Name)
return reconcile.Result{}, nil
Expand All @@ -247,6 +256,45 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, nil
}

// DoNodeClean runs the clean-node workflow for every node listed in the
// virtual cluster's Spec.PromoteResources.NodeInfos.
//
// Each listed node name is resolved against the GlobalNode objects returned by
// the controller's client. If any name has no matching GlobalNode an error is
// returned before any node is cleaned. Errors from listing or from the clean
// workflow are returned to the caller.
func (r *NodeController) DoNodeClean(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error {
	targetNodes := virtualCluster.Spec.PromoteResources.NodeInfos

	globalNodes := &v1alpha1.GlobalNodeList{}
	if err := r.Client.List(ctx, globalNodes); err != nil {
		// Wrap so callers can still inspect the underlying API error.
		return fmt.Errorf("failed to list global nodes: %w", err)
	}

	// The result has at most one entry per target node.
	cleanNodeInfos := make([]v1alpha1.GlobalNode, 0, len(targetNodes))
	for _, targetNode := range targetNodes {
		globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items)
		if !ok {
			return fmt.Errorf("global node %s not found", targetNode.NodeName)
		}
		cleanNodeInfos = append(cleanNodeInfos, *globalNode)
	}

	return r.cleanGlobalNode(ctx, cleanNodeInfos, virtualCluster, nil)
}

// cleanGlobalNode runs the clean-node workflow once per entry of nodeInfos, in
// order, and stops at the first failure.
//
// The kubernetes.Interface parameter is accepted but ignored, so the workflow
// receives no VirtualK8sClient; only the controller's own client is passed as
// HostK8sClient.
func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error {
	for _, nodeInfo := range nodeInfos {
		opt := task.TaskOpt{
			NodeInfo:       nodeInfo,
			VirtualCluster: virtualCluster,
			HostK8sClient:  r.Client,
		}
		if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, opt); err != nil {
			// Name the operation that actually failed and keep the cause
			// inspectable through errors.Is / errors.As.
			return fmt.Errorf("clean node %s failed: %w", nodeInfo.Name, err)
		}
	}

	return nil
}

func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
if len(nodeInfos) == 0 {
return nil
Expand All @@ -261,7 +309,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
}

for _, nodeInfo := range nodeInfos {
if err := workflow.NewJoinWorkerFlow().RunTask(ctx, workflow.TaskOpt{
if err := workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
KubeDNSAddress: clusterDNS,
Expand All @@ -276,7 +324,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob

func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error {
for _, nodeInfo := range nodeInfos {
if err := workflow.NewUnjoinworkerFlow().RunTask(ctx, workflow.TaskOpt{
if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
HostK8sClient: r.Client,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflow
package task

import (
"context"
Expand All @@ -7,6 +7,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -58,6 +59,30 @@ func NewKubeadmResetTask() Task {
}
}

// NewCleanHostClusterNodeTask builds the task that deletes the Node object
// named TaskOpt.NodeInfo.Name through TaskOpt.HostK8sClient.
//
// The task has Retry set to true. A Node that is already absent counts as
// success, whether the absence is seen by the initial Get or by the Delete
// (the node may be removed by someone else in between). Any other Get or
// Delete error is returned wrapped with the node name.
func NewCleanHostClusterNodeTask() Task {
	return Task{
		Name:  "clean host cluster node",
		Retry: true,
		Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) {
			targetNode := &v1.Node{}
			key := types.NamespacedName{Name: to.NodeInfo.Name}
			if err := to.HostK8sClient.Get(ctx, key, targetNode); err != nil {
				if apierrors.IsNotFound(err) {
					// Nothing left to delete.
					return nil, nil
				}
				return nil, fmt.Errorf("get target node %s failed: %w", to.NodeInfo.Name, err)
			}

			// NotFound here means the node vanished after the Get above; the
			// desired state is reached, so it is not an error.
			if err := to.HostK8sClient.Delete(ctx, targetNode); err != nil && !apierrors.IsNotFound(err) {
				return nil, fmt.Errorf("delete target node %s failed: %w", to.NodeInfo.Name, err)
			}

			return nil, nil
		},
	}
}

func NewReomteUploadCATask() Task {
return Task{
Name: "remote upload ca.crt",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package task

import (
"context"

"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

// TaskOpt bundles the inputs passed to a Task's Run function.
type TaskOpt struct {
// NodeInfo is the GlobalNode the task operates on.
NodeInfo v1alpha1.GlobalNode
// VirtualCluster is the virtual cluster the task runs on behalf of.
VirtualCluster v1alpha1.VirtualCluster
// KubeDNSAddress is the cluster DNS address supplied by the caller
// (presumably only needed when joining a node — TODO confirm).
KubeDNSAddress string

// HostK8sClient is a controller-runtime client; presumably bound to the
// host cluster — verify against the controller setup.
HostK8sClient client.Client
// VirtualK8sClient is a client-go clientset; presumably bound to the
// virtual cluster. Callers may leave it unset.
VirtualK8sClient kubernetes.Interface
}

// Task is a single named step that receives a TaskOpt when run.
type Task struct {
// Name is a human-readable identifier for the task.
Name string
// Run performs the task. The interface{} argument and result are opaque
// here; presumably they pass a value from one task to the next — TODO
// confirm against the workflow runner.
Run func(context.Context, TaskOpt, interface{}) (interface{}, error)
// Retry presumably tells the workflow runner to retry the task on failure
// — verify against the runner.
Retry bool
// SubTasks are nested tasks belonging to this task.
SubTasks []Task
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflow
package task

import (
"context"
Expand Down
Loading

0 comments on commit 3458eb0

Please sign in to comment.