diff --git a/cmd/yurt-manager/app/options/options.go b/cmd/yurt-manager/app/options/options.go index 369c5ff944f..1d1237b46b1 100644 --- a/cmd/yurt-manager/app/options/options.go +++ b/cmd/yurt-manager/app/options/options.go @@ -92,6 +92,10 @@ func (y *YurtManagerOptions) ApplyTo(c *config.Config) error { if err := y.PlatformAdminController.ApplyTo(&c.ComponentConfig.PlatformAdminController); err != nil { return err } + if err := y.GatewayPickupController.ApplyTo(&c.ComponentConfig.GatewayPickupController); err != nil { + return err + } + return nil } diff --git a/pkg/apis/raven/well_known_labels_annotations.go b/pkg/apis/raven/well_known_labels_annotations.go index bf52194bc82..7a1ed4ae707 100644 --- a/pkg/apis/raven/well_known_labels_annotations.go +++ b/pkg/apis/raven/well_known_labels_annotations.go @@ -18,5 +18,6 @@ package raven const ( // LabelCurrentGateway indicates which gateway the node is currently belonging to - LabelCurrentGateway = "raven.openyurt.io/gateway" + LabelCurrentGateway = "raven.openyurt.io/gateway" + LabelCurrentGatewayType = "raven.openyurt.io/gateway-type" ) diff --git a/pkg/yurtmanager/controller/controller.go b/pkg/yurtmanager/controller/controller.go index 2df60ea13c1..297c990dd99 100644 --- a/pkg/yurtmanager/controller/controller.go +++ b/pkg/yurtmanager/controller/controller.go @@ -27,6 +27,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodepool" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/platformadmin" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven" + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/dns" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/gatewaypickup" "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/servicetopology" servicetopologyendpoints "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/servicetopology/endpoints" @@ -56,6 +57,7 @@ func init() { controllerAddFuncs[delegatelease.ControllerName] = []AddControllerFn{delegatelease.Add} controllerAddFuncs[podbinding.ControllerName] = []AddControllerFn{podbinding.Add} controllerAddFuncs[raven.GatewayPickupControllerName] = []AddControllerFn{gatewaypickup.Add} + controllerAddFuncs[raven.GatewayDNSControllerName] = []AddControllerFn{dns.Add} controllerAddFuncs[nodepool.ControllerName] = []AddControllerFn{nodepool.Add} controllerAddFuncs[yurtcoordinatorcert.ControllerName] = []AddControllerFn{yurtcoordinatorcert.Add} controllerAddFuncs[servicetopology.ControllerName] = []AddControllerFn{servicetopologyendpoints.Add, servicetopologyendpointslice.Add} diff --git a/pkg/yurtmanager/controller/raven/common.go b/pkg/yurtmanager/controller/raven/common.go index 3ea1fd29fcd..583c3be51e0 100644 --- a/pkg/yurtmanager/controller/raven/common.go +++ b/pkg/yurtmanager/controller/raven/common.go @@ -23,4 +23,5 @@ var ( const ( ControllerName = "gateway" GatewayPickupControllerName = "raven-gateway-pickup" + GatewayDNSControllerName = "raven-dns" ) diff --git a/pkg/yurtmanager/controller/raven/dns/dns_controller.go b/pkg/yurtmanager/controller/raven/dns/dns_controller.go new file mode 100644 index 00000000000..43a3e4435c7 --- /dev/null +++ b/pkg/yurtmanager/controller/raven/dns/dns_controller.go @@ -0,0 +1,261 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + "context" + "fmt" + "net" + "sort" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + appconfig "github.com/openyurtio/openyurt/cmd/yurt-manager/app/config" + common "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven" + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" +) + +func Format(format string, args ...interface{}) string { + s := fmt.Sprintf(format, args...) + return fmt.Sprintf("%s: %s", common.GatewayDNSControllerName, s) +} + +// Add creates a new Ravendns Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func Add(c *appconfig.CompletedConfig, mgr manager.Manager) error { + return add(mgr, newReconciler(mgr)) +} + +var _ reconcile.Reconciler = &ReconcileDns{} + +type ReconcileDns struct { + client.Client + scheme *runtime.Scheme + recorder record.EventRecorder +} + +// newReconciler returns a new reconcile.Reconciler +func newReconciler(mgr manager.Manager) reconcile.Reconciler { + return &ReconcileDns{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + recorder: mgr.GetEventRecorderFor(common.GatewayDNSControllerName), + } +} + +// add adds a new Controller to mgr with r as the reconcile.Reconciler +func add(mgr manager.Manager, r reconcile.Reconciler) error { + // Create a new controller + c, err := controller.New(common.GatewayDNSControllerName, mgr, controller.Options{ + Reconciler: r, MaxConcurrentReconciles: common.ConcurrentReconciles, + }) + if err != nil { + return err + } + + // Watch for changes to service + err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &EnqueueRequestForServiceEvent{}, predicate.NewPredicateFuncs( + func(obj client.Object) bool { + svc, ok := obj.(*corev1.Service) + if !ok { + return false + } + if svc.Spec.Type != corev1.ServiceTypeClusterIP { + return false + } + return svc.Namespace == utils.WorkingNamespace && svc.Name == utils.GatewayProxyInternalService + })) + if err != nil { + return err + } + //Watch for changes to nodes + err = c.Watch(&source.Kind{Type: &corev1.Node{}}, &EnqueueRequestForNodeEvent{}) + if err != nil { + return err + } + + return nil +} + +func (r *ReconcileDns) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + klog.V(4).Info(Format("Reconcile DNS configMap for gateway %s", req.Name)) + defer func() { + klog.V(4).Info(Format("finished DNS configMap for gateway %s", req.Name)) + }() + var proxyAddress = "" + //1. ensure configmap to record dns + cm, err := r.getProxyDNS(ctx, client.ObjectKey{Namespace: utils.WorkingNamespace, Name: utils.RavenProxyNodesConfig}) + if err != nil { + return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err + } + + // 2. acquired raven global config to check whether the proxy s enabled + enableProxy, _ := utils.CheckServer(ctx, r.Client) + if !enableProxy { + r.recorder.Event(cm.DeepCopy(), corev1.EventTypeNormal, "MaintainDNSRecord", "The Raven Layer 7 proxy feature is not enabled for the cluster") + } else { + svc, err := r.getService(ctx, types.NamespacedName{Namespace: utils.WorkingNamespace, Name: utils.GatewayProxyInternalService}) + if err != nil && !apierrors.IsNotFound(err) { + klog.V(2).Infof(Format("failed to get service %s/%s", utils.WorkingNamespace, utils.GatewayProxyInternalService)) + return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err + } + if apierrors.IsNotFound(err) || svc.DeletionTimestamp != nil { + r.recorder.Event(cm.DeepCopy(), corev1.EventTypeNormal, "MaintainDNSRecord", + fmt.Sprintf("The Raven Layer 7 proxy lacks service %s/%s", utils.WorkingNamespace, utils.GatewayProxyInternalService)) + } + if svc != nil { + if svc.Spec.ClusterIP == "" { + r.recorder.Event(cm.DeepCopy(), corev1.EventTypeNormal, "MaintainDNSRecord", + fmt.Sprintf("The service %s/%s cluster IP is empty", utils.WorkingNamespace, utils.GatewayProxyInternalService)) + } else { + proxyAddress = svc.Spec.ClusterIP + } + } + } + + //3. update dns record + nodeList := new(corev1.NodeList) + err = r.Client.List(ctx, nodeList, &client.ListOptions{}) + if err != nil { + return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, fmt.Errorf("failed to list node, error %s", err.Error()) + } + cm.Data[utils.ProxyNodesKey] = buildDNSRecords(nodeList, enableProxy, proxyAddress) + err = r.updateDNS(cm) + if err != nil { + return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, fmt.Errorf("failed to update configmap %s/%s, error %s", + cm.GetNamespace(), cm.GetName(), err.Error()) + } + return reconcile.Result{}, nil +} + +func (r ReconcileDns) getProxyDNS(ctx context.Context, objKey client.ObjectKey) (*corev1.ConfigMap, error) { + var cm corev1.ConfigMap + err := wait.PollImmediate(5*time.Second, time.Minute, func() (done bool, err error) { + err = r.Client.Get(ctx, objKey, &cm) + if err != nil { + if apierrors.IsNotFound(err) { + err = r.buildRavenDNSConfigMap() + if err != nil { + klog.Errorf(Format("failed to generate dns record , error %s", err.Error())) + return false, nil + } + } + klog.Error(Format("failed to get ConfigMap %s, error %s", objKey.String(), err.Error())) + return false, nil + } + return true, nil + }) + if err != nil { + return cm.DeepCopy(), fmt.Errorf("failed to get ConfigMap %s, error %s", objKey.String(), err.Error()) + } + return cm.DeepCopy(), nil +} + +func (r *ReconcileDns) buildRavenDNSConfigMap() error { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: utils.RavenProxyNodesConfig, + Namespace: utils.WorkingNamespace, + }, + Data: map[string]string{ + utils.ProxyNodesKey: "", + }, + } + err := r.Client.Create(context.TODO(), cm, &client.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create ConfigMap %s/%s, error %s", cm.GetNamespace(), cm.GetName(), err.Error()) + } + return nil +} + +func (r *ReconcileDns) getService(ctx context.Context, objectKey client.ObjectKey) (*corev1.Service, error) { + svc := corev1.Service{} + err := r.Client.Get(ctx, objectKey, &svc) + if err != nil { + return nil, err + } + return svc.DeepCopy(), nil +} + +func (r *ReconcileDns) updateDNS(cm *corev1.ConfigMap) error { + err := r.Client.Update(context.TODO(), cm, &client.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update configmap %s/%s, %s", cm.GetNamespace(), cm.GetName(), err.Error()) + } + return nil +} + +func buildDNSRecords(nodeList *corev1.NodeList, needProxy bool, proxyIp string) string { + // record node name <-> ip address + if needProxy && proxyIp == "" { + klog.Errorf(Format("internal proxy address is empty for dns record, redirect node internal address")) + needProxy = false + } + var err error + dns := make([]string, 0, len(nodeList.Items)) + for _, node := range nodeList.Items { + ip := proxyIp + if !needProxy { + ip, err = getHostIP(&node) + if err != nil { + klog.Errorf(Format("failed to parse node address for %s, %s", node.Name, err.Error())) + continue + } + } + dns = append(dns, fmt.Sprintf("%s\t%s", ip, node.Name)) + } + sort.Strings(dns) + return strings.Join(dns, "\n") +} + +func getHostIP(node *corev1.Node) (string, error) { + // get InternalIPs first and then ExternalIPs + var internalIP, externalIP net.IP + for _, addr := range node.Status.Addresses { + switch addr.Type { + case corev1.NodeInternalIP: + ip := net.ParseIP(addr.Address) + if ip != nil { + return ip.String(), nil + } + case corev1.NodeExternalIP: + ip := net.ParseIP(addr.Address) + if ip != nil { + externalIP = ip + } + } + } + if internalIP == nil && externalIP == nil { + return "", fmt.Errorf("host IP unknown; known addresses: %v", node.Status.Addresses) + } + return externalIP.String(), nil +} diff --git a/pkg/yurtmanager/controller/raven/dns/dns_controller_test.go b/pkg/yurtmanager/controller/raven/dns/dns_controller_test.go new file mode 100644 index 00000000000..034673839df --- /dev/null +++ b/pkg/yurtmanager/controller/raven/dns/dns_controller_test.go @@ -0,0 +1,167 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/openyurtio/openyurt/pkg/apis/raven" + ravenv1v1beta1 "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" +) + +const ( + ProxyIP = "172.168.0.1" + Node1Name = "node-1" + Node2Name = "node-2" + Node3Name = "node-3" + Node4Name = "node-4" + Node1Address = "192.168.0.1" + Node2Address = "192.168.0.2" + Node3Address = "192.168.0.3" + Node4Address = "192.168.0.4" + MockGateway = "gw-mock" + MockProxySvc = "x-raven-proxy-internal-svc" +) + +func mockKubeClient() client.Client { + nodeList := &v1.NodeList{ + Items: []v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: Node1Name, + Labels: map[string]string{ + raven.LabelCurrentGateway: MockGateway, + }, + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: Node1Address, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: Node2Name, + Labels: map[string]string{ + raven.LabelCurrentGateway: MockGateway, + }, + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: Node2Address, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: Node3Name, + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: Node3Address, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: Node4Name, + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: Node4Address, + }, + }, + }, + }, + }, + } + + services := &v1.ServiceList{ + Items: []v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: MockProxySvc, + Namespace: utils.WorkingNamespace, + Labels: map[string]string{ + raven.LabelCurrentGateway: MockGateway, + raven.LabelCurrentGatewayType: ravenv1v1beta1.Proxy, + }, + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + ClusterIP: ProxyIP, + Ports: []v1.ServicePort{}, + }, + }, + }, + } + + configmaps := &v1.ConfigMapList{ + Items: []v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: utils.RavenProxyNodesConfig, + Namespace: utils.WorkingNamespace, + }, + Data: map[string]string{ + utils.ProxyNodesKey: "", + }, + }, + }, + } + objs := []runtime.Object{nodeList, configmaps, services} + return fake.NewClientBuilder().WithRuntimeObjects(objs...).Build() +} + +func mockReconciler() *ReconcileDns { + return &ReconcileDns{ + Client: mockKubeClient(), + recorder: record.NewFakeRecorder(100), + } +} + +func TestReconcileDns_Reconcile(t *testing.T) { + r := mockReconciler() + t.Run("get dns configmap", func(t *testing.T) { + res, err := r.Reconcile(context.Background(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: utils.WorkingNamespace, Name: utils.RavenProxyNodesConfig}}) + assert.Equal(t, reconcile.Result{}, res) + assert.Equal(t, err, nil) + }) +} diff --git a/pkg/yurtmanager/controller/raven/dns/dns_enqueue_handlers.go b/pkg/yurtmanager/controller/raven/dns/dns_enqueue_handlers.go new file mode 100644 index 00000000000..08d4871bb13 --- /dev/null +++ b/pkg/yurtmanager/controller/raven/dns/dns_enqueue_handlers.go @@ -0,0 +1,105 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/event" + + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" +) + +type EnqueueRequestForServiceEvent struct{} + +func (h *EnqueueRequestForServiceEvent) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) { + svc, ok := e.Object.(*corev1.Service) + if !ok { + klog.Error(Format("fail to assert runtime Object to v1.Service")) + return + } + if svc.Spec.ClusterIP == "" { + klog.Error(Format("failed to get cluster IP %s/%s", svc.Namespace, svc.Name)) + return + } + + klog.V(2).Infof(Format("enqueue configmap %s/%s due to service create event", utils.WorkingNamespace, utils.RavenProxyNodesConfig)) + utils.AddDNSConfigmapToWorkQueue(q) +} + +func (h *EnqueueRequestForServiceEvent) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) { + newSvc, ok := e.ObjectNew.(*corev1.Service) + if !ok { + klog.Error(Format("fail to assert runtime Object to v1.Service")) + return + } + oldSvc, ok := e.ObjectOld.(*corev1.Service) + if !ok { + klog.Error(Format("fail to assert runtime Object to v1.Service")) + return + } + if newSvc.Spec.ClusterIP != oldSvc.Spec.ClusterIP { + klog.V(2).Infof(Format("enqueue configmap %s/%s due to service update event", utils.WorkingNamespace, utils.RavenProxyNodesConfig)) + utils.AddDNSConfigmapToWorkQueue(q) + } +} + +func (h *EnqueueRequestForServiceEvent) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + _, ok := e.Object.(*corev1.Service) + if !ok { + klog.Error(Format("fail to assert runtime Object to v1.Service")) + return + } + klog.V(2).Infof(Format("enqueue configmap %s/%s due to service update event", utils.WorkingNamespace, utils.RavenProxyNodesConfig)) + utils.AddDNSConfigmapToWorkQueue(q) + return +} + +func (h *EnqueueRequestForServiceEvent) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) { + return +} + +type EnqueueRequestForNodeEvent struct{} + +func (h *EnqueueRequestForNodeEvent) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) { + _, ok := e.Object.(*corev1.Node) + if !ok { + klog.Error(Format("fail to assert runtime Object to v1.Node")) + return + } + klog.V(2).Infof(Format("enqueue configmap %s/%s due to node create event", utils.WorkingNamespace, utils.RavenProxyNodesConfig)) + utils.AddDNSConfigmapToWorkQueue(q) +} + +func (h *EnqueueRequestForNodeEvent) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) { + return +} + +func (h *EnqueueRequestForNodeEvent) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + _, ok := e.Object.(*corev1.Node) + if !ok { + klog.Error(Format("fail to assert runtime Object to v1.Node")) + return + } + klog.V(2).Infof(Format("enqueue configmap %s/%s due to node delete event", utils.WorkingNamespace, utils.RavenProxyNodesConfig)) + utils.AddDNSConfigmapToWorkQueue(q) +} + +func (h *EnqueueRequestForNodeEvent) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) { + +} diff --git a/pkg/yurtmanager/controller/raven/dns/dns_enqueue_handlers_test.go b/pkg/yurtmanager/controller/raven/dns/dns_enqueue_handlers_test.go new file mode 100644 index 00000000000..c91cfe382bf --- /dev/null +++ b/pkg/yurtmanager/controller/raven/dns/dns_enqueue_handlers_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the License); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an AS IS BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + + "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" +) + +func mockService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: utils.WorkingNamespace, + Name: utils.GatewayProxyInternalService, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: ProxyIP, + }, + } +} + +func mockNode() *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: Node1Name, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: Node1Address, + }, + }, + }, + } +} + +func TestEnqueueRequestFoServiceEvent(t *testing.T) { + h := &EnqueueRequestForServiceEvent{} + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + svc := mockService() + clearQueue := func(queue workqueue.RateLimitingInterface) { + for queue.Len() > 0 { + item, _ := queue.Get() + queue.Done(item) + } + } + h.Create(event.CreateEvent{Object: svc}, queue) + if !assert.Equal(t, 1, queue.Len()) { + t.Errorf("failed to update service, expected %d, but get %d", 1, queue.Len()) + } + clearQueue(queue) + + deletedSvc := svc.DeepCopy() + time := metav1.Now() + deletedSvc.DeletionTimestamp = &time + h.Delete(event.DeleteEvent{Object: deletedSvc}, queue) + if !assert.Equal(t, 1, queue.Len()) { + t.Errorf("failed to update service, expected %d, but get %d", 1, queue.Len()) + } + clearQueue(queue) + + newSvc := svc.DeepCopy() + newSvc.Spec.ClusterIP = "0.0.0.0" + h.Update(event.UpdateEvent{ObjectOld: svc, ObjectNew: newSvc}, queue) + if !assert.Equal(t, 1, queue.Len()) { + t.Errorf("failed to update service, expected %d, but get %d", 1, queue.Len()) + } + clearQueue(queue) +} + +func TestEnqueueRequestForNodeEvent(t *testing.T) { + h := &EnqueueRequestForNodeEvent{} + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + node := mockNode() + clearQueue := func(queue workqueue.RateLimitingInterface) { + for queue.Len() > 0 { + item, _ := queue.Get() + queue.Done(item) + } + } + h.Create(event.CreateEvent{Object: node}, queue) + if !assert.Equal(t, 1, queue.Len()) { + t.Errorf("failed to create node, expected %d, but get %d", 1, queue.Len()) + } + clearQueue(queue) + + time := metav1.Now() + deletedNode := node.DeepCopy() + deletedNode.DeletionTimestamp = &time + h.Delete(event.DeleteEvent{Object: deletedNode}, queue) + if !assert.Equal(t, 1, queue.Len()) { + t.Errorf("failed to create node, expected %d, but get %d", 1, queue.Len()) + } + clearQueue(queue) +} diff --git a/pkg/yurtmanager/controller/raven/utils/utils.go b/pkg/yurtmanager/controller/raven/utils/utils.go index 0bb880c42f0..7219ebde5d8 100644 --- a/pkg/yurtmanager/controller/raven/utils/utils.go +++ b/pkg/yurtmanager/controller/raven/utils/utils.go @@ -31,8 +31,13 @@ import ( ) const ( - WorkingNamespace = "kube-system" - RavenGlobalConfig = "raven-cfg" + WorkingNamespace = "kube-system" + RavenGlobalConfig = "raven-cfg" + GatewayProxyInternalService = "x-raven-proxy-internal-svc" + GatewayProxyServiceNamePrefix = "x-raven-proxy-svc-" + GatewayTunnelServiceNamePrefix = "x-raven-tunnel-svc-" + RavenProxyNodesConfig = "edge-tunnel-nodes" + ProxyNodesKey = "tunnel-nodes" RavenEnableProxy = "EnableL7Proxy" RavenEnableTunnel = "EnableL3Tunnel" @@ -79,5 +84,10 @@ func CheckServer(ctx context.Context, client client.Client) (enableProxy, enable enableTunnel = true } return enableProxy, enableTunnel +} +func AddDNSConfigmapToWorkQueue(q workqueue.RateLimitingInterface) { + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: WorkingNamespace, Name: RavenProxyNodesConfig}, + }) }