From dfe309cba1ca02965f996d4d09a51c07cab998ca Mon Sep 17 00:00:00 2001 From: DrmagicE <379342542@qq.com> Date: Wed, 1 Dec 2021 11:06:40 +0800 Subject: [PATCH] refactor: yurtctl convert use node-servant in dispatch job (#617) 1. Remove convert cloudnode/edgenode sub command. 2. Replace the "yurt-servant" job in yurtctl with "node-servant". 3. Automatically set "--cert-ip" to yurt-tunnel-server if the user specify the tunnel server address in "yurtctl convert". 4. Add "--autonomous-nodes" option to "yurtctl convert". Co-authored-by: zhanglifang@chinatelecom.cn --- hack/local_up_openyurt.sh | 7 +- pkg/node-servant/convert/options.go | 7 +- pkg/yurtctl/cmd/convert/cloudnode.go | 70 --- pkg/yurtctl/cmd/convert/convert.go | 162 +++++-- pkg/yurtctl/cmd/convert/edgenode.go | 72 --- pkg/yurtctl/cmd/convert/node.go | 443 ------------------ .../cmd/markautonomous/markautonomous.go | 2 +- pkg/yurtctl/cmd/yurtinit/init.go | 8 + .../yurtinit/phases/install_yurt_addons.go | 7 +- ...{labelCloudNode.go => label_cloud_node.go} | 0 pkg/yurtctl/constants/constants.go | 45 +- .../constants/yurt-app-manager-tmpl.go | 2 +- .../constants/yurt-tunnel-server-tmpl.go | 3 + pkg/yurtctl/util/kubernetes/apply_addons.go | 3 +- pkg/yurtctl/util/kubernetes/util.go | 8 +- 15 files changed, 152 insertions(+), 687 deletions(-) delete mode 100644 pkg/yurtctl/cmd/convert/cloudnode.go delete mode 100644 pkg/yurtctl/cmd/convert/edgenode.go delete mode 100644 pkg/yurtctl/cmd/convert/node.go rename pkg/yurtctl/cmd/yurtinit/phases/{labelCloudNode.go => label_cloud_node.go} (100%) diff --git a/hack/local_up_openyurt.sh b/hack/local_up_openyurt.sh index 2c6dfdb4561..4e8c3c24124 100755 --- a/hack/local_up_openyurt.sh +++ b/hack/local_up_openyurt.sh @@ -72,6 +72,7 @@ readonly BUILD_TARGETS=( yurtctl yurt-tunnel-server yurt-tunnel-agent + yurt-node-servant ) readonly LOCAL_ARCH=$(go env GOHOSTARCH) @@ -192,6 +193,10 @@ function kind_load_images { if [[ "${bin}" = "yurtctl" ]]; then imagename="yurtctl-servant-${postfix}" fi + + if [[ "${bin}" = "yurt-node-servant" ]]; then + imagename="node-servant-${postfix}" + fi echo "loading image ${imagename} to nodes" local nodesarg=$(echo ${master} ${edgenodes[@]} | sed "s/ /,/g") @@ -251,7 +256,7 @@ function convert_to_openyurt { ${yurtctl_dir}/yurtctl convert --provider kubeadm --cloud-nodes ${master} \ --yurthub-image=$(get_image_name "yurthub" ${LOCAL_ARCH}) \ --yurt-controller-manager-image=$(get_image_name "yurt-controller-manager" ${LOCAL_ARCH}) \ - --yurtctl-servant-image=$(get_image_name "yurtctl-servant" ${LOCAL_ARCH}) + --node-servant-image=$(get_image_name "node-servant" ${LOCAL_ARCH}) # --yurt-tunnel-server-image=$(get_image_name "yurt-tunnel-server" ${LOCAL_ARCH}) \ # --yurt-tunnel-agent-image=$(get_image_name "yurt-tunnel-agent" ${LOCAL_ARCH}) diff --git a/pkg/node-servant/convert/options.go b/pkg/node-servant/convert/options.go index 7c3ec302705..cfd5fee6c33 100644 --- a/pkg/node-servant/convert/options.go +++ b/pkg/node-servant/convert/options.go @@ -95,7 +95,12 @@ func (o *Options) Complete(flags *pflag.FlagSet) error { if err != nil { return err } - o.workingMode = util.WorkingMode(workingMode) + + wm := util.WorkingMode(workingMode) + if !util.IsSupportedWorkingMode(wm) { + return fmt.Errorf("invalid working mode: %s", workingMode) + } + o.workingMode = wm return nil } diff --git a/pkg/yurtctl/cmd/convert/cloudnode.go b/pkg/yurtctl/cmd/convert/cloudnode.go deleted file mode 100644 index 8dbccebfe58..00000000000 --- a/pkg/yurtctl/cmd/convert/cloudnode.go +++ /dev/null @@ -1,70 +0,0 @@ -/* -Copyright 2020 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package convert - -import ( - "strings" - - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "k8s.io/klog" - - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -// ConvertCloudNodeOptions has the information required by sub command convert cloudnode -type ConvertCloudNodeOptions struct { - ConvertNodeOptions -} - -// NewConvertCloudNodeOptions creates a new ConvertCloudNodeOptions -func NewConvertCloudNodeOptions() *ConvertCloudNodeOptions { - return &ConvertCloudNodeOptions{} -} - -// NewConvertCloudNodeCmd generates a new sub command convert cloudnode -func NewConvertCloudNodeCmd() *cobra.Command { - c := NewConvertCloudNodeOptions() - cmd := &cobra.Command{ - Use: "cloudnode", - Short: "Converts the kubernetes node to a yurt cloud node", - Run: func(cmd *cobra.Command, _ []string) { - if err := c.Complete(cmd.Flags()); err != nil { - klog.Fatalf("fail to complete the convert cloudnode option: %s", err) - } - if err := c.RunConvertNode(util.WorkingModeCloud); err != nil { - klog.Fatalf("fail to convert the kubernetes node to a yurt node: %s", err) - } - }, - } - cmd.Flags().StringP("cloud-nodes", "c", "", - "The list of cloud nodes wanted to be convert.(e.g. -e cloudnode1,cloudnode2)") - commonFlags(cmd) - return cmd -} - -// Complete completes all the required options. -func (c *ConvertCloudNodeOptions) Complete(flags *pflag.FlagSet) error { - enStr, err := flags.GetString("cloud-nodes") - if err != nil { - return err - } - if enStr != "" { - c.Nodes = strings.Split(enStr, ",") - } - return c.ConvertNodeOptions.Complete(flags) -} diff --git a/pkg/yurtctl/cmd/convert/convert.go b/pkg/yurtctl/cmd/convert/convert.go index 54964fcef4f..045763ae352 100644 --- a/pkg/yurtctl/cmd/convert/convert.go +++ b/pkg/yurtctl/cmd/convert/convert.go @@ -19,11 +19,14 @@ package convert import ( "context" "fmt" + "net" "strings" "time" "github.com/spf13/cobra" "github.com/spf13/pflag" + + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic" @@ -32,11 +35,14 @@ import ( "k8s.io/klog" clusterinfophase "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo" + nodeutil "github.com/openyurtio/openyurt/pkg/controller/util/node" "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurtctl/constants" "github.com/openyurtio/openyurt/pkg/yurtctl/lock" enutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes" strutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/strings" + "github.com/openyurtio/openyurt/pkg/yurthub/util" ) // Provider signifies the provider type @@ -59,13 +65,16 @@ const ( // ConvertOptions has the information that required by convert operation type ConvertOptions struct { - clientSet *kubernetes.Clientset - CloudNodes []string + clientSet *kubernetes.Clientset + CloudNodes []string + // AutonomousNodes stores the names of edge nodes that are going to be marked as autonomous. + // If empty, all edge nodes will be marked as autonomous. + AutonomousNodes []string Provider Provider YurhubImage string YurthubHealthCheckTimeout time.Duration YurtControllerManagerImage string - YurctlServantImage string + NodeServantImage string YurttunnelServerImage string YurttunnelServerAddress string YurttunnelAgentImage string @@ -105,8 +114,6 @@ func NewConvertCmd() *cobra.Command { }, } - cmd.AddCommand(NewConvertEdgeNodeCmd()) - cmd.AddCommand(NewConvertCloudNodeCmd()) cmd.Flags().StringP("cloud-nodes", "c", "", "The list of cloud nodes.(e.g. -c cloudnode1,cloudnode2)") cmd.Flags().StringP("provider", "p", "minikube", @@ -119,9 +126,9 @@ func NewConvertCmd() *cobra.Command { cmd.Flags().String("yurt-controller-manager-image", "openyurt/yurt-controller-manager:latest", "The yurt-controller-manager image.") - cmd.Flags().String("yurtctl-servant-image", - "openyurt/yurtctl-servant:latest", - "The yurtctl-servant image.") + cmd.Flags().String("node-servant-image", + "openyurt/node-servant:latest", + "The node-servant image.") cmd.Flags().String("kubeadm-conf-path", "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf", "The path to kubelet service conf that is used by kubelet component to join the cluster on the edge node.") @@ -135,14 +142,17 @@ func NewConvertCmd() *cobra.Command { "openyurt/yurt-tunnel-agent:latest", "The yurt-tunnel-agent image.") cmd.Flags().BoolP("deploy-yurttunnel", "t", false, - "if set, yurttunnel will be deployed.") + "If set, yurttunnel will be deployed.") cmd.Flags().BoolP("enable-app-manager", "e", false, - "if set, yurtappmanager will be deployed.") + "If set, yurtappmanager will be deployed.") cmd.Flags().String("yurt-app-manager-image", "openyurt/yurt-app-manager:v0.4.0", "The yurt-app-manager image.") cmd.Flags().String("system-architecture", "amd64", "The system architecture of cloud nodes.") + cmd.Flags().StringP("autonomous-nodes", "a", "", + "The list of nodes that will be marked as autonomous. If not set, all edge nodes will be marked as autonomous."+ + "(e.g. -a autonomousnode1,autonomousnode2)") return cmd } @@ -160,6 +170,16 @@ func (co *ConvertOptions) Complete(flags *pflag.FlagSet) error { return err } + anStr, err := flags.GetString("autonomous-nodes") + if err != nil { + return err + } + if anStr == "" { + co.AutonomousNodes = []string{} + } else { + co.AutonomousNodes = strings.Split(anStr, ",") + } + dt, err := flags.GetBool("deploy-yurttunnel") if err != nil { return err @@ -196,11 +216,11 @@ func (co *ConvertOptions) Complete(flags *pflag.FlagSet) error { } co.YurtControllerManagerImage = ycmi - ycsi, err := flags.GetString("yurtctl-servant-image") + nsi, err := flags.GetString("node-servant-image") if err != nil { return err } - co.YurctlServantImage = ycsi + co.NodeServantImage = nsi ytsi, err := flags.GetString("yurt-tunnel-server-image") if err != nil { @@ -267,6 +287,11 @@ func (co *ConvertOptions) Validate() error { return fmt.Errorf("unknown provider: %s, valid providers are: minikube, ack, kubeadm, kind", co.Provider) } + if co.YurttunnelServerAddress != "" { + if _, _, err := net.SplitHostPort(co.YurttunnelServerAddress); err != nil { + return fmt.Errorf("invalid --yurt-tunnel-server-address: %s", err) + } + } return nil } @@ -295,23 +320,46 @@ func (co *ConvertOptions) RunConvert() (err error) { return } - // 1.2. check the state of worker nodes and kcm nodes + // edgeNodeNames stores the node names of all edge nodes. + var edgeNodeNames []string + // 1.2 check the nodes status and label nodeLst, err := co.clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) if err != nil { return } for _, node := range nodeLst.Items { - if !strutil.IsInStringLst(co.CloudNodes, node.GetName()) || strutil.IsInStringLst(kcmNodeNames, node.GetName()) { - if !isNodeReady(&node.Status) { - klog.Errorf("Cannot do the convert, the status of worker node or kube-controller-manager node: %s is not 'Ready'.", node.Name) - return - } + if !isNodeReady(&node.Status) { + err = fmt.Errorf("the status of node: %s is not 'Ready'", node.Name) + return + } + _, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + if ok { + return fmt.Errorf("the node %s has already been labeled as a OpenYurt node", node.GetName()) + } + if !strutil.IsInStringLst(co.CloudNodes, node.GetName()) { + edgeNodeNames = append(edgeNodeNames, node.GetName()) } } - klog.V(4).Info("the status of worker nodes and kube-controller-manager nodes are satisfied") + klog.V(4).Info("the status of all nodes are satisfied") + + var autonomousNodes []string + // 1.3 check if autonomous nodes are valid + for _, v := range co.AutonomousNodes { + if strutil.IsInStringLst(co.CloudNodes, v) { + return fmt.Errorf("can't make cloud node %s autonomous", v) + } + if !strutil.IsInStringLst(edgeNodeNames, v) { + return fmt.Errorf("can't make unknown node %s autonomous", v) + } + autonomousNodes = append(autonomousNodes, v) + } + // If empty, mark all edge nodes as autonomous + if len(co.AutonomousNodes) == 0 { + autonomousNodes = make([]string, len(edgeNodeNames)) + copy(autonomousNodes, edgeNodeNames) + } // 2. label nodes as cloud node or edge node - var edgeNodeNames []string for _, node := range nodeLst.Items { if strutil.IsInStringLst(co.CloudNodes, node.GetName()) { // label node as cloud node @@ -320,32 +368,48 @@ func (co *ConvertOptions) RunConvert() (err error) { &node, projectinfo.GetEdgeWorkerLabelKey(), "false"); err != nil { return } - continue + } else { + // label node as edge node + var updatedNode *v1.Node + klog.Infof("mark %s as the edge-node", node.GetName()) + if updatedNode, err = kubeutil.LabelNode(co.clientSet, + &node, projectinfo.GetEdgeWorkerLabelKey(), "true"); err != nil { + return + } + if strutil.IsInStringLst(autonomousNodes, node.GetName()) { + // mark edge node as autonomous + klog.Infof("mark %s as autonomous", node.GetName()) + if _, err = kubeutil.AnnotateNode(co.clientSet, + updatedNode, constants.AnnotationAutonomy, "true"); err != nil { + return + } + } } - edgeNodeNames = append(edgeNodeNames, node.GetName()) } // 3. deploy yurt controller manager if err = kubeutil.DeployYurtControllerManager(co.clientSet, co.YurtControllerManagerImage); err != nil { - klog.Errorf("fail to deploy yurtcontrollermanager: %s", err) - return + return fmt.Errorf("fail to deploy yurtcontrollermanager: %s", err) } // 4. disable node-controller ctx := map[string]string{ - "action": "disable", - "yurtctl_servant_image": co.YurctlServantImage, - "pod_manifest_path": co.PodMainfestPath, + "action": "disable", + "node_servant_image": co.NodeServantImage, + "pod_manifest_path": co.PodMainfestPath, } if err = kubeutil.RunServantJobs(co.clientSet, ctx, kcmNodeNames); err != nil { - klog.Errorf("fail to run DisableNodeControllerJobs: %s", err) - return + return fmt.Errorf("fail to run DisableNodeControllerJobs: %s", err) } klog.Info("complete disabling node-controller") // 5. deploy the yurttunnel if required if co.DeployTunnel { + var certIP string + if co.YurttunnelServerAddress != "" { + certIP, _, _ = net.SplitHostPort(co.YurttunnelServerAddress) + } if err = kubeutil.DeployYurttunnelServer(co.clientSet, - co.CloudNodes, + certIP, co.YurttunnelServerImage, co.SystemArchitecture); err != nil { err = fmt.Errorf("fail to deploy the yurt-tunnel-server: %s", err) @@ -365,8 +429,7 @@ func (co *ConvertOptions) RunConvert() (err error) { // 6. prepare kube-public/cluster-info configmap before convert err = prepareClusterInfoConfigMap(co.clientSet, co.kubeConfigPath) if err != nil { - klog.Errorf("fail to prepre cluster-info configmap, %v", err) - return + return fmt.Errorf("fail to prepre cluster-info configmap, %v", err) } //7. deploy the yurtappmanager if required @@ -392,12 +455,11 @@ func (co *ConvertOptions) RunConvert() (err error) { } ctx = map[string]string{ - "provider": string(co.Provider), - "action": "convert", - "yurtctl_servant_image": co.YurctlServantImage, - "yurthub_image": co.YurhubImage, - "joinToken": joinToken, - "kubeadm_conf_path": co.KubeadmConfPath, + "action": "convert", + "node_servant_image": co.NodeServantImage, + "yurthub_image": co.YurhubImage, + "joinToken": joinToken, + "kubeadm_conf_path": co.KubeadmConfPath, } if co.YurthubHealthCheckTimeout != defaultYurthubHealthCheckTimeout { @@ -405,20 +467,20 @@ func (co *ConvertOptions) RunConvert() (err error) { } // 9. deploy yurt-hub and reset the kubelet service on edge nodes. - klog.Infof("deploying the yurt-hub and resetting the kubelet service on edge nodes...") - ctx["sub_command"] = "edgenode" - if err = kubeutil.RunServantJobs(co.clientSet, ctx, edgeNodeNames); err != nil { - klog.Errorf("fail to run ServantJobs: %s", err) - return + if len(edgeNodeNames) != 0 { + klog.Infof("deploying the yurt-hub and resetting the kubelet service on edge nodes...") + ctx["working_mode"] = string(util.WorkingModeEdge) + if err = kubeutil.RunServantJobs(co.clientSet, ctx, edgeNodeNames); err != nil { + return fmt.Errorf("fail to run ServantJobs: %s", err) + } + klog.Info("complete deploying yurt-hub on edge nodes") } - klog.Info("complete deploying yurt-hub on edge nodes") // 10. deploy yurt-hub and reset the kubelet service on cloud nodes klog.Infof("deploying the yurt-hub and resetting the kubelet service on cloud nodes") - ctx["sub_command"] = "cloudnode" + ctx["working_mode"] = string(util.WorkingModeCloud) if err = kubeutil.RunServantJobs(co.clientSet, ctx, co.CloudNodes); err != nil { - klog.Errorf("fail to run ServantJobs: %s", err) - return + return fmt.Errorf("fail to run ServantJobs: %s", err) } klog.Info("complete deploying yurt-hub on cloud nodes") @@ -437,7 +499,6 @@ func prepareClusterInfoConfigMap(client *kubernetes.Clientset, file string) erro return fmt.Errorf("error creating clusterinfo RBAC rules, %v", err) } } else if err != nil || info == nil { - klog.Errorf("fail to get configmap, %v", err) return fmt.Errorf("fail to get configmap, %v", err) } else { klog.Infof("%s/%s configmap already exists, skip to prepare it", info.Namespace, info.Name) @@ -445,3 +506,8 @@ func prepareClusterInfoConfigMap(client *kubernetes.Clientset, file string) erro return nil } + +func isNodeReady(status *v1.NodeStatus) bool { + _, condition := nodeutil.GetNodeCondition(status, v1.NodeReady) + return condition != nil && condition.Status == v1.ConditionTrue +} diff --git a/pkg/yurtctl/cmd/convert/edgenode.go b/pkg/yurtctl/cmd/convert/edgenode.go deleted file mode 100644 index 6597f04cb3b..00000000000 --- a/pkg/yurtctl/cmd/convert/edgenode.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Copyright 2021 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package convert - -import ( - "strings" - - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "k8s.io/klog" - - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -// ConvertEdgeNodeOptions has the information required by sub command convert edgenode -type ConvertEdgeNodeOptions struct { - ConvertNodeOptions -} - -// NewConvertEdgeNodeOptions creates a new ConvertEdgeNodeOptions -func NewConvertEdgeNodeOptions() *ConvertEdgeNodeOptions { - return &ConvertEdgeNodeOptions{} -} - -// NewConvertEdgeNodeCmd generates a new sub command convert edgenode -func NewConvertEdgeNodeCmd() *cobra.Command { - c := NewConvertEdgeNodeOptions() - cmd := &cobra.Command{ - Use: "edgenode", - Short: "Converts the kubernetes node to a yurt edge node", - Run: func(cmd *cobra.Command, _ []string) { - if err := c.Complete(cmd.Flags()); err != nil { - klog.Fatalf("fail to complete the convert edgenode option: %s", err) - } - if err := c.RunConvertNode(util.WorkingModeEdge); err != nil { - klog.Fatalf("fail to convert the kubernetes node to a yurt node: %s", err) - } - }, - } - - cmd.Flags().StringP("edge-nodes", "e", "", - "The list of edge nodes wanted to be convert.(e.g. -e edgenode1,edgenode2)") - commonFlags(cmd) - - return cmd -} - -// Complete completes all the required options -func (c *ConvertEdgeNodeOptions) Complete(flags *pflag.FlagSet) error { - enStr, err := flags.GetString("edge-nodes") - if err != nil { - return err - } - if enStr != "" { - c.Nodes = strings.Split(enStr, ",") - } - return c.ConvertNodeOptions.Complete(flags) -} diff --git a/pkg/yurtctl/cmd/convert/node.go b/pkg/yurtctl/cmd/convert/node.go deleted file mode 100644 index a4c92127616..00000000000 --- a/pkg/yurtctl/cmd/convert/node.go +++ /dev/null @@ -1,443 +0,0 @@ -/* -Copyright 2021 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package convert - -import ( - "context" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "os" - "os/exec" - "path/filepath" - "strings" - "time" - - "github.com/spf13/cobra" - "github.com/spf13/pflag" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/klog" - - nodeutil "github.com/openyurtio/openyurt/pkg/controller/util/node" - "github.com/openyurtio/openyurt/pkg/projectinfo" - "github.com/openyurtio/openyurt/pkg/yurtctl/constants" - enutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" - kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes" - strutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/strings" - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -const ( - kubeletConfigRegularExpression = "\\-\\-kubeconfig=.*kubelet.conf" - apiserverAddrRegularExpression = "server: (http(s)?:\\/\\/)?[\\w][-\\w]{0,62}(\\.[\\w][-\\w]{0,62})*(:[\\d]{1,5})?" - hubHealthzCheckFrequency = 10 * time.Second - filemode = 0666 - dirmode = 0755 -) - -// ConvertNodeOptions has the information required by sub command convert edgenode and convert cloudnode -type ConvertNodeOptions struct { - clientSet *kubernetes.Clientset - Nodes []string - YurthubImage string - YurthubHealthCheckTimeout time.Duration - YurctlServantImage string - JoinToken string - KubeadmConfPath string - openyurtDir string -} - -// commonFlags sets all common flags. -func commonFlags(cmd *cobra.Command) { - cmd.Flags().String("yurthub-image", "openyurt/yurthub:latest", - "The yurthub image.") - cmd.Flags().Duration("yurthub-healthcheck-timeout", defaultYurthubHealthCheckTimeout, - "The timeout for yurthub health check.") - cmd.Flags().String("yurtctl-servant-image", "openyurt/yurtctl-servant:latest", - "The yurtctl-servant image.") - cmd.Flags().String("kubeadm-conf-path", "", - "The path to kubelet service conf that is used by kubelet component to join the cluster on the edge node.") - cmd.Flags().String("join-token", "", "The token used by yurthub for joining the cluster.") -} - -// Complete completes all the required options. -func (c *ConvertNodeOptions) Complete(flags *pflag.FlagSet) error { - yurthubImage, err := flags.GetString("yurthub-image") - if err != nil { - return err - } - c.YurthubImage = yurthubImage - - yurthubHealthCheckTimeout, err := flags.GetDuration("yurthub-healthcheck-timeout") - if err != nil { - return err - } - c.YurthubHealthCheckTimeout = yurthubHealthCheckTimeout - - ycsi, err := flags.GetString("yurtctl-servant-image") - if err != nil { - return err - } - c.YurctlServantImage = ycsi - - kubeadmConfPath, err := flags.GetString("kubeadm-conf-path") - if err != nil { - return err - } - if kubeadmConfPath == "" { - kubeadmConfPath = os.Getenv("KUBELET_SVC") - } - if kubeadmConfPath == "" { - kubeadmConfPath = enutil.KubeletSvcPath - } - c.KubeadmConfPath = kubeadmConfPath - - c.clientSet, err = enutil.GenClientSet(flags) - if err != nil { - return err - } - - joinToken, err := flags.GetString("join-token") - if err != nil { - return err - } - if joinToken == "" { - joinToken, err = kubeutil.GetOrCreateJoinTokenString(c.clientSet) - if err != nil { - return err - } - } - c.JoinToken = joinToken - - openyurtDir := os.Getenv("OPENYURT_DIR") - if openyurtDir == "" { - openyurtDir = enutil.OpenyurtDir - } - c.openyurtDir = openyurtDir - - return nil -} - -// SetupYurthub sets up the yurthub pod and wait for the its status to be Running. -func (c *ConvertNodeOptions) SetupYurthub(workingMode util.WorkingMode) error { - // 1. put yurt-hub yaml into /etc/kubernetes/manifests - klog.Infof("setting up yurthub on node") - - // 1-1. get apiserver address - kubeletConfPath, err := enutil.GetSingleContentFromFile(c.KubeadmConfPath, kubeletConfigRegularExpression) - if err != nil { - return err - } - kubeletConfPath = strings.Split(kubeletConfPath, "=")[1] - apiserverAddr, err := enutil.GetSingleContentFromFile(kubeletConfPath, apiserverAddrRegularExpression) - if err != nil { - return err - } - apiserverAddr = strings.Split(apiserverAddr, " ")[1] - - // 1-2. replace variables in yaml file - klog.Infof("setting up yurthub apiserver addr") - yurthubTemplate := enutil.ReplaceRegularExpression(enutil.YurthubTemplate, - map[string]string{ - "__kubernetes_service_addr__": apiserverAddr, - "__yurthub_image__": c.YurthubImage, - "__join_token__": c.JoinToken, - "__working_mode__": string(workingMode), - }) - - // 1-3. create yurthub.yaml - podManifestPath := enutil.GetPodManifestPath() - if err != nil { - return err - } - if err = enutil.EnsureDir(podManifestPath); err != nil { - return err - } - err = ioutil.WriteFile(getYurthubYaml(podManifestPath), []byte(yurthubTemplate), filemode) - if err != nil { - return err - } - klog.Infof("create the %s/yurt-hub.yaml", podManifestPath) - - // 2. wait yurthub pod to be ready - err = hubHealthcheck(c.YurthubHealthCheckTimeout) - return err -} - -// ResetKubelet changes the configuration of the kubelet service and restart it -func (c *ConvertNodeOptions) ResetKubelet() error { - // 1. create a working dir to store revised kubelet.conf - err := os.MkdirAll(c.openyurtDir, dirmode) - if err != nil { - return err - } - fullpath := c.getYurthubKubeletConf() - err = ioutil.WriteFile(fullpath, []byte(enutil.OpenyurtKubeletConf), filemode) - if err != nil { - return err - } - klog.Infof("revised kubeconfig %s is generated", fullpath) - - // 2. revise the kubelet.service drop-in - // 2.1 make a backup for the origin kubelet.service - bkfile := c.getKubeletSvcBackup() - err = enutil.CopyFile(c.KubeadmConfPath, bkfile, 0666) - if err != nil { - return err - } - - // 2.2 revise the drop-in, point it to the $OPENYURT_DIR/kubelet.conf - contentbyte, err := ioutil.ReadFile(c.KubeadmConfPath) - if err != nil { - return err - } - kubeConfigSetup := fmt.Sprintf("--kubeconfig=%s/kubelet.conf", c.openyurtDir) - content := enutil.ReplaceRegularExpression(string(contentbyte), map[string]string{ - "--bootstrap.*bootstrap-kubelet.conf": "", - "--kubeconfig=.*kubelet.conf": kubeConfigSetup, - }) - err = ioutil.WriteFile(c.KubeadmConfPath, []byte(content), filemode) - if err != nil { - return err - } - klog.Info("kubelet.service drop-in file is revised") - - // 3. reset the kubelet.service - klog.Info(enutil.DaemonReload) - cmd := exec.Command("bash", "-c", enutil.DaemonReload) - if err := enutil.Exec(cmd); err != nil { - return err - } - klog.Info(enutil.RestartKubeletSvc) - cmd = exec.Command("bash", "-c", enutil.RestartKubeletSvc) - if err := enutil.Exec(cmd); err != nil { - return err - } - klog.Infof("kubelet has been restarted") - return nil -} - -func (c *ConvertNodeOptions) getYurthubKubeletConf() string { - return filepath.Join(c.openyurtDir, enutil.KubeletConfName) -} - -func (c *ConvertNodeOptions) getKubeletSvcBackup() string { - return fmt.Sprintf(enutil.KubeletSvcBackup, c.KubeadmConfPath) -} - -func getYurthubYaml(podManifestPath string) string { - return filepath.Join(podManifestPath, enutil.YurthubYamlName) -} - -// hubHealthcheck will check the status of yurthub pod -func hubHealthcheck(timeout time.Duration) error { - serverHealthzURL, err := url.Parse(fmt.Sprintf("http://%s", enutil.ServerHealthzServer)) - if err != nil { - return err - } - serverHealthzURL.Path = enutil.ServerHealthzURLPath - - start := time.Now() - return wait.PollImmediate(hubHealthzCheckFrequency, timeout, func() (bool, error) { - _, err := pingClusterHealthz(http.DefaultClient, serverHealthzURL.String()) - if err != nil { - klog.Infof("yurt-hub is not ready, ping cluster healthz with result: %v", err) - return false, nil - } - klog.Infof("yurt-hub healthz is OK after %f seconds", time.Since(start).Seconds()) - return true, nil - }) -} - -func pingClusterHealthz(client *http.Client, addr string) (bool, error) { - if client == nil { - return false, fmt.Errorf("http client is invalid") - } - - resp, err := client.Get(addr) - if err != nil { - return false, err - } - - b, err := ioutil.ReadAll(resp.Body) - defer resp.Body.Close() - if err != nil { - return false, fmt.Errorf("failed to read response of cluster healthz, %v", err) - } - - if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("response status code is %d", resp.StatusCode) - } - - if strings.ToLower(string(b)) != "ok" { - return false, fmt.Errorf("cluster healthz is %s", string(b)) - } - - return true, nil -} - -func isNodeReady(status *v1.NodeStatus) bool { - _, condition := nodeutil.GetNodeCondition(status, v1.NodeReady) - return condition != nil && condition.Status == v1.ConditionTrue -} - -// RunConvertNode converts a standard Kubernetes node to a Yurt node -func (c *ConvertNodeOptions) RunConvertNode(workingMode util.WorkingMode) (err error) { - // 1. check the server version - if err = kubeutil.ValidateServerVersion(c.clientSet); err != nil { - return - } - klog.V(4).Info("the server version is valid") - - nodeName, err := enutil.GetNodeName(c.KubeadmConfPath) - if err != nil { - nodeName = "" - } - if len(c.Nodes) > 1 || (len(c.Nodes) == 1 && c.Nodes[0] != nodeName) { - // dispatch servant job to remote nodes - err = c.dispatchServantJobs(workingMode) - } else if (len(c.Nodes) == 0 && nodeName != "") || (len(c.Nodes) == 1 && c.Nodes[0] == nodeName) { - // convert local node - err = c.convertLocalNode(nodeName, workingMode) - } else { - return fmt.Errorf("fail to convert node, flag --edge-nodes or --cloud-nodes %s err", c.Nodes) - } - return err -} - -// dispatchServantJobs dispatches servant job to remote nodes. -func (c *ConvertNodeOptions) dispatchServantJobs(workingMode util.WorkingMode) error { - nodeLst, err := c.clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) - if err != nil { - return err - } - - // 1. check the nodes and its label - var nodeNames []string - for _, node := range nodeLst.Items { - labelValue, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - // The label of edge nodes will be set by the servant job that we are going to dispatch here, - // so we assume that when dispatching servant job to edge nodes, the label should not be set. - if workingMode == util.WorkingModeEdge && !ok { - nodeNames = append(nodeNames, node.GetName()) - } - // The label of cloud nodes is usually already set by yurtctl convert before dispatching the servant job. - if workingMode == util.WorkingModeCloud && labelValue == "false" { - nodeNames = append(nodeNames, node.GetName()) - } - } - - for _, node := range c.Nodes { - if !strutil.IsInStringLst(nodeNames, node) { - return fmt.Errorf("Cannot do the convert, the node: %s is not a Kubernetes node.", node) - } - } - - // 2. check the state of nodes - for _, node := range nodeLst.Items { - if strutil.IsInStringLst(c.Nodes, node.GetName()) { - if !isNodeReady(&node.Status) { - return fmt.Errorf("Cannot do the convert, the status of node: %s is not 'Ready'.", node.Name) - } - } - } - - // 3. deploy yurt-hub and reset the kubelet service - ctx := map[string]string{ - "action": "convert", - "yurtctl_servant_image": c.YurctlServantImage, - "yurthub_image": c.YurthubImage, - "joinToken": c.JoinToken, - "kubeadm_conf_path": c.KubeadmConfPath, - } - - switch workingMode { - case util.WorkingModeCloud: - ctx["sub_command"] = "cloudnode" - case util.WorkingModeEdge: - ctx["sub_command"] = "edgenode" - } - if c.YurthubHealthCheckTimeout != defaultYurthubHealthCheckTimeout { - ctx["yurthub_healthcheck_timeout"] = c.YurthubHealthCheckTimeout.String() - } - - if err = kubeutil.RunServantJobs(c.clientSet, ctx, c.Nodes); err != nil { - klog.Errorf("fail to run ServantJobs: %s", err) - return err - } - return nil -} - -// convertLocalNode converts the local node -func (c *ConvertNodeOptions) convertLocalNode(nodeName string, workingMode util.WorkingMode) error { - // 1. check if critical files exist - if _, err := enutil.FileExists(c.KubeadmConfPath); err != nil { - return err - } - - // 2. check the state of node - node, err := c.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - if !isNodeReady(&node.Status) { - return fmt.Errorf("Cannot do the convert, the status of node: %s is not 'Ready'.", node.Name) - } - - // 3. check the label of node - labelVal, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] - if workingMode == util.WorkingModeEdge && ok { - return fmt.Errorf("Cannot do the convert, the edge node: %s is not a Kubernetes node.", node.Name) - } - if workingMode == util.WorkingModeCloud && labelVal != "false" { - return fmt.Errorf("Cannot do the convert, the cloud node: %s is not a Kubernetes node.", node.Name) - } - - // 4. deploy yurt-hub and reset the kubelet service - err = c.SetupYurthub(workingMode) - if err != nil { - return fmt.Errorf("fail to set up the yurthub pod: %v", err) - } - err = c.ResetKubelet() - if err != nil { - return fmt.Errorf("fail to reset the kubelet service: %v", err) - } - - if workingMode == util.WorkingModeEdge { - // 5. label node as edge node - klog.Infof("mark %s as the edge-node", nodeName) - node, err = c.clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - node, err = kubeutil.LabelNode(c.clientSet, node, projectinfo.GetEdgeWorkerLabelKey(), "true") - if err != nil { - return err - } - // 6. open the autonomous - klog.Infof("open the %s autonomous", nodeName) - _, err = kubeutil.AnnotateNode(c.clientSet, node, constants.AnnotationAutonomy, "true") - if err != nil { - return err - } - } - return nil -} diff --git a/pkg/yurtctl/cmd/markautonomous/markautonomous.go b/pkg/yurtctl/cmd/markautonomous/markautonomous.go index 8e45804946f..9d794625077 100644 --- a/pkg/yurtctl/cmd/markautonomous/markautonomous.go +++ b/pkg/yurtctl/cmd/markautonomous/markautonomous.go @@ -63,7 +63,7 @@ func NewMarkAutonomousCmd() *cobra.Command { } cmd.Flags().StringP("autonomous-nodes", "a", "", - "The list of nodes that will be marked as autonomous."+ + "The list of nodes that will be marked as autonomous. If not set, all edge nodes will be marked as autonomous."+ "(e.g. -a autonomousnode1,autonomousnode2)") return cmd diff --git a/pkg/yurtctl/cmd/yurtinit/init.go b/pkg/yurtctl/cmd/yurtinit/init.go index b54575e7604..65f5ce7ddbf 100644 --- a/pkg/yurtctl/cmd/yurtinit/init.go +++ b/pkg/yurtctl/cmd/yurtinit/init.go @@ -22,6 +22,7 @@ import ( "crypto/x509" "fmt" "io" + "net" "os" "path/filepath" "strings" @@ -365,6 +366,13 @@ func newInitData(cmd *cobra.Command, args []string, options *initOptions, out io } } + if options.openyurtTunnelServerAddress != "" { + _, _, err = net.SplitHostPort(options.openyurtTunnelServerAddress) + if err != nil { + return nil, errors.Wrapf(err, "invalid yurt tunnel server address") + } + } + return &initData{ cfg: cfg, certificatesDir: cfg.CertificatesDir, diff --git a/pkg/yurtctl/cmd/yurtinit/phases/install_yurt_addons.go b/pkg/yurtctl/cmd/yurtinit/phases/install_yurt_addons.go index 2f9d22aa4fe..9b408974cd1 100644 --- a/pkg/yurtctl/cmd/yurtinit/phases/install_yurt_addons.go +++ b/pkg/yurtctl/cmd/yurtinit/phases/install_yurt_addons.go @@ -18,6 +18,7 @@ package phases import ( "fmt" + "net" "runtime" "k8s.io/client-go/dynamic" @@ -75,7 +76,11 @@ func runInstallYurtAddons(c workflow.RunData) error { if err := kubeutil.DeployYurtAppManager(client, fmt.Sprintf("%s/%s:%s", imageRegistry, YurtContants.YurtAppManager, version), dynamicClient, runtime.GOARCH); err != nil { return err } - if err := kubeutil.DeployYurttunnelServer(client, nil, fmt.Sprintf("%s/%s:%s", imageRegistry, YurtContants.YurtTunnelServer, version), runtime.GOARCH); err != nil { + var certIP string + if data.YurtTunnelAddress() != "" { + certIP, _, _ = net.SplitHostPort(data.YurtTunnelAddress()) + } + if err := kubeutil.DeployYurttunnelServer(client, certIP, fmt.Sprintf("%s/%s:%s", imageRegistry, YurtContants.YurtTunnelServer, version), runtime.GOARCH); err != nil { return err } if err := kubeutil.DeployYurttunnelAgent(client, tunnelServerAddress, fmt.Sprintf("%s/%s:%s", imageRegistry, YurtContants.YurtTunnelAgent, version)); err != nil { diff --git a/pkg/yurtctl/cmd/yurtinit/phases/labelCloudNode.go b/pkg/yurtctl/cmd/yurtinit/phases/label_cloud_node.go similarity index 100% rename from pkg/yurtctl/cmd/yurtinit/phases/labelCloudNode.go rename to pkg/yurtctl/cmd/yurtinit/phases/label_cloud_node.go diff --git a/pkg/yurtctl/constants/constants.go b/pkg/yurtctl/constants/constants.go index 06ce22884c0..507ed94edbe 100644 --- a/pkg/yurtctl/constants/constants.go +++ b/pkg/yurtctl/constants/constants.go @@ -249,49 +249,6 @@ spec: image: {{.image}} command: - yurt-controller-manager -` - // ConvertServantJobTemplate defines the yurtctl convert servant job in yaml format - ConvertServantJobTemplate = ` -apiVersion: batch/v1 -kind: Job -metadata: - name: {{.jobName}} - namespace: kube-system -spec: - template: - spec: - hostPID: true - hostNetwork: true - restartPolicy: OnFailure - nodeName: {{.nodeName}} - volumes: - - name: host-var-tmp - hostPath: - path: /var/tmp - type: Directory - containers: - - name: yurtctl-servant - image: {{.yurtctl_servant_image}} - imagePullPolicy: IfNotPresent - command: - - /bin/sh - - -c - args: - - "cp /usr/local/bin/yurtctl /tmp && nsenter -t 1 -m -u -n -i -- /var/tmp/yurtctl convert {{.sub_command}} --yurthub-image {{.yurthub_image}} {{if .yurthub_healthcheck_timeout}}--yurthub-healthcheck-timeout {{.yurthub_healthcheck_timeout}} {{end}}--join-token {{.joinToken}} && rm /tmp/yurtctl" - securityContext: - privileged: true - volumeMounts: - - mountPath: /tmp - name: host-var-tmp - env: - - name: NODE_NAME - valueFrom: - fieldRef: - fieldPath: spec.nodeName - {{if .kubeadm_conf_path }} - - name: KUBELET_SVC - value: {{.kubeadm_conf_path}} - {{end}} ` // RevertServantJobTemplate defines the yurtctl revert servant job in yaml format RevertServantJobTemplate = ` @@ -352,7 +309,7 @@ spec: nodeName: {{.nodeName}} containers: - name: yurtctl-disable-node-controller - image: {{.yurtctl_servant_image}} + image: {{.node_servant_image}} imagePullPolicy: IfNotPresent command: - /bin/sh diff --git a/pkg/yurtctl/constants/yurt-app-manager-tmpl.go b/pkg/yurtctl/constants/yurt-app-manager-tmpl.go index 2cde69af51a..38d7ad57499 100644 --- a/pkg/yurtctl/constants/yurt-app-manager-tmpl.go +++ b/pkg/yurtctl/constants/yurt-app-manager-tmpl.go @@ -667,7 +667,7 @@ spec: - --v=4 command: - /usr/local/bin/yurt-app-manager - image: openyurt/yurt-app-manager:v0.4.0 + image: {{.image}} imagePullPolicy: Always name: manager ports: diff --git a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go index 70ff9144c9b..d2031ec4b41 100644 --- a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go +++ b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go @@ -207,6 +207,9 @@ spec: - --bind-address=$(NODE_IP) - --insecure-bind-address=$(NODE_IP) - --server-count=1 + {{if .certIP }} + - --cert-ips={{.certIP}} + {{end}} env: - name: NODE_IP valueFrom: diff --git a/pkg/yurtctl/util/kubernetes/apply_addons.go b/pkg/yurtctl/util/kubernetes/apply_addons.go index 6ecac5513f4..c643fa7b7d4 100644 --- a/pkg/yurtctl/util/kubernetes/apply_addons.go +++ b/pkg/yurtctl/util/kubernetes/apply_addons.go @@ -138,7 +138,7 @@ func DeployYurtAppManager( func DeployYurttunnelServer( client *kubernetes.Clientset, - cloudNodes []string, + certIP string, yurttunnelServerImage string, systemArchitecture string) error { // 1. create the ClusterRole @@ -185,6 +185,7 @@ func DeployYurttunnelServer( map[string]string{ "image": yurttunnelServerImage, "arch": systemArchitecture, + "certIP": certIP, "edgeWorkerLabel": projectinfo.GetEdgeWorkerLabelKey()}); err != nil { return err } diff --git a/pkg/yurtctl/util/kubernetes/util.go b/pkg/yurtctl/util/kubernetes/util.go index 0bc8146f461..adb93195495 100644 --- a/pkg/yurtctl/util/kubernetes/util.go +++ b/pkg/yurtctl/util/kubernetes/util.go @@ -61,6 +61,7 @@ import ( kubeadmcontants "k8s.io/kubernetes/cmd/kubeadm/app/constants" tokenphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/node" + nodeservant "github.com/openyurtio/openyurt/pkg/node-servant" "github.com/openyurtio/openyurt/pkg/yurtctl/constants" "github.com/openyurtio/openyurt/pkg/yurtctl/util" "github.com/openyurtio/openyurt/pkg/yurtctl/util/edgenode" @@ -69,8 +70,6 @@ import ( ) const ( - // ConvertJobNameBase is the prefix of the convert ServantJob name - ConvertJobNameBase = "yurtctl-servant-convert" // RevertJobNameBase is the prefix of the revert ServantJob name RevertJobNameBase = "yurtctl-servant-revert" // DisableNodeControllerJobNameBase is the prefix of the DisableNodeControllerJob name @@ -502,8 +501,9 @@ func RunServantJobs(cliSet *kubernetes.Clientset, tmplCtx map[string]string, nod } switch action { case "convert": - servantJobTemplate = constants.ConvertServantJobTemplate - jobBaseName = ConvertJobNameBase + // TODO use nodeservant.RenderNodeServantJob + servantJobTemplate = nodeservant.ConvertServantJobTemplate + jobBaseName = nodeservant.ConvertJobNameBase case "revert": servantJobTemplate = constants.RevertServantJobTemplate jobBaseName = RevertJobNameBase