Skip to content

Commit

Permalink
Merge pull request #274 from OrangeBao/fix_address
Browse files Browse the repository at this point in the history
feat: do not use podIP as the address of the node
  • Loading branch information
kosmos-robot authored Nov 22, 2023
2 parents 2aa0100 + f76f998 commit b863b39
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 13 deletions.
2 changes: 2 additions & 0 deletions deploy/clustertree-cluster-manager.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: PREFERRED-ADDRESS-TYPE
value: InternalDNS
volumeMounts:
- name: credentials
mountPath: "/etc/cluster-tree/cert"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,17 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
if node != nil {
clone.Labels = mergeMap(node.GetLabels(), clone.GetLabels())
clone.Annotations = mergeMap(node.GetAnnotations(), clone.GetAnnotations())
spec := corev1.NodeSpec{
Taints: rootNode.Spec.Taints,
}

clone.Spec = spec
// TODO @duanmengkk
// spec := corev1.NodeSpec{
// Taints: rootNode.Spec.Taints,
// }
clone.Spec.Taints = rootNode.Spec.Taints
clone.Status = node.Status
clone.Status.Addresses = leafUtils.GetAddress()
clone.Status.Addresses, err = leafUtils.GetAddress(ctx, c.RootClientset, node.Status.Addresses)
if err != nil {
klog.Errorf("GetAddress node %s, err: %v, ", rootNode.Name, err)
return reconcile.Result{}, err
}
}
}

Expand Down
26 changes: 23 additions & 3 deletions pkg/clustertree/cluster-manager/utils/leaf_model_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (h AggregationModelHandler) CreateNodeInRoot(ctx context.Context, cluster *
},
}

node.Status.Addresses = GetAddress()
// node.Status.Addresses = GetAddress()

node, err = h.RootClientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
if err != nil {
Expand Down Expand Up @@ -99,7 +99,24 @@ func (h AggregationModelHandler) UpdateNodeStatus(ctx context.Context, n []*core
clone := node.DeepCopy()
clone.Status.Conditions = utils.NodeConditions()

nodeListInLeaf := &corev1.NodeList{}
err = h.LeafClient.List(ctx, nodeListInLeaf)
if err != nil {
return fmt.Errorf("cannot get node in leaf cluster while update node status err: %v", err)
}

if len(nodeListInLeaf.Items) == 0 {
return fmt.Errorf("cannot get node in leaf cluster while update node status, leaf node item is 0")
}

clone.Status.Addresses, err = GetAddress(ctx, h.RootClientset, nodeListInLeaf.Items[0].Status.Addresses)

if err != nil {
return err
}

patch, err := utils.CreateMergePatch(node, clone)

if err != nil {
return fmt.Errorf("cannot get node while update node status %s, err: %v", node.Name, err)
}
Expand Down Expand Up @@ -177,7 +194,7 @@ func (h DispersionModelHandler) CreateNodeInRoot(ctx context.Context, cluster *k
},
}

node.Status.Addresses = GetAddress()
// node.Status.Addresses = GetAddress()

node, err = h.RootClientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
if err != nil {
Expand Down Expand Up @@ -214,7 +231,10 @@ func (h DispersionModelHandler) UpdateNodeStatus(ctx context.Context, n []*corev

rootCopy := nodeRoot.DeepCopy()
nodeRoot.Status = nodeInLeaf.Status
nodeRoot.Status.Addresses = GetAddress()
nodeRoot.Status.Addresses, err = GetAddress(ctx, h.RootClientset, nodeInLeaf.Status.Addresses)
if err != nil {
return err
}
nodeRoot.Status.Allocatable = rootCopy.Status.Allocatable
nodeRoot.Status.Capacity = rootCopy.Status.Capacity

Expand Down
81 changes: 77 additions & 4 deletions pkg/clustertree/cluster-manager/utils/rootcluster.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package utils

import (
"context"
"fmt"
"os"
"sort"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils"
)

const (
Expand All @@ -22,9 +28,76 @@ func IsRootCluster(cluster *kosmosv1alpha1.Cluster) bool {
return false
}

func GetAddress() []corev1.NodeAddress {
address := []corev1.NodeAddress{
{Type: corev1.NodeInternalIP, Address: os.Getenv("KNODE_POD_IP")},
func GetAddress(ctx context.Context, rootClient kubernetes.Interface, originAddress []corev1.NodeAddress) ([]corev1.NodeAddress, error) {
preferredAddressType := corev1.NodeAddressType(os.Getenv("PREFERRED-ADDRESS-TYPE"))

if len(preferredAddressType) == 0 {
preferredAddressType = corev1.NodeInternalDNS
}

prefixAddress := []corev1.NodeAddress{
{Type: preferredAddressType, Address: os.Getenv("KNODE_POD_IP")},
}

address, err := SortAddress(ctx, rootClient, originAddress)

if err != nil {
return nil, err
}

return append(prefixAddress, address...), nil
}

func SortAddress(ctx context.Context, rootClient kubernetes.Interface, originAddress []corev1.NodeAddress) ([]corev1.NodeAddress, error) {
rootnodes, err := rootClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("create node failed, cannot get node from root cluster, err: %v", err)
}
return address

if len(rootnodes.Items) == 0 {
return nil, fmt.Errorf("create node failed, cannot get node from root cluster, len of leafnodes is 0")
}

isIPv4First := true
for _, addr := range rootnodes.Items[0].Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if utils.IsIPv6(addr.Address) {
isIPv4First = false
}
break
}
}

address := []corev1.NodeAddress{}
otherAddress := []corev1.NodeAddress{}

for _, addr := range originAddress {
if addr.Type == corev1.NodeInternalIP {
address = append(address, corev1.NodeAddress{Type: corev1.NodeInternalIP, Address: addr.Address})
} else {
otherAddress = append(otherAddress, addr)
}
}

sort.Slice(address, func(i, j int) bool {
if isIPv4First {
if !utils.IsIPv6(address[i].Address) && utils.IsIPv6(address[j].Address) {
return true
}
if utils.IsIPv6(address[i].Address) && !utils.IsIPv6(address[j].Address) {
return false
}
return true
} else {
if !utils.IsIPv6(address[i].Address) && utils.IsIPv6(address[j].Address) {
return false
}
if utils.IsIPv6(address[i].Address) && !utils.IsIPv6(address[j].Address) {
return true
}
return true
}
})

return append(address, otherAddress...), nil
}

0 comments on commit b863b39

Please sign in to comment.