diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go index 56c33d4cb..fb67581da 100644 --- a/cmd/kubenest/operator/app/operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -75,6 +75,15 @@ func startEndPointsControllers(mgr manager.Manager) error { return fmt.Errorf("error starting %s: %v", endpointscontroller.KonnectivitySyncControllerName, err) } + ApiServerExternalSyncController := endpointscontroller.ApiServerExternalSyncController{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName), + } + + if err := ApiServerExternalSyncController.SetupWithManager(mgr); err != nil { + return fmt.Errorf("error starting %s: %v", endpointscontroller.ApiServerExternalSyncControllerName, err) + } + return nil } diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 914b7c265..a46013008 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -13,8 +13,10 @@ const ( KosmosJoinControllerName = "kosmos-join-controller" KosmosNs = "kosmos-system" SystemNs = "kube-system" + DefaultNs = "default" DefauleImageRepositoryEnv = "IMAGE_REPOSITIRY" DefauleImageVersionEnv = "IMAGE_VERSION" + DefaultCoreDnsImageTagEnv = "COREDNS_IMAGE_TAG" DefauleVirtualControllerLabelEnv = "VIRTUAL_CONTROLLER_LABEL" VirtualClusterFinalizerName = "kosmos.io/virtual-cluster-finalizer" ServiceType = "NodePort" @@ -115,6 +117,9 @@ const ( StateLabelKey = "kosmos-io/state" KonnectivityServerSuffix = "konnectivity-server" + + //in virtual cluster + ApiServerExternalService = "api-server-external-service" ) type Action string diff --git a/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go new file mode 100644 index 000000000..231cb0f29 --- /dev/null +++ b/pkg/kubenest/controller/endpoints.sync.controller/apiserver_external_sync_controller.go @@ -0,0 +1,158 @@ +package endpointcontroller + +import ( + "context" + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/kosmos.io/kosmos/pkg/utils" +) + +type ApiServerExternalSyncController struct { + client.Client + EventRecorder record.EventRecorder +} + +const ApiServerExternalSyncControllerName string = "api-server-external-service-sync-controller" + +func (e *ApiServerExternalSyncController) SetupWithManager(mgr manager.Manager) error { + skipEvent := func(obj client.Object) bool { + return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != "" + } + + return controllerruntime.NewControllerManagedBy(mgr). + Named(ApiServerExternalSyncControllerName). + WithOptions(controller.Options{MaxConcurrentReconciles: 5}). + For(&v1.Endpoints{}, + builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return skipEvent(createEvent.Object) + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false }, + })). + Complete(e) +} + +func (e *ApiServerExternalSyncController) SyncApiServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error { + kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{}) + if err != nil { + klog.Errorf("Error getting endpoints: %v", err) + return fmt.Errorf("failed to get endpoints for kubernetes service: %v", err) + } else { + klog.Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints) + for _, subset := range kubeEndpoints.Subsets { + for _, address := range subset.Addresses { + klog.Infof("IP: %s", address.IP) + } + } + } + + if len(kubeEndpoints.Subsets) != 1 { + return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes") + } + + if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 { + return fmt.Errorf("eps %s Addresses length is nil", "kubernetes") + } + + apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.ApiServerExternalService, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get endpoints for %s : %v", constants.ApiServerExternalService, err) + } + + updateEPS := apiServerExternalEndpoints.DeepCopy() + + if apiServerExternalEndpoints != nil { + klog.Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints) + } else { + klog.Info("apiServerExternalEndpoints is nil") + } + + if updateEPS != nil { + klog.Infof("updateEPS: %v", updateEPS) + } else { + klog.Info("updateEPS is nil") + } + + if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 { + ip := kubeEndpoints.Subsets[0].Addresses[0].IP + klog.Infof("IP address: %s", ip) + updateEPS.Subsets[0].Addresses[0].IP = ip + + if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update endpoints for api-server-external-service: %v", err) + } + } else { + klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS) + return fmt.Errorf("unexpected format of endpoints for api-server-external-service") + } + + return nil +} + +func (e *ApiServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + klog.V(4).Infof("============ %s start to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName) + defer klog.V(4).Infof("============ %s finish to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName) + + var virtualClusterList v1alpha1.VirtualClusterList + if err := e.List(ctx, &virtualClusterList); err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(4).Infof("query virtualcluster failed: %v", err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + var targetVirtualCluster v1alpha1.VirtualCluster + hasVirtualCluster := false + for _, vc := range virtualClusterList.Items { + if vc.Namespace == request.Namespace { + targetVirtualCluster = vc + klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name) + hasVirtualCluster = true + break + } + } + if !hasVirtualCluster { + klog.V(4).Infof("virtualcluster %s not found", request.Namespace) + return reconcile.Result{}, nil + } + + if targetVirtualCluster.Status.Phase != v1alpha1.AllNodeReady && targetVirtualCluster.Status.Phase != v1alpha1.Completed { + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + + k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster) + if err != nil { + klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err) + return reconcile.Result{}, nil + } + + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + return e.SyncApiServerExternalEPS(ctx, k8sClient) + }); err != nil { + klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err) + return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil + } + + return reconcile.Result{}, nil +} diff --git a/pkg/kubenest/controlplane/endpoint.go b/pkg/kubenest/controlplane/endpoint.go new file mode 100644 index 000000000..f0e05a76e --- /dev/null +++ b/pkg/kubenest/controlplane/endpoint.go @@ -0,0 +1,158 @@ +package controlplane + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +func EnsureApiServerExternalEndPoint(dynamicClient dynamic.Interface) error { + err := installApiServerExternalEndpointInVirtualCluster(dynamicClient) + if err != nil { + return err + } + + err = installApiServerExternalServiceInVirtualCluster(dynamicClient) + if err != nil { + return err + } + return nil +} + +func installApiServerExternalEndpointInVirtualCluster(dynamicClient dynamic.Interface) error { + klog.Info("begin to get kubernetes endpoint") + kubeEndpointUnstructured, err := dynamicClient.Resource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "endpoints", + }).Namespace(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{}) + if err != nil { + klog.Error("get Kubernetes endpoint failed", err) + return errors.Wrap(err, "failed to get kubernetes endpoint") + } + klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpointUnstructured) + + if kubeEndpointUnstructured != nil { + kubeEndpoint := &corev1.Endpoints{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeEndpointUnstructured.Object, kubeEndpoint) + if err != nil { + klog.Error("switch Kubernetes endpoint to typed object failed", err) + return errors.Wrap(err, "failed to convert kubernetes endpoint to typed object") + } + + newEndpoint := kubeEndpoint.DeepCopy() + newEndpoint.Name = constants.ApiServerExternalService + newEndpoint.Namespace = constants.DefaultNs + newEndpoint.ResourceVersion = "" + newEndpointUnstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newEndpoint) + if err != nil { + klog.Error("switch new endpoint to unstructured object failed", err) + return errors.Wrap(err, "failed to convert new endpoint to unstructured object") + } + klog.V(4).Info("after switch the Endpoint unstructured is:", newEndpointUnstructuredObj) + + newEndpointUnstructured := &unstructured.Unstructured{Object: newEndpointUnstructuredObj} + createResult, err := dynamicClient.Resource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "endpoints", + }).Namespace(constants.DefaultNs).Create(context.TODO(), newEndpointUnstructured, metav1.CreateOptions{}) + if err != nil { + klog.Error("create api-server-external-service endpoint failed", err) + return errors.Wrap(err, "failed to create api-server-external-service endpoint") + } else { + klog.Info("success create api-server-external-service endpoint:", createResult) + } + } else { + return errors.New("kubernetes endpoint does not exist") + } + + return nil +} + +func installApiServerExternalServiceInVirtualCluster(dynamicClient dynamic.Interface) error { + port, err := getEndPointPort(dynamicClient) + if err != nil { + return fmt.Errorf("error when getEndPointPort: %w", err) + } + apiServerExternalServiceBytes, err := util.ParseTemplate(virtualcluster.ApiServerExternalService, struct { + ServicePort int32 + }{ + ServicePort: port, + }) + if err != nil { + return fmt.Errorf("error when parsing api-server-external-serive template: %w", err) + } + + var obj unstructured.Unstructured + if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &obj); err != nil { + return fmt.Errorf("err when decoding api-server-external service in virtual cluster: %w", err) + } + + err = util.CreateObject(dynamicClient, "default", "api-server-external-service", &obj) + if err != nil { + return fmt.Errorf("error when creating api-server-external service in virtual cluster err: %w", err) + } + return nil +} + +func getEndPointPort(dynamicClient dynamic.Interface) (int32, error) { + klog.Info("begin to get Endpoints ports...") + endpointsRes := dynamicClient.Resource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "endpoints", + }).Namespace(constants.DefaultNs) + + endpointsRaw, err := endpointsRes.Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{}) + if err != nil { + klog.Errorf("get Endpoints failed: %v", err) + return 0, err + } + + subsets, found, err := unstructured.NestedSlice(endpointsRaw.Object, "subsets") + if !found || err != nil { + klog.Errorf("The subsets field was not found or parsing error occurred: %v", err) + return 0, fmt.Errorf("subsets field not found or error parsing it") + } + + if len(subsets) == 0 { + klog.Errorf("subsets is empty") + return 0, fmt.Errorf("No subsets found in the endpoints") + } + + subset := subsets[0].(map[string]interface{}) + ports, found, err := unstructured.NestedSlice(subset, "ports") + if !found || err != nil { + klog.Errorf("ports field not found or parsing error: %v", err) + return 0, fmt.Errorf("ports field not found or error parsing it") + } + + if len(ports) == 0 { + klog.Errorf("Port not found in the endpoint") + return 0, fmt.Errorf("No ports found in the endpoint") + } + + port := ports[0].(map[string]interface{}) + portNum, found, err := unstructured.NestedInt64(port, "port") + if !found || err != nil { + klog.Errorf("ports field not found or parsing error: %v", err) + return 0, fmt.Errorf("port field not found or error parsing it") + } + + klog.Infof("The port number was successfully obtained: %d", portNum) + return int32(portNum), nil +} diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index 2cecd402b..b36cd7ea3 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -68,6 +68,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase { initPhase.AppendTask(tasks.NewCoreDNSTask()) // add server initPhase.AppendTask(tasks.NewComponentsFromManifestsTask()) + initPhase.AppendTask(tasks.NewEndPointTask()) initPhase.SetDataInitializer(func() (workflow.RunData, error) { return newRunData(opts) diff --git a/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go b/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go new file mode 100644 index 000000000..1ac24b81d --- /dev/null +++ b/pkg/kubenest/manifest/controlplane/virtualcluster/manifests_service.go @@ -0,0 +1,18 @@ +package virtualcluster + +const ( + ApiServerExternalService = ` +apiVersion: v1 +kind: Service +metadata: + name: api-server-external-service + namespace: default +spec: + type: NodePort + ports: + - protocol: TCP + port: {{ .ServicePort }} + targetPort: {{ .ServicePort }} + nodePort: 30443 +` +) diff --git a/pkg/kubenest/tasks/coredns.go b/pkg/kubenest/tasks/coredns.go index 037873c7d..526fa121a 100644 --- a/pkg/kubenest/tasks/coredns.go +++ b/pkg/kubenest/tasks/coredns.go @@ -188,7 +188,7 @@ func runCheckCoreDnsTask(r workflow.RunData) error { func runCoreDnsVirtualTask(r workflow.RunData) error { data, ok := r.(InitData) if !ok { - return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct") + return errors.New("Virtual cluster coreDns task invoked with an invalid data struct") } secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), diff --git a/pkg/kubenest/tasks/endpoint.go b/pkg/kubenest/tasks/endpoint.go new file mode 100644 index 000000000..edfa81e6f --- /dev/null +++ b/pkg/kubenest/tasks/endpoint.go @@ -0,0 +1,67 @@ +package tasks + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controlplane" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +func NewEndPointTask() workflow.Task { + return workflow.Task{ + Name: "endpoint", + Run: runEndpoint, + RunSubTasks: true, + Tasks: []workflow.Task{ + { + Name: "deploy-endpoint-in-virtual-cluster", + Run: runEndPointInVirtualClusterTask, + }, + }, + } +} + +func runEndpoint(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("endPoint task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[endPoint] Running endPoint task", "virtual cluster", klog.KObj(data)) + return nil +} + +func runEndPointInVirtualClusterTask(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("Virtual cluster endpoint task invoked with an invalid data struct") + } + + secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), + fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{}) + if err != nil { + return errors.Wrap(err, "Get virtualcluster kubeconfig secret error") + } + config, err := clientcmd.RESTConfigFromKubeConfig(secret.Data[constants.KubeConfig]) + if err != nil { + return err + } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return err + } + + err = controlplane.EnsureApiServerExternalEndPoint(dynamicClient) + if err != nil { + return err + } + return nil +}