From 71e0fab4a01bd382ac370401ca7866a0770c1712 Mon Sep 17 00:00:00 2001 From: luolanzone Date: Fri, 14 May 2021 00:14:05 +0800 Subject: [PATCH] add livetraffic support in octant plugin (#2124) --- pkg/graphviz/traceflow.go | 64 +- .../octant/cmd/antrea-octant-plugin/main.go | 2 +- .../cmd/antrea-octant-plugin/traceflow.go | 769 ++++++++++++------ 3 files changed, 578 insertions(+), 257 deletions(-) diff --git a/pkg/graphviz/traceflow.go b/pkg/graphviz/traceflow.go index 2cef03cd843..877d1caf7fb 100644 --- a/pkg/graphviz/traceflow.go +++ b/pkg/graphviz/traceflow.go @@ -15,7 +15,6 @@ package graphviz import ( - "errors" "fmt" "strconv" "strings" @@ -42,7 +41,7 @@ var ( clusterDstName = "cluster_destination" ) -// createDirectedEdgeWithDefaultStyle creates a node with default style (usually used to represent a component in traceflow) . +// createNodeWithDefaultStyle creates a node with default style (usually used to represent a component in traceflow) . func createNodeWithDefaultStyle(graph *gographviz.Graph, parentGraph string, name string) (*gographviz.Node, error) { err := graph.AddNode(parentGraph, name, map[string]string{ "shape": "box", @@ -81,7 +80,7 @@ func createDirectedEdgeWithDefaultStyle(graph *gographviz.Graph, start *gographv } edges := graph.Edges.SrcToDsts[start.Name][end.Name] if len(edges) == 0 { - return nil, errors.New(fmt.Sprintf("Failed to create a new edge between node %s and node %s", start.Name, end.Name)) + return nil, fmt.Errorf("failed to create a new edge between node %s and node %s", start.Name, end.Name) } edge := edges[len(edges)-1] if isForwardDir { @@ -92,7 +91,7 @@ func createDirectedEdgeWithDefaultStyle(graph *gographviz.Graph, start *gographv return edge, nil } -// createDirectedEdgeWithDefaultStyle creates a cluster with default style. +// createClusterWithDefaultStyle creates a cluster with default style. // In Graphviz, cluster is a subgraph which is surrounded by a rectangle and the nodes belonging to the cluster are drawn together. // In traceflow, a cluster is usually used to represent a K8s node. func createClusterWithDefaultStyle(graph *gographviz.Graph, name string) (*gographviz.SubGraph, error) { @@ -151,6 +150,13 @@ func getSrcNodeName(tf *crdv1alpha1.Traceflow) string { if len(tf.Spec.Source.Namespace) > 0 && len(tf.Spec.Source.Pod) > 0 { return getWrappedStr(tf.Spec.Source.Namespace + "/" + tf.Spec.Source.Pod) } + if tf.Spec.LiveTraffic { + if len(tf.Spec.Source.IP) > 0 { + return getWrappedStr(tf.Spec.Source.IP) + } else { + return getWrappedStr(tf.Status.CapturedPacket.SrcIP) + } + } return "" } @@ -165,6 +171,9 @@ func getDstNodeName(tf *crdv1alpha1.Traceflow) string { if len(tf.Spec.Destination.Namespace) > 0 && len(tf.Spec.Destination.Pod) > 0 { return getWrappedStr(tf.Spec.Destination.Namespace + "/" + tf.Spec.Destination.Pod) } + if tf.Spec.LiveTraffic { + return getWrappedStr(tf.Status.CapturedPacket.DstIP) + } return "" } @@ -341,6 +350,53 @@ func GenGraph(tf *crdv1alpha1.Traceflow) (string, error) { graph.Attrs[gographviz.Label] = getTraceflowStatusMessage(tf) } if tf == nil || senderRst == nil || tf.Status.Phase != crdv1alpha1.Succeeded || len(senderRst.Observations) == 0 { + // For live traffic, when the source is IP or empty, there is no result from the sender Node result in the traceflow status. + if senderRst == nil && tf.Spec.LiveTraffic && tf.Status.Phase == crdv1alpha1.Succeeded { + // Draw the nodes for the sender. + srcCluster, err := createClusterWithDefaultStyle(graph, clusterSrcName) + if err != nil { + return "", err + } + srcCluster.Attrs[gographviz.Label] = "source" + srcCluster.Attrs[gographviz.LabelJust] = "l" + // For live traffic data, we only know src IP from capturedPacket + node, err := createEndpointNodeWithDefaultStyle(graph, srcCluster.Name, getWrappedStr(tf.Status.CapturedPacket.SrcIP)) + if err != nil { + return "", err + } + + // create an invisble edge before destination cluster, otherwise the source cluster will + // always be on the right even source subGraph is before desitination subGraph in graph string. + err = graph.AddEdge(node.Name, node.Name, true, map[string]string{ + "style": "invis", + }) + if err != nil { + return "", err + } + dstCluster, err := createClusterWithDefaultStyle(graph, clusterDstName) + if err != nil { + return "", err + } + nodes, err := genSubGraph(graph, dstCluster, receiverRst, &tf.Spec, getDstNodeName(tf), false, 0) + if err != nil { + return "", err + } + // Draw the cross-cluster edge. + edge, err := createDirectedEdgeWithDefaultStyle(graph, node, nodes[len(nodes)-1], true) + if err != nil { + return "", err + } + edge.Attrs[gographviz.Constraint] = "false" + + // add an anonymous subgraph to make two nodes in the same level. + // refer to https://github.com/awalterschulze/gographviz/issues/59 + graph.AddAttr("G", "newrank", "true") + graph.AddSubGraph("G", "force_node_same_level", map[string]string{"rank": "same"}) + graph.AddNode("force_node_same_level", node.Name, nil) + graph.AddNode("force_node_same_level", nodes[len(nodes)-1].Name, nil) + + return genOutput(graph, false), nil + } return genOutput(graph, true), nil } diff --git a/plugins/octant/cmd/antrea-octant-plugin/main.go b/plugins/octant/cmd/antrea-octant-plugin/main.go index a3569dbdfef..48856ed469c 100644 --- a/plugins/octant/cmd/antrea-octant-plugin/main.go +++ b/plugins/octant/cmd/antrea-octant-plugin/main.go @@ -73,7 +73,7 @@ func main() { a := newAntreaOctantPlugin() capabilities := &plugin.Capabilities{ - ActionNames: []string{addTfAction, showGraphAction}, + ActionNames: []string{addTfAction, addLiveTfAction, showGraphAction}, IsModule: true, } diff --git a/plugins/octant/cmd/antrea-octant-plugin/traceflow.go b/plugins/octant/cmd/antrea-octant-plugin/traceflow.go index d528c2023db..0cccd35a745 100644 --- a/plugins/octant/cmd/antrea-octant-plugin/traceflow.go +++ b/plugins/octant/cmd/antrea-octant-plugin/traceflow.go @@ -16,6 +16,7 @@ package main import ( "context" + "errors" "fmt" "log" "net" @@ -28,7 +29,7 @@ import ( "github.com/vmware-tanzu/octant/pkg/view/component" "github.com/vmware-tanzu/octant/pkg/view/flexlayout" "k8s.io/apimachinery/pkg/api/validation" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/graphviz" @@ -36,6 +37,7 @@ import ( var ( addTfAction = "traceflow/addTf" + addLiveTfAction = "traceflow/addLiveTf" showGraphAction = "traceflow/showGraphAction" ) @@ -47,6 +49,8 @@ const ( tfNameCol = "Trace" srcNamespaceCol = "Source Namespace" srcPodCol = "Source Pod" + srcTypeCol = "Source Type" + srcCol = "Source" srcPortCol = "Source Port" dstTypeCol = "Destination Type" dstNamespaceCol = "Destination Namespace" @@ -56,8 +60,11 @@ const ( phaseCol = "Phase" ageCol = "Age" traceNameCol = "Trace Name" + dropOnlyCol = "Drop Only" + timeoutCol = "Timeout" TIME_FORMAT_YYYYMMDD_HHMMSS = "20060102-150405" + invalidInputMsg = "Invalid user input, CRD creation or Traceflow request may fail: " ) // getDstName gets the name of destination for specific traceflow. @@ -88,301 +95,480 @@ func getDstType(tf *crdv1alpha1.Traceflow) string { return "" } -// actionHandler handlers clicks and actions from "Start New Trace" and "Generate Trace Graph" buttons. +// actionHandler handlers clicks and actions from "Start New Trace", "Start New Live-traffic Trace" and "Generate Trace Graph" buttons. func (p *antreaOctantPlugin) actionHandler(request *service.ActionRequest) error { actionName, err := request.Payload.String("action") if err != nil { - log.Printf("Failed to get input at string: %s", err) + log.Printf("Failed to get input at string: %s\n", err) return nil } - switch actionName { case addTfAction: - srcNamespace, err := request.Payload.String(srcNamespaceCol) + srcNamespace, err := checkNamespace(request) if err != nil { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get srcNamespace as string: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get source namespace as "+ - "string: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) return nil } - if errs := validation.ValidateNamespaceName(srcNamespace, false); len(errs) != 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to validate source namespace string %s, errs: %#v", srcNamespace, errs) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid source namespace string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + + // Judge the destination type and get destination according to the type. + dstType, err := checkDstType(request) + if err != nil { return nil } - - srcPod, err := request.Payload.String(srcPodCol) + dst, err := checkDst(request) if err != nil { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get srcPod as string: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get source pod as "+ - "string: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) return nil } - if errs := validation.NameIsDNSSubdomain(srcPod, false); len(errs) != 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to validate source pod string %s, errs: %#v", srcPod, errs) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid source pod string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + dstNamespace, err := checkDstNamespace(request) + if err != nil { return nil } - // Judge the destination type and get destination according to the type. - dstType, err := request.Payload.StringSlice(dstTypeCol) - if err != nil || len(dstType) == 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get dstType as string slice: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination type choice, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + source := crdv1alpha1.Source{} + var sourceName string + srcPod, err := request.Payload.String(srcPodCol) + if err != nil { + alertPrinter(request, invalidInputMsg+"failed to get srcPod as string", + "Failed to get source Pod as string", nil, err) return nil } - dst, err := request.Payload.String(dstCol) + err = validatePodName(request, srcPod, "source") if err != nil { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get dst as string: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get destination as "+ - "string: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) return nil } - dstNamespace, err := request.Payload.OptionalString(dstNamespaceCol) + err = validateNamespace(request, srcNamespace, "source") if err != nil { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get dstNamespace as string: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get destination namespace as "+ - "string: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) return nil } - var destination crdv1alpha1.Destination - switch dstType[0] { - case crdv1alpha1.DstTypePod: - if errs := validation.NameIsDNSSubdomain(dst, false); len(errs) != 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to validate destination pod string %s, errs: %#v", dst, errs) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination pod string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil - } - if errs := validation.ValidateNamespaceName(dstNamespace, false); len(errs) != 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to validate destination namespace string %s, errs: %#v", dstNamespace, errs) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination namespace string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil - } - destination = crdv1alpha1.Destination{ - Namespace: dstNamespace, - Pod: dst, - } - case crdv1alpha1.DstTypeIPv4: - s := net.ParseIP(dst) - if s == nil { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get destination IP as a valid IPv4 IP: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination IPv4 string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil - } - if s.To4() == nil { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get destination IP as a valid IPv4 IP: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination IPv4 string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil - } - destination = crdv1alpha1.Destination{ - IP: dst, - } - case crdv1alpha1.DstTypeService: - if errs := validation.ValidateNamespaceName(dstNamespace, false); len(errs) != 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to validate destination namespace string %s, errs: %#v", dstNamespace, errs) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination namespace string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil - } - if errs := validation.NameIsDNS1035Label(dst, false); len(errs) != 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to validate destination service string %s, errs: %#v", dst, errs) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Invalid destination service string, "+ - "please check your input and submit again."), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil - } - destination = crdv1alpha1.Destination{ - Namespace: dstNamespace, - Service: dst, - } + source = crdv1alpha1.Source{ + Namespace: srcNamespace, + Pod: srcPod, } + sourceName = srcPod - // It is not required for users to input port numbers. - hasSrcPort, hasDstPort := true, true - srcPort, err := request.Payload.Uint16(srcPortCol) + destination, err := checkDestination(request, dst, dstType, dstNamespace, false, true) if err != nil { - hasSrcPort = false + return nil } - dstPort, err := request.Payload.Uint16(dstPortCol) + + // It is not required for users to input port numbers and timeout + hasSrcPort, hasDstPort, srcPort, dstPort := checkPorts(request) + hasTimeout, timeout := checkTimeout(request) + protocol, err := checkProtocol(request) if err != nil { - hasDstPort = false + return nil } - protocol, err := request.Payload.StringSlice(protocolCol) - if err != nil || len(protocol) == 0 { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "failed to get protocol as string slice: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get protocol as "+ - "string: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - return nil + tfName := sourceName + "-" + dst + "-" + time.Now().Format(TIME_FORMAT_YYYYMMDD_HHMMSS) + tf := initTfSpec(tfName, source, destination, protocol) + if hasTimeout { + tf.Spec.Timeout = timeout } - // Judge whether the name of trace flow is duplicated. - // If it is, then the user creates more than one traceflows in one second, which is not allowed. - tfName := srcPod + "-" + dst + "-" + time.Now().Format(TIME_FORMAT_YYYYMMDD_HHMMSS) - ctx := context.Background() - tfOld, _ := p.client.CrdV1alpha1().Traceflows().Get(ctx, tfName, v1.GetOptions{}) - if tfOld.Name == tfName { - log.Printf("Invalid user input, CRD creation or Traceflow request may fail: "+ - "duplicate traceflow \"%s\": same source pod and destination pod in less than one second: %+v. ", tfName, tfOld) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Duplicate traceflow: same source pod "+ - "and destination pod in less than one second"), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + updateIPHeader(tf, hasSrcPort, hasDstPort, srcPort, dstPort) + p.createTfCR(tf, request, context.Background(), tfName) + return nil + case addLiveTfAction: + srcNamespace, err := checkNamespace(request) + if err != nil { + return nil + } + // Judge the destination type and get destination according to the type. + dstType, err := checkDstType(request) + if err != nil { + return nil + } + dst, err := checkDst(request) + if err != nil { + return nil + } + dstNamespace, err := checkDstNamespace(request) + if err != nil { return nil } + source := crdv1alpha1.Source{} + var sourceName string + var srcLen int + var isSrcPodType bool + dstLen := len(dst) - tf := &crdv1alpha1.Traceflow{ - ObjectMeta: v1.ObjectMeta{ - Name: tfName, - }, - Spec: crdv1alpha1.TraceflowSpec{ - Source: crdv1alpha1.Source{ - Namespace: srcNamespace, - Pod: srcPod, - }, - Destination: destination, - Packet: crdv1alpha1.Packet{ - IPHeader: crdv1alpha1.IPHeader{ - Protocol: crdv1alpha1.SupportedProtocols[protocol[0]], - }, - }, - }, + // Judge the source type and get source according to the type. + srcType, err := request.Payload.StringSlice(srcTypeCol) + if err != nil || len(srcType) == 0 { + alertPrinter(request, invalidInputMsg+"failed to get srcType as string slice", + "Invalid source type choice, please check your input and submit again", nil, err) + return nil + } + src, err := request.Payload.String(srcCol) + if err != nil { + alertPrinter(request, invalidInputMsg+"failed to get src as string", + "Failed to get source as string", nil, err) + return nil + } + srcLen = len(src) + if srcLen == 0 && dstLen == 0 { + alertPrinter(request, invalidInputMsg+"one of source/destination must be set, and must be a Pod.", + "One of source/destination must be set, and must be a Pod, please check your input and submit again.", nil, nil) + return nil } - switch tf.Spec.Packet.IPHeader.Protocol { - case crdv1alpha1.TCPProtocol: - { - tf.Spec.Packet.TransportHeader.TCP = &crdv1alpha1.TCPHeader{ - Flags: 2, - } - if hasSrcPort { - tf.Spec.Packet.TransportHeader.TCP.SrcPort = int32(srcPort) + if srcLen > 0 { + switch srcType[0] { + case "Pod": + err := validatePodName(request, src, "source") + if err != nil { + return nil } - if hasDstPort { - tf.Spec.Packet.TransportHeader.TCP.DstPort = int32(dstPort) + err = validateNamespace(request, srcNamespace, "source") + if err != nil { + return nil } - } - case crdv1alpha1.UDPProtocol: - { - tf.Spec.Packet.TransportHeader.UDP = &crdv1alpha1.UDPHeader{} - if hasSrcPort { - tf.Spec.Packet.TransportHeader.UDP.SrcPort = int32(srcPort) + isSrcPodType = true + source = crdv1alpha1.Source{ + Namespace: srcNamespace, + Pod: src, } - if hasDstPort { - tf.Spec.Packet.TransportHeader.UDP.DstPort = int32(dstPort) + case "IPv4": + err := validateIP(request, src, "source") + if err != nil { + return nil } - } - case crdv1alpha1.ICMPProtocol: - { - tf.Spec.Packet.TransportHeader.ICMP = &crdv1alpha1.ICMPEchoRequestHeader{ - ID: 0, - Sequence: 0, + source = crdv1alpha1.Source{ + IP: src, } } + sourceName = src + } + destination := crdv1alpha1.Destination{} + if dstLen == 0 && isSrcPodType { + goto ContinueWithoutCheckDst } - log.Printf("Get user input successfully, traceflow: %+v", tf) - tf, err = p.client.CrdV1alpha1().Traceflows().Create(ctx, tf, v1.CreateOptions{}) + + destination, err = checkDestination(request, dst, dstType, dstNamespace, true, isSrcPodType) if err != nil { - log.Printf("Failed to create traceflow CRD \"%s\", err: %s", tfName, err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to create traceflow CRD, "+ - "err: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) return nil } - log.Printf("Create traceflow CRD \"%s\" successfully, Traceflow Results: %+v", tfName, tf) - alert := action.CreateAlert(action.AlertTypeSuccess, fmt.Sprintf("Traceflow \"%s\" is created successfully", - tfName), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) - // Automatically delete the traceflow CRD after created for 300s(5min). - go func(tfName string) { - age := time.Second * 300 - time.Sleep(age) - err := p.client.CrdV1alpha1().Traceflows().Delete(context.Background(), tfName, v1.DeleteOptions{}) - if err != nil { - log.Printf("Failed to delete traceflow CRD \"%s\", err: %s", tfName, err) - return - } - log.Printf("Deleted traceflow CRD \"%s\" successfully after %.0f seconds", tfName, age.Seconds()) - }(tf.Name) - p.lastTf = tf - p.graph, err = graphviz.GenGraph(p.lastTf) + ContinueWithoutCheckDst: + // It is not required for users to input port numbers. + hasSrcPort, hasDstPort, srcPort, dstPort := checkPorts(request) + hasTimeout, timeout := checkTimeout(request) + protocol, err := checkProtocol(request) if err != nil { - log.Printf("Failed to generate traceflow graph \"%s\", err: %s", tfName, err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to generate traceflow graph, "+ - "err: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) return nil } + // It is not required for users to input port numbers. + dropOnlyChecked := false + dropOnly, err := request.Payload.StringSlice(dropOnlyCol) + if err != nil || len(dropOnly) == 0 { + alertPrinter(request, invalidInputMsg+"failed to get dropOnly as string slice", + "Failed to get dropOnly as string, please check your input and submit again", nil, err) + return nil + } + if dropOnly[0] == "Yes" { + dropOnlyChecked = true + } + + tfName := "live-" + if srcLen == 0 { + tfName += "dst-" + dst + } else if dstLen == 0 { + tfName += "src-" + sourceName + } else { + tfName += sourceName + "-" + dst + } + tfName += "-" + time.Now().Format(TIME_FORMAT_YYYYMMDD_HHMMSS) + tf := initTfSpec(tfName, source, destination, protocol) + tf.Spec.LiveTraffic = true + if dropOnlyChecked { + tf.Spec.DroppedOnly = true + } + if hasTimeout { + tf.Spec.Timeout = timeout + } + + updateIPHeader(tf, hasSrcPort, hasDstPort, srcPort, dstPort) + p.createTfCR(tf, request, context.Background(), tfName) return nil case showGraphAction: - name, err := request.Payload.String(traceNameCol) - if err != nil { - log.Printf("Failed to get name at string: %s", err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get graph name as "+ - "string: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + traceName, err := request.Payload.StringSlice(traceNameCol) + if err != nil || len(traceName) == 0 { + alertPrinter(request, invalidInputMsg+"failed to get graph name as string", + "Failed to get graph name as string", nil, err) return nil } + + name := traceName[0] // Invoke GenGraph to show ctx := context.Background() tf, err := p.client.CrdV1alpha1().Traceflows().Get(ctx, name, v1.GetOptions{}) if err != nil { - log.Printf("Failed to get traceflow CRD \"%s\", err: %s ", name, err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to get traceflow CRD, "+ - "err: %s ", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + alertPrinter(request, invalidInputMsg+"failed to get traceflow CRD "+name, + "Failed to get traceflow CRD", nil, err) return nil } - log.Printf("Get traceflow CRD \"%s\" successfully, Traceflow Results: %+v", name, tf) + log.Printf("Get traceflow CRD \"%s\" successfully, Traceflow Results: %+v\n", name, tf) p.lastTf = tf p.graph, err = graphviz.GenGraph(p.lastTf) if err != nil { - log.Printf("Failed to generate traceflow graph \"%s\", err: %s", name, err) - alert := action.CreateAlert(action.AlertTypeError, fmt.Sprintf("Failed to generate traceflow graph, "+ - "err: %s", err), action.DefaultAlertExpiration) - request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + alertPrinter(request, "Failed to generate traceflow graph "+name, "Failed to generate traceflow graph", nil, err) return nil } return nil default: - log.Fatalf("Failed to find defined handler after receiving action request for %s", pluginName) + log.Fatalf("Failed to find defined handler after receiving action request for %s\n", pluginName) return nil } } +func checkNamespace(request *service.ActionRequest) (string, error) { + srcNamespace, err := request.Payload.String(srcNamespaceCol) + if err != nil { + alertPrinter(request, invalidInputMsg+"failed to get srcNamespace as string", + "Failed to get source Namespace as string", nil, err) + return "", err + } + return srcNamespace, nil +} + +func checkDstType(request *service.ActionRequest) (string, error) { + dstType, err := request.Payload.StringSlice(dstTypeCol) + if err != nil || len(dstType) == 0 { + alertPrinter(request, invalidInputMsg+"failed to get dstType as string slice", + "Invalid destination type choice, please check your input and submit again", nil, err) + return "", err + } + return dstType[0], nil +} + +func checkDst(request *service.ActionRequest) (string, error) { + dst, err := request.Payload.String(dstCol) + if err != nil { + alertPrinter(request, invalidInputMsg+"failed to get dst as string", + "Failed to get destination as string", nil, err) + return "", err + } + return dst, nil +} + +func checkDstNamespace(request *service.ActionRequest) (string, error) { + dstNamespace, err := request.Payload.OptionalString(dstNamespaceCol) + if err != nil { + alertPrinter(request, invalidInputMsg+"failed to get dstNamespace as string", + "Failed to get destination Namespace as string", nil, err) + return "", err + } + return dstNamespace, nil +} + +func checkDestination(request *service.ActionRequest, dst string, dstType string, dstNamespace string, isLiveTraffic bool, isSrcPodType bool) (crdv1alpha1.Destination, error) { + var destination crdv1alpha1.Destination + switch dstType { + case crdv1alpha1.DstTypePod: + err := validatePodName(request, dst, "destination") + if err != nil { + return crdv1alpha1.Destination{}, err + } + err = validateNamespace(request, dstNamespace, "destination") + if err != nil { + return crdv1alpha1.Destination{}, err + } + destination = crdv1alpha1.Destination{ + Namespace: dstNamespace, + Pod: dst, + } + case crdv1alpha1.DstTypeIPv4: + if isLiveTraffic && !isSrcPodType { + alertPrinter(request, invalidInputMsg+"one of source/destination must be a Pod: "+dst+".", + "One of source/destination must be a Pod, please check your input and submit again.", nil, nil) + return crdv1alpha1.Destination{}, errors.New("one of source/destination must be a Pod") + } + err := validateIP(request, dst, "destination") + if err != nil { + return crdv1alpha1.Destination{}, err + } + destination = crdv1alpha1.Destination{ + IP: dst, + } + case crdv1alpha1.DstTypeService: + if isLiveTraffic && !isSrcPodType { + alertPrinter(request, invalidInputMsg+"one of source/destination must be a Pod: "+dst+".", + "One of source/destination must be a Pod, please check your input and submit again.", nil, nil) + return crdv1alpha1.Destination{}, errors.New("one of source/destination must be a Pod") + } + err := validateNamespace(request, dstNamespace, "destination") + if err != nil { + return crdv1alpha1.Destination{}, err + } + if errs := validation.NameIsDNS1035Label(dst, false); len(errs) != 0 { + alertPrinter(request, invalidInputMsg+"failed to validate destination service string: "+dst, + "Invalid destination service string, please check your input and submit again", errs, nil) + return crdv1alpha1.Destination{}, errors.New("invalid destination") + } + destination = crdv1alpha1.Destination{ + Namespace: dstNamespace, + Service: dst, + } + } + return destination, nil + +} + +func checkPorts(request *service.ActionRequest) (bool, bool, uint16, uint16) { + hasSrcPort, hasDstPort := true, true + srcPort, err := request.Payload.Uint16(srcPortCol) + if err != nil { + hasSrcPort = false + } + dstPort, err := request.Payload.Uint16(dstPortCol) + if err != nil { + hasDstPort = false + } + return hasSrcPort, hasDstPort, srcPort, dstPort +} + +func checkProtocol(request *service.ActionRequest) (string, error) { + protocol, err := request.Payload.StringSlice(protocolCol) + if err != nil || len(protocol) == 0 { + alertPrinter(request, invalidInputMsg+"failed to get protocol as string slice", + "Failed to get protocol as string, please check your input and submit again", nil, err) + return "", err + } + return protocol[0], nil +} + +func checkTimeout(request *service.ActionRequest) (bool, uint16) { + hasTimeout := true + timeout, err := request.Payload.Uint16(timeoutCol) + if err != nil { + hasTimeout = false + } + return hasTimeout, timeout +} + +func updateIPHeader(tf *crdv1alpha1.Traceflow, hasSrcPort bool, hasDstPort bool, srcPort uint16, dstPort uint16) { + switch tf.Spec.Packet.IPHeader.Protocol { + case crdv1alpha1.TCPProtocol: + tf.Spec.Packet.TransportHeader.TCP = &crdv1alpha1.TCPHeader{ + Flags: 2, + } + if hasSrcPort { + tf.Spec.Packet.TransportHeader.TCP.SrcPort = int32(srcPort) + } + if hasDstPort { + tf.Spec.Packet.TransportHeader.TCP.DstPort = int32(dstPort) + } + case crdv1alpha1.UDPProtocol: + tf.Spec.Packet.TransportHeader.UDP = &crdv1alpha1.UDPHeader{} + if hasSrcPort { + tf.Spec.Packet.TransportHeader.UDP.SrcPort = int32(srcPort) + } + if hasDstPort { + tf.Spec.Packet.TransportHeader.UDP.DstPort = int32(dstPort) + } + case crdv1alpha1.ICMPProtocol: + tf.Spec.Packet.TransportHeader.ICMP = &crdv1alpha1.ICMPEchoRequestHeader{ + ID: 0, + Sequence: 0, + } + } +} + +func validatePodName(request *service.ActionRequest, podName string, podType string) error { + if errs := validation.NameIsDNSSubdomain(podName, false); len(errs) != 0 { + alertPrinter(request, invalidInputMsg+"failed to validate "+podType+" Pod string "+podName, + "Invalid "+podType+" Pod string, please check your input and submit again", errs, nil) + return errors.New("invalid Pod name") + } + return nil +} + +func validateNamespace(request *service.ActionRequest, namespace string, namespaceType string) error { + if errs := validation.ValidateNamespaceName(namespace, false); len(errs) != 0 { + alertPrinter(request, invalidInputMsg+"failed to validate "+namespaceType+" Namespace string "+namespace, + "Invalid "+namespaceType+" Namespace string, please check your input and submit again", errs, nil) + return errors.New("invalid Namespace") + } + return nil +} + +func validateIP(request *service.ActionRequest, ipStr string, ipType string) error { + s := net.ParseIP(ipStr) + if s == nil { + alertPrinter(request, invalidInputMsg+"failed to get "+ipType+" IP as a valid IPv4 IP.", + "Invalid "+ipType+" IPv4 string, please check your input and submit again.", nil, nil) + return errors.New("invalid IP") + } + if s.To4() == nil { + alertPrinter(request, invalidInputMsg+"failed to get "+ipType+" IP as a valid IPv4 IP.", + "Invalid "+ipType+" IPv4 string, please check your input and submit again.", nil, nil) + return errors.New("invalid IP") + } + return nil +} + +func initTfSpec(tfName string, source crdv1alpha1.Source, destination crdv1alpha1.Destination, protocol string) *crdv1alpha1.Traceflow { + return &crdv1alpha1.Traceflow{ + ObjectMeta: v1.ObjectMeta{ + Name: tfName, + }, + Spec: crdv1alpha1.TraceflowSpec{ + Source: source, + Destination: destination, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{ + Protocol: crdv1alpha1.SupportedProtocols[protocol], + }, + }, + }, + } +} + +func alertPrinter(request *service.ActionRequest, logMsg string, alertMsg string, errs []string, err error) { + var alert action.Alert + if len(errs) > 0 { + log.Printf(logMsg+", err: %s\n", errs) + alert = action.CreateAlert(action.AlertTypeError, fmt.Sprintf(alertMsg+", err: %s", errs), action.DefaultAlertExpiration) + } else if err != nil { + log.Printf(logMsg+", err: %#v\n", err) + alert = action.CreateAlert(action.AlertTypeError, fmt.Sprintf(alertMsg+", err: %#v", err), action.DefaultAlertExpiration) + } else { + log.Println(logMsg) + alert = action.CreateAlert(action.AlertTypeError, alertMsg, action.DefaultAlertExpiration) + } + request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) +} + +func (p *antreaOctantPlugin) createTfCR(tf *crdv1alpha1.Traceflow, request *service.ActionRequest, ctx context.Context, tfName string) { + log.Printf("Get user input successfully, traceflow: %+v\n", tf) + tf, err := p.client.CrdV1alpha1().Traceflows().Create(ctx, tf, v1.CreateOptions{}) + if err != nil { + alertPrinter(request, invalidInputMsg+"Failed to create traceflow CRD "+tfName, + "Failed to create traceflow CRD", nil, err) + return + } + log.Printf("Create traceflow CRD \"%s\" successfully, Traceflow Results: %+v\n", tfName, tf) + alert := action.CreateAlert(action.AlertTypeSuccess, fmt.Sprintf("Traceflow \"%s\" is created successfully", + tfName), action.DefaultAlertExpiration) + request.DashboardClient.SendAlert(request.Context(), request.ClientID, alert) + // Automatically delete the traceflow CRD after created for 300s(5min). + go func(tfName string) { + age := time.Second * 300 + time.Sleep(age) + err := p.client.CrdV1alpha1().Traceflows().Delete(context.Background(), tfName, v1.DeleteOptions{}) + if err != nil { + log.Printf("Failed to delete traceflow CRD \"%s\", err: %s\n", tfName, err) + return + } + log.Printf("Deleted traceflow CRD \"%s\" successfully after %.0f seconds\n", tfName, age.Seconds()) + }(tf.Name) + p.lastTf = tf + p.graph, err = graphviz.GenGraph(p.lastTf) + if err != nil { + alertPrinter(request, invalidInputMsg+"Failed to generate traceflow graph "+tfName, + "Failed to generate traceflow graph", nil, err) + return + } +} + // traceflowHandler handlers the layout of Traceflow page. func (p *antreaOctantPlugin) traceflowHandler(request service.Request) (component.ContentResponse, error) { layout := flexlayout.New() @@ -418,24 +604,97 @@ func (p *antreaOctantPlugin) traceflowHandler(request service.Request) (componen i++ } - form := component.Form{Fields: []component.FormField{ - component.NewFormFieldText(srcNamespaceCol, srcNamespaceCol, ""), - component.NewFormFieldText(srcPodCol, srcPodCol, ""), - component.NewFormFieldNumber(srcPortCol, srcPortCol, ""), - component.NewFormFieldSelect(dstTypeCol, dstTypeCol, dstTypeSelect, false), - component.NewFormFieldText(dstNamespaceCol+" (Not required when destination is an IP)", dstNamespaceCol, ""), - component.NewFormFieldText(dstCol, dstCol, ""), - component.NewFormFieldNumber(dstPortCol, dstPortCol, ""), - component.NewFormFieldSelect(protocolCol, protocolCol, protocolSelect, false), + srcNamespaceField := component.NewFormFieldText(srcNamespaceCol+" (Not required when source is an IP)", srcNamespaceCol, "") + srcPodField := component.NewFormFieldText(srcPodCol, srcPodCol, "") + srcPortField := component.NewFormFieldNumber(srcPortCol, srcPortCol, "") + dstTypeField := component.NewFormFieldSelect(dstTypeCol, dstTypeCol, dstTypeSelect, false) + dstNamespaceField := component.NewFormFieldText(dstNamespaceCol+" (Not required when destination is an IP)", dstNamespaceCol, "") + dstField := component.NewFormFieldText(dstCol, dstCol, "") + dstPortField := component.NewFormFieldNumber(dstPortCol, dstPortCol, "") + protocolField := component.NewFormFieldSelect(protocolCol, protocolCol, protocolSelect, false) + + defaultTimeout := strconv.Itoa(int(crdv1alpha1.DefaultTraceflowTimeout)) + timeoutField := component.NewFormFieldNumber(timeoutCol+" (Default value is "+defaultTimeout+" seconds)", timeoutCol, defaultTimeout) + + tfFields := []component.FormField{ + srcNamespaceField, + srcPodField, + srcPortField, + dstTypeField, + dstNamespaceField, + dstField, + dstPortField, + protocolField, + timeoutField, component.NewFormFieldHidden("action", addTfAction), - }} + } + + form := component.Form{Fields: tfFields} addTf := component.Action{ Name: "Start New Trace", Title: "Start New Trace", Form: form, } + + dropOnlySelect := []component.InputChoice{ + {Label: "Yes", Value: "Yes", Checked: false}, + {Label: "No", Value: "No", Checked: true}, + } + // only Pod and IPv4 are supported for source in live traffic trace flow + srcTypeSelect := make([]component.InputChoice, 2) + for i, t := range []string{"Pod", "IPv4"} { + srcTypeSelect[i] = component.InputChoice{ + Label: t, + Value: t, + Checked: false, + } + // Set the default source type. + if t == "Pod" { + srcTypeSelect[i].Checked = true + } + } + srcTypeField := component.NewFormFieldSelect(srcTypeCol, srcTypeCol, srcTypeSelect, false) + srcField := component.NewFormFieldText(srcCol, srcCol, "") + dropOnlyField := component.NewFormFieldSelect(dropOnlyCol+" (Only capture packets dropped by NetworkPolicies)", dropOnlyCol, dropOnlySelect, false) + + liveTfFields := []component.FormField{ + srcNamespaceField, + srcTypeField, + srcField, + srcPortField, + dstTypeField, + dstNamespaceField, + dstField, + dstPortField, + protocolField, + timeoutField, + dropOnlyField, + component.NewFormFieldHidden("action", addLiveTfAction), + } + + liveForm := component.Form{Fields: liveTfFields} + addLiveTf := component.Action{ + Name: "Start New Live-traffic Trace", + Title: "Start New Live-traffic Trace", + Form: liveForm, + } + + // Construct the available list of traceflow CRD. + tfsItems := p.getSortedTfItems() + traceflowSelect := make([]component.InputChoice, len(tfsItems)) + for i, t := range tfsItems { + traceflowSelect[i] = component.InputChoice{ + Label: t.Name, + Value: t.Name, + Checked: false, + } + } + if len(tfsItems) > 0 { + traceflowSelect[0].Checked = true + } + graphForm := component.Form{Fields: []component.FormField{ - component.NewFormFieldText(traceNameCol, traceNameCol, ""), + component.NewFormFieldSelect(traceNameCol, traceNameCol, traceflowSelect, false), component.NewFormFieldHidden("action", showGraphAction), }} genGraph := component.Action{ @@ -445,32 +704,33 @@ func (p *antreaOctantPlugin) traceflowHandler(request service.Request) (componen } card.SetBody(component.NewText("")) card.AddAction(addTf) + card.AddAction(addLiveTf) card.AddAction(genGraph) graphCard := component.NewCard(component.TitleFromString("Antrea Traceflow Graph")) if p.lastTf.Name != "" { // Invoke GenGraph to show - log.Printf("Generating content from CRD...") + log.Printf("Generating content from CRD...\n") ctx := context.Background() tf, err := p.client.CrdV1alpha1().Traceflows().Get(ctx, p.lastTf.Name, v1.GetOptions{}) if err != nil { - log.Printf("Failed to get latest CRD, using traceflow results cache, last traceflow name: %s, err: %s", p.lastTf.Name, err) + log.Printf("Failed to get latest CRD, using traceflow results cache, last traceflow name: %s, err: %s\n", p.lastTf.Name, err) p.graph, err = graphviz.GenGraph(p.lastTf) if err != nil { - log.Printf("Failed to generate traceflow graph \"%s\", err: %s", p.lastTf.Name, err) + log.Printf("Failed to generate traceflow graph \"%s\", err: %s\n", p.lastTf.Name, err) return component.EmptyContentResponse, nil } - log.Printf("Generated content from CRD cache successfully, last traceflow name: %s", p.lastTf.Name) + log.Printf("Generated content from CRD cache successfully, last traceflow name: %s\n", p.lastTf.Name) } else { p.lastTf = tf p.graph, err = graphviz.GenGraph(p.lastTf) if err != nil { - log.Printf("Failed to generate traceflow graph \"%s\", err: %s", p.lastTf.Name, err) + log.Printf("Failed to generate traceflow graph \"%s\", err: %s\n", p.lastTf.Name, err) return component.EmptyContentResponse, nil } - log.Printf("Generated content from latest CRD successfully, last traceflow name %s", p.lastTf.Name) + log.Printf("Generated content from latest CRD successfully, last traceflow name %s\n", p.lastTf.Name) } - log.Printf("Traceflow Results: %+v", p.lastTf) + log.Printf("Traceflow Results: %+v\n", p.lastTf) } if p.graph != "" { graphCard.SetBody(component.NewGraphviz(p.graph)) @@ -480,13 +740,13 @@ func (p *antreaOctantPlugin) traceflowHandler(request service.Request) (componen listSection := layout.AddSection() err := listSection.Add(card, component.WidthFull) if err != nil { - log.Printf("Failed to add card to section: %s", err) + log.Printf("Failed to add card to section: %s\n", err) return component.EmptyContentResponse, nil } if p.graph != "" { err = listSection.Add(graphCard, component.WidthFull) if err != nil { - log.Printf("Failed to add graphCard to section: %s", err) + log.Printf("Failed to add graphCard to section: %s\n", err) return component.EmptyContentResponse, nil } } @@ -507,17 +767,9 @@ func (p *antreaOctantPlugin) traceflowHandler(request service.Request) (componen // getTfTable gets the table for displaying Traceflow information func (p *antreaOctantPlugin) getTfTable(request service.Request) *component.Table { - ctx := context.Background() - tfs, err := p.client.CrdV1alpha1().Traceflows().List(ctx, v1.ListOptions{ResourceVersion: "0"}) - if err != nil { - log.Fatalf("Failed to get Traceflows %v", err) - return nil - } - sort.Slice(tfs.Items, func(p, q int) bool { - return tfs.Items[p].CreationTimestamp.Unix() > tfs.Items[q].CreationTimestamp.Unix() - }) + tfsItems := p.getSortedTfItems() tfRows := make([]component.TableRow, 0) - for _, tf := range tfs.Items { + for _, tf := range tfsItems { tfRows = append(tfRows, component.TableRow{ tfNameCol: component.NewLink(tf.Name, tf.Name, octantTraceflowCRDPath+tf.Name), srcNamespaceCol: component.NewText(tf.Spec.Source.Namespace), @@ -533,3 +785,16 @@ func (p *antreaOctantPlugin) getTfTable(request service.Request) *component.Tabl tfCols := component.NewTableCols(tfNameCol, srcNamespaceCol, srcPodCol, dstNamespaceCol, dstTypeCol, dstCol, protocolCol, phaseCol, ageCol) return component.NewTableWithRows(traceflowTitle, "We couldn't find any traceflows!", tfCols, tfRows) } + +func (p *antreaOctantPlugin) getSortedTfItems() []crdv1alpha1.Traceflow { + ctx := context.Background() + tfs, err := p.client.CrdV1alpha1().Traceflows().List(ctx, v1.ListOptions{ResourceVersion: "0"}) + if err != nil { + log.Fatalf("Failed to get traceflows: %v\n", err) + return nil + } + sort.Slice(tfs.Items, func(p, q int) bool { + return tfs.Items[p].CreationTimestamp.Unix() > tfs.Items[q].CreationTimestamp.Unix() + }) + return tfs.Items +}