Skip to content

Commit

Permalink
[Multicast] support encap mode
Browse files Browse the repository at this point in the history
1. Use a single routine to send local multicast groups in an IGMP v3
   Report message to notify all the other Nodes in the cluster
2. Multicast controller maintains both local Pod members and remote
   Nodes which have Pod members for each multicast group found in the
   cluster.
3. Add remote Node members in the OpenFlow group buckets.
4. Agent drops the duplicated multicast packet received from underlay by
   - adding iptables rules in raw table antreaPreRouting chain to drop
     multicast packets sent from other Nodes, because the Pod multicast
     traffic is received from tunnel with encap mode.
   - adding an ipset to maintain IPs of other Nodes in the cluster, which
     is used as source in the iptables rule.

Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Jul 8, 2022
1 parent f022dd8 commit c5e37e7
Show file tree
Hide file tree
Showing 30 changed files with 1,292 additions and 201 deletions.
2 changes: 1 addition & 1 deletion build/charts/antrea/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ featureGates:
# IPAM when configuring secondary network interfaces with Multus.
{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "AntreaIPAM" "default" false) }}

# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }}

# Enable Antrea Multi-cluster Gateway to support cross-cluster traffic.
Expand Down
2 changes: 1 addition & 1 deletion build/charts/antrea/conf/antrea-controller.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ featureGates:
# Enable collecting and exposing NetworkPolicy statistics.
{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NetworkPolicyStats" "default" true) }}

# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }}

# Enable controlling SNAT IPs of Pod egress traffic.
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,7 @@ data:
# IPAM when configuring secondary network interfaces with Multus.
# AntreaIPAM: false
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable Antrea Multi-cluster Gateway to support cross-cluster traffic.
Expand Down Expand Up @@ -2872,7 +2872,7 @@ data:
# Enable collecting and exposing NetworkPolicy statistics.
# NetworkPolicyStats: true
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable controlling SNAT IPs of Pod egress traffic.
Expand Down Expand Up @@ -3692,7 +3692,7 @@ spec:
kubectl.kubernetes.io/default-container: antrea-agent
# Automatically restart Pods with a RollingUpdate if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745
checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f
labels:
app: antrea
component: antrea-agent
Expand Down Expand Up @@ -3932,7 +3932,7 @@ spec:
annotations:
# Automatically restart Pod if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745
checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f
labels:
app: antrea
component: antrea-controller
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,7 @@ data:
# IPAM when configuring secondary network interfaces with Multus.
# AntreaIPAM: false
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable Antrea Multi-cluster Gateway to support cross-cluster traffic.
Expand Down Expand Up @@ -2872,7 +2872,7 @@ data:
# Enable collecting and exposing NetworkPolicy statistics.
# NetworkPolicyStats: true
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable controlling SNAT IPs of Pod egress traffic.
Expand Down Expand Up @@ -3692,7 +3692,7 @@ spec:
kubectl.kubernetes.io/default-container: antrea-agent
# Automatically restart Pods with a RollingUpdate if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745
checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f
labels:
app: antrea
component: antrea-agent
Expand Down Expand Up @@ -3934,7 +3934,7 @@ spec:
annotations:
# Automatically restart Pod if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745
checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f
labels:
app: antrea
component: antrea-controller
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,7 @@ data:
# IPAM when configuring secondary network interfaces with Multus.
# AntreaIPAM: false
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable Antrea Multi-cluster Gateway to support cross-cluster traffic.
Expand Down Expand Up @@ -2872,7 +2872,7 @@ data:
# Enable collecting and exposing NetworkPolicy statistics.
# NetworkPolicyStats: true
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable controlling SNAT IPs of Pod egress traffic.
Expand Down Expand Up @@ -3692,7 +3692,7 @@ spec:
kubectl.kubernetes.io/default-container: antrea-agent
# Automatically restart Pods with a RollingUpdate if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447
checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904
labels:
app: antrea
component: antrea-agent
Expand Down Expand Up @@ -3932,7 +3932,7 @@ spec:
annotations:
# Automatically restart Pod if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447
checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904
labels:
app: antrea
component: antrea-controller
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2596,7 +2596,7 @@ data:
# IPAM when configuring secondary network interfaces with Multus.
# AntreaIPAM: false
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable Antrea Multi-cluster Gateway to support cross-cluster traffic.
Expand Down Expand Up @@ -2885,7 +2885,7 @@ data:
# Enable collecting and exposing NetworkPolicy statistics.
# NetworkPolicyStats: true
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable controlling SNAT IPs of Pod egress traffic.
Expand Down Expand Up @@ -3705,7 +3705,7 @@ spec:
kubectl.kubernetes.io/default-container: antrea-agent
# Automatically restart Pods with a RollingUpdate if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019
checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8
checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4
labels:
app: antrea
Expand Down Expand Up @@ -3991,7 +3991,7 @@ spec:
annotations:
# Automatically restart Pod if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019
checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8
labels:
app: antrea
component: antrea-controller
Expand Down
8 changes: 4 additions & 4 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,7 @@ data:
# IPAM when configuring secondary network interfaces with Multus.
# AntreaIPAM: false
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable Antrea Multi-cluster Gateway to support cross-cluster traffic.
Expand Down Expand Up @@ -2872,7 +2872,7 @@ data:
# Enable collecting and exposing NetworkPolicy statistics.
# NetworkPolicyStats: true
# Enable multicast traffic. This feature is supported only with noEncap mode.
# Enable multicast traffic.
# Multicast: false
# Enable controlling SNAT IPs of Pod egress traffic.
Expand Down Expand Up @@ -3692,7 +3692,7 @@ spec:
kubectl.kubernetes.io/default-container: antrea-agent
# Automatically restart Pods with a RollingUpdate if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86
checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa
labels:
app: antrea
component: antrea-agent
Expand Down Expand Up @@ -3932,7 +3932,7 @@ spec:
annotations:
# Automatically restart Pod if the ConfigMap changes
# See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments
checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86
checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa
labels:
app: antrea
component: antrea-controller
Expand Down
11 changes: 10 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ func run(o *Options) error {
ipsecCertController = ipseccertificate.NewIPSecCertificateController(k8sClient, ovsBridgeClient, nodeConfig.Name)
}

// nodeUpdateChannel is a channel for receiving the set of installed Nodes
// (Node name to Node IPs) from NodeRouteController and passing it on to the
// Multicast controller, which is given the same channel below.
nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100)
nodeRouteController := noderoute.NewNodeRouteController(
k8sClient,
informerFactory,
Expand All @@ -262,6 +266,7 @@ func run(o *Options) error {
agentInitializer.GetWireGuardClient(),
o.config.AntreaProxy.ProxyAll,
ipsecCertController,
nodeUpdateChannel,
)

var mcRouteController *mcroute.MCRouteController
Expand Down Expand Up @@ -521,6 +526,8 @@ func run(o *Options) error {

go podUpdateChannel.Run(stopCh)

go nodeUpdateChannel.Run(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)
Expand Down Expand Up @@ -652,7 +659,9 @@ func run(o *Options) error {
ovsBridgeClient,
podUpdateChannel,
o.igmpQueryInterval,
validator)
validator,
networkConfig.TrafficEncapMode.SupportsEncap(),
nodeUpdateChannel)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/wireguard"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/runtime"
Expand Down Expand Up @@ -84,6 +85,7 @@ type Controller struct {
// or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate
// to be configured before installing routes/flows to peer Nodes to prevent unencrypted traffic across Nodes.
ipsecCertificateManager ipseccertificate.Manager
nodeUpdateSubscriber channel.Notifier
}

// NewNodeRouteController instantiates a new Controller object which will process Node events
Expand All @@ -100,6 +102,7 @@ func NewNodeRouteController(
wireguardClient wireguard.Interface,
proxyAll bool,
ipsecCertificateManager ipseccertificate.Manager,
nodeUpdateSubscriber channel.Notifier,
) *Controller {
nodeInformer := informerFactory.Core().V1().Nodes()
svcLister := informerFactory.Core().V1().Services()
Expand All @@ -120,6 +123,7 @@ func NewNodeRouteController(
wireGuardClient: wireguardClient,
proxyAll: proxyAll,
ipsecCertificateManager: ipsecCertificateManager,
nodeUpdateSubscriber: nodeUpdateSubscriber,
}
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -325,6 +329,7 @@ func (c *Controller) reconcile() error {
if err := c.removeStaleWireGuardPeers(); err != nil {
return fmt.Errorf("error when removing stale WireGuard peers: %v", err)
}
c.notifyNodeList()
return nil
}

Expand Down Expand Up @@ -419,6 +424,7 @@ func (c *Controller) processNextWorkItem() bool {
// If no error occurs we Forget this item so it does not get queued again until
// another change happens.
c.queue.Forget(key)
c.notifyNodeList()
} else {
// Put the item back on the workqueue to handle any transient errors.
c.queue.AddRateLimited(key)
Expand Down Expand Up @@ -471,6 +477,9 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
if err := c.ofClient.UninstallNodeFlows(nodeName); err != nil {
return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err)
}
if err := c.routeClient.DeleteNodeIPs(nodeRouteInfo.nodeIPs.IPv4, nodeRouteInfo.nodeIPs.IPv6); err != nil {
return fmt.Errorf("failed to delete Node %s from ipset: %v", nodeName, err)
}
c.installedNodes.Delete(obj)

if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
Expand Down Expand Up @@ -618,6 +627,10 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
}
}

if err := c.routeClient.AddNodeIPs(peerNodeIPs.IPv4, peerNodeIPs.IPv6); err != nil {
return fmt.Errorf("failed to add Node %s in ipset: %v", nodeName, err)
}

c.installedNodes.Add(&nodeRouteInfo{
nodeName: nodeName,
podCIDRs: peerPodCIDRs,
Expand Down Expand Up @@ -819,3 +832,12 @@ func (c *Controller) getNodeTransportAddrs(node *corev1.Node) (*utilip.DualStack
}
return peerNodeIPs, nil
}

// notifyNodeList publishes a snapshot of the currently installed Nodes to
// nodeUpdateSubscriber. The snapshot is a map keyed by Node name whose values
// are the nodeIPs recorded for that Node in installedNodes.
func (c *Controller) notifyNodeList() {
	installed := c.installedNodes.List()
	ipsByNode := make(map[string]*utilip.DualStackIPs, len(installed))
	for i := range installed {
		// Every entry is read as a *nodeRouteInfo; the assertion panics on
		// any other type.
		info := installed[i].(*nodeRouteInfo)
		ipsByNode[info.nodeName] = info.nodeIPs
	}
	c.nodeUpdateSubscriber.Notify(ipsByNode)
}
9 changes: 8 additions & 1 deletion pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
)

Expand Down Expand Up @@ -75,10 +76,11 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig) (*fakeCont
routeClient := routetest.NewMockInterface(ctrl)
interfaceStore := interfacestore.NewInterfaceStore()
ipsecCertificateManager := &fakeIPsecCertificateManager{}
nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100)
c := NewNodeRouteController(clientset, informerFactory, ofClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{
IPv4: nil,
MAC: gatewayMAC,
}}, nil, false, ipsecCertificateManager)
}}, nil, false, ipsecCertificateManager, nodeUpdateChannel)
return &fakeController{
Controller: c,
clientset: clientset,
Expand Down Expand Up @@ -145,6 +147,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.routeClient.EXPECT().AddNodeIPs(nodeIP1, nil).Times(1)
c.processNextWorkItem()

// Since node1 is not deleted yet, routes and flows for node2 shouldn't be installed as its PodCIDR is duplicate.
Expand All @@ -155,11 +158,13 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
c.clientset.CoreV1().Nodes().Delete(context.TODO(), node1.Name, metav1.DeleteOptions{})
c.ofClient.EXPECT().UninstallNodeFlows("node1").Times(1)
c.routeClient.EXPECT().DeleteRoutes(podCIDR).Times(1)
c.routeClient.EXPECT().DeleteNodeIPs(nodeIP1, nil).Times(1)
c.processNextWorkItem()

// After node1 is deleted, routes and flows should be installed for node2 successfully.
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node2", nodeIP2, podCIDRGateway).Times(1)
c.routeClient.EXPECT().AddNodeIPs(nodeIP2, nil).Times(1)
c.processNextWorkItem()
}()

Expand Down Expand Up @@ -222,11 +227,13 @@ func TestIPInPodSubnets(t *testing.T) {
c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.routeClient.EXPECT().AddNodeIPs(nodeIP1, nil).Times(1)
c.processNextWorkItem()

c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1)
c.routeClient.EXPECT().AddNodeIPs(nodeIP2, nil).Times(1)
c.processNextWorkItem()

assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1")))
Expand Down
Loading

0 comments on commit c5e37e7

Please sign in to comment.