Skip to content

Commit

Permalink
feat: add webook and external eps support dualstack
Browse files Browse the repository at this point in the history
Signed-off-by: qiuwei <[email protected]>
  • Loading branch information
qiuwei68 committed Dec 6, 2024
1 parent c87aed4 commit 40f8530
Show file tree
Hide file tree
Showing 13 changed files with 540 additions and 160 deletions.
7 changes: 7 additions & 0 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
glnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/global.node.controller"
kosmos "github.com/kosmos.io/kosmos/pkg/kubenest/controller/kosmos"
vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
"github.com/kosmos.io/kosmos/pkg/kubenest/webhooks"
"github.com/kosmos.io/kosmos/pkg/scheme"
"github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag"
)
Expand Down Expand Up @@ -227,6 +228,7 @@ func run(ctx context.Context, config *config.Config) error {
LivenessEndpointName: "/healthz",
ReadinessEndpointName: "/readyz",
HealthProbeBindAddress: ":8081",
Port: 9443,
})
if err != nil {
return fmt.Errorf("failed to build controller manager: %v", err)
Expand All @@ -252,6 +254,11 @@ func run(ctx context.Context, config *config.Config) error {
return fmt.Errorf("could not create clientset: %v", err)
}

// 创建 Webhook 设置
if err := webhooks.SetupWebhookWithManager(mgr); err != nil {
return fmt.Errorf("unable to setup webhook: %v", err)
}

VirtualClusterInitController := controller.VirtualClusterInitController{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Expand Down
11 changes: 11 additions & 0 deletions deploy/virtual-cluster-operator-webhook-svc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: v1
kind: Service
metadata:
name: virtual-cluster-operator-webhook
namespace: kosmos-system
spec:
ports:
- port: 443 # Service 端口,API Server 会使用它访问 Webhook
targetPort: 9443 # 映射到 Pod 内的 9443 端口
selector:
app: virtual-cluster-operator
19 changes: 19 additions & 0 deletions deploy/webhooks/virtualcluster-validating-webhook.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: virtualcluster-validating-webhook
webhooks:
- name: virtualcluster.kosmos.io
clientConfig:
service:
name: virtual-cluster-operator-webhook # 服务名称
namespace: kosmos-system # 命名空间
path: /validate-kosmos-io-v1alpha1-virtualcluster # 默认路径
caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURDekNDQWZPZ0F3SUJBZ0lVTk4yRXo0ekoyY2wrTGVnWjBXaWgrYUxCSkF3d0RRWUpLb1pJaHZjTkFRRUwKQlFBd0ZURVRNQkVHQTFVRUF3d0tkMlZpYUc5dmF5MWpZVEFlRncweU5ERXhNakV3TmpNNU5UTmFGdzB6TkRFeApNVGt3TmpNNU5UTmFNQlV4RXpBUkJnTlZCQU1NQ25kbFltaHZiMnN0WTJFd2dnRWlNQTBHQ1NxR1NJYjNEUUVCCkFRVUFBNElCRHdBd2dnRUtBb0lCQVFETDBra01acnZFRXJDQVhzbzdURlZIRHVlMjFTWUx1a1RBOFpQNE9YelMKRW8wYlY1bjJ0UENUcm1JeTN5NU1rNmlZZVZoVTlOOXdzK0YxR0xzaHQ3WGZyVHExcG1wQ3NWNnBQMW9UZkZuVAptMDhWVVJCZ2JLK2VxT3QvcXMvbnBLWi9EZVN3NjRCbW9EMGNsVWJ5dWxqTHFGRTRSMXVLa0hlZlpBVklXaGhVCkRyNWNzb1Y1T1ZJUHhKaDh2d2RWcER2MldkZUxSNFJUcWVPTVJIL29QaytCa0VDYi9MMGRJdGtiQW0vcnJuZTUKWDFieUNXZ1V4eG1WdHRDbHpQTDRpcTNKRjdTYU1DRlg4dTFpMGxiSEpYYVNva0UyMzN1Z1p0ZEVqUVlOV0gyMAplc2I4aWptZ2JKV0tQbytxbzF4WHlSZ25HUmprZ3VWK2FkNW85NFZNdjgrZEFnTUJBQUdqVXpCUk1CMEdBMVVkCkRnUVdCQlFlcnp0ZGpPK2hNcmN1VTNjdGg2b3Y0MUJCT0RBZkJnTlZIU01FR0RBV2dCUWVyenRkak8raE1yY3UKVTNjdGg2b3Y0MUJCT0RBUEJnTlZIUk1CQWY4RUJUQURBUUgvTUEwR0NTcUdTSWIzRFFFQkN3VUFBNElCQVFBWgpuTjBWZ3czQ29GbGNablBjcEJPTGNEZUxKR2VWbDhyV0QrY2dGQjc5WEljL0kxYythRlpSbkVIZkJVSytpT3dVClBSZ2VSUzZiOVd0V29tVDhvMzlieWwwUWVrYVVwM3A1NGVIYUhINUtUYnJORkliZEU3bC9rRk0wMlV1eURxQU8KczNuakF3VUhjOUIwQmxLZ0ZHWFd4WWRXeWRaem00bXFGbk45ZEdlWldudXkrdHZSSzhSZjk5Rk1sNXh0dVZmYQo3amdpMHplVEx6NjBSK285cEtsN3FqK0xxZlRScVR5ZjVzbXdmSk0yS2h2ZENsb2ZKZjhxSGhhT0lZa094QnRmClBHbGpQQ0pYWk53VlgzZTBLbUV2QnNkamRLbGJjcVNHalpqLy8yVWhBTzU0aHJQUTkzem02R1FTdXZCU3lpR3YKMVB4eFdyVW1TZ25GRnJWb2F5Z20KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["kosmos.io"]
apiVersions: ["v1alpha1"]
resources: ["virtualclusters"]
admissionReviewVersions: ["v1"]
sideEffects: None
14 changes: 14 additions & 0 deletions pkg/kubenest/common/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package common

import (
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

type Resource struct {
Namespace string
Name string
Vc *v1alpha1.VirtualCluster
RootClientSet kubernetes.Interface
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,17 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -38,145 +36,158 @@ type APIServerExternalSyncController struct {
const APIServerExternalSyncControllerName string = "api-server-external-service-sync-controller"

func (e *APIServerExternalSyncController) SetupWithManager(mgr manager.Manager) error {
skipEvent := func(obj client.Object) bool {
return strings.Contains(obj.GetName(), "apiserver") && obj.GetNamespace() != ""
}

return controllerruntime.NewControllerManagedBy(mgr).
Named(APIServerExternalSyncControllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
For(&v1.Endpoints{},
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return skipEvent(createEvent.Object)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return skipEvent(updateEvent.ObjectNew) },
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false },
})).
Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(e.newVirtualClusterMapFunc())).
Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(e.newPodMapFunc())).
Complete(e)
}

func (e *APIServerExternalSyncController) newVirtualClusterMapFunc() handler.MapFunc {
func (e *APIServerExternalSyncController) newPodMapFunc() handler.MapFunc {
return func(a client.Object) []reconcile.Request {
var requests []reconcile.Request
vcluster := a.(*v1alpha1.VirtualCluster)

// Join the Reconcile queue only if the status of the vcluster is Completed
if vcluster.Status.Phase == v1alpha1.Completed {
klog.V(4).Infof("api-server-external-sync-controller: virtualcluster change to completed: %s", vcluster.Name)
// Add the vcluster to the Reconcile queue
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: vcluster.Name,
Namespace: vcluster.Namespace,
},
})
pod := a.(*v1.Pod)

// pod 的名称包含 "apiserver" 并且不包含 "kube-apiserver"
if strings.Contains(pod.Name, "apiserver") && !strings.Contains(pod.Name, "kube-apiserver") {
klog.V(4).Infof("api-server-external-sync-controller: Detected change in apiserver Pod: %s", pod.Name)

// 根据 pod 名称推断 vcluster 名称
parts := strings.SplitN(pod.Name, "-apiserver", 2)
vclusterName := parts[0]
klog.V(4).Infof("Derived vclusterName: %s from podName: %s", vclusterName, pod.Name)

// 查找与该 Pod 关联的 VirtualCluster
vcluster := &v1alpha1.VirtualCluster{}
if err := e.Client.Get(context.Background(), types.NamespacedName{
Namespace: pod.Namespace,
Name: vclusterName,
}, vcluster); err != nil {
klog.Errorf("Failed to get VirtualCluster %s: %v", vclusterName, err)
return nil
}

// 确保 VirtualCluster 状态为 Completed
if vcluster.Status.Phase == v1alpha1.Completed {
klog.V(4).Infof("VirtualCluster %s is completed, enqueueing for reconciliation", vclusterName)
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: vcluster.Name,
Namespace: vcluster.Namespace,
},
})
}
}
return requests
}
}

func (e *APIServerExternalSyncController) SyncAPIServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface) error {
kubeEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
klog.Errorf("Error getting endpoints: %v", err)
return err
}
klog.V(4).Infof("Endpoints for service 'kubernetes': %v", kubeEndpoints)
for _, subset := range kubeEndpoints.Subsets {
for _, address := range subset.Addresses {
klog.V(4).Infof("IP: %s", address.IP)
}
}

if len(kubeEndpoints.Subsets) != 1 {
return fmt.Errorf("eps %s Subsets length is not 1", "kubernetes")
}

if kubeEndpoints.Subsets[0].Addresses == nil || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
klog.Errorf("eps %s Addresses length is nil", "kubernetes")
return err
}

apiServerExternalEndpoints, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("failed to get endpoints for %s : %v", constants.APIServerExternalService, err)
return err
func (e *APIServerExternalSyncController) SyncAPIServerExternalEPS(ctx context.Context, k8sClient kubernetes.Interface, vc *v1alpha1.VirtualCluster) error {
podList := &v1.PodList{}
if err := e.Client.List(ctx, podList, &client.ListOptions{
Namespace: vc.Namespace,
LabelSelector: labels.SelectorFromSet(map[string]string{
"virtualCluster-app": "apiserver",
}),
}); err != nil {
return fmt.Errorf("failed to list apiserver pods: %w", err)
}

updateEPS := apiServerExternalEndpoints.DeepCopy()

if apiServerExternalEndpoints != nil {
klog.V(4).Infof("apiServerExternalEndpoints: %v", apiServerExternalEndpoints)
} else {
klog.V(4).Info("apiServerExternalEndpoints is nil")
var addresses []v1.EndpointAddress
for _, pod := range podList.Items {
// 确保 Pod 处于 Running 状态并有 IP 地址
if pod.Status.Phase == v1.PodRunning && pod.Status.PodIP != "" {
klog.V(4).Infof("Found apiserver Pod: %s, IP: %s", pod.Name, pod.Status.PodIP)
addresses = append(addresses, v1.EndpointAddress{IP: pod.Status.PodIP})
}
}

if updateEPS != nil {
klog.V(4).Infof("updateEPS: %v", updateEPS)
} else {
klog.V(4).Info("updateEPS is nil")
apiServerPort, ok := vc.Status.PortMap[constants.APIServerPortKey]
if !ok {
return fmt.Errorf("failed to get API server port from VirtualCluster status")
}
klog.V(4).Infof("API server port: %d", apiServerPort)

// 构造 Endpoints 对象
newEndpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: constants.APIServerExternalService,
Namespace: constants.KosmosNs,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
Ports: []v1.EndpointPort{
{
Name: "https",
Port: apiServerPort,
Protocol: v1.ProtocolTCP,
},
},
},
},
}

if len(updateEPS.Subsets) == 1 && len(updateEPS.Subsets[0].Addresses) == 1 {
ip := kubeEndpoints.Subsets[0].Addresses[0].IP
klog.V(4).Infof("IP address: %s", ip)
updateEPS.Subsets[0].Addresses[0].IP = ip
//避免不必要的更新
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
currentEndpoint, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Get(ctx, constants.APIServerExternalService, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Create(ctx, newEndpoint, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create api-server-external-service endpoint: %w", err)
}
klog.Infof("Created api-server-external-service Endpoint")
return nil
} else if err != nil {
return fmt.Errorf("failed to get existing api-server-external-service endpoint: %w", err)
}

if _, err := k8sClient.CoreV1().Endpoints(constants.DefaultNs).Update(ctx, updateEPS, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update endpoints for api-server-external-service: %v", err)
return err
// 比较 Endpoints 内容,判断是否需要更新
if !endpointsEqual(currentEndpoint, newEndpoint) {
_, err := k8sClient.CoreV1().Endpoints(constants.KosmosNs).Update(ctx, newEndpoint, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update api-server-external-service endpoint: %w", err)
}
klog.Infof("Updated api-server-external-service Endpoint")
} else {
klog.V(4).Info("No changes detected in Endpoint, skipping update")
}
} else {
klog.ErrorS(err, "Unexpected format of endpoints for api-server-external-service", "endpoint_data", updateEPS)
return err
}
return nil
})
}

return nil
// Endpoints 比较函数
func endpointsEqual(a, b *v1.Endpoints) bool {
return fmt.Sprintf("%v", a.Subsets) == fmt.Sprintf("%v", b.Subsets)
}

func (e *APIServerExternalSyncController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("============ %s start to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName)
defer klog.V(4).Infof("============ %s finish to reconcile %s ============", APIServerExternalSyncControllerName, request.NamespacedName)

var virtualClusterList v1alpha1.VirtualClusterList
if err := e.List(ctx, &virtualClusterList); err != nil {
var vc v1alpha1.VirtualCluster
if err := e.Get(ctx, request.NamespacedName, &vc); err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("VirtualCluster not found: %s", request.NamespacedName)
return reconcile.Result{}, nil
}
klog.V(4).Infof("query virtualcluster failed: %v", err)
klog.Errorf("Failed to get VirtualCluster: %v", err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
var targetVirtualCluster v1alpha1.VirtualCluster
hasVirtualCluster := false
for _, vc := range virtualClusterList.Items {
if vc.Namespace == request.Namespace {
targetVirtualCluster = vc
klog.V(4).Infof("virtualcluster %s found", targetVirtualCluster.Name)
hasVirtualCluster = true
break
}
}
if !hasVirtualCluster {
klog.V(4).Infof("virtualcluster %s not found", request.Namespace)
return reconcile.Result{}, nil
}

if targetVirtualCluster.Status.Phase != v1alpha1.Completed {
if vc.Status.Phase != v1alpha1.Completed {
klog.Infof("VirtualCluster %s is not in Completed phase", vc.Name)
return reconcile.Result{}, nil
}

k8sClient, err := util.GenerateKubeclient(&targetVirtualCluster)
k8sClient, err := util.GenerateKubeclient(&vc)
if err != nil {
klog.Errorf("virtualcluster %s crd kubernetes client failed: %v", targetVirtualCluster.Name, err)
klog.Errorf("Failed to generate Kubernetes client for VirtualCluster %s: %v", vc.Name, err)
return reconcile.Result{}, nil
}

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return e.SyncAPIServerExternalEPS(ctx, k8sClient)
}); err != nil {
klog.Errorf("virtualcluster %s sync apiserver external endpoints failed: %v", targetVirtualCluster.Name, err)
if err := e.SyncAPIServerExternalEPS(ctx, k8sClient, &vc); err != nil {
klog.Errorf("Failed to sync apiserver external Endpoints: %v", err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

Expand Down
Loading

0 comments on commit 40f8530

Please sign in to comment.