Skip to content

Commit

Permalink
🌈 feat: StatelessCNI: Applying stateless CNI mode changes in network …
Browse files Browse the repository at this point in the history
…package. (#2197)

* Apllying stateless CNI mode in network package.

* Addresing the commetns.
  • Loading branch information
behzad-mir committed Oct 5, 2023
1 parent 8104b1c commit 476cfa7
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 18 deletions.
11 changes: 7 additions & 4 deletions cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
options := make(map[string]any)
networkID, err = plugin.getNetworkName(args.Netns, &ipamAddResult, nwCfg)

endpointID := GetEndpointID(args)
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
policies := cni.GetPoliciesFromNwCfg(nwCfg.AdditionalArgs)

// Check whether the network already exists.
Expand Down Expand Up @@ -1050,12 +1050,15 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
// Log the error but return success if the network is not found.
// if cni hits this, mostly state file would be missing and it can be reboot scenario where
// container runtime tries to delete and create pods which existed before reboot.
// this condition will not apply to stateless CNI since the network struct will be crated on each call
err = nil
return err
if !plugin.nm.IsStatelessCNIMode() {
return err
}
}
}

endpointID := GetEndpointID(args)
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
// Query the endpoint.
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil {
logger.Info("[cni-net] GetEndpoint",
Expand Down Expand Up @@ -1086,7 +1089,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
zap.String("endpointID", endpointID))
sendEvent(plugin, fmt.Sprintf("Deleting endpoint:%v", endpointID))
// Delete the endpoint.
if err = plugin.nm.DeleteEndpoint(networkID, endpointID); err != nil {
if err = plugin.nm.DeleteEndpoint(networkID, endpointID, epInfo); err != nil {
// return a retriable error so the container runtime will retry this DEL later
// the implementation of this function returns nil if the endpoint doens't exist, so
// we don't have to check that here
Expand Down
2 changes: 1 addition & 1 deletion cnm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (plugin *netPlugin) deleteEndpoint(w http.ResponseWriter, r *http.Request)
}

// Process request.
err = plugin.nm.DeleteEndpoint(req.NetworkID, req.EndpointID)
err = plugin.nm.DeleteEndpoint(req.NetworkID, req.EndpointID, nil)
if err != nil {
plugin.SendErrorResponse(w, err)
return
Expand Down
1 change: 1 addition & 0 deletions network/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type EndpointInfo struct {
VnetCidrs string
ServiceCidrs string
NATInfo []policy.NATInfo
HNSEndpointID string
}

// RouteInfo contains information about an IP route.
Expand Down
149 changes: 137 additions & 12 deletions network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,39 @@
package network

import (
"context"
"net"
"sync"
"time"

cnms "github.com/Azure/azure-container-networking/cnms/cnmspackage"
cnsclient "github.com/Azure/azure-container-networking/cns/client"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/netio"
"github.com/Azure/azure-container-networking/netlink"
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/store"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
// Network store key.
storeKey = "Network"
VlanIDKey = "VlanID"
AzureCNS = "azure-cns"
SNATIPKey = "NCPrimaryIPKey"
RoutesKey = "RoutesKey"
IPTablesKey = "IPTablesKey"
genericData = "com.docker.network.generic"
ipv6AddressMask = 128
storeKey = "Network"
VlanIDKey = "VlanID"
AzureCNS = "azure-cns"
SNATIPKey = "NCPrimaryIPKey"
RoutesKey = "RoutesKey"
IPTablesKey = "IPTablesKey"
genericData = "com.docker.network.generic"
ipv6AddressMask = 128
cnsBaseURL = "" // fallback to default http://localhost:10090
cnsReqTimeout = 15 * time.Second
StateLessCNIIsNotSet = "StateLess CNI mode is not enabled"
InfraInterfaceName = "eth0"
ContainerIDLength = 8
EndpointIfIndex = 0 // Azure CNI supports only one interface
)

var Ipv4DefaultRouteDstPrefix = net.IPNet{
Expand Down Expand Up @@ -61,6 +71,8 @@ type EndpointClient interface {

// NetworkManager manages the set of container networking resources.
type networkManager struct {
StatelessCniMode bool
CnsClient *cnsclient.Client
Version string
TimeStamp time.Time
ExternalInterfaces map[string]*externalInterface
Expand All @@ -86,7 +98,7 @@ type NetworkManager interface {
GetNumEndpointsByContainerID(containerID string) int

CreateEndpoint(client apipaClient, networkID string, epInfo *EndpointInfo) error
DeleteEndpoint(networkID string, endpointID string) error
DeleteEndpoint(networkID string, endpointID string, epInfo *EndpointInfo) error
GetEndpointInfo(networkID string, endpointID string) (*EndpointInfo, error)
GetAllEndpoints(networkID string) (map[string]*EndpointInfo, error)
GetEndpointInfoBasedOnPODDetails(networkID string, podName string, podNameSpace string, doExactMatchForPodName bool) (*EndpointInfo, error)
Expand All @@ -95,6 +107,8 @@ type NetworkManager interface {
UpdateEndpoint(networkID string, existingEpInfo *EndpointInfo, targetEpInfo *EndpointInfo) error
GetNumberOfEndpoints(ifName string, networkID string) int
SetupNetworkUsingState(networkMonitor *cnms.NetworkMonitor) error
GetEndpointID(containerID, ifName string) string
IsStatelessCNIMode() bool
}

// Creates a new network manager.
Expand All @@ -113,6 +127,10 @@ func NewNetworkManager(nl netlink.NetlinkInterface, plc platform.ExecClient, net
func (nm *networkManager) Initialize(config *common.PluginConfig, isRehydrationRequired bool) error {
nm.Version = config.Version
nm.store = config.Store
if config.Stateless {
err := nm.SetStatelessCNIMode()
return errors.Wrapf(err, "Failed to initialize stateles CNI")
}

// Restore persisted state.
err := nm.restore(isRehydrationRequired)
Expand All @@ -123,6 +141,23 @@ func (nm *networkManager) Initialize(config *common.PluginConfig, isRehydrationR
func (nm *networkManager) Uninitialize() {
}

// SetStatelessCNIMode enable the statelessCNI falg and inititlizes a CNSClient
func (nm *networkManager) SetStatelessCNIMode() error {
nm.StatelessCniMode = true
// Create CNS client
client, err := cnsclient.New(cnsBaseURL, cnsReqTimeout)
if err != nil {
return errors.Wrapf(err, "failed to initialize CNS client")
}
nm.CnsClient = client
return nil
}

// IsStatelessCNIMode checks if the Stateless CNI mode has been enabled or not
func (nm *networkManager) IsStatelessCNIMode() bool {
return nm.StatelessCniMode
}

// Restore reads network manager state from persistent store.
func (nm *networkManager) restore(isRehydrationRequired bool) error {
// Skip if a store is not provided.
Expand Down Expand Up @@ -221,6 +256,10 @@ func (nm *networkManager) restore(isRehydrationRequired bool) error {

// Save writes network manager state to persistent store.
func (nm *networkManager) save() error {
// CNI is not maintaining the state in Steless Mode.
if nm.IsStatelessCNIMode() {
return nil
}
// Skip if a store is not provided.
if nm.store == nil {
return nil
Expand Down Expand Up @@ -338,16 +377,19 @@ func (nm *networkManager) CreateEndpoint(cli apipaClient, networkID string, epIn

if nw.VlanId != 0 {
if epInfo.Data[VlanIDKey] == nil {
logger.Info("overriding endpoint vlanid with network vlanid")
epInfo.Data[VlanIDKey] = nw.VlanId
}
}

_, err = nw.newEndpoint(cli, nm.netlink, nm.plClient, nm.netio, epInfo)
ep, err := nw.newEndpoint(cli, nm.netlink, nm.plClient, nm.netio, epInfo)
if err != nil {
return err
}

if nm.IsStatelessCNIMode() {
return nm.UpdateEndpointState(ep)
}

err = nm.save()
if err != nil {
return err
Expand All @@ -356,11 +398,27 @@ func (nm *networkManager) CreateEndpoint(cli apipaClient, networkID string, epIn
return nil
}

// UpdateEndpointState will make a call to CNS updatEndpointState API in the stateless CNI mode
// It will add HNSEndpointID or HostVeth name to the endpoint state
func (nm *networkManager) UpdateEndpointState(ep *endpoint) error {
logger.Info("Calling cns updateEndpoint API with ", zap.String("containerID: ", ep.ContainerID), zap.String("HnsId: ", ep.HnsId), zap.String("HostIfName: ", ep.HostIfName))
response, err := nm.CnsClient.UpdateEndpoint(context.TODO(), ep.ContainerID, ep.HnsId, ep.HostIfName)
if err != nil {
return errors.Wrapf(err, "Update endpoint API returend with error")
}
logger.Info("Update endpoint API returend ", zap.String("podname: ", response.ReturnCode.String()))
return nil
}

// DeleteEndpoint deletes an existing container endpoint.
func (nm *networkManager) DeleteEndpoint(networkID, endpointID string) error {
func (nm *networkManager) DeleteEndpoint(networkID, endpointID string, epInfo *EndpointInfo) error {
nm.Lock()
defer nm.Unlock()

if nm.IsStatelessCNIMode() {
return nm.DeleteEndpointState(networkID, epInfo)
}

nw, err := nm.getNetwork(networkID)
if err != nil {
return err
Expand All @@ -379,11 +437,64 @@ func (nm *networkManager) DeleteEndpoint(networkID, endpointID string) error {
return nil
}

func (nm *networkManager) DeleteEndpointState(networkID string, epInfo *EndpointInfo) error {
nw := &network{
Id: networkID,
Mode: opModeTransparentVlan,
SnatBridgeIP: "",
extIf: &externalInterface{
Name: InfraInterfaceName,
MacAddress: nil,
},
}

ep := &endpoint{
Id: epInfo.Id,
HnsId: epInfo.HNSEndpointID,
HostIfName: epInfo.IfName,
LocalIP: "",
VlanID: 0,
AllowInboundFromHostToNC: false,
AllowInboundFromNCToHost: false,
EnableSnatOnHost: false,
EnableMultitenancy: false,
NetworkContainerID: epInfo.Id,
}
logger.Info("Deleting endpoint with", zap.String("Endpoint Info: ", epInfo.PrettyString()), zap.String("HNISID : ", ep.HnsId))
return nw.deleteEndpointImpl(netlink.NewNetlink(), platform.NewExecClient(), nil, ep)
}

// GetEndpointInfo returns information about the given endpoint.
func (nm *networkManager) GetEndpointInfo(networkId string, endpointId string) (*EndpointInfo, error) {
nm.Lock()
defer nm.Unlock()

if nm.IsStatelessCNIMode() {
logger.Info("calling cns getEndpoint API")
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), endpointId)
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returend with error")
}
epInfo := &EndpointInfo{
Id: endpointId,
IfIndex: EndpointIfIndex, // Azure CNI supports only one interface
IfName: endpointResponse.EndpointInfo.HostVethName,
ContainerID: endpointId,
PODName: endpointResponse.EndpointInfo.PodName,
PODNameSpace: endpointResponse.EndpointInfo.PodNamespace,
NetworkContainerID: endpointId,
HNSEndpointID: endpointResponse.EndpointInfo.HnsEndpointID,
}

for _, ip := range endpointResponse.EndpointInfo.IfnameToIPMap {
epInfo.IPAddresses = ip.IPv4
epInfo.IPAddresses = append(epInfo.IPAddresses, ip.IPv6...)

}
logger.Info("returning getEndpoint API with", zap.String("Endpoint Info: ", epInfo.PrettyString()), zap.String("HNISID : ", epInfo.HNSEndpointID))
return epInfo, nil
}

nw, err := nm.getNetwork(networkId)
if err != nil {
return nil, err
Expand Down Expand Up @@ -543,3 +654,17 @@ func (nm *networkManager) GetNumberOfEndpoints(ifName string, networkId string)
func (nm *networkManager) SetupNetworkUsingState(networkMonitor *cnms.NetworkMonitor) error {
return nm.monitorNetworkState(networkMonitor)
}

// GetEndpointID returns a unique endpoint ID based on the CNI mode.
func (nm *networkManager) GetEndpointID(containerID, ifName string) string {
if nm.IsStatelessCNIMode() {
return containerID
}
if len(containerID) > ContainerIDLength {
containerID = containerID[:ContainerIDLength]
} else {
log.Printf("Container ID is not greater than 8 ID: %v", containerID)
return ""
}
return containerID + "-" + ifName
}
20 changes: 19 additions & 1 deletion network/manager_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,29 @@ func (nm *MockNetworkManager) CreateEndpoint(_ apipaClient, networkID string, ep
}

// DeleteEndpoint mock
func (nm *MockNetworkManager) DeleteEndpoint(networkID, endpointID string) error {
func (nm *MockNetworkManager) DeleteEndpoint(networkID string, endpointID string, ep *EndpointInfo) error {
delete(nm.TestEndpointInfoMap, endpointID)
return nil
}

// SetStatelessCNIMode enable the statelessCNI falg and inititlizes a CNSClient
func (nm *MockNetworkManager) SetStatelessCNIMode() error {
return nil
}

// IsStatelessCNIMode checks if the Stateless CNI mode has been enabled or not
func (nm *MockNetworkManager) IsStatelessCNIMode() bool {
return false
}

// GetEndpointID returns the ContainerID value
func (nm *MockNetworkManager) GetEndpointID(containerID, ifName string) string {
if nm.IsStatelessCNIMode() {
return containerID
}
return containerID + "-" + ifName
}

func (nm *MockNetworkManager) GetAllEndpoints(networkID string) (map[string]*EndpointInfo, error) {
return nm.TestEndpointInfoMap, nil
}
Expand Down

0 comments on commit 476cfa7

Please sign in to comment.