Skip to content

Commit

Permalink
fix: network connectivity speed
Browse files Browse the repository at this point in the history
Signed-off-by: renxiangyu <[email protected]>
  • Loading branch information
renxiangyu committed Dec 5, 2023
1 parent 88a3efe commit efc8fc9
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 29 deletions.
81 changes: 52 additions & 29 deletions pkg/kosmosctl/floater/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type CommandCheckOptions struct {

SrcFloater *Floater
DstFloater *Floater

routinesMaxNum int
}

type PrintCheckData struct {
Expand Down Expand Up @@ -92,6 +94,7 @@ func NewCmdCheck() *cobra.Command {
flags.StringVar(&o.Port, "port", "8889", "Port used by floater.")
flags.IntVarP(&o.PodWaitTime, "pod-wait-time", "w", 30, "Time for wait pod(floater) launch.")
flags.StringVar(&o.Protocol, "protocol", string(TCP), "Protocol for the network problem.")
flags.IntVarP(&o.routinesMaxNum, "routines-max-number", "", 5, "Number of goroutines to use.")

return cmd
}
Expand Down Expand Up @@ -199,60 +202,80 @@ func (o *CommandCheckOptions) Run() error {
func (o *CommandCheckOptions) RunRange(iPodInfos []*FloatInfo, jPodInfos []*FloatInfo) []*PrintCheckData {
var resultData []*PrintCheckData

goroutinePool := utils.NewGoroutinePool(o.routinesMaxNum)

if len(iPodInfos) > 0 && len(jPodInfos) > 0 {
for _, iPodInfo := range iPodInfos {
for _, jPodInfo := range jPodInfos {
for _, ip := range jPodInfo.PodIPs {
var targetIP string
var err error
var cmdResult *command.Result
if o.DstFloater != nil {
targetIP, err = netmap.NetMap(ip, o.DstFloater.CIDRsMap)
} else {
targetIP = ip
}
if err != nil {
cmdResult = command.ParseError(err)
} else {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: targetIP,
routineIPodInfo := iPodInfo
routineJPodInfo := jPodInfo
routineIp := ip
goroutinePool.Submit(func(args ...interface{}) {
var targetIP string
var err error
var cmdResult *command.Result
if o.DstFloater != nil {
targetIP, err = netmap.NetMap(routineIp, o.DstFloater.CIDRsMap)
} else {
targetIP = routineIp
}
if err != nil {
cmdResult = command.ParseError(err)
} else {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: targetIP,
}
cmdResult = o.SrcFloater.CommandExec(routineIPodInfo, cmdObj)
}
cmdResult = o.SrcFloater.CommandExec(iPodInfo, cmdObj)
}
resultData = append(resultData, &PrintCheckData{
*cmdResult,
iPodInfo.NodeName, jPodInfo.NodeName, targetIP,
})
resultData = append(resultData, &PrintCheckData{
*cmdResult,
routineIPodInfo.NodeName, routineJPodInfo.NodeName, targetIP,
})
}, routineIPodInfo, routineJPodInfo, routineIp)
}
}
}
}

goroutinePool.Wait()
goroutinePool.Shutdown()

return resultData
}

func (o *CommandCheckOptions) RunNative(iNodeInfos []*FloatInfo, jNodeInfos []*FloatInfo) []*PrintCheckData {
var resultData []*PrintCheckData

goroutinePool := utils.NewGoroutinePool(o.routinesMaxNum)

if len(iNodeInfos) > 0 && len(jNodeInfos) > 0 {
for _, iNodeInfo := range iNodeInfos {
for _, jNodeInfo := range jNodeInfos {
for _, ip := range jNodeInfo.NodeIPs {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: ip,
}
cmdResult := o.SrcFloater.CommandExec(iNodeInfo, cmdObj)
resultData = append(resultData, &PrintCheckData{
*cmdResult,
iNodeInfo.NodeName, jNodeInfo.NodeName, ip,
})
routineINodeInfo := iNodeInfo
routineJNodeInfo := jNodeInfo
routineIp := ip
goroutinePool.Submit(func(args ...interface{}) {
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
cmdObj := &command.Ping{
TargetIP: routineIp,
}
cmdResult := o.SrcFloater.CommandExec(routineINodeInfo, cmdObj)
resultData = append(resultData, &PrintCheckData{
*cmdResult,
routineINodeInfo.NodeName, routineJNodeInfo.NodeName, routineIp,
})
}, routineINodeInfo, routineJNodeInfo, routineIp)
}
}
}
}

goroutinePool.Wait()
goroutinePool.Shutdown()

return resultData
}

Expand Down
57 changes: 57 additions & 0 deletions pkg/utils/goroutine_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package utils

import (
"fmt"
"sync"
)

type GoroutinePool struct {
pool chan int
funcChan chan RoutineFunc
waitGroup *sync.WaitGroup
}

type RoutineFunc struct {
f interface{}
args []interface{}
}

func NewGoroutinePool(size int) *GoroutinePool {
return &GoroutinePool{
pool: make(chan int, size),
funcChan: make(chan RoutineFunc, size),
waitGroup: &sync.WaitGroup{},
}
}

func (g *GoroutinePool) Submit(f interface{}, args ...interface{}) {
g.funcChan <- RoutineFunc{f: f, args: args}
g.pool <- 1
g.waitGroup.Add(1)

go func() {
task := <-g.funcChan
switch f := task.f.(type) {
case func():
f()
case func(args ...interface{}):
f(task.args...)
default:
fmt.Println("Invalid task type")
}
defer g.Done()
}()
}

func (g *GoroutinePool) Wait() {
g.waitGroup.Wait()
}

func (g *GoroutinePool) Done() {
<-g.pool
g.waitGroup.Done()
}

func (g *GoroutinePool) Shutdown() {
close(g.pool)
}
126 changes: 126 additions & 0 deletions pkg/utils/goroutine_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package utils

import (
"fmt"
"sync"
"testing"
"time"
)

func TestGoroutinePool_Submit(t *testing.T) {
type fields struct {
pool chan int
funcChan chan RoutineFunc
waitGroup *sync.WaitGroup
}
type args struct {
f interface{}
args []interface{}
}
tests := []struct {
name string
fields fields
args []args
}{
// func has no parameters
{"Test1", fields{
pool: make(chan int, 5),
funcChan: make(chan RoutineFunc, 5),
waitGroup: &sync.WaitGroup{},
}, []args{
{
f: func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
time.Sleep(1000)
}
},
args: nil,
}, {
f: func() {
for i := 10; i < 20; i++ {
fmt.Println(i)
time.Sleep(1000)
}
},
args: nil,
},
}},
// func has parameters
{"Test2", fields{
pool: make(chan int, 5),
funcChan: make(chan RoutineFunc, 5),
waitGroup: &sync.WaitGroup{},
}, []args{
{
f: func(args ...interface{}) {
for _, arg := range args {
fmt.Println(arg)
time.Sleep(1000)
}
},
args: []interface{}{"a", "b", "c", "d"},
}, {
f: func(args ...interface{}) {
for _, arg := range args {
fmt.Println(arg)
time.Sleep(1000)
}
},
args: []interface{}{"e", "f", "g", "h"},
},
}},
// the thread capacity is 1
{"Test3", fields{
pool: make(chan int, 1),
funcChan: make(chan RoutineFunc, 1),
waitGroup: &sync.WaitGroup{},
}, []args{
{
f: func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
time.Sleep(1000)
}
},
args: nil,
}, {
f: func() {
for i := 10; i < 20; i++ {
fmt.Println(i)
time.Sleep(1000)
}
},
args: nil,
},
}},
//incorrect func parameter
{"Test4", fields{
pool: make(chan int, 5),
funcChan: make(chan RoutineFunc, 5),
waitGroup: &sync.WaitGroup{},
}, []args{
{
f: func(a int) {
fmt.Println(a)
},
args: []interface{}{"hello"},
},
}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := &GoroutinePool{
pool: tt.fields.pool,
funcChan: tt.fields.funcChan,
waitGroup: tt.fields.waitGroup,
}
for _, arg := range tt.args {
g.Submit(arg.f, arg.args...)
}
g.Wait()
g.Shutdown()
fmt.Println("success!")
})
}
}

0 comments on commit efc8fc9

Please sign in to comment.