From cf186c1e2daa87baa350bfbca26dfaab073dad9a Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Sat, 10 Aug 2024 22:15:10 +0800 Subject: [PATCH] add direct sync service feature Signed-off-by: duanmengkk --- .../cluster-manager/app/manager.go | 26 ++ .../cluster-manager/app/options/options.go | 12 +- .../cluster-manager/cluster_controller.go | 14 +- .../simple_sync_endpointslice_controller.go | 322 ++++++++++++++++ .../svc/simple_sync_service_controller.go | 354 ++++++++++++++++++ .../utils/leaf_resource_manager.go | 1 + 6 files changed, 719 insertions(+), 10 deletions(-) create mode 100644 pkg/clustertree/cluster-manager/controllers/svc/simple_sync_endpointslice_controller.go create mode 100644 pkg/clustertree/cluster-manager/controllers/svc/simple_sync_service_controller.go diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index a3e89bb3f..19435289a 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -24,6 +24,7 @@ import ( podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pv" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pvc" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/svc" nodeserver "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/node-server" leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/scheme" @@ -289,6 +290,31 @@ func run(ctx context.Context, opts *options.Options) error { } } + // init direct sync service and endpointslice controller + if opts.DirectClusterService { + simpleSyncServiceController := &svc.SimpleSyncServiceController{ + RootClient: mgr.GetClient(), + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, + AutoCreateMCSPrefix: opts.AutoCreateMCSPrefix, + ReservedNamespaces: opts.ReservedNamespaces, + } + if err := simpleSyncServiceController.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting %s: %v", svc.SimpleSyncServiceControllerName, err) + } + + simpleSyncEpsController := &svc.SimpleSyncEPSController{ + RootClient: mgr.GetClient(), + GlobalLeafManager: globalLeafResourceManager, + GlobalLeafClientManager: globalLeafClientManager, + AutoCreateMCSPrefix: opts.AutoCreateMCSPrefix, + ReservedNamespaces: opts.ReservedNamespaces, + BackoffOptions: opts.BackoffOpts, + } + if err := simpleSyncEpsController.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting %s: %v", svc.SimpleSyncEPSControllerName, err) + } + } go func() { if err = mgr.Start(ctx); err != nil { klog.Errorf("failed to start controller manager: %v", err) diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index 5bc7e7d98..653463789 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -25,11 +25,12 @@ const ( ) type Options struct { - LeaderElection componentbaseconfig.LeaderElectionConfiguration - KubernetesOptions KubernetesOptions - ListenPort int32 - DaemonSetController bool - MultiClusterService bool + LeaderElection componentbaseconfig.LeaderElectionConfiguration + KubernetesOptions KubernetesOptions + ListenPort int32 + DaemonSetController bool + MultiClusterService bool + DirectClusterService bool // If MultiClusterService is disabled, the clustertree will rewrite the dnsPolicy configuration for pods deployed in // the leaf clusters, directing them to the root cluster's CoreDNS, thus facilitating access to services across all @@ -88,6 +89,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.Int32Var(&o.ListenPort, "listen-port", 10250, "Listen port for requests from the kube-apiserver.") flags.BoolVar(&o.DaemonSetController, "daemonset-controller", false, "Turn on or off daemonset controller.") flags.BoolVar(&o.MultiClusterService, "multi-cluster-service", false, "Turn on or off mcs support.") + flags.BoolVar(&o.DirectClusterService, "direct-cluster-service", false, "Turn on or off direct cluster service.") flags.StringVar(&o.RootCoreDNSServiceNamespace, "root-coredns-service-namespace", CoreDNSServiceNamespace, "The namespace of the CoreDNS service in the root cluster, used to locate the CoreDNS service when MultiClusterService is disabled.") flags.StringVar(&o.RootCoreDNSServiceName, "root-coredns-service-name", CoreDNSServiceName, "The name of the CoreDNS service in the root cluster, used to locate the CoreDNS service when MultiClusterService is disabled.") flags.BoolVar(&o.OnewayStorageControllers, "oneway-storage-controllers", false, "Turn on or off oneway storage controllers.") diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index b430516a7..eff3d56fb 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -234,7 +234,9 @@ func (c *ClusterController) clearClusterControllers(cluster *kosmosv1alpha1.Clus delete(c.ManagerCancelFuncs, cluster.Name) delete(c.ControllerManagers, cluster.Name) + actualClusterName := leafUtils.GetActualClusterName(cluster) c.GlobalLeafResourceManager.RemoveLeafResource(cluster.Name) + c.GlobalLeafResourceManager.RemoveLeafResource(actualClusterName) } func (c *ClusterController) setupControllers( @@ -252,6 +254,7 @@ func (c *ClusterController) setupControllers( Namespace: "", IgnoreLabels: strings.Split("", ","), EnableServiceAccount: true, + IPFamilyType: cluster.Spec.ClusterLinkOptions.IPFamily, }, nodes) c.GlobalLeafClientManager.AddLeafClientResource(&leafUtils.LeafClientResource{ @@ -283,11 +286,12 @@ func (c *ClusterController) setupControllers( if c.Options.MultiClusterService { serviceImportController := &mcs.ServiceImportController{ - LeafClient: mgr.GetClient(), - LeafKosmosClient: leafKosmosClient, - EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName), - Logger: mgr.GetLogger(), - LeafNodeName: cluster.Name, + LeafClient: mgr.GetClient(), + LeafKosmosClient: leafKosmosClient, + EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName), + Logger: mgr.GetLogger(), + LeafNodeName: cluster.Name, + // todo Null pointer exception ? IPFamilyType: cluster.Spec.ClusterLinkOptions.IPFamily, RootResourceManager: c.RootResourceManager, ReservedNamespaces: c.Options.ReservedNamespaces, diff --git a/pkg/clustertree/cluster-manager/controllers/svc/simple_sync_endpointslice_controller.go b/pkg/clustertree/cluster-manager/controllers/svc/simple_sync_endpointslice_controller.go new file mode 100644 index 000000000..f183b382c --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/svc/simple_sync_endpointslice_controller.go @@ -0,0 +1,322 @@ +package svc + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + "k8s.io/utils/strings/slices" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + clustertreeutils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" + "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/flags" + "github.com/kosmos.io/kosmos/pkg/utils/helper" +) + +const SimpleSyncEPSControllerName = "simple-sync-endpointslice-controller" + +// SimpleSyncEPSController watches services in root cluster and sync endpointSlice to leaf cluster directly +type SimpleSyncEPSController struct { + RootClient client.Client + GlobalLeafManager clustertreeutils.LeafResourceManager + GlobalLeafClientManager clustertreeutils.LeafClientResourceManager + // AutoCreateMCSPrefix are the prefix of the namespace for endpointSlice to auto create in leaf cluster + AutoCreateMCSPrefix []string + // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources + ReservedNamespaces []string + BackoffOptions flags.BackoffOptions +} + +func (c *SimpleSyncEPSController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + klog.V(4).Infof("============ %s starts to reconcile %s ============", SimpleSyncEPSControllerName, request.NamespacedName.String()) + defer func() { + klog.V(4).Infof("============ %s has been reconciled =============", request.NamespacedName.String()) + }() + + var shouldDelete bool + eps := &discoveryv1.EndpointSlice{} + if err := c.RootClient.Get(ctx, request.NamespacedName, eps); err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("Cloud not get endpointSlice in root cluster,Error: %v", err) + return controllerruntime.Result{Requeue: true}, err + } + shouldDelete = true + } + + // The eps is not found in root cluster, we should delete it in leaf cluster. + if shouldDelete || !eps.DeletionTimestamp.IsZero() { + if err := c.cleanUpEpsInLeafCluster(request.Namespace, request.Name); err != nil { + klog.Errorf("Cleanup MCS resources failed, err: %v", err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err + } + return controllerruntime.Result{}, nil + } + + serviceName := helper.GetLabelOrAnnotationValue(eps.GetLabels(), utils.ServiceKey) + + service := &corev1.Service{} + if err := c.RootClient.Get(ctx, types.NamespacedName{Namespace: request.Namespace, Name: serviceName}, service); err != nil { + if apierrors.IsNotFound(err) { + klog.Errorf("Service %s/%s not found,ignore it, err: %v", request.Namespace, serviceName, err) + return controllerruntime.Result{}, nil + } else { + klog.Errorf("Get service %s/%s failed, err: %v", request.Namespace, serviceName, err) + return controllerruntime.Result{Requeue: true}, err + } + } + if !hasAutoMCSAnnotation(service) && !shouldEnqueueEps(eps, c.AutoCreateMCSPrefix, c.ReservedNamespaces) { + klog.V(4).Infof("Service %s/%s does not have auto mcs annotation and should not be enqueued, ignore it", request.Namespace, serviceName) + return controllerruntime.Result{}, nil + } + + err := c.syncEpsInLeafCluster(eps, serviceName) + if err != nil { + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err + } + return controllerruntime.Result{}, nil +} + +func (c *SimpleSyncEPSController) shouldEnqueue(endpointSlice *discoveryv1.EndpointSlice) bool { + if slices.Contains(c.ReservedNamespaces, endpointSlice.Namespace) { + return false + } + + if len(c.AutoCreateMCSPrefix) > 0 { + for _, prefix := range c.AutoCreateMCSPrefix { + if strings.HasPrefix(endpointSlice.GetNamespace(), prefix) { + return true + } + } + } + return false +} + +func shouldEnqueueEps(endpointSlice *discoveryv1.EndpointSlice, autoPrefix, reservedNamespaces []string) bool { + if slices.Contains(reservedNamespaces, endpointSlice.Namespace) { + return false + } + + if len(autoPrefix) > 0 { + for _, prefix := range autoPrefix { + if strings.HasPrefix(endpointSlice.GetNamespace(), prefix) { + return true + } + } + } + return false +} + +func (c *SimpleSyncEPSController) SetupWithManager(mgr manager.Manager) error { + epsPredicate := builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + endpointSlice, ok := event.Object.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + return c.shouldEnqueue(endpointSlice) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + endpointSlice, ok := deleteEvent.Object.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + return c.shouldEnqueue(endpointSlice) + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + newEps, ok := updateEvent.ObjectNew.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + _, ok = updateEvent.ObjectOld.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + return c.shouldEnqueue(newEps) + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + }, + ) + + return controllerruntime.NewControllerManagedBy(mgr). + For(&discoveryv1.EndpointSlice{}, epsPredicate). + Complete(c) +} + +func (c *SimpleSyncEPSController) createNamespace(client client.Client, namespace string) error { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + } + err := client.Create(context.TODO(), ns) + if err != nil { + return err + } + return nil +} + +// nolint:dupl +func (c *SimpleSyncEPSController) cleanUpEpsInLeafCluster(namespace string, name string) error { + clusters := c.GlobalLeafManager.ListClusters() + var errs []string + for _, cluster := range clusters { + leafClient, err := c.GlobalLeafManager.GetLeafResource(cluster) + if err != nil { + klog.Errorf("Failed to get leaf client for cluster %s: %v", cluster, err) + errs = append(errs, fmt.Sprintf("get leaf client for cluster %s: %v", cluster, err)) + continue + } + + lcr, err := c.leafClientResource(leafClient) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", leafClient.Cluster.Name) + errs = append(errs, fmt.Sprintf("get leaf client resource %v", leafClient.Cluster.Name)) + continue + } + + err = lcr.Clientset.DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + klog.Errorf("Failed to delete endpointSlice %s in cluster %s: %v", name, cluster, err) + errs = append(errs, fmt.Sprintf("delete endpointSlice %s in cluster %s: %v", name, cluster, err)) + } + } + + if len(errs) > 0 { + return errors.New("errors encountered: " + strings.Join(errs, "; ")) + } + return nil +} + +func (c *SimpleSyncEPSController) syncEpsInLeafCluster(eps *discoveryv1.EndpointSlice, serviceName string) error { + endpointSlice := eps.DeepCopy() + + clusters := c.GlobalLeafManager.ListClusters() + errsChan := make(chan string, len(clusters)) + var wg sync.WaitGroup + for _, cluster := range clusters { + wg.Add(1) + go func(cluster, serviceName string) { + defer wg.Done() + leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster) + if err != nil { + errsChan <- fmt.Sprintf("get leaf client for cluster %s: %v", cluster, err) + return + } + + lcr, err := c.leafClientResource(leafManager) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", leafManager.Cluster.Name) + errsChan <- fmt.Sprintf("get leaf client resource %v", leafManager.Cluster.Name) + return + } + + if endpointSlice.AddressType == discoveryv1.AddressTypeIPv4 && leafManager.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV6 || + endpointSlice.AddressType == discoveryv1.AddressTypeIPv6 && leafManager.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV4 { + klog.Warningf("The endpointSlice's AddressType is not match leaf cluster %s IPFamilyType,so ignore it", cluster) + klog.Errorf("The endpointSlice's AddressType is not match leaf cluster %s IPFamilyType,so ignore it", cluster) + return + } + + if err = c.createNamespace(lcr.Client, endpointSlice.Namespace); err != nil && !apierrors.IsAlreadyExists(err) { + errsChan <- fmt.Sprintf("Create namespace %s in leaf cluster %s failed: %v", endpointSlice.Namespace, cluster, err) + return + } + + newSlice := retainEndpointSlice(endpointSlice, serviceName) + + if err = lcr.Client.Create(context.TODO(), newSlice); err != nil { + if apierrors.IsAlreadyExists(err) { + if err = c.updateEndpointSlice(newSlice, lcr); err != nil { + errsChan <- fmt.Sprintf("Update endpointSlice %s in leaf cluster %s failed: %v", newSlice.Name, cluster, err) + return + } + } else { + errsChan <- fmt.Sprintf("Create endpointSlice %s in leaf cluster %s failed: %v", newSlice.Name, cluster, err) + return + } + } + }(cluster, serviceName) + } + wg.Wait() + close(errsChan) + + var errs []string + for err := range errsChan { + errs = append(errs, err) + } + if len(errs) > 0 { + return errors.New("errors encountered: " + strings.Join(errs, "; ")) + } + return nil +} + +func (c *SimpleSyncEPSController) updateEndpointSlice(slice *discoveryv1.EndpointSlice, leafManager *clustertreeutils.LeafClientResource) error { + eps := slice.DeepCopy() + return retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error { + updateErr := leafManager.Client.Update(context.TODO(), eps) + if apierrors.IsNotFound(updateErr) { + return nil + } + if updateErr == nil { + return nil + } + klog.Errorf("Failed to update endpointSlice %s/%s: %v", eps.Namespace, eps.Name, updateErr) + newEps := &discoveryv1.EndpointSlice{} + getErr := leafManager.Client.Get(context.TODO(), client.ObjectKey{Namespace: eps.Namespace, Name: eps.Name}, newEps) + if getErr == nil { + //Make a copy, so we don't mutate the shared cache + eps = newEps.DeepCopy() + } else { + if apierrors.IsNotFound(getErr) { + return nil + } else { + klog.Errorf("Failed to get updated endpointSlice %s/%s: %v", eps.Namespace, eps.Name, getErr) + } + } + + return updateErr + }) +} + +func (c *SimpleSyncEPSController) leafClientResource(lr *clustertreeutils.LeafResource) (*clustertreeutils.LeafClientResource, error) { + actualClusterName := clustertreeutils.GetActualClusterName(lr.Cluster) + lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} + +func retainEndpointSlice(original *discoveryv1.EndpointSlice, serviceName string) *discoveryv1.EndpointSlice { + endpointSlice := original.DeepCopy() + endpointSlice.ObjectMeta = metav1.ObjectMeta{ + Namespace: original.Namespace, + Name: original.Name, + } + helper.AddEndpointSliceLabel(endpointSlice, utils.ServiceKey, serviceName) + return endpointSlice +} diff --git a/pkg/clustertree/cluster-manager/controllers/svc/simple_sync_service_controller.go b/pkg/clustertree/cluster-manager/controllers/svc/simple_sync_service_controller.go new file mode 100644 index 000000000..9e14017f4 --- /dev/null +++ b/pkg/clustertree/cluster-manager/controllers/svc/simple_sync_service_controller.go @@ -0,0 +1,354 @@ +package svc + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "k8s.io/utils/strings/slices" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + clustertreeutils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" + "github.com/kosmos.io/kosmos/pkg/utils" +) + +const SimpleSyncServiceControllerName = "simple-sync-service-controller" + +// SimpleSyncServiceController watches services in root cluster and sync service to leaf cluster directly +type SimpleSyncServiceController struct { + RootClient client.Client + GlobalLeafManager clustertreeutils.LeafResourceManager + GlobalLeafClientManager clustertreeutils.LeafClientResourceManager + + // AutoCreateMCSPrefix are the prefix of the namespace for service to auto create in leaf cluster + AutoCreateMCSPrefix []string + // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources + ReservedNamespaces []string +} + +func (c *SimpleSyncServiceController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + klog.V(4).Infof("============ %s starts to reconcile %s ============", SimpleSyncServiceControllerName, request.NamespacedName.String()) + defer func() { + klog.V(4).Infof("============ %s has been reconciled =============", request.NamespacedName.String()) + }() + + var shouldDelete bool + service := &corev1.Service{} + if err := c.RootClient.Get(ctx, request.NamespacedName, service); err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("Cloud not get service in root cluster,Error: %v", err) + return controllerruntime.Result{Requeue: true}, err + } + shouldDelete = true + } + + // The service is being deleted, in which case we should clear service in all leaf cluster + if shouldDelete || !service.DeletionTimestamp.IsZero() { + if err := c.cleanUpServiceInLeafCluster(request.Namespace, request.Name); err != nil { + klog.Errorf("Cleanup service failed, err: %v", err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err + } + return controllerruntime.Result{}, nil + } + + err := c.syncServiceInLeafCluster(service) + if err != nil { + klog.Errorf("Sync service failed, err: %v", err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err + } + return controllerruntime.Result{}, nil +} + +func hasAutoMCSAnnotation(service *corev1.Service) bool { + annotations := service.GetAnnotations() + if annotations == nil { + return false + } + if _, exists := annotations[utils.AutoCreateMCSAnnotation]; exists { + return true + } + return false +} + +func (c *SimpleSyncServiceController) shouldEnqueue(service *corev1.Service) bool { + if slices.Contains(c.ReservedNamespaces, service.Namespace) { + return false + } + + if len(c.AutoCreateMCSPrefix) > 0 { + for _, prefix := range c.AutoCreateMCSPrefix { + if strings.HasPrefix(service.GetNamespace(), prefix) { + return true + } + } + } + + if hasAutoMCSAnnotation(service) { + return true + } + return false +} + +func (c *SimpleSyncServiceController) SetupWithManager(mgr manager.Manager) error { + servicePredicate := builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + service, ok := event.Object.(*corev1.Service) + if !ok { + return false + } + + return c.shouldEnqueue(service) + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + service, ok := deleteEvent.Object.(*corev1.Service) + if !ok { + return false + } + + return c.shouldEnqueue(service) + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + newService, ok := updateEvent.ObjectNew.(*corev1.Service) + if !ok { + return false + } + + _, ok = updateEvent.ObjectOld.(*corev1.Service) + if !ok { + return false + } + + return c.shouldEnqueue(newService) + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + }, + ) + + return controllerruntime.NewControllerManagedBy(mgr). + For(&corev1.Service{}, servicePredicate). + Complete(c) +} + +func (c *SimpleSyncServiceController) createNamespace(client client.Client, namespace string) error { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + } + err := client.Create(context.TODO(), ns) + if err != nil { + return err + } + return nil +} + +// nolint:dupl +func (c *SimpleSyncServiceController) cleanUpServiceInLeafCluster(namespace string, name string) error { + clusters := c.GlobalLeafManager.ListClusters() + var errs []string + for _, cluster := range clusters { + leafClient, err := c.GlobalLeafManager.GetLeafResource(cluster) + if err != nil { + klog.Errorf("Failed to get leaf client for cluster %s: %v", cluster, err) + errs = append(errs, fmt.Sprintf("get leaf client for cluster %s: %v", cluster, err)) + continue + } + + lcr, err := c.leafClientResource(leafClient) + if err != nil { + klog.Errorf("Failed to get leaf client resource %v", leafClient.Cluster.Name) + errs = append(errs, fmt.Sprintf("get leaf client resource %v", leafClient.Cluster.Name)) + continue + } + + err = lcr.Clientset.CoreV1().Services(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + klog.Errorf("Failed to delete service %s in cluster %s: %v", name, cluster, err) + errs = append(errs, fmt.Sprintf("delete service %s in cluster %s: %v", name, cluster, err)) + } + } + + if len(errs) > 0 { + return errors.New("errors encountered: " + strings.Join(errs, "; ")) + } + return nil +} + +func (c *SimpleSyncServiceController) syncServiceInLeafCluster(service *corev1.Service) error { + clusters := c.GlobalLeafManager.ListClusters() + errsChan := make(chan string, len(clusters)) + var wg sync.WaitGroup + for _, cluster := range clusters { + wg.Add(1) + go func(cluster string) { + defer wg.Done() + leafManager, err := c.GlobalLeafManager.GetLeafResource(cluster) + if err != nil { + errsChan <- fmt.Sprintf("get leaf client for cluster %s: %v", cluster, err) + return + } + + lcr, err := c.leafClientResource(leafManager) + if err != nil { + errsChan <- fmt.Sprintf("get leaf client resource for cluster %s: %v", cluster, err) + return + } + + if err = c.createNamespace(lcr.Client, service.Namespace); err != nil && !apierrors.IsAlreadyExists(err) { + errsChan <- fmt.Sprintf("Create namespace %s in leaf cluster %s failed: %v", service.Namespace, cluster, err) + return + } + + err = c.checkServiceType(service, leafManager) + if err != nil { + errsChan <- fmt.Sprintf("check service type in leaf cluster %s failed: %v", cluster, err) + return + } + + clientService := c.generateService(service, leafManager) + err = c.createOrUpdateServiceInClient(clientService, leafManager, lcr) + if err != nil { + errsChan <- fmt.Sprintf("Create or update service in leaf cluster %s failed: %v", cluster, err) + } + }(cluster) + } + wg.Wait() + close(errsChan) + + var errs []string + for err := range errsChan { + errs = append(errs, err) + } + if len(errs) > 0 { + return errors.New("errors encountered: " + strings.Join(errs, "; ")) + } + return nil +} + +func (c *SimpleSyncServiceController) checkServiceType(service *corev1.Service, resource *clustertreeutils.LeafResource) error { + if *service.Spec.IPFamilyPolicy == corev1.IPFamilyPolicySingleStack { + if service.Spec.IPFamilies[0] == corev1.IPv6Protocol && resource.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV4 || + service.Spec.IPFamilies[0] == corev1.IPv4Protocol && resource.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV6 { + return fmt.Errorf("service's IPFamilyPolicy %s is not match the leaf cluster %s", *service.Spec.IPFamilyPolicy, resource.Cluster.Name) + } + } + return nil +} + +func (c *SimpleSyncServiceController) generateService(service *corev1.Service, resource *clustertreeutils.LeafResource) *corev1.Service { + clusterIP := corev1.ClusterIPNone + if isServiceIPSet(service) { + clusterIP = "" + } + + iPFamilies := make([]corev1.IPFamily, 0) + if resource.IPFamilyType == kosmosv1alpha1.IPFamilyTypeALL { + iPFamilies = service.Spec.IPFamilies + } else if resource.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV4 { + iPFamilies = append(iPFamilies, corev1.IPv4Protocol) + } else { + iPFamilies = append(iPFamilies, corev1.IPv6Protocol) + } + + var iPFamilyPolicy corev1.IPFamilyPolicy + if resource.IPFamilyType == kosmosv1alpha1.IPFamilyTypeALL { + iPFamilyPolicy = *service.Spec.IPFamilyPolicy + } else { + iPFamilyPolicy = corev1.IPFamilyPolicySingleStack + } + + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: service.Namespace, + Name: service.Name, + Annotations: map[string]string{ + utils.ServiceImportLabelKey: utils.MCSLabelValue, + }, + }, + Spec: corev1.ServiceSpec{ + Type: service.Spec.Type, + ClusterIP: clusterIP, + Ports: servicePorts(service), + IPFamilies: iPFamilies, + IPFamilyPolicy: &iPFamilyPolicy, + }, + } +} + +func (c *SimpleSyncServiceController) createOrUpdateServiceInClient(service *corev1.Service, leafManger *clustertreeutils.LeafResource, leafClient *clustertreeutils.LeafClientResource) error { + oldService := &corev1.Service{} + if err := leafClient.Client.Get(context.TODO(), types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, oldService); err != nil { + if apierrors.IsNotFound(err) { + if err = leafClient.Client.Create(context.TODO(), service); err != nil { + klog.Errorf("Create serviceImport service(%s/%s) in client cluster %s failed, Error: %v", service.Namespace, service.Name, leafManger.Cluster.Name, err) + return err + } else { + return nil + } + } + klog.Errorf("Get service(%s/%s) from in cluster %s failed, Error: %v", service.Namespace, service.Name, leafManger.Cluster.Name, err) + return err + } + + retainServiceFields(oldService, service) + + if err := leafClient.Client.Update(context.TODO(), service); err != nil { + if err != nil { + klog.Errorf("Update serviceImport service(%s/%s) in cluster %s failed, Error: %v", service.Namespace, service.Name, leafManger.Cluster.Name, err) + return err + } + } + return nil +} + +// nolint:dupl +func isServiceIPSet(service *corev1.Service) bool { + return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != "" +} + +// nolint:dupl +func servicePorts(service *corev1.Service) []corev1.ServicePort { + ports := make([]corev1.ServicePort, len(service.Spec.Ports)) + for i, p := range service.Spec.Ports { + ports[i] = corev1.ServicePort{ + NodePort: p.NodePort, + Name: p.Name, + Protocol: p.Protocol, + Port: p.Port, + AppProtocol: p.AppProtocol, + } + } + return ports +} + +func (c *SimpleSyncServiceController) leafClientResource(lr *clustertreeutils.LeafResource) (*clustertreeutils.LeafClientResource, error) { + actualClusterName := clustertreeutils.GetActualClusterName(lr.Cluster) + lcr, err := c.GlobalLeafClientManager.GetLeafResource(actualClusterName) + if err != nil { + return nil, fmt.Errorf("get leaf client resource err: %v", err) + } + return lcr, nil +} + +// nolint:dupl +func retainServiceFields(oldSvc, newSvc *corev1.Service) { + newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP + newSvc.ResourceVersion = oldSvc.ResourceVersion +} diff --git a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go index 3a3475116..c47b22664 100644 --- a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go +++ b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go @@ -35,6 +35,7 @@ type LeafResource struct { Cluster *kosmosv1alpha1.Cluster Namespace string IgnoreLabels []string + IPFamilyType kosmosv1alpha1.IPFamilyType EnableServiceAccount bool Nodes []ClusterNode }