Skip to content

Commit

Permalink
[Windows] CNI Server installs OpenFlow entries after PortStatus messa…
Browse files Browse the repository at this point in the history
…ge is received

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Oct 25, 2024
1 parent b4916eb commit cb09094
Show file tree
Hide file tree
Showing 25 changed files with 1,020 additions and 136 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func run(o *Options) error {
o.config.CNISocket,
o.config.HostProcPathPrefix,
nodeConfig,
localPodInformer.Get(),
k8sClient,
routeClient,
isChaining,
Expand Down
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.23.0

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -113,10 +113,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
26 changes: 19 additions & 7 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -76,9 +78,11 @@ type podConfigurator struct {
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool
podIfMonitor *podIfaceMonitor
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -88,11 +92,13 @@ func newPodConfigurator(
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podLister v1.PodLister,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
return nil, err
}
ifMonitor := newPodInterfaceMonitor(kubeClient, podLister, ofClient, ifaceStore, podUpdateNotifier)
return &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
Expand All @@ -101,6 +107,7 @@ func newPodConfigurator(
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podIfMonitor: ifMonitor,
}, nil
}

Expand Down Expand Up @@ -166,13 +173,13 @@ func getContainerIPsString(ips []net.IP) string {
// not created for a Pod interface.
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
if portData.ExternalIDs == nil {
klog.V(2).Infof("OVS port %s has no external_ids", portData.Name)
klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name)
return nil
}

containerID, found := portData.ExternalIDs[ovsExternalIDContainerID]
if !found {
klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID)
klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name)
return nil
}

Expand All @@ -187,8 +194,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in

containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC])
if err != nil {
klog.Errorf("Failed to parse MAC address from OVS external config %s: %v",
portData.ExternalIDs[ovsExternalIDMAC], err)
klog.ErrorS(err, "Failed to parse MAC address from OVS external config")
}
podName, _ := portData.ExternalIDs[ovsExternalIDPodName]
podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace]
Expand Down Expand Up @@ -279,7 +285,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}

Expand Down Expand Up @@ -498,7 +504,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error {
containerID := containerConfig.ContainerID
klog.V(2).Infof("Deleting Openflow entries for container %s", containerID)
klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID)
if !pc.isSecondaryNetwork {
if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil {
return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err)
Expand All @@ -513,6 +519,12 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil {
return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err)
}

// Remove unready Pod info from local cache when Pod is deleted. This is called only on Windows.
if pc.podIfMonitor != nil {
pc.podIfMonitor.deleteUnreadyPod(containerConfig.InterfaceName)
}

// Remove container configuration from cache.
pc.ifaceStore.DeleteInterface(containerConfig)
if !pc.isSecondaryNetwork {
Expand Down Expand Up @@ -558,7 +570,7 @@ func (pc *podConfigurator) connectInterceptedInterface(
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}
for _, ip := range containerConfig.IPs {
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
fakeclientset "k8s.io/client-go/kubernetes/fake"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
ipamtest "antrea.io/antrea/pkg/agent/cniserver/ipam/testing"
Expand Down Expand Up @@ -682,12 +683,13 @@ func TestDeleteVLANSecondaryInterface(t *testing.T) {
}

func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator *fakeInterfaceConfigurator) *podConfigurator {
kubeClient := fakeclientset.NewSimpleClientset()
gwMAC, _ := net.ParseMAC("00:00:11:11:11:11")
mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller)
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator, _ := newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
35 changes: 8 additions & 27 deletions pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,27 @@
package cniserver

import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/util/k8s"
)

// connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously
// in another goroutine. The function is for containerd runtime. The host interface is created after
// CNI call completes.
func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error {
ovsPortName := ifConfig.InterfaceName
// Add the OVS port into unReadyPorts. This operation is performed before we update OVSDB, otherwise we
// need to think about the race condition between the current goroutine with the listener.
// Note, we may add OVS port into "unReadyOVSPorts" map even if the update OVSDB operation is failed,
// because it is also a case that the Pod's networking is not ready.
pc.podIfMonitor.addUnreadyPodInterface(ifConfig)
return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error {
if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil {
return err
}
ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true)
if err != nil {
return err
}
containerID := ifConfig.ContainerID
klog.V(2).Infof("Setting up Openflow entries for container %s", containerID)
if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil {
return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err)
}
// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
// Notify the Pod update event to required components.
event := types.PodUpdate{
PodName: ifConfig.PodName,
PodNamespace: ifConfig.PodNamespace,
IsAdd: true,
ContainerID: ifConfig.ContainerID,
}
pc.podUpdateNotifier.Notify(event)
return nil
})
}
Expand All @@ -75,7 +57,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
// Because of this, we need to wait asynchronously for the interface to be created: we create the OVS port
// and set the OVS Interface type "" first, and change the OVS Interface type to "internal" to connect to the
// container interface after it is created. After OVS connects to the container interface, an OFPort is allocated.
klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
klog.V(2).InfoS("Adding OVS port for container", "port", ovsPortName, "container", containerID)
ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig)
portUUID, err := pc.createOVSPort(ovsPortName, ovsAttachInfo, containerConfig.VLANID)
if err != nil {
Expand Down Expand Up @@ -105,7 +87,7 @@ func (pc *podConfigurator) configureInterfaces(
// See: https://github.com/kubernetes/kubernetes/issues/57253#issuecomment-358897721.
interfaceConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if found {
klog.V(2).Infof("Found an existing OVS port for container %s, returning", containerID)
klog.V(2).InfoS("Found an existing OVS port for container, returning", "container", containerID)
mac := interfaceConfig.MAC.String()
hostIface := &current.Interface{
Name: interfaceConfig.InterfaceName,
Expand All @@ -128,9 +110,8 @@ func (pc *podConfigurator) configureInterfaces(
func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) {
for i := range ifConfigs {
ifaceConfig := ifConfigs[i]
pod := k8s.NamespacedName(ifaceConfig.PodNamespace, ifaceConfig.PodName)
if err := pc.connectInterfaceToOVSAsync(ifaceConfig, containerAccess); err != nil {
klog.Errorf("Failed to reconcile Pod %s: %v", pod, err)
klog.ErrorS(err, "Failed to reconcile Pod", "name", ifaceConfig.PodName, "Namespace", ifaceConfig.PodNamespace)
}
}
}
Loading

0 comments on commit cb09094

Please sign in to comment.