Skip to content

Commit

Permalink
Merge pull request #655 from OrangeBao/main
Browse files Browse the repository at this point in the history
Cherry-Pick: check if the host is occupying the port
  • Loading branch information
duanmengkk authored Jul 22, 2024
2 parents 5a58120 + 06e0cd2 commit 598e3aa
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
13 changes: 12 additions & 1 deletion hack/k8s-in-k8s/kubelet_node_helper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ LOG_NAME=${2:-kubelet}
JOIN_HOST=$2
JOIN_TOKEN=$3
JOIN_CA_HASH=$4
CHECK_PORT=$2

function unjoin() {
# before unjoin, you need delete node by kubectl
Expand Down Expand Up @@ -330,6 +331,13 @@ function version() {
echo "$SCRIPT_VERSION"
}

function port() {
echo "check port(1/1): use netstat check port :$CHECK_PORT"
output=$(netstat -ant | awk '{print $4}' | grep ":$CHECK_PORT" | wc -l)
# Do not modify the fixed format
echo "port:$CHECK_PORT/$output"
}

# See how we were called.
case "$1" in
unjoin)
Expand All @@ -338,6 +346,9 @@ case "$1" in
join)
join
;;
port)
port
;;
health)
health
;;
Expand All @@ -354,6 +365,6 @@ case "$1" in
version
;;
*)
echo $"usage: $0 unjoin|join|health|log|check|version|revert"
echo $"usage: $0 unjoin|join|health|log|check|version|revert|port"
exit 1
esac
97 changes: 93 additions & 4 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"encoding/base64"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -34,7 +36,10 @@ import (
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client"
)

type VirtualClusterInitController struct {
Expand Down Expand Up @@ -631,12 +636,13 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK
return &hostPool, nil
}

func (c *VirtualClusterInitController) isPortAllocated(port int32) bool {
// Return false to indicate that the port is not occupied
func (c *VirtualClusterInitController) isPortAllocated(port int32, hostAddress []string) bool {
vcList := &v1alpha1.VirtualClusterList{}
err := c.List(context.Background(), vcList)
if err != nil {
klog.Errorf("list virtual cluster error: %v", err)
return false
return true
}

for _, vc := range vcList.Items {
Expand All @@ -654,7 +660,81 @@ func (c *VirtualClusterInitController) isPortAllocated(port int32) bool {
}
}

return false
ret, err := checkPortOnHostWithAddresses(port, hostAddress)
if err != nil {
klog.Errorf("check port on host error: %v", err)
return true
}
return ret
}

// Return false to indicate that the port is not occupied
func checkPortOnHostWithAddresses(port int32, hostAddress []string) (bool, error) {
for _, addr := range hostAddress {
flag, err := CheckPortOnHost(addr, port)
if err != nil {
return false, err
}
if flag {
return true, nil
}
}
return false, nil
}

func findAddress(node corev1.Node) (string, error) {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
return addr.Address, nil
}
}
return "", fmt.Errorf("cannot find internal IP address in node addresses, node name: %s", node.GetName())
}

// Return false to indicate that the port is not occupied
func CheckPortOnHost(addr string, port int32) (bool, error) {
hostExectorHelper := exector.NewExectorHelper(addr, "")
joinCmdStrCmd := &exector.CMDExector{
Cmd: fmt.Sprintf("bash %s port %d", env.GetExectorShellName(), port),
}

var ret *exector.ExectorReturn
err := apiclient.TryRunCommand(func() error {
ret = hostExectorHelper.DoExector(context.TODO().Done(), joinCmdStrCmd)
if ret.Status != exector.SUCCESS {
return fmt.Errorf("chekc port failed, err: %s", ret.String())
}
return nil
}, 3)

if err != nil {
klog.Errorf("check port on host error! addr:%s, port %d, err: %s", addr, port, err.Error())
return true, err
}
klog.V(4).Infof("check port on host, addr: %s, port %d, result: %s", addr, port, ret.String())

return !strings.HasSuffix(ret.LastLog, fmt.Sprintf("port:%d/0", port)), nil
}

func (c *VirtualClusterInitController) findHostAddresses() ([]string, error) {
nodes, err := c.RootClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: env.GetControlPlaneLabel(),
})
if err != nil {
return nil, err
}

ret := []string{}

for _, node := range nodes.Items {
addr, err := findAddress(node)
if err != nil {
return nil, err
}

ret = append(ret, addr)
}
return ret, nil
}

// AllocateHostPort allocate host port for virtual cluster
Expand All @@ -669,11 +749,20 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1
if err != nil {
return 0, err
}

hostAddress, err := c.findHostAddresses()
if err != nil {
return 0, err
}

ports := func() []int32 {
ports := make([]int32, 0)
for _, p := range hostPool.PortsPool {
if !c.isPortAllocated(p) {
if !c.isPortAllocated(p, hostAddress) {
ports = append(ports, p)
if len(ports) > constants.VirtualClusterPortNum {
break
}
}
}
return ports
Expand Down

0 comments on commit 598e3aa

Please sign in to comment.