Skip to content

Commit

Permalink
Use default port and protocol when dst is service in Traceflow
Browse files Browse the repository at this point in the history
If destination protocol/port isn't provided when destination is service
in antrea traceflow, it just uses destinaton service's IP(clusterIP) and
runs traceflow as Pod-to-IP(icmp) case which is incorrect.

To fix this, we use the first protocol and port from the service's 'ports'
list as default value.

Fixes antrea-io#6594

Signed-off-by: Kumar Atish <[email protected]>
  • Loading branch information
Atish-iaf committed Aug 11, 2024
1 parent c8f8593 commit fe7d756
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
14 changes: 13 additions & 1 deletion pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"antrea.io/libOpenflow/protocol"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -491,6 +492,14 @@ func (c *Controller) preparePacket(tf *crdv1beta1.Traceflow, intf *interfacestor
} else if packet.DestinationIP.To4() != nil {
return nil, errors.New("destination Service does not have an IPv6 ClusterIP")
}
switch dstSvc.Spec.Ports[0].Protocol {
case corev1.ProtocolTCP:
packet.IPProto = protocol.Type_TCP
packet.TCPFlags = uint8(2)
case corev1.ProtocolUDP:
packet.IPProto = protocol.Type_UDP
}
packet.DestinationPort = uint16(dstSvc.Spec.Ports[0].Port)
} else if !liveTraffic {
return nil, errors.New("destination is not specified")
}
Expand All @@ -507,7 +516,9 @@ func (c *Controller) preparePacket(tf *crdv1beta1.Traceflow, intf *interfacestor
packet.IPFlags = 0
}
} else if tf.Spec.Packet.IPHeader != nil {
packet.IPProto = uint8(tf.Spec.Packet.IPHeader.Protocol)
if tf.Spec.Packet.IPHeader.Protocol > 0 {
packet.IPProto = uint8(tf.Spec.Packet.IPHeader.Protocol)
}
if !liveTraffic {
packet.TTL = uint8(tf.Spec.Packet.IPHeader.TTL)
packet.IPFlags = uint16(tf.Spec.Packet.IPHeader.Flags)
Expand All @@ -534,6 +545,7 @@ func (c *Controller) preparePacket(tf *crdv1beta1.Traceflow, intf *interfacestor
}
} else if tf.Spec.Packet.TransportHeader.UDP != nil {
packet.IPProto = protocol.Type_UDP
packet.TCPFlags = uint8(0)
packet.SourcePort = uint16(tf.Spec.Packet.TransportHeader.UDP.SrcPort)
packet.DestinationPort = uint16(tf.Spec.Packet.TransportHeader.UDP.DstPort)
} else if tf.Spec.Packet.TransportHeader.ICMP != nil {
Expand Down
93 changes: 92 additions & 1 deletion pkg/agent/controller/traceflow/traceflow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
Expand All @@ -48,6 +49,7 @@ import (
var (
pod1IPv4 = "192.168.10.10"
pod2IPv4 = "192.168.11.10"
svc1IPv4 = "10.96.0.1"
dstIPv4 = "192.168.99.99"
pod1MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:0f")
pod2MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:00")
Expand Down Expand Up @@ -79,6 +81,23 @@ var (
Namespace: "default",
},
}

svc1 = v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "svc-1",
Namespace: "default",
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Port: 80,
Protocol: v1.ProtocolTCP,
},
},
Type: v1.ServiceTypeClusterIP,
ClusterIP: svc1IPv4,
},
}
)

type fakeTraceflowController struct {
Expand All @@ -88,13 +107,16 @@ type fakeTraceflowController struct {
mockOFClient *openflowtest.MockClient
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
informerFactory informers.SharedInformerFactory
networkPolicyQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier
egressQuerier *queriertest.MockEgressQuerier
}

func newFakeTraceflowController(t *testing.T, initObjects []runtime.Object, networkConfig *config.NetworkConfig, nodeConfig *config.NodeConfig) *fakeTraceflowController {
controller := gomock.NewController(t)
kubeClient := fake.NewSimpleClientset(&pod1, &pod2, &pod3)
kubeClient := fake.NewSimpleClientset(&pod1, &pod2, &pod3, &svc1)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
serviceInformer := informerFactory.Core().V1().Services()
mockOFClient := openflowtest.NewMockClient(controller)
crdClient := fakeversioned.NewSimpleClientset(initObjects...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0)
Expand All @@ -110,6 +132,8 @@ func newFakeTraceflowController(t *testing.T, initObjects []runtime.Object, netw

tfController := &Controller{
kubeClient: kubeClient,
serviceLister: serviceInformer.Lister(),
serviceListerSynced: serviceInformer.Informer().HasSynced,
crdClient: crdClient,
traceflowInformer: traceflowInformer,
traceflowLister: traceflowInformer.Lister(),
Expand All @@ -132,6 +156,7 @@ func newFakeTraceflowController(t *testing.T, initObjects []runtime.Object, netw
mockOFClient: mockOFClient,
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
informerFactory: informerFactory,
networkPolicyQuerier: npQuerier,
egressQuerier: egressQuerier,
}
Expand Down Expand Up @@ -473,11 +498,77 @@ func TestPreparePacket(t *testing.T) {
IPProto: protocol.Type_IPv6ICMP,
},
},
{
name: "Pod-to-service packet without port and protocol",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf7", UID: "uid7"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: svc1.Namespace,
Service: svc1.Name,
},
Packet: crdv1beta1.Packet{
IPHeader: &crdv1beta1.IPHeader{},
},
},
},
expectedPacket: &binding.Packet{
SourceIP: net.ParseIP(pod1IPv4),
SourceMAC: pod1MAC,
DestinationIP: net.ParseIP(svc1IPv4).To4(),
DestinationPort: 80,
IPProto: protocol.Type_TCP,
TTL: 64,
TCPFlags: uint8(2),
},
},
{
name: "Pod-to-service packet with port and protocol",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf8", UID: "uid8"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: svc1.Namespace,
Service: svc1.Name,
},
Packet: crdv1beta1.Packet{
IPHeader: &crdv1beta1.IPHeader{
Protocol: 17,
},
TransportHeader: crdv1beta1.TransportHeader{
UDP: &crdv1beta1.UDPHeader{
DstPort: 8080,
},
},
},
},
},
expectedPacket: &binding.Packet{
SourceIP: net.ParseIP(pod1IPv4),
SourceMAC: pod1MAC,
DestinationIP: net.ParseIP(svc1IPv4).To4(),
DestinationPort: 8080,
IPProto: protocol.Type_UDP,
TTL: 64,
},
},
}

for _, tt := range tcs {
t.Run(tt.name, func(t *testing.T) {
tfc := newFakeTraceflowController(t, []runtime.Object{tt.tf}, nil, nil)
stopCh := make(chan struct{})
defer close(stopCh)
tfc.informerFactory.Start(stopCh)
tfc.informerFactory.WaitForCacheSync(stopCh)
podInterfaces := tfc.interfaceStore.GetContainerInterfacesByPod(pod1.Name, pod1.Namespace)
if tt.intf != nil {
podInterfaces[0] = tt.intf
Expand Down

0 comments on commit fe7d756

Please sign in to comment.