Skip to content

Commit

Permalink
1. Optimize the reconcile process, use the ipsec status command to ob…
Browse files Browse the repository at this point in the history
…tain the current VPN link status, and compare the expected status with the current status for configuration. (Wireguard has the same principle)

2. Add a regular synchronization mechanism
  • Loading branch information
珩轩 committed Jun 4, 2024
1 parent a1d1502 commit 1f91e43
Show file tree
Hide file tree
Showing 16 changed files with 608 additions and 342 deletions.
2 changes: 2 additions & 0 deletions charts/raven-agent/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
- --vpn-bind-port={{.Values.vpn.tunnelAddr}}
- --keep-alive-interval={{.Values.vpn.keepAliveInterval}}
- --keep-alive-timeout={{.Values.vpn.keepAliveTimeout}}
- --sync-raven-rules={{.Values.sync.syncRule}}
- --sync-raven-rules-period={{.Values.sync.syncPeriod}}
- --proxy-metric-bind-addr={{.Values.proxy.metricsBindAddr}}
- --proxy-internal-secure-addr={{.Values.proxy.internalSecureAddr}}
- --proxy-internal-insecure-addr={{.Values.proxy.internalInsecureAddr}}
Expand Down
5 changes: 4 additions & 1 deletion charts/raven-agent/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ containerEnv:
secretKeyRef:
key: vpn-connection-psk
name: raven-agent-secret
sync:
syncRule: true
syncPeriod: 30m

vpn:
driver: libreswan
Expand Down Expand Up @@ -86,4 +89,4 @@ proxy:
metricsBindAddr: ":10266"

rollingUpdate:
maxUnavailable: 5%
maxUnavailable: 20%
8 changes: 6 additions & 2 deletions cmd/agent/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package config

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// Config is the main context object for raven agent
type Config struct {
NodeName string
NodeIP string
NodeName string
NodeIP string
SyncRules bool
SyncPeriod metav1.Duration

MetricsBindAddress string
HealthProbeAddr string

Expand Down
17 changes: 15 additions & 2 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,6 +51,8 @@ type AgentOptions struct {
Kubeconfig string
MetricsBindAddress string
HealthProbeAddr string
SyncRules bool
SyncPeriod metav1.Duration
}

type TunnelOptions struct {
Expand Down Expand Up @@ -91,6 +94,12 @@ func (o *AgentOptions) Validate() error {
}
}
}
if o.SyncPeriod.Duration < time.Minute {
o.SyncPeriod.Duration = time.Minute
}
if o.SyncPeriod.Duration > 24*time.Hour {
o.SyncPeriod.Duration = 24 * time.Hour
}
return nil
}

Expand All @@ -103,6 +112,8 @@ func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.RouteDriver, "route-driver", o.RouteDriver, `The Route driver name. (default "vxlan")`)
fs.StringVar(&o.MetricsBindAddress, "metric-bind-addr", o.MetricsBindAddress, `Binding address of tunnel metrics. (default ":10265")`)
fs.StringVar(&o.HealthProbeAddr, "health-probe-addr", o.HealthProbeAddr, `The address the healthz/readyz endpoint binds to.. (default ":10275")`)
fs.BoolVar(&o.SyncRules, "sync-raven-rules", true, "Whether to synchronize raven rules regularly")
fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 10*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour")

fs.StringVar(&o.VPNPort, "vpn-bind-port", o.VPNPort, `Binding port of vpn. (default ":4500")`)
fs.BoolVar(&o.NATTraversal, "nat-traversal", o.NATTraversal, `Enable NAT Traversal or not. (default "false")`)
Expand Down Expand Up @@ -141,8 +152,10 @@ func (o *AgentOptions) Config() (*config.Config, error) {
}
cfg = restclient.AddUserAgent(cfg, "raven-agent-ds")
c := &config.Config{
NodeName: o.NodeName,
NodeIP: o.NodeIP,
NodeName: o.NodeName,
NodeIP: o.NodeIP,
SyncRules: o.SyncRules,
SyncPeriod: o.SyncPeriod,
}
c.KubeConfig = cfg
c.MetricsBindAddress = resolveAddress(c.MetricsBindAddress, resolveLocalHost(), strconv.Itoa(DefaultTunnelMetricsPort))
Expand Down
13 changes: 12 additions & 1 deletion cmd/agent/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package app
import (
"context"
"fmt"
"sync"
"time"

"github.com/lorenzosaino/go-sysctl"
"github.com/spf13/cobra"
"k8s.io/klog/v2"

"github.com/openyurtio/raven/cmd/agent/app/config"
"github.com/openyurtio/raven/cmd/agent/app/options"
ravenengine "github.com/openyurtio/raven/pkg/engine"
"github.com/openyurtio/raven/pkg/features"
"github.com/spf13/cobra"
)

// NewRavenAgentCommand creates a new raven agent command
Expand Down Expand Up @@ -70,6 +72,15 @@ func Run(ctx context.Context, cfg *config.CompletedConfig) error {
}
klog.Info("engine successfully start")
engine.Start()
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ctx.Done()
time.Sleep(time.Second)
engine.Cleanup()
wg.Done()
}()
wg.Wait()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var GitCommit string
func main() {
klog.InitFlags(nil)
defer klog.Flush()
rand.Seed(time.Now().UnixNano())
rand.NewSource(time.Now().UnixNano())
klog.Infof("component: %s, git commit: %s\n", "raven-agent-ds", GitCommit)
cmd := app.NewRavenAgentCommand(server.SetupSignalContext())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
Expand Down
84 changes: 49 additions & 35 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand All @@ -22,26 +23,31 @@ import (
)

type Engine struct {
nodeName string
nodeIP string
context context.Context
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface
nodeName string
nodeIP string
syncRules bool
syncPeriod metav1.Duration

context context.Context
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface

tunnel *TunnelEngine
proxy *ProxyEngine
}

func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
engine := &Engine{
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
syncRules: cfg.SyncRules,
syncPeriod: cfg.SyncPeriod,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
}
err := ctrl.NewControllerManagedBy(engine.manager).
For(&v1beta1.Gateway{}, builder.WithPredicates(predicate.Funcs{
Expand All @@ -53,7 +59,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
return reconcile.Result{}, nil
}))
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error()))
klog.Errorf("fail to new controller with manager, error %s", err.Error())
return engine, err
}
engine.client = engine.manager.GetClient()
Expand All @@ -66,7 +72,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
}
err = engine.tunnel.InitDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error()))
klog.Errorf("fail to init tunnel driver, error %s", err.Error())
return engine, err
}

Expand All @@ -90,9 +96,12 @@ func (e *Engine) Start() {
klog.ErrorS(err, "failed to start engine controller")
}
}()

go wait.Until(e.worker, time.Second, e.context.Done())
<-e.context.Done()
e.cleanup()

if e.syncRules {
go wait.Until(e.regularSync, e.syncPeriod.Duration, e.context.Done())
}
}

func (e *Engine) worker() {
Expand All @@ -110,19 +119,29 @@ func (e *Engine) processNextWorkItem() bool {
return false
}
defer e.queue.Done(gw)
e.findLocalGateway()
err := e.tunnel.Handler()
err := e.sync()
if err != nil {
e.handleEventErr(err, gw)
}
e.option.SetTunnelStatus(e.tunnel.Status())
return true
}

err = e.proxy.Handler()
func (e *Engine) sync() error {
e.findLocalGateway()
err := e.proxy.Handler()
if err != nil {
e.handleEventErr(err, gw)
return err
}
err = e.tunnel.Handler()
if err != nil {
return err
}
e.option.SetTunnelStatus(e.tunnel.Status())
return nil
}

return true
func (e *Engine) regularSync() {
e.queue.Add(&v1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: "gw-sync"}})
}

func (e *Engine) findLocalGateway() {
Expand All @@ -144,12 +163,9 @@ func (e *Engine) findLocalGateway() {
}
}

func (e *Engine) cleanup() {
func (e *Engine) Cleanup() {
if e.option.GetTunnelStatus() {
err := e.tunnel.CleanupDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error()))
}
e.tunnel.CleanupDriver()
}
if e.option.GetProxyStatus() {
e.proxy.stop()
Expand All @@ -163,18 +179,18 @@ func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) {
}

if e.queue.NumRequeues(gw) < utils.MaxRetries {
klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error()))
klog.Infof("error syncing event %s: %s", gw.GetName(), err.Error())
e.queue.AddRateLimited(gw)
return
}
klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error()))
klog.Infof("dropping event %s out of the queue: %s", gw.GetName(), err.Error())
e.queue.Forget(gw)
}

func (e *Engine) addGateway(evt event.CreateEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName()))
klog.Infof("adding gateway %s", gw.GetName())
e.queue.Add(gw.DeepCopy())
}
return ok
Expand All @@ -187,10 +203,8 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
if ok1 && ok2 {
if oldGw.ResourceVersion != newGw.ResourceVersion {
update = true
klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName()))
klog.Infof("updating gateway, %s", newGw.GetName())
e.queue.Add(newGw.DeepCopy())
} else {
klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw))
}
}
return update
Expand All @@ -199,7 +213,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
func (e *Engine) deleteGateway(evt event.DeleteEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName()))
klog.Infof("deleting gateway, %s", gw.GetName())
e.queue.Add(gw.DeepCopy())
}
return ok
Expand Down
Loading

0 comments on commit 1f91e43

Please sign in to comment.