Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: install api-server-external-service in virtualcluster #628

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ func startEndPointsControllers(mgr manager.Manager) error {
return fmt.Errorf("error starting %s: %v", endpointscontroller.KonnectivitySyncControllerName, err)
}

ApiServerExternalSyncController := endpointscontroller.ApiServerExternalSyncController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName),
}

if err := ApiServerExternalSyncController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", endpointscontroller.ApiServerExternalSyncControllerName, err)
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kubenest/constants/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
KosmosJoinControllerName = "kosmos-join-controller"
KosmosNs = "kosmos-system"
SystemNs = "kube-system"
DefaultNs = "default"
DefaultImageRepositoryEnv = "IMAGE_REPOSITIRY"
DefaultImageVersionEnv = "IMAGE_VERSION"
DefaultCoreDnsImageTagEnv = "COREDNS_IMAGE_TAG"
Expand Down Expand Up @@ -117,6 +118,9 @@ const (
StateLabelKey = "kosmos-io/state"

KonnectivityServerSuffix = "konnectivity-server"

//in virtual cluster
ApiServerExternalService = "api-server-external-service"
)

type Action string
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package endpointcontroller

import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
"github.com/kosmos.io/kosmos/pkg/utils"
)

type ApiServerExternalSyncController struct {
client.Client
EventRecorder record.EventRecorder
}

const ApiServerExternalSyncControllerName string = "api-server-external-service-sync-controller"

func (e *ApiServerExternalSyncController) SetupWithManager(mgr manager.Manager) error {
skipEvent := func(obj client.Object) bool {
return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != ""
}

return controllerruntime.NewControllerManagedBy(mgr).
Named(ApiServerExternalSyncControllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
For(&v1.Endpoints{},
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return skipEvent(createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) },
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false },
})).
Complete(e)
}

func (e *ApiServerExternalSyncController) SyncApiServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error {
kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Errorf("Error getting endpoints: %v", err)
return fmt.Errorf("failed to get endpoints for kubernetes service: %v", err)
} else {
klog.Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints)
for _, subset := range kubeEndpoints.Subsets {
for _, address := range subset.Addresses {
klog.Infof("IP: %s", address.IP)
}
}
}

if len(kubeEndpoints.Subsets) != 1 {
return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes")
}

if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
return fmt.Errorf("eps %s Addresses length is nil", "kubernetes")
}

apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get endpoints for %s : %v", constants.ApiServerExternalService, err)
}

updateEPS := apiServerExternalEndpoints.DeepCopy()

if apiServerExternalEndpoints != nil {
klog.Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints)
} else {
klog.Info("apiServerExternalEndpoints is nil")
}

if updateEPS != nil {
klog.Infof("updateEPS: %v", updateEPS)
} else {
klog.Info("updateEPS is nil")
}

if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 {
ip := kubeEndpoints.Subsets[0].Addresses[0].IP
klog.Infof("IP address: %s", ip)
updateEPS.Subsets[0].Addresses[0].IP = ip

if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update endpoints for api-server-external-service: %v", err)
}
} else {
klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS)
return fmt.Errorf("unexpected format of endpoints for api-server-external-service")
}

return nil
}

func (e *ApiServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("============ %s start to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName)
defer klog.V(4).Infof("============ %s finish to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName)

var virtualClusterList v1alpha1.VirtualClusterList
if err := e.List(ctx, &virtualClusterList); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
klog.V(4).Infof("query virtualcluster failed: %v", err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
var targetVirtualCluster v1alpha1.VirtualCluster
hasVirtualCluster := false
for _, vc := range virtualClusterList.Items {
if vc.Namespace == request.Namespace {
targetVirtualCluster = vc
klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name)
hasVirtualCluster = true
break
}
}
if !hasVirtualCluster {
klog.V(4).Infof("virtualcluster %s not found", request.Namespace)
return reconcile.Result{}, nil
}

if targetVirtualCluster.Status.Phase != v1alpha1.AllNodeReady && targetVirtualCluster.Status.Phase != v1alpha1.Completed {
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster)
if err != nil {
klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err)
return reconcile.Result{}, nil
}

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return e.SyncApiServerExternalEPS(ctx, k8sClient)
}); err != nil {
klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

return reconcile.Result{}, nil
}
158 changes: 158 additions & 0 deletions pkg/kubenest/controlplane/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package controlplane

import (
"context"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

func EnsureApiServerExternalEndPoint(dynamicClient dynamic.Interface) error {
err := installApiServerExternalEndpointInVirtualCluster(dynamicClient)
if err != nil {
return err
}

err = installApiServerExternalServiceInVirtualCluster(dynamicClient)
if err != nil {
return err
}
return nil
}

func installApiServerExternalEndpointInVirtualCluster(dynamicClient dynamic.Interface) error {
klog.Info("begin to get kubernetes endpoint")
kubeEndpointUnstructured, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Error("get Kubernetes endpoint failed", err)
return errors.Wrap(err, "failed to get kubernetes endpoint")
}
klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpointUnstructured)

if kubeEndpointUnstructured != nil {
kubeEndpoint := &corev1.Endpoints{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeEndpointUnstructured.Object, kubeEndpoint)
if err != nil {
klog.Error("switch Kubernetes endpoint to typed object failed", err)
return errors.Wrap(err, "failed to convert kubernetes endpoint to typed object")
}

newEndpoint := kubeEndpoint.DeepCopy()
newEndpoint.Name = constants.ApiServerExternalService
newEndpoint.Namespace = constants.DefaultNs
newEndpoint.ResourceVersion = ""
newEndpointUnstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newEndpoint)
if err != nil {
klog.Error("switch new endpoint to unstructured object failed", err)
return errors.Wrap(err, "failed to convert new endpoint to unstructured object")
}
klog.V(4).Info("after switch the Endpoint unstructured is:", newEndpointUnstructuredObj)

newEndpointUnstructured := &unstructured.Unstructured{Object: newEndpointUnstructuredObj}
createResult, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs).Create(context.TODO(), newEndpointUnstructured, metav1.CreateOptions{})
if err != nil {
klog.Error("create api-server-external-service endpoint failed", err)
return errors.Wrap(err, "failed to create api-server-external-service endpoint")
} else {
klog.Info("success create api-server-external-service endpoint:", createResult)
}
} else {
return errors.New("kubernetes endpoint does not exist")
}

return nil
}

func installApiServerExternalServiceInVirtualCluster(dynamicClient dynamic.Interface) error {
port, err := getEndPointPort(dynamicClient)
if err != nil {
return fmt.Errorf("error when getEndPointPort: %w", err)
}
apiServerExternalServiceBytes, err := util.ParseTemplate(virtualcluster.ApiServerExternalService, struct {
ServicePort int32
}{
ServicePort: port,
})
if err != nil {
return fmt.Errorf("error when parsing api-server-external-serive template: %w", err)
}

var obj unstructured.Unstructured
if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &obj); err != nil {
return fmt.Errorf("err when decoding api-server-external service in virtual cluster: %w", err)
}

err = util.CreateObject(dynamicClient, "default", "api-server-external-service", &obj)
if err != nil {
return fmt.Errorf("error when creating api-server-external service in virtual cluster err: %w", err)
}
return nil
}

func getEndPointPort(dynamicClient dynamic.Interface) (int32, error) {
klog.Info("begin to get Endpoints ports...")
endpointsRes := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs)

endpointsRaw, err := endpointsRes.Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
klog.Errorf("get Endpoints failed: %v", err)
return 0, err
}

subsets, found, err := unstructured.NestedSlice(endpointsRaw.Object, "subsets")
if !found || err != nil {
klog.Errorf("The subsets field was not found or parsing error occurred: %v", err)
return 0, fmt.Errorf("subsets field not found or error parsing it")
}

if len(subsets) == 0 {
klog.Errorf("subsets is empty")
return 0, fmt.Errorf("No subsets found in the endpoints")
}

subset := subsets[0].(map[string]interface{})
ports, found, err := unstructured.NestedSlice(subset, "ports")
if !found || err != nil {
klog.Errorf("ports field not found or parsing error: %v", err)
return 0, fmt.Errorf("ports field not found or error parsing it")
}

if len(ports) == 0 {
klog.Errorf("Port not found in the endpoint")
return 0, fmt.Errorf("No ports found in the endpoint")
}

port := ports[0].(map[string]interface{})
portNum, found, err := unstructured.NestedInt64(port, "port")
if !found || err != nil {
klog.Errorf("ports field not found or parsing error: %v", err)
return 0, fmt.Errorf("port field not found or error parsing it")
}

klog.Infof("The port number was successfully obtained: %d", portNum)
return int32(portNum), nil
}
1 change: 1 addition & 0 deletions pkg/kubenest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
initPhase.AppendTask(tasks.NewCoreDNSTask())
// add server
initPhase.AppendTask(tasks.NewComponentsFromManifestsTask())
initPhase.AppendTask(tasks.NewEndPointTask())

initPhase.SetDataInitializer(func() (workflow.RunData, error) {
return newRunData(opts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package virtualcluster

const (
ApiServerExternalService = `
apiVersion: v1
kind: Service
metadata:
name: api-server-external-service
namespace: default
spec:
type: NodePort
ports:
- protocol: TCP
port: {{ .ServicePort }}
targetPort: {{ .ServicePort }}
nodePort: 30443
`
)
2 changes: 1 addition & 1 deletion pkg/kubenest/tasks/coredns.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func runCheckCoreDnsTask(r workflow.RunData) error {
func runCoreDnsVirtualTask(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct")
return errors.New("Virtual cluster coreDns task invoked with an invalid data struct")
}

secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(),
Expand Down
Loading
Loading