From 43415613c9eb03db0a761a3946c52c765be85e61 Mon Sep 17 00:00:00 2001 From: wenyingd Date: Mon, 13 Jun 2022 16:42:42 +0800 Subject: [PATCH] [Multicast] support encap mode 1. Use a single routine to send local multicast groups in an IGMP v3 Report message to notify all the other Nodes in the cluster 2. Multicast controller maintains both local Pod members and remote Nodes which have Pod members for each multicast group found in the cluster 3. Add remote Node members in the OpenFlow group buckets. Signed-off-by: wenyingd --- cmd/antrea-agent/agent.go | 11 +- .../noderoute/node_route_controller.go | 15 + .../noderoute/node_route_controller_test.go | 4 +- pkg/agent/multicast/mcast_controller.go | 310 +++++++++++---- pkg/agent/multicast/mcast_controller_test.go | 357 ++++++++++++++++-- pkg/agent/multicast/mcast_discovery.go | 81 +++- pkg/agent/multicast/mcast_discovery_test.go | 185 +++++++++ pkg/agent/multicast/mcast_route_test.go | 7 +- pkg/agent/openflow/client.go | 49 ++- pkg/agent/openflow/multicast.go | 91 ++++- pkg/agent/openflow/pipeline.go | 32 +- pkg/agent/openflow/testing/mock_openflow.go | 36 +- pkg/agent/types/multicast.go | 1 + pkg/ovs/openflow/interfaces.go | 1 + pkg/ovs/openflow/ofctrl_group.go | 8 + test/e2e/multicast_test.go | 48 ++- 16 files changed, 1067 insertions(+), 169 deletions(-) create mode 100644 pkg/agent/multicast/mcast_discovery_test.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 901d48cae7f..8432c78808a 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -250,6 +250,10 @@ func run(o *Options) error { ipsecCertController = ipseccertificate.NewIPSecCertificateController(k8sClient, ovsBridgeClient, nodeConfig.Name) } + // nodeUpdateChannel is a channel for receiving the installed Node list from + // NodeRouteController and notifying the multicast Controller to update the + // Nodes that IGMP report messages are sent to. 
+ nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100) nodeRouteController := noderoute.NewNodeRouteController( k8sClient, informerFactory, @@ -262,6 +266,7 @@ func run(o *Options) error { agentInitializer.GetWireGuardClient(), o.config.AntreaProxy.ProxyAll, ipsecCertController, + nodeUpdateChannel, ) var mcRouteController *mcroute.MCRouteController @@ -521,6 +526,8 @@ func run(o *Options) error { go podUpdateChannel.Run(stopCh) + go nodeUpdateChannel.Run(stopCh) + go routeClient.Run(stopCh) go cniServer.Run(stopCh) @@ -652,7 +659,9 @@ func run(o *Options) error { ovsBridgeClient, podUpdateChannel, o.igmpQueryInterval, - validator) + validator, + networkConfig.TrafficEncapMode.SupportsEncap(), + nodeUpdateChannel) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 61105539235..ba7ffcc1c05 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/wireguard" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/runtime" @@ -84,6 +85,7 @@ type Controller struct { // or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate // to be configured before installing routes/flows to peer Nodes to prevent unencrypted traffic across Nodes. 
ipsecCertificateManager ipseccertificate.Manager + nodeUpdateSubscriber channel.Notifier } // NewNodeRouteController instantiates a new Controller object which will process Node events @@ -100,6 +102,7 @@ func NewNodeRouteController( wireguardClient wireguard.Interface, proxyAll bool, ipsecCertificateManager ipseccertificate.Manager, + nodeUpdateSubscriber channel.Notifier, ) *Controller { nodeInformer := informerFactory.Core().V1().Nodes() svcLister := informerFactory.Core().V1().Services() @@ -120,6 +123,7 @@ func NewNodeRouteController( wireGuardClient: wireguardClient, proxyAll: proxyAll, ipsecCertificateManager: ipsecCertificateManager, + nodeUpdateSubscriber: nodeUpdateSubscriber, } nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -325,6 +329,7 @@ func (c *Controller) reconcile() error { if err := c.removeStaleWireGuardPeers(); err != nil { return fmt.Errorf("error when removing stale WireGuard peers: %v", err) } + c.notifyNodeList() return nil } @@ -419,6 +424,7 @@ func (c *Controller) processNextWorkItem() bool { // If no error occurs we Forget this item so it does not get queued again until // another change happens. c.queue.Forget(key) + c.notifyNodeList() } else { // Put the item back on the workqueue to handle any transient errors. 
c.queue.AddRateLimited(key) @@ -819,3 +825,12 @@ func (c *Controller) getNodeTransportAddrs(node *corev1.Node) (*utilip.DualStack } return peerNodeIPs, nil } + +func (c *Controller) notifyNodeList() { + nodesMap := make(map[string]*utilip.DualStackIPs) + for _, n := range c.installedNodes.List() { + node := n.(*nodeRouteInfo) + nodesMap[node.nodeName] = node.nodeIPs + } + c.nodeUpdateSubscriber.Notify(nodesMap) +} diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index fd70d204270..c8aa7cda73e 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -35,6 +35,7 @@ import ( "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" utilip "antrea.io/antrea/pkg/util/ip" ) @@ -75,10 +76,11 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig) (*fakeCont routeClient := routetest.NewMockInterface(ctrl) interfaceStore := interfacestore.NewInterfaceStore() ipsecCertificateManager := &fakeIPsecCertificateManager{} + nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100) c := NewNodeRouteController(clientset, informerFactory, ofClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{ IPv4: nil, MAC: gatewayMAC, - }}, nil, false, ipsecCertificateManager) + }}, nil, false, ipsecCertificateManager, nodeUpdateChannel) return &fakeController{ Controller: c, clientset: clientset, diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 12a14bbcb16..38d4e3f6936 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -35,6 +35,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" 
"antrea.io/antrea/pkg/util/channel" + utilip "antrea.io/antrea/pkg/util/ip" ) type eventType uint8 @@ -57,10 +58,11 @@ var ( ) type mcastGroupEvent struct { - group net.IP - eType eventType - time time.Time - iface *interfacestore.InterfaceConfig + group net.IP + eType eventType + time time.Time + iface *interfacestore.InterfaceConfig + sender net.IP } type GroupMemberStatus struct { @@ -68,6 +70,7 @@ type GroupMemberStatus struct { // localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name, // and value is its last update time. localMembers map[string]time.Time + remoteMembers sets.String lastIGMPReport time.Time ofGroupID binding.GroupIDType } @@ -91,11 +94,12 @@ func (c *Controller) eventHandler(stopCh <-chan struct{}) { // addGroupMemberStatus adds the new group into groupCache. func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ - group: e.group, - lastIGMPReport: e.time, - localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, - ofGroupID: c.v4GroupAllocator.Allocate(), + group: e.group, + ofGroupID: c.v4GroupAllocator.Allocate(), + remoteMembers: sets.NewString(), + localMembers: make(map[string]time.Time), } + status = addGroupMember(status, e) c.groupCache.Add(status) c.queue.Add(e.group.String()) klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName) @@ -112,17 +116,17 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent newStatus := &GroupMemberStatus{ group: status.group, localMembers: make(map[string]time.Time), + remoteMembers: status.remoteMembers, lastIGMPReport: status.lastIGMPReport, ofGroupID: status.ofGroupID, } for m, t := range status.localMembers { newStatus.localMembers[m] = t } - _, exist := status.localMembers[e.iface.InterfaceName] + exist := memberExists(status, e) switch e.eType { case groupJoin: - newStatus.lastIGMPReport = e.time - 
newStatus.localMembers[e.iface.InterfaceName] = e.time + newStatus = addGroupMember(newStatus, e) c.groupCache.Update(newStatus) if !exist { klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) @@ -130,18 +134,21 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent } case groupLeave: if exist { - delete(newStatus.localMembers, e.iface.InterfaceName) + newStatus = deleteGroupMember(newStatus, e) c.groupCache.Update(newStatus) - klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) - // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are - // other local members in the multicast group. - if !found || len(newStatus.localMembers) > 0 { - c.queue.Add(newStatus.group.String()) + if e.iface.Type == interfacestore.ContainerInterface { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are + // other local members in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. + klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.checkLastMember(e.group) + } } else { - // Check if all local members have left the multicast group. 
- klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - c.checkLastMember(e.group) + c.queue.Add(newStatus.group.String()) } } } @@ -175,6 +182,7 @@ func (c *Controller) clearStaleGroups() { if now.Sub(lastUpdate) > c.mcastGroupTimeout { ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: member, + Type: interfacestore.ContainerInterface, } event := &mcastGroupEvent{ group: status.group, @@ -200,6 +208,7 @@ func (c *Controller) removeLocalInterface(podEvent types.PodUpdate) { interfaceName := util.GenerateContainerInterfaceName(podEvent.PodName, podEvent.PodNamespace, podEvent.ContainerID) ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: interfaceName, + Type: interfacestore.ContainerInterface, } groupStatuses := c.getGroupMemberStatusesByPod(interfaceName) for _, g := range groupStatuses { @@ -223,16 +232,23 @@ type Controller struct { groupCache cache.Indexer queue workqueue.RateLimitingInterface // installedGroups saves the groups which are configured on both OVS and the host. - installedGroups sets.String - installedGroupsMutex sync.RWMutex - mRouteClient *MRouteClient - ovsBridgeClient ovsconfig.OVSBridgeClient + installedGroups sets.String + installedGroupsMutex sync.RWMutex + installedLocalGroups sets.String + installedLocalGroupsMutex sync.RWMutex + mRouteClient *MRouteClient + ovsBridgeClient ovsconfig.OVSBridgeClient // queryInterval is the interval to send IGMP query messages. queryInterval time.Duration // mcastGroupTimeout is the timeout to detect a group as stale if no IGMP report is received within the time. mcastGroupTimeout time.Duration // the group ID in OVS for group which IGMP queries are sent to queryGroupId binding.GroupIDType + // nodeGroupID is the OpenFlow group ID in OVS which is used to send IGMP report message to other Nodes. + nodeGroupID binding.GroupIDType + // installedNodes is the installed Node set that the IGMP report message is sent to. 
+ installedNodes sets.String + encapEnabled bool } func NewMulticastController(ofClient openflow.Client, @@ -244,28 +260,37 @@ func NewMulticastController(ofClient openflow.Client, ovsBridgeClient ovsconfig.OVSBridgeClient, podUpdateSubscriber channel.Subscriber, igmpQueryInterval time.Duration, - validator types.McastNetworkPolicyController) *Controller { + validator types.McastNetworkPolicyController, + isEncap bool, + nodeUpdateChannel channel.Subscriber) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) - groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator) + groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) c := &Controller{ - ofClient: ofClient, - ifaceStore: ifaceStore, - v4GroupAllocator: v4GroupAllocator, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, - queryInterval: igmpQueryInterval, - mcastGroupTimeout: igmpQueryInterval * 3, - queryGroupId: v4GroupAllocator.Allocate(), + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + installedLocalGroups: sets.NewString(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: 
ovsBridgeClient, + queryInterval: igmpQueryInterval, + mcastGroupTimeout: igmpQueryInterval * 3, + queryGroupId: v4GroupAllocator.Allocate(), + encapEnabled: isEncap, + } + if isEncap { + c.nodeGroupID = v4GroupAllocator.Allocate() + c.installedNodes = sets.NewString() + nodeUpdateChannel.Subscribe(c.nodeChanged) } podUpdateSubscriber.Subscribe(c.memberChanged) return c @@ -290,6 +315,16 @@ func (c *Controller) Initialize() error { if err != nil { return err } + if c.encapEnabled { + // Install OpenFlow group for a new multicast group which has local Pod receivers joined. + if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nil); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + } + if err := c.ofClient.InstallMulticastToRemoteFlows(c.nodeGroupID); err != nil { + klog.ErrorS(err, "Failed to install OpenFlow group and flow to send IGMP report to other Nodes") + return err + } + } return nil } @@ -301,6 +336,10 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } }, c.queryInterval, stopCh) + if c.encapEnabled { + go wait.NonSlidingUntil(c.syncLocalGroupsToOtherNodes, c.queryInterval, stopCh) + } + // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, c.queryInterval, stopCh) go c.eventHandler(stopCh) @@ -370,19 +409,73 @@ func (c *Controller) syncGroup(groupKey string) error { return nil } status := obj.(*GroupMemberStatus) - memberPorts := make([]uint32, 0, len(status.localMembers)+1) - memberPorts = append(memberPorts, config.HostGatewayOFPort) - for memberInterfaceName := range status.localMembers { - obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName) - if !found { - klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName) - continue + var memberPorts []uint32 + if len(status.localMembers) > 0 { + memberPorts = make([]uint32, 0, len(status.localMembers)+1) + for memberInterfaceName := 
range status.localMembers { + obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName) + if !found { + klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName) + continue + } + memberPorts = append(memberPorts, uint32(obj.OFPort)) + } + memberPorts = append(memberPorts, config.HostGatewayOFPort) + } + var remoteNodeReceivers []net.IP + if c.encapEnabled { + remoteNodeReceivers = make([]net.IP, len(status.remoteMembers)) + for i := range status.remoteMembers.List() { + nodeIPStr := status.remoteMembers.List()[i] + remoteNodeReceivers[i] = net.ParseIP(nodeIPStr) + } + } + installLocalMulticastGroup := func() error { + if !c.localGroupHasInstalled(groupKey) && len(status.localMembers) > 0 { + if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { + klog.ErrorS(err, "Failed to install multicast group identified with local members", "group", groupKey) + return err + } + if c.encapEnabled { + if err := c.igmpSnooper.sendIGMPJoinReport([]net.IP{status.group}); err != nil { + klog.ErrorS(err, "Failed to sync local multicast group to other Nodes", "group", groupKey) + } + } + c.addLocalInstalledGroup(groupKey) + klog.InfoS("New local multicast group is added", "group", groupKey) } - memberPorts = append(memberPorts, uint32(obj.OFPort)) + return nil } if c.groupHasInstalled(groupKey) { + localRemoved := false if c.groupIsStale(status) { - // Remove the multicast flow entry if no local Pod is in the group. 
+ if c.localGroupHasInstalled(groupKey) { + err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) + if err != nil { + klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) + return err + } + klog.InfoS("Remove multicast route entry", "group", status.group) + err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) + if err != nil { + klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + return err + } + + if c.encapEnabled { + group := net.ParseIP(groupKey) + // Send IGMP leave message to other Nodes to notify the current Node leaves the given multicast group. + if err := c.igmpSnooper.sendIGMPLeaveReport([]net.IP{group}); err != nil { + klog.ErrorS(err, "Failed to send IGMP leave message to other Nodes", "group", groupKey) + } + } + c.delInstalledLocalGroup(groupKey) + } + localRemoved = true + } + // TODO: add check for remote report is timeout + if status.remoteMembers.Len() == 0 && localRemoved { + // Remove the multicast OpenFlow flow and group entries if none Pod member on local or remote Node is in the group. 
if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) return err @@ -393,40 +486,31 @@ func (c *Controller) syncGroup(groupKey string) error { return err } c.v4GroupAllocator.Release(status.ofGroupID) - err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) - if err != nil { - klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) - return err - } - err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) - if err != nil { - klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) - return err - } c.delInstalledGroup(groupKey) c.groupCache.Delete(status) klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) return nil } - // Reinstall OpenFlow group because the local Pod receivers have changed. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + // Reinstall OpenFlow group because the remote node receivers have changed. + klog.V(2).InfoS("Updating OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "remoteReceivers", remoteNodeReceivers) + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.V(2).InfoS("Updated OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) return nil } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. 
- if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.V(2).InfoS("Installed OpenFlow group for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) // Install OpenFlow flow to forward packets to local Pod receivers which are included in the group. if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } - if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { - klog.ErrorS(err, "Failed to join multicast group for multicast interfaces", "group", status.group) + klog.V(2).InfoS("Installed OpenFlow flow for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) + if err := installLocalMulticastGroup(); err != nil { return err } c.addInstalledGroup(groupKey) @@ -461,6 +545,24 @@ func (c *Controller) delInstalledGroup(groupKey string) { c.installedGroupsMutex.Unlock() } +func (c *Controller) localGroupHasInstalled(groupKey string) bool { + c.installedLocalGroupsMutex.RLock() + defer c.installedLocalGroupsMutex.RUnlock() + return c.installedLocalGroups.Has(groupKey) +} + +func (c *Controller) addLocalInstalledGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Insert(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + +func (c *Controller) delInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Delete(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + func (c 
*Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { obj, ok, _ := c.groupCache.GetByKey(e.group.String()) switch e.eType { @@ -509,7 +611,7 @@ func (c *Controller) updateQueryGroup() error { memberPorts = append(memberPorts, uint32(iface.OFPort)) } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. - if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts, nil); err != nil { return err } klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", types.McastAllHosts.String(), @@ -517,6 +619,47 @@ func (c *Controller) updateQueryGroup() error { return nil } +// syncLocalGroupsToOtherNodes sends IGMP join message to other Nodes in the same cluster to notify what multicast groups +// are joined by this Node. This function is used only with encap mode. +func (c *Controller) syncLocalGroupsToOtherNodes() { + if c.installedLocalGroups.Len() == 0 { + return + } + localGroups := make([]net.IP, c.installedLocalGroups.Len()) + c.installedLocalGroupsMutex.RLock() + for i := range c.installedLocalGroups.List() { + localGroups[i] = net.ParseIP(c.installedLocalGroups.List()[i]) + } + c.installedLocalGroupsMutex.RUnlock() + if err := c.igmpSnooper.sendIGMPJoinReport(localGroups); err != nil { + klog.ErrorS(err, "Failed to sync local multicast groups to other Nodes") + } +} + +func (c *Controller) nodeChanged(e interface{}) { + nodes, ok := e.(map[string]*utilip.DualStackIPs) + if !ok { + klog.InfoS("Failed to cast nodeUpdateEvent") + return + } + var nodeIPs []net.IP + updatedNodes := sets.NewString() + for _, ips := range nodes { + if ips != nil && ips.IPv4 != nil { + nodeIPs = append(nodeIPs, ips.IPv4) + updatedNodes.Insert(ips.IPv4.String()) + } + } + if c.installedNodes.Equal(updatedNodes) { + klog.V(2).InfoS("Nodes in the cluster are not changed, ignore the event") + return + } + if err := 
c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nodeIPs); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + } + c.installedNodes = updatedNodes +} + func podInterfaceIndexFunc(obj interface{}) ([]string, error) { groupState := obj.(*GroupMemberStatus) podInterfaces := make([]string, 0, len(groupState.localMembers)) @@ -596,3 +739,34 @@ func (c *Controller) GetAllPodsStats() map[*interfacestore.InterfaceConfig]*PodT } return statsMap } + +func memberExists(status *GroupMemberStatus, e *mcastGroupEvent) bool { + var exist bool + if e.iface.Type == interfacestore.ContainerInterface { + _, exist = status.localMembers[e.iface.InterfaceName] + } else if e.iface.Type == interfacestore.TunnelInterface { + exist = status.remoteMembers.Has(e.sender.String()) + } + return exist +} + +func addGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + status.localMembers[e.iface.InterfaceName] = e.time + status.lastIGMPReport = e.time + } else { + status.remoteMembers = status.remoteMembers.Insert(e.sender.String()) + } + return status +} + +func deleteGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + delete(status.localMembers, e.iface.InterfaceName) + klog.InfoS("Deleted local member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } else { + status.remoteMembers = status.remoteMembers.Delete(e.sender.String()) + klog.InfoS("Deleted remote member from multicast group", "group", e.group.String(), "member", e.sender) + } + return status +} diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index a0cfd86c814..102228b5df3 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -1,6 +1,3 @@ -//go:build linux -// +build linux - // 
Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,12 +15,13 @@ package multicast import ( + "fmt" "net" + "os" "sync" "testing" "time" - "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" @@ -31,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "antrea.io/antrea/pkg/agent/config" @@ -45,6 +44,7 @@ import ( "antrea.io/antrea/pkg/apis/crd/v1alpha1" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" + utilip "antrea.io/antrea/pkg/util/ip" ) var ( @@ -69,10 +69,8 @@ var ( OFPort: 2, }, } - nodeIf1IP = net.ParseIP("192.168.20.22") - externalInterfaceIP = net.ParseIP("192.168.50.23") - pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") - pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") + nodeIf1IP = net.ParseIP("192.168.20.22") + nodeUpdateChannel *channel.SubscribableChannel ) func TestAddGroupMemberStatus(t *testing.T) { @@ -83,7 +81,7 @@ func TestAddGroupMemberStatus(t *testing.T) { time: time.Now(), iface: if1, } - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -97,7 +95,7 @@ func TestAddGroupMemberStatus(t *testing.T) { assert.True(t, ok) assert.Equal(t, mgroup.String(), key) mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) 
mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) @@ -106,10 +104,9 @@ func TestAddGroupMemberStatus(t *testing.T) { } func TestUpdateGroupMemberStatus(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) assert.Nil(t, err) - igmpMaxResponseTime = time.Second * 1 mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, @@ -143,9 +140,8 @@ func TestUpdateGroupMemberStatus(t *testing.T) { } func TestCheckLastMember(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) workerCount = 1 - igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { @@ -195,7 +191,7 @@ func TestCheckLastMember(t *testing.T) { mctrl.queue.Forget(obj) } mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) for _, tc := range []struct { ev *mcastGroupEvent exists bool @@ -209,7 +205,7 @@ func TestCheckLastMember(t *testing.T) { } func TestClearStaleGroups(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) workerCount = 1 err := mctrl.initialize(t) assert.Nil(t, err) @@ -253,12 +249,14 @@ func TestClearStaleGroups(t *testing.T) { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addLocalInstalledGroup(g.group.String()) } fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addLocalInstalledGroup(g.group.String()) for m := range 
g.localMembers { mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) @@ -283,7 +281,7 @@ func TestClearStaleGroups(t *testing.T) { } func TestProcessPacketIn(t *testing.T) { - mockController := newMockMulticastController(t) + mockController := newMockMulticastController(t, false) snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) @@ -449,6 +447,289 @@ func TestProcessPacketIn(t *testing.T) { } } +func TestEncapModeInitialize(t *testing.T) { + mockController := newMockMulticastController(t, true) + assert.True(t, mockController.nodeGroupID != 0) + err := mockController.initialize(t) + assert.Nil(t, err) +} + +func TestEncapLocalReportAndNotifyRemote(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ + {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + go wait.Until(mockController.worker, time.Second, stopCh) + + iface1 := createInterface("pod1", 3) + iface2 := createInterface("pod2", 4) + mgroup := net.ParseIP("224.2.100.4") + for _, tc := range []struct { + e *mcastGroupEvent + interfaces []*interfacestore.InterfaceConfig + groupChanged bool + ifaceCheck bool + }{ + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: false, ifaceCheck: false}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface2}, interfaces: 
[]*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + } { + groupKey := tc.e.group.String() + if tc.e.eType == groupJoin { + if tc.groupChanged { + mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) + } + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } else { + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + if len(tc.interfaces) == 1 { + mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, types.McastAllHosts, gomock.Any(), gomock.Any()).AnyTimes() + } + if !tc.groupChanged { + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } + if tc.groupChanged { + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group) + mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + } + } + mockController.addOrUpdateGroupEvent(tc.e) + + if tc.groupChanged { + err := 
wait.PollImmediate(time.Millisecond*100, time.Second*3, func() (done bool, err error) { + if tc.e.eType == groupJoin { + return mockController.localGroupHasInstalled(groupKey) && mockController.groupHasInstalled(groupKey), nil + } else { + return !mockController.localGroupHasInstalled(groupKey) && !mockController.groupHasInstalled(groupKey), nil + } + }) + assert.Nil(t, err) + } else { + time.Sleep(time.Millisecond * 200) + } + } +} + +type nodeUpdateTestController struct { + *Controller + notifyCh chan struct{} +} + +func (c *nodeUpdateTestController) nodeChanged(e interface{}) { + c.Controller.nodeChanged(e) + close(c.notifyCh) +} + +func newNodeUpdateTestController(t *testing.T) *nodeUpdateTestController { + controller := gomock.NewController(t) + mockOFClient = openflowtest.NewMockClient(controller) + groupAllocator := openflow.NewGroupAllocator(false) + mockController := &Controller{ofClient: mockOFClient, nodeGroupID: groupAllocator.Allocate(), installedNodes: sets.NewString()} + return &nodeUpdateTestController{ + Controller: mockController, + } +} + +func TestNodeUpdate(t *testing.T) { + mockController := newNodeUpdateTestController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + nodeUpdateChannel = channel.NewSubscribableChannel("NodeUpdate", 100) + nodeUpdateChannel.Subscribe(mockController.nodeChanged) + go nodeUpdateChannel.Run(stopCh) + + for _, tc := range []struct { + nodeUpdates map[string]string + nodesChanged bool + }{ + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n1": "10.10.10.11", + "n2": "10.10.10.12", + }, + }, + { + nodesChanged: false, + nodeUpdates: map[string]string{ + "n1": "10.10.10.11", + "n2": "10.10.10.12", + }, + }, + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n1": "10.10.10.11", + "n2": "10.10.10.12", + "n3": "10.10.10.13", + }, + }, + { + nodesChanged: true, + nodeUpdates: map[string]string{ + "n2": "10.10.10.12", + "n3": "10.10.10.13", + }, + }, + } { + nodeNameIPsMap := 
make(map[string]*utilip.DualStackIPs, len(tc.nodeUpdates)) + expectedNodeIPStrSet := sets.NewString() + for name, ipStr := range tc.nodeUpdates { + nodeIP := net.ParseIP(ipStr) + nodeNameIPsMap[name] = &utilip.DualStackIPs{ + IPv4: nodeIP, + } + expectedNodeIPStrSet = expectedNodeIPStrSet.Insert(ipStr) + } + mockController.notifyCh = make(chan struct{}) + go func() { + if tc.nodesChanged { + mockOFClient.EXPECT().InstallMulticastGroup(mockController.nodeGroupID, nil, gomock.Any()).Return(nil).Times(1) + } + nodeUpdateChannel.Notify(nodeNameIPsMap) + }() + + <-mockController.notifyCh + assert.Equal(t, expectedNodeIPStrSet, mockController.installedNodes, fmt.Sprintf("installedNodes: %v, expectedNodes: %v", mockController.installedNodes, expectedNodeIPStrSet)) + } +} + +func TestRemoteMemberJoinLeave(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + + stopCh := make(chan struct{}) + defer close(stopCh) + + stopStr := "done" + eventHandler := func(stopCh <-chan struct{}) { + for { + select { + case e := <-mockController.groupEventCh: + if e.group.Equal(net.IPv4zero) { + mockController.queue.Add(stopStr) + } else { + mockController.addOrUpdateGroupEvent(e) + } + case <-stopCh: + return + } + } + } + go eventHandler(stopCh) + + for _, tc := range []struct { + groupStrs []string + nodeStr string + isJoin bool + }{ + {groupStrs: []string{"224.2.100.2", "224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.2", "224.2.100.5"}, nodeStr: "10.10.10.12", isJoin: true}, + {groupStrs: []string{"224.2.100.2"}, nodeStr: "10.10.10.12", isJoin: false}, + } { + groups := make([]net.IP, len(tc.groupStrs)) + for i, g := range tc.groupStrs { + groups[i] = net.ParseIP(g) + } + node := net.ParseIP(tc.nodeStr) + testRemoteReport(t, mockController, groups, node, tc.isJoin, stopStr) + } +} + +func testRemoteReport(t 
*testing.T, mockController *Controller, groups []net.IP, node net.IP, nodeJoin bool, stopStr string) { + tunnelPort := uint32(2) + proto := uint8(protocol.IGMPIsEx) + if !nodeJoin { + proto = uint8(protocol.IGMPToIn) + } + for _, g := range groups { + var exists bool + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + if !exists { + mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) + } else { + status := obj.(*GroupMemberStatus) + exists = status.remoteMembers.Has(node.String()) + if nodeJoin && exists || !nodeJoin && !exists { + continue + } + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), nil, gomock.Any()) + } + + processNextItem := func(stopStr string) { + for { + obj, quit := mockController.queue.Get() + if quit { + return + } + key := obj.(string) + if key == stopStr { + mockController.queue.Forget(key) + mockController.queue.Done(obj) + return + } + if err := mockController.syncGroup(key); err != nil { + t.Errorf("Failed to process %s: %v", key, err) + } + mockController.queue.Forget(key) + mockController.queue.Done(obj) + } + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + processNextItem(stopStr) + wg.Done() + }() + + err := processRemoteReport(t, mockController, groups, node, proto, tunnelPort) + assert.Nil(t, err) + mockController.groupEventCh <- &mcastGroupEvent{group: net.IPv4zero} + wg.Wait() + + for _, g := range groups { + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + assert.True(t, exists) + status := obj.(*GroupMemberStatus) + if nodeJoin { + assert.True(t, status.remoteMembers.Has(node.String())) + } else { + assert.False(t, status.remoteMembers.Has(node.String())) + } + } + for _, g := range groups { + assert.True(t, mockController.groupHasInstalled(g.String())) + } +} + +func processRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, remoteNode net.IP, reportType uint8, tunnelPort uint32) error { + pkt := generatePacketInForRemoteReport(t, 
mockController.igmpSnooper, groups, remoteNode, reportType, tunnelPort) + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, nodeIf1IP), true) + return mockController.igmpSnooper.processPacketIn(&pkt) +} + func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEvent) { obj, exits, err := cache.GetByKey(event.group.String()) assert.Nil(t, err) @@ -467,7 +748,7 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven } } -func newMockMulticastController(t *testing.T) *Controller { +func newMockMulticastController(t *testing.T, isEncap bool) *Controller { controller := gomock.NewController(t) mockOFClient = openflowtest.NewMockClient(controller) mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) @@ -479,17 +760,25 @@ func newMockMulticastController(t *testing.T) *Controller { mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) groupAllocator := openflow.NewGroupAllocator(false) podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator) + + if isEncap { + nodeUpdateChannel = channel.NewSubscribableChannel("NodeUpdate", 100) + } + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator, isEncap, nodeUpdateChannel) return mctrl } func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastInitialFlows(uint8(0)).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) 
mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{}) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(0)).Times(1).Return([]uint16{0}, nil) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(1)).Times(1).Return([]uint16{1, 2}, nil) + if c.encapEnabled { + mockOFClient.EXPECT().InstallMulticastGroup(c.nodeGroupID, gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastToRemoteFlows(c.nodeGroupID).Times(1) + } return c.Initialize() } @@ -509,32 +798,13 @@ func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig func createIGMPReportPacketIn(joinedGroups []net.IP, leftGroups []net.IP, version uint8, ofport uint32) []*ofctrl.PacketIn { joinMessages := createIGMPJoinMessage(joinedGroups, version) leaveMessages := createIGMPLeaveMessage(leftGroups, version) - generatePacket := func(m util.Message) ofctrl.PacketIn { - pkt := openflow13.NewPacketIn() - matchInport := openflow13.NewInPortField(ofport) - pkt.Match.AddField(*matchInport) - ipPacket := &protocol.IPv4{ - Version: 0x4, - IHL: 5, - Protocol: IGMPProtocolNumber, - Length: 20 + m.Len(), - Data: m, - } - pkt.Data = protocol.Ethernet{ - HWDst: pktInDstMAC, - HWSrc: pktInSrcMAC, - Ethertype: protocol.IPv4_MSG, - Data: ipPacket, - } - return ofctrl.PacketIn(*pkt) - } pkts := make([]*ofctrl.PacketIn, 0) for _, m := range joinMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } for _, m := range leaveMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } return pkts @@ -578,3 +848,8 @@ func createIGMPJoinMessage(groups []net.IP, version uint8) []util.Message { } return pkts } + +func TestMain(m *testing.M) { + igmpMaxResponseTime = time.Second + os.Exit(m.Run()) +} diff --git 
a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index d6ffcef5a75..3743ae59baf 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -39,6 +39,11 @@ const ( IGMPProtocolNumber = 2 ) +const ( + openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC" + openflowKeyInPort = "OXM_OF_IN_PORT" +) + var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -46,6 +51,8 @@ var ( igmpMaxResponseTime = time.Second * 10 // igmpQueryDstMac is the MAC address used in the dst MAC field in the IGMP query message igmpQueryDstMac, _ = net.ParseMAC("01:00:5e:00:00:01") + // igmpReportDstMac is the MAC address used in the dst MAC field in the IGMP report message + igmpReportDstMac, _ = net.ParseMAC("01:00:5e:00:00:16") ) type IGMPSnooper struct { @@ -62,6 +69,7 @@ type IGMPSnooper struct { // Similar to igmpReportANPStats, it stores ACNP stats for IGMP reports. igmpReportACNPStats map[apitypes.UID]map[string]*types.RuleMetric igmpReportACNPStatsMutex sync.Mutex + encapEnabled bool } func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { @@ -92,7 +100,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow13.NXRange) (uint32, func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { matches := pktIn.GetMatches() - ofPortField := matches.GetMatchByName("OXM_OF_IN_PORT") + ofPortField := matches.GetMatchByName(openflowKeyInPort) if ofPortField == nil { return nil, errors.New("in_port field not found") } @@ -127,6 +135,11 @@ func (s *IGMPSnooper) validate(event *mcastGroupEvent, igmpType uint8, packetInD // Return true directly if there is no validator. return true, nil } + // MulticastValidator only validates the IGMP report message sent from Pods. 
The report message received from tunnel + // port is sent from Antrea Agent on a different Node, and returns true directly. + if event.iface.Type == interfacestore.TunnelInterface { + return true, nil + } if event.iface.Type != interfacestore.ContainerInterface { return true, fmt.Errorf("interface is not container") } @@ -201,6 +214,42 @@ func (s *IGMPSnooper) collectStats() (igmpANPStats, igmpACNPStats map[apitypes.U return igmpANPStats, igmpACNPStats } +func (s *IGMPSnooper) sendIGMPReport(groupRecordType uint8, groups []net.IP) error { + igmp, err := s.generateIGMPReportPacket(groupRecordType, groups) + if err != nil { + return err + } + if err := s.ofClient.SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, igmp); err != nil { + return err + } + klog.V(2).InfoS("Sent packetOut for IGMP v3 report", "groups", groups) + return nil +} + +func (s *IGMPSnooper) generateIGMPReportPacket(groupRecordType uint8, groups []net.IP) (util.Message, error) { + records := make([]protocol.IGMPv3GroupRecord, len(groups)) + for i, group := range groups { + records[i] = protocol.IGMPv3GroupRecord{ + Type: groupRecordType, + MulticastAddress: group, + } + } + return &protocol.IGMPv3MembershipReport{ + Type: protocol.IGMPv3Report, + Checksum: 0, + NumberOfGroups: uint16(len(records)), + GroupRecords: records, + }, nil +} + +func (s *IGMPSnooper) sendIGMPJoinReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPIsEx, groups) +} + +func (s *IGMPSnooper) sendIGMPLeaveReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPToIn, groups) +} + func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { now := time.Now() iface, err := s.parseSrcInterface(pktIn) @@ -209,8 +258,15 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { } klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort) podName := "unknown" + var sender net.IP if iface.Type == interfacestore.ContainerInterface { podName = 
iface.PodName + } else if iface.Type == interfacestore.TunnelInterface { + var err error + sender, err = s.parseSrcNode(pktIn) + if err != nil { + return err + } } igmp, err := parseIGMPPacket(pktIn.Data) if err != nil { @@ -240,10 +296,11 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { evtType = groupLeave } event := &mcastGroupEvent{ - group: mgroup, - eType: evtType, - time: now, - iface: iface, + group: mgroup, + eType: evtType, + time: now, + iface: iface, + sender: sender, } s.validatePacketAndNotify(event, igmpType, pktIn.Data) } @@ -261,6 +318,16 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { return nil } +func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) { + matches := pktIn.GetMatches() + tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc) + if tunSrcField == nil { + return nil, errors.New("tun_src field not found") + } + tunSrc := tunSrcField.GetValue().(net.IP) + return tunSrc, nil +} + func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Duration) (util.Message, error) { // The max response time field in IGMP protocol uses a value in units of 1/10 second.
// See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376 @@ -332,8 +399,8 @@ func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) { } } -func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController) *IGMPSnooper { - snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval} +func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController, encapEnabled bool) *IGMPSnooper { + snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval, encapEnabled: encapEnabled} snooper.igmpReportACNPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) snooper.igmpReportANPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonMC), "MulticastGroupDiscovery", snooper) diff --git a/pkg/agent/multicast/mcast_discovery_test.go b/pkg/agent/multicast/mcast_discovery_test.go new file mode 100644 index 00000000000..2d36f5899ab --- /dev/null +++ b/pkg/agent/multicast/mcast_discovery_test.go @@ -0,0 +1,185 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package multicast + +import ( + "net" + "sync" + "testing" + + "antrea.io/libOpenflow/openflow13" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + + "antrea.io/antrea/pkg/agent/interfacestore" + ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +var ( + pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") + pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") +) + +type snooperValidator struct { + eventCh chan *mcastGroupEvent + groupJoinedNodes map[string]sets.String + groupLeftNodes map[string]sets.String +} + +func (v *snooperValidator) processPackets(expectedPackets int) { + appendSrcNode := func(groupKey string, groupNodes map[string]sets.String, nodeIP net.IP) map[string]sets.String { + _, exists := groupNodes[groupKey] + if !exists { + groupNodes[groupKey] = sets.NewString() + } + groupNodes[groupKey] = groupNodes[groupKey].Insert(nodeIP.String()) + return groupNodes + } + for i := 0; i < expectedPackets; i++ { + select { + case e := <-v.eventCh: + groupKey := e.group.String() + if e.eType == groupJoin { + v.groupJoinedNodes = appendSrcNode(groupKey, v.groupJoinedNodes, e.sender) + } else { + v.groupLeftNodes = appendSrcNode(groupKey, v.groupLeftNodes, e.sender) + } + } + } +} + +func TestIGMPRemoteReport(t *testing.T) { + controller := gomock.NewController(t) + mockOFClient := openflowtest.NewMockClient(controller) + mockIfaceStore := ifaceStoretest.NewMockInterfaceStore(controller) + eventCh := make(chan *mcastGroupEvent, 100) + snooper := &IGMPSnooper{ofClient: mockOFClient, eventCh: eventCh, ifaceStore: mockIfaceStore} + + localNodeIP := net.ParseIP("1.2.3.4") + tunnelPort := 
uint32(1) + wg := sync.WaitGroup{} + + generateRemotePackets := func(groups []net.IP, nodes []net.IP, igmpMsgType uint8) []ofctrl.PacketIn { + packets := make([]ofctrl.PacketIn, 0, len(nodes)) + for _, srcNode := range nodes { + pkt := generatePacketInForRemoteReport(t, snooper, groups, srcNode, igmpMsgType, tunnelPort) + packets = append(packets, pkt) + } + return packets + } + validateGroupNodes := func(groups []net.IP, expectedNodesIPs []net.IP, testGroupNodes map[string]sets.String) { + if len(expectedNodesIPs) == 0 { + return + } + for _, g := range groups { + expectedNodes := sets.NewString() + for _, n := range expectedNodesIPs { + expectedNodes.Insert(n.String()) + } + nodes, exists := testGroupNodes[g.String()] + assert.True(t, exists) + assert.True(t, nodes.HasAll(expectedNodes.List()...)) + } + } + testPacketProcess := func(groups []net.IP, joinedNodes []net.IP, leftNodes []net.IP) { + validator := snooperValidator{eventCh: eventCh, groupJoinedNodes: make(map[string]sets.String), groupLeftNodes: make(map[string]sets.String)} + packets := make([]ofctrl.PacketIn, 0, len(joinedNodes)+len(leftNodes)) + packets = append(packets, generateRemotePackets(groups, joinedNodes, protocol.IGMPIsEx)...) + packets = append(packets, generateRemotePackets(groups, leftNodes, protocol.IGMPToIn)...) 
+ + eventCount := len(groups) * len(packets) + wg.Add(1) + go func() { + validator.processPackets(eventCount) + wg.Done() + }() + + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, localNodeIP), true).Times(len(packets)) + for i := range packets { + pkt := &packets[i] + err := snooper.processPacketIn(pkt) + assert.Nil(t, err, "Failed to process IGMP Report message") + } + + wg.Wait() + + validateGroupNodes(groups, joinedNodes, validator.groupJoinedNodes) + validateGroupNodes(groups, leftNodes, validator.groupLeftNodes) + } + + for _, tc := range []struct { + groupsStrings []string + joinedNodesStrings []string + leftNodesStrings []string + }{ + {groupsStrings: []string{"225.1.2.3", "225.1.2.4"}, joinedNodesStrings: []string{"1.2.3.5", "1.2.3.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + {groupsStrings: []string{"225.1.2.5"}, joinedNodesStrings: []string{"1.2.3.5"}}, + {groupsStrings: []string{"225.1.2.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + } { + var groups, joinedNodes, leftNodes []net.IP + for _, g := range tc.groupsStrings { + groups = append(groups, net.ParseIP(g)) + } + for _, n := range tc.joinedNodesStrings { + joinedNodes = append(joinedNodes, net.ParseIP(n)) + } + for _, n := range tc.leftNodesStrings { + leftNodes = append(leftNodes, net.ParseIP(n)) + } + testPacketProcess(groups, joinedNodes, leftNodes) + } +} + +func generatePacket(m util.Message, ofport uint32, srcNodeIP net.IP) ofctrl.PacketIn { + pkt := openflow13.NewPacketIn() + matchInport := openflow13.NewInPortField(ofport) + pkt.Match.AddField(*matchInport) + if srcNodeIP != nil { + matchTunSrc := openflow13.NewTunnelIpv4SrcField(srcNodeIP, nil) + pkt.Match.AddField(*matchTunSrc) + } + ipPacket := &protocol.IPv4{ + Version: 0x4, + IHL: 5, + Protocol: IGMPProtocolNumber, + Length: 20 + m.Len(), + Data: m, + } + pkt.Data = protocol.Ethernet{ + HWDst: pktInDstMAC, + HWSrc: pktInSrcMAC, + Ethertype: protocol.IPv4_MSG, + Data: ipPacket, + } + 
return ofctrl.PacketIn(*pkt) +} + +func generatePacketInForRemoteReport(t *testing.T, snooper *IGMPSnooper, groups []net.IP, srcNode net.IP, igmpMsgType uint8, tunnelPort uint32) ofctrl.PacketIn { + msg, err := snooper.generateIGMPReportPacket(igmpMsgType, groups) + assert.Nil(t, err, "Failed to generate IGMP Report message") + return generatePacket(msg, tunnelPort, srcNode) +} + +func createTunnelInterface(tunnelPort uint32, localNodeIP net.IP) *interfacestore.InterfaceConfig { + tunnelInterface := interfacestore.NewTunnelInterface("antrea-tun0", ovsconfig.GeneveTunnel, localNodeIP, false) + tunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: int32(tunnelPort)} + return tunnelInterface +} diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index b9f4c14d69c..fda79598031 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -33,9 +33,10 @@ import ( ) var ( - addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} - addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} - nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} + externalInterfaceIP = net.ParseIP("192.168.50.23") + addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} + addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} + nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} ) func TestParseIGMPMsg(t *testing.T) { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index d4995f2117d..b8bcfb0b25a 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -19,6 +19,7 @@ import ( "math/rand" "net" + "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" ofutil "antrea.io/libOpenflow/util" v1 "k8s.io/api/core/v1" @@ -280,11 +281,18 
@@ type Client interface { // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to // antrea-gw0 so that local Pods could access external Multicast servers. InstallMulticastInitialFlows(pktInReason uint8) error + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error + // UninstallMulticastFlows removes the flow matching the given multicastIP. UninstallMulticastFlows(multicastIP net.IP) error + + // InstallMulticastToRemoteFlows installs OpenFlow to forward the IGMP report messages to the other Nodes, + // and packetIn the messages to Antrea Agent on peer Node. + // This function is called with encap mode, and used to sync the multicast groups in the cluster. + InstallMulticastToRemoteFlows(groupID binding.GroupIDType) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, @@ -304,7 +312,13 @@ type Client interface { // UninstallTrafficControlReturnPortFlow removes the flow to classify the packets from a return port. UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error - InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error + + // SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port. + SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error // InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular // Node and a local Gateway.
@@ -800,7 +814,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort, c.networkConfig.TrafficEncapMode.SupportsEncap(), config.DefaultTunOFPort) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } @@ -1190,6 +1204,15 @@ func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { return c.deleteFlows(c.featureMulticast.cachedFlows, cacheKey) } +func (c *client) InstallMulticastToRemoteFlows(groupID binding.GroupIDType) error { + firstMulticastTable := c.pipelines[pipelineMulticast].GetFirstTable() + flows := c.featureMulticast.multicastToRemoteFlows(groupID, firstMulticastTable) + cacheKey := "multicast-encap" + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) +} + func (c *client) SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, dstIP net.IP, @@ -1240,7 +1263,25 @@ func (c *client) UninstallTrafficControlReturnPortFlow(returnOFPort uint32) erro return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey) } -func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { +func (c *client) SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error { + srcMAC := c.nodeConfig.GatewayConfig.MAC.String() + srcIP := c.nodeConfig.NodeTransportIPv4Addr.IP.String() + dstMACStr := dstMAC.String() + dstIPStr := dstIP.String() + packetOutBuilder, err := setBasePacketOutBuilder(c.bridge.BuildPacketOut(), srcMAC, dstMACStr, srcIP, dstIPStr, openflow13.P_CONTROLLER, 0) + if err != nil { + return err + } + // 
Set the IP protocol and the L4 message (the IGMP report). + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolIGMP).SetL4Packet(igmp) + packetOutObj := packetOutBuilder.Done() + return c.bridge.SendPacketOut(packetOutObj) +} + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() table := MulticastOutputTable @@ -1248,7 +1289,7 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive table = MulticastIngressRuleTable } - if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers...); err != nil { + if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers, remoteNodeReceivers); err != nil { return err } return nil diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 79df3424a88..7488d068c15 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -19,8 +19,10 @@ import ( "net" "sync" + "antrea.io/libOpenflow/openflow13" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -31,6 +33,8 @@ type featureMulticast struct { ipProtocols []binding.Protocol bridge binding.Bridge gatewayPort uint32 + encapEnabled bool + tunnelPort uint32 cachedFlows *flowCategoryCache groupCache sync.Map @@ -43,7 +47,7 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32) *featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32, encapEnabled bool, tunnelPort uint32) *featureMulticast { return 
&featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, @@ -53,6 +57,8 @@ func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding groupCache: sync.Map{}, enableAntreaPolicy: anpEnabled, gatewayPort: gwPort, + encapEnabled: encapEnabled, + tunnelPort: tunnelPort, } } @@ -68,9 +74,7 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b func (f *featureMulticast) initFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() - return []binding.Flow{ - f.multicastOutputFlow(cookieID), - } + return f.multicastOutputFlows(cookieID) } func (f *featureMulticast) replayFlows() []binding.Flow { @@ -78,7 +82,7 @@ func (f *featureMulticast) replayFlows() []binding.Flow { return getCachedFlows(f.cachedFlows) } -func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports ...uint32) error { +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) error { group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() for i := range ports { group = group.Bucket(). @@ -87,6 +91,14 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(tableID). Done() } + for _, ip := range remoteIPs { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, f.tunnelPort). + SetTunnelDst(ip). + ResubmitToTable(MulticastOutputTable.GetID()). + Done() + } if err := group.Add(); err != nil { return fmt.Errorf("error when installing Multicast receiver Group: %w", err) } @@ -94,12 +106,38 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, return nil } -func (f *featureMulticast) multicastOutputFlow(cookieID uint64) binding.Flow { - return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). - Cookie(cookieID). 
- MatchRegMark(OFPortFoundRegMark). - Action().OutputToRegField(TargetOFPortField). - Done() +func (f *featureMulticast) multicastOutputFlows(cookieID uint64) []binding.Flow { + flows := []binding.Flow{ + MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), + } + if f.encapEnabled { + // When running with encap mode, drop a multicast packet if it is received from the tunnel port and expected to + // be output to antrea-gw0, or received from antrea-gw0 and expected to be output to the tunnel port. These flows + // avoid duplicated packet forwarding. For example, if the packet is received on the tunnel port, the sender is + // a Pod on another Node, and the packet has already been sent to the external network via antrea-gw0 on the + // source Node. Conversely, if the packet is received on antrea-gw0, the sender is external, and the Pod + // receivers on other Nodes should also receive the packet from the underlay network. + flows = append(flows, MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromTunnelRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.HostGatewayOFPort). + Action().Drop(). + Done(), + MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromGatewayRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.DefaultTunOFPort). + Action().Drop(). 
+ Done(), + ) + } + return flows } func (f *featureMulticast) multicastSkipIGMPMetricFlows() []binding.Flow { @@ -147,3 +185,34 @@ func (f *featureMulticast) replayGroups() { return true }) } + +func (f *featureMulticast) multicastToRemoteFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { + return []binding.Flow{ + // This flow outputs the IGMP report message sent from Antrea Agent to an OpenFlow group which is expected to + // broadcast to all the other Nodes in the cluster. The multicast groups inside the IGMP report message + // include the ones local Pods have joined. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchInPort(openflow13.P_CONTROLLER). + Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().Group(groupID). + Done(), + // This flow ensures that the IGMP report message sent from Antrea Agent bypasses the check in SpoofGuardTable. + ClassifierTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(openflow13.P_CONTROLLER). + Action().GotoTable(SpoofGuardTable.GetNext()). + Done(), + // This flow ensures that a multicast packet sent from a different Node via the tunnel port enters the Multicast + // pipeline. + ClassifierTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(config.DefaultTunOFPort). + MatchProtocol(binding.ProtocolIP). + MatchDstIPNet(*types.McastCIDR). + Action().LoadRegMark(FromTunnelRegMark). + Action().GotoTable(firstMulticastTable.GetID()). 
+ Done(), + } +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 3fdb03c5f69..b928ea13bef 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2666,20 +2666,24 @@ func pipelineClassifyFlow(cookieID uint64, protocol binding.Protocol, pipeline b // igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastRoutingTable // and sends it to antrea-agent. func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { - flows := []binding.Flow{ - // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the - // OVS bridge, so that the OVS multicast db cache can be updated, and antrea-agent can identify the local multicast - // group and its members in the meanwhile. - // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in - // the packet. - MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). - Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchProtocol(binding.ProtocolIGMP). - MatchRegMark(FromLocalRegMark). - Action().LoadRegMark(CustomReasonIGMPRegMark). - Action().SendToController(reason). - Action().Normal(). - Done(), + var flows []binding.Flow + sourceMarks := []*binding.RegMark{FromLocalRegMark} + if f.encapEnabled { + sourceMarks = append(sourceMarks, FromTunnelRegMark) + } + for _, m := range sourceMarks { + flows = append(flows, + // Set a custom reason for the IGMP packets, and then send it to antrea-agent. Then antrea-agent can identify + // the local multicast group and its members in the meanwhile. + // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in + // the packet. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchRegMark(m). 
+ Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().SendToController(reason). + Done()) } return flows } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index f59dc19007f..235c19212b9 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -297,17 +297,17 @@ func (mr *MockClientMockRecorder) InstallMulticastFlows(arg0, arg1 interface{}) } // InstallMulticastGroup mocks base method -func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32, arg2 []net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // InstallMulticastGroup indicates an expected call of InstallMulticastGroup -func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1, arg2) } // InstallMulticastInitialFlows mocks base method @@ -324,6 +324,20 @@ func (mr *MockClientMockRecorder) InstallMulticastInitialFlows(arg0 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastInitialFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastInitialFlows), arg0) } +// InstallMulticastToRemoteFlows mocks base method +func (m *MockClient) InstallMulticastToRemoteFlows(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, 
"InstallMulticastToRemoteFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastToRemoteFlows indicates an expected call of InstallMulticastToRemoteFlows +func (mr *MockClientMockRecorder) InstallMulticastToRemoteFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastToRemoteFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastToRemoteFlows), arg0) +} + // InstallMulticlusterClassifierFlows mocks base method func (m *MockClient) InstallMulticlusterClassifierFlows(arg0 uint32, arg1 bool) error { m.ctrl.T.Helper() @@ -670,6 +684,20 @@ func (mr *MockClientMockRecorder) SendIGMPQueryPacketOut(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPQueryPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPQueryPacketOut), arg0, arg1, arg2, arg3) } +// SendIGMPRemoteReportPacketOut mocks base method +func (m *MockClient) SendIGMPRemoteReportPacketOut(arg0 net.HardwareAddr, arg1 net.IP, arg2 util.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendIGMPRemoteReportPacketOut", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendIGMPRemoteReportPacketOut indicates an expected call of SendIGMPRemoteReportPacketOut +func (mr *MockClientMockRecorder) SendIGMPRemoteReportPacketOut(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPRemoteReportPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPRemoteReportPacketOut), arg0, arg1, arg2) +} + // SendTCPPacketOut mocks base method func (m *MockClient) SendTCPPacketOut(arg0, arg1, arg2, arg3 string, arg4, arg5 uint32, arg6 bool, arg7, arg8 uint16, arg9 uint32, arg10 byte, arg11 func(openflow.PacketOutBuilder) openflow.PacketOutBuilder) error { m.ctrl.T.Helper() diff --git a/pkg/agent/types/multicast.go b/pkg/agent/types/multicast.go index da06447af3b..ae910dea965 100644 
--- a/pkg/agent/types/multicast.go +++ b/pkg/agent/types/multicast.go @@ -32,6 +32,7 @@ type IGMPNPRuleInfo struct { var ( McastAllHosts = net.ParseIP("224.0.0.1").To4() + IGMPv3Router = net.ParseIP("224.0.0.22").To4() _, McastCIDR, _ = net.ParseCIDR("224.0.0.0/4") ) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 74e362e69b0..e55ceb2df47 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -342,6 +342,7 @@ type BucketBuilder interface { LoadRegRange(regID int, data uint32, rng *Range) BucketBuilder LoadToRegField(field *RegField, data uint32) BucketBuilder ResubmitToTable(tableID uint8) BucketBuilder + SetTunnelDst(addr net.IP) BucketBuilder Done() Group } diff --git a/pkg/ovs/openflow/ofctrl_group.go b/pkg/ovs/openflow/ofctrl_group.go index ee37a35adc5..899f61f9acc 100644 --- a/pkg/ovs/openflow/ofctrl_group.go +++ b/pkg/ovs/openflow/ofctrl_group.go @@ -16,6 +16,7 @@ package openflow import ( "fmt" + "net" "antrea.io/libOpenflow/openflow13" "antrea.io/ofnet/ofctrl" @@ -128,6 +129,13 @@ func (b *bucketBuilder) ResubmitToTable(tableID uint8) BucketBuilder { return b } +// SetTunnelDst is an action to set the tunnel destination address when the bucket is selected. +func (b *bucketBuilder) SetTunnelDst(addr net.IP) BucketBuilder { + setTunDstAct := &ofctrl.SetTunnelDstAction{IP: addr} + b.bucket.AddAction(setTunDstAct.GetActionMessage()) + return b +} + // Weight sets the weight of a bucket. 
func (b *bucketBuilder) Weight(val uint16) BucketBuilder { b.bucket.Weight = val diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 136e9e43d6f..076316cd997 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -32,6 +32,7 @@ import ( "antrea.io/antrea/pkg/agent/multicast" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + agentconfig "antrea.io/antrea/pkg/config/agent" "antrea.io/antrea/pkg/features" ) @@ -56,6 +57,21 @@ func TestMulticast(t *testing.T) { if err != nil { t.Fatalf("Error computing multicast interfaces: %v", err) } + t.Run("testMulticastWithNoEncap", func(t *testing.T) { + runMulticastTestCases(t, data, nodeMulticastInterfaces, true) + }) + t.Run("testMulticastWithEncap", func(t *testing.T) { + ac := func(config *agentconfig.AgentConfig) { + config.TrafficEncapMode = "encap" + } + if err := data.mutateAntreaConfigMap(nil, ac, false, true); err != nil { + t.Fatalf("Failed to deploy cluster with encap mode: %v", err) + } + runMulticastTestCases(t, data, nodeMulticastInterfaces, false) + }) +} + +func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { t.Run("testMulticastBetweenPodsInTwoNodes", func(t *testing.T) { skipIfNumNodesLessThan(t, 2) testcases := []multicastTestcase{ @@ -92,7 +108,7 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) @@ -132,7 +148,7 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) @@ -556,7 +572,7 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send } } -func 
runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) @@ -602,19 +618,21 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc continue } for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { - _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) - if err != nil { - return false, err - } - // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, - // the receivers should configure corresponding inbound multicast routes. - if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { - if len(mRouteResult) == 0 { - return false, nil + if checkReceiverRoute { + _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) + if err != nil { + return false, err } - } else { - if len(mRouteResult) != 0 { - return false, nil + // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, + // the receivers should configure corresponding inbound multicast routes. 
+ if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { + if len(mRouteResult) == 0 { + return false, nil + } + } else { + if len(mRouteResult) != 0 { + return false, nil + } } } _, mAddrResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip maddr show %s | grep %s", receiverMulticastInterface, mc.group.String()))