diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go index 494432d00..14e77225b 100644 --- a/cmd/kubenest/operator/app/operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -28,6 +28,7 @@ import ( glnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/global.node.controller" kosmos "github.com/kosmos.io/kosmos/pkg/kubenest/controller/kosmos" vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" + "github.com/kosmos.io/kosmos/pkg/kubenest/webhooks" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" ) @@ -227,6 +228,7 @@ func run(ctx context.Context, config *config.Config) error { LivenessEndpointName: "/healthz", ReadinessEndpointName: "/readyz", HealthProbeBindAddress: ":8081", + Port: 9443, }) if err != nil { return fmt.Errorf("failed to build controller manager: %v", err) @@ -252,6 +254,11 @@ func run(ctx context.Context, config *config.Config) error { return fmt.Errorf("could not create clientset: %v", err) } + // 创建 Webhook 设置 + if err := webhooks.SetupWebhookWithManager(mgr); err != nil { + return fmt.Errorf("unable to setup webhook: %v", err) + } + VirtualClusterInitController := controller.VirtualClusterInitController{ Client: mgr.GetClient(), Config: mgr.GetConfig(), diff --git a/deploy/virtual-cluster-operator-webhook-svc.yaml b/deploy/virtual-cluster-operator-webhook-svc.yaml new file mode 100644 index 000000000..765a2b1ed --- /dev/null +++ b/deploy/virtual-cluster-operator-webhook-svc.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: virtual-cluster-operator-webhook + namespace: kosmos-system +spec: + ports: + - port: 443 # Service 端口,API Server 会使用它访问 Webhook + targetPort: 9443 # 映射到 Pod 内的 9443 端口 + selector: + app: virtual-cluster-operator diff --git a/deploy/webhooks/virtualcluster-validating-webhook.yaml b/deploy/webhooks/virtualcluster-validating-webhook.yaml new file mode 100644 index 000000000..22db4633b --- /dev/null +++ b/deploy/webhooks/virtualcluster-validating-webhook.yaml @@ -0,0 +1,19 @@ +apiVersion: admissionregistration.k8s.io/v1 +kind: ValidatingWebhookConfiguration +metadata: + name: virtualcluster-validating-webhook +webhooks: + - name: virtualcluster.kosmos.io + clientConfig: + service: + name: virtual-cluster-operator-webhook # 服务名称 + namespace: kosmos-system # 命名空间 + path: /validate-kosmos-io-v1alpha1-virtualcluster # 默认路径 + caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURDekNDQWZPZ0F3SUJBZ0lVTk4yRXo0ekoyY2wrTGVnWjBXaWgrYUxCSkF3d0RRWUpLb1pJaHZjTkFRRUwKQlFBd0ZURVRNQkVHQTFVRUF3d0tkMlZpYUc5dmF5MWpZVEFlRncweU5ERXhNakV3TmpNNU5UTmFGdzB6TkRFeApNVGt3TmpNNU5UTmFNQlV4RXpBUkJnTlZCQU1NQ25kbFltaHZiMnN0WTJFd2dnRWlNQTBHQ1NxR1NJYjNEUUVCCkFRVUFBNElCRHdBd2dnRUtBb0lCQVFETDBra01acnZFRXJDQVhzbzdURlZIRHVlMjFTWUx1a1RBOFpQNE9YelMKRW8wYlY1bjJ0UENUcm1JeTN5NU1rNmlZZVZoVTlOOXdzK0YxR0xzaHQ3WGZyVHExcG1wQ3NWNnBQMW9UZkZuVAptMDhWVVJCZ2JLK2VxT3QvcXMvbnBLWi9EZVN3NjRCbW9EMGNsVWJ5dWxqTHFGRTRSMXVLa0hlZlpBVklXaGhVCkRyNWNzb1Y1T1ZJUHhKaDh2d2RWcER2MldkZUxSNFJUcWVPTVJIL29QaytCa0VDYi9MMGRJdGtiQW0vcnJuZTUKWDFieUNXZ1V4eG1WdHRDbHpQTDRpcTNKRjdTYU1DRlg4dTFpMGxiSEpYYVNva0UyMzN1Z1p0ZEVqUVlOV0gyMAplc2I4aWptZ2JKV0tQbytxbzF4WHlSZ25HUmprZ3VWK2FkNW85NFZNdjgrZEFnTUJBQUdqVXpCUk1CMEdBMVVkCkRnUVdCQlFlcnp0ZGpPK2hNcmN1VTNjdGg2b3Y0MUJCT0RBZkJnTlZIU01FR0RBV2dCUWVyenRkak8raE1yY3UKVTNjdGg2b3Y0MUJCT0RBUEJnTlZIUk1CQWY4RUJUQURBUUgvTUEwR0NTcUdTSWIzRFFFQkN3VUFBNElCQVFBWgpuTjBWZ3czQ29GbGNablBjcEJPTGNEZUxKR2VWbDhyV0QrY2dGQjc5WEljL0kxYythRlpSbkVIZkJVSytpT3dVClBSZ2VSUzZiOVd0V29tVDhvMzlieWwwUWVrYVVwM3A1NGVIYUhINUtUYnJORkliZEU3bC9rRk0wMlV1eURxQU8KczNuakF3VUhjOUIwQmxLZ0ZHWFd4WWRXeWRaem00bXFGbk45ZEdlWldudXkrdHZSSzhSZjk5Rk1sNXh0dVZmYQo3amdpMHplVEx6NjBSK285cEtsN3FqK0xxZlRScVR5ZjVzbXdmSk0yS2h2ZENsb2ZKZjhxSGhhT0lZa094QnRmClBHbGpQQ0pYWk53VlgzZTBLbUV2QnNkamRLbGJjcVNHalpqLy8yVWhBTzU0aHJQUTkzem02R1FTdXZCU3lpR3YKMVB4eFdyVW1TZ25GRnJWb2F5Z20KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo= + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["kosmos.io"] + apiVersions: ["v1alpha1"] + resources: ["virtualclusters"] + admissionReviewVersions: ["v1"] + sideEffects: None diff --git a/pkg/kubenest/common/resource.go b/pkg/kubenest/common/resource.go new file mode 100644 index 000000000..9d6bca715 --- /dev/null +++ b/pkg/kubenest/common/resource.go @@ -0,0 +1,14 @@ +package common + +import ( + "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" +) + +type Resource struct { + Namespace string + Name string + Vc *v1alpha1.VirtualCluster + RootClientSet kubernetes.Interface +} diff --git a/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go index ad065ab57..ad72f3fc6 100644 --- a/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go +++ b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go @@ -8,19 +8,17 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -38,145 +36,158 @@ type APIServerExternalSyncController struct { const APIServerExternalSyncControllerName string = "api-server-external-service-sync-controller" func (e *APIServerExternalSyncController) SetupWithManager(mgr manager.Manager) error { - skipEvent := func(obj client.Object) bool { - return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != "" - } - return controllerruntime.NewControllerManagedBy(mgr). Named(APIServerExternalSyncControllerName). WithOptions(controller.Options{MaxConcurrentReconciles: 5}). - For(&v1.Endpoints{}, - builder.WithPredicates(predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - return skipEvent(createEvent.Object) - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false }, - })). - Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(e.newVirtualClusterMapFunc())). + Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(e.newPodMapFunc())). Complete(e) } -func (e *APIServerExternalSyncController) newVirtualClusterMapFunc() handler.MapFunc { +func (e *APIServerExternalSyncController) newPodMapFunc() handler.MapFunc { return func(a client.Object) []reconcile.Request { var requests []reconcile.Request - vcluster := a.(*v1alpha1.VirtualCluster) - - // Join the Reconcile queue only if the status of the vcluster is Completed - if vcluster.Status.Phase == v1alpha1.Completed { - klog.V(4).Infof("api-server-external-sync-controller: virtualcluster change to completed: %s", vcluster.Name) - // Add the vcluster to the Reconcile queue - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: vcluster.Name, - Namespace: vcluster.Namespace, - }, - }) + pod := a.(*v1.Pod) + + // pod 的名称包含 "apiserver" 并且不包含 "kube-apiserver" + if strings.Contains(pod.Name, "apiserver") && !strings.Contains(pod.Name, "kube-apiserver") { + klog.V(4).Infof("api-server-external-sync-controller: Detected change in apiserver Pod: %s", pod.Name) + + // 根据 pod 名称推断 vcluster 名称 + parts := strings.SplitN(pod.Name, "-apiserver", 2) + vclusterName := parts[0] + klog.V(4).Infof("Derived vclusterName: %s from podName: %s", vclusterName, pod.Name) + + // 查找与该 Pod 关联的 VirtualCluster + vcluster := &v1alpha1.VirtualCluster{} + if err := e.Client.Get(context.Background(), types.NamespacedName{ + Namespace: pod.Namespace, + Name: vclusterName, + }, vcluster); err != nil { + klog.Errorf("Failed to get VirtualCluster %s: %v", vclusterName, err) + return nil + } + + // 确保 VirtualCluster 状态为 Completed + if vcluster.Status.Phase == v1alpha1.Completed { + klog.V(4).Infof("VirtualCluster %s is completed, enqueueing for reconciliation", vclusterName) + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: vcluster.Name, + Namespace: vcluster.Namespace, + }, + }) + } } return requests } } -func (e *APIServerExternalSyncController) SyncAPIServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error { - kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{}) - if err != nil { - klog.Errorf("Error getting endpoints: %v", err) - return err - } - klog.V(4).Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints) - for _, subset := range kubeEndpoints.Subsets { - for _, address := range subset.Addresses { - klog.V(4).Infof("IP: %s", address.IP) - } - } - - if len(kubeEndpoints.Subsets) != 1 { - return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes") - } - - if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 { - klog.Errorf("eps %s Addresses length is nil", "kubernetes") - return err - } - - apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - klog.Errorf("failed to get endpoints for %s : %v", constants.APIServerExternalService, err) - return err +func (e *APIServerExternalSyncController) SyncAPIServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface, vc *v1alpha1.VirtualCluster) error { + podList := &v1.PodList{} + if err := e.Client.List(ctx, podList, &client.ListOptions{ + Namespace: vc.Namespace, + LabelSelector: labels.SelectorFromSet(map[string]string{ + "virtualCluster-app": "apiserver", + }), + }); err != nil { + return fmt.Errorf("failed to list apiserver pods: %w", err) } - updateEPS := apiServerExternalEndpoints.DeepCopy() - - if apiServerExternalEndpoints != nil { - klog.V(4).Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints) - } else { - klog.V(4).Info("apiServerExternalEndpoints is nil") + var addresses []v1.EndpointAddress + for _, pod := range podList.Items { + // 确保 Pod 处于 Running 状态并有 IP 地址 + if pod.Status.Phase == v1.PodRunning && pod.Status.PodIP != "" { + klog.V(4).Infof("Found apiserver Pod: %s, IP: %s", pod.Name, pod.Status.PodIP) + addresses = append(addresses, v1.EndpointAddress{IP: pod.Status.PodIP}) + } } - if updateEPS != nil { - klog.V(4).Infof("updateEPS: %v", updateEPS) - } else { - klog.V(4).Info("updateEPS is nil") + apiServerPort, ok := vc.Status.PortMap[constants.APIServerPortKey] + if !ok { + return fmt.Errorf("failed to get API server port from VirtualCluster status") + } + klog.V(4).Infof("API server port: %d", apiServerPort) + + // 构造 Endpoints 对象 + newEndpoint := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: addresses, + Ports: []v1.EndpointPort{ + { + Name: "https", + Port: apiServerPort, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, } - if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 { - ip := kubeEndpoints.Subsets[0].Addresses[0].IP - klog.V(4).Infof("IP address: %s", ip) - updateEPS.Subsets[0].Addresses[0].IP = ip + //避免不必要的更新 + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentEndpoint, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + _, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Create(ctx, newEndpoint, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create api-server-external-service endpoint: %w", err) + } + klog.Infof("Created api-server-external-service Endpoint") + return nil + } else if err != nil { + return fmt.Errorf("failed to get existing api-server-external-service endpoint: %w", err) + } - if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil { - klog.Errorf("failed to update endpoints for api-server-external-service: %v", err) - return err + // 比较 Endpoints 内容,判断是否需要更新 + if !endpointsEqual(currentEndpoint, newEndpoint) { + _, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Update(ctx, newEndpoint, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update api-server-external-service endpoint: %w", err) + } + klog.Infof("Updated api-server-external-service Endpoint") + } else { + klog.V(4).Info("No changes detected in Endpoint, skipping update") } - } else { - klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS) - return err - } + return nil + }) +} - return nil +// Endpoints 比较函数 +func endpointsEqual(a, b *v1.Endpoints) bool { + return fmt.Sprintf("%v", a.Subsets) == fmt.Sprintf("%v", b.Subsets) } func (e *APIServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { klog.V(4).Infof("============ %s start to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName) defer klog.V(4).Infof("============ %s finish to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName) - var virtualClusterList v1alpha1.VirtualClusterList - if err := e.List(ctx, &virtualClusterList); err != nil { + var vc v1alpha1.VirtualCluster + if err := e.Get(ctx, request.NamespacedName, &vc); err != nil { if apierrors.IsNotFound(err) { + klog.Infof("VirtualCluster not found: %s", request.NamespacedName) return reconcile.Result{}, nil } - klog.V(4).Infof("query virtualcluster failed: %v", err) + klog.Errorf("Failed to get VirtualCluster: %v", err) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } - var targetVirtualCluster v1alpha1.VirtualCluster - hasVirtualCluster := false - for _, vc := range virtualClusterList.Items { - if vc.Namespace == request.Namespace { - targetVirtualCluster = vc - klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name) - hasVirtualCluster = true - break - } - } - if !hasVirtualCluster { - klog.V(4).Infof("virtualcluster %s not found", request.Namespace) - return reconcile.Result{}, nil - } - if targetVirtualCluster.Status.Phase != v1alpha1.Completed { + if vc.Status.Phase != v1alpha1.Completed { + klog.Infof("VirtualCluster %s is not in Completed phase", vc.Name) return reconcile.Result{}, nil } - k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster) + k8sClient, err := util.GenerateKubeclient(&vc) if err != nil { - klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err) + klog.Errorf("Failed to generate Kubernetes client for VirtualCluster %s: %v", vc.Name, err) return reconcile.Result{}, nil } - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - return e.SyncAPIServerExternalEPS(ctx, k8sClient) - }); err != nil { - klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err) + if err := e.SyncAPIServerExternalEPS(ctx, k8sClient, &vc); err != nil { + klog.Errorf("Failed to sync apiserver external Endpoints: %v", err) return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } diff --git a/pkg/kubenest/controlplane/endpoint.go b/pkg/kubenest/controlplane/endpoint.go index e809eddd5..69892635e 100644 --- a/pkg/kubenest/controlplane/endpoint.go +++ b/pkg/kubenest/controlplane/endpoint.go @@ -12,13 +12,25 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/kubenest/common" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster" "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/kosmos.io/kosmos/pkg/utils" ) -func EnsureAPIServerExternalEndPoint(kubeClient kubernetes.Interface) error { - err := CreateOrUpdateAPIServerExternalEndpoint(kubeClient) +type IPFamilies struct { + IPv4 bool + IPv6 bool +} + +func EnsureAPIServerExternalEndPoint(kubeClient kubernetes.Interface, resource common.Resource) error { + err := EnsureKosmosSystemNamespace(kubeClient) + if err != nil { + return err + } + + err = CreateOrUpdateAPIServerExternalEndpoint(kubeClient, resource) if err != nil { return err } @@ -30,70 +42,98 @@ func EnsureAPIServerExternalEndPoint(kubeClient kubernetes.Interface) error { return nil } -func CreateOrUpdateAPIServerExternalEndpoint(kubeClient kubernetes.Interface) error { - klog.V(4).Info("begin to get kubernetes endpoint") - kubeEndpoint, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) +func CreateOrUpdateAPIServerExternalEndpoint(kubeClient kubernetes.Interface, resource common.Resource) error { + klog.V(4).Info("begin to create or update api-server-external-service endpoint") + // 获取API Server 所在的节点信息 + nodes, err := getAPIServerNodes(resource.RootClientSet, resource.Namespace) if err != nil { - klog.Error("get Kubernetes endpoint failed", err) - return errors.Wrap(err, "failed to get kubernetes endpoint") - } - klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpoint) - - newEndpoint := kubeEndpoint.DeepCopy() - newEndpoint.Name = constants.APIServerExternalService - newEndpoint.Namespace = constants.DefaultNs - newEndpoint.ResourceVersion = "" - - // Reconstruct the Ports without the 'name' field - for i := range newEndpoint.Subsets { - for j := range newEndpoint.Subsets[i].Ports { - newEndpoint.Subsets[i].Ports[j] = corev1.EndpointPort{ - Port: newEndpoint.Subsets[i].Ports[j].Port, - Protocol: newEndpoint.Subsets[i].Ports[j].Protocol, + return fmt.Errorf("failed to get API server nodes: %w", err) + } + if len(nodes.Items) == 0 { + return fmt.Errorf("no API server nodes found in the cluster") + } + // 收集API Server节点的InternalIp地址 + var addresses []corev1.EndpointAddress + for _, node := range nodes.Items { + klog.V(4).Infof("API server node: %s", node.Name) + for _, address := range node.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + klog.V(4).Infof("Node internal IP: %s", address.Address) + addresses = append(addresses, corev1.EndpointAddress{ + IP: address.Address, + }) } } } - // Try to create the endpoint - _, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Create(context.TODO(), newEndpoint, metav1.CreateOptions{}) - if err != nil { - if !apierrors.IsAlreadyExists(err) { - klog.Error("create api-server-external-service endpoint failed", err) - return errors.Wrap(err, "failed to create api-server-external-service endpoint") - } + if len(addresses) == 0 { + return fmt.Errorf("no internal IP addresses found for the API server nodes") + } - // Endpoint already exists, retrieve it - existingEndpoint, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) - if err != nil { - klog.Error("get existing api-server-external-service endpoint failed", err) - return errors.Wrap(err, "failed to get existing api-server-external-service endpoint") - } + // 获取API Server 的端口信息 + apiServerPort, ok := resource.Vc.Status.PortMap[constants.APIServerPortKey] + if !ok { + return fmt.Errorf("failed to get API server port from VirtualCluster status") + } + klog.V(4).Infof("API server port: %d", apiServerPort) - // Update the existing endpoint - newEndpoint.SetResourceVersion(existingEndpoint.ResourceVersion) - newEndpoint.SetUID(existingEndpoint.UID) - _, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Update(context.TODO(), newEndpoint, metav1.UpdateOptions{}) - if err != nil { - klog.Error("update api-server-external-service endpoint failed", err) - return errors.Wrap(err, "failed to update api-server-external-service endpoint") + // 创建或更新 api-server-external-service Endpoint + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.APIServerExternalService, + Namespace: constants.KosmosNs, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: addresses, + Ports: []corev1.EndpointPort{ + { + Name: "https", + Port: apiServerPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + } + + _, err = kubeClient.CoreV1().Endpoints(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + // 创建 Endpoint + _, err = kubeClient.CoreV1().Endpoints(constants.KosmosNs).Create(context.TODO(), endpoint, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create api-server-external-service endpoint: %w", err) + } + klog.V(4).Info("api-server-external-service endpoint created successfully") + } else { + return fmt.Errorf("failed to get api-server-external-service endpoint: %w", err) } - klog.V(4).Info("successfully updated api-server-external-service endpoint") } else { - klog.V(4).Info("successfully created api-server-external-service endpoint") + // 更新 Endpoint + _, err = kubeClient.CoreV1().Endpoints(constants.KosmosNs).Update(context.TODO(), endpoint, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update api-server-external-service endpoint: %w", err) + } + klog.V(4).Info("api-server-external-service endpoint updated successfully") } return nil } func CreateOrUpdateAPIServerExternalService(kubeClient kubernetes.Interface) error { - port, err := getEndPointPort(kubeClient) + port, ipFamilies, err := getEndPointInfo(kubeClient) if err != nil { return fmt.Errorf("error when getEndPointPort: %w", err) } apiServerExternalServiceBytes, err := util.ParseTemplate(virtualcluster.APIServerExternalService, struct { ServicePort int32 + IPv4 bool + IPv6 bool }{ ServicePort: port, + IPv4: ipFamilies.IPv4, + IPv6: ipFamilies.IPv6, }) if err != nil { return fmt.Errorf("error when parsing api-server-external-serive template: %w", err) @@ -103,42 +143,181 @@ func CreateOrUpdateAPIServerExternalService(kubeClient kubernetes.Interface) err if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &svc); err != nil { return fmt.Errorf("err when decoding api-server-external-service in virtual cluster: %w", err) } - _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + _, err = kubeClient.CoreV1().Services(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { // Try to create the service - _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{}) + _, err = kubeClient.CoreV1().Services(constants.KosmosNs).Create(context.TODO(), &svc, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("error when creating api-server-external-service: %w", err) } + klog.V(4).Info("successfully created api-server-external-service service") } else { return fmt.Errorf("error when get api-server-external-service: %w", err) } + } else { + _, err = kubeClient.CoreV1().Services(constants.KosmosNs).Update(context.TODO(), &svc, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("error when updating api-server-external-service: %w", err) + } + klog.V(4).Info("successfully updated api-server-external-service service") } - klog.V(4).Info("successfully created api-server-external-service service") + return nil } -func getEndPointPort(kubeClient kubernetes.Interface) (int32, error) { +func getEndPointInfo(kubeClient kubernetes.Interface) (int32, IPFamilies, error) { klog.V(4).Info("begin to get Endpoints ports...") - endpoints, err := kubeClient.CoreV1().Endpoints(constants.DefaultNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + endpoints, err := kubeClient.CoreV1().Endpoints(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) if err != nil { klog.Errorf("get Endpoints failed: %v", err) - return 0, err + return 0, IPFamilies{}, err } if len(endpoints.Subsets) == 0 { klog.Errorf("subsets is empty") - return 0, fmt.Errorf("No subsets found in the endpoints") + return 0, IPFamilies{}, fmt.Errorf("No subsets found in the endpoints") } subset := endpoints.Subsets[0] + if len(subset.Ports) == 0 { klog.Errorf("Port not found in the endpoint") - return 0, fmt.Errorf("No ports found in the endpoint") + return 0, IPFamilies{}, fmt.Errorf("No ports found in the endpoint") } port := subset.Ports[0].Port klog.V(4).Infof("The port number was successfully obtained: %d", port) - return port, nil + + ipFamilies := IPFamilies{ + IPv4: false, + IPv6: false, + } + + // Check if the addresses contain IPv4 or IPv6 + for _, address := range subset.Addresses { + if utils.IsIPv4(address.IP) { + ipFamilies.IPv4 = true + } + if utils.IsIPv6(address.IP) { + ipFamilies.IPv6 = true + } + } + + klog.V(4).Infof("IPv4: %v, IPv6: %v", ipFamilies.IPv4, ipFamilies.IPv6) + + return port, ipFamilies, nil +} + +func EnsureKosmosSystemNamespace(kubeClient kubernetes.Interface) error { + // 检查 kosmos-system 命名空间是否存在 + _, err := kubeClient.CoreV1().Namespaces().Get(context.Background(), constants.KosmosNs, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.KosmosNs, + }, + } + _, err = kubeClient.CoreV1().Namespaces().Create(context.Background(), namespace, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create kosmos-system namespace: %v", err) + } + fmt.Println("Created kosmos-system namespace") + return nil + } else { + return fmt.Errorf("failed to get kosmos-system namespace: %v", err) + } + } + // 命名空间已存在 + return nil +} + +func getAPIServerNodes(rootClientSet kubernetes.Interface, namespace string) (*corev1.NodeList, error) { + klog.V(4).Info("begin to get API server nodes") + // 获取带有 virtualCluster-app=apiserver 标签的 kube-apiserver Pod + apiServerPods, err := rootClientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "virtualCluster-app=apiserver", + }) + if err != nil { + klog.Errorf("failed to list kube-apiserver pod: %v", err) + return nil, errors.Wrap(err, "failed to list kube-apiserver pods") + } + // 收集所有 API Server Pod 所在的节点名称 + var nodeNames []string + for _, pod := range apiServerPods.Items { + klog.V(4).Infof("API server pod %s is on node: %s", pod.Name, pod.Spec.NodeName) + nodeNames = append(nodeNames, pod.Spec.NodeName) + } + + if len(nodeNames) == 0 { + klog.Errorf("no API server pods found in the namespace") + return nil, fmt.Errorf("no API server pods found") + } + + // 查询每个节点并收集节点信息 + var nodesList []corev1.Node + for _, nodeName := range nodeNames { + node, err := rootClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("failed to get node %s: %v", nodeName, err) + return nil, fmt.Errorf("failed to get node %s: %v", nodeName, err) + } + klog.V(4).Infof("Found node: %s", node.Name) + nodesList = append(nodesList, *node) + } + + nodes := &corev1.NodeList{ + Items: nodesList, + } + + klog.V(4).Infof("got %d API server nodes", len(nodes.Items)) + + if len(nodes.Items) == 0 { + klog.Errorf("no nodes found for the API server pods") + return nil, fmt.Errorf("no nodes found for the API server pods") + } + + return nodes, nil +} + +func detectIPFamiliesFromEndpoints(kubeClient kubernetes.Interface) ([]string, error) { + klog.V(4).Info("Detecting IP families from endpoint addresses...") + + // 获取 API server 的 endpoints + endpoints, err := kubeClient.CoreV1().Endpoints(constants.KosmosNs).Get(context.TODO(), constants.APIServerExternalService, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get endpoints: %w", err) + } + + ipv4Detected := false + ipv6Detected := false + + // 遍历 Endpoints 中的 Address,判断是否为 IPv4 或 IPv6 + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + if utils.IsIPv4(address.IP) { + ipv4Detected = true + } + if utils.IsIPv6(address.IP) { + ipv6Detected = true + } + } + } + + if !ipv4Detected && !ipv6Detected { + return nil, fmt.Errorf("no valid IP addresses found in the endpoint") + } + + // 动态决定 ipFamilies + ipFamilies := []string{} + if ipv4Detected { + ipFamilies = append(ipFamilies, "IPv4") + } + if ipv6Detected { + ipFamilies = append(ipFamilies, "IPv6") + } + + klog.V(4).Infof("Detected IP families from endpoint: %v", ipFamilies) + return ipFamilies, nil } diff --git a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go index e7c0a2a45..6ee2a6628 100644 --- a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go +++ b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go @@ -307,7 +307,11 @@ spec: image: {{ .ImageRepository }}/kas-network-proxy-server:{{ .Version }} resources: requests: - cpu: 1m + cpu: 100m + memory: 128Mi + limits: + cpu: 500m + memory: 256Mi securityContext: allowPrivilegeEscalation: false runAsUser: 0 @@ -484,9 +488,11 @@ spec: image: {{ .ImageRepository }}/kas-network-proxy-agent:{{ .Version }} resources: requests: - cpu: 50m + cpu: 100m + memory: 100Mi limits: - memory: 30Mi + cpu: 500m + memory: 500Mi command: [ "/proxy-agent"] args: [ "--logtostderr=true", diff --git a/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go b/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go index 614fbd753..1c69d9f65 100644 --- a/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go +++ b/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go @@ -6,13 +6,23 @@ apiVersion: v1 kind: Service metadata: name: api-server-external-service - namespace: default + namespace: kosmos-system spec: + ipFamilies: + {{- if .IPv4 }} + - IPv4 + {{- end }} + {{- if .IPv6 }} + - IPv6 + {{- end }} + ipFamilyPolicy: PreferDualStack type: NodePort ports: - - protocol: TCP + - name: https + protocol: TCP port: {{ .ServicePort }} targetPort: {{ .ServicePort }} nodePort: 30443 + sessionAffinity: None ` ) diff --git a/pkg/kubenest/tasks/endpoint.go b/pkg/kubenest/tasks/endpoint.go index 8fcd9d31b..198619e5d 100644 --- a/pkg/kubenest/tasks/endpoint.go +++ b/pkg/kubenest/tasks/endpoint.go @@ -9,6 +9,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/kubenest/common" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" "github.com/kosmos.io/kosmos/pkg/kubenest/util" @@ -60,7 +61,14 @@ func runEndPointInVirtualClusterTask(r workflow.RunData) error { return err } - err = controlplane.EnsureAPIServerExternalEndPoint(kubeClient) + resource := common.Resource{ + Namespace: data.GetNamespace(), + Name: data.GetName(), + Vc: data.VirtualCluster(), + RootClientSet: data.RemoteClient(), + } + + err = controlplane.EnsureAPIServerExternalEndPoint(kubeClient, resource) if err != nil { return err } diff --git a/pkg/kubenest/webhooks/setup.go b/pkg/kubenest/webhooks/setup.go new file mode 100644 index 000000000..1407f8c59 --- /dev/null +++ b/pkg/kubenest/webhooks/setup.go @@ -0,0 +1,16 @@ +package webhooks + +import ( + ctrl "sigs.k8s.io/controller-runtime" + + v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest/webhooks/validators" +) + +// SetupWebhookWithManager sets up the webhook with the manager +func SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(&v1alpha1.VirtualCluster{}). + WithValidator(&validators.VirtualClusterValidator{}). // Updated validator + Complete() +} diff --git a/pkg/kubenest/webhooks/types.go b/pkg/kubenest/webhooks/types.go new file mode 100644 index 000000000..62c2dd229 --- /dev/null +++ b/pkg/kubenest/webhooks/types.go @@ -0,0 +1 @@ +package webhooks diff --git a/pkg/kubenest/webhooks/validators/virtualcluster_validator.go b/pkg/kubenest/webhooks/validators/virtualcluster_validator.go new file mode 100644 index 000000000..0140e7096 --- /dev/null +++ b/pkg/kubenest/webhooks/validators/virtualcluster_validator.go @@ -0,0 +1,91 @@ +package validators + +import ( + "context" + "fmt" + "k8s.io/klog/v2" + "reflect" + + v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" +) + +// VirtualClusterValidator validates VirtualCluster resources +type VirtualClusterValidator struct{} + +// ValidateCreate validates VirtualCluster during CREATE operation +func (v *VirtualClusterValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error { + virtualCluster, ok := obj.(*v1alpha1.VirtualCluster) + if !ok { + return fmt.Errorf("expected a VirtualCluster object but got %T", obj) + } + + // Validate PromotePolicies: NodeCount > 0 + for _, policy := range virtualCluster.Spec.PromotePolicies { + if policy.NodeCount <= 0 { + return fmt.Errorf("PromotePolicy NodeCount must be greater than 0, found %d", policy.NodeCount) + } + } + return nil +} + +// ValidateUpdate validates VirtualCluster during UPDATE operation +func (v *VirtualClusterValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error { + klog.Info("Starting ValidateUpdate for VirtualCluster") + + oldVirtualCluster, ok := oldObj.(*v1alpha1.VirtualCluster) + if !ok { + err := fmt.Errorf("expected an old VirtualCluster object but got %T", oldObj) + klog.Error(err) + return err + } + + newVirtualCluster, ok := newObj.(*v1alpha1.VirtualCluster) + if !ok { + err := fmt.Errorf("expected a new VirtualCluster object but got %T", newObj) + klog.Error(err) + return err + } + + // If the object is being deleted, skip validation + if !newVirtualCluster.DeletionTimestamp.IsZero() { + klog.Info("VirtualCluster is being deleted, skipping validation") + return nil + } + + klog.Infof("Old VirtualCluster: %+v", oldVirtualCluster.Spec.PromoteResources.NodeInfos) + klog.Infof("New VirtualCluster: %+v", newVirtualCluster.Spec.PromoteResources.NodeInfos) + + // Check if NodeInfos has been modified + if !reflect.DeepEqual(oldVirtualCluster.Spec.PromoteResources.NodeInfos, newVirtualCluster.Spec.PromoteResources.NodeInfos) { + klog.Info("Detected modification in NodeInfos, validating against NodeCount") + + // Compute the total NodeCount from PromotePolicies + nodeCount := int32(0) + for _, policy := range newVirtualCluster.Spec.PromotePolicies { + nodeCount += policy.NodeCount + } + + klog.Infof("Computed NodeCount from PromotePolicies: %d", nodeCount) + klog.Infof("NodeInfos count in new VirtualCluster: %d", len(newVirtualCluster.Spec.PromoteResources.NodeInfos)) + + // Validate NodeInfos count matches NodeCount + if int32(len(newVirtualCluster.Spec.PromoteResources.NodeInfos)) != nodeCount { + err := fmt.Errorf("mismatch between NodeInfos count (%d) and total NodeCount (%d)", + len(newVirtualCluster.Spec.PromoteResources.NodeInfos), nodeCount) + klog.Error(err) + return err + } + } else { + klog.Info("No changes detected in NodeInfos, skipping validation") + } + + klog.Info("ValidateUpdate for VirtualCluster completed successfully") + return nil +} + +// ValidateDelete validates VirtualCluster during DELETE operation +func (v *VirtualClusterValidator) ValidateDelete(ctx context.Context, obj runtime.Object) error { + // Allow all DELETE operations without validation + return nil +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 71b0a17df..e6636f67d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "fmt" + "net" "os" "strings" @@ -31,6 +32,12 @@ func IsIPv6(s string) bool { return false } +// IsIPv4 checks if the given IP address is IPv4. +func IsIPv4(ip string) bool { + parsedIP := net.ParseIP(ip) + return parsedIP != nil && parsedIP.To4() != nil +} + func GetEnvWithDefaultValue(envName string, defaultValue string) string { v := os.Getenv(envName) if len(v) == 0 {