diff --git a/pkg/clusterlink/network/env.go b/pkg/clusterlink/network/env.go index 4b7e2217e..d47f5fde0 100644 --- a/pkg/clusterlink/network/env.go +++ b/pkg/clusterlink/network/env.go @@ -2,17 +2,64 @@ package network import ( "bytes" + "context" "fmt" "os" "strings" + "sync" + "time" ipt "github.com/coreos/go-iptables/iptables" "github.com/pkg/errors" "github.com/vishvananda/netlink" + "k8s.io/apimachinery/pkg/util/wait" "github.com/kosmos.io/kosmos/pkg/clusterlink/network/iptables" ) +var ( + watchDog *WatchDog +) + +type WatchDog struct { + WatchTasks []WatchTask + lock sync.Mutex +} + +func (w *WatchDog) AddTask(path string, contents []byte) { + w.lock.Lock() + defer w.lock.Unlock() + for i, task := range w.WatchTasks { + if task.Path == path { + w.WatchTasks[i].Contents = contents + return + } + } + w.WatchTasks = append(w.WatchTasks, WatchTask{Path: path, Contents: contents}) +} + +func (w *WatchDog) Watch(ctx context.Context) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, task := range w.WatchTasks { + // nolint:errcheck + setSysctl(task.Path, task.Contents) + } +} + +type WatchTask struct { + Path string + Contents []byte +} + +func init() { + watchDog = &WatchDog{ + WatchTasks: []WatchTask{}, + } + go wait.UntilWithContext(context.Background(), watchDog.Watch, 30*time.Second) +} + func UpdateDefaultIp6tablesBehavior(ifaceName string) error { iptableHandler, err := iptables.New(ipt.ProtocolIPv6) if err != nil { @@ -43,6 +90,11 @@ func UpdateDefaultIp4tablesBehavior(ifaceName string) error { return nil } +func setSysctlAndWatch(path string, contents []byte) error { + watchDog.AddTask(path, contents) + return setSysctl(path, contents) +} + // submariner\pkg\netlink\netlink.go func setSysctl(path string, contents []byte) error { existing, err := os.ReadFile(path) @@ -63,13 +115,13 @@ func setSysctl(path string, contents []byte) error { func EnableLooseModeByIFaceNmae(ifaceName string) error { // Enable loose mode (rp_filter=2) reverse path filtering on the vxlan interface. - err := setSysctl("/proc/sys/net/ipv4/conf/"+ifaceName+"/rp_filter", []byte("2")) + err := setSysctlAndWatch("/proc/sys/net/ipv4/conf/"+ifaceName+"/rp_filter", []byte("2")) return errors.Wrapf(err, "unable to update rp_filter proc entry for interface %q", ifaceName) } func EnableDisableIPV6ByIFaceNmae(ifaceName string) error { // Enable ipv6 (disable_ipv6=0) - err := setSysctl("/proc/sys/net/ipv6/conf/"+ifaceName+"/disable_ipv6", []byte("0")) + err := setSysctlAndWatch("/proc/sys/net/ipv6/conf/"+ifaceName+"/disable_ipv6", []byte("0")) return errors.Wrapf(err, "unable to update disable_ipv6 proc entry for interface %q", ifaceName) } @@ -84,7 +136,7 @@ func EnableLooseModeForFlannel() error { for _, link := range links { linkName := link.Attrs().Name if strings.HasPrefix(linkName, FLANNEL_DEV_NAME_PREFIX) { - if err := setSysctl("/proc/sys/net/ipv4/conf/"+linkName+"/rp_filter", []byte("2")); err != nil { + if err := setSysctlAndWatch("/proc/sys/net/ipv4/conf/"+linkName+"/rp_filter", []byte("2")); err != nil { errs = errors.Wrapf(errors.Wrapf(err, "unable to update flannel rp_filter proc entry for interface %q", linkName), fmt.Sprint(errs)) } } diff --git a/pkg/utils/informers/InformerFactory.go b/pkg/utils/informers/InformerFactory.go new file mode 100644 index 000000000..f06cb63a3 --- /dev/null +++ b/pkg/utils/informers/InformerFactory.go @@ -0,0 +1,130 @@ +package informers + +import ( + "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + klog2 "k8s.io/klog/v2" + + kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + kosmosinformers "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" +) + +const DefaultResync = 3600 * time.Second + +type InformerFactory interface { + //K8sInformerFactory return the default InformerFactory + K8sInformerFactory() informers.SharedInformerFactory + + //KosmosInformerFactory return the kosmos InformerFactory + KosmosInformerFactory() kosmosinformers.SharedInformerFactory + + // SyncCache blocks until all register informers' caches were synced + // or the stop channel gets closed. + SyncCache(stopCh <-chan struct{}) error +} + +type informerFactory struct { + k8sClient kubernetes.Interface + kosmosClient kosmosversioned.Interface + + k8sInformerFactory informers.SharedInformerFactory + kosmosInformerFactory kosmosinformers.SharedInformerFactory + + k8sResources map[schema.GroupVersion][]string + kosmosResources map[schema.GroupVersion][]string +} + +func NewInformerFactory( + k8sClient kubernetes.Interface, + kosmosClient kosmosversioned.Interface, + k8sResources map[schema.GroupVersion][]string, + kosmosResources map[schema.GroupVersion][]string) InformerFactory { + if k8sClient == nil || kosmosClient == nil { + klog2.Fatal("Leaf client is nil, exit os !") + } + k8sInformerFactory := informers.NewSharedInformerFactory(k8sClient, DefaultResync) + kosmosInformerFactory := kosmosinformers.NewSharedInformerFactory(kosmosClient, DefaultResync) + return &informerFactory{ + k8sClient: k8sClient, + kosmosClient: kosmosClient, + k8sInformerFactory: k8sInformerFactory, + kosmosInformerFactory: kosmosInformerFactory, + k8sResources: k8sResources, + kosmosResources: kosmosResources} +} + +func (i *informerFactory) K8sInformerFactory() informers.SharedInformerFactory { + return i.k8sInformerFactory +} + +func (i *informerFactory) KosmosInformerFactory() kosmosinformers.SharedInformerFactory { + return i.kosmosInformerFactory +} + +// SyncCache blocks until all register informers' caches were synced +// or the stop channel gets closed. +func (i *informerFactory) SyncCache(stopCh <-chan struct{}) error { + discoveryClient := i.k8sClient.Discovery() + + if i.k8sResources != nil && len(i.k8sResources) != 0 { + registerFunc := func(resource schema.GroupVersionResource) (interface{}, error) { + return i.k8sInformerFactory.ForResource(resource) + } + if err := registerCacheInSharedInformerFactory(discoveryClient, registerFunc, i.k8sResources); err != nil { + return err + } + } + i.k8sInformerFactory.Start(stopCh) + i.k8sInformerFactory.WaitForCacheSync(stopCh) + + if i.kosmosResources != nil && len(i.kosmosResources) != 0 { + registerFunc := func(resource schema.GroupVersionResource) (interface{}, error) { + return i.kosmosInformerFactory.ForResource(resource) + } + if err := registerCacheInSharedInformerFactory(discoveryClient, registerFunc, i.kosmosResources); err != nil { + return err + } + } + + i.kosmosInformerFactory.Start(stopCh) + i.kosmosInformerFactory.WaitForCacheSync(stopCh) + + return nil +} + +// registerCacheInSharedInformerFactory is for register gvr to informer factory +func registerCacheInSharedInformerFactory(discoveryClient discovery.DiscoveryInterface, registerFunc func(resource schema.GroupVersionResource) (interface{}, error), gvrs map[schema.GroupVersion][]string) error { + for groupVersion, resourceNames := range gvrs { + apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(groupVersion.String()) + if err != nil { + klog2.Errorf("get %s ApiResource List error,", groupVersion.String(), err) + return err + } + for _, resourceName := range resourceNames { + if !apiResourceExists(apiResourceList.APIResources, resourceName) { + klog2.Errorf("resource %s not exists in the cluster", resourceName) + } else { + groupVersionResource := groupVersion.WithResource(resourceName) + if _, err = registerFunc(groupVersionResource); err != nil { + return err + } + } + } + } + return nil +} + +// apiResourceExists judge the current gvr is exist +func apiResourceExists(apiResources []v1.APIResource, resourceName string) bool { + for _, apiResource := range apiResources { + if apiResource.Name == resourceName { + return true + } + } + return false +}