From 1f9d4e5b4404f7b3148a0b4a9780c52547f37c69 Mon Sep 17 00:00:00 2001 From: renxiangyu Date: Wed, 21 Feb 2024 10:13:45 +0800 Subject: [PATCH] feature: precheck ns and api-resources in sync leaf Signed-off-by: renxiangyu --- .../cluster-manager/app/manager.go | 18 +- .../cluster-manager/app/options/options.go | 8 + examples/promote_policy_demo.yaml | 17 +- .../controllers/promote/precheck/precheck.go | 321 ++++++++++++++++++ .../promote/promote_policy_controller.go | 47 ++- .../controllers/promote/requests/request.go | 2 + .../controllers/promote/utils/utils.go | 29 ++ .../manifest/manifest_deployments.go | 1 + pkg/utils/constants.go | 6 + 9 files changed, 421 insertions(+), 28 deletions(-) create mode 100644 pkg/clustertree/cluster-manager/controllers/promote/precheck/precheck.go create mode 100644 pkg/clustertree/cluster-manager/controllers/promote/utils/utils.go diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index f844967ea..c786ea7cb 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -241,13 +241,6 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("error starting root pvc controller %v", err) } - //promotePolicyController := controllers.PromotePolicyController{ - // Client: mgr.GetClient(), - //} - //if err := promotePolicyController.SetupWithManager(mgr); err != nil { - // return fmt.Errorf("error starting promotePolicyController %s: %v", controllers.PromotePolicyControllerName, err) - //} - rootPVController := pv.RootPVController{ RootClient: mgr.GetClient(), GlobalLeafManager: globalleafManager, @@ -277,11 +270,12 @@ func run(ctx context.Context, opts *options.Options) error { } //promotePolicyController := promote.PromotePolicyController{ - // RootClient: mgr.GetClient(), - // RootClientSet: rootClient, - // RootDynamicClient: dynamicClient, - // RootDiscoveryClient: discoveryClient, - // GlobalLeafManager: globalleafManager, + // RootClient: mgr.GetClient(), + // RootClientSet: rootClient, + // RootDynamicClient: dynamicClient, + // RootDiscoveryClient: discoveryClient, + // GlobalLeafManager: globalleafManager, + // PromotePolicyOptions: opts.PromotePolicyOptions, //} //if err = promotePolicyController.SetupWithManager(mgr); err != nil { // return fmt.Errorf("error starting %s: %v", promote.PromotePolicyControllerName, err) diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index a2b031f71..27a1ceb9a 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -50,6 +50,8 @@ type Options struct { BackoffOpts flags.BackoffOptions SyncPeriod time.Duration + + PromotePolicyOptions PromotePolicyOptions } type KubernetesOptions struct { @@ -59,6 +61,11 @@ type KubernetesOptions struct { Burst int `json:"burst,omitempty" yaml:"burst,omitempty"` } +type PromotePolicyOptions struct { + // ExcludeNamespaces are the ns name excluded by default when you need to sync leaf cluster resources + ForbidNamespaces []string +} + func NewOptions() (*Options, error) { var leaderElection componentbaseconfigv1alpha1.LeaderElectionConfiguration componentbaseconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&leaderElection) @@ -93,6 +100,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.StringSliceVar(&o.AutoCreateMCSPrefix, "auto-mcs-prefix", []string{}, "The prefix of namespace for service to auto create mcs resources") flags.StringSliceVar(&o.ReservedNamespaces, "reserved-namespaces", []string{"kube-system"}, "The namespaces protected by Kosmos that the controller-manager will skip.") flags.DurationVar(&o.SyncPeriod, "sync-period", 0, "the sync period for informer to resync.") + flags.StringSliceVar(&o.PromotePolicyOptions.ForbidNamespaces, "forbid-promote-namespace", []string{}, "This is forbidden to promote namespace") o.RateLimiterOpts.AddFlags(flags) o.BackoffOpts.AddFlags(flags) options.BindLeaderElectionFlags(&o.LeaderElection, flags) diff --git a/examples/promote_policy_demo.yaml b/examples/promote_policy_demo.yaml index 2440fced4..e87d6d4e5 100644 --- a/examples/promote_policy_demo.yaml +++ b/examples/promote_policy_demo.yaml @@ -4,15 +4,18 @@ metadata: name: promote-pilicy-sample spec: includedNamespaces: - - namespace1 - - namespace2 + - test + - kube-system excludedNamespaces: - - namespace3 + - kube-system includedNamespaceScopedResources: - - deployment - - service + - pods + - daemonsets.apps +# - serviceexports.multicluster.x-k8s.io + - nodeconfigs.kosmos.io excludedNamespaceScopedResources: - - pod +# - pods +# - nodeconfigs.kosmos.io clusterName: - cluster + cluster7 diff --git a/pkg/clustertree/cluster-manager/controllers/promote/precheck/precheck.go b/pkg/clustertree/cluster-manager/controllers/promote/precheck/precheck.go new file mode 100644 index 000000000..9ee9e891c --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/precheck/precheck.go @@ -0,0 +1,321 @@ +package precheck + +import ( + "context" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils" + constants "github.com/kosmos.io/kosmos/pkg/utils" +) + +type kubernetesPrecheck struct { + request *requests.PromoteRequest +} + +func NewKubernetesPrecheck(request *requests.PromoteRequest) (*kubernetesPrecheck, error) { + if request != nil { + return &kubernetesPrecheck{request: request}, nil + } else { + return nil, fmt.Errorf("request is nil") + } +} +func (kb *kubernetesPrecheck) Precheck() error { + // check namespace + err := checkNamespaces(kb.request, kb.request.ForbidNamespaces) + if err != nil { + return err + } + + // check ApiResources + err = checkApiResources(kb.request) + if err != nil { + return err + } + + return nil +} + +func checkApiResources(request *requests.PromoteRequest) error { + // judge k8s version + leafVersion, err := request.LeafDiscoveryClient.ServerVersion() + if err != nil { + return err + } + rootVersion, err := request.RootDiscoveryClient.ServerVersion() + if err != nil { + return err + } + if !strings.EqualFold(leafVersion.GitVersion, rootVersion.GitVersion) { + return fmt.Errorf("kubernetes version is not same in leaf cluster and rootcluster") + } + + includedResources := request.Spec.IncludedNamespaceScopedResources + excludedResources := request.Spec.ExcludedNamespaceScopedResources + + for _, excludedResource := range excludedResources { + if excludedResource == "*" { + return fmt.Errorf("precheck failed, excluded resources has \"*\" ") + } + } + + for _, includedResource := range includedResources { + // add all resources to includedResources + if includedResource == "*" { + // gets all preferred api resources for the leaf cluster + leafApiResourcesMap, err := getApiResourcesMap(request.LeafClientSet) + if err != nil { + return fmt.Errorf("precheck failed, getApiResourcesMap in leaf cluster fauled, err: %s", err) + } + var tmp []string + for name := range leafApiResourcesMap { + tmp = append(tmp, name) + } + includedResources = tmp + break + } + } + + // needsStringMap is excludedResources converted into map + excludeMap, err := utils.ToMapSetE(excludedResources) + if err != nil { + return fmt.Errorf("includedResources convert to map failed, err: %s", err) + } + excludeStringMap := make(map[string]string) + for _, value := range excludeMap.(map[interface{}]interface{}) { + valueString := value.(string) + excludeStringMap[valueString] = valueString + } + + // get all native api resources + nativeApiResourcesMap, err := getNativeApiResourcesMap(request.LeafClientSet, request.LeafDynamicClient) + if err != nil { + return fmt.Errorf("get native api resource failed, err: %s", err) + } + + // get all crds in leaf + leafCRDList, err := listCRD(request.LeafDynamicClient) + if err != nil { + return fmt.Errorf("leaf client get crd failed, err: %s", err) + } + leafCRDMap, err := utils.ToMapSetE(leafCRDList) + if err != nil { + return fmt.Errorf("includedResources convert to map failed, err: %s", err) + } + leafCRDStringMap := make(map[string]*apiextensionsv1.CustomResourceDefinition) + for _, value := range leafCRDMap.(map[interface{}]interface{}) { + crd := value.(*apiextensionsv1.CustomResourceDefinition) + leafCRDStringMap[crd.Name] = crd + } + + // get all crds in root + rootCRDList, err := listCRD(request.RootDynamicClient) + if err != nil { + return fmt.Errorf("root client get crd failed, err: %s", err) + } + rootCRDMap, err := utils.ToMapSetE(rootCRDList) + if err != nil { + return fmt.Errorf("includedResources convert to map failed, err: %s", err) + } + rootCRDStringMap := make(map[string]*apiextensionsv1.CustomResourceDefinition) + for _, value := range rootCRDMap.(map[interface{}]interface{}) { + crd := value.(*apiextensionsv1.CustomResourceDefinition) + rootCRDStringMap[crd.Name] = crd + } + + // judge whether the preferred version of resources for root cluster and leaf cluster is the same + for _, indcludeResource := range includedResources { + // not judge excluded resource + if _, ok := excludeStringMap[indcludeResource]; ok { + continue + } + // not judge native api resource + if _, ok := nativeApiResourcesMap[indcludeResource]; ok { + continue + } + + leafCRD, ok := leafCRDStringMap[indcludeResource] + if ok { + return fmt.Errorf("crd %s do not exist in the leaf cluster", leafCRD.Name) + } + rootCRD, ok := rootCRDStringMap[indcludeResource] + if ok { + return fmt.Errorf("crd %s do not exist in the root cluster", rootCRD.Name) + } + if !strings.EqualFold(leafCRD.Spec.Versions[0].Name, rootCRD.Spec.Versions[0].Name) { + return fmt.Errorf("crd %s version is different in that it is %s in leaf cluster and %s in root cluster", + rootCRD.Name, leafCRD.Spec.Versions[0].Name, rootCRD.Spec.Versions[0].Name) + } + } + + return nil +} + +func getNativeApiResourcesMap(clientSet kubernetes.Interface, dynamicClient dynamic.Interface) (map[string]string, error) { + nativeApiResourcesMap, err := getApiResourcesMap(clientSet) + if err != nil { + return nil, fmt.Errorf("precheck failed, getApiResourcesMap in leaf cluster fauled, err: %s", err) + } + + leafCRDList, err := listCRD(dynamicClient) + if err != nil { + return nil, fmt.Errorf("leaf client get crd failed, err: %s", err) + } + for _, crd := range leafCRDList { + delete(nativeApiResourcesMap, crd.Name) + } + return nativeApiResourcesMap, nil +} + +// getApiResourcesMap gets all preferred api resources for cluster +func getApiResourcesMap(clientSet kubernetes.Interface) (map[string]string, error) { + apiResources, err := clientSet.Discovery().ServerPreferredResources() + if err != nil { + return nil, fmt.Errorf("get api-reources in leaf failed, err: %s", err) + } + apiResourcesMap, err := utils.ToMapSetE(apiResources) + if err != nil { + return nil, fmt.Errorf("apiResources convert to map failed, err: %s", err) + } + apiResourcesStringMap := make(map[string]string) + for _, value := range apiResourcesMap.(map[interface{}]interface{}) { + valueString := value.(*metav1.APIResourceList) + groupVersion := valueString.GroupVersion + var group, version string + if i := strings.Index(valueString.GroupVersion, "/"); i >= 0 { + group = groupVersion[:i] + version = groupVersion[i+1:] + } else { + group = "" + version = groupVersion + } + for _, resource := range valueString.APIResources { + nameGroup := resource.Name + if group != "" { + nameGroup = fmt.Sprintf("%s.%s", nameGroup, group) + } + apiResourcesStringMap[nameGroup] = version + } + } + return apiResourcesStringMap, nil +} + +func checkNamespaces(request *requests.PromoteRequest, forbidNamespace []string) error { + includes := request.NamespaceIncludesExcludes.GetIncludes() + excludes := request.NamespaceIncludesExcludes.GetExcludes() + leafNamespaceList, err := request.LeafClientSet.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err + } + rootNamespaceList, err := request.RootClientSet.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return err + } + + // it is meaningless to include * in exclude + for _, exclude := range excludes { + if exclude == "*" { + return fmt.Errorf("precheck failed, excludes has \"*\" ") + } + } + + for _, include := range includes { + // add all resources to includes + if include == "*" { + var tmp []string + for _, item := range leafNamespaceList.Items { + tmp = append(tmp, item.Name) + } + includes = tmp + break + } + } + + // needsStringMap removes namespace from exclude + needsMap, err := utils.ToMapSetE(includes) + if err != nil { + return fmt.Errorf("includes convert to map failed, err: %s", err) + } + needsStringMap := make(map[string]string) + for _, value := range needsMap.(map[interface{}]interface{}) { + valueString := value.(string) + needsStringMap[valueString] = valueString + } + + for _, exclude := range excludes { + value, found := needsStringMap[exclude] + if !found { + return fmt.Errorf("excludes has wrong namespace: %s", value) + } + delete(needsStringMap, exclude) + } + + for _, forbid := range forbidNamespace { + if _, ok := needsStringMap[forbid]; ok { + return fmt.Errorf("promote this %s namesapcethe is forbidden", forbid) + } + } + + // judge whether the leaf cluster contains the namespace + leafNamespaceMap, err := utils.ToMapSetE(leafNamespaceList.Items) + if err != nil { + return fmt.Errorf("leafNamespaceList convert to map failed, err: %s", err) + } + leafNamespaceStringMap := make(map[string]corev1.Namespace) + for _, value := range leafNamespaceMap.(map[interface{}]interface{}) { + namespace := value.(corev1.Namespace) + leafNamespaceStringMap[namespace.Name] = namespace + } + + for _, need := range needsStringMap { + if _, ok := leafNamespaceStringMap[need]; !ok { + return fmt.Errorf("precheck failed, leaf cluster don't have this namespace: %s", need) + } + } + + // judge whether the master cluster already contains the namespace in include + rootNamespaceMap, err := utils.ToMapSetE(rootNamespaceList.Items) + if err != nil { + return fmt.Errorf("rootNamespaceList convert to map failed, err: %s", err) + } + rootNamespaceStringMap := make(map[string]corev1.Namespace) + for _, value := range rootNamespaceMap.(map[interface{}]interface{}) { + namespace := value.(corev1.Namespace) + rootNamespaceStringMap[namespace.Name] = namespace + } + for _, need := range needsStringMap { + if _, ok := rootNamespaceStringMap[need]; ok { + return fmt.Errorf("precheck failed, the same namespace exists for the master cluster and leaf cluster: %s", need) + } + } + return nil +} + +// listCRD retrieves the list of crds from Kubernetes. +func listCRD(dynamicClient dynamic.Interface) ([]*apiextensionsv1.CustomResourceDefinition, error) { + objs, err := dynamicClient.Resource(constants.GVR_CRD).List(context.TODO(), metav1.ListOptions{}) + + if err != nil { + return nil, err + } + + retObj := make([]*apiextensionsv1.CustomResourceDefinition, 0) + + for _, obj := range objs.Items { + tmpObj := &apiextensionsv1.CustomResourceDefinition{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &tmpObj); err != nil { + return nil, err + } + retObj = append(retObj, tmpObj) + } + + return retObj, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go b/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go index cfaae4a1e..37e6d4545 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/promote_policy_controller.go @@ -16,10 +16,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/backup" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/constants" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/detach" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/precheck" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/requests" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/restore" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/promote/utils/collections" @@ -32,11 +34,12 @@ const ( ) type PromotePolicyController struct { - RootClient client.Client - RootClientSet kubernetes.Interface - RootDynamicClient *dynamic.DynamicClient - RootDiscoveryClient *discovery.DiscoveryClient - GlobalLeafManager leafUtils.LeafResourceManager + RootClient client.Client + RootClientSet kubernetes.Interface + RootDynamicClient *dynamic.DynamicClient + RootDiscoveryClient *discovery.DiscoveryClient + GlobalLeafManager leafUtils.LeafResourceManager + PromotePolicyOptions options.PromotePolicyOptions } func (p *PromotePolicyController) SetupWithManager(mgr ctrl.Manager) error { @@ -76,6 +79,16 @@ func (p *PromotePolicyController) Reconcile(ctx context.Context, request reconci return reconcile.Result{}, fmt.Errorf("error prepareSyncRequest: %v", err) } + err = runPrecheck(promoteRequest) + if err != nil { + promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedPrecheck + promoteRequest.Status.FailureReason = err.Error() + if err := p.RootClient.Patch(context.TODO(), promoteRequest.PromotePolicy, client.MergeFrom(original)); err != nil { + klog.Errorf("error updating syncleaf %s final status", original.Name) + } + return reconcile.Result{}, err + } + backupFile, err := runBackup(promoteRequest) if err != nil { promoteRequest.Status.Phase = v1alpha1.PromotePolicyPhaseFailedBackup @@ -117,14 +130,14 @@ func (p *PromotePolicyController) Reconcile(ctx context.Context, request reconci return reconcile.Result{}, nil } -func (s *PromotePolicyController) preparePromoteRequest(promote *v1alpha1.PromotePolicy, lf *leafUtils.LeafResource) (*requests.PromoteRequest, error) { +func (p *PromotePolicyController) preparePromoteRequest(promote *v1alpha1.PromotePolicy, lf *leafUtils.LeafResource) (*requests.PromoteRequest, error) { // todo validate params request := &requests.PromoteRequest{ PromotePolicy: promote.DeepCopy(), - RootClientSet: s.RootClientSet, - RootDynamicClient: s.RootDynamicClient, - RootDiscoveryClient: s.RootDiscoveryClient, + RootClientSet: p.RootClientSet, + RootDynamicClient: p.RootDynamicClient, + RootDiscoveryClient: p.RootDiscoveryClient, LeafClientSet: lf.Clientset, LeafDynamicClient: lf.DynamicClient, LeafDiscoveryClient: lf.DiscoveryClient, @@ -132,10 +145,26 @@ func (s *PromotePolicyController) preparePromoteRequest(promote *v1alpha1.Promot BackedUpItems: make(map[requests.ItemKey]struct{}), DetachedItems: make(map[requests.ItemKey]struct{}), RestoredItems: make(map[requests.ItemKey]struct{}), + ForbidNamespaces: p.PromotePolicyOptions.ForbidNamespaces, } return request, nil } +func runPrecheck(promoteRequest *requests.PromoteRequest) error { + klog.Info("start precheck...") + prechecker, err := precheck.NewKubernetesPrecheck(promoteRequest) + if err != nil { + return errors.Wrap(err, "error new precheck instance") + } + + err = prechecker.Precheck() + if err != nil { + return errors.Wrap(err, "error precheck") + } + + return nil +} + func runBackup(promoteRequest *requests.PromoteRequest) (file string, err error) { klog.Info("Setting up backup temp file") filePath := constants.BackupDir + time.Now().Format("20060102-150405") diff --git a/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go b/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go index 3180b2858..04c512c50 100644 --- a/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go +++ b/pkg/clustertree/cluster-manager/controllers/promote/requests/request.go @@ -58,6 +58,8 @@ type PromoteRequest struct { BackedUpItems map[ItemKey]struct{} DetachedItems map[ItemKey]struct{} RestoredItems map[ItemKey]struct{} + + ForbidNamespaces []string } // ResourceSelector is a collection of included/excluded namespaces, diff --git a/pkg/clustertree/cluster-manager/controllers/promote/utils/utils.go b/pkg/clustertree/cluster-manager/controllers/promote/utils/utils.go new file mode 100644 index 000000000..f2cce784f --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/promote/utils/utils.go @@ -0,0 +1,29 @@ +package utils + +import ( + "fmt" + "reflect" +) + +// ToMapSetE converts a slice or array to map[interface{}]interface{} with error +// interface{} is slice's item +func ToMapSetE(i interface{}) (interface{}, error) { + // judge the validation of the input + if i == nil { + return nil, fmt.Errorf("unable to convert %#v of type %T to map[interface{}]interface{}", i, i) + } + kind := reflect.TypeOf(i).Kind() + if kind != reflect.Slice && kind != reflect.Array { + return nil, fmt.Errorf("the input %#v of type %T isn't a slice or array", i, i) + } + + // execute the convert + v := reflect.ValueOf(i) + m := make(map[interface{}]interface{}) + for j := 0; j < v.Len(); j++ { + value := v.Index(j).Interface() + key := fmt.Sprintf("%v", v.Index(j).Interface()) + m[key] = value + } + return m, nil +} diff --git a/pkg/kosmosctl/manifest/manifest_deployments.go b/pkg/kosmosctl/manifest/manifest_deployments.go index 864e6e1cf..a1d5f0efb 100644 --- a/pkg/kosmosctl/manifest/manifest_deployments.go +++ b/pkg/kosmosctl/manifest/manifest_deployments.go @@ -136,6 +136,7 @@ spec: command: - clustertree-cluster-manager - --multi-cluster-service=true + - --forbid-promote-namespace=kube-node-lease,kube-public,kube-system - --v=4 volumes: - name: credentials diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 145496536..2e6fe0217 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -190,3 +190,9 @@ var GVR_SERVICE = schema.GroupVersionResource{ Version: "v1", Resource: "services", } + +var GVR_CRD = schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", +}