diff --git a/hack/k8s-in-k8s/kubelet_node_helper.sh b/hack/k8s-in-k8s/kubelet_node_helper.sh index 65180e719..c6d7c5898 100755 --- a/hack/k8s-in-k8s/kubelet_node_helper.sh +++ b/hack/k8s-in-k8s/kubelet_node_helper.sh @@ -8,6 +8,7 @@ LOG_NAME=${2:-kubelet} JOIN_HOST=$2 JOIN_TOKEN=$3 JOIN_CA_HASH=$4 +CHECK_PORT=$2 function unjoin() { # before unjoin, you need delete node by kubectl @@ -232,6 +233,13 @@ function version() { echo "$SCRIPT_VERSION" } +function port() { + echo "check port(1/1): use netstat check port :$CHECK_PORT" + output=$(netstat -ant | awk '{print $4}' | grep ":$CHECK_PORT" | wc -l) + # Do not modify the fixed format + echo "port:$CHECK_PORT/$output" +} + # See how we were called. case "$1" in unjoin) @@ -240,6 +248,9 @@ case "$1" in join) join ;; + port) + port + ;; health) health ;; @@ -256,6 +267,6 @@ case "$1" in version ;; *) - echo $"usage: $0 unjoin|join|health|log|check|version|revert" + echo $"usage: $0 unjoin|join|health|log|check|version|revert|port" exit 1 esac \ No newline at end of file diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index ff0132777..11c91605b 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -5,11 +5,13 @@ import ( "encoding/base64" "fmt" "sort" + "strings" "sync" "time" "github.com/pkg/errors" "gopkg.in/yaml.v3" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -35,7 +37,10 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" "github.com/kosmos.io/kosmos/pkg/kubenest/util" + apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" ) type VirtualClusterInitController struct { @@ -632,12 +637,13 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK return &hostPool, nil } -func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { +// Return false to indicate that the port is not occupied +func (c *VirtualClusterInitController) isPortAllocated(port int32, hostAddress []string) bool { vcList := &v1alpha1.VirtualClusterList{} err := c.List(context.Background(), vcList) if err != nil { klog.Errorf("list virtual cluster error: %v", err) - return false + return true } for _, vc := range vcList.Items { @@ -655,7 +661,81 @@ func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { } } - return false + ret, err := checkPortOnHostWithAddresses(port, hostAddress) + if err != nil { + klog.Errorf("check port on host error: %v", err) + return true + } + return ret +} + +// Return false to indicate that the port is not occupied +func checkPortOnHostWithAddresses(port int32, hostAddress []string) (bool, error) { + for _, addr := range hostAddress { + flag, err := CheckPortOnHost(addr, port) + if err != nil { + return false, err + } + if flag { + return true, nil + } + } + return false, nil +} + +func findAddress(node corev1.Node) (string, error) { + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + return addr.Address, nil + } + } + return "", fmt.Errorf("cannot find internal IP address in node addresses, node name: %s", node.GetName()) +} + +// Return false to indicate that the port is not occupied +func CheckPortOnHost(addr string, port int32) (bool, error) { + hostExectorHelper := exector.NewExectorHelper(addr, "") + joinCmdStrCmd := &exector.CMDExector{ + Cmd: fmt.Sprintf("bash %s port %d", env.GetExectorShellName(), port), + } + + var ret *exector.ExectorReturn + err := apiclient.TryRunCommand(func() error { + ret = hostExectorHelper.DoExector(context.TODO().Done(), joinCmdStrCmd) + if ret.Status != exector.SUCCESS { + return fmt.Errorf("chekc port failed, err: %s", ret.String()) + } + return nil + }, 3) + + if err != nil { + klog.Errorf("check port on host error! addr:%s, port %d, err: %s", addr, port, err.Error()) + return true, err + } + klog.V(4).Infof("check port on host, addr: %s, port %d, result: %s", addr, port, ret.String()) + + return !strings.HasSuffix(ret.LastLog, fmt.Sprintf("port:%d/0", port)), nil +} + +func (c *VirtualClusterInitController) findHostAddresses() ([]string, error) { + nodes, err := c.RootClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: env.GetControlPlaneLabel(), + }) + if err != nil { + return nil, err + } + + ret := []string{} + + for _, node := range nodes.Items { + addr, err := findAddress(node) + if err != nil { + return nil, err + } + + ret = append(ret, addr) + } + return ret, nil } // AllocateHostPort allocate host port for virtual cluster @@ -670,11 +750,20 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 if err != nil { return 0, err } + + hostAddress, err := c.findHostAddresses() + if err != nil { + return 0, err + } + ports := func() []int32 { ports := make([]int32, 0) for _, p := range hostPool.PortsPool { - if !c.isPortAllocated(p) { + if !c.isPortAllocated(p, hostAddress) { ports = append(ports, p) + if len(ports) > constants.VirtualClusterPortNum { + break + } } } return ports