From 6d53b3ae738367ba0a947040c8b97f6f67a3c57f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8F=A9=E8=BD=A9?= Date: Mon, 6 May 2024 11:48:21 +0800 Subject: [PATCH] enhance code quality --- cmd/agent/app/options/options.go | 8 +- cmd/agent/app/start.go | 9 +- pkg/engine/engine.go | 141 ++++-- pkg/engine/proxy.go | 159 ++----- pkg/engine/tunnel.go | 423 ++++++++++++++---- pkg/engine/utils.go | 30 ++ .../routedriver/driver.go | 0 .../routedriver/vxlan/utils.go | 2 +- .../routedriver/vxlan/vxlan.go | 8 +- .../routedriver/vxlan/vxlan_test.go | 4 +- pkg/tunnelengine/tunnelagent.go | 352 --------------- .../util/ipset/ipset.go | 0 .../util/iptables/constants.go | 0 .../util/iptables/iptables.go | 0 .../util/netlink/netlink.go | 0 .../util/utils.go | 4 +- .../util/utils_test.go | 0 .../vpndriver/driver.go | 2 +- .../vpndriver/driver_test.go | 0 .../vpndriver/libreswan/libreswan.go | 6 +- .../vpndriver/libreswan/libreswan_test.go | 6 +- .../vpndriver/wireguard/wireguard.go | 6 +- pkg/utils/utils.go | 28 +- 23 files changed, 531 insertions(+), 657 deletions(-) rename pkg/{networkengine => tunnelengine}/routedriver/driver.go (100%) rename pkg/{networkengine => tunnelengine}/routedriver/vxlan/utils.go (98%) rename pkg/{networkengine => tunnelengine}/routedriver/vxlan/vxlan.go (98%) rename pkg/{networkengine => tunnelengine}/routedriver/vxlan/vxlan_test.go (98%) delete mode 100644 pkg/tunnelengine/tunnelagent.go rename pkg/{networkengine => tunnelengine}/util/ipset/ipset.go (100%) rename pkg/{networkengine => tunnelengine}/util/iptables/constants.go (100%) rename pkg/{networkengine => tunnelengine}/util/iptables/iptables.go (100%) rename pkg/{networkengine => tunnelengine}/util/netlink/netlink.go (100%) rename pkg/{networkengine => tunnelengine}/util/utils.go (98%) rename pkg/{networkengine => tunnelengine}/util/utils_test.go (100%) rename pkg/{networkengine => tunnelengine}/vpndriver/driver.go (98%) rename pkg/{networkengine => tunnelengine}/vpndriver/driver_test.go (100%) rename pkg/{networkengine => tunnelengine}/vpndriver/libreswan/libreswan.go (98%) rename pkg/{networkengine => tunnelengine}/vpndriver/libreswan/libreswan_test.go (98%) rename pkg/{networkengine => tunnelengine}/vpndriver/wireguard/wireguard.go (98%) diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index a3c1f78..33179c2 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -26,10 +26,10 @@ import ( "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" - "github.com/openyurtio/raven/pkg/networkengine/routedriver/vxlan" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver/libreswan" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver/wireguard" + "github.com/openyurtio/raven/pkg/tunnelengine/routedriver/vxlan" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver/libreswan" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver/wireguard" "github.com/openyurtio/raven/pkg/utils" ) diff --git a/cmd/agent/app/start.go b/cmd/agent/app/start.go index c14bb85..5f24c81 100644 --- a/cmd/agent/app/start.go +++ b/cmd/agent/app/start.go @@ -19,7 +19,6 @@ package app import ( "context" "fmt" - "github.com/lorenzosaino/go-sysctl" "k8s.io/klog/v2" @@ -58,15 +57,17 @@ func NewRavenAgentCommand(ctx context.Context) *cobra.Command { // Run starts the raven-agent func Run(ctx context.Context, cfg *config.CompletedConfig) error { - klog.Info("Start raven agent") - defer klog.Info("Stop raven agent") if err := disableICMPRedirect(); err != nil { return err } if err := disableICMPRpFilter(); err != nil { return err } - engine := ravenengine.NewEngine(ctx, cfg.Config) + engine, err := ravenengine.NewEngine(ctx, cfg.Config) + if err != nil { + return err + } + klog.Info("engine successfully start") engine.Start() return nil } diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 4e7bd07..c06a8a9 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -28,23 +28,20 @@ type Engine struct { manager manager.Manager client client.Client option *Option + queue workqueue.RateLimitingInterface - tunnelQueue workqueue.RateLimitingInterface - tunnelEngine *TunnelEngine - - proxyQueue workqueue.RateLimitingInterface - proxyEngine *ProxyEngine + tunnel *TunnelEngine + proxy *ProxyEngine } -func NewEngine(ctx context.Context, cfg *config.Config) *Engine { +func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) { engine := &Engine{ - nodeName: cfg.NodeName, - nodeIP: cfg.NodeIP, - manager: cfg.Manager, - context: ctx, - option: NewEngineOption(), - tunnelQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tunnel"), - proxyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Proxy"), + nodeName: cfg.NodeName, + nodeIP: cfg.NodeIP, + manager: cfg.Manager, + context: ctx, + option: NewEngineOption(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"), } err := ctrl.NewControllerManagedBy(engine.manager). For(&v1beta1.Gateway{}, builder.WithPredicates(predicate.Funcs{ @@ -56,57 +53,129 @@ func NewEngine(ctx context.Context, cfg *config.Config) *Engine { return reconcile.Result{}, nil })) if err != nil { - klog.ErrorS(err, utils.FormatRavenEngine("failed to new raven agent controller with manager")) + klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error())) + return engine, err } engine.client = engine.manager.GetClient() - engine.tunnelEngine = newTunnelEngine(cfg, engine.client, engine.option, engine.tunnelQueue) - engine.proxyEngine = newProxyEngine(engine.context, cfg, engine.client, engine.option, engine.proxyQueue) - return engine + engine.tunnel = &TunnelEngine{ + nodeName: engine.nodeName, + forwardNodeIP: cfg.Tunnel.ForwardNodeIP, + natTraversal: cfg.Tunnel.NATTraversal, + config: cfg, + ravenClient: engine.client, + } + err = engine.tunnel.InitDriver() + if err != nil { + klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error())) + return engine, err + } + + engine.proxy = &ProxyEngine{ + nodeName: engine.nodeName, + nodeIP: engine.nodeIP, + config: cfg, + client: engine.client, + option: engine.option, + ctx: engine.context, + proxyOption: newProxyOption(), + proxyCtx: newProxyContext(ctx), + } + return engine, nil } func (e *Engine) Start() { defer utilruntime.HandleCrash() - klog.Info(utils.FormatRavenEngine("engine successfully start")) go func() { if err := e.manager.Start(e.context); err != nil { - klog.ErrorS(err, utils.FormatRavenEngine("failed to start engine controller")) + klog.ErrorS(err, "failed to start engine controller") } }() - go wait.Until(e.tunnelEngine.worker, time.Second, e.context.Done()) - go wait.Until(e.proxyEngine.worker, time.Second, e.context.Done()) + go wait.Until(e.worker, time.Second, e.context.Done()) <-e.context.Done() e.cleanup() - klog.Info(utils.FormatRavenEngine("engine successfully stop")) +} + +func (e *Engine) worker() { + for e.processNextWorkItem() { + } +} + +func (e *Engine) processNextWorkItem() bool { + obj, quit := e.queue.Get() + if quit { + return false + } + gw, ok := obj.(*v1beta1.Gateway) + if !ok { + return false + } + defer e.queue.Done(gw) + e.findLocalGateway() + err := e.tunnel.Handler() + if err != nil { + e.handleEventErr(err, gw) + } + e.option.SetTunnelStatus(e.tunnel.Status()) + + err = e.proxy.Handler() + if err != nil { + e.handleEventErr(err, gw) + } + + return true +} + +func (e *Engine) findLocalGateway() { + e.tunnel.localGateway = nil + e.proxy.localGateway = nil + var gwList v1beta1.GatewayList + err := e.client.List(context.TODO(), &gwList) + if err != nil { + return + } + for _, gw := range gwList.Items { + for _, node := range gw.Status.Nodes { + if node.NodeName == e.nodeName { + e.tunnel.localGateway = gw.DeepCopy() + e.proxy.localGateway = gw.DeepCopy() + return + } + } + } } func (e *Engine) cleanup() { if e.option.GetTunnelStatus() { - err := e.tunnelEngine.clearDriver() + err := e.tunnel.CleanupDriver() if err != nil { klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error())) } } if e.option.GetProxyStatus() { - e.proxyEngine.stopServers() + e.proxy.stop() } } -func (e *Engine) enqueueTunnel(obj *v1beta1.Gateway) { - klog.Info(utils.FormatRavenEngine("enqueue gateway %s to tunnel queue", obj.Name)) - e.tunnelQueue.Add(obj) -} +func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) { + if err == nil { + e.queue.Forget(gw) + return + } -func (e *Engine) enqueueProxy(obj *v1beta1.Gateway) { - klog.Info(utils.FormatRavenEngine("enqueue gateway %s to proxy queue", obj.Name)) - e.proxyQueue.Add(obj) + if e.queue.NumRequeues(gw) < utils.MaxRetries { + klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error())) + e.queue.AddRateLimited(gw) + return + } + klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error())) + e.queue.Forget(gw) } func (e *Engine) addGateway(evt event.CreateEvent) bool { gw, ok := evt.Object.(*v1beta1.Gateway) if ok { klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName())) - e.enqueueTunnel(gw.DeepCopy()) - e.enqueueProxy(gw.DeepCopy()) + e.queue.Add(gw.DeepCopy()) } return ok } @@ -119,8 +188,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool { if oldGw.ResourceVersion != newGw.ResourceVersion { update = true klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName())) - e.enqueueTunnel(newGw.DeepCopy()) - e.enqueueProxy(newGw.DeepCopy()) + e.queue.Add(newGw.DeepCopy()) } else { klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw)) } @@ -132,8 +200,7 @@ func (e *Engine) deleteGateway(evt event.DeleteEvent) bool { gw, ok := evt.Object.(*v1beta1.Gateway) if ok { klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName())) - e.enqueueTunnel(gw.DeepCopy()) - e.enqueueProxy(gw.DeepCopy()) + e.queue.Add(gw.DeepCopy()) } return ok } diff --git a/pkg/engine/proxy.go b/pkg/engine/proxy.go index 932daf5..87058b3 100644 --- a/pkg/engine/proxy.go +++ b/pkg/engine/proxy.go @@ -2,8 +2,6 @@ package engine import ( "context" - "crypto/sha256" - "encoding/hex" "fmt" "net" "sort" @@ -11,7 +9,6 @@ import ( "strings" "time" - "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,7 +29,7 @@ const ( SkipType ActionType = "Skip" ) -func JudgeType(curr, spec bool) ActionType { +func JudgeAction(curr, spec bool) ActionType { if curr && spec { return RestartType } @@ -50,70 +47,34 @@ type ProxyEngine struct { nodeIP string serverLocalEndpoints []string clientRemoteEndpoints []string - gateway *v1beta1.Gateway + localGateway *v1beta1.Gateway config *config.Config client client.Client - - ctx context.Context - option *Option - proxyCtx ProxyContext - proxyOption *proxyOption - queue workqueue.RateLimitingInterface -} - -func newProxyEngine(ctx context.Context, cfg *config.Config, client client.Client, opt *Option, queue workqueue.RateLimitingInterface) *ProxyEngine { - return &ProxyEngine{ - nodeName: cfg.NodeName, - nodeIP: cfg.NodeIP, - config: cfg, - client: client, - option: opt, - ctx: ctx, - proxyCtx: newProxyContext(ctx), - proxyOption: newProxyOption(), - queue: queue, - } -} - -func (p *ProxyEngine) worker() { - for p.processNextWorkItem() { - } + ctx context.Context + option *Option + proxyCtx ProxyContext + proxyOption *proxyOption } -func (p *ProxyEngine) processNextWorkItem() bool { - obj, quit := p.queue.Get() - if quit { - return false +func (p *ProxyEngine) Status() bool { + aep := getActiveEndpoints(p.localGateway, v1beta1.Proxy) + if aep == nil { + aep = getActiveEndpoints(findCentreGateway(p.client), v1beta1.Proxy) } - gw, ok := obj.(*v1beta1.Gateway) - if !ok { - return false + if aep != nil && aep.Config != nil { + enable, err := strconv.ParseBool(aep.Config[utils.RavenEnableProxy]) + if err == nil { + return enable + } } - defer p.queue.Done(gw) - - err := p.handler(gw) - p.handleEventErr(err, gw) - return true + return false } -func (p *ProxyEngine) handler(gw *v1beta1.Gateway) error { +func (p *ProxyEngine) Handler() error { var err error - p.gateway, err = utils.GetOwnGateway(p.client, p.nodeName) - if err != nil { - klog.Errorf(utils.FormatProxyServer("failed get gateway for %s, can not start proxy server", p.nodeName)) - return err - } - proxyStatus := p.option.GetProxyStatus() - if p.gateway != nil && gw.GetName() == p.gateway.GetName() { - proxyStatus = enableProxy(gw) - } else { - if gw.Spec.ExposeType != "" { - proxyStatus = enableProxy(gw) - } - } - p.option.SetProxyStatus(proxyStatus) - specServer, specClient := p.getRole(proxyStatus) - switch JudgeType(p.proxyOption.GetServerStatus(), specServer) { + p.option.SetProxyStatus(p.Status()) + specServer, specClient := p.getRole(p.option.GetProxyStatus()) + switch JudgeAction(p.proxyOption.GetServerStatus(), specServer) { case StartType: srcAddr := getSrcAddressForProxyServer(p.client, p.nodeName) err = p.startProxyServer() @@ -127,9 +88,9 @@ func (p *ProxyEngine) handler(gw *v1beta1.Gateway) error { p.serverLocalEndpoints = []string{} case RestartType: srcAddr := getSrcAddressForProxyServer(p.client, p.nodeName) - if computeHash(strings.Join(p.serverLocalEndpoints, ",")) != computeHash(strings.Join(srcAddr, ",")) { + if strings.Join(p.serverLocalEndpoints, ",") != strings.Join(srcAddr, ",") { p.stopProxyServer() - time.Sleep(time.Second) + time.Sleep(2 * time.Second) err = p.startProxyServer() if err != nil { klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error())) @@ -141,7 +102,7 @@ func (p *ProxyEngine) handler(gw *v1beta1.Gateway) error { } - switch JudgeType(p.proxyOption.GetClientStatus(), specClient) { + switch JudgeAction(p.proxyOption.GetClientStatus(), specClient) { case StartType: err = p.startProxyClient() if err != nil { @@ -151,14 +112,14 @@ func (p *ProxyEngine) handler(gw *v1beta1.Gateway) error { case StopType: p.stopProxyClient() case RestartType: - dstAddr := getDestAddressForProxyClient(p.client, p.gateway) + dstAddr := getDestAddressForProxyClient(p.client, p.localGateway) if len(dstAddr) < 1 { klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) return nil } - if computeHash(strings.Join(p.clientRemoteEndpoints, ",")) != computeHash(strings.Join(dstAddr, ",")) { + if strings.Join(p.clientRemoteEndpoints, ",") != strings.Join(dstAddr, ",") { p.stopProxyClient() - time.Sleep(time.Second) + time.Sleep(2 * time.Second) err = p.startProxyClient() if err != nil { klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error())) @@ -173,13 +134,13 @@ func (p *ProxyEngine) handler(gw *v1beta1.Gateway) error { func (p *ProxyEngine) startProxyServer() error { klog.Infoln(utils.FormatProxyServer("start raven l7 proxy server")) - if p.gateway == nil { + if p.localGateway == nil { return fmt.Errorf("unknown gateway for node %s, can not start proxy server", p.nodeName) } pe := &proxyengine.EnginConfig{ Name: p.nodeName, IP: p.nodeIP, - GatewayName: p.gateway.Name, + GatewayName: p.localGateway.Name, CertDir: p.config.Proxy.ProxyServerCertDir, MetaAddress: p.config.Proxy.ProxyMetricsAddress, CertIPs: p.config.Proxy.ProxyServerCertIPs, @@ -190,7 +151,7 @@ func (p *ProxyEngine) startProxyServer() error { ExposedAddress: p.config.Proxy.ExternalAddress, } ctx := p.proxyCtx.GetServerContext() - ps, err := proxyserver.NewProxyServer(pe, p.client, p.config.Manager.GetConfig(), p.gateway.DeepCopy()) + ps, err := proxyserver.NewProxyServer(pe, p.client, p.config.Manager.GetConfig(), p.localGateway.DeepCopy()) if err != nil { return fmt.Errorf("failed to new proxy server, error %s", err.Error()) } @@ -213,7 +174,7 @@ func (p *ProxyEngine) stopProxyServer() { func (p *ProxyEngine) startProxyClient() error { klog.Infoln(utils.FormatProxyClient("start raven l7 proxy client")) var err error - dstAddr := getDestAddressForProxyClient(p.client, p.gateway) + dstAddr := getDestAddressForProxyClient(p.client, p.localGateway) if len(dstAddr) < 1 { klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) return nil @@ -266,7 +227,7 @@ func getSrcAddressForProxyServer(client client.Client, nodeName string) []string return srcAddr } -func getDestAddressForProxyClient(client client.Client, ownGateway *v1beta1.Gateway) []string { +func getDestAddressForProxyClient(client client.Client, localGateway *v1beta1.Gateway) []string { destAddr := make([]string, 0) var gwList v1beta1.GatewayList err := client.List(context.TODO(), &gwList) @@ -277,7 +238,7 @@ func getDestAddressForProxyClient(client client.Client, ownGateway *v1beta1.Gate if gw.Spec.ExposeType == "" { continue } - if ownGateway != nil && ownGateway.Name == gw.Name { + if localGateway != nil && localGateway.Name == gw.Name { continue } for _, aep := range gw.Status.ActiveEndpoints { @@ -296,17 +257,11 @@ func (p *ProxyEngine) getRole(enableProxy bool) (enableServer, enableClient bool if !enableProxy { return } - var gwList v1beta1.GatewayList - err := p.client.List(p.ctx, &gwList) - if err != nil { - return - } - - for _, gw := range gwList.Items { - for _, aep := range gw.Status.ActiveEndpoints { + if p.localGateway != nil { + for _, aep := range p.localGateway.Status.ActiveEndpoints { if aep.NodeName == p.nodeName && aep.Type == v1beta1.Proxy { enableClient = true - if gw.Spec.ExposeType != "" { + if p.localGateway.Spec.ExposeType != "" { enableServer = true } else { enableServer = false @@ -314,7 +269,7 @@ func (p *ProxyEngine) getRole(enableProxy bool) (enableServer, enableClient bool return } } - for _, node := range gw.Status.Nodes { + for _, node := range p.localGateway.Status.Nodes { if node.NodeName == p.nodeName { enableServer = false enableClient = false @@ -327,7 +282,7 @@ func (p *ProxyEngine) getRole(enableProxy bool) (enableServer, enableClient bool return } -func (p *ProxyEngine) stopServers() { +func (p *ProxyEngine) stop() { if p.proxyOption.GetServerStatus() { cancelServer := p.proxyCtx.GetServerCancelFunc() cancelServer() @@ -337,43 +292,3 @@ func (p *ProxyEngine) stopServers() { cancelClient() } } - -func enableProxy(gw *v1beta1.Gateway) (enable bool) { - enable = false - for _, aep := range gw.Status.ActiveEndpoints { - if aep.Type == v1beta1.Proxy { - if aep.Config == nil { - enable = false - return - } - start, ok := aep.Config[utils.RavenEnableProxy] - if !ok { - enable = false - return - } - if strings.ToLower(start) == "true" { - enable = true - } - } - } - return -} - -func (p *ProxyEngine) handleEventErr(err error, event interface{}) { - if err == nil { - p.queue.Forget(event) - return - } - if p.queue.NumRequeues(event) < utils.MaxRetries { - klog.Infof("error syncing event %v: %v", event, err) - p.queue.AddRateLimited(event) - return - } - klog.Infof("dropping event %q out of the queue: %v", event, err) - p.queue.Forget(event) -} - -func computeHash(target string) string { - hash := sha256.Sum224([]byte(target)) - return strings.ToLower(hex.EncodeToString(hash[:])) -} diff --git a/pkg/engine/tunnel.go b/pkg/engine/tunnel.go index fa14b37..07efca6 100644 --- a/pkg/engine/tunnel.go +++ b/pkg/engine/tunnel.go @@ -1,122 +1,281 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package engine import ( + "context" "fmt" - "strings" + "net" + "reflect" + "strconv" - "k8s.io/client-go/util/workqueue" + "github.com/EvilSuperstars/go-cidrman" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/openyurtio/api/raven" "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" - "github.com/openyurtio/raven/pkg/networkengine/routedriver" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" - "github.com/openyurtio/raven/pkg/tunnelengine" + "github.com/openyurtio/raven/pkg/tunnelengine/routedriver" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver" + "github.com/openyurtio/raven/pkg/types" "github.com/openyurtio/raven/pkg/utils" ) type TunnelEngine struct { nodeName string - config *config.Config - client client.Client - option *Option - queue workqueue.RateLimitingInterface - routeDriver routedriver.Driver - vpnDriver vpndriver.Driver - tunnelHandler *tunnelengine.TunnelHandler -} + forwardNodeIP bool + natTraversal bool + + localGateway *v1beta1.Gateway + config *config.Config + ravenClient client.Client + routeDriver routedriver.Driver + vpnDriver vpndriver.Driver -func newTunnelEngine(cfg *config.Config, client client.Client, opt *Option, queue workqueue.RateLimitingInterface) *TunnelEngine { - return &TunnelEngine{nodeName: cfg.NodeName, config: cfg, client: client, option: opt, queue: queue} + nodeInfos map[types.NodeName]*v1beta1.NodeInfo + network *types.Network + lastSeenNetwork *types.Network } -func (t *TunnelEngine) worker() { - for t.processNextWorkItem() { +func (c *TunnelEngine) InitDriver() error { + var err error + c.routeDriver, err = routedriver.New(c.config.Tunnel.RouteDriver, c.config) + if err != nil { + return fmt.Errorf("fail to create route driver: %s, %s", c.config.Tunnel.RouteDriver, err) } + err = c.routeDriver.Init() + if err != nil { + return fmt.Errorf("fail to initialize route driver: %s, %s", c.config.Tunnel.RouteDriver, err) + } + c.vpnDriver, err = vpndriver.New(c.config.Tunnel.VPNDriver, c.config) + if err != nil { + return fmt.Errorf("fail to create vpn driver: %s, %s", c.config.Tunnel.VPNDriver, err) + } + err = c.vpnDriver.Init() + if err != nil { + return fmt.Errorf("fail to initialize vpn driver: %s, %s", c.config.Tunnel.VPNDriver, err) + } + klog.Info(utils.FormatTunnel("route driver %s and vpn driver %s are initialized", c.config.Tunnel.RouteDriver, c.config.Tunnel.VPNDriver)) + return nil } -func (t *TunnelEngine) processNextWorkItem() bool { - obj, quit := t.queue.Get() - if quit { - return false +func (c *TunnelEngine) CleanupDriver() error { + err := c.routeDriver.Cleanup() + if err != nil { + return fmt.Errorf("fail to cleanup route driver: %s", err.Error()) } - gw, ok := obj.(*v1beta1.Gateway) - if !ok { - return false + err = c.vpnDriver.Cleanup() + if err != nil { + return fmt.Errorf("fail to cleanup vpn driver: %s", err.Error()) } - defer t.queue.Done(gw) - err := t.handler(gw) - t.handleEventErr(err, gw) - return true + return nil +} + +func (c *TunnelEngine) Status() bool { + aep := getActiveEndpoints(c.localGateway, v1beta1.Tunnel) + if aep != nil && aep.Config != nil { + enable, err := strconv.ParseBool(aep.Config[utils.RavenEnableTunnel]) + if err == nil { + return enable + } + } + return false } -func (t *TunnelEngine) handler(gw *v1beta1.Gateway) error { - klog.Info(utils.FormatRavenEngine("update raven l3 tunnel config for gateway %s", gw.GetName())) - if t.config.Tunnel.NATTraversal { - if err := t.checkNatCapability(); err != nil { +// sync syncs full state according to the gateway list. +func (c *TunnelEngine) Handler() error { + if c.config.Tunnel.NATTraversal { + if err := c.checkNatCapability(); err != nil { + klog.Errorf(utils.FormatTunnel("fail to check the capability of NAT, error %s", err.Error())) return err } } - err := t.initDriver() + var gws v1beta1.GatewayList + err := c.ravenClient.List(context.Background(), &gws) if err != nil { - klog.Errorf(utils.FormatRavenEngine("failed to init raven l3 tunnel engine")) + return err } + // As we are going to rebuild a full state, so cleanup before proceeding. + c.network = &types.Network{ + LocalEndpoint: nil, + RemoteEndpoints: make(map[types.GatewayName]*types.Endpoint), + LocalNodeInfo: make(map[types.NodeName]*v1beta1.NodeInfo), + RemoteNodeInfo: make(map[types.NodeName]*v1beta1.NodeInfo), + } + c.nodeInfos = make(map[types.NodeName]*v1beta1.NodeInfo) - err = t.tunnelHandler.Handler() + for i := range gws.Items { + // try to update public IP if empty. + gw := &gws.Items[i] + if ep := getActiveEndpoints(gw, v1beta1.Tunnel); ep != nil { + if ep.PublicIP == "" || c.natTraversal && (ep.NATType == "" || ep.PublicPort == 0 && ep.NATType != utils.NATSymmetric) { + if ep.PublicIP == "" { + if err := c.configGatewayPublicIP(gw); err != nil { + klog.ErrorS(err, "error config gateway public ip", "gateway", klog.KObj(gw)) + } + } + if c.natTraversal && (ep.NATType == "" || ep.PublicPort == 0 && ep.NATType != utils.NATSymmetric) { + if err := c.configGatewayStunInfo(gw); err != nil { + klog.ErrorS(err, "error config gateway stun info", "gateway", klog.KObj(gw)) + } + } + continue + } + } + if !c.shouldHandleGateway(gw) { + continue + } + c.syncNodeInfo(gw.Status.Nodes) + } + for i := range gws.Items { + gw := &gws.Items[i] + if !c.shouldHandleGateway(gw) { + continue + } + c.syncGateway(gw) + } + if reflect.DeepEqual(c.network, c.lastSeenNetwork) { + klog.Info("network not changed, skip to process") + return nil + } + nw := c.network.Copy() + klog.InfoS("applying network", "localEndpoint", nw.LocalEndpoint, "remoteEndpoint", nw.RemoteEndpoints) + err = c.vpnDriver.Apply(nw, c.routeDriver.MTU) if err != nil { + klog.ErrorS(err, "error apply vpn driver") return err } - t.option.SetTunnelStatus(enableTunnel(gw)) + err = c.routeDriver.Apply(nw, c.vpnDriver.MTU) + if err != nil { + klog.ErrorS(err, "error apply route driver") + return err + } + + // Only update lastSeenNetwork when all operations succeeded. + c.lastSeenNetwork = c.network + return nil } -func (t *TunnelEngine) initDriver() error { - var err error - if t.routeDriver == nil { - t.routeDriver, err = routedriver.New(t.config.Tunnel.RouteDriver, t.config) - if err != nil { - return fmt.Errorf("fail to create route driver: %s, %s", t.config.Tunnel.RouteDriver, err) - } - err = t.routeDriver.Init() - if err != nil { - return fmt.Errorf("fail to initialize route driver: %s, %s", t.config.Tunnel.RouteDriver, err) - } - klog.Info(utils.FormatRavenEngine("route driver %s initialized", t.config.Tunnel.RouteDriver)) +func (c *TunnelEngine) syncNodeInfo(nodes []v1beta1.NodeInfo) { + for _, v := range nodes { + c.nodeInfos[types.NodeName(v.NodeName)] = v.DeepCopy() } +} - if t.vpnDriver == nil { - t.vpnDriver, err = vpndriver.New(t.config.Tunnel.VPNDriver, t.config) - if err != nil { - return fmt.Errorf("fail to create vpn driver: %s, %s", t.config.Tunnel.VPNDriver, err) +func (c *TunnelEngine) appendNodeIP(gw *v1beta1.Gateway) { + for i := range gw.Status.Nodes { + nodeSubnet := net.IPNet{ + IP: net.ParseIP(gw.Status.Nodes[i].PrivateIP), + Mask: []byte{0xff, 0xff, 0xff, 0xff}, } - err = t.vpnDriver.Init() - if err != nil { - return fmt.Errorf("fail to initialize vpn driver: %s, %s", t.config.Tunnel.VPNDriver, err) - } - klog.Info(utils.FormatRavenEngine("VPN driver %s initialized", t.config.Tunnel.VPNDriver)) + gw.Status.Nodes[i].Subnets = append(gw.Status.Nodes[i].Subnets, nodeSubnet.String()) } +} - if t.tunnelHandler == nil { - t.tunnelHandler = tunnelengine.NewTunnelHandler(t.nodeName, t.config.Tunnel.ForwardNodeIP, t.config.Tunnel.NATTraversal, t.client, t.routeDriver, t.vpnDriver) +func (c *TunnelEngine) getMergedSubnets(nodeInfo []v1beta1.NodeInfo) []string { + subnets := make([]string, 0) + for _, n := range nodeInfo { + subnets = append(subnets, n.Subnets...) } - return nil + subnets, _ = cidrman.MergeCIDRs(subnets) + return subnets } -func (t *TunnelEngine) clearDriver() error { - err := t.routeDriver.Cleanup() - if err != nil { - klog.Errorf(utils.FormatRavenEngine("fail to cleanup route driver: %s", err.Error())) +func (c *TunnelEngine) syncGateway(gw *v1beta1.Gateway) { + if c.forwardNodeIP { + c.appendNodeIP(gw) } - err = t.vpnDriver.Cleanup() - if err != nil { - klog.Errorf(utils.FormatRavenEngine("fail to cleanup vpn driver: %s", err.Error())) + aep := getActiveEndpoints(gw, v1beta1.Tunnel) + subnets := c.getMergedSubnets(gw.Status.Nodes) + cfg := make(map[string]string) + for k := range aep.Config { + cfg[k] = aep.Config[k] } - return nil + var nodeInfo *v1beta1.NodeInfo + if nodeInfo = c.nodeInfos[types.NodeName(aep.NodeName)]; nodeInfo == nil { + klog.Errorf("node %s is found in Endpoint but not existed in NodeInfo", aep.NodeName) + return + } + ep := &types.Endpoint{ + GatewayName: types.GatewayName(gw.Name), + NodeName: types.NodeName(aep.NodeName), + Subnets: subnets, + PrivateIP: nodeInfo.PrivateIP, + PublicPort: aep.PublicPort, + PublicIP: aep.PublicIP, + UnderNAT: aep.UnderNAT, + NATType: aep.NATType, + Config: cfg, + } + var isLocalGateway bool + defer func() { + for _, v := range gw.Status.Nodes { + if isLocalGateway { + c.network.LocalNodeInfo[types.NodeName(v.NodeName)] = v.DeepCopy() + } else { + c.network.RemoteNodeInfo[types.NodeName(v.NodeName)] = v.DeepCopy() + } + } + }() + + if gw.Name == c.localGateway.Name { + c.network.LocalEndpoint = ep + isLocalGateway = true + } else { + c.network.RemoteEndpoints[types.GatewayName(gw.Name)] = ep + } + } -func (t *TunnelEngine) checkNatCapability() error { +func (c *TunnelEngine) shouldHandleGateway(gateway *v1beta1.Gateway) bool { + ep := getActiveEndpoints(gateway, v1beta1.Tunnel) + if ep == nil { + klog.InfoS("no active endpoint , waiting for sync", "gateway", klog.KObj(gateway)) + return false + } + if ep.PublicIP == "" { + klog.InfoS("no public IP for gateway, waiting for sync", "gateway", klog.KObj(gateway)) + return false + } + if c.natTraversal { + if ep.NATType == "" { + klog.InfoS("no nat type for gateway, waiting for sync", "gateway", klog.KObj(gateway)) + return false + } + if ep.NATType != utils.NATSymmetric && ep.PublicPort == 0 { + klog.InfoS("no public port for gateway, waiting for sync", "gateway", klog.KObj(gateway)) + return false + } + } + if c.localGateway == nil { + klog.InfoS(fmt.Sprintf("no own gateway for node %s, skip it", c.nodeName), "gateway", klog.KObj(gateway)) + return false + } + return true +} + +func (c *TunnelEngine) checkNatCapability() error { natType, err := utils.GetNATType() if err != nil { return err @@ -134,37 +293,111 @@ func (t *TunnelEngine) checkNatCapability() error { return nil } -func (t *TunnelEngine) handleEventErr(err error, event interface{}) { - if err == nil { - t.queue.Forget(event) - return +func (c *TunnelEngine) configGatewayPublicIP(gateway *v1beta1.Gateway) error { + if getActiveEndpoints(gateway, v1beta1.Tunnel).NodeName != c.nodeName { + return nil } - if t.queue.NumRequeues(event) < utils.MaxRetries { - klog.Info(utils.FormatRavenEngine("error syncing event %v: %v", event, err)) - t.queue.AddRateLimited(event) - return + var publicIP string + var err error + if gateway.Spec.ExposeType == v1beta1.ExposeTypeLoadBalancer { + publicIP, err = c.getLoadBalancerPublicIP(gateway.GetName()) + if err != nil { + return err + } + } else { + publicIP, err = utils.GetPublicIP() + if err != nil { + return err + } } - klog.Info(utils.FormatRavenEngine("dropping event %q out of the queue: %v", event, err)) - t.queue.Forget(event) -} -func enableTunnel(gw *v1beta1.Gateway) (enable bool) { - enable = false - for _, aep := range gw.Status.ActiveEndpoints { - if aep.Type == v1beta1.Tunnel { - if aep.Config == nil { - enable = false - return - } - start, ok := aep.Config[utils.RavenEnableTunnel] - if !ok { - enable = false - return + // retry to update public ip of localGateway + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // get localGateway from api server + var apiGw v1beta1.Gateway + err := c.ravenClient.Get(context.Background(), client.ObjectKey{ + Name: gateway.Name, + }, &apiGw) + if err != nil { + return err + } + for k, v := range apiGw.Spec.Endpoints { + if v.NodeName == c.nodeName && v.Type == v1beta1.Tunnel { + apiGw.Spec.Endpoints[k].PublicIP = publicIP + err = c.ravenClient.Update(context.Background(), &apiGw) + return err } - if strings.ToLower(start) == "true" { - enable = true + } + return nil + }) + return err +} + +func (c *TunnelEngine) configGatewayStunInfo(gateway *v1beta1.Gateway) error { + if getActiveEndpoints(gateway, v1beta1.Tunnel).NodeName != c.nodeName { + return nil + } + + natType, err := utils.GetNATType() + if err != nil { + return err + } + + var publicPort int + if natType != utils.NATSymmetric { + publicPort, err = utils.GetPublicPort() + if err != nil { + return err + } + } + + // retry to update nat type of localGateway + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // get localGateway from api server + var apiGw v1beta1.Gateway + err := c.ravenClient.Get(context.Background(), client.ObjectKey{ + Name: gateway.Name, + }, &apiGw) + if err != nil { + return err + } + for k, v := range apiGw.Spec.Endpoints { + if v.NodeName == c.nodeName { + apiGw.Spec.Endpoints[k].NATType = natType + if natType != utils.NATSymmetric { + apiGw.Spec.Endpoints[k].PublicPort = publicPort + } + err = c.ravenClient.Update(context.Background(), &apiGw) + return err } } + return nil + }) + return err +} + +func (c *TunnelEngine) getLoadBalancerPublicIP(gwName string) (string, error) { + var svcList v1.ServiceList + err := c.ravenClient.List(context.TODO(), &svcList, &client.ListOptions{ + LabelSelector: labels.Set{ + raven.LabelCurrentGateway: gwName, + utils.LabelCurrentGatewayType: v1beta1.Tunnel, + utils.LabelCurrentGatewayEndpoints: c.nodeName, + }.AsSelector(), + }) + if err != nil { + return "", err + } + if len(svcList.Items) == 0 { + return "", apierrors.NewNotFound(v1.Resource("service"), fmt.Sprintf("%s-%s", "x-raven-proxy-svc-%s", gwName)) + } + svc := svcList.Items[0] + if svc.Status.LoadBalancer.Ingress == nil && len(svc.Status.LoadBalancer.Ingress) == 0 { + return "", apierrors.NewNotFound(v1.Resource("service"), svc.GetName()) + } + publicIP := svc.Status.LoadBalancer.Ingress[0].IP + if publicIP == "" { + return "", apierrors.NewServiceUnavailable(fmt.Sprintf("service %s/%s has no public ingress", svc.GetNamespace(), svc.GetName())) } - return + return publicIP, nil } diff --git a/pkg/engine/utils.go b/pkg/engine/utils.go index 7b72dd1..ce69e40 100644 --- a/pkg/engine/utils.go +++ b/pkg/engine/utils.go @@ -3,6 +3,10 @@ package engine import ( "context" "sync" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/openyurtio/api/raven/v1beta1" ) type Option struct { @@ -131,3 +135,29 @@ func (r *proxyContexts) GetServerCancelFunc() context.CancelFunc { defer r.mu.Unlock() return r.serverCancel } + +func findCentreGateway(client client.Client) *v1beta1.Gateway { + var gwList v1beta1.GatewayList + err := client.List(context.TODO(), &gwList) + if err != nil { + return nil + } + for _, gw := range gwList.Items { + if gw.Spec.ExposeType != "" { + return gw.DeepCopy() + } + } + return nil +} + +func getActiveEndpoints(gw *v1beta1.Gateway, aepType string) *v1beta1.Endpoint { + if gw == nil || gw.Status.ActiveEndpoints == nil { + return nil + } + for _, aep := range gw.Status.ActiveEndpoints { + if aep.Type == aepType { + return aep.DeepCopy() + } + } + return nil +} diff --git a/pkg/networkengine/routedriver/driver.go b/pkg/tunnelengine/routedriver/driver.go similarity index 100% rename from pkg/networkengine/routedriver/driver.go rename to pkg/tunnelengine/routedriver/driver.go diff --git a/pkg/networkengine/routedriver/vxlan/utils.go b/pkg/tunnelengine/routedriver/vxlan/utils.go similarity index 98% rename from pkg/networkengine/routedriver/vxlan/utils.go rename to pkg/tunnelengine/routedriver/vxlan/utils.go index f3abf9b..abf442b 100644 --- a/pkg/networkengine/routedriver/vxlan/utils.go +++ b/pkg/tunnelengine/routedriver/vxlan/utils.go @@ -27,7 +27,7 @@ import ( "golang.org/x/sys/unix" "k8s.io/klog/v2" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" + netlinkutil "github.com/openyurtio/raven/pkg/tunnelengine/util/netlink" "github.com/openyurtio/raven/pkg/utils" ) diff --git a/pkg/networkengine/routedriver/vxlan/vxlan.go b/pkg/tunnelengine/routedriver/vxlan/vxlan.go similarity index 98% rename from pkg/networkengine/routedriver/vxlan/vxlan.go rename to pkg/tunnelengine/routedriver/vxlan/vxlan.go index 0f74483..d64fd79 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan.go +++ b/pkg/tunnelengine/routedriver/vxlan/vxlan.go @@ -31,10 +31,10 @@ import ( "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" - "github.com/openyurtio/raven/pkg/networkengine/routedriver" - networkutil "github.com/openyurtio/raven/pkg/networkengine/util" - ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" - iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" + "github.com/openyurtio/raven/pkg/tunnelengine/routedriver" + networkutil "github.com/openyurtio/raven/pkg/tunnelengine/util" + ipsetutil "github.com/openyurtio/raven/pkg/tunnelengine/util/ipset" + iptablesutil "github.com/openyurtio/raven/pkg/tunnelengine/util/iptables" "github.com/openyurtio/raven/pkg/types" "github.com/openyurtio/raven/pkg/utils" ) diff --git a/pkg/networkengine/routedriver/vxlan/vxlan_test.go b/pkg/tunnelengine/routedriver/vxlan/vxlan_test.go similarity index 98% rename from pkg/networkengine/routedriver/vxlan/vxlan_test.go rename to pkg/tunnelengine/routedriver/vxlan/vxlan_test.go index c529e66..52efa2b 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan_test.go +++ b/pkg/tunnelengine/routedriver/vxlan/vxlan_test.go @@ -24,8 +24,8 @@ import ( "github.com/vishvananda/netlink" "github.com/openyurtio/api/raven/v1beta1" - networkutil "github.com/openyurtio/raven/pkg/networkengine/util" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" + networkutil "github.com/openyurtio/raven/pkg/tunnelengine/util" + netlinkutil "github.com/openyurtio/raven/pkg/tunnelengine/util/netlink" "github.com/openyurtio/raven/pkg/types" ) diff --git a/pkg/tunnelengine/tunnelagent.go b/pkg/tunnelengine/tunnelagent.go deleted file mode 100644 index c99f9af..0000000 --- a/pkg/tunnelengine/tunnelagent.go +++ /dev/null @@ -1,352 +0,0 @@ -/* -Copyright 2023 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package tunnelengine - -import ( - "context" - "fmt" - "net" - "reflect" - - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/util/retry" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/EvilSuperstars/go-cidrman" - "github.com/openyurtio/api/raven" - "github.com/openyurtio/api/raven/v1beta1" - "github.com/openyurtio/raven/pkg/networkengine/routedriver" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" - "github.com/openyurtio/raven/pkg/types" - "github.com/openyurtio/raven/pkg/utils" -) - -type TunnelHandler struct { - nodeName string - forwardNodeIP bool - natTraversal bool - ownGateway *v1beta1.Gateway - - ravenClient client.Client - routeDriver routedriver.Driver - vpnDriver vpndriver.Driver - - nodeInfos map[types.NodeName]*v1beta1.NodeInfo - network *types.Network - lastSeenNetwork *types.Network -} - -func NewTunnelHandler(nodeName string, forwardNodeIP bool, natTraversal bool, client client.Client, routeDriver routedriver.Driver, vpnDriver vpndriver.Driver) *TunnelHandler { - return &TunnelHandler{ - nodeName: nodeName, - forwardNodeIP: forwardNodeIP, - natTraversal: natTraversal, - ravenClient: client, - routeDriver: routeDriver, - vpnDriver: vpnDriver, - } -} - -// sync syncs full state according to the gateway list. -func (c *TunnelHandler) Handler() error { - var gws v1beta1.GatewayList - err := c.ravenClient.List(context.Background(), &gws) - if err != nil { - return err - } - // As we are going to rebuild a full state, so cleanup before proceeding. - c.network = &types.Network{ - LocalEndpoint: nil, - RemoteEndpoints: make(map[types.GatewayName]*types.Endpoint), - LocalNodeInfo: make(map[types.NodeName]*v1beta1.NodeInfo), - RemoteNodeInfo: make(map[types.NodeName]*v1beta1.NodeInfo), - } - c.nodeInfos = make(map[types.NodeName]*v1beta1.NodeInfo) - - c.ownGateway, err = utils.GetOwnGateway(c.ravenClient, c.nodeName) - if err != nil { - return fmt.Errorf("failed to get own gateway, error %s", err.Error()) - } - - for i := range gws.Items { - // try to update public IP if empty. - gw := &gws.Items[i] - if ep := getTunnelActiveEndpoints(gw); ep != nil { - if ep.PublicIP == "" || c.natTraversal && (ep.NATType == "" || ep.PublicPort == 0 && ep.NATType != utils.NATSymmetric) { - if ep.PublicIP == "" { - if err := c.configGatewayPublicIP(gw); err != nil { - klog.ErrorS(err, "error config gateway public ip", "gateway", klog.KObj(gw)) - } - } - if c.natTraversal && (ep.NATType == "" || ep.PublicPort == 0 && ep.NATType != utils.NATSymmetric) { - if err := c.configGatewayStunInfo(gw); err != nil { - klog.ErrorS(err, "error config gateway stun info", "gateway", klog.KObj(gw)) - } - } - continue - } - } - if !c.shouldHandleGateway(gw) { - continue - } - c.syncNodeInfo(gw.Status.Nodes) - } - for i := range gws.Items { - gw := &gws.Items[i] - if !c.shouldHandleGateway(gw) { - continue - } - c.syncGateway(gw) - } - if reflect.DeepEqual(c.network, c.lastSeenNetwork) { - klog.Info("network not changed, skip to process") - return nil - } - nw := c.network.Copy() - klog.InfoS("applying network", "localEndpoint", nw.LocalEndpoint, "remoteEndpoint", nw.RemoteEndpoints) - err = c.vpnDriver.Apply(nw, c.routeDriver.MTU) - if err != nil { - return err - } - err = c.routeDriver.Apply(nw, c.vpnDriver.MTU) - if err != nil { - return err - } - - // Only update lastSeenNetwork when all operations succeeded. - c.lastSeenNetwork = c.network - return nil -} - -func (c *TunnelHandler) syncNodeInfo(nodes []v1beta1.NodeInfo) { - for _, v := range nodes { - c.nodeInfos[types.NodeName(v.NodeName)] = v.DeepCopy() - } -} - -func (c *TunnelHandler) appendNodeIP(gw *v1beta1.Gateway) { - for i := range gw.Status.Nodes { - nodeSubnet := net.IPNet{ - IP: net.ParseIP(gw.Status.Nodes[i].PrivateIP), - Mask: []byte{0xff, 0xff, 0xff, 0xff}, - } - gw.Status.Nodes[i].Subnets = append(gw.Status.Nodes[i].Subnets, nodeSubnet.String()) - } -} - -func (c *TunnelHandler) getMergedSubnets(nodeInfo []v1beta1.NodeInfo) []string { - subnets := make([]string, 0) - for _, n := range nodeInfo { - subnets = append(subnets, n.Subnets...) - } - subnets, _ = cidrman.MergeCIDRs(subnets) - return subnets -} - -func (c *TunnelHandler) syncGateway(gw *v1beta1.Gateway) { - if c.forwardNodeIP { - c.appendNodeIP(gw) - } - aep := getTunnelActiveEndpoints(gw) - subnets := c.getMergedSubnets(gw.Status.Nodes) - cfg := make(map[string]string) - for k := range aep.Config { - cfg[k] = aep.Config[k] - } - var nodeInfo *v1beta1.NodeInfo - if nodeInfo = c.nodeInfos[types.NodeName(aep.NodeName)]; nodeInfo == nil { - klog.Errorf("node %s is found in Endpoint but not existed in NodeInfo", aep.NodeName) - return - } - ep := &types.Endpoint{ - GatewayName: types.GatewayName(gw.Name), - NodeName: types.NodeName(aep.NodeName), - Subnets: subnets, - PrivateIP: nodeInfo.PrivateIP, - PublicPort: aep.PublicPort, - PublicIP: aep.PublicIP, - UnderNAT: aep.UnderNAT, - NATType: aep.NATType, - Config: cfg, - } - var isLocalGateway bool - defer func() { - for _, v := range gw.Status.Nodes { - if isLocalGateway { - c.network.LocalNodeInfo[types.NodeName(v.NodeName)] = v.DeepCopy() - } else { - c.network.RemoteNodeInfo[types.NodeName(v.NodeName)] = v.DeepCopy() - } - } - }() - - if gw.Name == c.ownGateway.Name { - c.network.LocalEndpoint = ep - isLocalGateway = true - } else { - c.network.RemoteEndpoints[types.GatewayName(gw.Name)] = ep - } - -} - -func (c *TunnelHandler) shouldHandleGateway(gateway *v1beta1.Gateway) bool { - ep := getTunnelActiveEndpoints(gateway) - if ep == nil { - klog.InfoS("no active endpoint , waiting for sync", "gateway", klog.KObj(gateway)) - return false - } - if ep.PublicIP == "" { - klog.InfoS("no public IP for gateway, waiting for sync", "gateway", klog.KObj(gateway)) - return false - } - if c.natTraversal { - if ep.NATType == "" { - klog.InfoS("no nat type for gateway, waiting for sync", "gateway", klog.KObj(gateway)) - return false - } - if ep.NATType != utils.NATSymmetric && ep.PublicPort == 0 { - klog.InfoS("no public port for gateway, waiting for sync", "gateway", klog.KObj(gateway)) - return false - } - } - if c.ownGateway == nil { - klog.InfoS(fmt.Sprintf("no own gateway for node %s, skip it", c.nodeName), "gateway", klog.KObj(gateway)) - return false - } - return true -} - -func (c *TunnelHandler) configGatewayPublicIP(gateway *v1beta1.Gateway) error { - if getTunnelActiveEndpoints(gateway).NodeName != c.nodeName { - return nil - } - var publicIP string - var err error - if gateway.Spec.ExposeType == v1beta1.ExposeTypeLoadBalancer { - publicIP, err = c.getLoadBalancerPublicIP(gateway.GetName()) - if err != nil { - return err - } - } else { - publicIP, err = utils.GetPublicIP() - if err != nil { - return err - } - } - - // retry to update public ip of localGateway - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - // get localGateway from api server - var apiGw v1beta1.Gateway - err := c.ravenClient.Get(context.Background(), client.ObjectKey{ - Name: gateway.Name, - }, &apiGw) - if err != nil { - return err - } - for k, v := range apiGw.Spec.Endpoints { - if v.NodeName == c.nodeName && v.Type == v1beta1.Tunnel { - apiGw.Spec.Endpoints[k].PublicIP = publicIP - err = c.ravenClient.Update(context.Background(), &apiGw) - return err - } - } - return nil - }) - return err -} - -func (c *TunnelHandler) configGatewayStunInfo(gateway *v1beta1.Gateway) error { - if getTunnelActiveEndpoints(gateway).NodeName != c.nodeName { - return nil - } - - natType, err := utils.GetNATType() - if err != nil { - return err - } - - var publicPort int - if natType != utils.NATSymmetric { - publicPort, err = utils.GetPublicPort() - if err != nil { - return err - } - } - - // retry to update nat type of localGateway - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - // get localGateway from api server - var apiGw v1beta1.Gateway - err := c.ravenClient.Get(context.Background(), client.ObjectKey{ - Name: gateway.Name, - }, &apiGw) - if err != nil { - return err - } - for k, v := range apiGw.Spec.Endpoints { - if v.NodeName == c.nodeName { - apiGw.Spec.Endpoints[k].NATType = natType - if natType != utils.NATSymmetric { - apiGw.Spec.Endpoints[k].PublicPort = publicPort - } - err = c.ravenClient.Update(context.Background(), &apiGw) - return err - } - } - return nil - }) - return err -} - -func (c *TunnelHandler) getLoadBalancerPublicIP(gwName string) (string, error) { - var svcList v1.ServiceList - err := c.ravenClient.List(context.TODO(), &svcList, &client.ListOptions{ - LabelSelector: labels.Set{ - raven.LabelCurrentGateway: gwName, - utils.LabelCurrentGatewayType: v1beta1.Tunnel, - utils.LabelCurrentGatewayEndpoints: c.nodeName, - }.AsSelector(), - }) - if err != nil { - return "", err - } - if len(svcList.Items) == 0 { - return "", apierrors.NewNotFound(v1.Resource("service"), fmt.Sprintf("%s-%s", "x-raven-proxy-svc-%s", gwName)) - } - svc := svcList.Items[0] - if svc.Status.LoadBalancer.Ingress == nil && len(svc.Status.LoadBalancer.Ingress) == 0 { - return "", apierrors.NewNotFound(v1.Resource("service"), svc.GetName()) - } - publicIP := svc.Status.LoadBalancer.Ingress[0].IP - if publicIP == "" { - return "", apierrors.NewServiceUnavailable(fmt.Sprintf("service %s/%s has no public ingress", svc.GetNamespace(), svc.GetName())) - } - return publicIP, nil -} - -func getTunnelActiveEndpoints(gw *v1beta1.Gateway) *v1beta1.Endpoint { - for _, aep := range gw.Status.ActiveEndpoints { - if aep.Type == v1beta1.Tunnel { - return aep.DeepCopy() - } - } - return nil -} diff --git a/pkg/networkengine/util/ipset/ipset.go b/pkg/tunnelengine/util/ipset/ipset.go similarity index 100% rename from pkg/networkengine/util/ipset/ipset.go rename to pkg/tunnelengine/util/ipset/ipset.go diff --git a/pkg/networkengine/util/iptables/constants.go b/pkg/tunnelengine/util/iptables/constants.go similarity index 100% rename from pkg/networkengine/util/iptables/constants.go rename to pkg/tunnelengine/util/iptables/constants.go diff --git a/pkg/networkengine/util/iptables/iptables.go b/pkg/tunnelengine/util/iptables/iptables.go similarity index 100% rename from pkg/networkengine/util/iptables/iptables.go rename to pkg/tunnelengine/util/iptables/iptables.go diff --git a/pkg/networkengine/util/netlink/netlink.go b/pkg/tunnelengine/util/netlink/netlink.go similarity index 100% rename from pkg/networkengine/util/netlink/netlink.go rename to pkg/tunnelengine/util/netlink/netlink.go diff --git a/pkg/networkengine/util/utils.go b/pkg/tunnelengine/util/utils.go similarity index 98% rename from pkg/networkengine/util/utils.go rename to pkg/tunnelengine/util/utils.go index 23b0add..93ea159 100644 --- a/pkg/networkengine/util/utils.go +++ b/pkg/tunnelengine/util/utils.go @@ -28,8 +28,8 @@ import ( "github.com/vishvananda/netlink" "k8s.io/klog/v2" - ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" + ipsetutil "github.com/openyurtio/raven/pkg/tunnelengine/util/ipset" + netlinkutil "github.com/openyurtio/raven/pkg/tunnelengine/util/netlink" ) var ( diff --git a/pkg/networkengine/util/utils_test.go b/pkg/tunnelengine/util/utils_test.go similarity index 100% rename from pkg/networkengine/util/utils_test.go rename to pkg/tunnelengine/util/utils_test.go diff --git a/pkg/networkengine/vpndriver/driver.go b/pkg/tunnelengine/vpndriver/driver.go similarity index 98% rename from pkg/networkengine/vpndriver/driver.go rename to pkg/tunnelengine/vpndriver/driver.go index 59bada2..605e007 100644 --- a/pkg/networkengine/vpndriver/driver.go +++ b/pkg/tunnelengine/vpndriver/driver.go @@ -26,7 +26,7 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/raven/cmd/agent/app/config" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" + netlinkutil "github.com/openyurtio/raven/pkg/tunnelengine/util/netlink" "github.com/openyurtio/raven/pkg/types" "github.com/openyurtio/raven/pkg/utils" ) diff --git a/pkg/networkengine/vpndriver/driver_test.go b/pkg/tunnelengine/vpndriver/driver_test.go similarity index 100% rename from pkg/networkengine/vpndriver/driver_test.go rename to pkg/tunnelengine/vpndriver/driver_test.go diff --git a/pkg/networkengine/vpndriver/libreswan/libreswan.go b/pkg/tunnelengine/vpndriver/libreswan/libreswan.go similarity index 98% rename from pkg/networkengine/vpndriver/libreswan/libreswan.go rename to pkg/tunnelengine/vpndriver/libreswan/libreswan.go index 50395b0..900218a 100644 --- a/pkg/networkengine/vpndriver/libreswan/libreswan.go +++ b/pkg/tunnelengine/vpndriver/libreswan/libreswan.go @@ -28,9 +28,9 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/raven/cmd/agent/app/config" - iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" + iptablesutil "github.com/openyurtio/raven/pkg/tunnelengine/util/iptables" + netlinkutil "github.com/openyurtio/raven/pkg/tunnelengine/util/netlink" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver" "github.com/openyurtio/raven/pkg/types" "github.com/openyurtio/raven/pkg/utils" ) diff --git a/pkg/networkengine/vpndriver/libreswan/libreswan_test.go b/pkg/tunnelengine/vpndriver/libreswan/libreswan_test.go similarity index 98% rename from pkg/networkengine/vpndriver/libreswan/libreswan_test.go rename to pkg/tunnelengine/vpndriver/libreswan/libreswan_test.go index 529963d..bb59582 100644 --- a/pkg/networkengine/vpndriver/libreswan/libreswan_test.go +++ b/pkg/tunnelengine/vpndriver/libreswan/libreswan_test.go @@ -23,9 +23,9 @@ import ( "github.com/stretchr/testify/assert" - iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" - netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" + iptablesutil "github.com/openyurtio/raven/pkg/tunnelengine/util/iptables" + netlinkutil "github.com/openyurtio/raven/pkg/tunnelengine/util/netlink" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver" "github.com/openyurtio/raven/pkg/types" ) diff --git a/pkg/networkengine/vpndriver/wireguard/wireguard.go b/pkg/tunnelengine/vpndriver/wireguard/wireguard.go similarity index 98% rename from pkg/networkengine/vpndriver/wireguard/wireguard.go rename to pkg/tunnelengine/vpndriver/wireguard/wireguard.go index e17b24c..47e8be0 100644 --- a/pkg/networkengine/vpndriver/wireguard/wireguard.go +++ b/pkg/tunnelengine/vpndriver/wireguard/wireguard.go @@ -37,9 +37,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/openyurtio/raven/cmd/agent/app/config" - networkutil "github.com/openyurtio/raven/pkg/networkengine/util" - iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" + networkutil "github.com/openyurtio/raven/pkg/tunnelengine/util" + iptablesutil "github.com/openyurtio/raven/pkg/tunnelengine/util/iptables" + "github.com/openyurtio/raven/pkg/tunnelengine/vpndriver" "github.com/openyurtio/raven/pkg/types" "github.com/openyurtio/raven/pkg/utils" ) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index d625de1..cad4572 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -17,45 +17,25 @@ limitations under the License. package utils import ( - "context" "fmt" - - "github.com/openyurtio/api/raven/v1beta1" - "sigs.k8s.io/controller-runtime/pkg/client" ) func FormatProxyServer(format string, args ...interface{}) string { s := fmt.Sprintf(format, args...) - return fmt.Sprintf("ProxyServer: %s", s) + return fmt.Sprintf("[Proxy Server]: %s", s) } func FormatProxyClient(format string, args ...interface{}) string { s := fmt.Sprintf(format, args...) - return fmt.Sprintf("ProxyClient: %s", s) + return fmt.Sprintf("[Proxy Client]: %s", s) } func FormatTunnel(format string, args ...interface{}) string { s := fmt.Sprintf(format, args...) - return fmt.Sprintf("Tunnel: %s", s) + return fmt.Sprintf("[Tunnel Agent]: %s", s) } func FormatRavenEngine(format string, args ...interface{}) string { s := fmt.Sprintf(format, args...) - return fmt.Sprintf("RavenEngine: %s", s) -} - -func GetOwnGateway(client client.Client, nodeName string) (*v1beta1.Gateway, error) { - var gwList v1beta1.GatewayList - err := client.List(context.TODO(), &gwList) - if err != nil { - return nil, err - } - for _, gw := range gwList.Items { - for _, node := range gw.Status.Nodes { - if node.NodeName == nodeName { - return gw.DeepCopy(), nil - } - } - } - return nil, nil + return fmt.Sprintf("[Raven Engine]: %s", s) }