diff --git a/cluster/images/floater.Dockerfile b/cluster/images/floater.Dockerfile index 2f61e773..19462ba5 100644 --- a/cluster/images/floater.Dockerfile +++ b/cluster/images/floater.Dockerfile @@ -4,6 +4,6 @@ ARG BINARY RUN apk add --no-cache ca-certificates RUN apk update && apk upgrade -RUN apk add ip6tables iptables curl +RUN apk add --no-cache ip6tables iptables curl netcat-openbsd COPY ${BINARY} /bin/${BINARY} diff --git a/pkg/command/check.go b/pkg/command/check.go index 8060cab1..98e81196 100644 --- a/pkg/command/check.go +++ b/pkg/command/check.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "strconv" "sync" @@ -67,6 +68,10 @@ func (o *CheckOptions) LoadConfig() { func (o *CheckOptions) Complete() error { o.LoadConfig() + if o.DoOption == nil { + return fmt.Errorf("config.json load error") + } + srcfloater := &share.Floater{ Namespace: o.DoOption.Namespace, Name: share.DefaultFloaterName, @@ -108,6 +113,19 @@ func (o *CheckOptions) Validate() error { if len(o.DoOption.Namespace) == 0 { return fmt.Errorf("namespace must be specified") } + if len(o.DoOption.CustomizedTargetPortList) != 0 { + for _, port := range o.DoOption.CustomizedTargetPortList { + portInt, err := strconv.Atoi(port) + if err != nil { + return fmt.Errorf("invalid port: %s", port) + } else if portInt <= 0 || portInt > 65535 { + return fmt.Errorf("invalid port: %d", portInt) + } + } + if len(o.DoOption.CustomizedTargetIPList) == 0 { + return fmt.Errorf("if CustomizedTargetPortList is not null, CustomizedTargetIPList should be assigned") + } + } return nil } diff --git a/pkg/command/init.go b/pkg/command/init.go index 2809814b..149f56cd 100644 --- a/pkg/command/init.go +++ b/pkg/command/init.go @@ -47,14 +47,18 @@ func NewInitCmd() *cobra.Command { func (o *InitOptions) Run() error { doOptions := share.DoOptions{ - Namespace: utils.DefaultNamespace, - Port: "8889", - PodWaitTime: 30, - Protocol: string(utils.TCP), - MaxNum: 3, - AutoClean: false, - CmdTimeout: 10, - Version: "0.2.1", + Namespace: utils.DefaultNamespace, + Port: "8889", + CustomizedTargetPortList: []string{}, + CustomizedTargetIPList: []string{}, + TargetDNSServer: "", + TargetHostToLookup: "", + PodWaitTime: 30, + Protocol: string(utils.TCP), + MaxNum: 3, + AutoClean: false, + CmdTimeout: 10, + Version: "0.2.1", // src SrcImageRepository: utils.DefaultImageRepository, SrcKubeConfig: utils.DefaultKubeConfigPath, diff --git a/pkg/command/resume.go b/pkg/command/resume.go index 6af518c6..93702cd5 100644 --- a/pkg/command/resume.go +++ b/pkg/command/resume.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "strconv" "github.com/kosmos.io/netdoctor/pkg/command/share" "github.com/kosmos.io/netdoctor/pkg/utils" @@ -60,6 +61,9 @@ func (o *ResumeOptions) LoadConfig() { func (o *ResumeOptions) Complete() error { o.LoadConfig() + if o.DoOption == nil { + return fmt.Errorf("config.json load error") + } srcfloater := &share.Floater{ Namespace: o.DoOption.Namespace, @@ -97,7 +101,10 @@ func (o *ResumeOptions) Complete() error { var resumeData []*share.PrintCheckData - utils.ReadResume(&resumeData) + err := utils.ReadResume(&resumeData) + if err != nil { + klog.Error("read resumeData error") + } o.DoOption.ResumeRecord = resumeData @@ -109,6 +116,17 @@ func (o *ResumeOptions) Validate() error { return fmt.Errorf("namespace must be specified") } + if len(o.DoOption.CustomizedTargetPortList) != 0 { + for _, port := range o.DoOption.CustomizedTargetPortList { + portInt, err := strconv.Atoi(port) + if err != nil { + return fmt.Errorf("invalid port: %s", port) + } else if portInt <= 0 || portInt > 65535 { + return fmt.Errorf("invalid port: %d", portInt) + } + } + } + return nil } diff --git a/pkg/command/share/do.go b/pkg/command/share/do.go index 194c1f14..7053d300 100644 --- a/pkg/command/share/do.go +++ b/pkg/command/share/do.go @@ -6,6 +6,8 @@ import ( command "github.com/kosmos.io/netdoctor/pkg/command/share/remote-command" "github.com/kosmos.io/netdoctor/pkg/utils" + + progressbar "github.com/schollz/progressbar/v3" "k8s.io/klog/v2" ) @@ -13,9 +15,13 @@ type DoOptions struct { Namespace string `json:"namespace,omitempty"` Version string `json:"version,omitempty"` - Protocol string `json:"protocol,omitempty"` - PodWaitTime int `json:"podWaitTime,omitempty"` - Port string `json:"port,omitempty"` + Protocol string `json:"protocol,omitempty"` + PodWaitTime int `json:"podWaitTime,omitempty"` + Port string `json:"port,omitempty"` + CustomizedTargetPortList []string `json:"customizedTargetPortList,omitempty"` + CustomizedTargetIPList []string `json:"customizedTargetIPList,omitempty"` + TargetDNSServer string `json:"targetDNSServer,omitempty"` + TargetHostToLookup string `json:"targetHostToLookup,omitempty"` MaxNum int `json:"maxNum,omitempty"` CmdTimeout int `json:"cmdTimeout,omitempty"` @@ -94,6 +100,20 @@ func (o *DoOptions) SaveOpts() { } } +func (o *DoOptions) SkipPod(podInfo *FloatInfo) bool { + // is check: no skip + if len(o.ResumeRecord) == 0 { + return false + } + // is resume: filt + for _, r := range o.ResumeRecord { + if r.SrcNodeName == podInfo.NodeName { + return false + } + } + return true +} + func (o *DoOptions) Skip(podInfo *FloatInfo, targetIP string) bool { // is check: no skip if len(o.ResumeRecord) == 0 { @@ -112,34 +132,66 @@ func (o *DoOptions) RunRange(iPodInfos []*FloatInfo, jPodInfos []*FloatInfo) []* var resultData []*PrintCheckData mutex := sync.Mutex{} - barctl := utils.NewBar(len(jPodInfos) * len(iPodInfos)) + var barctl *progressbar.ProgressBar + + if len(o.CustomizedTargetIPList) != 0 && len(o.CustomizedTargetPortList) != 0 || + o.Protocol == string(utils.DNS) { + barctl = utils.NewBar(len(iPodInfos)) + } else { + barctl = utils.NewBar(len(jPodInfos) * len(iPodInfos)) + } worker := func(iPodInfo *FloatInfo) { - for _, jPodInfo := range jPodInfos { - for _, ip := range jPodInfo.PodIPs { - var targetIP string - var err error - var cmdResult *command.Result - targetIP = ip - if err != nil { - cmdResult = command.ParseError(err) - } else { - // isSkip - if o.Skip(iPodInfo, targetIP) { - continue + var cmdObj command.Command + if len(o.CustomizedTargetIPList) != 0 && len(o.CustomizedTargetPortList) != 0 { + cmdObj = command.NewCmd(o.Protocol, o.CustomizedTargetIPList, o.CustomizedTargetPortList) + } else if o.Protocol == string(utils.DNS) { + cmdObj = command.NewCmd(o.Protocol, o.TargetHostToLookup, o.TargetDNSServer) + } else { + for _, jPodInfo := range jPodInfos { + for _, ip := range jPodInfo.PodIPs { + var targetIP string + var err error + var cmdResult *command.Result + targetIP = ip + if err != nil { + cmdResult = command.ParseError(err) + } else { + // isSkip + if o.Skip(iPodInfo, targetIP) { + continue + } + // ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized + cmdObj := command.NewCmd(o.Protocol, targetIP, o.Port) + cmdResult = o.SrcFloater.CommandExec(iPodInfo, cmdObj) } - // ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized - cmdObj := command.NewCmd(o.Protocol, targetIP, o.Port) - cmdResult = o.SrcFloater.CommandExec(iPodInfo, cmdObj) + mutex.Lock() + resultData = append(resultData, &PrintCheckData{ + *cmdResult, + iPodInfo.NodeName, jPodInfo.NodeName, targetIP, + }) + mutex.Unlock() + } + err := barctl.Add(1) + if err != nil { + klog.Error("processs bar event add error") } - mutex.Lock() - resultData = append(resultData, &PrintCheckData{ - *cmdResult, - iPodInfo.NodeName, jPodInfo.NodeName, targetIP, - }) - mutex.Unlock() } - barctl.Add(1) + return + } + if o.SkipPod(iPodInfo) { + return + } + cmdResult := o.SrcFloater.CommandExec(iPodInfo, cmdObj) + mutex.Lock() + resultData = append(resultData, &PrintCheckData{ + *cmdResult, + iPodInfo.NodeName, iPodInfo.NodeName, cmdObj.GetTargetStr(), + }) + mutex.Unlock() + err := barctl.Add(1) + if err != nil { + klog.Error("processs bar event add error") } } diff --git a/pkg/command/share/floater.go b/pkg/command/share/floater.go index fc26a6ae..fa54c8be 100644 --- a/pkg/command/share/floater.go +++ b/pkg/command/share/floater.go @@ -303,7 +303,13 @@ func (f *Floater) CommandExec(fInfo *FloatInfo, cmd command.Command) *command.Re if err != nil { // klog.Infof("error: %s", err) - return command.ParseError(fmt.Errorf("%s, stderr: %s", err, errBuffer.String())) + errString := errBuffer.String() + if len(errString) != 0 { + return command.ParseError(fmt.Errorf("%s, stderr: %s", err, errString)) + } else { + outString := outBuffer.String() + return command.ParseError(fmt.Errorf("%s, stderr: %s", err, outString)) + } } return cmd.ParseResult(outBuffer.String()) diff --git a/pkg/command/share/printer.go b/pkg/command/share/printer.go index 0a898816..408847b0 100644 --- a/pkg/command/share/printer.go +++ b/pkg/command/share/printer.go @@ -8,6 +8,7 @@ import ( command "github.com/kosmos.io/netdoctor/pkg/command/share/remote-command" "github.com/kosmos.io/netdoctor/pkg/utils" "github.com/olekukonko/tablewriter" + "k8s.io/klog/v2" ) type PrintCheckData struct { @@ -19,13 +20,13 @@ type PrintCheckData struct { func PrintResult(resultData []*PrintCheckData) { table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"S/N", "SRC_NODE_NAME", "DST_NODE_NAME", "TARGET_IP", "RESULT"}) + table.SetHeader([]string{"S/N", "SRC_NODE_NAME", "DST_NODE_NAME", "TARGETP", "RESULT"}) tableException := tablewriter.NewWriter(os.Stdout) - tableException.SetHeader([]string{"S/N", "SRC_NODE_NAME", "DST_NODE_NAME", "TARGET_IP", "RESULT", "LOG"}) + tableException.SetHeader([]string{"S/N", "SRC_NODE_NAME", "DST_NODE_NAME", "TARGET", "RESULT", "LOG"}) tableFailed := tablewriter.NewWriter(os.Stdout) - tableFailed.SetHeader([]string{"S/N", "SRC_NODE_NAME", "DST_NODE_NAME", "TARGET_IP", "RESULT", "LOG"}) + tableFailed.SetHeader([]string{"S/N", "SRC_NODE_NAME", "DST_NODE_NAME", "TARGET", "RESULT", "LOG"}) resumeData := []*PrintCheckData{} @@ -61,10 +62,20 @@ func PrintResult(resultData []*PrintCheckData) { }) } } - fmt.Println("") - table.Render() - fmt.Println("") - tableException.Render() - - utils.WriteResume(resumeData) + if table.NumLines() > 0 { + fmt.Println("") + table.Render() + } + if tableFailed.NumLines() > 0 { + fmt.Println("") + tableFailed.Render() + } + if tableException.NumLines() > 0 { + fmt.Println("") + tableException.Render() + } + err := utils.WriteResume(resumeData) + if err != nil { + klog.Error("write resumeData error") + } } diff --git a/pkg/command/share/remote-command/curl.go b/pkg/command/share/remote-command/curl.go index 391d73f3..6f481ce4 100644 --- a/pkg/command/share/remote-command/curl.go +++ b/pkg/command/share/remote-command/curl.go @@ -11,6 +11,10 @@ type Curl struct { Port string } +func (c *Curl) GetTargetStr() string { + return fmt.Sprintf("%s:%s", c.TargetIP, c.Port) +} + func (c *Curl) GetCommandStr() string { // execute once if utils.IsIPv6(c.TargetIP) { diff --git a/pkg/command/share/remote-command/interface.go b/pkg/command/share/remote-command/interface.go index 8a842109..f1a5e976 100644 --- a/pkg/command/share/remote-command/interface.go +++ b/pkg/command/share/remote-command/interface.go @@ -20,6 +20,7 @@ type Result struct { type Command interface { GetCommandStr() string ParseResult(string) *Result + GetTargetStr() string } func ParseError(err error) *Result { @@ -42,15 +43,29 @@ func PrintStatus(status int) string { return "UNEXCEPTIONED" } -func NewCmd(protocol string, args ...string) Command { - if protocol == string(utils.TCP) { - return &Curl{ - TargetIP: args[0], - Port: args[1], +func NewCmd(protocol string, args ...any) Command { + switch args[1].(type) { + case []string: + return &Ncat{ + Protocol: protocol, + TargetIP: args[0].([]string), + Port: args[1].([]string), } - } else { - return &Ping{ - TargetIP: args[0], + default: + if protocol == string(utils.TCP) { + return &Curl{ + TargetIP: args[0].(string), + Port: args[1].(string), + } + } else if protocol == string(utils.DNS) { + return &Nslookup{ + TargetHost: args[0].(string), + DNSServer: args[1].(string), + } + } else { + return &Ping{ + TargetIP: args[0].(string), + } } } } diff --git a/pkg/command/share/remote-command/ncat.go b/pkg/command/share/remote-command/ncat.go new file mode 100644 index 00000000..b2b41e0e --- /dev/null +++ b/pkg/command/share/remote-command/ncat.go @@ -0,0 +1,59 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/kosmos.io/netdoctor/pkg/utils" +) + +type Ncat struct { + Protocol string + TargetIP []string + Port []string +} + +func (c *Ncat) GetTargetStr() string { + targetip := strings.Join(c.TargetIP, ",") + targetport := strings.Join(c.Port, ",") + return fmt.Sprintf("IPs: %s; Ports: %s", targetip, targetport) +} + +func (c *Ncat) GetCommandStr() string { + // execute once + cmdStrList := []string{} + for _, ip := range c.TargetIP { + if c.Protocol == string(utils.UDP) { + // TODO: find a better way + // netcat send udp packet, if remote server explictly response "reject" icmp packet, + // netcat will mark failed. If remote server response nothing, netcat mark succeeded. + // But if remote server connect timetout, server also doesn't response anything, + // netcat consider it succeeded, it's wrong. + cmdStrList = append(cmdStrList, fmt.Sprintf("nc -w 1 -z -d -u -v %s %s 2>&1", ip, strings.Join(c.Port, " "))) + } else { + cmdStrList = append(cmdStrList, fmt.Sprintf("nc -w 1 -z -d -v %s %s 2>&1", ip, strings.Join(c.Port, " "))) + } + } + + return strings.Join(cmdStrList, " && ") +} + +func (c *Ncat) ParseResult(result string) *Result { + isSucceed := CommandFailed + index := strings.LastIndex(result, "succeeded") + if index != -1 { + isSucceed = CommandSuccessed + } + index = strings.LastIndex(result, "Connection refused") + if index != -1 { + isSucceed = CommandFailed + } + index = strings.LastIndex(result, "timed out") + if index != -1 { + isSucceed = CommandFailed + } + return &Result{ + Status: isSucceed, + ResultStr: fmt.Sprintf("%s %s", c.GetCommandStr(), result), + } +} diff --git a/pkg/command/share/remote-command/nslookup.go b/pkg/command/share/remote-command/nslookup.go new file mode 100644 index 00000000..eb09bf76 --- /dev/null +++ b/pkg/command/share/remote-command/nslookup.go @@ -0,0 +1,52 @@ +package command + +import ( + "fmt" + "strings" +) + +type Nslookup struct { + TargetHost string + DNSServer string +} + +func (c *Nslookup) GetTargetStr() string { + var targethost string + var targetdns string + if len(c.TargetHost) == 0 { + targethost = fmt.Sprintf("dns:%s", "kubernetes.default.svc.cluster.local") + } else { + targethost = c.TargetHost + } + if len(c.DNSServer) == 0 { + targetdns = "coredns" + } else { + targetdns = c.DNSServer + } + return fmt.Sprintf("host: %s; dns: %s", targethost, targetdns) +} + +func (c *Nslookup) GetCommandStr() string { + // execute once + if c.TargetHost == "" { + c.TargetHost = "kubernetes.default.svc.cluster.local" + } + return fmt.Sprintf("nslookup %s %s", c.TargetHost, c.DNSServer) +} + +func (c *Nslookup) ParseResult(result string) *Result { + // klog.Infof("curl result parser: %s", result) + isSucceed := CommandSuccessed + index := strings.LastIndex(result, "server can't find") + if index != -1 { + isSucceed = CommandFailed + } + index = strings.LastIndex(result, "connection timed out") + if index != -1 { + isSucceed = CommandFailed + } + return &Result{ + Status: isSucceed, + ResultStr: fmt.Sprintf("%s %s", c.GetCommandStr(), result), + } +} diff --git a/pkg/command/share/remote-command/ping.go b/pkg/command/share/remote-command/ping.go index fd7ad2a0..b9275c8e 100644 --- a/pkg/command/share/remote-command/ping.go +++ b/pkg/command/share/remote-command/ping.go @@ -11,6 +11,10 @@ type Ping struct { TargetIP string } +func (c *Ping) GetTargetStr() string { + return c.TargetIP +} + func (c *Ping) GetCommandStr() string { // execute once return fmt.Sprintf("ping -c 1 %s", c.TargetIP) diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index 2324a0ed..a2c0d7d5 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -12,4 +12,5 @@ const ( TCP Protocol = "tcp" UDP Protocol = "udp" IPv4 Protocol = "ipv4" + DNS Protocol = "dns" )