Skip to content

Commit

Permalink
cherry-pick: install api-server-external-service in virtualcluster
Browse files Browse the repository at this point in the history
Signed-off-by: qiuwei <[email protected]>
  • Loading branch information
qiuwei68 committed Jun 28, 2024
1 parent 5d3be3b commit 00bd24d
Show file tree
Hide file tree
Showing 8 changed files with 417 additions and 1 deletion.
9 changes: 9 additions & 0 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ func startEndPointsControllers(mgr manager.Manager) error {
return fmt.Errorf("error starting %s: %v", endpointscontroller.KonnectivitySyncControllerName, err)
}

ApiServerExternalSyncController := endpointscontroller.ApiServerExternalSyncController{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(constants.GlobalNodeControllerName),
}

if err := ApiServerExternalSyncController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", endpointscontroller.ApiServerExternalSyncControllerName, err)
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kubenest/constants/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ const (
KosmosJoinControllerName = "kosmos-join-controller"
KosmosNs = "kosmos-system"
SystemNs = "kube-system"
DefaultNs = "default"
DefauleImageRepositoryEnv = "IMAGE_REPOSITIRY"
DefauleImageVersionEnv = "IMAGE_VERSION"
DefaultCoreDnsImageTagEnv = "COREDNS_IMAGE_TAG"
DefauleVirtualControllerLabelEnv = "VIRTUAL_CONTROLLER_LABEL"
VirtualClusterFinalizerName = "kosmos.io/virtual-cluster-finalizer"
ServiceType = "NodePort"
Expand Down Expand Up @@ -115,6 +117,9 @@ const (
StateLabelKey = "kosmos-io/state"

KonnectivityServerSuffix = "konnectivity-server"

//in virtual cluster
ApiServerExternalService = "api-server-external-service"
)

type Action string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package endpointcontroller

import (
"context"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
"github.com/kosmos.io/kosmos/pkg/utils"
)

type ApiServerExternalSyncController struct {
client.Client
EventRecorder record.EventRecorder
}

const ApiServerExternalSyncControllerName string = "api-server-external-service-sync-controller"

func (e *ApiServerExternalSyncController) SetupWithManager(mgr manager.Manager) error {
skipEvent := func(obj client.Object) bool {
return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != ""
}

return controllerruntime.NewControllerManagedBy(mgr).
Named(ApiServerExternalSyncControllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
For(&v1.Endpoints{},
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return skipEvent(createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) },
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false },
})).
Complete(e)
}

func (e *ApiServerExternalSyncController) SyncApiServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error {
kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Errorf("Error getting endpoints: %v", err)
return fmt.Errorf("failed to get endpoints for kubernetes service: %v", err)
} else {
klog.Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints)
for _, subset := range kubeEndpoints.Subsets {
for _, address := range subset.Addresses {
klog.Infof("IP: %s", address.IP)
}
}
}

if len(kubeEndpoints.Subsets) != 1 {
return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes")
}

if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
return fmt.Errorf("eps %s Addresses length is nil", "kubernetes")
}

apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get endpoints for %s : %v", constants.ApiServerExternalService, err)
}

updateEPS := apiServerExternalEndpoints.DeepCopy()

if apiServerExternalEndpoints != nil {
klog.Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints)
} else {
klog.Info("apiServerExternalEndpoints is nil")
}

if updateEPS != nil {
klog.Infof("updateEPS: %v", updateEPS)
} else {
klog.Info("updateEPS is nil")
}

if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 {
ip := kubeEndpoints.Subsets[0].Addresses[0].IP
klog.Infof("IP address: %s", ip)
updateEPS.Subsets[0].Addresses[0].IP = ip

if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update endpoints for api-server-external-service: %v", err)
}
} else {
klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS)
return fmt.Errorf("unexpected format of endpoints for api-server-external-service")
}

return nil
}

func (e *ApiServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("============ %s start to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName)
defer klog.V(4).Infof("============ %s finish to reconcile %s ============", ApiServerExternalSyncControllerName, request.NamespacedName)

var virtualClusterList v1alpha1.VirtualClusterList
if err := e.List(ctx, &virtualClusterList); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
klog.V(4).Infof("query virtualcluster failed: %v", err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
var targetVirtualCluster v1alpha1.VirtualCluster
hasVirtualCluster := false
for _, vc := range virtualClusterList.Items {
if vc.Namespace == request.Namespace {
targetVirtualCluster = vc
klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name)
hasVirtualCluster = true
break
}
}
if !hasVirtualCluster {
klog.V(4).Infof("virtualcluster %s not found", request.Namespace)
return reconcile.Result{}, nil
}

if targetVirtualCluster.Status.Phase != v1alpha1.AllNodeReady && targetVirtualCluster.Status.Phase != v1alpha1.Completed {
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster)
if err != nil {
klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err)
return reconcile.Result{}, nil
}

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return e.SyncApiServerExternalEPS(ctx, k8sClient)
}); err != nil {
klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

return reconcile.Result{}, nil
}
158 changes: 158 additions & 0 deletions pkg/kubenest/controlplane/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package controlplane

import (
"context"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/virtualcluster"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

func EnsureApiServerExternalEndPoint(dynamicClient dynamic.Interface) error {
err := installApiServerExternalEndpointInVirtualCluster(dynamicClient)
if err != nil {
return err
}

err = installApiServerExternalServiceInVirtualCluster(dynamicClient)
if err != nil {
return err
}
return nil
}

func installApiServerExternalEndpointInVirtualCluster(dynamicClient dynamic.Interface) error {
klog.Info("begin to get kubernetes endpoint")
kubeEndpointUnstructured, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Error("get Kubernetes endpoint failed", err)
return errors.Wrap(err, "failed to get kubernetes endpoint")
}
klog.V(4).Info("the Kubernetes endpoint is:", kubeEndpointUnstructured)

if kubeEndpointUnstructured != nil {
kubeEndpoint := &corev1.Endpoints{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeEndpointUnstructured.Object, kubeEndpoint)
if err != nil {
klog.Error("switch Kubernetes endpoint to typed object failed", err)
return errors.Wrap(err, "failed to convert kubernetes endpoint to typed object")
}

newEndpoint := kubeEndpoint.DeepCopy()
newEndpoint.Name = constants.ApiServerExternalService
newEndpoint.Namespace = constants.DefaultNs
newEndpoint.ResourceVersion = ""
newEndpointUnstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newEndpoint)
if err != nil {
klog.Error("switch new endpoint to unstructured object failed", err)
return errors.Wrap(err, "failed to convert new endpoint to unstructured object")
}
klog.V(4).Info("after switch the Endpoint unstructured is:", newEndpointUnstructuredObj)

newEndpointUnstructured := &unstructured.Unstructured{Object: newEndpointUnstructuredObj}
createResult, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs).Create(context.TODO(), newEndpointUnstructured, metav1.CreateOptions{})
if err != nil {
klog.Error("create api-server-external-service endpoint failed", err)
return errors.Wrap(err, "failed to create api-server-external-service endpoint")
} else {
klog.Info("success create api-server-external-service endpoint:", createResult)
}
} else {
return errors.New("kubernetes endpoint does not exist")
}

return nil
}

func installApiServerExternalServiceInVirtualCluster(dynamicClient dynamic.Interface) error {
port, err := getEndPointPort(dynamicClient)
if err != nil {
return fmt.Errorf("error when getEndPointPort: %w", err)
}
apiServerExternalServiceBytes, err := util.ParseTemplate(virtualcluster.ApiServerExternalService, struct {
ServicePort int32
}{
ServicePort: port,
})
if err != nil {
return fmt.Errorf("error when parsing api-server-external-serive template: %w", err)
}

var obj unstructured.Unstructured
if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &obj); err != nil {
return fmt.Errorf("err when decoding api-server-external service in virtual cluster: %w", err)
}

err = util.CreateObject(dynamicClient, "default", "api-server-external-service", &obj)
if err != nil {
return fmt.Errorf("error when creating api-server-external service in virtual cluster err: %w", err)
}
return nil
}

func getEndPointPort(dynamicClient dynamic.Interface) (int32, error) {
klog.Info("begin to get Endpoints ports...")
endpointsRes := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "endpoints",
}).Namespace(constants.DefaultNs)

endpointsRaw, err := endpointsRes.Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
klog.Errorf("get Endpoints failed: %v", err)
return 0, err
}

subsets, found, err := unstructured.NestedSlice(endpointsRaw.Object, "subsets")
if !found || err != nil {
klog.Errorf("The subsets field was not found or parsing error occurred: %v", err)
return 0, fmt.Errorf("subsets field not found or error parsing it")
}

if len(subsets) == 0 {
klog.Errorf("subsets is empty")
return 0, fmt.Errorf("No subsets found in the endpoints")
}

subset := subsets[0].(map[string]interface{})
ports, found, err := unstructured.NestedSlice(subset, "ports")
if !found || err != nil {
klog.Errorf("ports field not found or parsing error: %v", err)
return 0, fmt.Errorf("ports field not found or error parsing it")
}

if len(ports) == 0 {
klog.Errorf("Port not found in the endpoint")
return 0, fmt.Errorf("No ports found in the endpoint")
}

port := ports[0].(map[string]interface{})
portNum, found, err := unstructured.NestedInt64(port, "port")
if !found || err != nil {
klog.Errorf("ports field not found or parsing error: %v", err)
return 0, fmt.Errorf("port field not found or error parsing it")
}

klog.Infof("The port number was successfully obtained: %d", portNum)
return int32(portNum), nil
}
1 change: 1 addition & 0 deletions pkg/kubenest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
initPhase.AppendTask(tasks.NewCoreDNSTask())
// add server
initPhase.AppendTask(tasks.NewComponentsFromManifestsTask())
initPhase.AppendTask(tasks.NewEndPointTask())

initPhase.SetDataInitializer(func() (workflow.RunData, error) {
return newRunData(opts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package virtualcluster

const (
ApiServerExternalService = `
apiVersion: v1
kind: Service
metadata:
name: api-server-external-service
namespace: default
spec:
type: NodePort
ports:
- protocol: TCP
port: {{ .ServicePort }}
targetPort: {{ .ServicePort }}
nodePort: 30443
`
)
2 changes: 1 addition & 1 deletion pkg/kubenest/tasks/coredns.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func runCheckCoreDnsTask(r workflow.RunData) error {
func runCoreDnsVirtualTask(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct")
return errors.New("Virtual cluster coreDns task invoked with an invalid data struct")
}

secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(),
Expand Down
Loading

0 comments on commit 00bd24d

Please sign in to comment.