Skip to content

Commit

Permalink
Merge pull request kosmos-io#434 from GreatLazyMan/clusterlink
Browse files Browse the repository at this point in the history
title: fix bug,some times clusternode created without podcidr
  • Loading branch information
duanmengkk authored Mar 18, 2024
2 parents e746b64 + 6471f2b commit 4488663
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
32 changes: 32 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -16,6 +17,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils"
)
Expand Down Expand Up @@ -68,12 +70,14 @@ func (c *commonAdapter) start(stopCh <-chan struct{}) error {
informerFactory.WaitForCacheSync(stopCh)

c.sync = true
klog.Info("common informer started!")
return nil
}

func (c *commonAdapter) getCIDRByNodeName(nodeName string) ([]string, error) {
node, err := c.nodeLister.Get(nodeName)
if err != nil {
klog.Infof("get node %s error:%v", nodeName, err)
return nil, err
}

Expand Down Expand Up @@ -131,6 +135,7 @@ func NewCalicoAdapter(config *rest.Config,
func (c *calicoAdapter) start(stopCh <-chan struct{}) error {
client, err := dynamic.NewForConfig(c.config)
if err != nil {
klog.Errorf("init dynamic client err: %v", err)
return err
}
gvr := schema.GroupVersionResource{
Expand All @@ -146,6 +151,7 @@ func (c *calicoAdapter) start(stopCh <-chan struct{}) error {
DeleteFunc: c.OnDelete,
})
if err != nil {
klog.Errorf("add event handler error: %v", err)
return err
}

Expand All @@ -154,12 +160,14 @@ func (c *calicoAdapter) start(stopCh <-chan struct{}) error {
informerFactory.WaitForCacheSync(stopCh)

c.sync = true
klog.Info("calico blockaffinities informer started!")
return nil
}

func (c *calicoAdapter) getCIDRByNodeName(nodeName string) ([]string, error) {
blockAffinities, err := c.blockLister.List(labels.Everything())
if err != nil {
klog.Errorf("list blockAffinities error: %v", err)
return nil, err
}
var podCIDRS []string
Expand Down Expand Up @@ -192,6 +200,7 @@ func (c *calicoAdapter) synced() bool {
}

func (c *calicoAdapter) OnAdd(obj interface{}) {
klog.V(7).Info("add event")
runtimeObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
Expand All @@ -203,11 +212,13 @@ func (c *calicoAdapter) OnAdd(obj interface{}) {
if !found {
return
}
klog.V(7).Info("add event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

// OnUpdate handles object update event and push the object to queue.
func (c *calicoAdapter) OnUpdate(oldObj, newObj interface{}) {
klog.V(7).Info("update event")
runtimeObj, ok := newObj.(*unstructured.Unstructured)
if !ok {
return
Expand All @@ -219,6 +230,7 @@ func (c *calicoAdapter) OnUpdate(oldObj, newObj interface{}) {
if !found {
return
}
klog.V(7).Info("update event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

Expand All @@ -241,18 +253,38 @@ func (c *calicoAdapter) OnDelete(obj interface{}) {
func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor utils.AsyncWorker) {
clusterNodes, err := clusterNodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("list clusterNodes err: %v", err)
return
}

flag := false
for _, clusterNode := range clusterNodes {
if clusterNode.Spec.NodeName == originNodeName {
key, err := ClusterWideKeyFunc(clusterNode)
if err != nil {
klog.Errorf("make clusterNode as a reconsile key err: %v", err)
return
}

klog.V(7).Infof("key %s is enqueued!", originNodeName)
processor.Add(key)
flag = true
break
}
}
if !flag {
clusterNode := &clusterlinkv1alpha1.ClusterNode{
ObjectMeta: metav1.ObjectMeta{
Name: originNodeName,
},
}
key, err := ClusterWideKeyFunc(clusterNode)
if err != nil {
klog.Errorf("make clusterNode as a reconsile key err: %v", err)
return
}

klog.V(7).Infof("can't find match clusternode %s", originNodeName)
processor.Add(key)
}
}
58 changes: 55 additions & 3 deletions pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
informer "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
nodecontroller "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
clusterinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1"
Expand All @@ -35,6 +39,7 @@ type controller struct {
config *rest.Config

clusterLinkClient versioned.Interface
nodeLister lister.NodeLister

// RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of
// the controller.
Expand Down Expand Up @@ -97,14 +102,25 @@ func (c *controller) Start(ctx context.Context) error {
return err
}

client, err := kubernetes.NewForConfig(c.config)
if err != nil {
klog.Errorf("init kubernetes client err: %v", err)
return err
}

informerFactory := informer.NewSharedInformerFactory(client, 0)
c.nodeLister = informerFactory.Core().V1().Nodes().Lister()

stopCh := c.ctx.Done()
clusterInformerFactory.Start(stopCh)
clusterInformerFactory.WaitForCacheSync(stopCh)

// third step: init CNI Adapter
if cluster.Spec.ClusterLinkOptions.CNI == calicoCNI {
klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI)
c.cniAdapter = NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor)
} else {
klog.Infof("cluster %s's cni is %s", c.clusterName, cluster.Spec.ClusterLinkOptions.CNI)
c.cniAdapter = NewCommonAdapter(c.config, c.clusterNodeLister, c.processor)
}
err = c.cniAdapter.start(stopCh)
Expand All @@ -129,10 +145,43 @@ func (c *controller) Reconcile(key utils.QueueKey) error {
clusterNode, err := c.clusterNodeLister.Get(clusterWideKey.Name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("Cluster Node %s has been removed.", clusterWideKey.NamespaceKey())
return nil
klog.Info("maybe clusterWideKey.Name is k8s node's name instead of clusternode's name,try to get node podCIDRs")
nodePodcidr, err := c.cniAdapter.getCIDRByNodeName(clusterWideKey.Name)
// get cluster node name by clustername and k8s node's name
clusterNodeName := nodecontroller.ClusterNodeName(c.clusterName, clusterWideKey.Name)
// if err is no nil, means get node error or list blockAffinities error
// do not reconsile
if err != nil {
klog.Errorf("get node %s's podCIDRs err: %v", clusterWideKey.Name, err)
return err
}
// we execute this Reconcile func due to some node cidr event, like blockaffinities is created
// so node podCIDRs should exist.If node podCIDRs is nil,maybe node is removed
if len(nodePodcidr) == 0 {
klog.Info("length of podCIDRs is 0 for node %s", clusterWideKey.Name)
_, err := c.nodeLister.Get(clusterWideKey.Name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("k8s node %s is not found, might be removed.", clusterWideKey.Name)
return nil
}
klog.Errorf("get node %s error:%v", clusterWideKey.Name, err)
c.processor.AddAfter(key, requeueTime)
return err
}
}
// If k8s node exist, clusternode must exist
clusterNode, err = c.clusterNodeLister.Get(clusterNodeName)
if err != nil {
klog.Infof("get clusternode %s err: %v", clusterNodeName, err)
c.processor.AddAfter(key, requeueTime)
return err
}
} else {
klog.Errorf("get clusternode %s err : %v", clusterWideKey.Name, err)
c.processor.AddAfter(key, requeueTime)
return err
}
return err
}

originNodeName := clusterNode.Spec.NodeName
Expand All @@ -151,6 +200,7 @@ func (c *controller) Reconcile(key utils.QueueKey) error {

podCIDRs, err := c.cniAdapter.getCIDRByNodeName(originNodeName)
if err != nil {
klog.Errorf("get node %s's podCIDRs err: %v", originNodeName, err)
return err
}

Expand All @@ -160,8 +210,10 @@ func (c *controller) Reconcile(key utils.QueueKey) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
_, err = c.clusterLinkClient.KosmosV1alpha1().ClusterNodes().Update(context.TODO(), clusterNodeCopy, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("update clusternode %s err: %v", clusterNodeCopy.Name, err)
return err
}
klog.Infof("update clusternode %s succcessfuly", clusterNodeCopy.Name)
return nil
})
}
Expand Down

0 comments on commit 4488663

Please sign in to comment.