diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index 536767dd0..e95166bdd 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -270,11 +270,13 @@ func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *kosmo if c.Options.MultiClusterService { serviceImportController := &mcs.ServiceImportController{ - LeafClient: mgr.GetClient(), - RootKosmosClient: kosmosClient, - EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName), - Logger: mgr.GetLogger(), - LeafNodeName: cluster.Name, + LeafClient: mgr.GetClient(), + RootKosmosClient: kosmosClient, + EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName), + Logger: mgr.GetLogger(), + LeafNodeName: cluster.Name, + // todo @wyz + IPFamilyType: cluster.Spec.ClusterLinkOptions.IPFamily, RootResourceManager: c.RootResourceManager, } if err := serviceImportController.AddController(mgr); err != nil { diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go index 48678d803..da00a9a5f 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go @@ -137,7 +137,6 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac return err } } - klog.Infof("ServiceImport (%s/%s) deleted", namespace, name) return nil } diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go index 207990a33..020c39255 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go @@ -21,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" "github.com/kosmos.io/kosmos/pkg/utils" @@ -35,6 +36,7 @@ type ServiceImportController struct { LeafClient client.Client RootKosmosClient kosmosversioned.Interface LeafNodeName string + IPFamilyType kosmosv1alpha1.IPFamilyType EventRecorder record.EventRecorder Logger logr.Logger processor utils.AsyncWorker @@ -230,6 +232,12 @@ func (c *ServiceImportController) importEndpointSliceHandler(ctx context.Context clearEndpointSlice(endpointSlice, disConnectedAddress) } + if endpointSlice.AddressType == discoveryv1.AddressTypeIPv4 && c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV6 || + endpointSlice.AddressType == discoveryv1.AddressTypeIPv6 && c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV4 { + klog.Warningf("The endpointSlice's AddressType is not match leaf cluster %s IPFamilyType,so ignore it", c.LeafNodeName) + return nil + } + return c.createOrUpdateEndpointSliceInClient(ctx, endpointSlice, serviceImport.Name) } @@ -308,8 +316,13 @@ func clearEndpointSlice(slice *discoveryv1.EndpointSlice, disconnectedAddress [] } func (c *ServiceImportController) importServiceHandler(ctx context.Context, rootService *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) error { - clientService := generateService(rootService, serviceImport) - err := c.createOrUpdateServiceInClient(ctx, clientService) + err := c.checkServiceType(rootService) + if err != nil { + klog.Errorf("Cloud not create service in leaf cluster %s,Error: %v", c.LeafNodeName, err) + return err + } + clientService := c.generateService(rootService, serviceImport) + err = c.createOrUpdateServiceInClient(ctx, clientService) if err != nil { return err } @@ -426,12 +439,28 @@ func retainServiceFields(oldSvc, newSvc *corev1.Service) { newSvc.ResourceVersion = oldSvc.ResourceVersion } -func generateService(service *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) *corev1.Service { +func (c *ServiceImportController) generateService(service *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) *corev1.Service { clusterIP := corev1.ClusterIPNone if isServiceIPSet(service) { clusterIP = "" } + iPFamilies := make([]corev1.IPFamily, 0) + if c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeALL { + iPFamilies = service.Spec.IPFamilies + } else if c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV4 { + iPFamilies = append(iPFamilies, corev1.IPv4Protocol) + } else { + iPFamilies = append(iPFamilies, corev1.IPv6Protocol) + } + + var iPFamilyPolicy corev1.IPFamilyPolicy + if c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeALL { + iPFamilyPolicy = *service.Spec.IPFamilyPolicy + } else { + iPFamilyPolicy = corev1.IPFamilyPolicySingleStack + } + return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: serviceImport.Namespace, @@ -444,12 +473,23 @@ func generateService(service *corev1.Service, serviceImport *mcsv1alpha1.Service Type: service.Spec.Type, ClusterIP: clusterIP, Ports: servicePorts(service), - IPFamilies: service.Spec.IPFamilies, - IPFamilyPolicy: service.Spec.IPFamilyPolicy, + IPFamilies: iPFamilies, + IPFamilyPolicy: &iPFamilyPolicy, }, } } +func (c *ServiceImportController) checkServiceType(service *corev1.Service) error { + if *service.Spec.IPFamilyPolicy == corev1.IPFamilyPolicySingleStack { + if service.Spec.IPFamilies[0] == corev1.IPv6Protocol && c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV4 || + service.Spec.IPFamilies[0] == corev1.IPv4Protocol && c.IPFamilyType == kosmosv1alpha1.IPFamilyTypeIPV6 { + return fmt.Errorf("service's IPFamilyPolicy %s is not match the leaf cluster %s", *service.Spec.IPFamilyPolicy, c.LeafNodeName) + } + + } + return nil +} + func isServiceIPSet(service *corev1.Service) bool { return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != "" }