From 206a7a133caf006e830980a84811318349dd35fd Mon Sep 17 00:00:00 2001 From: hujiajing Date: Wed, 6 Jul 2022 00:17:55 +0800 Subject: [PATCH] Add antctl join command Signed-off-by: hjiajing --- .../commonarea/remote_common_area.go | 6 +- pkg/antctl/antctl.go | 6 + pkg/antctl/raw/multicluster/commands.go | 3 + pkg/antctl/raw/multicluster/join/join.go | 382 ++++++++++++++++++ 4 files changed, 394 insertions(+), 3 deletions(-) create mode 100644 pkg/antctl/raw/multicluster/join/join.go diff --git a/multicluster/controllers/multicluster/commonarea/remote_common_area.go b/multicluster/controllers/multicluster/commonarea/remote_common_area.go index 8c09498fc9a..934d7cc5d20 100644 --- a/multicluster/controllers/multicluster/commonarea/remote_common_area.go +++ b/multicluster/controllers/multicluster/commonarea/remote_common_area.go @@ -120,7 +120,7 @@ func NewRemoteCommonArea(clusterID common.ClusterID, clusterSetID common.Cluster clusterSetNamespace string) (CommonArea, error) { klog.InfoS("Create a RemoteCommonArea", "Cluster", clusterID) - crtData, token, err := getSecretCACrtAndToken(secret) + crtData, token, err := GetSecretCACrtAndToken(secret) if err != nil { return nil, err } @@ -174,9 +174,9 @@ func NewRemoteCommonArea(clusterID common.ClusterID, clusterSetID common.Cluster } /** - * getSecretCACrtAndToken returns the access credentials from Secret. + * GetSecretCACrtAndToken returns the access credentials from Secret. */ -func getSecretCACrtAndToken(secretObj *v1.Secret) ([]byte, []byte, error) { +func GetSecretCACrtAndToken(secretObj *v1.Secret) ([]byte, []byte, error) { caData, found := secretObj.Data[v1.ServiceAccountRootCAKey] if !found { return nil, nil, fmt.Errorf("ca.crt data not found in Secret %v", secretObj.GetName()) diff --git a/pkg/antctl/antctl.go b/pkg/antctl/antctl.go index 88038d12eb3..6044924fa50 100644 --- a/pkg/antctl/antctl.go +++ b/pkg/antctl/antctl.go @@ -621,6 +621,12 @@ $ antctl get podmulticaststats pod -n namespace`, supportController: false, commandGroup: mc, }, + { + cobraCommand: multicluster.JoinCmd, + supportAgent: false, + supportController: false, + commandGroup: mc, + }, { cobraCommand: set.SetCmd, supportAgent: false, diff --git a/pkg/antctl/raw/multicluster/commands.go b/pkg/antctl/raw/multicluster/commands.go index 7367d3cb03a..2106e3ae02a 100644 --- a/pkg/antctl/raw/multicluster/commands.go +++ b/pkg/antctl/raw/multicluster/commands.go @@ -22,6 +22,7 @@ import ( deleteCmd "antrea.io/antrea/pkg/antctl/raw/multicluster/delete" "antrea.io/antrea/pkg/antctl/raw/multicluster/deploy" "antrea.io/antrea/pkg/antctl/raw/multicluster/get" + "antrea.io/antrea/pkg/antctl/raw/multicluster/join" ) var GetCmd = &cobra.Command{ @@ -49,6 +50,8 @@ var DeployCmd = &cobra.Command{ Short: "Deploy Antrea Multi-cluster Controller to a leader or member cluster", } +var JoinCmd = join.NewJoinCommand() + func init() { GetCmd.AddCommand(get.NewClusterSetCommand()) GetCmd.AddCommand(get.NewResourceImportCommand()) diff --git a/pkg/antctl/raw/multicluster/join/join.go b/pkg/antctl/raw/multicluster/join/join.go new file mode 100644 index 00000000000..e04d81d2128 --- /dev/null +++ b/pkg/antctl/raw/multicluster/join/join.go @@ -0,0 +1,382 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package join + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "strings" + "time" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" + + multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + multiclusterv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" + "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" + "antrea.io/antrea/pkg/antctl/raw" + multiclusterscheme "antrea.io/antrea/pkg/antctl/raw/multicluster/scheme" +) + +type clusterOptions struct { + Namespace string `yaml:"namespace,omitempty"` + ClusterID string `yaml:"clusterID,omitempty"` + Secret string `yaml:"secret,omitempty"` + Server string `yaml:"server,omitempty"` + ClusterSet string `yaml:"clusterSet,omitempty"` +} + +type joinOptions struct { + LeaderClusterOpt clusterOptions `yaml:"leaderCluster"` + MemberClusterOpt clusterOptions `yaml:"memberCluster"` + ConfigFile string +} + +type meta struct { + Kind string `yaml:"kind"` + ApiVersion string `yaml:"apiVersion"` +} + +var options *joinOptions + +func (o *joinOptions) validateAndComplete() error { + if o.ConfigFile != "" { + raw, err := ioutil.ReadFile(o.ConfigFile) + if err != nil { + return err + } + typeMeta := &meta{} + if err := yamlUnmarshall(raw, typeMeta); err != nil { + return err + } + if typeMeta.Kind != "ClusterSetConfig" || typeMeta.ApiVersion != "multicluster.antrea.io/v1alpha1" { + return fmt.Errorf("unknown apiVersion or kind in config: %s", o.ConfigFile) + } + if err := yamlUnmarshall(raw, &options); err != nil { + return err + } + } + + if o.LeaderClusterOpt.ClusterID == "" { + return fmt.Errorf("the leader clusterID cannot be empty") + } + if o.LeaderClusterOpt.Server == "" { + return fmt.Errorf("the leader server cannot be empty") + } + if o.LeaderClusterOpt.Secret == "" { + return fmt.Errorf("the leader secret cannot be empty") + } + if o.LeaderClusterOpt.Namespace == "" { + return fmt.Errorf("the leader namespace cannot be empty") + } + if o.MemberClusterOpt.ClusterSet == "" { + return fmt.Errorf("the member cluster set cannot be empty") + } + if o.MemberClusterOpt.ClusterID == "" { + return fmt.Errorf("the member cluster id cannot be empty") + } + if o.MemberClusterOpt.Namespace == "" { + return fmt.Errorf("the member namespace cannot be empty") + } + + return nil +} + +func yamlUnmarshall(raw []byte, v interface{}) error { + d := yaml.NewDecoder(bytes.NewReader(raw)) + d.KnownFields(false) + return d.Decode(v) +} + +var joinExamples = strings.Trim(` +# Join the ClusterSet from a member cluster with command line options + $ antctl mc join --leader-namespace= \ + --leader-cluster-id= \ + --secret= \ + --leader-server= \ + --cluster-set= \ + --namespace= \ + --cluster-id= + +# Join the ClusterSet from a member cluster with config file. If you use both command line options and config file, +the argument in config file will be used. + $ antctl mc join --config= + +# Config file example: +apiVersion: multicluster.antrea.io/v1alpha1 +kind: ClusterSetConfig +memberCluster: + namespace: kube-system + clusterid: test-cluster-south + clusterset: test-clusterset +leaderCluster: + namespace: antrea-multicluster + secret: member-access-token + server: https://172.18.0.3:6443 + clusterid: test-cluster-north + clusterset: test-clusterset +`, "\n") + +func NewJoinCommand() *cobra.Command { + command := &cobra.Command{ + Use: "join", + Short: "Join the ClusterSet from a member cluster", + Args: cobra.MaximumNArgs(0), + Example: joinExamples, + RunE: joinRunE, + } + + o := joinOptions{} + options = &o + command.Flags().StringVarP(&options.LeaderClusterOpt.Namespace, "leader-namespace", "", "", "Namespace of the leader cluster") + command.Flags().StringVarP(&options.LeaderClusterOpt.ClusterID, "leader-cluster-id", "", "", "Cluster ID of the leader cluster") + command.Flags().StringVarP(&options.LeaderClusterOpt.Secret, "secret", "", "", "Secret of the leader cluster") + command.Flags().StringVarP(&options.LeaderClusterOpt.Server, "leader-server", "", "", "Server to the leader cluster") + command.Flags().StringVarP(&options.MemberClusterOpt.ClusterSet, "cluster-set", "", "", "ClusterSet to join") + command.Flags().StringVarP(&options.MemberClusterOpt.Namespace, "namespace", "", "", "Namespace of the member cluster") + command.Flags().StringVarP(&options.MemberClusterOpt.ClusterID, "cluster-id", "", "", "Cluster ID of the member cluster") + command.Flags().StringVarP(&options.ConfigFile, "config", "f", "", "Config file of the member cluster") + + return command +} + +func joinRunE(cmd *cobra.Command, args []string) error { + if err := options.validateAndComplete(); err != nil { + return err + } + + kubeconfig, err := raw.ResolveKubeconfig(cmd) + if err != nil { + return err + } + restConfigTmpl := rest.CopyConfig(kubeconfig) + raw.SetupKubeconfig(restConfigTmpl) + + k8sClient, err := client.New(restConfigTmpl, client.Options{Scheme: multiclusterscheme.Scheme}) + if err != nil { + return err + } + + memberClusterNamespace := options.MemberClusterOpt.Namespace + memberClusterID := options.MemberClusterOpt.ClusterID + memberClusterSet := options.MemberClusterOpt.ClusterSet + + fmt.Fprintf(cmd.OutOrStdout(), "Creating ClusterClaim \"%s\" in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimID, memberClusterNamespace) + var createErr error + if createErr = k8sClient.Create(context.TODO(), newClusterClaim(memberClusterID, memberClusterNamespace)); createErr != nil { + if apierrors.IsAlreadyExists(createErr) { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterClaim \"%s\" already exists in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimID, memberClusterNamespace) + createErr = nil + } else { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to create ClusterClaim \"%s\": %v\n", multiclusterv1alpha2.WellKnownClusterClaimID, createErr) + return createErr + } + } else { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterClaim \"%s\" created in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimID, memberClusterNamespace) + defer func() { + if createErr != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Deleting ClusterClaim \"%s\" in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimID, memberClusterNamespace) + if err := k8sClient.Delete(context.TODO(), newClusterClaim(memberClusterID, memberClusterNamespace)); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to delete ClusterClaim \"%s\": %v\n", multiclusterv1alpha2.WellKnownClusterClaimID, err) + } else { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterClaim \"%s\" deleted in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimID, memberClusterNamespace) + } + } + }() + } + + fmt.Fprintf(cmd.OutOrStdout(), "Creating ClusterClaim \"%s\" in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, memberClusterNamespace) + if createErr = k8sClient.Create(context.TODO(), newClusterSetClaim(options.MemberClusterOpt.ClusterSet, memberClusterNamespace)); createErr != nil { + if apierrors.IsAlreadyExists(createErr) { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterClaim \"%s\" already exists in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, memberClusterNamespace) + createErr = nil + } else { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to create ClusterClaim \"%s\": %v\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, createErr) + return createErr + } + } else { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterClaim \"%s\" created in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, memberClusterNamespace) + defer func() { + if createErr != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Deleting ClusterClaim \"%s\" in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, memberClusterNamespace) + if err := k8sClient.Delete(context.TODO(), newClusterSetClaim(options.MemberClusterOpt.ClusterSet, memberClusterNamespace)); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to delete ClusterClaim \"%s\": %v\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, err) + return + } + fmt.Fprintf(cmd.OutOrStdout(), "ClusterClaim deleted \"%s\" in Namespace %s\n", multiclusterv1alpha2.WellKnownClusterClaimClusterSet, memberClusterNamespace) + } + }() + } + + fmt.Fprintf(cmd.OutOrStdout(), "Creating ClusterSet \"%s\" in Namespace %s\n", memberClusterSet, memberClusterNamespace) + if createErr = k8sClient.Create(context.TODO(), newClusterSet(options.MemberClusterOpt.ClusterSet, memberClusterNamespace, options.LeaderClusterOpt.Server, + options.LeaderClusterOpt.Secret, memberClusterID, options.LeaderClusterOpt.ClusterID, options.LeaderClusterOpt.Namespace)); createErr != nil { + if apierrors.IsAlreadyExists(createErr) { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterSet \"%s\" already exists in Namespace %s\n", memberClusterSet, memberClusterNamespace) + createErr = nil + } else { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to create ClusterSet \"%s\": %v\n", memberClusterSet, createErr) + return createErr + } + } else { + fmt.Fprintf(cmd.OutOrStdout(), "ClusterSet \"%s\" created in Namespace %s\n", memberClusterSet, memberClusterNamespace) + } + + fmt.Fprintf(cmd.OutOrStdout(), "Waiting for member cluster ready\n") + if err := waitForMemberClusterReady(cmd, k8sClient); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to wait for member cluster ready: %v\n", err) + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "Member cluster joined successfully\n") + + return nil +} + +func waitForMemberClusterReady(cmd *cobra.Command, k8sClient client.Client) error { + fmt.Fprintf(cmd.OutOrStdout(), "Waiting for ClusterSet \"%s\" in Namespace %s to be registered in leader cluster\n", + options.MemberClusterOpt.ClusterSet, options.MemberClusterOpt.Namespace) + secret := &v1.Secret{} + if err := k8sClient.Get(context.TODO(), client.ObjectKey{Name: options.LeaderClusterOpt.Secret, Namespace: options.MemberClusterOpt.Namespace}, secret); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to get secret \"%s\" in Namespace %s: %v\n", options.LeaderClusterOpt.Secret, options.MemberClusterOpt.Namespace, err) + return err + } + + crtData, token, err := commonarea.GetSecretCACrtAndToken(secret) + if err != nil { + return err + } + config, err := clientcmd.BuildConfigFromFlags(options.LeaderClusterOpt.Server, "") + if err != nil { + return err + } + config.BearerToken = string(token) + config.CAData = crtData + + remoteClient, err := client.New(config, client.Options{Scheme: multiclusterscheme.Scheme}) + if err != nil { + return err + } + + memberClusterAnnounce := "member-announce-from-" + options.MemberClusterOpt.ClusterID + fmt.Fprintf(cmd.OutOrStdout(), "Waiting for member cluster \"%s\" in Namespace %s in leader cluster\n", options.MemberClusterOpt.ClusterID, options.MemberClusterOpt.Namespace) + if err := waitForMemberCluserAnnounceReady(remoteClient, memberClusterAnnounce, options.LeaderClusterOpt.Namespace); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to wait for MemberClusterAnnounce \"%s\" in Namespace %s in leader cluster: %v\n", memberClusterAnnounce, options.LeaderClusterOpt.Namespace, err) + return err + } + + if err := waitForLeaderClusterSetReady(remoteClient, options.LeaderClusterOpt.ClusterSet, options.LeaderClusterOpt.Namespace, options.MemberClusterOpt.ClusterID); err != nil { + fmt.Fprintf(cmd.OutOrStdout(), "Failed to wait for ClusterSet \"%s\" in Namespace %s in leader cluster: %v\n", options.LeaderClusterOpt.ClusterSet, options.LeaderClusterOpt.Namespace, err) + return err + } + + return nil +} + +func waitForMemberCluserAnnounceReady(client client.Client, memberClusterAnnounce string, namespace string) error { + return wait.PollImmediate( + 1*time.Second, + 30*time.Second, + func() (bool, error) { + announce := &multiclusterv1alpha1.MemberClusterAnnounce{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: memberClusterAnnounce, Namespace: namespace}, announce); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + return announce.DeletionTimestamp == nil, nil + }) +} + +func waitForLeaderClusterSetReady(client client.Client, name string, namespace string, memberClusterID string) error { + return wait.PollImmediate( + 1*time.Second, + 3*time.Minute, + func() (bool, error) { + clusterSet := &multiclusterv1alpha1.ClusterSet{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, clusterSet); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + + for _, member := range clusterSet.Status.ClusterStatuses { + if member.ClusterID == memberClusterID { + for _, cond := range member.Conditions { + if cond.Reason == "Connected" { + return cond.Status == "True", nil + } + } + } + } + + return false, nil + }) +} + +func newClusterClaim(name string, namespace string) *multiclusterv1alpha2.ClusterClaim { + return &multiclusterv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: multiclusterv1alpha2.WellKnownClusterClaimID, + Namespace: namespace, + }, + Value: name, + } +} + +func newClusterSetClaim(name string, namespace string) *multiclusterv1alpha2.ClusterClaim { + return &multiclusterv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: multiclusterv1alpha2.WellKnownClusterClaimClusterSet, + Namespace: namespace, + }, + Value: name, + } +} + +func newClusterSet(name string, namespace string, leaderServer string, secret string, memberClusterID string, leaderClusterID string, leaderNamespace string) *multiclusterv1alpha1.ClusterSet { + return &multiclusterv1alpha1.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: multiclusterv1alpha1.ClusterSetSpec{ + Members: []multiclusterv1alpha1.MemberCluster{ + { + ClusterID: memberClusterID, + }, + }, + Leaders: []multiclusterv1alpha1.MemberCluster{ + { + ClusterID: leaderClusterID, + Secret: secret, + Server: leaderServer, + }, + }, + Namespace: leaderNamespace, + }, + } +}