From fe7d7562d5de730017655a4855514fc89fbae901 Mon Sep 17 00:00:00 2001 From: Kumar Atish Date: Thu, 8 Aug 2024 19:14:18 +0530 Subject: [PATCH] Use default port and protocol when dst is service in Traceflow If destination protocol/port isn't provided when destination is service in antrea traceflow, it just uses destinaton service's IP(clusterIP) and runs traceflow as Pod-to-IP(icmp) case which is incorrect. To fix this, we use the first protocol and port from the service's 'ports' list as default value. Fixes #6594 Signed-off-by: Kumar Atish --- .../traceflow/traceflow_controller.go | 14 ++- .../traceflow/traceflow_controller_test.go | 93 ++++++++++++++++++- 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go index 2835ac1efbb..b7c265576d4 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller.go +++ b/pkg/agent/controller/traceflow/traceflow_controller.go @@ -24,6 +24,7 @@ import ( "time" "antrea.io/libOpenflow/protocol" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -491,6 +492,14 @@ func (c *Controller) preparePacket(tf *crdv1beta1.Traceflow, intf *interfacestor } else if packet.DestinationIP.To4() != nil { return nil, errors.New("destination Service does not have an IPv6 ClusterIP") } + switch dstSvc.Spec.Ports[0].Protocol { + case corev1.ProtocolTCP: + packet.IPProto = protocol.Type_TCP + packet.TCPFlags = uint8(2) + case corev1.ProtocolUDP: + packet.IPProto = protocol.Type_UDP + } + packet.DestinationPort = uint16(dstSvc.Spec.Ports[0].Port) } else if !liveTraffic { return nil, errors.New("destination is not specified") } @@ -507,7 +516,9 @@ func (c *Controller) preparePacket(tf *crdv1beta1.Traceflow, intf *interfacestor packet.IPFlags = 0 } } else if tf.Spec.Packet.IPHeader != nil { - packet.IPProto = uint8(tf.Spec.Packet.IPHeader.Protocol) + if tf.Spec.Packet.IPHeader.Protocol > 0 { + packet.IPProto = uint8(tf.Spec.Packet.IPHeader.Protocol) + } if !liveTraffic { packet.TTL = uint8(tf.Spec.Packet.IPHeader.TTL) packet.IPFlags = uint16(tf.Spec.Packet.IPHeader.Flags) @@ -534,6 +545,7 @@ func (c *Controller) preparePacket(tf *crdv1beta1.Traceflow, intf *interfacestor } } else if tf.Spec.Packet.TransportHeader.UDP != nil { packet.IPProto = protocol.Type_UDP + packet.TCPFlags = uint8(0) packet.SourcePort = uint16(tf.Spec.Packet.TransportHeader.UDP.SrcPort) packet.DestinationPort = uint16(tf.Spec.Packet.TransportHeader.UDP.DstPort) } else if tf.Spec.Packet.TransportHeader.ICMP != nil { diff --git a/pkg/agent/controller/traceflow/traceflow_controller_test.go b/pkg/agent/controller/traceflow/traceflow_controller_test.go index 7e9dd211736..2b2040e9f19 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller_test.go +++ b/pkg/agent/controller/traceflow/traceflow_controller_test.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/util/workqueue" @@ -48,6 +49,7 @@ import ( var ( pod1IPv4 = "192.168.10.10" pod2IPv4 = "192.168.11.10" + svc1IPv4 = "10.96.0.1" dstIPv4 = "192.168.99.99" pod1MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:0f") pod2MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:00") @@ -79,6 +81,23 @@ var ( Namespace: "default", }, } + + svc1 = v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-1", + Namespace: "default", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Port: 80, + Protocol: v1.ProtocolTCP, + }, + }, + Type: v1.ServiceTypeClusterIP, + ClusterIP: svc1IPv4, + }, + } ) type fakeTraceflowController struct { @@ -88,13 +107,16 @@ type fakeTraceflowController struct { mockOFClient *openflowtest.MockClient crdClient *fakeversioned.Clientset crdInformerFactory crdinformers.SharedInformerFactory + informerFactory informers.SharedInformerFactory networkPolicyQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier egressQuerier *queriertest.MockEgressQuerier } func newFakeTraceflowController(t *testing.T, initObjects []runtime.Object, networkConfig *config.NetworkConfig, nodeConfig *config.NodeConfig) *fakeTraceflowController { controller := gomock.NewController(t) - kubeClient := fake.NewSimpleClientset(&pod1, &pod2, &pod3) + kubeClient := fake.NewSimpleClientset(&pod1, &pod2, &pod3, &svc1) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + serviceInformer := informerFactory.Core().V1().Services() mockOFClient := openflowtest.NewMockClient(controller) crdClient := fakeversioned.NewSimpleClientset(initObjects...) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) @@ -110,6 +132,8 @@ func newFakeTraceflowController(t *testing.T, initObjects []runtime.Object, netw tfController := &Controller{ kubeClient: kubeClient, + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, crdClient: crdClient, traceflowInformer: traceflowInformer, traceflowLister: traceflowInformer.Lister(), @@ -132,6 +156,7 @@ func newFakeTraceflowController(t *testing.T, initObjects []runtime.Object, netw mockOFClient: mockOFClient, crdClient: crdClient, crdInformerFactory: crdInformerFactory, + informerFactory: informerFactory, networkPolicyQuerier: npQuerier, egressQuerier: egressQuerier, } @@ -473,11 +498,77 @@ func TestPreparePacket(t *testing.T) { IPProto: protocol.Type_IPv6ICMP, }, }, + { + name: "Pod-to-service packet without port and protocol", + tf: &crdv1beta1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tf7", UID: "uid7"}, + Spec: crdv1beta1.TraceflowSpec{ + Source: crdv1beta1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1beta1.Destination{ + Namespace: svc1.Namespace, + Service: svc1.Name, + }, + Packet: crdv1beta1.Packet{ + IPHeader: &crdv1beta1.IPHeader{}, + }, + }, + }, + expectedPacket: &binding.Packet{ + SourceIP: net.ParseIP(pod1IPv4), + SourceMAC: pod1MAC, + DestinationIP: net.ParseIP(svc1IPv4).To4(), + DestinationPort: 80, + IPProto: protocol.Type_TCP, + TTL: 64, + TCPFlags: uint8(2), + }, + }, + { + name: "Pod-to-service packet with port and protocol", + tf: &crdv1beta1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tf8", UID: "uid8"}, + Spec: crdv1beta1.TraceflowSpec{ + Source: crdv1beta1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1beta1.Destination{ + Namespace: svc1.Namespace, + Service: svc1.Name, + }, + Packet: crdv1beta1.Packet{ + IPHeader: &crdv1beta1.IPHeader{ + Protocol: 17, + }, + TransportHeader: crdv1beta1.TransportHeader{ + UDP: &crdv1beta1.UDPHeader{ + DstPort: 8080, + }, + }, + }, + }, + }, + expectedPacket: &binding.Packet{ + SourceIP: net.ParseIP(pod1IPv4), + SourceMAC: pod1MAC, + DestinationIP: net.ParseIP(svc1IPv4).To4(), + DestinationPort: 8080, + IPProto: protocol.Type_UDP, + TTL: 64, + }, + }, } for _, tt := range tcs { t.Run(tt.name, func(t *testing.T) { tfc := newFakeTraceflowController(t, []runtime.Object{tt.tf}, nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + tfc.informerFactory.Start(stopCh) + tfc.informerFactory.WaitForCacheSync(stopCh) podInterfaces := tfc.interfaceStore.GetContainerInterfacesByPod(pod1.Name, pod1.Namespace) if tt.intf != nil { podInterfaces[0] = tt.intf