Skip to content

Commit

Permalink
fix: network connectivity speed
Browse files Browse the repository at this point in the history
Signed-off-by: renxiangyu <[email protected]>
  • Loading branch information
renxiangyu committed Jan 9, 2024
1 parent 88a3efe commit 4b34d56
Showing 1 changed file with 110 additions and 44 deletions.
154 changes: 110 additions & 44 deletions pkg/kosmosctl/floater/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strconv"
"sync"

"github.com/olekukonko/tablewriter"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -45,6 +46,11 @@ type CommandCheckOptions struct {

SrcFloater *Floater
DstFloater *Floater

routinesMaxNum int
routineInfoChan chan routineInfo
waitGroup sync.WaitGroup
resultDataChan chan *PrintCheckData
}

type PrintCheckData struct {
Expand All @@ -54,6 +60,12 @@ type PrintCheckData struct {
TargetIP string
}

type routineInfo struct {
IInfo *FloatInfo
JInfo *FloatInfo
routineIp string
}

func NewCmdCheck() *cobra.Command {
o := &CommandCheckOptions{
Version: version.GetReleaseVersion().PatchRelease(),
Expand Down Expand Up @@ -92,6 +104,11 @@ func NewCmdCheck() *cobra.Command {
flags.StringVar(&o.Port, "port", "8889", "Port used by floater.")
flags.IntVarP(&o.PodWaitTime, "pod-wait-time", "w", 30, "Time for wait pod(floater) launch.")
flags.StringVar(&o.Protocol, "protocol", string(TCP), "Protocol for the network problem.")
flags.IntVarP(&o.routinesMaxNum, "routines-max-number", "", 5, "Number of goroutines to use.")

o.routineInfoChan = make(chan routineInfo, o.routinesMaxNum)
o.waitGroup = sync.WaitGroup{}
o.resultDataChan = make(chan *PrintCheckData, o.routinesMaxNum)

return cmd
}
Expand Down Expand Up @@ -197,63 +214,112 @@ func (o *CommandCheckOptions) Run() error {
}

func (o *CommandCheckOptions) RunRange(iPodInfos []*FloatInfo, jPodInfos []*FloatInfo) []*PrintCheckData {
var resultData []*PrintCheckData
var resultDatas []*PrintCheckData

if len(iPodInfos) > 0 && len(jPodInfos) > 0 {
for _, iPodInfo := range iPodInfos {
for _, jPodInfo := range jPodInfos {
for _, ip := range jPodInfo.PodIPs {
var targetIP string
var err error
var cmdResult *command.Result
if o.DstFloater != nil {
targetIP, err = netmap.NetMap(ip, o.DstFloater.CIDRsMap)
} else {
targetIP = ip
}
if err != nil {
cmdResult = command.ParseError(err)
} else {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: targetIP,
}
cmdResult = o.SrcFloater.CommandExec(iPodInfo, cmdObj)
}
resultData = append(resultData, &PrintCheckData{
*cmdResult,
iPodInfo.NodeName, jPodInfo.NodeName, targetIP,
})
}
go o.toRoutineInfoChan(iPodInfos, jPodInfos)
}

for i := 0; i < o.routinesMaxNum; i++ {
o.waitGroup.Add(1)
go o.checkRange()
}

go func() {
o.waitGroup.Wait()
close(o.resultDataChan)
}()

for resultData := range o.resultDataChan {
resultDatas = append(resultDatas, resultData)
}
return resultDatas
}

func (o *CommandCheckOptions) checkRange() {
defer o.waitGroup.Done()
for routineInfo := range o.routineInfoChan {
var targetIP string
var err error
var cmdResult *command.Result
if o.DstFloater != nil {
targetIP, err = netmap.NetMap(routineInfo.routineIp, o.DstFloater.CIDRsMap)
} else {
targetIP = routineInfo.routineIp
}
if err != nil {
cmdResult = command.ParseError(err)
} else {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: targetIP,
}
cmdResult = o.SrcFloater.CommandExec(routineInfo.IInfo, cmdObj)
}
resultData := &PrintCheckData{
*cmdResult,
routineInfo.IInfo.NodeName, routineInfo.JInfo.NodeName, targetIP,
}
o.resultDataChan <- resultData
}
}

return resultData
func (o *CommandCheckOptions) toRoutineInfoChan(iInfos []*FloatInfo, jInfos []*FloatInfo) {
for _, iInfo := range iInfos {
for _, jInfo := range jInfos {
for _, ip := range jInfo.NodeIPs {
routineIInfo := iInfo
routineJInfo := jInfo
routineIp := ip
info := routineInfo{
IInfo: routineIInfo,
JInfo: routineJInfo,
routineIp: routineIp,
}
o.routineInfoChan <- info
}
}
}
close(o.routineInfoChan)
}

func (o *CommandCheckOptions) RunNative(iNodeInfos []*FloatInfo, jNodeInfos []*FloatInfo) []*PrintCheckData {
var resultData []*PrintCheckData
var resultDatas []*PrintCheckData

if len(iNodeInfos) > 0 && len(jNodeInfos) > 0 {
for _, iNodeInfo := range iNodeInfos {
for _, jNodeInfo := range jNodeInfos {
for _, ip := range jNodeInfo.NodeIPs {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: ip,
}
cmdResult := o.SrcFloater.CommandExec(iNodeInfo, cmdObj)
resultData = append(resultData, &PrintCheckData{
*cmdResult,
iNodeInfo.NodeName, jNodeInfo.NodeName, ip,
})
}
}
}
go o.toRoutineInfoChan(iNodeInfos, jNodeInfos)
}

return resultData
for i := 0; i < o.routinesMaxNum; i++ {
o.waitGroup.Add(1)
go o.checkNative()
}

go func() {
o.waitGroup.Wait()
close(o.resultDataChan)
}()

for resultData := range o.resultDataChan {
resultDatas = append(resultDatas, resultData)
}
return resultDatas
}

func (o *CommandCheckOptions) checkNative() {
defer o.waitGroup.Done()
for routineInfo := range o.routineInfoChan {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: routineInfo.routineIp,
}
cmdResult := o.SrcFloater.CommandExec(routineInfo.IInfo, cmdObj)
resultData := &PrintCheckData{
*cmdResult,
routineInfo.IInfo.NodeName, routineInfo.JInfo.NodeName, routineInfo.routineIp,
}
o.resultDataChan <- resultData
}
}

func (o *CommandCheckOptions) PrintResult(resultData []*PrintCheckData) {
Expand Down

0 comments on commit 4b34d56

Please sign in to comment.