From 0386d051e521fdfcbdec3bf09060a5a8e8fca6c2 Mon Sep 17 00:00:00 2001 From: DrmagicE <379342542@qq.com> Date: Mon, 11 Oct 2021 10:08:08 +0800 Subject: [PATCH] feature: add ability for 'yurtctl convert/revert' to deploy/remove yurthub on cloud side. (#513) * feature: add ability for 'yurtctl convert' to deploy yurthub on cloud side * add 'yurtctl revert cloudnode' subcommand to revert cloud nodes --- pkg/yurtctl/cmd/convert/cloudnode.go | 70 +++++ pkg/yurtctl/cmd/convert/convert.go | 23 +- pkg/yurtctl/cmd/convert/edgenode.go | 380 +---------------------- pkg/yurtctl/cmd/convert/node.go | 443 +++++++++++++++++++++++++++ pkg/yurtctl/cmd/revert/cloudnode.go | 71 +++++ pkg/yurtctl/cmd/revert/edgenode.go | 257 +--------------- pkg/yurtctl/cmd/revert/node.go | 307 +++++++++++++++++++ pkg/yurtctl/cmd/revert/revert.go | 68 ++-- pkg/yurtctl/constants/constants.go | 4 +- pkg/yurtctl/util/edgenode/common.go | 1 + pkg/yurtctl/util/kubernetes/util.go | 6 +- 11 files changed, 963 insertions(+), 667 deletions(-) create mode 100644 pkg/yurtctl/cmd/convert/cloudnode.go create mode 100644 pkg/yurtctl/cmd/convert/node.go create mode 100644 pkg/yurtctl/cmd/revert/cloudnode.go create mode 100644 pkg/yurtctl/cmd/revert/node.go diff --git a/pkg/yurtctl/cmd/convert/cloudnode.go b/pkg/yurtctl/cmd/convert/cloudnode.go new file mode 100644 index 00000000000..8dbccebfe58 --- /dev/null +++ b/pkg/yurtctl/cmd/convert/cloudnode.go @@ -0,0 +1,70 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package convert + +import ( + "strings" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "k8s.io/klog" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +// ConvertCloudNodeOptions has the information required by sub command convert cloudnode +type ConvertCloudNodeOptions struct { + ConvertNodeOptions +} + +// NewConvertCloudNodeOptions creates a new ConvertCloudNodeOptions +func NewConvertCloudNodeOptions() *ConvertCloudNodeOptions { + return &ConvertCloudNodeOptions{} +} + +// NewConvertCloudNodeCmd generates a new sub command convert cloudnode +func NewConvertCloudNodeCmd() *cobra.Command { + c := NewConvertCloudNodeOptions() + cmd := &cobra.Command{ + Use: "cloudnode", + Short: "Converts the kubernetes node to a yurt cloud node", + Run: func(cmd *cobra.Command, _ []string) { + if err := c.Complete(cmd.Flags()); err != nil { + klog.Fatalf("fail to complete the convert cloudnode option: %s", err) + } + if err := c.RunConvertNode(util.WorkingModeCloud); err != nil { + klog.Fatalf("fail to convert the kubernetes node to a yurt node: %s", err) + } + }, + } + cmd.Flags().StringP("cloud-nodes", "c", "", + "The list of cloud nodes wanted to be convert.(e.g. -e cloudnode1,cloudnode2)") + commonFlags(cmd) + return cmd +} + +// Complete completes all the required options. +func (c *ConvertCloudNodeOptions) Complete(flags *pflag.FlagSet) error { + enStr, err := flags.GetString("cloud-nodes") + if err != nil { + return err + } + if enStr != "" { + c.Nodes = strings.Split(enStr, ",") + } + return c.ConvertNodeOptions.Complete(flags) +} diff --git a/pkg/yurtctl/cmd/convert/convert.go b/pkg/yurtctl/cmd/convert/convert.go index b157d281ce9..54964fcef4f 100644 --- a/pkg/yurtctl/cmd/convert/convert.go +++ b/pkg/yurtctl/cmd/convert/convert.go @@ -24,7 +24,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic" @@ -32,7 +31,6 @@ import ( bootstrapapi "k8s.io/cluster-bootstrap/token/api" "k8s.io/klog" clusterinfophase "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo" - nodeutil "k8s.io/kubernetes/pkg/controller/util/node" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurtctl/lock" @@ -108,7 +106,7 @@ func NewConvertCmd() *cobra.Command { } cmd.AddCommand(NewConvertEdgeNodeCmd()) - + cmd.AddCommand(NewConvertCloudNodeCmd()) cmd.Flags().StringP("cloud-nodes", "c", "", "The list of cloud nodes.(e.g. -c cloudnode1,cloudnode2)") cmd.Flags().StringP("provider", "p", "minikube", @@ -304,8 +302,7 @@ func (co *ConvertOptions) RunConvert() (err error) { } for _, node := range nodeLst.Items { if !strutil.IsInStringLst(co.CloudNodes, node.GetName()) || strutil.IsInStringLst(kcmNodeNames, node.GetName()) { - _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if condition == nil || condition.Status != v1.ConditionTrue { + if !isNodeReady(&node.Status) { klog.Errorf("Cannot do the convert, the status of worker node or kube-controller-manager node: %s is not 'Ready'.", node.Name) return } @@ -389,8 +386,6 @@ func (co *ConvertOptions) RunConvert() (err error) { return err } - // 9. deploy yurt-hub and reset the kubelet service - klog.Infof("deploying the yurt-hub and resetting the kubelet service...") joinToken, err := kubeutil.GetOrCreateJoinTokenString(co.clientSet) if err != nil { return err @@ -409,11 +404,23 @@ func (co *ConvertOptions) RunConvert() (err error) { ctx["yurthub_healthcheck_timeout"] = co.YurthubHealthCheckTimeout.String() } + // 9. deploy yurt-hub and reset the kubelet service on edge nodes. + klog.Infof("deploying the yurt-hub and resetting the kubelet service on edge nodes...") + ctx["sub_command"] = "edgenode" if err = kubeutil.RunServantJobs(co.clientSet, ctx, edgeNodeNames); err != nil { klog.Errorf("fail to run ServantJobs: %s", err) return } - klog.Info("complete deploying yurt-hub") + klog.Info("complete deploying yurt-hub on edge nodes") + + // 10. deploy yurt-hub and reset the kubelet service on cloud nodes + klog.Infof("deploying the yurt-hub and resetting the kubelet service on cloud nodes") + ctx["sub_command"] = "cloudnode" + if err = kubeutil.RunServantJobs(co.clientSet, ctx, co.CloudNodes); err != nil { + klog.Errorf("fail to run ServantJobs: %s", err) + return + } + klog.Info("complete deploying yurt-hub on cloud nodes") return } diff --git a/pkg/yurtctl/cmd/convert/edgenode.go b/pkg/yurtctl/cmd/convert/edgenode.go index b4bcf1ced85..6597f04cb3b 100644 --- a/pkg/yurtctl/cmd/convert/edgenode.go +++ b/pkg/yurtctl/cmd/convert/edgenode.go @@ -17,51 +17,18 @@ limitations under the License. package convert import ( - "context" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "os" - "os/exec" - "path/filepath" "strings" - "time" "github.com/spf13/cobra" "github.com/spf13/pflag" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" "k8s.io/klog" - nodeutil "k8s.io/kubernetes/pkg/controller/util/node" - "github.com/openyurtio/openyurt/pkg/projectinfo" - "github.com/openyurtio/openyurt/pkg/yurtctl/constants" - enutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" - kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes" - strutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/strings" -) - -const ( - kubeletConfigRegularExpression = "\\-\\-kubeconfig=.*kubelet.conf" - apiserverAddrRegularExpression = "server: (http(s)?:\\/\\/)?[\\w][-\\w]{0,62}(\\.[\\w][-\\w]{0,62})*(:[\\d]{1,5})?" - hubHealthzCheckFrequency = 10 * time.Second - filemode = 0666 - dirmode = 0755 + "github.com/openyurtio/openyurt/pkg/yurthub/util" ) // ConvertEdgeNodeOptions has the information required by sub command convert edgenode type ConvertEdgeNodeOptions struct { - clientSet *kubernetes.Clientset - EdgeNodes []string - YurthubImage string - YurthubHealthCheckTimeout time.Duration - YurctlServantImage string - JoinToken string - KubeadmConfPath string - openyurtDir string + ConvertNodeOptions } // NewConvertEdgeNodeOptions creates a new ConvertEdgeNodeOptions @@ -74,12 +41,12 @@ func NewConvertEdgeNodeCmd() *cobra.Command { c := NewConvertEdgeNodeOptions() cmd := &cobra.Command{ Use: "edgenode", - Short: "Converts the kubernetes node to a yurt node", + Short: "Converts the kubernetes node to a yurt edge node", Run: func(cmd *cobra.Command, _ []string) { if err := c.Complete(cmd.Flags()); err != nil { klog.Fatalf("fail to complete the convert edgenode option: %s", err) } - if err := c.RunConvertEdgeNode(); err != nil { + if err := c.RunConvertNode(util.WorkingModeEdge); err != nil { klog.Fatalf("fail to convert the kubernetes node to a yurt node: %s", err) } }, @@ -87,15 +54,7 @@ func NewConvertEdgeNodeCmd() *cobra.Command { cmd.Flags().StringP("edge-nodes", "e", "", "The list of edge nodes wanted to be convert.(e.g. -e edgenode1,edgenode2)") - cmd.Flags().String("yurthub-image", "openyurt/yurthub:latest", - "The yurthub image.") - cmd.Flags().Duration("yurthub-healthcheck-timeout", defaultYurthubHealthCheckTimeout, - "The timeout for yurthub health check.") - cmd.Flags().String("yurtctl-servant-image", "openyurt/yurtctl-servant:latest", - "The yurtctl-servant image.") - cmd.Flags().String("kubeadm-conf-path", "", - "The path to kubelet service conf that is used by kubelet component to join the cluster on the edge node.") - cmd.Flags().String("join-token", "", "The token used by yurthub for joining the cluster.") + commonFlags(cmd) return cmd } @@ -107,332 +66,7 @@ func (c *ConvertEdgeNodeOptions) Complete(flags *pflag.FlagSet) error { return err } if enStr != "" { - c.EdgeNodes = strings.Split(enStr, ",") - } - - yurthubImage, err := flags.GetString("yurthub-image") - if err != nil { - return err - } - c.YurthubImage = yurthubImage - - yurthubHealthCheckTimeout, err := flags.GetDuration("yurthub-healthcheck-timeout") - if err != nil { - return err - } - c.YurthubHealthCheckTimeout = yurthubHealthCheckTimeout - - ycsi, err := flags.GetString("yurtctl-servant-image") - if err != nil { - return err - } - c.YurctlServantImage = ycsi - - kubeadmConfPath, err := flags.GetString("kubeadm-conf-path") - if err != nil { - return err - } - if kubeadmConfPath == "" { - kubeadmConfPath = os.Getenv("KUBELET_SVC") - } - if kubeadmConfPath == "" { - kubeadmConfPath = enutil.KubeletSvcPath - } - c.KubeadmConfPath = kubeadmConfPath - - c.clientSet, err = enutil.GenClientSet(flags) - if err != nil { - return err - } - - joinToken, err := flags.GetString("join-token") - if err != nil { - return err - } - if joinToken == "" { - joinToken, err = kubeutil.GetOrCreateJoinTokenString(c.clientSet) - if err != nil { - return err - } - } - c.JoinToken = joinToken - - openyurtDir := os.Getenv("OPENYURT_DIR") - if openyurtDir == "" { - openyurtDir = enutil.OpenyurtDir - } - c.openyurtDir = openyurtDir - - return nil -} - -// RunConvertEdgeNode converts a standard Kubernetes node to a Yurt node -func (c *ConvertEdgeNodeOptions) RunConvertEdgeNode() (err error) { - // 1. check the server version - if err = kubeutil.ValidateServerVersion(c.clientSet); err != nil { - return - } - klog.V(4).Info("the server version is valid") - - nodeName, err := enutil.GetNodeName(c.KubeadmConfPath) - if err != nil { - nodeName = "" - } - if len(c.EdgeNodes) > 1 || (len(c.EdgeNodes) == 1 && c.EdgeNodes[0] != nodeName) { - // 2 remote edgenode convert - nodeLst, err := c.clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) - if err != nil { - return err - } - - // 2.1. check the EdgeNodes and its label - var edgeNodeNames []string - for _, node := range nodeLst.Items { - _, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - if !ok { - edgeNodeNames = append(edgeNodeNames, node.GetName()) - } - } - for _, edgeNode := range c.EdgeNodes { - if !strutil.IsInStringLst(edgeNodeNames, edgeNode) { - return fmt.Errorf("Cannot do the convert, the worker node: %s is not a Kubernetes node.", edgeNode) - } - } - - // 2.2. check the state of EdgeNodes - for _, node := range nodeLst.Items { - if strutil.IsInStringLst(c.EdgeNodes, node.GetName()) { - _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if condition == nil || condition.Status != v1.ConditionTrue { - return fmt.Errorf("Cannot do the convert, the status of worker node: %s is not 'Ready'.", node.Name) - } - } - } - - // 2.3. deploy yurt-hub and reset the kubelet service - ctx := map[string]string{ - "action": "convert", - "yurtctl_servant_image": c.YurctlServantImage, - "yurthub_image": c.YurthubImage, - "joinToken": c.JoinToken, - "kubeadm_conf_path": c.KubeadmConfPath, - } - - if c.YurthubHealthCheckTimeout != defaultYurthubHealthCheckTimeout { - ctx["yurthub_healthcheck_timeout"] = c.YurthubHealthCheckTimeout.String() - } - - if err = kubeutil.RunServantJobs(c.clientSet, ctx, c.EdgeNodes); err != nil { - klog.Errorf("fail to run ServantJobs: %s", err) - return err - } - } else if (len(c.EdgeNodes) == 0 && nodeName != "") || (len(c.EdgeNodes) == 1 && c.EdgeNodes[0] == nodeName) { - // 3. local edgenode convert - // 3.1. check if critical files exist - if _, err := enutil.FileExists(c.KubeadmConfPath); err != nil { - return err - } - - // 3.2. check the state of EdgeNodes - node, err := c.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if condition == nil || condition.Status != v1.ConditionTrue { - return fmt.Errorf("Cannot do the convert, the status of worker node: %s is not 'Ready'.", node.Name) - } - - // 3.3. check the label of EdgeNodes - _, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - if ok { - return fmt.Errorf("Cannot do the convert, the worker node: %s is not a Kubernetes node.", node.Name) - } - - // 3.4. deploy yurt-hub and reset the kubelet service - err = c.SetupYurthub() - if err != nil { - return fmt.Errorf("fail to set up the yurthub pod: %v", err) - } - err = c.ResetKubelet() - if err != nil { - return fmt.Errorf("fail to reset the kubelet service: %v", err) - } - - // 3.5. label node as edge node - klog.Infof("mark %s as the edge-node", nodeName) - node, err = c.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - node, err = kubeutil.LabelNode(c.clientSet, node, projectinfo.GetEdgeWorkerLabelKey(), "true") - if err != nil { - return err - } - - // 3.6. open the autonomous - klog.Infof("open the %s autonomous", nodeName) - _, err = kubeutil.AnnotateNode(c.clientSet, node, constants.AnnotationAutonomy, "true") - if err != nil { - return err - } - - } else { - return fmt.Errorf("fail to revert edge node, flag --edge-nodes %s err", c.EdgeNodes) - } - return nil -} - -// SetupYurthub sets up the yurthub pod and wait for the its status to be Running -func (c *ConvertEdgeNodeOptions) SetupYurthub() error { - // 1. put yurt-hub yaml into /etc/kubernetes/manifests - klog.Infof("setting up yurthub on node") - - // 1-1. get apiserver address - kubeletConfPath, err := enutil.GetSingleContentFromFile(c.KubeadmConfPath, kubeletConfigRegularExpression) - if err != nil { - return err - } - kubeletConfPath = strings.Split(kubeletConfPath, "=")[1] - apiserverAddr, err := enutil.GetSingleContentFromFile(kubeletConfPath, apiserverAddrRegularExpression) - if err != nil { - return err - } - apiserverAddr = strings.Split(apiserverAddr, " ")[1] - - // 1-2. replace variables in yaml file - klog.Infof("setting up yurthub apiserver addr") - yurthubTemplate := enutil.ReplaceRegularExpression(enutil.YurthubTemplate, - map[string]string{ - "__kubernetes_service_addr__": apiserverAddr, - "__yurthub_image__": c.YurthubImage, - "__join_token__": c.JoinToken, - }) - - // 1-3. create yurthub.yaml - podManifestPath := enutil.GetPodManifestPath() - if err = enutil.EnsureDir(podManifestPath); err != nil { - return err - } - err = ioutil.WriteFile(getYurthubYaml(podManifestPath), []byte(yurthubTemplate), filemode) - if err != nil { - return err - } - klog.Infof("create the %s/yurt-hub.yaml", podManifestPath) - - // 2. wait yurthub pod to be ready - err = hubHealthcheck(c.YurthubHealthCheckTimeout) - return err -} - -// ResetKubelet changes the configuration of the kubelet service and restart it -func (c *ConvertEdgeNodeOptions) ResetKubelet() error { - // 1. create a working dir to store revised kubelet.conf - err := os.MkdirAll(c.openyurtDir, dirmode) - if err != nil { - return err - } - fullpath := c.getYurthubKubeletConf() - err = ioutil.WriteFile(fullpath, []byte(enutil.OpenyurtKubeletConf), filemode) - if err != nil { - return err - } - klog.Infof("revised kubeconfig %s is generated", fullpath) - - // 2. revise the kubelet.service drop-in - // 2.1 make a backup for the origin kubelet.service - bkfile := c.getKubeletSvcBackup() - err = enutil.CopyFile(c.KubeadmConfPath, bkfile, 0666) - if err != nil { - return err - } - - // 2.2 revise the drop-in, point it to the $OPENYURT_DIR/kubelet.conf - contentbyte, err := ioutil.ReadFile(c.KubeadmConfPath) - if err != nil { - return err - } - kubeConfigSetup := fmt.Sprintf("--kubeconfig=%s/kubelet.conf", c.openyurtDir) - content := enutil.ReplaceRegularExpression(string(contentbyte), map[string]string{ - "--bootstrap.*bootstrap-kubelet.conf": "", - "--kubeconfig=.*kubelet.conf": kubeConfigSetup, - }) - err = ioutil.WriteFile(c.KubeadmConfPath, []byte(content), filemode) - if err != nil { - return err - } - klog.Info("kubelet.service drop-in file is revised") - - // 3. reset the kubelet.service - klog.Info(enutil.DaemonReload) - cmd := exec.Command("bash", "-c", enutil.DaemonReload) - if err := enutil.Exec(cmd); err != nil { - return err - } - klog.Info(enutil.RestartKubeletSvc) - cmd = exec.Command("bash", "-c", enutil.RestartKubeletSvc) - if err := enutil.Exec(cmd); err != nil { - return err - } - klog.Infof("kubelet has been restarted") - return nil -} - -func (c *ConvertEdgeNodeOptions) getYurthubKubeletConf() string { - return filepath.Join(c.openyurtDir, enutil.KubeletConfName) -} - -func (c *ConvertEdgeNodeOptions) getKubeletSvcBackup() string { - return fmt.Sprintf(enutil.KubeletSvcBackup, c.KubeadmConfPath) -} - -func getYurthubYaml(podManifestPath string) string { - return filepath.Join(podManifestPath, enutil.YurthubYamlName) -} - -// hubHealthcheck will check the status of yurthub pod -func hubHealthcheck(timeout time.Duration) error { - serverHealthzURL, err := url.Parse(fmt.Sprintf("http://%s", enutil.ServerHealthzServer)) - if err != nil { - return err + c.Nodes = strings.Split(enStr, ",") } - serverHealthzURL.Path = enutil.ServerHealthzURLPath - - start := time.Now() - return wait.PollImmediate(hubHealthzCheckFrequency, timeout, func() (bool, error) { - _, err := pingClusterHealthz(http.DefaultClient, serverHealthzURL.String()) - if err != nil { - klog.Infof("yurt-hub is not ready, ping cluster healthz with result: %v", err) - return false, nil - } - klog.Infof("yurt-hub healthz is OK after %f seconds", time.Since(start).Seconds()) - return true, nil - }) -} - -func pingClusterHealthz(client *http.Client, addr string) (bool, error) { - if client == nil { - return false, fmt.Errorf("http client is invalid") - } - - resp, err := client.Get(addr) - if err != nil { - return false, err - } - - b, err := ioutil.ReadAll(resp.Body) - defer resp.Body.Close() - if err != nil { - return false, fmt.Errorf("failed to read response of cluster healthz, %v", err) - } - - if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("response status code is %d", resp.StatusCode) - } - - if strings.ToLower(string(b)) != "ok" { - return false, fmt.Errorf("cluster healthz is %s", string(b)) - } - - return true, nil + return c.ConvertNodeOptions.Complete(flags) } diff --git a/pkg/yurtctl/cmd/convert/node.go b/pkg/yurtctl/cmd/convert/node.go new file mode 100644 index 00000000000..a4c92127616 --- /dev/null +++ b/pkg/yurtctl/cmd/convert/node.go @@ -0,0 +1,443 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package convert + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" + + nodeutil "github.com/openyurtio/openyurt/pkg/controller/util/node" + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurtctl/constants" + enutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" + kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes" + strutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/strings" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +const ( + kubeletConfigRegularExpression = "\\-\\-kubeconfig=.*kubelet.conf" + apiserverAddrRegularExpression = "server: (http(s)?:\\/\\/)?[\\w][-\\w]{0,62}(\\.[\\w][-\\w]{0,62})*(:[\\d]{1,5})?" + hubHealthzCheckFrequency = 10 * time.Second + filemode = 0666 + dirmode = 0755 +) + +// ConvertNodeOptions has the information required by sub command convert edgenode and convert cloudnode +type ConvertNodeOptions struct { + clientSet *kubernetes.Clientset + Nodes []string + YurthubImage string + YurthubHealthCheckTimeout time.Duration + YurctlServantImage string + JoinToken string + KubeadmConfPath string + openyurtDir string +} + +// commonFlags sets all common flags. +func commonFlags(cmd *cobra.Command) { + cmd.Flags().String("yurthub-image", "openyurt/yurthub:latest", + "The yurthub image.") + cmd.Flags().Duration("yurthub-healthcheck-timeout", defaultYurthubHealthCheckTimeout, + "The timeout for yurthub health check.") + cmd.Flags().String("yurtctl-servant-image", "openyurt/yurtctl-servant:latest", + "The yurtctl-servant image.") + cmd.Flags().String("kubeadm-conf-path", "", + "The path to kubelet service conf that is used by kubelet component to join the cluster on the edge node.") + cmd.Flags().String("join-token", "", "The token used by yurthub for joining the cluster.") +} + +// Complete completes all the required options. +func (c *ConvertNodeOptions) Complete(flags *pflag.FlagSet) error { + yurthubImage, err := flags.GetString("yurthub-image") + if err != nil { + return err + } + c.YurthubImage = yurthubImage + + yurthubHealthCheckTimeout, err := flags.GetDuration("yurthub-healthcheck-timeout") + if err != nil { + return err + } + c.YurthubHealthCheckTimeout = yurthubHealthCheckTimeout + + ycsi, err := flags.GetString("yurtctl-servant-image") + if err != nil { + return err + } + c.YurctlServantImage = ycsi + + kubeadmConfPath, err := flags.GetString("kubeadm-conf-path") + if err != nil { + return err + } + if kubeadmConfPath == "" { + kubeadmConfPath = os.Getenv("KUBELET_SVC") + } + if kubeadmConfPath == "" { + kubeadmConfPath = enutil.KubeletSvcPath + } + c.KubeadmConfPath = kubeadmConfPath + + c.clientSet, err = enutil.GenClientSet(flags) + if err != nil { + return err + } + + joinToken, err := flags.GetString("join-token") + if err != nil { + return err + } + if joinToken == "" { + joinToken, err = kubeutil.GetOrCreateJoinTokenString(c.clientSet) + if err != nil { + return err + } + } + c.JoinToken = joinToken + + openyurtDir := os.Getenv("OPENYURT_DIR") + if openyurtDir == "" { + openyurtDir = enutil.OpenyurtDir + } + c.openyurtDir = openyurtDir + + return nil +} + +// SetupYurthub sets up the yurthub pod and wait for the its status to be Running. +func (c *ConvertNodeOptions) SetupYurthub(workingMode util.WorkingMode) error { + // 1. put yurt-hub yaml into /etc/kubernetes/manifests + klog.Infof("setting up yurthub on node") + + // 1-1. get apiserver address + kubeletConfPath, err := enutil.GetSingleContentFromFile(c.KubeadmConfPath, kubeletConfigRegularExpression) + if err != nil { + return err + } + kubeletConfPath = strings.Split(kubeletConfPath, "=")[1] + apiserverAddr, err := enutil.GetSingleContentFromFile(kubeletConfPath, apiserverAddrRegularExpression) + if err != nil { + return err + } + apiserverAddr = strings.Split(apiserverAddr, " ")[1] + + // 1-2. replace variables in yaml file + klog.Infof("setting up yurthub apiserver addr") + yurthubTemplate := enutil.ReplaceRegularExpression(enutil.YurthubTemplate, + map[string]string{ + "__kubernetes_service_addr__": apiserverAddr, + "__yurthub_image__": c.YurthubImage, + "__join_token__": c.JoinToken, + "__working_mode__": string(workingMode), + }) + + // 1-3. create yurthub.yaml + podManifestPath := enutil.GetPodManifestPath() + if err != nil { + return err + } + if err = enutil.EnsureDir(podManifestPath); err != nil { + return err + } + err = ioutil.WriteFile(getYurthubYaml(podManifestPath), []byte(yurthubTemplate), filemode) + if err != nil { + return err + } + klog.Infof("create the %s/yurt-hub.yaml", podManifestPath) + + // 2. wait yurthub pod to be ready + err = hubHealthcheck(c.YurthubHealthCheckTimeout) + return err +} + +// ResetKubelet changes the configuration of the kubelet service and restart it +func (c *ConvertNodeOptions) ResetKubelet() error { + // 1. create a working dir to store revised kubelet.conf + err := os.MkdirAll(c.openyurtDir, dirmode) + if err != nil { + return err + } + fullpath := c.getYurthubKubeletConf() + err = ioutil.WriteFile(fullpath, []byte(enutil.OpenyurtKubeletConf), filemode) + if err != nil { + return err + } + klog.Infof("revised kubeconfig %s is generated", fullpath) + + // 2. revise the kubelet.service drop-in + // 2.1 make a backup for the origin kubelet.service + bkfile := c.getKubeletSvcBackup() + err = enutil.CopyFile(c.KubeadmConfPath, bkfile, 0666) + if err != nil { + return err + } + + // 2.2 revise the drop-in, point it to the $OPENYURT_DIR/kubelet.conf + contentbyte, err := ioutil.ReadFile(c.KubeadmConfPath) + if err != nil { + return err + } + kubeConfigSetup := fmt.Sprintf("--kubeconfig=%s/kubelet.conf", c.openyurtDir) + content := enutil.ReplaceRegularExpression(string(contentbyte), map[string]string{ + "--bootstrap.*bootstrap-kubelet.conf": "", + "--kubeconfig=.*kubelet.conf": kubeConfigSetup, + }) + err = ioutil.WriteFile(c.KubeadmConfPath, []byte(content), filemode) + if err != nil { + return err + } + klog.Info("kubelet.service drop-in file is revised") + + // 3. reset the kubelet.service + klog.Info(enutil.DaemonReload) + cmd := exec.Command("bash", "-c", enutil.DaemonReload) + if err := enutil.Exec(cmd); err != nil { + return err + } + klog.Info(enutil.RestartKubeletSvc) + cmd = exec.Command("bash", "-c", enutil.RestartKubeletSvc) + if err := enutil.Exec(cmd); err != nil { + return err + } + klog.Infof("kubelet has been restarted") + return nil +} + +func (c *ConvertNodeOptions) getYurthubKubeletConf() string { + return filepath.Join(c.openyurtDir, enutil.KubeletConfName) +} + +func (c *ConvertNodeOptions) getKubeletSvcBackup() string { + return fmt.Sprintf(enutil.KubeletSvcBackup, c.KubeadmConfPath) +} + +func getYurthubYaml(podManifestPath string) string { + return filepath.Join(podManifestPath, enutil.YurthubYamlName) +} + +// hubHealthcheck will check the status of yurthub pod +func hubHealthcheck(timeout time.Duration) error { + serverHealthzURL, err := url.Parse(fmt.Sprintf("http://%s", enutil.ServerHealthzServer)) + if err != nil { + return err + } + serverHealthzURL.Path = enutil.ServerHealthzURLPath + + start := time.Now() + return wait.PollImmediate(hubHealthzCheckFrequency, timeout, func() (bool, error) { + _, err := pingClusterHealthz(http.DefaultClient, serverHealthzURL.String()) + if err != nil { + klog.Infof("yurt-hub is not ready, ping cluster healthz with result: %v", err) + return false, nil + } + klog.Infof("yurt-hub healthz is OK after %f seconds", time.Since(start).Seconds()) + return true, nil + }) +} + +func pingClusterHealthz(client *http.Client, addr string) (bool, error) { + if client == nil { + return false, fmt.Errorf("http client is invalid") + } + + resp, err := client.Get(addr) + if err != nil { + return false, err + } + + b, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + if err != nil { + return false, fmt.Errorf("failed to read response of cluster healthz, %v", err) + } + + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("response status code is %d", resp.StatusCode) + } + + if strings.ToLower(string(b)) != "ok" { + return false, fmt.Errorf("cluster healthz is %s", string(b)) + } + + return true, nil +} + +func isNodeReady(status *v1.NodeStatus) bool { + _, condition := nodeutil.GetNodeCondition(status, v1.NodeReady) + return condition != nil && condition.Status == v1.ConditionTrue +} + +// RunConvertNode converts a standard Kubernetes node to a Yurt node +func (c *ConvertNodeOptions) RunConvertNode(workingMode util.WorkingMode) (err error) { + // 1. check the server version + if err = kubeutil.ValidateServerVersion(c.clientSet); err != nil { + return + } + klog.V(4).Info("the server version is valid") + + nodeName, err := enutil.GetNodeName(c.KubeadmConfPath) + if err != nil { + nodeName = "" + } + if len(c.Nodes) > 1 || (len(c.Nodes) == 1 && c.Nodes[0] != nodeName) { + // dispatch servant job to remote nodes + err = c.dispatchServantJobs(workingMode) + } else if (len(c.Nodes) == 0 && nodeName != "") || (len(c.Nodes) == 1 && c.Nodes[0] == nodeName) { + // convert local node + err = c.convertLocalNode(nodeName, workingMode) + } else { + return fmt.Errorf("fail to convert node, flag --edge-nodes or --cloud-nodes %s err", c.Nodes) + } + return err +} + +// dispatchServantJobs dispatches servant job to remote nodes. +func (c *ConvertNodeOptions) dispatchServantJobs(workingMode util.WorkingMode) error { + nodeLst, err := c.clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + if err != nil { + return err + } + + // 1. check the nodes and its label + var nodeNames []string + for _, node := range nodeLst.Items { + labelValue, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + // The label of edge nodes will be set by the servant job that we are going to dispatch here, + // so we assume that when dispatching servant job to edge nodes, the label should not be set. + if workingMode == util.WorkingModeEdge && !ok { + nodeNames = append(nodeNames, node.GetName()) + } + // The label of cloud nodes is usually already set by yurtctl convert before dispatching the servant job. + if workingMode == util.WorkingModeCloud && labelValue == "false" { + nodeNames = append(nodeNames, node.GetName()) + } + } + + for _, node := range c.Nodes { + if !strutil.IsInStringLst(nodeNames, node) { + return fmt.Errorf("Cannot do the convert, the node: %s is not a Kubernetes node.", node) + } + } + + // 2. check the state of nodes + for _, node := range nodeLst.Items { + if strutil.IsInStringLst(c.Nodes, node.GetName()) { + if !isNodeReady(&node.Status) { + return fmt.Errorf("Cannot do the convert, the status of node: %s is not 'Ready'.", node.Name) + } + } + } + + // 3. deploy yurt-hub and reset the kubelet service + ctx := map[string]string{ + "action": "convert", + "yurtctl_servant_image": c.YurctlServantImage, + "yurthub_image": c.YurthubImage, + "joinToken": c.JoinToken, + "kubeadm_conf_path": c.KubeadmConfPath, + } + + switch workingMode { + case util.WorkingModeCloud: + ctx["sub_command"] = "cloudnode" + case util.WorkingModeEdge: + ctx["sub_command"] = "edgenode" + } + if c.YurthubHealthCheckTimeout != defaultYurthubHealthCheckTimeout { + ctx["yurthub_healthcheck_timeout"] = c.YurthubHealthCheckTimeout.String() + } + + if err = kubeutil.RunServantJobs(c.clientSet, ctx, c.Nodes); err != nil { + klog.Errorf("fail to run ServantJobs: %s", err) + return err + } + return nil +} + +// convertLocalNode converts the local node +func (c *ConvertNodeOptions) convertLocalNode(nodeName string, workingMode util.WorkingMode) error { + // 1. check if critical files exist + if _, err := enutil.FileExists(c.KubeadmConfPath); err != nil { + return err + } + + // 2. check the state of node + node, err := c.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + if !isNodeReady(&node.Status) { + return fmt.Errorf("Cannot do the convert, the status of node: %s is not 'Ready'.", node.Name) + } + + // 3. check the label of node + labelVal, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + if workingMode == util.WorkingModeEdge && ok { + return fmt.Errorf("Cannot do the convert, the edge node: %s is not a Kubernetes node.", node.Name) + } + if workingMode == util.WorkingModeCloud && labelVal != "false" { + return fmt.Errorf("Cannot do the convert, the cloud node: %s is not a Kubernetes node.", node.Name) + } + + // 4. deploy yurt-hub and reset the kubelet service + err = c.SetupYurthub(workingMode) + if err != nil { + return fmt.Errorf("fail to set up the yurthub pod: %v", err) + } + err = c.ResetKubelet() + if err != nil { + return fmt.Errorf("fail to reset the kubelet service: %v", err) + } + + if workingMode == util.WorkingModeEdge { + // 5. label node as edge node + klog.Infof("mark %s as the edge-node", nodeName) + node, err = c.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + node, err = kubeutil.LabelNode(c.clientSet, node, projectinfo.GetEdgeWorkerLabelKey(), "true") + if err != nil { + return err + } + // 6. open the autonomous + klog.Infof("open the %s autonomous", nodeName) + _, err = kubeutil.AnnotateNode(c.clientSet, node, constants.AnnotationAutonomy, "true") + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/yurtctl/cmd/revert/cloudnode.go b/pkg/yurtctl/cmd/revert/cloudnode.go new file mode 100644 index 00000000000..8c404735255 --- /dev/null +++ b/pkg/yurtctl/cmd/revert/cloudnode.go @@ -0,0 +1,71 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package revert + +import ( + "github.com/spf13/pflag" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" + + "strings" + + "github.com/spf13/cobra" + "k8s.io/klog" +) + +// RevertCloudNodeOptions has the information required by sub command revert cloudnode +type RevertCloudNodeOptions struct { + RevertNodeOptions +} + +// NewRevertCloudNodeOptions creates a new RevertCloudNodeOptions +func NewRevertCloudNodeOptions() *RevertCloudNodeOptions { + return &RevertCloudNodeOptions{} +} + +// NewRevertCloudNodeCmd generates a new sub command revert edgenode and cloudnode +func NewRevertCloudNodeCmd() *cobra.Command { + r := NewRevertCloudNodeOptions() + cmd := &cobra.Command{ + Use: "cloudnode", + Short: "reverts the yurt cloud node to a kubernetes node", + Run: func(cmd *cobra.Command, _ []string) { + if err := r.Complete(cmd.Flags()); err != nil { + klog.Fatalf("fail to complete the revert cloudnode option: %s", err) + } + if err := r.RunRevertNode(util.WorkingModeCloud); err != nil { + klog.Fatalf("fail to revert the yurt node to a kubernetes node: %s", err) + } + }, + } + cmd.Flags().StringP("cloud-nodes", "e", "", + "The list of edge nodes wanted to be revert.(e.g. -e cloudnode1,cloudnode2)") + commonFlags(cmd) + return cmd +} + +// Complete completes all the required options +func (r *RevertCloudNodeOptions) Complete(flags *pflag.FlagSet) error { + enStr, err := flags.GetString("cloud-nodes") + if err != nil { + return err + } + if enStr != "" { + r.Nodes = strings.Split(enStr, ",") + } + return r.RevertNodeOptions.Complete(flags) +} diff --git a/pkg/yurtctl/cmd/revert/edgenode.go b/pkg/yurtctl/cmd/revert/edgenode.go index 8711481b855..f4da4e62877 100644 --- a/pkg/yurtctl/cmd/revert/edgenode.go +++ b/pkg/yurtctl/cmd/revert/edgenode.go @@ -17,36 +17,19 @@ limitations under the License. package revert import ( - "context" - "fmt" - "os" - "os/exec" - "path/filepath" + "github.com/spf13/pflag" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" + "strings" "github.com/spf13/cobra" - "github.com/spf13/pflag" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/klog" - nodeutil "k8s.io/kubernetes/pkg/controller/util/node" - - "github.com/openyurtio/openyurt/pkg/projectinfo" - "github.com/openyurtio/openyurt/pkg/yurtctl/constants" - enutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" - kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes" - strutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/strings" - "github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself" ) // RevertEdgeNodeOptions has the information required by sub command revert edgenode type RevertEdgeNodeOptions struct { - clientSet *kubernetes.Clientset - EdgeNodes []string - YurtctlServantImage string - KubeadmConfPath string - openyurtDir string + RevertNodeOptions } // NewRevertEdgeNodeOptions creates a new RevertEdgeNodeOptions @@ -59,24 +42,19 @@ func NewRevertEdgeNodeCmd() *cobra.Command { r := NewRevertEdgeNodeOptions() cmd := &cobra.Command{ Use: "edgenode", - Short: "reverts the yurt node to a kubernetes node", + Short: "reverts the yurt edge node to a kubernetes node", Run: func(cmd *cobra.Command, _ []string) { if err := r.Complete(cmd.Flags()); err != nil { klog.Fatalf("fail to complete the revert edgenode option: %s", err) } - if err := r.RunRevertEdgeNode(); err != nil { + if err := r.RunRevertNode(util.WorkingModeEdge); err != nil { klog.Fatalf("fail to revert the yurt node to a kubernetes node: %s", err) } }, } - cmd.Flags().StringP("edge-nodes", "e", "", "The list of edge nodes wanted to be revert.(e.g. -e edgenode1,edgenode2)") - cmd.Flags().String("yurtctl-servant-image", "openyurt/yurtctl-servant:latest", - "The yurtctl-servant image.") - cmd.Flags().String("kubeadm-conf-path", "", - "The path to kubelet service conf that is used by kubelet component to join the cluster on the edge node.") - + commonFlags(cmd) return cmd } @@ -87,222 +65,7 @@ func (r *RevertEdgeNodeOptions) Complete(flags *pflag.FlagSet) (err error) { return err } if enStr != "" { - r.EdgeNodes = strings.Split(enStr, ",") + r.Nodes = strings.Split(enStr, ",") } - - ycsi, err := flags.GetString("yurtctl-servant-image") - if err != nil { - return err - } - r.YurtctlServantImage = ycsi - - kubeadmConfPath, err := flags.GetString("kubeadm-conf-path") - if err != nil { - return err - } - if kubeadmConfPath == "" { - kubeadmConfPath = os.Getenv("KUBELET_SVC") - } - if kubeadmConfPath == "" { - kubeadmConfPath = enutil.KubeletSvcPath - } - r.KubeadmConfPath = kubeadmConfPath - - r.clientSet, err = enutil.GenClientSet(flags) - if err != nil { - return err - } - - openyurtDir := os.Getenv("OPENYURT_DIR") - if openyurtDir == "" { - openyurtDir = enutil.OpenyurtDir - } - r.openyurtDir = openyurtDir - - return -} - -// RunRevertEdgeNode reverts the target Yurt node back to a standard Kubernetes node -func (r *RevertEdgeNodeOptions) RunRevertEdgeNode() (err error) { - // 1. check the server version - if err = kubeutil.ValidateServerVersion(r.clientSet); err != nil { - return - } - klog.V(4).Info("the server version is valid") - - nodeName, err := enutil.GetNodeName(r.KubeadmConfPath) - if err != nil { - nodeName = "" - } - if len(r.EdgeNodes) > 1 || (len(r.EdgeNodes) == 1 && r.EdgeNodes[0] != nodeName) { - // 2. remote edgenode revert - nodeLst, err := r.clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) - if err != nil { - return err - } - - // 2.1. check the EdgeNodes and its label - var edgeNodeNames []string - for _, node := range nodeLst.Items { - isEdgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - if ok && isEdgeNode == "true" { - edgeNodeNames = append(edgeNodeNames, node.GetName()) - } - } - for _, edgeNode := range r.EdgeNodes { - if !strutil.IsInStringLst(edgeNodeNames, edgeNode) { - klog.Errorf("Cannot do the revert, the worker node: %s is not a Yurt edge node.", edgeNode) - return err - } - } - - // 2.2. check the state of EdgeNodes - for _, node := range nodeLst.Items { - if strutil.IsInStringLst(r.EdgeNodes, node.GetName()) { - _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if condition == nil || condition.Status != v1.ConditionTrue { - klog.Errorf("Cannot do the revert, the status of worker node: %s is not 'Ready'.", node.Name) - return err - } - } - } - - // 2.3. remove yurt-hub and revert kubelet service - if err = kubeutil.RunServantJobs(r.clientSet, - map[string]string{ - "action": "revert", - "yurtctl_servant_image": r.YurtctlServantImage, - "kubeadm_conf_path": r.KubeadmConfPath, - }, - r.EdgeNodes); err != nil { - klog.Errorf("fail to revert edge node: %s", err) - return err - } - } else if (len(r.EdgeNodes) == 0 && nodeName != "") || (len(r.EdgeNodes) == 1 && r.EdgeNodes[0] == nodeName) { - // 3. local edgenode revert - // 3.1. check if critical files exist - yurtKubeletConf := r.getYurthubKubeletConf() - if ok, err := enutil.FileExists(yurtKubeletConf); !ok { - return err - } - kubeletSvcBk := r.getKubeletSvcBackup() - if _, err := enutil.FileExists(kubeletSvcBk); err != nil { - klog.Errorf("fail to get file %s, should revise the %s directly", kubeletSvcBk, r.KubeadmConfPath) - return err - } - podManifestPath := enutil.GetPodManifestPath() - yurthubYamlPath := getYurthubYaml(podManifestPath) - if ok, err := enutil.FileExists(yurthubYamlPath); !ok { - return err - } - - // 3.2. check the state of EdgeNodes - node, err := r.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if condition == nil || condition.Status != v1.ConditionTrue { - klog.Errorf("Cannot do the revert, the status of worker node: %s is not 'Ready'.", node.Name) - return err - } - - // 3.3. check the label of EdgeNodes - isEdgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - if !ok || isEdgeNode == "false" { - return fmt.Errorf("Cannot do the revert, the worker node: %s is not a Yurt edge node.", node.Name) - } - - // 3.4. remove yurt-hub and revert kubelet service - if err := r.RevertKubelet(); err != nil { - return fmt.Errorf("fail to revert kubelet: %v", err) - } - if err := r.RemoveYurthub(yurthubYamlPath); err != nil { - return err - } - - // 3.5. remove label of EdgeNodes - node, err = r.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - _, foundAutonomy := node.Annotations[constants.AnnotationAutonomy] - if foundAutonomy { - delete(node.Annotations, constants.AnnotationAutonomy) - } - delete(node.Labels, projectinfo.GetEdgeWorkerLabelKey()) - if _, err = r.clientSet.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}); err != nil { - return err - } - klog.Info("label openyurt.io/is-edge-worker is removed") - - } else { - return fmt.Errorf("fail to revert edge node, flag --edge-nodes %s err", r.EdgeNodes) - } - - return nil -} - -// RevertKubelet resets the kubelet service -func (r *RevertEdgeNodeOptions) RevertKubelet() error { - // 1. remove openyurt's kubelet.conf if exist - yurtKubeletConf := r.getYurthubKubeletConf() - if err := os.Remove(yurtKubeletConf); err != nil { - return err - } - kubeletSvcBk := r.getKubeletSvcBackup() - klog.Infof("found backup file %s, will use it to revert the node", kubeletSvcBk) - err := os.Rename(kubeletSvcBk, r.KubeadmConfPath) - if err != nil { - return err - } - - // 2. reset the kubelet.service - klog.Info(enutil.DaemonReload) - cmd := exec.Command("bash", "-c", enutil.DaemonReload) - if err := enutil.Exec(cmd); err != nil { - return err - } - - klog.Info(enutil.RestartKubeletSvc) - cmd = exec.Command("bash", "-c", enutil.RestartKubeletSvc) - if err := enutil.Exec(cmd); err != nil { - return err - } - klog.Infof("kubelet has been reset back to default") - return nil -} - -// RemoveYurthub deletes the yurt-hub pod -func (r *RevertEdgeNodeOptions) RemoveYurthub(yurthubYamlPath string) error { - // 1. remove the yurt-hub.yaml to delete the yurt-hub - err := os.Remove(yurthubYamlPath) - if err != nil { - return err - } - - // 2. remove yurt-hub config directory and certificates in it - yurthubConf := getYurthubConf() - err = os.RemoveAll(yurthubConf) - if err != nil { - return err - } - klog.Infof("yurt-hub has been removed") - return nil -} - -func (r *RevertEdgeNodeOptions) getYurthubKubeletConf() string { - return filepath.Join(r.openyurtDir, enutil.KubeletConfName) -} - -func (r *RevertEdgeNodeOptions) getKubeletSvcBackup() string { - return fmt.Sprintf(enutil.KubeletSvcBackup, r.KubeadmConfPath) -} - -func getYurthubYaml(podManifestPath string) string { - return filepath.Join(podManifestPath, enutil.YurthubYamlName) -} - -func getYurthubConf() string { - return filepath.Join(hubself.HubRootDir, hubself.HubName) + return r.RevertNodeOptions.Complete(flags) } diff --git a/pkg/yurtctl/cmd/revert/node.go b/pkg/yurtctl/cmd/revert/node.go new file mode 100644 index 00000000000..f1991affe65 --- /dev/null +++ b/pkg/yurtctl/cmd/revert/node.go @@ -0,0 +1,307 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package revert + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" + + nodeutil "github.com/openyurtio/openyurt/pkg/controller/util/node" + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurtctl/constants" + enutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" + kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes" + strutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/strings" + "github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +// RevertNodeOptions has the information required by sub command revert edgenode and revert cloudnode +type RevertNodeOptions struct { + clientSet *kubernetes.Clientset + Nodes []string + YurtctlServantImage string + KubeadmConfPath string + openyurtDir string +} + +// commonFlags sets all common flags. +func commonFlags(cmd *cobra.Command) { + cmd.Flags().String("yurtctl-servant-image", "openyurt/yurtctl-servant:latest", + "The yurtctl-servant image.") + cmd.Flags().String("kubeadm-conf-path", "", + "The path to kubelet service conf that is used by kubelet component to join the cluster on the edge node.") +} + +func isNodeReady(status *v1.NodeStatus) bool { + _, condition := nodeutil.GetNodeCondition(status, v1.NodeReady) + return condition != nil && condition.Status == v1.ConditionTrue +} + +// Complete completes all the required options +func (r *RevertNodeOptions) Complete(flags *pflag.FlagSet) (err error) { + ycsi, err := flags.GetString("yurtctl-servant-image") + if err != nil { + return err + } + r.YurtctlServantImage = ycsi + + kubeadmConfPath, err := flags.GetString("kubeadm-conf-path") + if err != nil { + return err + } + if kubeadmConfPath == "" { + kubeadmConfPath = os.Getenv("KUBELET_SVC") + } + if kubeadmConfPath == "" { + kubeadmConfPath = enutil.KubeletSvcPath + } + r.KubeadmConfPath = kubeadmConfPath + + r.clientSet, err = enutil.GenClientSet(flags) + if err != nil { + return err + } + + openyurtDir := os.Getenv("OPENYURT_DIR") + if openyurtDir == "" { + openyurtDir = enutil.OpenyurtDir + } + r.openyurtDir = openyurtDir + + return +} + +// RunRevertNode reverts the target Yurt node back to a standard Kubernetes node +func (r *RevertNodeOptions) RunRevertNode(workingMode util.WorkingMode) (err error) { + // 1. check the server version + if err = kubeutil.ValidateServerVersion(r.clientSet); err != nil { + return + } + klog.V(4).Info("the server version is valid") + + nodeName, err := enutil.GetNodeName(r.KubeadmConfPath) + if err != nil { + nodeName = "" + } + if len(r.Nodes) > 1 || (len(r.Nodes) == 1 && r.Nodes[0] != nodeName) { + // dispatch servant job to remote nodes + err = r.dispatchServantJobs(workingMode) + } else if (len(r.Nodes) == 0 && nodeName != "") || (len(r.Nodes) == 1 && r.Nodes[0] == nodeName) { + // convert local node + err = r.convertLocalNode(nodeName, workingMode) + } else { + return fmt.Errorf("fail to revert node, flag --edge-nodes or --cloud-nodes %s err", r.Nodes) + } + + return err +} + +// dispatchServantJobs dispatches servant job to remote nodes. +func (r *RevertNodeOptions) dispatchServantJobs(workingMode util.WorkingMode) error { + nodeLst, err := r.clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + if err != nil { + return err + } + + // 1. check the Nodes and its label + var nodeNames []string + for _, node := range nodeLst.Items { + isEdgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + if workingMode == util.WorkingModeEdge && isEdgeNode == "true" { + nodeNames = append(nodeNames, node.GetName()) + } + if workingMode == util.WorkingModeCloud && (!ok || isEdgeNode == "false") { + nodeNames = append(nodeNames, node.GetName()) + } + } + for _, node := range r.Nodes { + if !strutil.IsInStringLst(nodeNames, node) { + klog.Errorf("Cannot do the revert, the worker node: %s is not a Yurt edge node.", node) + return err + } + } + + // 2. check the state of Nodes + for _, node := range nodeLst.Items { + if strutil.IsInStringLst(r.Nodes, node.GetName()) { + if !isNodeReady(&node.Status) { + klog.Errorf("Cannot do the revert, the status of worker node: %s is not 'Ready'.", node.Name) + return err + } + } + } + + // 3. remove yurt-hub and revert kubelet service + ctx := map[string]string{ + "action": "revert", + "yurtctl_servant_image": r.YurtctlServantImage, + "kubeadm_conf_path": r.KubeadmConfPath, + } + + switch workingMode { + case util.WorkingModeCloud: + ctx["sub_command"] = "cloudnode" + case util.WorkingModeEdge: + ctx["sub_command"] = "edgenode" + } + + if err = kubeutil.RunServantJobs(r.clientSet, ctx, r.Nodes); err != nil { + klog.Errorf("fail to revert node: %s", err) + return err + } + return nil +} + +// convertLocalNode converts the local node +func (r *RevertNodeOptions) convertLocalNode(nodeName string, workingMode util.WorkingMode) error { + // 1. check if critical files exist + yurtKubeletConf := r.getYurthubKubeletConf() + if ok, err := enutil.FileExists(yurtKubeletConf); !ok { + return err + } + kubeletSvcBk := r.getKubeletSvcBackup() + if _, err := enutil.FileExists(kubeletSvcBk); err != nil { + klog.Errorf("fail to get file %s, should revise the %s directly", kubeletSvcBk, r.KubeadmConfPath) + return err + } + podManifestPath := enutil.GetPodManifestPath() + yurthubYamlPath := getYurthubYaml(podManifestPath) + if ok, err := enutil.FileExists(yurthubYamlPath); !ok { + return err + } + // 2. check the state of Nodes + node, err := r.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + if !isNodeReady(&node.Status) { + klog.Errorf("Cannot do the revert, the status of worker node: %s is not 'Ready'.", node.Name) + return err + } + + // 3. check the label of Nodes + isEdgeNode := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + if workingMode == util.WorkingModeEdge && isEdgeNode != "true" { + return fmt.Errorf("Cannot do the revert, the node: %s is not a Yurt edge node.", node.Name) + } + if workingMode == util.WorkingModeCloud && isEdgeNode == "true" { + return fmt.Errorf("Cannot do the revert, the node: %s is not a Yurt cloud node.", node.Name) + } + + // 3.4. remove yurt-hub and revert kubelet service + if err := r.RevertKubelet(); err != nil { + return fmt.Errorf("fail to revert kubelet: %v", err) + } + if err := r.RemoveYurthub(yurthubYamlPath); err != nil { + return err + } + + // 5. remove label of Nodes + node, err = r.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + return err + } + _, foundAutonomy := node.Annotations[constants.AnnotationAutonomy] + if foundAutonomy { + delete(node.Annotations, constants.AnnotationAutonomy) + } + delete(node.Labels, projectinfo.GetEdgeWorkerLabelKey()) + if _, err = r.clientSet.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}); err != nil { + return err + } + klog.Infof("label %s is removed", projectinfo.GetEdgeWorkerLabelKey()) + if foundAutonomy { + klog.Infof("annotation %s is removed", constants.AnnotationAutonomy) + } + + return nil +} + +// RevertKubelet resets the kubelet service +func (r *RevertNodeOptions) RevertKubelet() error { + // 1. remove openyurt's kubelet.conf if exist + yurtKubeletConf := r.getYurthubKubeletConf() + if err := os.Remove(yurtKubeletConf); err != nil { + return err + } + kubeletSvcBk := r.getKubeletSvcBackup() + klog.Infof("found backup file %s, will use it to revert the node", kubeletSvcBk) + err := os.Rename(kubeletSvcBk, r.KubeadmConfPath) + if err != nil { + return err + } + + // 2. reset the kubelet.service + klog.Info(enutil.DaemonReload) + cmd := exec.Command("bash", "-c", enutil.DaemonReload) + if err := enutil.Exec(cmd); err != nil { + return err + } + + klog.Info(enutil.RestartKubeletSvc) + cmd = exec.Command("bash", "-c", enutil.RestartKubeletSvc) + if err := enutil.Exec(cmd); err != nil { + return err + } + klog.Infof("kubelet has been reset back to default") + return nil +} + +// RemoveYurthub deletes the yurt-hub pod +func (r *RevertNodeOptions) RemoveYurthub(yurthubYamlPath string) error { + // 1. remove the yurt-hub.yaml to delete the yurt-hub + err := os.Remove(yurthubYamlPath) + if err != nil { + return err + } + + // 2. remove yurt-hub config directory and certificates in it + yurthubConf := getYurthubConf() + err = os.RemoveAll(yurthubConf) + if err != nil { + return err + } + klog.Infof("yurt-hub has been removed") + return nil +} + +func (r *RevertNodeOptions) getYurthubKubeletConf() string { + return filepath.Join(r.openyurtDir, enutil.KubeletConfName) +} + +func (r *RevertNodeOptions) getKubeletSvcBackup() string { + return fmt.Sprintf(enutil.KubeletSvcBackup, r.KubeadmConfPath) +} + +func getYurthubYaml(podManifestPath string) string { + return filepath.Join(podManifestPath, enutil.YurthubYamlName) +} + +func getYurthubConf() string { + return filepath.Join(hubself.HubRootDir, hubself.HubName) +} diff --git a/pkg/yurtctl/cmd/revert/revert.go b/pkg/yurtctl/cmd/revert/revert.go index 4e23c61ffea..57a4cba6175 100644 --- a/pkg/yurtctl/cmd/revert/revert.go +++ b/pkg/yurtctl/cmd/revert/revert.go @@ -22,12 +22,10 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog" - nodeutil "k8s.io/kubernetes/pkg/controller/util/node" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurtctl/constants" @@ -45,7 +43,7 @@ type RevertOptions struct { KubeadmConfPath string } -// NewConvertOptions creates a new RevertOptions +// NewRevertOptions creates a new RevertOptions func NewRevertOptions() *RevertOptions { return &RevertOptions{} } @@ -67,6 +65,7 @@ func NewRevertCmd() *cobra.Command { } cmd.AddCommand(NewRevertEdgeNodeCmd()) + cmd.AddCommand(NewRevertCloudNodeCmd()) cmd.Flags().String("yurtctl-servant-image", "openyurt/yurtctl-servant:latest", @@ -134,8 +133,7 @@ func (ro *RevertOptions) RunRevert() (err error) { for _, node := range nodeLst.Items { isEdgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] if ok && isEdgeNode == "true" || strutil.IsInStringLst(kcmNodeNames, node.GetName()) { - _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if condition == nil || condition.Status != v1.ConditionTrue { + if !isNodeReady(&node.Status) { klog.Errorf("Cannot do the revert, the status of worker or kube-controller-manager node: %s is not 'Ready'.", node.Name) return } @@ -143,25 +141,19 @@ func (ro *RevertOptions) RunRevert() (err error) { } klog.V(4).Info("the status of worker nodes and kube-controller-manager nodes are satisfied") - // 2. remove labels from nodes - var edgeNodeNames []string + var edgeNodeNames, cloudNodeNames []string for _, node := range nodeLst.Items { - isEdgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - if ok && isEdgeNode == "true" { - // cache edge nodes, we need to run servant job on each edge node later + isEdgeNode := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + // cache edge nodes and cloud nodes, we need to run servant job on each node later + if isEdgeNode == "true" { edgeNodeNames = append(edgeNodeNames, node.GetName()) } - if ok && isEdgeNode == "false" { - // remove the label for both the cloud node - delete(node.Labels, projectinfo.GetEdgeWorkerLabelKey()) - if _, err = ro.clientSet.CoreV1().Nodes().Update(context.Background(), &node, metav1.UpdateOptions{}); err != nil { - return - } + if isEdgeNode == "false" { + cloudNodeNames = append(cloudNodeNames, node.GetName()) } } - klog.Info("label openyurt.io/is-edge-worker is removed") - // 3. remove the yurt controller manager + // 2. remove the yurt controller manager if err = ro.clientSet.AppsV1().Deployments("kube-system"). Delete(context.Background(), "yurt-controller-manager", metav1.DeleteOptions{ PropagationPolicy: &kubeutil.PropagationPolicy, @@ -171,7 +163,7 @@ func (ro *RevertOptions) RunRevert() (err error) { } klog.Info("yurt controller manager is removed") - // 3.1 remove the serviceaccount for yurt-controller-manager + // 2.1 remove the serviceaccount for yurt-controller-manager if err = ro.clientSet.CoreV1().ServiceAccounts("kube-system"). Delete(context.Background(), "yurt-controller-manager", metav1.DeleteOptions{ PropagationPolicy: &kubeutil.PropagationPolicy, @@ -181,7 +173,7 @@ func (ro *RevertOptions) RunRevert() (err error) { } klog.Info("serviceaccount for yurt controller manager is removed") - // 3.2 remove the clusterrole for yurt-controller-manager + // 2.2 remove the clusterrole for yurt-controller-manager if err = ro.clientSet.RbacV1().ClusterRoles(). Delete(context.Background(), "yurt-controller-manager", metav1.DeleteOptions{ PropagationPolicy: &kubeutil.PropagationPolicy, @@ -191,7 +183,7 @@ func (ro *RevertOptions) RunRevert() (err error) { } klog.Info("clusterrole for yurt controller manager is removed") - // 3.3 remove the clusterrolebinding for yurt-controller-manager + // 2.3 remove the clusterrolebinding for yurt-controller-manager if err = ro.clientSet.RbacV1().ClusterRoleBindings(). Delete(context.Background(), "yurt-controller-manager", metav1.DeleteOptions{ PropagationPolicy: &kubeutil.PropagationPolicy, @@ -201,19 +193,19 @@ func (ro *RevertOptions) RunRevert() (err error) { } klog.Info("clusterrolebinding for yurt controller manager is removed") - // 4. remove the yurt-tunnel agent + // 3. remove the yurt-tunnel agent if err = removeYurtTunnelAgent(ro.clientSet); err != nil { klog.Errorf("fail to remove the yurt tunnel agent: %s", err) return } - // 5. remove the yurt-tunnel server + // 4. remove the yurt-tunnel server if err = removeYurtTunnelServer(ro.clientSet); err != nil { klog.Errorf("fail to remove the yurt tunnel server: %s", err) return } - // 6. enable node-controller + // 5. enable node-controller if err = kubeutil.RunServantJobs(ro.clientSet, map[string]string{ "action": "enable", @@ -226,20 +218,28 @@ func (ro *RevertOptions) RunRevert() (err error) { } klog.Info("complete enabling node-controller") - // 7. remove yurt-hub and revert kubelet service - if err = kubeutil.RunServantJobs(ro.clientSet, - map[string]string{ - "action": "revert", - "yurtctl_servant_image": ro.YurtctlServantImage, - "kubeadm_conf_path": ro.KubeadmConfPath, - }, - edgeNodeNames); err != nil { + // 6. remove yurt-hub and revert kubelet service on edge nodes + ctx := map[string]string{ + "action": "revert", + "yurtctl_servant_image": ro.YurtctlServantImage, + "kubeadm_conf_path": ro.KubeadmConfPath, + } + ctx["sub_command"] = "edgenode" + if err = kubeutil.RunServantJobs(ro.clientSet, ctx, edgeNodeNames); err != nil { + klog.Errorf("fail to revert edge node: %s", err) + return + } + klog.Info("complete removing yurt-hub and resetting kubelet service on edge nodes") + + // 7. remove yurt-hub and revert kubelet service on cloud nodes + ctx["sub_command"] = "cloudnode" + if err = kubeutil.RunServantJobs(ro.clientSet, ctx, cloudNodeNames); err != nil { klog.Errorf("fail to revert edge node: %s", err) return } - klog.Info("complete removing yurt-hub and resetting kubelet service") + klog.Info("complete removing yurt-hub and resetting kubelet service on cloud nodes") - // 7.1. remove yut-hub k8s config, roleBinding role + // 8. remove yut-hub k8s config, roleBinding role err = kubeutil.DeleteYurthubSetting(ro.clientSet) if err != nil { klog.Error("DeleteYurthubSetting err: ", err) diff --git a/pkg/yurtctl/constants/constants.go b/pkg/yurtctl/constants/constants.go index 78e18946567..daff2767413 100644 --- a/pkg/yurtctl/constants/constants.go +++ b/pkg/yurtctl/constants/constants.go @@ -271,7 +271,7 @@ spec: - /bin/sh - -c args: - - "cp /usr/local/bin/yurtctl /tmp && nsenter -t 1 -m -u -n -i -- /var/tmp/yurtctl convert edgenode --yurthub-image {{.yurthub_image}} {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout {{.yurthub_healthcheck_timeout}} {{end}}--join-token {{.joinToken}} && rm /tmp/yurtctl" + - "cp /usr/local/bin/yurtctl /tmp && nsenter -t 1 -m -u -n -i -- /var/tmp/yurtctl convert {{.sub_command}} --yurthub-image {{.yurthub_image}} {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout {{.yurthub_healthcheck_timeout}} {{end}}--join-token {{.joinToken}} && rm /tmp/yurtctl" securityContext: privileged: true volumeMounts: @@ -314,7 +314,7 @@ spec: - /bin/sh - -c args: - - "cp /usr/local/bin/yurtctl /tmp && nsenter -t 1 -m -u -n -i -- /var/tmp/yurtctl revert edgenode && rm /tmp/yurtctl" + - "cp /usr/local/bin/yurtctl /tmp && nsenter -t 1 -m -u -n -i -- /var/tmp/yurtctl revert {{.sub_command}} && rm /tmp/yurtctl" securityContext: privileged: true volumeMounts: diff --git a/pkg/yurtctl/util/edgenode/common.go b/pkg/yurtctl/util/edgenode/common.go index c5d561596da..fe24d321db1 100644 --- a/pkg/yurtctl/util/edgenode/common.go +++ b/pkg/yurtctl/util/edgenode/common.go @@ -93,6 +93,7 @@ spec: - --server-addr=__kubernetes_service_addr__ - --node-name=$(NODE_NAME) - --join-token=__join_token__ + - --working-mode=__working_mode__ livenessProbe: httpGet: host: 127.0.0.1 diff --git a/pkg/yurtctl/util/kubernetes/util.go b/pkg/yurtctl/util/kubernetes/util.go index 594f425a78f..3f729b966b8 100644 --- a/pkg/yurtctl/util/kubernetes/util.go +++ b/pkg/yurtctl/util/kubernetes/util.go @@ -443,8 +443,8 @@ func RunJobAndCleanup(cliSet *kubernetes.Clientset, job *batchv1.Job, timeout, p } } -// RunServantJobs launch servant jobs on specified edge nodes -func RunServantJobs(cliSet *kubernetes.Clientset, tmplCtx map[string]string, edgeNodeNames []string) error { +// RunServantJobs launch servant jobs on specified nodes +func RunServantJobs(cliSet *kubernetes.Clientset, tmplCtx map[string]string, nodeNames []string) error { var wg sync.WaitGroup var servantJobTemplate, jobBaseName string action, exist := tmplCtx["action"] @@ -468,7 +468,7 @@ func RunServantJobs(cliSet *kubernetes.Clientset, tmplCtx map[string]string, edg return fmt.Errorf("unknown action: %s", action) } - for _, nodeName := range edgeNodeNames { + for _, nodeName := range nodeNames { tmplCtx["jobName"] = jobBaseName + "-" + nodeName tmplCtx["nodeName"] = nodeName jobYaml, err := tmplutil.SubsituteTemplate(servantJobTemplate, tmplCtx)