Skip to content

Commit

Permalink
feat: add address for knode
Browse files Browse the repository at this point in the history
Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Nov 12, 2023
1 parent bafe53b commit 2778c01
Showing 1 changed file with 100 additions and 0 deletions.
100 changes: 100 additions & 0 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -324,6 +325,99 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, nodes [
return nil
}

func (c *ClusterController) sortAddress(ctx context.Context, nodeName string, leafClient kubernetes.Interface, originAddress []corev1.NodeAddress) ([]corev1.NodeAddress, error) {
rootnodes, err := c.RootClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
klog.Errorf("create node %s failed, cannot get node from root cluster, err: %v", nodeName, err)
return nil, err
}

if len(rootnodes.Items) == 0 {
klog.Errorf("create node %s failed, cannot get node from root cluster, len of leafnodes is 0", nodeName)
return nil, err
}

isIPv4First := true
for _, addr := range rootnodes.Items[0].Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if utils.IsIPv6(addr.Address) {
isIPv4First = false
}
break
}
}

address := []corev1.NodeAddress{}

for _, addr := range originAddress {
if addr.Type == corev1.NodeInternalIP {
address = append(address, corev1.NodeAddress{Type: corev1.NodeInternalIP, Address: addr.Address})
}
}

sort.Slice(address, func(i, j int) bool {
if isIPv4First {
if !utils.IsIPv6(address[i].Address) && utils.IsIPv6(address[j].Address) {
return true
}
if utils.IsIPv6(address[i].Address) && !utils.IsIPv6(address[j].Address) {
return false
}
return true
} else {
if !utils.IsIPv6(address[i].Address) && utils.IsIPv6(address[j].Address) {
return false
}
if utils.IsIPv6(address[i].Address) && !utils.IsIPv6(address[j].Address) {
return true
}
return true
}
})

return address, nil
}

func (c *ClusterController) setNodeStatus(ctx context.Context, nodeName string, leafClient kubernetes.Interface, node *corev1.Node, isNode2Node bool) error {
if isNode2Node {
if leafnode, err := leafClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}); err != nil {
klog.Errorf("create node %s failed, cannot get node from leaf cluster, err: %v", nodeName, err)
return err
} else {
node.Status = leafnode.Status
address, err := c.sortAddress(ctx, nodeName, leafClient, node.Status.Addresses)
if err != nil {
return err
}
node.Status.Addresses = address
return nil
}
}

leafnodes, err := leafClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
// TODO: LabelSelector
})
if err != nil {
klog.Errorf("create node %s failed, cannot get node from leaf cluster, err: %v", nodeName, err)
return err
}

if len(leafnodes.Items) == 0 {
klog.Errorf("create node %s failed, cannot get node from leaf cluster, len of leafnodes is 0", nodeName)
return err
}

address, err := c.sortAddress(ctx, nodeName, leafClient, leafnodes.Items[0].Status.Addresses)

if err != nil {
return err
}

node.Status.Addresses = address

return nil
}

func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, error) {
getNodeLen := func(cluster *kosmosv1alpha1.Cluster) int32 {
if cluster.Spec.ClusterTreeOptions.Enable {
Expand Down Expand Up @@ -351,12 +445,18 @@ func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alp
nodeAnnotations[utils.KosmosNodeOwnedByClusterAnnotations] = clusterName
node.SetAnnotations(nodeAnnotations)
}

if err := c.setNodeStatus(ctx, nodeName, leafClient, node, isNode2Node); err != nil {
return nil, err
}

node.Status.NodeInfo.KubeletVersion = serverVersion.GitVersion
node.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{
KubeletEndpoint: corev1.DaemonEndpoint{
Port: c.Options.ListenPort,
},
}

node, err = c.RootClient.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
if err != nil {
if !errors.IsAlreadyExists(err) {
Expand Down

0 comments on commit 2778c01

Please sign in to comment.