From 1a5747516ed1ad37bbdf45536ec367e679adf361 Mon Sep 17 00:00:00 2001 From: OrangeBao Date: Tue, 19 Dec 2023 15:27:39 +0800 Subject: [PATCH] feat: periodic check of environmental variables Signed-off-by: OrangeBao --- pkg/clusterlink/network/env.go | 58 ++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/pkg/clusterlink/network/env.go b/pkg/clusterlink/network/env.go index 4b7e2217e..d47f5fde0 100644 --- a/pkg/clusterlink/network/env.go +++ b/pkg/clusterlink/network/env.go @@ -2,17 +2,64 @@ package network import ( "bytes" + "context" "fmt" "os" "strings" + "sync" + "time" ipt "github.com/coreos/go-iptables/iptables" "github.com/pkg/errors" "github.com/vishvananda/netlink" + "k8s.io/apimachinery/pkg/util/wait" "github.com/kosmos.io/kosmos/pkg/clusterlink/network/iptables" ) +var ( + watchDog *WatchDog +) + +type WatchDog struct { + WatchTasks []WatchTask + lock sync.Mutex +} + +func (w *WatchDog) AddTask(path string, contents []byte) { + w.lock.Lock() + defer w.lock.Unlock() + for i, task := range w.WatchTasks { + if task.Path == path { + w.WatchTasks[i].Contents = contents + return + } + } + w.WatchTasks = append(w.WatchTasks, WatchTask{Path: path, Contents: contents}) +} + +func (w *WatchDog) Watch(ctx context.Context) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, task := range w.WatchTasks { + // nolint:errcheck + setSysctl(task.Path, task.Contents) + } +} + +type WatchTask struct { + Path string + Contents []byte +} + +func init() { + watchDog = &WatchDog{ + WatchTasks: []WatchTask{}, + } + go wait.UntilWithContext(context.Background(), watchDog.Watch, 30*time.Second) +} + func UpdateDefaultIp6tablesBehavior(ifaceName string) error { iptableHandler, err := iptables.New(ipt.ProtocolIPv6) if err != nil { @@ -43,6 +90,11 @@ func UpdateDefaultIp4tablesBehavior(ifaceName string) error { return nil } +func setSysctlAndWatch(path string, contents []byte) error { + watchDog.AddTask(path, contents) + return setSysctl(path, contents) +} + // submariner\pkg\netlink\netlink.go func setSysctl(path string, contents []byte) error { existing, err := os.ReadFile(path) @@ -63,13 +115,13 @@ func setSysctl(path string, contents []byte) error { func EnableLooseModeByIFaceNmae(ifaceName string) error { // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. - err := setSysctl("/proc/sys/net/ipv4/conf/"+ifaceName+"/rp_filter", []byte("2")) + err := setSysctlAndWatch("/proc/sys/net/ipv4/conf/"+ifaceName+"/rp_filter", []byte("2")) return errors.Wrapf(err, "unable to update rp_filter proc entry for interface %q", ifaceName) } func EnableDisableIPV6ByIFaceNmae(ifaceName string) error { // Enable ipv6 (disable_ipv6=0) - err := setSysctl("/proc/sys/net/ipv6/conf/"+ifaceName+"/disable_ipv6", []byte("0")) + err := setSysctlAndWatch("/proc/sys/net/ipv6/conf/"+ifaceName+"/disable_ipv6", []byte("0")) return errors.Wrapf(err, "unable to update disable_ipv6 proc entry for interface %q", ifaceName) } @@ -84,7 +136,7 @@ func EnableLooseModeForFlannel() error { for _, link := range links { linkName := link.Attrs().Name if strings.HasPrefix(linkName, FLANNEL_DEV_NAME_PREFIX) { - if err := setSysctl("/proc/sys/net/ipv4/conf/"+linkName+"/rp_filter", []byte("2")); err != nil { + if err := setSysctlAndWatch("/proc/sys/net/ipv4/conf/"+linkName+"/rp_filter", []byte("2")); err != nil { errs = errors.Wrapf(errors.Wrapf(err, "unable to update flannel rp_filter proc entry for interface %q", linkName), fmt.Sprint(errs)) } }