From 1f82b2c0af2d01a1b9581ea54ecb2c70085e6286 Mon Sep 17 00:00:00 2001 From: SataQiu <1527062125@qq.com> Date: Thu, 22 Apr 2021 19:11:40 +0800 Subject: [PATCH] feature: add dns controller to support access edge node by hostname --- cmd/yurt-tunnel-server/app/config/config.go | 2 + cmd/yurt-tunnel-server/app/options/options.go | 8 + cmd/yurt-tunnel-server/app/start.go | 12 + config/setup/yurt-tunnel-server.yaml | 1 + config/yaml-template/yurt-tunnel-server.yaml | 1 + .../constants/yurt-tunnel-server-tmpl.go | 1 + pkg/yurttunnel/dns/dns.go | 462 ++++++++++++++++++ pkg/yurttunnel/dns/event.go | 36 ++ pkg/yurttunnel/dns/handler.go | 246 ++++++++++ pkg/yurttunnel/dns/util.go | 116 +++++ pkg/yurttunnel/iptables/iptables.go | 57 +-- pkg/yurttunnel/util/util.go | 60 ++- 12 files changed, 950 insertions(+), 52 deletions(-) create mode 100644 pkg/yurttunnel/dns/dns.go create mode 100644 pkg/yurttunnel/dns/event.go create mode 100644 pkg/yurttunnel/dns/handler.go create mode 100644 pkg/yurttunnel/dns/util.go diff --git a/cmd/yurt-tunnel-server/app/config/config.go b/cmd/yurt-tunnel-server/app/config/config.go index 25961c4560f..f8940a7c4ce 100644 --- a/cmd/yurt-tunnel-server/app/config/config.go +++ b/cmd/yurt-tunnel-server/app/config/config.go @@ -28,7 +28,9 @@ import ( type Config struct { EgressSelectorEnabled bool EnableIptables bool + EnableDNSController bool IptablesSyncPeriod int + DNSSyncPeriod int CertDNSNames []string CertIPs []net.IP ListenAddrForAgent string diff --git a/cmd/yurt-tunnel-server/app/options/options.go b/cmd/yurt-tunnel-server/app/options/options.go index 1c733589db4..aa729751642 100644 --- a/cmd/yurt-tunnel-server/app/options/options.go +++ b/cmd/yurt-tunnel-server/app/options/options.go @@ -43,8 +43,10 @@ type ServerOptions struct { CertIPs string Version bool EnableIptables bool + EnableDNSController bool EgressSelectorEnabled bool IptablesSyncPeriod int + DNSSyncPeriod int TunnelAgentConnectPort string SecurePort string InsecurePort string @@ -59,7 +61,9 @@ func NewServerOptions() *ServerOptions { BindAddr: "0.0.0.0", InsecureBindAddr: "127.0.0.1", EnableIptables: true, + EnableDNSController: true, IptablesSyncPeriod: 60, + DNSSyncPeriod: 1800, ServerCount: 1, TunnelAgentConnectPort: constants.YurttunnelServerAgentPort, SecurePort: constants.YurttunnelServerMasterPort, @@ -88,8 +92,10 @@ func (o *ServerOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.CertDNSNames, "cert-dns-names", o.CertDNSNames, "DNS names that will be added into server's certificate. (e.g., dns1,dns2)") fs.StringVar(&o.CertIPs, "cert-ips", o.CertIPs, "IPs that will be added into server's certificate. (e.g., ip1,ip2)") fs.BoolVar(&o.EnableIptables, "enable-iptables", o.EnableIptables, "If allow iptable manager to set the dnat rule.") + fs.BoolVar(&o.EnableDNSController, "enable-dns-controller", o.EnableDNSController, "If allow DNS controller to set the dns rules.") fs.BoolVar(&o.EgressSelectorEnabled, "egress-selector-enable", o.EgressSelectorEnabled, "If the apiserver egress selector has been enabled.") fs.IntVar(&o.IptablesSyncPeriod, "iptables-sync-period", o.IptablesSyncPeriod, "The synchronization period of the iptable manager.") + fs.IntVar(&o.DNSSyncPeriod, "dns-sync-period", o.DNSSyncPeriod, "The synchronization period of the DNS controller.") fs.IntVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server.") fs.StringVar(&o.ProxyStrategy, "proxy-strategy", o.ProxyStrategy, "The strategy of proxying requests from tunnel server to agent.") fs.StringVar(&o.TunnelAgentConnectPort, "tunnel-agent-connect-port", o.TunnelAgentConnectPort, "The port on which to serve tcp packets from tunnel agent") @@ -103,7 +109,9 @@ func (o *ServerOptions) Config() (*config.Config, error) { cfg := &config.Config{ EgressSelectorEnabled: o.EgressSelectorEnabled, EnableIptables: o.EnableIptables, + EnableDNSController: o.EnableDNSController, IptablesSyncPeriod: o.IptablesSyncPeriod, + DNSSyncPeriod: o.DNSSyncPeriod, CertDNSNames: make([]string, 0), CertIPs: make([]net.IP, 0), ServerCount: o.ServerCount, diff --git a/cmd/yurt-tunnel-server/app/start.go b/cmd/yurt-tunnel-server/app/start.go index bf9f1500155..b5b2b7b8870 100644 --- a/cmd/yurt-tunnel-server/app/start.go +++ b/cmd/yurt-tunnel-server/app/start.go @@ -24,6 +24,7 @@ import ( "github.com/openyurtio/openyurt/cmd/yurt-tunnel-server/app/options" "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurttunnel/constants" + "github.com/openyurtio/openyurt/pkg/yurttunnel/dns" "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper/initializer" "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper/wraphandler" "github.com/openyurtio/openyurt/pkg/yurttunnel/iptables" @@ -73,6 +74,17 @@ func NewYurttunnelServerCommand(stopCh <-chan struct{}) *cobra.Command { // run starts the yurttunel-server func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error { + // 0. start the DNS controller + if cfg.EnableDNSController { + dnsController, err := dns.NewCoreDNSRecordController(cfg.Client, + cfg.SharedInformerFactory, + cfg.ListenInsecureAddrForMaster, + cfg.DNSSyncPeriod) + if err != nil { + return fmt.Errorf("fail to create a new dnsController, %v", err) + } + go dnsController.Run(stopCh) + } // 1. start the IP table manager if cfg.EnableIptables { iptablesMgr := iptables.NewIptablesManager(cfg.Client, diff --git a/config/setup/yurt-tunnel-server.yaml b/config/setup/yurt-tunnel-server.yaml index 2b8141d3298..53013ceeff8 100644 --- a/config/setup/yurt-tunnel-server.yaml +++ b/config/setup/yurt-tunnel-server.yaml @@ -127,6 +127,7 @@ spec: - yurt-tunnel-server args: - --bind-address=$(NODE_IP) + - --insecure-bind-address=$(NODE_IP) - --proxy-strategy=destHost - --v=2 env: diff --git a/config/yaml-template/yurt-tunnel-server.yaml b/config/yaml-template/yurt-tunnel-server.yaml index f566f963ac7..8938ddada3c 100644 --- a/config/yaml-template/yurt-tunnel-server.yaml +++ b/config/yaml-template/yurt-tunnel-server.yaml @@ -127,6 +127,7 @@ spec: - __project_prefix__-tunnel-server args: - --bind-address=$(NODE_IP) + - --insecure-bind-address=$(NODE_IP) - --proxy-strategy=destHost - --v=2 env: diff --git a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go index c26129c7a09..e49bf00e2fb 100644 --- a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go +++ b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go @@ -152,6 +152,7 @@ spec: - yurt-tunnel-server args: - --bind-address=$(NODE_IP) + - --insecure-bind-address=$(NODE_IP) - --server-count=1 env: - name: NODE_IP diff --git a/pkg/yurttunnel/dns/dns.go b/pkg/yurttunnel/dns/dns.go new file mode 100644 index 00000000000..46ec8cc9c5a --- /dev/null +++ b/pkg/yurttunnel/dns/dns.go @@ -0,0 +1,462 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + "context" + "fmt" + "net" + "os" + "sort" + "strconv" + "strings" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + corelister "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurttunnel/constants" + "github.com/openyurtio/openyurt/pkg/yurttunnel/util" +) + +const ( + maxRetries = 15 + minSyncPeriod = 30 + + yurttunnelDNSRecordConfigMapNs = "kube-system" + yurttunnelDNSRecordNodeDataKey = "tunnel-nodes" + + dnatPortPrefix = "dnat-" +) + +var ( + yurttunnelDNSRecordConfigMapName = fmt.Sprintf("%s-tunnel-nodes", + strings.TrimRightFunc(projectinfo.GetProjectPrefix(), func(c rune) bool { return c == '-' })) +) + +// DNSRecordController interface defines the method for synchronizing +// the node dns records with k8s DNS component(such as CoreDNS) +type DNSRecordController interface { + Run(stopCh <-chan struct{}) +} + +// coreDNSRecordController implements the DNSRecordController +type coreDNSRecordController struct { + lock sync.Mutex + kubeClient clientset.Interface + sharedInformerFactor informers.SharedInformerFactory + nodeLister corelister.NodeLister + nodeListerSynced cache.InformerSynced + svcInformerSynced cache.InformerSynced + cmInformerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + tunnelServerIP string + insecurePort int + syncPeriod int +} + +// NewCoreDNSRecordController create a CoreDNSRecordController that synchronizes node dns records with CoreDNS configuration +func NewCoreDNSRecordController(client clientset.Interface, + informerFactory informers.SharedInformerFactory, + listenInsecureAddr string, + syncPeriod int) (DNSRecordController, error) { + + _, insecurePortStr, err := net.SplitHostPort(listenInsecureAddr) + if err != nil { + return nil, err + } + + insecurePort, err := strconv.Atoi(insecurePortStr) + if err != nil { + return nil, err + } + + dnsctl := &coreDNSRecordController{ + kubeClient: client, + syncPeriod: syncPeriod, + insecurePort: insecurePort, + sharedInformerFactor: informerFactory, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tunnel-dns"), + } + + nodeInformer := informerFactory.Core().V1().Nodes() + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dnsctl.addNode, + UpdateFunc: dnsctl.updateNode, + DeleteFunc: dnsctl.deleteNode, + }) + dnsctl.nodeLister = nodeInformer.Lister() + dnsctl.nodeListerSynced = nodeInformer.Informer().HasSynced + + svcInformer := informerFactory.InformerFor(&corev1.Service{}, newServiceInformer) + svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dnsctl.addService, + UpdateFunc: dnsctl.updateService, + DeleteFunc: dnsctl.deleteService, + }) + dnsctl.svcInformerSynced = svcInformer.HasSynced + + cmInformer := informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigMapInformer) + cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dnsctl.addConfigMap, + UpdateFunc: dnsctl.updateConfigMap, + DeleteFunc: dnsctl.deleteConfigMap, + }) + dnsctl.cmInformerSynced = cmInformer.HasSynced + + // override syncPeriod when the specified value is too small + if dnsctl.syncPeriod < minSyncPeriod { + dnsctl.syncPeriod = minSyncPeriod + } + + return dnsctl, nil +} + +// newServiceInformer creates a shared index informer that returns only interested services +func newServiceInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + selector := fmt.Sprintf("metadata.name=%v", constants.YurttunnelServerServiceName) + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = selector + } + return coreinformers.NewFilteredServiceInformer(cs, constants.YurttunnelServerServiceNs, resyncPeriod, nil, tweakListOptions) +} + +// newConfigMapInformer creates a shared index informer that returns only interested configmaps +func newConfigMapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + selector := fmt.Sprintf("metadata.name=%v", util.YurttunnelServerDnatConfigMapName) + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = selector + } + return coreinformers.NewFilteredConfigMapInformer(cs, util.YurttunnelServerDnatConfigMapNs, resyncPeriod, nil, tweakListOptions) +} + +func (dnsctl *coreDNSRecordController) Run(stopCh <-chan struct{}) { + electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) + id, err := os.Hostname() + if err != nil { + klog.Fatalf("failed to get hostname, %v", err) + } + rl, err := resourcelock.New("leases", metav1.NamespaceSystem, "tunnel-dns-controller", + dnsctl.kubeClient.CoreV1(), + dnsctl.kubeClient.CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: id + "_" + string(uuid.NewUUID()), + }) + if err != nil { + klog.Fatalf("error creating tunnel-dns-controller lock, %v", err) + } + + leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: metav1.Duration{Duration: time.Second * time.Duration(15)}.Duration, + RenewDeadline: metav1.Duration{Duration: time.Second * time.Duration(10)}.Duration, + RetryPeriod: metav1.Duration{Duration: time.Second * time.Duration(2)}.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + dnsctl.run(stopCh) + }, + OnStoppedLeading: func() { + klog.Fatalf("leaderelection lost") + }, + }, + WatchDog: electionChecker, + Name: "tunnel-dns-controller", + }) + panic("unreachable") +} + +func (dnsctl *coreDNSRecordController) run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer dnsctl.queue.ShutDown() + + klog.Infof("starting tunnel dns controller") + defer klog.Infof("shutting down tunnel dns controller") + + if !cache.WaitForNamedCacheSync("tunnel-dns-controller", stopCh, + dnsctl.nodeListerSynced, dnsctl.svcInformerSynced, dnsctl.cmInformerSynced) { + return + } + + if err := dnsctl.ensureCoreDNSRecordConfigMap(); err != nil { + klog.Errorf("failed to ensure dns record ConfigMap %v/%v, %v", + yurttunnelDNSRecordConfigMapNs, yurttunnelDNSRecordConfigMapName, err) + return + } + + go wait.Until(dnsctl.worker, time.Second, stopCh) + + // sync dns hosts as a whole + go wait.Until(dnsctl.syncDNSRecordAsWhole, time.Duration(dnsctl.syncPeriod)*time.Second, stopCh) + + // sync tunnel server svc + go wait.Until(func() { + if err := dnsctl.syncTunnelServerServiceAsWhole(); err != nil { + klog.Errorf("failed to sync tunnel server service, %v", err) + } + }, time.Duration(dnsctl.syncPeriod)*time.Second, stopCh) + + <-stopCh +} + +func (dnsctl *coreDNSRecordController) enqueue(obj interface{}, eventType EventType) { + e := &Event{ + Obj: obj, + Type: eventType, + } + dnsctl.queue.Add(e) +} + +func (dnsctl *coreDNSRecordController) worker() { + for dnsctl.processNextWorkItem() { + } +} + +func (dnsctl *coreDNSRecordController) processNextWorkItem() bool { + event, quit := dnsctl.queue.Get() + if quit { + return false + } + defer dnsctl.queue.Done(event) + + err := dnsctl.dispatch(event.(*Event)) + dnsctl.handleErr(err, event) + + return true +} + +func (dnsctl *coreDNSRecordController) dispatch(event *Event) error { + switch event.Type { + case NodeAdd: + return dnsctl.onNodeAdd(event.Obj.(*corev1.Node)) + case NodeUpdate: + return dnsctl.onNodeUpdate(event.Obj.(*corev1.Node)) + case NodeDelete: + return dnsctl.onNodeDelete(event.Obj.(*corev1.Node)) + case ServiceAdd: + return dnsctl.onServiceAdd(event.Obj.(*corev1.Service)) + case ServiceUpdate: + return dnsctl.onServiceUpdate(event.Obj.(*corev1.Service)) + case ServiceDelete: + return dnsctl.onServiceDelete(event.Obj.(*corev1.Service)) + case ConfigMapAdd: + return dnsctl.onConfigMapAdd(event.Obj.(*corev1.ConfigMap)) + case ConfigMapUpdate: + return dnsctl.onConfigMapUpdate(event.Obj.(*corev1.ConfigMap)) + case ConfigMapDelete: + return dnsctl.onConfigMapDelete(event.Obj.(*corev1.ConfigMap)) + default: + return nil + } +} + +func (dnsctl *coreDNSRecordController) handleErr(err error, event interface{}) { + if err == nil { + dnsctl.queue.Forget(event) + return + } + + if dnsctl.queue.NumRequeues(event) < maxRetries { + klog.Infof("error syncing event %v: %v", event, err) + dnsctl.queue.AddRateLimited(event) + return + } + + utilruntime.HandleError(err) + klog.Infof("dropping event %q out of the queue: %v", event, err) + dnsctl.queue.Forget(event) +} + +func (dnsctl *coreDNSRecordController) ensureCoreDNSRecordConfigMap() error { + _, err := dnsctl.kubeClient.CoreV1().ConfigMaps(constants.YurttunnelServerServiceNs). + Get(yurttunnelDNSRecordConfigMapName, metav1.GetOptions{}) + if err != nil && apierrors.IsNotFound(err) { + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: yurttunnelDNSRecordConfigMapName, + Namespace: constants.YurttunnelServerServiceNs, + }, + Data: map[string]string{ + yurttunnelDNSRecordNodeDataKey: "", + }, + } + _, err = dnsctl.kubeClient.CoreV1().ConfigMaps(constants.YurttunnelServerServiceNs).Create(cm) + if err != nil { + return fmt.Errorf("failed to create ConfigMap %v/%v, %v", + constants.YurttunnelServerServiceNs, yurttunnelDNSRecordConfigMapName, err) + } + } + return err +} + +func (dnsctl *coreDNSRecordController) syncTunnelServerServiceAsWhole() error { + klog.V(2).Info("sync tunnel server service as whole") + dnatPorts, err := util.GetConfiguredDnatPorts(dnsctl.kubeClient, strconv.Itoa(dnsctl.insecurePort)) + if err != nil { + return err + } + return dnsctl.updateTunnelServerSvcDnatPorts(dnatPorts) +} + +func (dnsctl *coreDNSRecordController) syncDNSRecordAsWhole() { + klog.V(2).Info("sync dns record as whole") + + dnsctl.lock.Lock() + defer dnsctl.lock.Unlock() + + ip, err := dnsctl.getTunnelServerIP(false) + if err != nil { + klog.Errorf("failed to sync dns record as whole, %v", err) + return + } + + nodes, err := dnsctl.nodeLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to sync dns record as whole, %v", err) + return + } + + records := make([]string, 0, len(nodes)) + for i := range nodes { + node := nodes[i] + if !isEdgeNode(node) { + ip, err = getNodeHostIP(node) + if err != nil { + klog.Errorf("failed to parse node address for %v, %v", node.Name, err) + continue + } + } + records = append(records, formatDNSRecord(ip, node.Name)) + } + + if err := dnsctl.updateDNSRecords(records); err != nil { + klog.Errorf("failed to sync dns record as whole, %v", err) + } +} + +func (dnsctl *coreDNSRecordController) getTunnelServerIP(useCache bool) (string, error) { + if useCache && len(dnsctl.tunnelServerIP) != 0 { + return dnsctl.tunnelServerIP, nil + } + + svc, err := dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs). + Get(constants.YurttunnelServerServiceName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("failed to get %v/%v service, %v", + constants.YurttunnelServerServiceNs, constants.YurttunnelServerServiceName, err) + } + if len(svc.Spec.ClusterIP) == 0 { + return "", fmt.Errorf("unable find ClusterIP from %s/%s service, %v", + constants.YurttunnelServerServiceNs, constants.YurttunnelServerServiceName, err) + } + + // cache result + dnsctl.tunnelServerIP = svc.Spec.ClusterIP + + return dnsctl.tunnelServerIP, nil +} + +func (dnsctl *coreDNSRecordController) updateDNSRecords(records []string) error { + // keep sorted + sort.Strings(records) + + cm, err := dnsctl.kubeClient.CoreV1().ConfigMaps(constants.YurttunnelServerServiceNs). + Get(yurttunnelDNSRecordConfigMapName, metav1.GetOptions{}) + if err != nil { + return err + } + cm.Data[yurttunnelDNSRecordNodeDataKey] = strings.Join(records, "\n") + if _, err := dnsctl.kubeClient.CoreV1().ConfigMaps(constants.YurttunnelServerServiceNs).Update(cm); err != nil { + return fmt.Errorf("failed to update configmap %v/%v, %v", + constants.YurttunnelServerServiceNs, yurttunnelDNSRecordConfigMapName, err) + } + return nil +} + +func (dnsctl *coreDNSRecordController) updateTunnelServerSvcDnatPorts(ports []string) error { + svc, err := dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs). + Get(constants.YurttunnelServerServiceName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to sync tunnel server service, %v", err) + } + + changed := false + + svcPortMap := make(map[int32]corev1.ServicePort) + for i := range svc.Spec.Ports { + port := svc.Spec.Ports[i] + svcPortMap[port.Port] = port + } + + dnatPortMap := make(map[int]bool) + for _, dnatPort := range ports { + portInt, err := strconv.Atoi(dnatPort) + if err != nil { + klog.Errorf("failed to parse dnat port %q, %v", dnatPort, err) + continue + } + dnatPortMap[portInt] = true + + if p, ok := svcPortMap[int32(portInt)]; !ok || p.Protocol != corev1.ProtocolTCP { + port := corev1.ServicePort{ + Name: fmt.Sprintf("%v%v", dnatPortPrefix, dnatPort), + Port: int32(portInt), + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(dnsctl.insecurePort), + } + svc.Spec.Ports = append(svc.Spec.Ports, port) + changed = true + } + } + + updatedSvcPorts := make([]corev1.ServicePort, 0, len(svc.Spec.Ports)) + for i := range svc.Spec.Ports { + port := svc.Spec.Ports[i] + if strings.HasPrefix(port.Name, dnatPortPrefix) && !dnatPortMap[int(port.Port)] { + changed = true + continue + } + updatedSvcPorts = append(updatedSvcPorts, port) + } + + if !changed { + return nil + } + + svc.Spec.Ports = updatedSvcPorts + _, err = dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).Update(svc) + if err != nil { + return fmt.Errorf("failed to sync tunnel server service, %v", err) + } + return nil +} diff --git a/pkg/yurttunnel/dns/event.go b/pkg/yurttunnel/dns/event.go new file mode 100644 index 00000000000..539fb4f3241 --- /dev/null +++ b/pkg/yurttunnel/dns/event.go @@ -0,0 +1,36 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +type EventType string + +const ( + NodeAdd EventType = "NODE_ADD" + NodeUpdate EventType = "NODE_UPDATE" + NodeDelete EventType = "NODE_DELETE" + ServiceAdd EventType = "SERVICE_ADD" + ServiceUpdate EventType = "SERVICE_UPDATE" + ServiceDelete EventType = "SERVICE_DELETE" + ConfigMapAdd EventType = "CONFIGMAP_ADD" + ConfigMapUpdate EventType = "CONFIGMAP_UPDATE" + ConfigMapDelete EventType = "CONFIGMAP_DELETE" +) + +type Event struct { + Obj interface{} + Type EventType +} diff --git a/pkg/yurttunnel/dns/handler.go b/pkg/yurttunnel/dns/handler.go new file mode 100644 index 00000000000..b050905df3a --- /dev/null +++ b/pkg/yurttunnel/dns/handler.go @@ -0,0 +1,246 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + "fmt" + "reflect" + "strings" + + "github.com/openyurtio/openyurt/pkg/yurttunnel/constants" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +func (dnsctl *coreDNSRecordController) addNode(obj interface{}) { + node, ok := obj.(*corev1.Node) + if !ok { + return + } + if node.DeletionTimestamp != nil { + dnsctl.deleteNode(node) + return + } + klog.V(2).Infof("enqueue node add event for %v", node.Name) + dnsctl.enqueue(node, NodeAdd) +} + +func (dnsctl *coreDNSRecordController) updateNode(oldObj, newObj interface{}) { + oldNode, ok := oldObj.(*corev1.Node) + if !ok { + return + } + newNode, ok := newObj.(*corev1.Node) + if !ok { + return + } + + oldIsEdgeNode, newIsEdgeNode := isEdgeNode(oldNode), isEdgeNode(newNode) + if oldIsEdgeNode == newIsEdgeNode { + return + } + + klog.V(2).Infof("enqueue node update event for %v, will update dns record", newNode.Name) + dnsctl.enqueue(newNode, NodeUpdate) +} + +func (dnsctl *coreDNSRecordController) deleteNode(obj interface{}) { + node, ok := obj.(*corev1.Node) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("can not get object from tombstone %#v", obj)) + return + } + node, ok = tombstone.Obj.(*corev1.Node) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object is not a node %#v", obj)) + return + } + } + klog.V(2).Infof("enqueue node delete event for %v", node.Name) + dnsctl.enqueue(node, NodeDelete) +} + +func (dnsctl *coreDNSRecordController) addConfigMap(obj interface{}) { + cm, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + if cm.DeletionTimestamp != nil { + dnsctl.deleteConfigMap(cm) + return + } + klog.V(2).Infof("enqueue configmap add event for %v/%v", cm.Namespace, cm.Name) + dnsctl.enqueue(cm, ConfigMapAdd) +} + +func (dnsctl *coreDNSRecordController) updateConfigMap(oldObj, newObj interface{}) { + oldConfigMap, ok := oldObj.(*corev1.ConfigMap) + if !ok { + return + } + newConfigMap, ok := newObj.(*corev1.ConfigMap) + if !ok { + return + } + + if reflect.DeepEqual(oldConfigMap.Data, newConfigMap.Data) { + return + } + + klog.V(2).Infof("enqueue configmap update event for %v/%v, will sync tunnel server svc", newConfigMap.Namespace, newConfigMap.Name) + dnsctl.enqueue(newConfigMap, ConfigMapUpdate) +} + +func (dnsctl *coreDNSRecordController) deleteConfigMap(obj interface{}) { + cm, ok := obj.(*corev1.ConfigMap) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("can not get object from tombstone %#v", obj)) + return + } + cm, ok = tombstone.Obj.(*corev1.ConfigMap) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object is not a node %#v", obj)) + return + } + } + klog.V(2).Infof("enqueue configmap delete event for %v/%v", cm.Namespace, cm.Name) + dnsctl.enqueue(cm, ConfigMapDelete) +} + +func (dnsctl *coreDNSRecordController) addService(obj interface{}) { + svc, ok := obj.(*corev1.Service) + if !ok { + return + } + if svc.Namespace != constants.YurttunnelServerServiceNs || svc.Name != constants.YurttunnelServerServiceName { + return + } + klog.V(2).Infof("enqueue service add event for %v/%v", svc.Namespace, svc.Name) + dnsctl.enqueue(svc, ServiceAdd) +} + +func (dnsctl *coreDNSRecordController) updateService(oldObj, newObj interface{}) { + // do nothing +} + +func (dnsctl *coreDNSRecordController) deleteService(obj interface{}) { + // do nothing +} + +func (dnsctl *coreDNSRecordController) onConfigMapAdd(cm *corev1.ConfigMap) error { + return dnsctl.syncTunnelServerServiceAsWhole() +} + +func (dnsctl *coreDNSRecordController) onConfigMapUpdate(cm *corev1.ConfigMap) error { + return dnsctl.syncTunnelServerServiceAsWhole() +} + +func (dnsctl *coreDNSRecordController) onConfigMapDelete(cm *corev1.ConfigMap) error { + return dnsctl.syncTunnelServerServiceAsWhole() +} + +func (dnsctl *coreDNSRecordController) onNodeAdd(node *corev1.Node) error { + klog.V(2).Infof("adding node dns record for %v", node.Name) + return dnsctl.addOrUpdateNode(node) +} + +func (dnsctl *coreDNSRecordController) onNodeUpdate(node *corev1.Node) error { + klog.V(2).Infof("updating node dns record for %v", node.Name) + return dnsctl.addOrUpdateNode(node) +} + +func (dnsctl *coreDNSRecordController) addOrUpdateNode(node *corev1.Node) error { + ip, err := getNodeHostIP(node) + if err != nil { + return err + } + if isEdgeNode(node) { + ip, err = dnsctl.getTunnelServerIP(true) + if err != nil { + return err + } + } + + records, err := dnsctl.getCurrentDNSRecords() + if err != nil { + return err + } + + updatedRecords, changed, err := addOrUpdateRecord(records, formatDNSRecord(ip, node.Name)) + if err != nil { + return err + } + if !changed { + return nil + } + + return dnsctl.updateDNSRecords(updatedRecords) +} + +func (dnsctl *coreDNSRecordController) onNodeDelete(node *corev1.Node) error { + klog.V(2).Infof("deleting node dns record for %v", node.Name) + + dnsctl.lock.Lock() + defer dnsctl.lock.Unlock() + + records, err := dnsctl.getCurrentDNSRecords() + if err != nil { + return err + } + mergedRecords, changed := removeRecordByHostname(records, node.Name) + if !changed { + return nil + } + + return dnsctl.updateDNSRecords(mergedRecords) +} + +func (dnsctl *coreDNSRecordController) getCurrentDNSRecords() ([]string, error) { + cm, err := dnsctl.kubeClient.CoreV1().ConfigMaps(constants.YurttunnelServerServiceNs). + Get(yurttunnelDNSRecordConfigMapName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + data, ok := cm.Data[yurttunnelDNSRecordNodeDataKey] + if !ok { + return nil, fmt.Errorf("key %q not found in %s/%s ConfigMap, %v", + yurttunnelDNSRecordNodeDataKey, constants.YurttunnelServerServiceNs, yurttunnelDNSRecordConfigMapName, err) + } + + return strings.Split(data, "\n"), nil +} + +func (dnsctl *coreDNSRecordController) onServiceAdd(svc *corev1.Service) error { + dnsctl.syncDNSRecordAsWhole() + return nil +} + +func (dnsctl *coreDNSRecordController) onServiceUpdate(svc *corev1.Service) error { + return nil +} + +func (dnsctl *coreDNSRecordController) onServiceDelete(svc *corev1.Service) error { + return nil +} diff --git a/pkg/yurttunnel/dns/util.go b/pkg/yurttunnel/dns/util.go new file mode 100644 index 00000000000..ea135b2369e --- /dev/null +++ b/pkg/yurttunnel/dns/util.go @@ -0,0 +1,116 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dns + +import ( + "fmt" + "net" + "strings" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + + "github.com/openyurtio/openyurt/pkg/projectinfo" +) + +func isEdgeNode(node *corev1.Node) bool { + isEdgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + if ok && isEdgeNode == "true" { + return true + } + return false +} + +func formatDNSRecord(ip, host string) string { + return fmt.Sprintf("%s\t%s", ip, host) +} + +// getNodeHostIP returns the provided node's "primary" IP +func getNodeHostIP(node *v1.Node) (string, error) { + // re-sort the addresses with InternalIPs first and then ExternalIPs + allIPs := make([]net.IP, 0, len(node.Status.Addresses)) + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeInternalIP { + ip := net.ParseIP(addr.Address) + if ip != nil { + allIPs = append(allIPs, ip) + break + } + } + } + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeExternalIP { + ip := net.ParseIP(addr.Address) + if ip != nil { + allIPs = append(allIPs, ip) + break + } + } + } + if len(allIPs) == 0 { + return "", fmt.Errorf("host IP unknown; known addresses: %v", node.Status.Addresses) + } + + return allIPs[0].String(), nil +} + +func removeRecordByHostname(records []string, hostname string) (result []string, changed bool) { + result = make([]string, 0, len(records)) + for _, v := range result { + if !strings.HasSuffix(v, hostname) { + result = append(result, v) + } + } + return result, len(records) == len(result) +} + +func parseHostnameFromDNSRecord(record string) (string, error) { + arr := strings.Split(record, "\t") + if len(arr) != 2 { + return "", fmt.Errorf("failed to parse hostname, invalid dns record %q", record) + } + return arr[1], nil +} + +func addOrUpdateRecord(records []string, record string) (result []string, changed bool, err error) { + hostname, err := parseHostnameFromDNSRecord(record) + if err != nil { + return nil, false, err + } + + result = make([]string, len(records)) + copy(result, records) + + found := false + for i, v := range result { + if strings.HasSuffix(v, hostname) { + found = true + if v != record { + result[i] = record + changed = true + break + } + } + } + + if !found { + result = append(result, record) + changed = true + } + + return result, changed, nil +} diff --git a/pkg/yurttunnel/iptables/iptables.go b/pkg/yurttunnel/iptables/iptables.go index dc5c139aec1..d14c33098b7 100644 --- a/pkg/yurttunnel/iptables/iptables.go +++ b/pkg/yurttunnel/iptables/iptables.go @@ -23,8 +23,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" coreinformer "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -37,6 +35,7 @@ import ( "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurttunnel/server/metrics" + "github.com/openyurtio/openyurt/pkg/yurttunnel/util" ) const ( @@ -48,15 +47,9 @@ const ( yurttunnelServerPortChain = "TUNNEL-PORT" yurttunnelPortChainPrefix = "TUNNEL-PORT-" defaultSyncPeriod = 15 - - // constants related dnat rules configmap - yurttunnelServerDnatConfigMapNs = "kube-system" - yurttunnelServerDnatDataKey = "dnat-ports-pair" ) var ( - yurttunnelServerDnatConfigMapName = fmt.Sprintf("%s-tunnel-server-cfg", - strings.TrimRightFunc(projectinfo.GetProjectPrefix(), func(c rune) bool { return c == '-' })) tunnelCommentStr = strings.ReplaceAll(projectinfo.GetTunnelName(), "-", " ") iptablesJumpChains = []iptablesJumpChain{ { @@ -201,46 +194,6 @@ func (im *iptablesManager) deleteJumpChains(jumpChains []iptablesJumpChain) erro return nil } -func (im *iptablesManager) getConfiguredDnatPorts() []string { - ports := make([]string, 0) - c, err := im.kubeClient.CoreV1(). - ConfigMaps(yurttunnelServerDnatConfigMapNs). - Get(yurttunnelServerDnatConfigMapName, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - klog.V(4).Infof("configmap %s/%s is not found", - yurttunnelServerDnatConfigMapNs, - yurttunnelServerDnatConfigMapName) - } else { - klog.Errorf("fail to get configmap %s/%s: %v", - yurttunnelServerDnatConfigMapNs, - yurttunnelServerDnatConfigMapName, err) - } - return ports - } - - pairStr, ok := c.Data[yurttunnelServerDnatDataKey] - if !ok || len(pairStr) == 0 { - return ports - } - - portsPair := strings.Split(pairStr, ",") - for _, pair := range portsPair { - portPair := strings.Split(pair, "=") - // we only allowed user to add dnat rule that uses insecure port as the - // destination port. - if len(portPair) == 2 && - portPair[1] == im.insecurePort && - len(portPair[0]) != 0 { - if portPair[0] != "10250" && portPair[0] != "10255" { - ports = append(ports, portPair[0]) - } - } - } - - return ports -} - // getIPOfNodesWithoutAgent returns the ip addresses of all nodes that // are not running yurttunnel-agent func (im *iptablesManager) getIPOfNodesWithoutAgent() []string { @@ -508,7 +461,11 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string { // while the request to access the cloud node is returned func (im *iptablesManager) syncIptableSetting() { // check if there are new dnat ports - dnatPorts := im.getConfiguredDnatPorts() + dnatPorts, err := util.GetConfiguredDnatPorts(im.kubeClient, im.insecurePort) + if err != nil { + klog.Errorf("failed to sync iptables rules, %v", err) + return + } portsChanged, deletedDnatPorts := im.getDeletedPorts(dnatPorts) currentDnatPorts := append(dnatPorts, kubeletSecurePort, kubeletInsecurePort) @@ -518,7 +475,7 @@ func (im *iptablesManager) syncIptableSetting() { currentNodesIP := append(nodesIP, loopbackAddr) // update the iptable setting if necessary - err := im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP) + err = im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP) if err != nil { klog.Errorf("failed to ensurePortsIptables: %v", err) return diff --git a/pkg/yurttunnel/util/util.go b/pkg/yurttunnel/util/util.go index 569971617b5..c3b00099fb4 100644 --- a/pkg/yurttunnel/util/util.go +++ b/pkg/yurttunnel/util/util.go @@ -1,13 +1,30 @@ package util import ( + "fmt" "net/http" + "strings" - "github.com/openyurtio/openyurt/pkg/profile" + "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "github.com/gorilla/mux" + "github.com/openyurtio/openyurt/pkg/profile" + "github.com/openyurtio/openyurt/pkg/projectinfo" +) + +const ( + // constants related dnat rules configmap + YurttunnelServerDnatConfigMapNs = "kube-system" + yurttunnelServerDnatDataKey = "dnat-ports-pair" +) + +var ( + YurttunnelServerDnatConfigMapName = fmt.Sprintf("%s-tunnel-server-cfg", + strings.TrimRightFunc(projectinfo.GetProjectPrefix(), func(c rune) bool { return c == '-' })) ) // RunMetaServer start a http server for serving metrics and pprof requests. @@ -33,3 +50,42 @@ func RunMetaServer(addr string) { klog.InfoS("meta server stopped listening", "server endpoint", addr) }() } + +// GetConfiguredDnatPorts returns the DNAT ports configured for tunnel server. +// NOTE: We only allow user to add dnat rule that uses insecure port as the destination port currently. +func GetConfiguredDnatPorts(client clientset.Interface, insecurePort string) ([]string, error) { + ports := make([]string, 0) + c, err := client.CoreV1(). + ConfigMaps(YurttunnelServerDnatConfigMapNs). + Get(YurttunnelServerDnatConfigMapName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, fmt.Errorf("configmap %s/%s is not found", + YurttunnelServerDnatConfigMapNs, + YurttunnelServerDnatConfigMapName) + } else { + return nil, fmt.Errorf("fail to get configmap %s/%s: %v", + YurttunnelServerDnatConfigMapNs, + YurttunnelServerDnatConfigMapName, err) + } + } + + pairStr, ok := c.Data[yurttunnelServerDnatDataKey] + if !ok || len(pairStr) == 0 { + return ports, nil + } + + portsPair := strings.Split(pairStr, ",") + for _, pair := range portsPair { + portPair := strings.Split(pair, "=") + if len(portPair) == 2 && + portPair[1] == insecurePort && + len(portPair[0]) != 0 { + if portPair[0] != "10250" && portPair[0] != "10255" { + ports = append(ports, portPair[0]) + } + } + } + + return ports, nil +}