Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick: fix bug,some times clusternode created without podcidr #446

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -16,6 +17,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils"
)
Expand Down Expand Up @@ -68,12 +70,14 @@ func (c *commonAdapter) start(stopCh <-chan struct{}) error {
informerFactory.WaitForCacheSync(stopCh)

c.sync = true
klog.Info("common informer started!")
return nil
}

func (c *commonAdapter) getCIDRByNodeName(nodeName string) ([]string, error) {
node, err := c.nodeLister.Get(nodeName)
if err != nil {
klog.Infof("get node %s error:%v", nodeName, err)
return nil, err
}

Expand Down Expand Up @@ -131,6 +135,7 @@ func NewCalicoAdapter(config *rest.Config,
func (c *calicoAdapter) start(stopCh <-chan struct{}) error {
client, err := dynamic.NewForConfig(c.config)
if err != nil {
klog.Errorf("init dynamic client err: %v", err)
return err
}
gvr := schema.GroupVersionResource{
Expand All @@ -146,6 +151,7 @@ func (c *calicoAdapter) start(stopCh <-chan struct{}) error {
DeleteFunc: c.OnDelete,
})
if err != nil {
klog.Errorf("add event handler error: %v", err)
return err
}

Expand All @@ -154,12 +160,14 @@ func (c *calicoAdapter) start(stopCh <-chan struct{}) error {
informerFactory.WaitForCacheSync(stopCh)

c.sync = true
klog.Info("calico blockaffinities informer started!")
return nil
}

func (c *calicoAdapter) getCIDRByNodeName(nodeName string) ([]string, error) {
blockAffinities, err := c.blockLister.List(labels.Everything())
if err != nil {
klog.Errorf("list blockAffinities error: %v", err)
return nil, err
}
var podCIDRS []string
Expand Down Expand Up @@ -192,6 +200,7 @@ func (c *calicoAdapter) synced() bool {
}

func (c *calicoAdapter) OnAdd(obj interface{}) {
klog.V(7).Info("add event")
runtimeObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
Expand All @@ -203,11 +212,13 @@ func (c *calicoAdapter) OnAdd(obj interface{}) {
if !found {
return
}
klog.V(7).Info("add event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

// OnUpdate handles object update event and push the object to queue.
func (c *calicoAdapter) OnUpdate(oldObj, newObj interface{}) {
klog.V(7).Info("update event")
runtimeObj, ok := newObj.(*unstructured.Unstructured)
if !ok {
return
Expand All @@ -219,6 +230,7 @@ func (c *calicoAdapter) OnUpdate(oldObj, newObj interface{}) {
if !found {
return
}
klog.V(7).Info("update event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

Expand All @@ -241,18 +253,38 @@ func (c *calicoAdapter) OnDelete(obj interface{}) {
func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor utils.AsyncWorker) {
clusterNodes, err := clusterNodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("list clusterNodes err: %v", err)
return
}

flag := false
for _, clusterNode := range clusterNodes {
if clusterNode.Spec.NodeName == originNodeName {
key, err := ClusterWideKeyFunc(clusterNode)
if err != nil {
klog.Errorf("make clusterNode as a reconsile key err: %v", err)
return
}

klog.V(7).Infof("key %s is enqueued!", originNodeName)
processor.Add(key)
flag = true
break
}
}
if !flag {
clusterNode := &clusterlinkv1alpha1.ClusterNode{
ObjectMeta: metav1.ObjectMeta{
Name: originNodeName,
},
}
key, err := ClusterWideKeyFunc(clusterNode)
if err != nil {
klog.Errorf("make clusterNode as a reconsile key err: %v", err)
return
}

klog.V(7).Infof("can't find match clusternode %s", originNodeName)
processor.Add(key)
}
}
58 changes: 55 additions & 3 deletions pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
informer "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
nodecontroller "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
clusterinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1"
Expand All @@ -35,6 +39,7 @@ type controller struct {
config *rest.Config

clusterLinkClient versioned.Interface
nodeLister lister.NodeLister

// RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of
// the controller.
Expand Down Expand Up @@ -97,14 +102,25 @@ func (c *controller) Start(ctx context.Context) error {
return err
}

client, err := kubernetes.NewForConfig(c.config)
if err != nil {
klog.Errorf("init kubernetes client err: %v", err)
return err
}

informerFactory := informer.NewSharedInformerFactory(client, 0)
c.nodeLister = informerFactory.Core().V1().Nodes().Lister()

stopCh := c.ctx.Done()
clusterInformerFactory.Start(stopCh)
clusterInformerFactory.WaitForCacheSync(stopCh)

// third step: init CNI Adapter
if cluster.Spec.ClusterLinkOptions.CNI == calicoCNI {
klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI)
c.cniAdapter = NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor)
} else {
klog.Infof("cluster %s's cni is %s", c.clusterName, cluster.Spec.ClusterLinkOptions.CNI)
c.cniAdapter = NewCommonAdapter(c.config, c.clusterNodeLister, c.processor)
}
err = c.cniAdapter.start(stopCh)
Expand All @@ -129,10 +145,43 @@ func (c *controller) Reconcile(key utils.QueueKey) error {
clusterNode, err := c.clusterNodeLister.Get(clusterWideKey.Name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("Cluster Node %s has been removed.", clusterWideKey.NamespaceKey())
return nil
klog.Info("maybe clusterWideKey.Name is k8s node's name instead of clusternode's name,try to get node podCIDRs")
nodePodcidr, err := c.cniAdapter.getCIDRByNodeName(clusterWideKey.Name)
// get cluster node name by clustername and k8s node's name
clusterNodeName := nodecontroller.ClusterNodeName(c.clusterName, clusterWideKey.Name)
// if err is no nil, means get node error or list blockAffinities error
// do not reconsile
if err != nil {
klog.Errorf("get node %s's podCIDRs err: %v", clusterWideKey.Name, err)
return err
}
// we execute this Reconcile func due to some node cidr event, like blockaffinities is created
// so node podCIDRs should exist.If node podCIDRs is nil,maybe node is removed
if len(nodePodcidr) == 0 {
klog.Info("length of podCIDRs is 0 for node %s", clusterWideKey.Name)
_, err := c.nodeLister.Get(clusterWideKey.Name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("k8s node %s is not found, might be removed.", clusterWideKey.Name)
return nil
}
klog.Errorf("get node %s error:%v", clusterWideKey.Name, err)
c.processor.AddAfter(key, requeueTime)
return err
}
}
// If k8s node exist, clusternode must exist
clusterNode, err = c.clusterNodeLister.Get(clusterNodeName)
if err != nil {
klog.Infof("get clusternode %s err: %v", clusterNodeName, err)
c.processor.AddAfter(key, requeueTime)
return err
}
} else {
klog.Errorf("get clusternode %s err : %v", clusterWideKey.Name, err)
c.processor.AddAfter(key, requeueTime)
return err
}
return err
}

originNodeName := clusterNode.Spec.NodeName
Expand All @@ -151,6 +200,7 @@ func (c *controller) Reconcile(key utils.QueueKey) error {

podCIDRs, err := c.cniAdapter.getCIDRByNodeName(originNodeName)
if err != nil {
klog.Errorf("get node %s's podCIDRs err: %v", originNodeName, err)
return err
}

Expand All @@ -160,8 +210,10 @@ func (c *controller) Reconcile(key utils.QueueKey) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
_, err = c.clusterLinkClient.KosmosV1alpha1().ClusterNodes().Update(context.TODO(), clusterNodeCopy, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("update clusternode %s err: %v", clusterNodeCopy.Name, err)
return err
}
klog.Infof("update clusternode %s succcessfuly", clusterNodeCopy.Name)
return nil
})
}
Expand Down
Loading