diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 66247216af2..ed3f4dbd93c 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -46,7 +46,7 @@ featureGates: # IPAM when configuring secondary network interfaces with Multus. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "AntreaIPAM" "default" false) }} -# Enable multicast traffic. This feature is supported only with noEncap mode. +# Enable multicast traffic. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }} # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. diff --git a/build/charts/antrea/conf/antrea-controller.conf b/build/charts/antrea/conf/antrea-controller.conf index 440952a8757..745e8df9496 100644 --- a/build/charts/antrea/conf/antrea-controller.conf +++ b/build/charts/antrea/conf/antrea-controller.conf @@ -17,7 +17,7 @@ featureGates: # Enable collecting and exposing NetworkPolicy statistics. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NetworkPolicyStats" "default" true) }} -# Enable multicast traffic. This feature is supported only with noEncap mode. +# Enable multicast traffic. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }} # Enable controlling SNAT IPs of Pod egress traffic. diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 9488aecd104..ed02b623473 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 25e27b37618..8f178089294 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-agent @@ -3934,7 +3934,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 3a5dbc75ac4..7822ecf0c82 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 + checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 + checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 06a718aec3c..04fd7cbf74a 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2596,7 +2596,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2885,7 +2885,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3705,7 +3705,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 + checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3991,7 +3991,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 + checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 69e328f1dd2..ba216d1e056 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 + checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 + checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 901d48cae7f..8432c78808a 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -250,6 +250,10 @@ func run(o *Options) error { ipsecCertController = ipseccertificate.NewIPSecCertificateController(k8sClient, ovsBridgeClient, nodeConfig.Name) } + // podUpdateChannel is a channel for receiving Pod updates from CNIServer and + // notifying NetworkPolicyController and EgressController to reconcile rules + // related to the updated Pods. + nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100) nodeRouteController := noderoute.NewNodeRouteController( k8sClient, informerFactory, @@ -262,6 +266,7 @@ func run(o *Options) error { agentInitializer.GetWireGuardClient(), o.config.AntreaProxy.ProxyAll, ipsecCertController, + nodeUpdateChannel, ) var mcRouteController *mcroute.MCRouteController @@ -521,6 +526,8 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) + go nodeUpdateChannel.Run(stopCh) + go routeClient.Run(stopCh) go cniServer.Run(stopCh) @@ -652,7 +659,9 @@ func run(o *Options) error { ovsBridgeClient, podUpdateChannel, o.igmpQueryInterval, - validator) + validator, + networkConfig.TrafficEncapMode.SupportsEncap(), + nodeUpdateChannel) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 61105539235..2f41d79731b 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/wireguard" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/runtime" @@ -84,6 +85,7 @@ type Controller struct { // or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate // to be configured before installing routes/flows to peer Nodes to prevent unencrypted traffic across Nodes. ipsecCertificateManager ipseccertificate.Manager + nodeUpdateSubscriber channel.Notifier } // NewNodeRouteController instantiates a new Controller object which will process Node events @@ -100,6 +102,7 @@ func NewNodeRouteController( wireguardClient wireguard.Interface, proxyAll bool, ipsecCertificateManager ipseccertificate.Manager, + nodeUpdateSubscriber channel.Notifier, ) *Controller { nodeInformer := informerFactory.Core().V1().Nodes() svcLister := informerFactory.Core().V1().Services() @@ -120,6 +123,7 @@ func NewNodeRouteController( wireGuardClient: wireguardClient, proxyAll: proxyAll, ipsecCertificateManager: ipsecCertificateManager, + nodeUpdateSubscriber: nodeUpdateSubscriber, } nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -325,6 +329,7 @@ func (c *Controller) reconcile() error { if err := c.removeStaleWireGuardPeers(); err != nil { return fmt.Errorf("error when removing stale WireGuard peers: %v", err) } + c.notifyNodeList() return nil } @@ -419,6 +424,7 @@ func (c *Controller) processNextWorkItem() bool { // If no error occurs we Forget this item so it does not get queued again until // another change happens. c.queue.Forget(key) + c.notifyNodeList() } else { // Put the item back on the workqueue to handle any transient errors. c.queue.AddRateLimited(key) @@ -471,6 +477,9 @@ func (c *Controller) deleteNodeRoute(nodeName string) error { if err := c.ofClient.UninstallNodeFlows(nodeName); err != nil { return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err) } + if err := c.routeClient.DeleteNodeIPs(nodeRouteInfo.nodeIPs.IPv4, nodeRouteInfo.nodeIPs.IPv6); err != nil { + return fmt.Errorf("failed to delete Node %s from ipset: %v", nodeName, err) + } c.installedNodes.Delete(obj) if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec { @@ -618,6 +627,10 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error { } } + if err := c.routeClient.AddNodeIPs(peerNodeIPs.IPv4, peerNodeIPs.IPv6); err != nil { + return fmt.Errorf("failed to add Node %s in ipset: %v", nodeName, err) + } + c.installedNodes.Add(&nodeRouteInfo{ nodeName: nodeName, podCIDRs: peerPodCIDRs, @@ -819,3 +832,12 @@ func (c *Controller) getNodeTransportAddrs(node *corev1.Node) (*utilip.DualStack } return peerNodeIPs, nil } + +func (c *Controller) notifyNodeList() { + nodesMap := make(map[string]*utilip.DualStackIPs) + for _, n := range c.installedNodes.List() { + node := n.(*nodeRouteInfo) + nodesMap[node.nodeName] = node.nodeIPs + } + c.nodeUpdateSubscriber.Notify(nodesMap) +} diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index fd70d204270..d90183a93ad 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -35,6 +35,7 @@ import ( "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" utilip "antrea.io/antrea/pkg/util/ip" ) @@ -75,10 +76,11 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig) (*fakeCont routeClient := routetest.NewMockInterface(ctrl) interfaceStore := interfacestore.NewInterfaceStore() ipsecCertificateManager := &fakeIPsecCertificateManager{} + nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100) c := NewNodeRouteController(clientset, informerFactory, ofClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{ IPv4: nil, MAC: gatewayMAC, - }}, nil, false, ipsecCertificateManager) + }}, nil, false, ipsecCertificateManager, nodeUpdateChannel) return &fakeController{ Controller: c, clientset: clientset, @@ -145,6 +147,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddNodeIPs(nodeIP1, nil).Times(1) c.processNextWorkItem() // Since node1 is not deleted yet, routes and flows for node2 shouldn't be installed as its PodCIDR is duplicate. @@ -155,11 +158,13 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { c.clientset.CoreV1().Nodes().Delete(context.TODO(), node1.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallNodeFlows("node1").Times(1) c.routeClient.EXPECT().DeleteRoutes(podCIDR).Times(1) + c.routeClient.EXPECT().DeleteNodeIPs(nodeIP1, nil).Times(1) c.processNextWorkItem() // After node1 is deleted, routes and flows should be installed for node2 successfully. c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) c.routeClient.EXPECT().AddRoutes(podCIDR, "node2", nodeIP2, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddNodeIPs(nodeIP2, nil).Times(1) c.processNextWorkItem() }() @@ -222,11 +227,13 @@ func TestIPInPodSubnets(t *testing.T) { c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddNodeIPs(nodeIP1, nil).Times(1) c.processNextWorkItem() c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1) + c.routeClient.EXPECT().AddNodeIPs(nodeIP2, nil).Times(1) c.processNextWorkItem() assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1"))) diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 12a14bbcb16..9e481cfea5b 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -35,6 +35,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + utilip "antrea.io/antrea/pkg/util/ip" ) type eventType uint8 @@ -61,13 +62,18 @@ type mcastGroupEvent struct { eType eventType time time.Time iface *interfacestore.InterfaceConfig + // srcNode is the Node IP where the IGMP report message is sent from. It is set only with encap mode. + srcNode net.IP } type GroupMemberStatus struct { group net.IP // localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name, // and value is its last update time. - localMembers map[string]time.Time + localMembers map[string]time.Time + // remoteMembers is a set for Nodes which have joined the multicast group in the cluster. The Node's IP is + // added in the set. + remoteMembers sets.String lastIGMPReport time.Time ofGroupID binding.GroupIDType } @@ -91,11 +97,12 @@ func (c *Controller) eventHandler(stopCh <-chan struct{}) { // addGroupMemberStatus adds the new group into groupCache. func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ - group: e.group, - lastIGMPReport: e.time, - localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, - ofGroupID: c.v4GroupAllocator.Allocate(), + group: e.group, + ofGroupID: c.v4GroupAllocator.Allocate(), + remoteMembers: sets.NewString(), + localMembers: make(map[string]time.Time), } + status = addGroupMember(status, e) c.groupCache.Add(status) c.queue.Add(e.group.String()) klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName) @@ -112,17 +119,17 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent newStatus := &GroupMemberStatus{ group: status.group, localMembers: make(map[string]time.Time), + remoteMembers: status.remoteMembers, lastIGMPReport: status.lastIGMPReport, ofGroupID: status.ofGroupID, } for m, t := range status.localMembers { newStatus.localMembers[m] = t } - _, exist := status.localMembers[e.iface.InterfaceName] + exist := memberExists(status, e) switch e.eType { case groupJoin: - newStatus.lastIGMPReport = e.time - newStatus.localMembers[e.iface.InterfaceName] = e.time + newStatus = addGroupMember(newStatus, e) c.groupCache.Update(newStatus) if !exist { klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) @@ -130,18 +137,21 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent } case groupLeave: if exist { - delete(newStatus.localMembers, e.iface.InterfaceName) + newStatus = deleteGroupMember(newStatus, e) c.groupCache.Update(newStatus) - klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) - // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are - // other local members in the multicast group. - if !found || len(newStatus.localMembers) > 0 { - c.queue.Add(newStatus.group.String()) + if e.iface.Type == interfacestore.ContainerInterface { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are + // other local members in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. + klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.checkLastMember(e.group) + } } else { - // Check if all local members have left the multicast group. - klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - c.checkLastMember(e.group) + c.queue.Add(newStatus.group.String()) } } } @@ -175,6 +185,7 @@ func (c *Controller) clearStaleGroups() { if now.Sub(lastUpdate) > c.mcastGroupTimeout { ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: member, + Type: interfacestore.ContainerInterface, } event := &mcastGroupEvent{ group: status.group, @@ -200,6 +211,7 @@ func (c *Controller) removeLocalInterface(podEvent types.PodUpdate) { interfaceName := util.GenerateContainerInterfaceName(podEvent.PodName, podEvent.PodNamespace, podEvent.ContainerID) ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: interfaceName, + Type: interfacestore.ContainerInterface, } groupStatuses := c.getGroupMemberStatusesByPod(interfaceName) for _, g := range groupStatuses { @@ -223,16 +235,23 @@ type Controller struct { groupCache cache.Indexer queue workqueue.RateLimitingInterface // installedGroups saves the groups which are configured on both OVS and the host. - installedGroups sets.String - installedGroupsMutex sync.RWMutex - mRouteClient *MRouteClient - ovsBridgeClient ovsconfig.OVSBridgeClient + installedGroups sets.String + installedGroupsMutex sync.RWMutex + installedLocalGroups sets.String + installedLocalGroupsMutex sync.RWMutex + mRouteClient *MRouteClient + ovsBridgeClient ovsconfig.OVSBridgeClient // queryInterval is the interval to send IGMP query messages. queryInterval time.Duration // mcastGroupTimeout is the timeout to detect a group as stale if no IGMP report is received within the time. mcastGroupTimeout time.Duration // the group ID in OVS for group which IGMP queries are sent to queryGroupId binding.GroupIDType + // nodeGroupID is the OpenFlow group ID in OVS which is used to send IGMP report messages to other Nodes. + nodeGroupID binding.GroupIDType + // installedNodes is the installed Node set that the IGMP report message is sent to. + installedNodes sets.String + encapEnabled bool } func NewMulticastController(ofClient openflow.Client, @@ -244,28 +263,37 @@ func NewMulticastController(ofClient openflow.Client, ovsBridgeClient ovsconfig.OVSBridgeClient, podUpdateSubscriber channel.Subscriber, igmpQueryInterval time.Duration, - validator types.McastNetworkPolicyController) *Controller { + validator types.McastNetworkPolicyController, + isEncap bool, + nodeUpdateChannel channel.Subscriber) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) - groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator) + groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) + multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap) c := &Controller{ - ofClient: ofClient, - ifaceStore: ifaceStore, - v4GroupAllocator: v4GroupAllocator, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, - queryInterval: igmpQueryInterval, - mcastGroupTimeout: igmpQueryInterval * 3, - queryGroupId: v4GroupAllocator.Allocate(), + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + installedLocalGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, + queryInterval: igmpQueryInterval, + mcastGroupTimeout: igmpQueryInterval * 3, + queryGroupId: v4GroupAllocator.Allocate(), + encapEnabled: isEncap, + } + if isEncap { + c.nodeGroupID = v4GroupAllocator.Allocate() + c.installedNodes = sets.NewString() + nodeUpdateChannel.Subscribe(c.nodeChanged) } podUpdateSubscriber.Subscribe(c.memberChanged) return c @@ -290,6 +318,16 @@ func (c *Controller) Initialize() error { if err != nil { return err } + if c.encapEnabled { + // Install OpenFlow group for a new multicast group which has local Pod receivers joined. + if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nil); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + } + if err := c.ofClient.InstallMulticastToRemoteFlows(c.nodeGroupID); err != nil { + klog.ErrorS(err, "Failed to install OpenFlow group and flow to send IGMP report to other Nodes") + return err + } + } return nil } @@ -301,6 +339,10 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } }, c.queryInterval, stopCh) + if c.encapEnabled { + go wait.NonSlidingUntil(c.syncLocalGroupsToOtherNodes, c.queryInterval, stopCh) + } + // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, c.queryInterval, stopCh) go c.eventHandler(stopCh) @@ -380,54 +422,107 @@ func (c *Controller) syncGroup(groupKey string) error { } memberPorts = append(memberPorts, uint32(obj.OFPort)) } + var remoteNodeReceivers []net.IP + if c.encapEnabled { + remoteNodeReceivers = make([]net.IP, len(status.remoteMembers)) + for i := range status.remoteMembers.List() { + nodeIPStr := status.remoteMembers.List()[i] + remoteNodeReceivers[i] = net.ParseIP(nodeIPStr) + } + } + installLocalMulticastGroup := func() error { + if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { + klog.ErrorS(err, "Failed to install multicast group identified with local members", "group", groupKey) + return err + } + if c.encapEnabled { + if err := c.igmpSnooper.sendIGMPJoinReport([]net.IP{status.group}); err != nil { + klog.ErrorS(err, "Failed to sync local multicast group to other Nodes", "group", groupKey) + } + } + c.addInstalledLocalGroup(groupKey) + klog.InfoS("New local multicast group is added", "group", groupKey) + return nil + } + deleteLocalMulticastGroup := func() error { + err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) + if err != nil { + klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) + return err + } + klog.InfoS("Remove multicast route entry", "group", status.group) + err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) + if err != nil { + klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + return err + } + + if c.encapEnabled { + group := net.ParseIP(groupKey) + // Send IGMP leave message to other Nodes to notify the current Node leaves the given multicast group. + if err := c.igmpSnooper.sendIGMPLeaveReport([]net.IP{group}); err != nil { + klog.ErrorS(err, "Failed to send IGMP leave message to other Nodes", "group", groupKey) + } + } + c.delInstalledLocalGroup(groupKey) + return nil + } if c.groupHasInstalled(groupKey) { if c.groupIsStale(status) { - // Remove the multicast flow entry if no local Pod is in the group. - if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { - klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) - return err - } - // Remove the multicast flow entry if no local Pod is in the group. - if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { - klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) - return err + if c.localGroupHasInstalled(groupKey) { + if err := deleteLocalMulticastGroup(); err != nil { + return err + } } - c.v4GroupAllocator.Release(status.ofGroupID) - err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) - if err != nil { - klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) - return err + // TODO: add check for remote report is timeout + // remoteMembers is always empty with noEncap mode. + if status.remoteMembers.Len() == 0 { + // Remove the multicast OpenFlow flow and group entries if none Pod member on local or remote Node is in the group. + if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) + return err + } + // Remove the multicast flow entry if no local Pod is in the group. + if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) + return err + } + c.v4GroupAllocator.Release(status.ofGroupID) + c.delInstalledGroup(groupKey) + c.groupCache.Delete(status) + klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + return nil } - err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) - if err != nil { - klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + } else if !c.localGroupHasInstalled(groupKey) { + // Install multicast flows and routing entries for the multicast group that local Pods join. + if err := installLocalMulticastGroup(); err != nil { return err } - c.delInstalledGroup(groupKey) - c.groupCache.Delete(status) - klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) - return nil } - // Reinstall OpenFlow group because the local Pod receivers have changed. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + // Reinstall OpenFlow group because the remote node receivers have changed. + klog.V(2).InfoS("Updating OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "remoteReceivers", remoteNodeReceivers) + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.InfoS("Updated OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) return nil } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.V(2).InfoS("Installing OpenFlow group for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) // Install OpenFlow flow to forward packets to local Pod receivers which are included in the group. if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } - if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { - klog.ErrorS(err, "Failed to join multicast group for multicast interfaces", "group", status.group) - return err + klog.InfoS("Installed OpenFlow flow and group for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) + if len(status.localMembers) > 0 { + err := installLocalMulticastGroup() + if err != nil { + return err + } } c.addInstalledGroup(groupKey) return nil @@ -461,6 +556,24 @@ func (c *Controller) delInstalledGroup(groupKey string) { c.installedGroupsMutex.Unlock() } +func (c *Controller) localGroupHasInstalled(groupKey string) bool { + c.installedLocalGroupsMutex.RLock() + defer c.installedLocalGroupsMutex.RUnlock() + return c.installedLocalGroups.Has(groupKey) +} + +func (c *Controller) addInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Insert(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + +func (c *Controller) delInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Delete(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { obj, ok, _ := c.groupCache.GetByKey(e.group.String()) switch e.eType { @@ -509,7 +622,7 @@ func (c *Controller) updateQueryGroup() error { memberPorts = append(memberPorts, uint32(iface.OFPort)) } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. - if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts, nil); err != nil { return err } klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", types.McastAllHosts.String(), @@ -517,6 +630,49 @@ func (c *Controller) updateQueryGroup() error { return nil } +// syncLocalGroupsToOtherNodes sends IGMP join message to other Nodes in the same cluster to notify what multicast groups +// are joined by this Node. This function is used only with encap mode. +func (c *Controller) syncLocalGroupsToOtherNodes() { + if c.installedLocalGroups.Len() == 0 { + return + } + localGroups := make([]net.IP, c.installedLocalGroups.Len()) + c.installedLocalGroupsMutex.RLock() + for i := range c.installedLocalGroups.List() { + localGroups[i] = net.ParseIP(c.installedLocalGroups.List()[i]) + } + c.installedLocalGroupsMutex.RUnlock() + if err := c.igmpSnooper.sendIGMPJoinReport(localGroups); err != nil { + klog.ErrorS(err, "Failed to sync local multicast groups to other Nodes") + } +} + +func (c *Controller) nodeChanged(e interface{}) { + nodes, ok := e.(map[string]*utilip.DualStackIPs) + if !ok { + klog.InfoS("Failed to cast nodeUpdateEvent") + return + } + var nodeIPs []net.IP + updatedNodes := sets.NewString() + for _, ips := range nodes { + if ips != nil && ips.IPv4 != nil { + nodeIPs = append(nodeIPs, ips.IPv4) + updatedNodes.Insert(ips.IPv4.String()) + } + } + if c.installedNodes.Equal(updatedNodes) { + klog.V(2).InfoS("Nodes in the cluster are not changed, ignore the event") + return + } + if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nodeIPs); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + } + c.installedNodes = updatedNodes + // Notify local installed multicast groups to other Nodes in the cluster. + c.syncLocalGroupsToOtherNodes() +} + func podInterfaceIndexFunc(obj interface{}) ([]string, error) { groupState := obj.(*GroupMemberStatus) podInterfaces := make([]string, 0, len(groupState.localMembers)) @@ -596,3 +752,34 @@ func (c *Controller) GetAllPodsStats() map[*interfacestore.InterfaceConfig]*PodT } return statsMap } + +func memberExists(status *GroupMemberStatus, e *mcastGroupEvent) bool { + var exist bool + if e.iface.Type == interfacestore.ContainerInterface { + _, exist = status.localMembers[e.iface.InterfaceName] + } else if e.iface.Type == interfacestore.TunnelInterface { + exist = status.remoteMembers.Has(e.srcNode.String()) + } + return exist +} + +func addGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + status.localMembers[e.iface.InterfaceName] = e.time + status.lastIGMPReport = e.time + } else { + status.remoteMembers = status.remoteMembers.Insert(e.srcNode.String()) + } + return status +} + +func deleteGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + delete(status.localMembers, e.iface.InterfaceName) + klog.InfoS("Deleted local member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } else { + status.remoteMembers = status.remoteMembers.Delete(e.srcNode.String()) + klog.InfoS("Deleted remote member from multicast group", "group", e.group.String(), "member", e.srcNode) + } + return status +} diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index a0cfd86c814..c14edec7b12 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -1,6 +1,3 @@ -//go:build linux -// +build linux - // Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,12 +15,13 @@ package multicast import ( + "fmt" "net" + "os" "sync" "testing" "time" - "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" @@ -31,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "antrea.io/antrea/pkg/agent/config" @@ -45,6 +44,7 @@ import ( "antrea.io/antrea/pkg/apis/crd/v1alpha1" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" + utilip "antrea.io/antrea/pkg/util/ip" ) var ( @@ -69,10 +69,8 @@ var ( OFPort: 2, }, } - nodeIf1IP = net.ParseIP("192.168.20.22") - externalInterfaceIP = net.ParseIP("192.168.50.23") - pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") - pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") + nodeIf1IP = net.ParseIP("192.168.20.22") + nodeUpdateChannel *channel.SubscribableChannel ) func TestAddGroupMemberStatus(t *testing.T) { @@ -83,7 +81,7 @@ func TestAddGroupMemberStatus(t *testing.T) { time: time.Now(), iface: if1, } - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -97,7 +95,7 @@ func TestAddGroupMemberStatus(t *testing.T) { assert.True(t, ok) assert.Equal(t, mgroup.String(), key) mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) @@ -106,10 +104,9 @@ func TestAddGroupMemberStatus(t *testing.T) { } func TestUpdateGroupMemberStatus(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) assert.Nil(t, err) - igmpMaxResponseTime = time.Second * 1 mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, @@ -143,9 +140,8 @@ func TestUpdateGroupMemberStatus(t *testing.T) { } func TestCheckLastMember(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) workerCount = 1 - igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { @@ -195,7 +191,7 @@ func TestCheckLastMember(t *testing.T) { mctrl.queue.Forget(obj) } mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) for _, tc := range []struct { ev *mcastGroupEvent exists bool @@ -209,7 +205,7 @@ func TestCheckLastMember(t *testing.T) { } func TestClearStaleGroups(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) workerCount = 1 err := mctrl.initialize(t) assert.Nil(t, err) @@ -253,12 +249,14 @@ func TestClearStaleGroups(t *testing.T) { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addInstalledLocalGroup(g.group.String()) } fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addInstalledLocalGroup(g.group.String()) for m := range g.localMembers { mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) @@ -283,7 +281,7 @@ func TestClearStaleGroups(t *testing.T) { } func TestProcessPacketIn(t *testing.T) { - mockController := newMockMulticastController(t) + mockController := newMockMulticastController(t, false) snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) @@ -449,6 +447,313 @@ func TestProcessPacketIn(t *testing.T) { } } +func TestEncapModeInitialize(t *testing.T) { + mockController := newMockMulticastController(t, true) + assert.True(t, mockController.nodeGroupID != 0) + err := mockController.initialize(t) + assert.Nil(t, err) +} + +func TestEncapLocalReportAndNotifyRemote(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ + {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + go wait.Until(mockController.worker, time.Second, stopCh) + + iface1 := createInterface("pod1", 3) + iface2 := createInterface("pod2", 4) + mgroup := net.ParseIP("224.2.100.4") + for _, tc := range []struct { + e *mcastGroupEvent + interfaces []*interfacestore.InterfaceConfig + groupChanged bool + ifaceCheck bool + }{ + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: false, ifaceCheck: false}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + } { + groupKey := tc.e.group.String() + if tc.e.eType == groupJoin { + if tc.groupChanged { + mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) + } + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } else { + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + if len(tc.interfaces) == 1 { + mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, types.McastAllHosts, gomock.Any(), gomock.Any()).AnyTimes() + } + if !tc.groupChanged { + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } + if tc.groupChanged { + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group) + mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + } + } + mockController.addOrUpdateGroupEvent(tc.e) + + if tc.groupChanged { + err := wait.PollImmediate(time.Millisecond*100, time.Second*3, func() (done bool, err error) { + if tc.e.eType == groupJoin { + return mockController.localGroupHasInstalled(groupKey) && mockController.groupHasInstalled(groupKey), nil + } else { + return !mockController.localGroupHasInstalled(groupKey) && !mockController.groupHasInstalled(groupKey), nil + } + }) + assert.Nil(t, err) + } else { + time.Sleep(time.Millisecond * 200) + } + } +} + +type nodeUpdateTestController struct { + *Controller + notifyCh chan struct{} +} + +func (c *nodeUpdateTestController) nodeChanged(e interface{}) { + c.Controller.nodeChanged(e) + close(c.notifyCh) +} + +func newNodeUpdateTestController(t *testing.T) *nodeUpdateTestController { + controller := gomock.NewController(t) + mockOFClient = openflowtest.NewMockClient(controller) + groupAllocator := openflow.NewGroupAllocator(false) + mockMulticastSocket = multicasttest.NewMockRouteInterface(controller) + mRouteClient := &MRouteClient{ + socket: mockMulticastSocket, + multicastInterfaceConfigs: []multicastInterfaceConfig{ + {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, + }, + } + mockController := &Controller{ + ofClient: mockOFClient, + nodeGroupID: groupAllocator.Allocate(), + mRouteClient: mRouteClient, + installedNodes: sets.NewString(), + installedLocalGroups: sets.NewString(), + igmpSnooper: &IGMPSnooper{ + ofClient: mockOFClient, + }, + } + return &nodeUpdateTestController{ + Controller: mockController, + } +} + +func TestNodeUpdate(t *testing.T) { + mockController := newNodeUpdateTestController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + nodeUpdateChannel = channel.NewSubscribableChannel("NodeUpdate", 100) + nodeUpdateChannel.Subscribe(mockController.nodeChanged) + go nodeUpdateChannel.Run(stopCh) + mockController.addInstalledLocalGroup("224.2.100.1") + + for _, tc := range []struct { + nodeUpdates map[string]string + nodesChanged bool + }{ + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n1": "10.10.10.11", + "n2": "10.10.10.12", + }, + }, + { + nodesChanged: false, + nodeUpdates: map[string]string{ + "n1": "10.10.10.11", + "n2": "10.10.10.12", + }, + }, + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n1": "10.10.10.11", + "n2": "10.10.10.12", + "n3": "10.10.10.13", + }, + }, + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n2": "10.10.10.12", + "n3": "10.10.10.13", + }, + }, + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n3": "10.10.10.13", + "n4": "10.10.10.14", + }, + }, + } { + nodeNameIPsMap := make(map[string]*utilip.DualStackIPs, len(tc.nodeUpdates)) + expectedNodeIPStrSet := sets.NewString() + for name, ipStr := range tc.nodeUpdates { + nodeIP := net.ParseIP(ipStr) + nodeNameIPsMap[name] = &utilip.DualStackIPs{ + IPv4: nodeIP, + } + expectedNodeIPStrSet = expectedNodeIPStrSet.Insert(ipStr) + } + mockController.notifyCh = make(chan struct{}) + if tc.nodesChanged { + mockOFClient.EXPECT().InstallMulticastGroup(mockController.nodeGroupID, nil, gomock.Any()).Return(nil).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()).Times(1) + } + go func() { + nodeUpdateChannel.Notify(nodeNameIPsMap) + }() + + <-mockController.notifyCh + assert.Equal(t, expectedNodeIPStrSet, mockController.installedNodes, fmt.Sprintf("installedNodes: %v, expectedNodes: %v", mockController.installedNodes, expectedNodeIPStrSet)) + } +} + +func TestRemoteMemberJoinLeave(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + stopStr := "done" + eventHandler := func(stopCh <-chan struct{}) { + for { + select { + case e := <-mockController.groupEventCh: + if e.group.Equal(net.IPv4zero) { + mockController.queue.Add(stopStr) + } else { + mockController.addOrUpdateGroupEvent(e) + } + case <-stopCh: + return + } + } + } + go eventHandler(stopCh) + + for _, tc := range []struct { + groupStrs []string + nodeStr string + isJoin bool + }{ + {groupStrs: []string{"224.2.100.2", "224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.2", "224.2.100.5"}, nodeStr: "10.10.10.12", isJoin: true}, + {groupStrs: []string{"224.2.100.2"}, nodeStr: "10.10.10.12", isJoin: false}, + } { + groups := make([]net.IP, len(tc.groupStrs)) + for i, g := range tc.groupStrs { + groups[i] = net.ParseIP(g) + } + node := net.ParseIP(tc.nodeStr) + testRemoteReport(t, mockController, groups, node, tc.isJoin, stopStr) + } +} + +func testRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, node net.IP, nodeJoin bool, stopStr string) { + tunnelPort := uint32(2) + proto := uint8(protocol.IGMPIsEx) + if !nodeJoin { + proto = uint8(protocol.IGMPToIn) + } + for _, g := range groups { + var exists bool + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + if !exists { + mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) + } else { + status := obj.(*GroupMemberStatus) + exists = status.remoteMembers.Has(node.String()) + if nodeJoin && exists || !nodeJoin && !exists { + continue + } + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), []uint32{config.HostGatewayOFPort}, gomock.Any()) + } + + processNextItem := func(stopStr string) { + for { + obj, quit := mockController.queue.Get() + if quit { + return + } + key := obj.(string) + if key == stopStr { + mockController.queue.Forget(key) + mockController.queue.Done(obj) + return + } + if err := mockController.syncGroup(key); err != nil { + t.Errorf("Failed to process %s: %v", key, err) + } + mockController.queue.Forget(key) + mockController.queue.Done(obj) + } + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + processNextItem(stopStr) + wg.Done() + }() + + err := processRemoteReport(t, mockController, groups, node, proto, tunnelPort) + assert.Nil(t, err) + mockController.groupEventCh <- &mcastGroupEvent{group: net.IPv4zero} + wg.Wait() + + for _, g := range groups { + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + assert.True(t, exists) + status := obj.(*GroupMemberStatus) + if nodeJoin { + assert.True(t, status.remoteMembers.Has(node.String())) + } else { + assert.False(t, status.remoteMembers.Has(node.String())) + } + } + for _, g := range groups { + assert.True(t, mockController.groupHasInstalled(g.String())) + } +} + +func processRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, remoteNode net.IP, reportType uint8, tunnelPort uint32) error { + pkt := generatePacketInForRemoteReport(t, mockController.igmpSnooper, groups, remoteNode, reportType, tunnelPort) + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, nodeIf1IP), true) + return mockController.igmpSnooper.processPacketIn(&pkt) +} + func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEvent) { obj, exits, err := cache.GetByKey(event.group.String()) assert.Nil(t, err) @@ -467,7 +772,7 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven } } -func newMockMulticastController(t *testing.T) *Controller { +func newMockMulticastController(t *testing.T, isEncap bool) *Controller { controller := gomock.NewController(t) mockOFClient = openflowtest.NewMockClient(controller) mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) @@ -479,17 +784,25 @@ func newMockMulticastController(t *testing.T) *Controller { mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) groupAllocator := openflow.NewGroupAllocator(false) podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator) + + if isEncap { + nodeUpdateChannel = channel.NewSubscribableChannel("NodeUpdate", 100) + } + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator, isEncap, nodeUpdateChannel) return mctrl } func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastInitialFlows(uint8(0)).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{}) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(0)).Times(1).Return([]uint16{0}, nil) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(1)).Times(1).Return([]uint16{1, 2}, nil) + if c.encapEnabled { + mockOFClient.EXPECT().InstallMulticastGroup(c.nodeGroupID, gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastToRemoteFlows(c.nodeGroupID).Times(1) + } return c.Initialize() } @@ -509,32 +822,13 @@ func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig func createIGMPReportPacketIn(joinedGroups []net.IP, leftGroups []net.IP, version uint8, ofport uint32) []*ofctrl.PacketIn { joinMessages := createIGMPJoinMessage(joinedGroups, version) leaveMessages := createIGMPLeaveMessage(leftGroups, version) - generatePacket := func(m util.Message) ofctrl.PacketIn { - pkt := openflow13.NewPacketIn() - matchInport := openflow13.NewInPortField(ofport) - pkt.Match.AddField(*matchInport) - ipPacket := &protocol.IPv4{ - Version: 0x4, - IHL: 5, - Protocol: IGMPProtocolNumber, - Length: 20 + m.Len(), - Data: m, - } - pkt.Data = protocol.Ethernet{ - HWDst: pktInDstMAC, - HWSrc: pktInSrcMAC, - Ethertype: protocol.IPv4_MSG, - Data: ipPacket, - } - return ofctrl.PacketIn(*pkt) - } pkts := make([]*ofctrl.PacketIn, 0) for _, m := range joinMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } for _, m := range leaveMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } return pkts @@ -578,3 +872,8 @@ func createIGMPJoinMessage(groups []net.IP, version uint8) []util.Message { } return pkts } + +func TestMain(m *testing.M) { + igmpMaxResponseTime = time.Second + os.Exit(m.Run()) +} diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index d6ffcef5a75..648163dd367 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -39,6 +39,11 @@ const ( IGMPProtocolNumber = 2 ) +const ( + openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC" + openflowKeyInPort = "OXM_OF_IN_PORT" +) + var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -46,6 +51,8 @@ var ( igmpMaxResponseTime = time.Second * 10 // igmpQueryDstMac is the MAC address used in the dst MAC field in the IGMP query message igmpQueryDstMac, _ = net.ParseMAC("01:00:5e:00:00:01") + // igmpReportDstMac is the MAC address used in the dst MAC field in the IGMP report message + igmpReportDstMac, _ = net.ParseMAC("01:00:5e:00:00:16") ) type IGMPSnooper struct { @@ -62,6 +69,7 @@ type IGMPSnooper struct { // Similar to igmpReportANPStats, it stores ACNP stats for IGMP reports. igmpReportACNPStats map[apitypes.UID]map[string]*types.RuleMetric igmpReportACNPStatsMutex sync.Mutex + encapEnabled bool } func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { @@ -92,7 +100,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow13.NXRange) (uint32, func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { matches := pktIn.GetMatches() - ofPortField := matches.GetMatchByName("OXM_OF_IN_PORT") + ofPortField := matches.GetMatchByName(openflowKeyInPort) if ofPortField == nil { return nil, errors.New("in_port field not found") } @@ -127,6 +135,11 @@ func (s *IGMPSnooper) validate(event *mcastGroupEvent, igmpType uint8, packetInD // Return true directly if there is no validator. return true, nil } + // MulticastValidator only validates the IGMP report message sent from Pods. The report message received from tunnel + // port is sent from Antrea Agent on a different Node, and returns true directly. + if event.iface.Type == interfacestore.TunnelInterface { + return true, nil + } if event.iface.Type != interfacestore.ContainerInterface { return true, fmt.Errorf("interface is not container") } @@ -201,6 +214,42 @@ func (s *IGMPSnooper) collectStats() (igmpANPStats, igmpACNPStats map[apitypes.U return igmpANPStats, igmpACNPStats } +func (s *IGMPSnooper) sendIGMPReport(groupRecordType uint8, groups []net.IP) error { + igmp, err := s.generateIGMPReportPacket(groupRecordType, groups) + if err != nil { + return err + } + if err := s.ofClient.SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, igmp); err != nil { + return err + } + klog.V(2).InfoS("Sent packetOut for IGMP v3 report", "groups", groups) + return nil +} + +func (s *IGMPSnooper) generateIGMPReportPacket(groupRecordType uint8, groups []net.IP) (util.Message, error) { + records := make([]protocol.IGMPv3GroupRecord, len(groups)) + for i, group := range groups { + records[i] = protocol.IGMPv3GroupRecord{ + Type: groupRecordType, + MulticastAddress: group, + } + } + return &protocol.IGMPv3MembershipReport{ + Type: protocol.IGMPv3Report, + Checksum: 0, + NumberOfGroups: uint16(len(records)), + GroupRecords: records, + }, nil +} + +func (s *IGMPSnooper) sendIGMPJoinReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPIsEx, groups) +} + +func (s *IGMPSnooper) sendIGMPLeaveReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPToIn, groups) +} + func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { now := time.Now() iface, err := s.parseSrcInterface(pktIn) @@ -209,8 +258,15 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { } klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort) podName := "unknown" + var srcNode net.IP if iface.Type == interfacestore.ContainerInterface { podName = iface.PodName + } else if iface.Type == interfacestore.TunnelInterface { + var err error + srcNode, err = s.parseSrcNode(pktIn) + if err != nil { + return err + } } igmp, err := parseIGMPPacket(pktIn.Data) if err != nil { @@ -240,10 +296,11 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { evtType = groupLeave } event := &mcastGroupEvent{ - group: mgroup, - eType: evtType, - time: now, - iface: iface, + group: mgroup, + eType: evtType, + time: now, + iface: iface, + srcNode: srcNode, } s.validatePacketAndNotify(event, igmpType, pktIn.Data) } @@ -261,6 +318,16 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { return nil } +func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) { + matches := pktIn.GetMatches() + tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc) + if tunSrcField == nil { + return nil, errors.New("in_port field not found") + } + tunSrc := tunSrcField.GetValue().(net.IP) + return tunSrc, nil +} + func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Duration) (util.Message, error) { // The max response time field in IGMP protocol uses a value in units of 1/10 second. // See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376 @@ -332,8 +399,8 @@ func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) { } } -func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController) *IGMPSnooper { - snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval} +func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController, encapEnabled bool) *IGMPSnooper { + snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval, encapEnabled: encapEnabled} snooper.igmpReportACNPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) snooper.igmpReportANPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonMC), "MulticastGroupDiscovery", snooper) diff --git a/pkg/agent/multicast/mcast_discovery_test.go b/pkg/agent/multicast/mcast_discovery_test.go new file mode 100644 index 00000000000..3af352277f3 --- /dev/null +++ b/pkg/agent/multicast/mcast_discovery_test.go @@ -0,0 +1,185 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicast + +import ( + "net" + "sync" + "testing" + + "antrea.io/libOpenflow/openflow13" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + + "antrea.io/antrea/pkg/agent/interfacestore" + ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +var ( + pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") + pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") +) + +type snooperValidator struct { + eventCh chan *mcastGroupEvent + groupJoinedNodes map[string]sets.String + groupLeftNodes map[string]sets.String +} + +func (v *snooperValidator) processPackets(expectedPackets int) { + appendSrcNode := func(groupKey string, groupNodes map[string]sets.String, nodeIP net.IP) map[string]sets.String { + _, exists := groupNodes[groupKey] + if !exists { + groupNodes[groupKey] = sets.NewString() + } + groupNodes[groupKey] = groupNodes[groupKey].Insert(nodeIP.String()) + return groupNodes + } + for i := 0; i < expectedPackets; i++ { + select { + case e := <-v.eventCh: + groupKey := e.group.String() + if e.eType == groupJoin { + v.groupJoinedNodes = appendSrcNode(groupKey, v.groupJoinedNodes, e.srcNode) + } else { + v.groupLeftNodes = appendSrcNode(groupKey, v.groupLeftNodes, e.srcNode) + } + } + } +} + +func TestIGMPRemoteReport(t *testing.T) { + controller := gomock.NewController(t) + mockOFClient := openflowtest.NewMockClient(controller) + mockIfaceStore := ifaceStoretest.NewMockInterfaceStore(controller) + eventCh := make(chan *mcastGroupEvent, 100) + snooper := &IGMPSnooper{ofClient: mockOFClient, eventCh: eventCh, ifaceStore: mockIfaceStore} + + localNodeIP := net.ParseIP("1.2.3.4") + tunnelPort := uint32(1) + wg := sync.WaitGroup{} + + generateRemotePackets := func(groups []net.IP, nodes []net.IP, igmpMsgType uint8) []ofctrl.PacketIn { + packets := make([]ofctrl.PacketIn, 0, len(nodes)) + for _, srcNode := range nodes { + pkt := generatePacketInForRemoteReport(t, snooper, groups, srcNode, igmpMsgType, tunnelPort) + packets = append(packets, pkt) + } + return packets + } + validateGroupNodes := func(groups []net.IP, expectedNodesIPs []net.IP, testGroupNodes map[string]sets.String) { + if len(expectedNodesIPs) == 0 { + return + } + for _, g := range groups { + expectedNodes := sets.NewString() + for _, n := range expectedNodesIPs { + expectedNodes.Insert(n.String()) + } + nodes, exists := testGroupNodes[g.String()] + assert.True(t, exists) + assert.True(t, nodes.HasAll(expectedNodes.List()...)) + } + } + testPacketProcess := func(groups []net.IP, joinedNodes []net.IP, leftNodes []net.IP) { + validator := snooperValidator{eventCh: eventCh, groupJoinedNodes: make(map[string]sets.String), groupLeftNodes: make(map[string]sets.String)} + packets := make([]ofctrl.PacketIn, 0, len(joinedNodes)+len(leftNodes)) + packets = append(packets, generateRemotePackets(groups, joinedNodes, protocol.IGMPIsEx)...) + packets = append(packets, generateRemotePackets(groups, leftNodes, protocol.IGMPToIn)...) + + eventCount := len(groups) * len(packets) + wg.Add(1) + go func() { + validator.processPackets(eventCount) + wg.Done() + }() + + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, localNodeIP), true).Times(len(packets)) + for i := range packets { + pkt := &packets[i] + err := snooper.processPacketIn(pkt) + assert.Nil(t, err, "Failed to process IGMP Report message") + } + + wg.Wait() + + validateGroupNodes(groups, joinedNodes, validator.groupJoinedNodes) + validateGroupNodes(groups, leftNodes, validator.groupLeftNodes) + } + + for _, tc := range []struct { + groupsStrings []string + joinedNodesStrings []string + leftNodesStrings []string + }{ + {groupsStrings: []string{"225.1.2.3", "225.1.2.4"}, joinedNodesStrings: []string{"1.2.3.5", "1.2.3.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + {groupsStrings: []string{"225.1.2.5"}, joinedNodesStrings: []string{"1.2.3.5"}}, + {groupsStrings: []string{"225.1.2.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + } { + var groups, joinedNodes, leftNodes []net.IP + for _, g := range tc.groupsStrings { + groups = append(groups, net.ParseIP(g)) + } + for _, n := range tc.joinedNodesStrings { + joinedNodes = append(joinedNodes, net.ParseIP(n)) + } + for _, n := range tc.leftNodesStrings { + leftNodes = append(leftNodes, net.ParseIP(n)) + } + testPacketProcess(groups, joinedNodes, leftNodes) + } +} + +func generatePacket(m util.Message, ofport uint32, srcNodeIP net.IP) ofctrl.PacketIn { + pkt := openflow13.NewPacketIn() + matchInport := openflow13.NewInPortField(ofport) + pkt.Match.AddField(*matchInport) + if srcNodeIP != nil { + matchTunSrc := openflow13.NewTunnelIpv4SrcField(srcNodeIP, nil) + pkt.Match.AddField(*matchTunSrc) + } + ipPacket := &protocol.IPv4{ + Version: 0x4, + IHL: 5, + Protocol: IGMPProtocolNumber, + Length: 20 + m.Len(), + Data: m, + } + pkt.Data = protocol.Ethernet{ + HWDst: pktInDstMAC, + HWSrc: pktInSrcMAC, + Ethertype: protocol.IPv4_MSG, + Data: ipPacket, + } + return ofctrl.PacketIn(*pkt) +} + +func generatePacketInForRemoteReport(t *testing.T, snooper *IGMPSnooper, groups []net.IP, srcNode net.IP, igmpMsgType uint8, tunnelPort uint32) ofctrl.PacketIn { + msg, err := snooper.generateIGMPReportPacket(igmpMsgType, groups) + assert.Nil(t, err, "Failed to generate IGMP Report message") + return generatePacket(msg, tunnelPort, srcNode) +} + +func createTunnelInterface(tunnelPort uint32, localNodeIP net.IP) *interfacestore.InterfaceConfig { + tunnelInterface := interfacestore.NewTunnelInterface("antrea-tun0", ovsconfig.GeneveTunnel, localNodeIP, false) + tunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: int32(tunnelPort)} + return tunnelInterface +} diff --git a/pkg/agent/multicast/mcast_route.go b/pkg/agent/multicast/mcast_route.go index 279c11c4f19..bf2b9ea1251 100644 --- a/pkg/agent/multicast/mcast_route.go +++ b/pkg/agent/multicast/mcast_route.go @@ -32,7 +32,7 @@ const ( MulticastRecvBufferSize = 128 ) -func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.String) *MRouteClient { +func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.String, encapEnabled bool) *MRouteClient { var m = &MRouteClient{ igmpMsgChan: make(chan []byte, workerCount), nodeConfig: nodeconfig, @@ -83,11 +83,11 @@ type MRouteClient struct { // by making these interfaces accept multicast traffic with multicast ip:mgroup. // https://tldp.org/HOWTO/Multicast-HOWTO-6.html#ss6.4 func (c *MRouteClient) multicastInterfacesJoinMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() for _, config := range c.multicastInterfaceConfigs { addrIP := config.IPv4Addr.IP.To4() - groupIP := mgroup.To4() err := c.socket.MulticastInterfaceJoinMgroup(groupIP, addrIP, config.Name) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "address already in use") { return err } } @@ -95,9 +95,9 @@ func (c *MRouteClient) multicastInterfacesJoinMgroup(mgroup net.IP) error { } func (c *MRouteClient) multicastInterfacesLeaveMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() for _, config := range c.multicastInterfaceConfigs { addrIP := config.IPv4Addr.IP.To4() - groupIP := mgroup.To4() err := c.socket.MulticastInterfaceLeaveMgroup(groupIP, addrIP, config.Name) if err != nil { return err @@ -158,7 +158,7 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error) } // addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces, -// allowing multicast sender Pods to send multicast traffic to external. +// allowing multicast srcNode Pods to send multicast traffic to external. func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) { klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs) err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs) diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index b9f4c14d69c..4fff580436c 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -33,9 +33,10 @@ import ( ) var ( - addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} - addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} - nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} + externalInterfaceIP = net.ParseIP("192.168.50.23") + addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} + addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} + nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} ) func TestParseIGMPMsg(t *testing.T) { @@ -117,7 +118,7 @@ func newMockMulticastRouteClient(t *testing.T) *MRouteClient { groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.NewString(if1.InterfaceName)) + return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.NewString(if1.InterfaceName), false) } func (c *MRouteClient) initialize(t *testing.T) error { diff --git a/pkg/agent/multicast/testing/mock_multicast.go b/pkg/agent/multicast/testing/mock_multicast.go index 79a2701ebe3..f6940ce9f14 100644 --- a/pkg/agent/multicast/testing/mock_multicast.go +++ b/pkg/agent/multicast/testing/mock_multicast.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index d4995f2117d..cd421fa766c 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -19,6 +19,7 @@ import ( "math/rand" "net" + "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" ofutil "antrea.io/libOpenflow/util" v1 "k8s.io/api/core/v1" @@ -280,11 +281,18 @@ type Client interface { // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to // antrea-gw0 so that local Pods could access external Multicast servers. InstallMulticastInitialFlows(pktInReason uint8) error + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error + // UninstallMulticastFlows removes the flow matching the given multicastIP. UninstallMulticastFlows(multicastIP net.IP) error + + // InstallMulticastRemoteReportFlows installs OpenFlow to forward the IGMP report messages to the other Nodes, + // and packetIn the messages to Antrea Agent on peer Node. + // This function is called with encap mode, and used to sync the multicast groups in the cluster. + InstallMulticastToRemoteFlows(groupID binding.GroupIDType) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, @@ -304,7 +312,13 @@ type Client interface { // UninstallTrafficControlReturnPortFlow removes the flow to classify the packets from a return port. UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error - InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error + + // SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port. + SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error // InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular // Node and a local Gateway. @@ -800,7 +814,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort, c.networkConfig.TrafficEncapMode.SupportsEncap(), config.DefaultTunOFPort) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } @@ -1190,6 +1204,15 @@ func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { return c.deleteFlows(c.featureMulticast.cachedFlows, cacheKey) } +func (c *client) InstallMulticastToRemoteFlows(groupID binding.GroupIDType) error { + firstMulticastTable := c.pipelines[pipelineMulticast].GetFirstTable() + flows := c.featureMulticast.multicastToRemoteFlows(groupID, firstMulticastTable) + cacheKey := "multicast_encap" + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) +} + func (c *client) SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, dstIP net.IP, @@ -1240,7 +1263,25 @@ func (c *client) UninstallTrafficControlReturnPortFlow(returnOFPort uint32) erro return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey) } -func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { +func (c *client) SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error { + srcMAC := c.nodeConfig.GatewayConfig.MAC.String() + srcIP := c.nodeConfig.NodeTransportIPv4Addr.IP.String() + dstMACStr := dstMAC.String() + dstIPStr := dstIP.String() + packetOutBuilder, err := setBasePacketOutBuilder(c.bridge.BuildPacketOut(), srcMAC, dstMACStr, srcIP, dstIPStr, openflow13.P_CONTROLLER, 0) + if err != nil { + return err + } + // Set protocol, L4 message, and target OF Group ID. + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolIGMP).SetL4Packet(igmp) + packetOutObj := packetOutBuilder.Done() + return c.bridge.SendPacketOut(packetOutObj) +} + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() table := MulticastOutputTable @@ -1248,7 +1289,7 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive table = MulticastIngressRuleTable } - if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers...); err != nil { + if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers, remoteNodeReceivers); err != nil { return err } return nil diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 79df3424a88..7488d068c15 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -19,8 +19,10 @@ import ( "net" "sync" + "antrea.io/libOpenflow/openflow13" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -31,6 +33,8 @@ type featureMulticast struct { ipProtocols []binding.Protocol bridge binding.Bridge gatewayPort uint32 + encapEnabled bool + tunnelPort uint32 cachedFlows *flowCategoryCache groupCache sync.Map @@ -43,7 +47,7 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32) *featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32, encapEnabled bool, tunnelPort uint32) *featureMulticast { return &featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, @@ -53,6 +57,8 @@ func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding groupCache: sync.Map{}, enableAntreaPolicy: anpEnabled, gatewayPort: gwPort, + encapEnabled: encapEnabled, + tunnelPort: tunnelPort, } } @@ -68,9 +74,7 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b func (f *featureMulticast) initFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() - return []binding.Flow{ - f.multicastOutputFlow(cookieID), - } + return f.multicastOutputFlows(cookieID) } func (f *featureMulticast) replayFlows() []binding.Flow { @@ -78,7 +82,7 @@ func (f *featureMulticast) replayFlows() []binding.Flow { return getCachedFlows(f.cachedFlows) } -func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports ...uint32) error { +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) error { group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() for i := range ports { group = group.Bucket(). @@ -87,6 +91,14 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(tableID). Done() } + for _, ip := range remoteIPs { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, f.tunnelPort). + SetTunnelDst(ip). + ResubmitToTable(MulticastOutputTable.GetID()). + Done() + } if err := group.Add(); err != nil { return fmt.Errorf("error when installing Multicast receiver Group: %w", err) } @@ -94,12 +106,38 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, return nil } -func (f *featureMulticast) multicastOutputFlow(cookieID uint64) binding.Flow { - return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). - Cookie(cookieID). - MatchRegMark(OFPortFoundRegMark). - Action().OutputToRegField(TargetOFPortField). - Done() +func (f *featureMulticast) multicastOutputFlows(cookieID uint64) []binding.Flow { + flows := []binding.Flow{ + MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), + } + if f.encapEnabled { + // When running with encap mode, drop the multicast packets if it is received from tunnel port and expected to + // output to antrea-gw0, or received from antrea-gw0 and expected to output to tunnel. These flows are used to + // avoid duplication on packet forwarding. For example, if the packet is received on tunnel port, it means + // the sender is a Pod on other Node, then the packet is already sent to external via antrea-gw0 on the source + // Node. On the reverse, if the packet is received on antrea-gw0, it means the sender is from external, then + // the Pod receivers on other Nodes should also receive the packets from the underlay network. + flows = append(flows, MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromTunnelRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.HostGatewayOFPort). + Action().Drop(). + Done(), + MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromGatewayRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.DefaultTunOFPort). + Action().Drop(). + Done(), + ) + } + return flows } func (f *featureMulticast) multicastSkipIGMPMetricFlows() []binding.Flow { @@ -147,3 +185,34 @@ func (f *featureMulticast) replayGroups() { return true }) } + +func (f *featureMulticast) multicastToRemoteFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { + return []binding.Flow{ + // This flow outputs the IGMP report message sent from Antrea Agent to an OpenFlow group which is expected to + // broadcast to all the other Nodes in the cluster. The multicast groups in side the IGMP report message + // include the ones local Pods have joined in. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchInPort(openflow13.P_CONTROLLER). + Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().Group(groupID). + Done(), + // This flow ensures the IGMP report message sent from Antrea Agent to bypass the check in SpoofGuardTable. + ClassifierTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(openflow13.P_CONTROLLER). + Action().GotoTable(SpoofGuardTable.GetNext()). + Done(), + // This flow ensures the multicast packet sent from a different Node via the tunnel port to enter Multicast + // pipeline. + ClassifierTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(config.DefaultTunOFPort). + MatchProtocol(binding.ProtocolIP). + MatchDstIPNet(*types.McastCIDR). + Action().LoadRegMark(FromTunnelRegMark). + Action().GotoTable(firstMulticastTable.GetID()). + Done(), + } +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 3fdb03c5f69..b928ea13bef 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2666,20 +2666,24 @@ func pipelineClassifyFlow(cookieID uint64, protocol binding.Protocol, pipeline b // igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastRoutingTable // and sends it to antrea-agent. func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { - flows := []binding.Flow{ - // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the - // OVS bridge, so that the OVS multicast db cache can be updated, and antrea-agent can identify the local multicast - // group and its members in the meanwhile. - // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in - // the packet. - MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). - Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchProtocol(binding.ProtocolIGMP). - MatchRegMark(FromLocalRegMark). - Action().LoadRegMark(CustomReasonIGMPRegMark). - Action().SendToController(reason). - Action().Normal(). - Done(), + var flows []binding.Flow + sourceMarks := []*binding.RegMark{FromLocalRegMark} + if f.encapEnabled { + sourceMarks = append(sourceMarks, FromTunnelRegMark) + } + for _, m := range sourceMarks { + flows = append(flows, + // Set a custom reason for the IGMP packets, and then send it to antrea-agent. Then antrea-agent can identify + // the local multicast group and its members in the meanwhile. + // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in + // the packet. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchRegMark(m). + Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().SendToController(reason). + Done()) } return flows } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index f59dc19007f..235c19212b9 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -297,17 +297,17 @@ func (mr *MockClientMockRecorder) InstallMulticastFlows(arg0, arg1 interface{}) } // InstallMulticastGroup mocks base method -func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32, arg2 []net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // InstallMulticastGroup indicates an expected call of InstallMulticastGroup -func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1, arg2) } // InstallMulticastInitialFlows mocks base method @@ -324,6 +324,20 @@ func (mr *MockClientMockRecorder) InstallMulticastInitialFlows(arg0 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastInitialFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastInitialFlows), arg0) } +// InstallMulticastToRemoteFlows mocks base method +func (m *MockClient) InstallMulticastToRemoteFlows(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastToRemoteFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastToRemoteFlows indicates an expected call of InstallMulticastToRemoteFlows +func (mr *MockClientMockRecorder) InstallMulticastToRemoteFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastToRemoteFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastToRemoteFlows), arg0) +} + // InstallMulticlusterClassifierFlows mocks base method func (m *MockClient) InstallMulticlusterClassifierFlows(arg0 uint32, arg1 bool) error { m.ctrl.T.Helper() @@ -670,6 +684,20 @@ func (mr *MockClientMockRecorder) SendIGMPQueryPacketOut(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPQueryPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPQueryPacketOut), arg0, arg1, arg2, arg3) } +// SendIGMPRemoteReportPacketOut mocks base method +func (m *MockClient) SendIGMPRemoteReportPacketOut(arg0 net.HardwareAddr, arg1 net.IP, arg2 util.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendIGMPRemoteReportPacketOut", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendIGMPRemoteReportPacketOut indicates an expected call of SendIGMPRemoteReportPacketOut +func (mr *MockClientMockRecorder) SendIGMPRemoteReportPacketOut(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPRemoteReportPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPRemoteReportPacketOut), arg0, arg1, arg2) +} + // SendTCPPacketOut mocks base method func (m *MockClient) SendTCPPacketOut(arg0, arg1, arg2, arg3 string, arg4, arg5 uint32, arg6 bool, arg7, arg8 uint16, arg9 uint32, arg10 byte, arg11 func(openflow.PacketOutBuilder) openflow.PacketOutBuilder) error { m.ctrl.T.Helper() diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index f0e1fc04a57..5ae84f9ec67 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -80,4 +80,12 @@ type Interface interface { // DeleteLocalAntreaFlexibleIPAMPodRule is used to delete related IP set entries when an AntreaFlexibleIPAM Pod is deleted. DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error + + // AddNodeIPs adds Node IPs into the ipset when a new Node joins the cluster. + // The ipset is consumed with encap mode when multicast is enabled. + AddNodeIPs(nodeIP4 net.IP, nodeIP6 net.IP) error + + // DeleteNodeIPs deletes NodeIPs from the ipset when a Node leaves the cluster. + // The ipset is consumed with encap mode when multicast is enabled. + DeleteNodeIPs(nodeIP4 net.IP, nodeIP6 net.IP) error } diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 75b7a48714b..83ada49cc59 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -57,6 +57,10 @@ const ( localAntreaFlexibleIPAMPodIPSet = "LOCAL-FLEXIBLE-IPAM-POD-IP" // localAntreaFlexibleIPAMPodIP6Set contains all AntreaFlexibleIPAM Pod IPv6s of this Node. localAntreaFlexibleIPAMPodIP6Set = "LOCAL-FLEXIBLE-IPAM-POD-IP6" + // clusterNodeIPSet contains all other Node IPs in the cluster. + clusterNodeIPSet = "CLUSTER-NODE-IP" + // clusterNodeIP6Set contains all other Node IP6s in the cluster. + clusterNodeIP6Set = "CLUSTER-NODE-IP6" // Antrea proxy NodePort IP antreaNodePortIPSet = "ANTREA-NODEPORT-IP" @@ -114,6 +118,10 @@ type Client struct { clusterIPv4CIDR *net.IPNet // clusterIPv6CIDR stores the calculated ClusterIP CIDR for IPv6. clusterIPv6CIDR *net.IPNet + // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster + clusterNodeIPs sync.Map + // clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster + clusterNodeIP6s sync.Map } // NewClient returns a route client. @@ -339,6 +347,29 @@ func (c *Client) syncIPSet() error { } } + if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + if err := ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil { + return err + } + if err := ipset.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, false); err != nil { + return err + } + c.clusterNodeIPs.Range(func(k, _ interface{}) bool { + ipSetEntry := k.(string) + if err := ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return false + } + return true + }) + c.clusterNodeIPs.Range(func(k, _ interface{}) bool { + ipSetEntry := k.(string) + if err := ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return false + } + return true + }) + } + return nil } @@ -444,6 +475,7 @@ func (c *Client) syncIPTables() error { antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, antreaNodePortIPSet, + clusterNodeIPSet, config.VirtualNodePortDNATIPv4, config.VirtualServiceIPv4, snatMarkToIPv4) @@ -459,6 +491,7 @@ func (c *Client) syncIPTables() error { antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, antreaNodePortIP6Set, + clusterNodeIP6Set, config.VirtualNodePortDNATIPv6, config.VirtualServiceIPv6, snatMarkToIPv6) @@ -473,7 +506,8 @@ func (c *Client) syncIPTables() error { func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, - nodePortIPSet string, + nodePortIPSet, + clusterNodeIPSet string, nodePortDNATVirtualIP, serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP) *bytes.Buffer { @@ -512,6 +546,19 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, "-j", iptables.NoTrackTarget, }...) } + + if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + // Drop the multicast packets forwarded from other Nodes in the cluster. This is because + // the packet sent out from the sender Pod is already received via tunnel port with encap mode, + // and the one forwarded via the underlay network is to send to external receivers + writeLine(iptablesData, []string{ + "-A", antreaPreRoutingChain, + "-m", "comment", "--comment", `"Antrea: drop Pod multicast traffic forwarded via underlay network"`, + "-m", "set", "--match-set", clusterNodeIPSet, "src", + "-d", types.McastCIDR.String(), + "-j", iptables.DROPTarget, + }...) + } } writeLine(iptablesData, "COMMIT") @@ -1421,6 +1468,54 @@ func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) err return nil } +func (c *Client) AddNodeIPs(nodeIP4 net.IP, nodeIP6 net.IP) error { + if !c.multicastEnabled || !c.networkConfig.TrafficEncapMode.SupportsEncap() { + return nil + } + for _, nodeIP := range []net.IP{nodeIP4, nodeIP6} { + if nodeIP == nil { + continue + } + ipSetEntry := nodeIP.String() + if nodeIP.To4() != nil { + if err := ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return err + } + c.clusterNodeIPs.Store(ipSetEntry, struct{}{}) + } else { + if err := ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return err + } + c.clusterNodeIP6s.Store(ipSetEntry, struct{}{}) + } + } + return nil +} + +func (c *Client) DeleteNodeIPs(nodeIP4 net.IP, nodeIP6 net.IP) error { + if !c.multicastEnabled || !c.networkConfig.TrafficEncapMode.SupportsEncap() { + return nil + } + for _, nodeIP := range []net.IP{nodeIP4, nodeIP6} { + if nodeIP == nil { + continue + } + ipSetEntry := nodeIP.String() + if nodeIP.To4() != nil { + if err := ipset.DelEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return err + } + c.clusterNodeIPs.Store(ipSetEntry, struct{}{}) + } else { + if err := ipset.DelEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return err + } + c.clusterNodeIP6s.Store(ipSetEntry, struct{}{}) + } + } + return nil +} + func getTransProtocolStr(protocol binding.Protocol) string { if protocol == binding.ProtocolTCP || protocol == binding.ProtocolTCPv6 { return "tcp" diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 9d2d250e44a..e8c03cff560 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -419,3 +419,11 @@ func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { return nil } + +func (c *Client) AddNodeIPs(nodeIP4 net.IP, nodeIP6 net.IP) error { + return nil +} + +func (c *Client) DeleteNodeIPs(nodeIP4 net.IP, nodeIP6 net.IP) error { + return nil +} diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 67ee9270b63..d420a826ed7 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2021 Antrea Authors +// Copyright 2022 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -92,6 +92,20 @@ func (mr *MockInterfaceMockRecorder) AddLocalAntreaFlexibleIPAMPodRule(arg0 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).AddLocalAntreaFlexibleIPAMPodRule), arg0) } +// AddNodeIPs mocks base method +func (m *MockInterface) AddNodeIPs(arg0, arg1 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodeIPs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodeIPs indicates an expected call of AddNodeIPs +func (mr *MockInterfaceMockRecorder) AddNodeIPs(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodeIPs", reflect.TypeOf((*MockInterface)(nil).AddNodeIPs), arg0, arg1) +} + // AddNodePort mocks base method func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() @@ -176,6 +190,20 @@ func (mr *MockInterfaceMockRecorder) DeleteLocalAntreaFlexibleIPAMPodRule(arg0 i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLocalAntreaFlexibleIPAMPodRule", reflect.TypeOf((*MockInterface)(nil).DeleteLocalAntreaFlexibleIPAMPodRule), arg0) } +// DeleteNodeIPs mocks base method +func (m *MockInterface) DeleteNodeIPs(arg0, arg1 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteNodeIPs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteNodeIPs indicates an expected call of DeleteNodeIPs +func (mr *MockInterfaceMockRecorder) DeleteNodeIPs(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodeIPs", reflect.TypeOf((*MockInterface)(nil).DeleteNodeIPs), arg0, arg1) +} + // DeleteNodePort mocks base method func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 uint16, arg2 openflow.Protocol) error { m.ctrl.T.Helper() diff --git a/pkg/agent/types/multicast.go b/pkg/agent/types/multicast.go index da06447af3b..ae910dea965 100644 --- a/pkg/agent/types/multicast.go +++ b/pkg/agent/types/multicast.go @@ -32,6 +32,7 @@ type IGMPNPRuleInfo struct { var ( McastAllHosts = net.ParseIP("224.0.0.1").To4() + IGMPv3Router = net.ParseIP("224.0.0.22").To4() _, McastCIDR, _ = net.ParseCIDR("224.0.0.0/4") ) diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 6e9a05d7db3..11173aa46b2 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -36,6 +36,7 @@ const ( RawTable = "raw" AcceptTarget = "ACCEPT" + DROPTarget = "DROP" MasqueradeTarget = "MASQUERADE" MarkTarget = "MARK" ReturnTarget = "RETURN" diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 74e362e69b0..e55ceb2df47 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -342,6 +342,7 @@ type BucketBuilder interface { LoadRegRange(regID int, data uint32, rng *Range) BucketBuilder LoadToRegField(field *RegField, data uint32) BucketBuilder ResubmitToTable(tableID uint8) BucketBuilder + SetTunnelDst(addr net.IP) BucketBuilder Done() Group } diff --git a/pkg/ovs/openflow/ofctrl_group.go b/pkg/ovs/openflow/ofctrl_group.go index ee37a35adc5..b16495b0c4a 100644 --- a/pkg/ovs/openflow/ofctrl_group.go +++ b/pkg/ovs/openflow/ofctrl_group.go @@ -16,6 +16,7 @@ package openflow import ( "fmt" + "net" "antrea.io/libOpenflow/openflow13" "antrea.io/ofnet/ofctrl" @@ -128,6 +129,13 @@ func (b *bucketBuilder) ResubmitToTable(tableID uint8) BucketBuilder { return b } +// SetTunnelDst is an action to set tunnel destination address when the bucket is selected. +func (b *bucketBuilder) SetTunnelDst(addr net.IP) BucketBuilder { + setTunDstAct := &ofctrl.SetTunnelDstAction{IP: addr} + b.bucket.AddAction(setTunDstAct.GetActionMessage()) + return b +} + // Weight sets the weight of a bucket. func (b *bucketBuilder) Weight(val uint16) BucketBuilder { b.bucket.Weight = val diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 136e9e43d6f..93008f29a13 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -32,6 +32,8 @@ import ( "antrea.io/antrea/pkg/agent/multicast" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + agentconfig "antrea.io/antrea/pkg/config/agent" + controllerconfig "antrea.io/antrea/pkg/config/controller" "antrea.io/antrea/pkg/features" ) @@ -56,6 +58,24 @@ func TestMulticast(t *testing.T) { if err != nil { t.Fatalf("Error computing multicast interfaces: %v", err) } + t.Run("testMulticastWithNoEncap", func(t *testing.T) { + runMulticastTestCases(t, data, nodeMulticastInterfaces, true) + }) + t.Run("testMulticastWithEncap", func(t *testing.T) { + ac := func(config *agentconfig.AgentConfig) { + config.TrafficEncapMode = "encap" + } + cc := func(config *controllerconfig.ControllerConfig) { + config.FeatureGates["Multicast"] = true + } + if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil { + t.Fatalf("Failed to deploy cluster with encap mode: %v", err) + } + runMulticastTestCases(t, data, nodeMulticastInterfaces, false) + }) +} + +func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { t.Run("testMulticastBetweenPodsInTwoNodes", func(t *testing.T) { skipIfNumNodesLessThan(t, 2) testcases := []multicastTestcase{ @@ -92,7 +112,7 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) @@ -132,7 +152,7 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) @@ -556,7 +576,7 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send } } -func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) @@ -602,19 +622,21 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc continue } for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { - _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) - if err != nil { - return false, err - } - // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, - // the receivers should configure corresponding inbound multicast routes. - if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { - if len(mRouteResult) == 0 { - return false, nil + if checkReceiverRoute { + _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) + if err != nil { + return false, err } - } else { - if len(mRouteResult) != 0 { - return false, nil + // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, + // the receivers should configure corresponding inbound multicast routes. + if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { + if len(mRouteResult) == 0 { + return false, nil + } + } else { + if len(mRouteResult) != 0 { + return false, nil + } } } _, mAddrResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip maddr show %s | grep %s", receiverMulticastInterface, mc.group.String()))