Skip to content

Commit

Permalink
[Multicast] support encap mode
Browse files Browse the repository at this point in the history
1, Use a single routine to send local multicast groups in an IGMP v3
   Report message to notify all the other Nodes in the cluster
2. Multicast controller maintains both local Pod members and remote
   Nodes which has Pod members for each multicast group found in the
   cluster
3. Add remote Node members in the OpenFlow group buckets.

Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Jun 28, 2022
1 parent f96c0a3 commit 5ac8704
Show file tree
Hide file tree
Showing 15 changed files with 1,059 additions and 162 deletions.
11 changes: 10 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ func run(o *Options) error {
ipsecCertController = ipseccertificate.NewIPSecCertificateController(k8sClient, ovsBridgeClient, nodeConfig.Name)
}

// podUpdateChannel is a channel for receiving Pod updates from CNIServer and
// notifying NetworkPolicyController and EgressController to reconcile rules
// related to the updated Pods.
nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100)
nodeRouteController := noderoute.NewNodeRouteController(
k8sClient,
informerFactory,
Expand All @@ -262,6 +266,7 @@ func run(o *Options) error {
agentInitializer.GetWireGuardClient(),
o.config.AntreaProxy.ProxyAll,
ipsecCertController,
nodeUpdateChannel,
)

var mcRouteController *mcroute.MCRouteController
Expand Down Expand Up @@ -521,6 +526,8 @@ func run(o *Options) error {

go podUpdateChannel.Run(stopCh)

go nodeUpdateChannel.Run(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)
Expand Down Expand Up @@ -652,7 +659,9 @@ func run(o *Options) error {
ovsBridgeClient,
podUpdateChannel,
o.igmpQueryInterval,
validator)
validator,
networkConfig.TrafficEncapMode.SupportsEncap(),
nodeUpdateChannel)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/wireguard"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/runtime"
Expand Down Expand Up @@ -84,6 +85,7 @@ type Controller struct {
// or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate
// to be configured before installing routes/flows to peer Nodes to prevent unencrypted traffic across Nodes.
ipsecCertificateManager ipseccertificate.Manager
nodeUpdateSubscriber channel.Notifier
}

// NewNodeRouteController instantiates a new Controller object which will process Node events
Expand All @@ -100,6 +102,7 @@ func NewNodeRouteController(
wireguardClient wireguard.Interface,
proxyAll bool,
ipsecCertificateManager ipseccertificate.Manager,
nodeUpdateSubscriber channel.Notifier,
) *Controller {
nodeInformer := informerFactory.Core().V1().Nodes()
svcLister := informerFactory.Core().V1().Services()
Expand All @@ -120,6 +123,7 @@ func NewNodeRouteController(
wireGuardClient: wireguardClient,
proxyAll: proxyAll,
ipsecCertificateManager: ipsecCertificateManager,
nodeUpdateSubscriber: nodeUpdateSubscriber,
}
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -325,6 +329,7 @@ func (c *Controller) reconcile() error {
if err := c.removeStaleWireGuardPeers(); err != nil {
return fmt.Errorf("error when removing stale WireGuard peers: %v", err)
}
c.notifyNodeList()
return nil
}

Expand Down Expand Up @@ -419,6 +424,7 @@ func (c *Controller) processNextWorkItem() bool {
// If no error occurs we Forget this item so it does not get queued again until
// another change happens.
c.queue.Forget(key)
c.notifyNodeList()
} else {
// Put the item back on the workqueue to handle any transient errors.
c.queue.AddRateLimited(key)
Expand Down Expand Up @@ -819,3 +825,12 @@ func (c *Controller) getNodeTransportAddrs(node *corev1.Node) (*utilip.DualStack
}
return peerNodeIPs, nil
}

func (c *Controller) notifyNodeList() {
nodesMap := make(map[string]*utilip.DualStackIPs)
for _, n := range c.installedNodes.List() {
node := n.(*nodeRouteInfo)
nodesMap[node.nodeName] = node.nodeIPs
}
c.nodeUpdateSubscriber.Notify(nodesMap)
}
4 changes: 3 additions & 1 deletion pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
utilip "antrea.io/antrea/pkg/util/ip"
)

Expand Down Expand Up @@ -75,10 +76,11 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig) (*fakeCont
routeClient := routetest.NewMockInterface(ctrl)
interfaceStore := interfacestore.NewInterfaceStore()
ipsecCertificateManager := &fakeIPsecCertificateManager{}
nodeUpdateChannel := channel.NewSubscribableChannel("NodeUpdate", 100)
c := NewNodeRouteController(clientset, informerFactory, ofClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{
IPv4: nil,
MAC: gatewayMAC,
}}, nil, false, ipsecCertificateManager)
}}, nil, false, ipsecCertificateManager, nodeUpdateChannel)
return &fakeController{
Controller: c,
clientset: clientset,
Expand Down
Loading

0 comments on commit 5ac8704

Please sign in to comment.