Skip to content

Commit

Permalink
[Windows] CNI Server installs OpenFlow entries after PortStatus messa…
Browse files Browse the repository at this point in the history
…ge is received

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Oct 23, 2024
1 parent 90b1cb9 commit 4fcbe6f
Show file tree
Hide file tree
Showing 21 changed files with 731 additions and 73 deletions.
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.23.0

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -113,10 +113,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -76,9 +77,11 @@ type podConfigurator struct {
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool
podIfMonitor *podIfaceMonitor
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -93,6 +96,7 @@ func newPodConfigurator(
if err != nil {
return nil, err
}
ifMonitor := newPodInterfaceMonitor(kubeClient, ofClient, ifaceStore, podUpdateNotifier)
return &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
Expand All @@ -101,6 +105,7 @@ func newPodConfigurator(
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podIfMonitor: ifMonitor,
}, nil
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
agenttypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/util/channel"
)

// connectInterfaceToOVS connects an existing interface to the OVS bridge.
Expand Down Expand Up @@ -113,3 +116,16 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte
klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName)
}
}

type podIfaceMonitor struct {
}

func newPodInterfaceMonitor(_ clientset.Interface,
_ openflow.Client,
_ interfacestore.InterfaceStore,
_ channel.Notifier,
) *podIfaceMonitor {
return &podIfaceMonitor{}
}

func (pc *podIfaceMonitor) monitorUnReadyInterface(stopCh <-chan struct{}) {}
4 changes: 3 additions & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
fakeclientset "k8s.io/client-go/kubernetes/fake"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
ipamtest "antrea.io/antrea/pkg/agent/cniserver/ipam/testing"
Expand Down Expand Up @@ -682,12 +683,13 @@ func TestDeleteVLANSecondaryInterface(t *testing.T) {
}

func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator *fakeInterfaceConfigurator) *podConfigurator {
kubeClient := fakeclientset.NewSimpleClientset()
gwMAC, _ := net.ParseMAC("00:00:11:11:11:11")
mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller)
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator, _ := newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
205 changes: 186 additions & 19 deletions pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,46 @@
package cniserver

import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

const (
podNotReadyTimeInSeconds = 30
)

// connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously
// in another goroutine. The function is for containerd runtime. The host interface is created after
// CNI call completes.
func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error {
ovsPortName := ifConfig.InterfaceName
// Add the OVS port into unReadyPorts. This operation is performed before we update OVSDB, otherwise we
// need to think about the race condition between the current goroutine with the listener.
// Note, we may add OVS port into "unReadyOVSPorts" map even if the update OVSDB operation is failed,
// because it is also a case that the Pod's networking is not ready.
pc.podIfMonitor.addUnReadyPodInterface(ifConfig)
return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error {
if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil {
return err
}
ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true)
if err != nil {
return err
}
containerID := ifConfig.ContainerID
klog.V(2).Infof("Setting up Openflow entries for container %s", containerID)
if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil {
return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err)
}
// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
// Notify the Pod update event to required components.
event := types.PodUpdate{
PodName: ifConfig.PodName,
PodNamespace: ifConfig.PodNamespace,
IsAdd: true,
ContainerID: ifConfig.ContainerID,
}
pc.podUpdateNotifier.Notify(event)
return nil
})
}
Expand Down Expand Up @@ -134,3 +135,169 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte
}
}
}

type unReadyPodInfo struct {
podName string
podNamespace string
annotated bool
createTime time.Time
}

type podIfaceMonitor struct {
kubeClient clientset.Interface
ifaceStore interfacestore.InterfaceStore
ofClient openflow.Client
podUpdateNotifier channel.Notifier

// unReadyInterfaces is a map to store the OVS ports which is waiting for the PortStatus from OpenFlow switch.
// The key in the map is the OVS port name, and its value is unReadyPodInfo.
// It is used only on Windows now.
unReadyInterfaces sync.Map
statusCh chan *openflow15.PortStatus
}

func newPodInterfaceMonitor(kubeClient clientset.Interface,
ofClient openflow.Client,
ifaceStore interfacestore.InterfaceStore,
podUpdateNotifier channel.Notifier,
) *podIfaceMonitor {
statusCh := make(chan *openflow15.PortStatus)
ofClient.SubscribeOFPortStatusMessage(statusCh)
return &podIfaceMonitor{
kubeClient: kubeClient,
ofClient: ofClient,
ifaceStore: ifaceStore,
podUpdateNotifier: podUpdateNotifier,
unReadyInterfaces: sync.Map{},
statusCh: statusCh,
}
}

func (m *podIfaceMonitor) monitorUnReadyInterface(stopCh <-chan struct{}) {
klog.Info("Started the monitor to wait for new OpenFlow ports")
go func() {
for {
select {
case <-stopCh:
return
case status := <-m.statusCh:
klog.V(2).InfoS("Received PortStatus message", "message", status)
// Update Pod OpenFlow entries only after the OpenFlow port state is live.
if status.Desc.State == openflow15.PS_LIVE {
m.updateUnReadyPod(status)
}
case <-time.Tick(time.Second * 5):
m.checkUnReadyPods()
}
}
}()
}

func (m *podIfaceMonitor) updatePodFlows(ifName string, ofPort int32) error {
ifConfig, found := m.ifaceStore.GetInterfaceByName(ifName)
if !found {
klog.Info("Interface config is not found", "name", ifName)
return nil
}
containerID := ifConfig.ContainerID

// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
m.ifaceStore.UpdateInterface(ifConfig)

// Install OpenFlow entries for the Pod.
klog.V(2).Infof("Setting up Openflow entries for container %s", containerID)
if err := m.ofClient.InstallPodFlows(ifName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil {
return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err)
}

// Notify the Pod update event to required components.
event := types.PodUpdate{
PodName: ifConfig.PodName,
PodNamespace: ifConfig.PodNamespace,
IsAdd: true,
ContainerID: ifConfig.ContainerID,
}
m.podUpdateNotifier.Notify(event)

// Remove the annotation from Pod if exists.
m.updatePodUnreadyAnnotation(ifConfig.PodNamespace, ifConfig.PodName, false)
return nil
}

func (m *podIfaceMonitor) updatePodUnreadyAnnotation(podNamespace, podName string, addAnnotation bool) {
pod, err := m.kubeClient.CoreV1().Pods(podNamespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get Pod when trying to update 'unready' annotations", "Namespace", podNamespace, "Name", podName)
return
}

annotated := false
if pod.Annotations != nil {
_, annotated = pod.Annotations[types.PodNotReadyAnnotationKey]
}

if addAnnotation && !annotated {
// Add the annotation on Pod with '"pod.antrea.io/not-ready": ""'
patch, _ := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{types.PodNotReadyAnnotationKey: ""},
},
})
m.kubeClient.CoreV1().Pods(podNamespace).Patch(context.Background(), podName, apitypes.MergePatchType, patch, metav1.PatchOptions{})
} else if !addAnnotation && annotated {
// Remove the annotation on Pod with '"pod.antrea.io/not-ready": ""'
patch, _ := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{types.PodNotReadyAnnotationKey: nil},
},
})
m.kubeClient.CoreV1().Pods(podNamespace).Patch(context.Background(), podName, apitypes.MergePatchType, patch, metav1.PatchOptions{})
}
}

func (m *podIfaceMonitor) updateUnReadyPod(status *openflow15.PortStatus) {
ovsPort := string(bytes.Trim(status.Desc.Name, "\x00"))
obj, found := m.unReadyInterfaces.Load(ovsPort)
if !found {
klog.InfoS("OVS port is not found", "ovsPort", ovsPort)
return
}
podInfo := obj.(*unReadyPodInfo)
ofPort := status.Desc.PortNo
if err := m.updatePodFlows(ovsPort, int32(ofPort)); err != nil {
klog.ErrorS(err, "Failed to update Pod's OpenFlow entries", "PodName", podInfo.podName, "PodNamespace", podInfo.podNamespace, "OVSPort", ovsPort)
return
}
// Delete the Pod from unReadyPods
m.unReadyInterfaces.Delete(ovsPort)
}

func (m *podIfaceMonitor) checkUnReadyPods() {
m.unReadyInterfaces.Range(func(key, value any) bool {
podInfo := value.(*unReadyPodInfo)
if !podInfo.annotated && time.Now().Sub(podInfo.createTime).Seconds() > podNotReadyTimeInSeconds {
m.updatePodUnreadyAnnotation(podInfo.podNamespace, podInfo.podName, true)
podInfo.annotated = true
m.unReadyInterfaces.Store(key, podInfo)
}
return true
})
}

func (m *podIfaceMonitor) addUnReadyPodInterface(ifConfig *interfacestore.InterfaceConfig) {
klog.InfoS("Added OVS port into unready interfaces", "ovsPort", ifConfig.InterfaceName,
"podName", ifConfig.PodName, "podNamespace", ifConfig.PodNamespace)
m.unReadyInterfaces.Store(ifConfig.InterfaceName, &unReadyPodInfo{
podName: ifConfig.PodName,
podNamespace: ifConfig.PodNamespace,
annotated: false,
createTime: time.Now(),
})
}

// getPortStatusCh returns the channel used to receive OpenFlow.PortStatus message.
// This function is added for test.
func (m *podIfaceMonitor) getPortStatusCh() chan *openflow15.PortStatus {
return m.statusCh
}
Loading

0 comments on commit 4fcbe6f

Please sign in to comment.