diff --git a/pkg/clusterlink/controllers/nodecidr/adapter.go b/pkg/clusterlink/controllers/nodecidr/adapter.go index 20c2c07b3..0b45f447d 100644 --- a/pkg/clusterlink/controllers/nodecidr/adapter.go +++ b/pkg/clusterlink/controllers/nodecidr/adapter.go @@ -4,6 +4,7 @@ import ( "strings" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -16,6 +17,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils" ) @@ -68,12 +70,14 @@ func (c *commonAdapter) start(stopCh <-chan struct{}) error { informerFactory.WaitForCacheSync(stopCh) c.sync = true + klog.Info("common informer started!") return nil } func (c *commonAdapter) getCIDRByNodeName(nodeName string) ([]string, error) { node, err := c.nodeLister.Get(nodeName) if err != nil { + klog.Infof("get node %s error:%v", nodeName, err) return nil, err } @@ -131,6 +135,7 @@ func NewCalicoAdapter(config *rest.Config, func (c *calicoAdapter) start(stopCh <-chan struct{}) error { client, err := dynamic.NewForConfig(c.config) if err != nil { + klog.Errorf("init dynamic client err: %v", err) return err } gvr := schema.GroupVersionResource{ @@ -146,6 +151,7 @@ func (c *calicoAdapter) start(stopCh <-chan struct{}) error { DeleteFunc: c.OnDelete, }) if err != nil { + klog.Errorf("add event handler error: %v", err) return err } @@ -154,12 +160,14 @@ func (c *calicoAdapter) start(stopCh <-chan struct{}) error { informerFactory.WaitForCacheSync(stopCh) c.sync = true + klog.Info("calico blockaffinities informer started!") return nil } func (c *calicoAdapter) getCIDRByNodeName(nodeName string) ([]string, error) { blockAffinities, err := c.blockLister.List(labels.Everything()) if err != nil { + klog.Errorf("list blockAffinities error: %v", err) return nil, err } var podCIDRS []string @@ -192,6 +200,7 @@ func (c *calicoAdapter) synced() bool { } func (c *calicoAdapter) OnAdd(obj interface{}) { + klog.V(7).Info("add event") runtimeObj, ok := obj.(*unstructured.Unstructured) if !ok { return @@ -203,11 +212,13 @@ func (c *calicoAdapter) OnAdd(obj interface{}) { if !found { return } + klog.V(7).Info("add event Enqueue") requeue(node, c.clusterNodeLister, c.processor) } // OnUpdate handles object update event and push the object to queue. func (c *calicoAdapter) OnUpdate(oldObj, newObj interface{}) { + klog.V(7).Info("update event") runtimeObj, ok := newObj.(*unstructured.Unstructured) if !ok { return @@ -219,6 +230,7 @@ func (c *calicoAdapter) OnUpdate(oldObj, newObj interface{}) { if !found { return } + klog.V(7).Info("update event Enqueue") requeue(node, c.clusterNodeLister, c.processor) } @@ -241,18 +253,38 @@ func (c *calicoAdapter) OnDelete(obj interface{}) { func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor utils.AsyncWorker) { clusterNodes, err := clusterNodeLister.List(labels.Everything()) if err != nil { + klog.Errorf("list clusterNodes err: %v", err) return } + flag := false for _, clusterNode := range clusterNodes { if clusterNode.Spec.NodeName == originNodeName { key, err := ClusterWideKeyFunc(clusterNode) if err != nil { + klog.Errorf("make clusterNode as a reconsile key err: %v", err) return } + klog.V(7).Infof("key %s is enqueued!", originNodeName) processor.Add(key) + flag = true break } } + if !flag { + clusterNode := &clusterlinkv1alpha1.ClusterNode{ + ObjectMeta: metav1.ObjectMeta{ + Name: originNodeName, + }, + } + key, err := ClusterWideKeyFunc(clusterNode) + if err != nil { + klog.Errorf("make clusterNode as a reconsile key err: %v", err) + return + } + + klog.V(7).Infof("can't find match clusternode %s", originNodeName) + processor.Add(key) + } } diff --git a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go index d529040ce..f564befbe 100644 --- a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go +++ b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go @@ -9,12 +9,16 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + informer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + lister "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + nodecontroller "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" clusterinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" @@ -35,6 +39,7 @@ type controller struct { config *rest.Config clusterLinkClient versioned.Interface + nodeLister lister.NodeLister // RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of // the controller. @@ -97,14 +102,25 @@ func (c *controller) Start(ctx context.Context) error { return err } + client, err := kubernetes.NewForConfig(c.config) + if err != nil { + klog.Errorf("init kubernetes client err: %v", err) + return err + } + + informerFactory := informer.NewSharedInformerFactory(client, 0) + c.nodeLister = informerFactory.Core().V1().Nodes().Lister() + stopCh := c.ctx.Done() clusterInformerFactory.Start(stopCh) clusterInformerFactory.WaitForCacheSync(stopCh) // third step: init CNI Adapter if cluster.Spec.ClusterLinkOptions.CNI == calicoCNI { + klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI) c.cniAdapter = NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor) } else { + klog.Infof("cluster %s's cni is %s", c.clusterName, cluster.Spec.ClusterLinkOptions.CNI) c.cniAdapter = NewCommonAdapter(c.config, c.clusterNodeLister, c.processor) } err = c.cniAdapter.start(stopCh) @@ -129,10 +145,43 @@ func (c *controller) Reconcile(key utils.QueueKey) error { clusterNode, err := c.clusterNodeLister.Get(clusterWideKey.Name) if err != nil { if apierrors.IsNotFound(err) { - klog.Infof("Cluster Node %s has been removed.", clusterWideKey.NamespaceKey()) - return nil + klog.Info("maybe clusterWideKey.Name is k8s node's name instead of clusternode's name,try to get node podCIDRs") + nodePodcidr, err := c.cniAdapter.getCIDRByNodeName(clusterWideKey.Name) + // get cluster node name by clustername and k8s node's name + clusterNodeName := nodecontroller.ClusterNodeName(c.clusterName, clusterWideKey.Name) + // if err is no nil, means get node error or list blockAffinities error + // do not reconsile + if err != nil { + klog.Errorf("get node %s's podCIDRs err: %v", clusterWideKey.Name, err) + return err + } + // we execute this Reconcile func due to some node cidr event, like blockaffinities is created + // so node podCIDRs should exist.If node podCIDRs is nil,maybe node is removed + if len(nodePodcidr) == 0 { + klog.Info("length of podCIDRs is 0 for node %s", clusterWideKey.Name) + _, err := c.nodeLister.Get(clusterWideKey.Name) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("k8s node %s is not found, might be removed.", clusterWideKey.Name) + return nil + } + klog.Errorf("get node %s error:%v", clusterWideKey.Name, err) + c.processor.AddAfter(key, requeueTime) + return err + } + } + // If k8s node exist, clusternode must exist + clusterNode, err = c.clusterNodeLister.Get(clusterNodeName) + if err != nil { + klog.Infof("get clusternode %s err: %v", clusterNodeName, err) + c.processor.AddAfter(key, requeueTime) + return err + } + } else { + klog.Errorf("get clusternode %s err : %v", clusterWideKey.Name, err) + c.processor.AddAfter(key, requeueTime) + return err } - return err } originNodeName := clusterNode.Spec.NodeName @@ -151,6 +200,7 @@ func (c *controller) Reconcile(key utils.QueueKey) error { podCIDRs, err := c.cniAdapter.getCIDRByNodeName(originNodeName) if err != nil { + klog.Errorf("get node %s's podCIDRs err: %v", originNodeName, err) return err } @@ -160,8 +210,10 @@ func (c *controller) Reconcile(key utils.QueueKey) error { return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { _, err = c.clusterLinkClient.KosmosV1alpha1().ClusterNodes().Update(context.TODO(), clusterNodeCopy, metav1.UpdateOptions{}) if err != nil { + klog.Errorf("update clusternode %s err: %v", clusterNodeCopy.Name, err) return err } + klog.Infof("update clusternode %s succcessfuly", clusterNodeCopy.Name) return nil }) }