Implement NPL agent unification.
* Unify agent behavior across Linux and Windows. The Linux agent should support
allocating different NodePorts for different protocols when the Pod ports are the same.
* Replace the maps with a cache.Indexer-backed port table cache to reduce repeated
insertion (see the sketch after the file summary below).
* Update the port-allocation-related unit tests.
* Enable the Windows e2e test.
* Delete unused functions.

Signed-off-by: Shuyang Xin <[email protected]>
XinShuYang committed Aug 4, 2022
1 parent 4b788e7 commit 16d5be1
Showing 8 changed files with 286 additions and 299 deletions.
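
As a rough illustration of the cache.Indexer approach mentioned in the commit message: the port table keeps one entry per (NodePort, Pod IP, Pod port, protocol) tuple and looks entries up through named indexes instead of separate maps. The sketch below is a simplified, hypothetical model (the entry type, index name, addresses and ports are made up for illustration), not the Antrea code itself:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// entry is a simplified stand-in for the agent's NodePortData: one entry
// per (NodePort, Pod IP, Pod port, protocol) tuple.
type entry struct {
	NodePort int
	PodIP    string
	PodPort  int
	Protocol string
}

// keyFunc builds a unique key for each entry, so the same Pod port can
// appear once per protocol.
func keyFunc(obj interface{}) (string, error) {
	e := obj.(*entry)
	return fmt.Sprintf("%d:%s:%d:%s", e.NodePort, e.PodIP, e.PodPort, e.Protocol), nil
}

// podIPIndexFunc indexes entries by Pod IP, replacing a map keyed on IP.
func podIPIndexFunc(obj interface{}) ([]string, error) {
	return []string{obj.(*entry).PodIP}, nil
}

func main() {
	idx := cache.NewIndexer(keyFunc, cache.Indexers{"podIP": podIPIndexFunc})
	// The same Pod port exposed over TCP and UDP becomes two entries with
	// two different NodePorts.
	idx.Add(&entry{NodePort: 61000, PodIP: "10.0.0.2", PodPort: 8080, Protocol: "tcp"})
	idx.Add(&entry{NodePort: 61001, PodIP: "10.0.0.2", PodPort: 8080, Protocol: "udp"})

	objs, _ := idx.ByIndex("podIP", "10.0.0.2")
	fmt.Println(len(objs)) // prints 2
}
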
9 changes: 4 additions & 5 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -530,11 +530,10 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
entries := c.portTable.GetDataForPodIP(podIP)
if nplExists {
for _, data := range entries {
for _, proto := range data.Protocols {
if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %v", podIP, data.PodPort, proto.Protocol, err)
}
proto := data.Protocol
if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %v", podIP, data.PodPort, proto.Protocol, err)
}
}
}
50 changes: 28 additions & 22 deletions pkg/agent/nodeportlocal/npl_agent_test.go
@@ -67,13 +67,16 @@ const (

func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener portcache.LocalPortOpener) *portcache.PortTable {
return &portcache.PortTable{
NodePortTable: make(map[string]*portcache.NodePortData),
PodEndpointTable: make(map[string]*portcache.NodePortData),
StartPort: defaultStartPort,
EndPort: defaultEndPort,
PortSearchStart: defaultStartPort,
PodPortRules: mockIPTables,
LocalPortOpener: mockPortOpener,
PortTableCache: cache.NewIndexer(portcache.GetPortTableKey, cache.Indexers{
portcache.NodePortIndex: portcache.NodePortIndexFunc,
portcache.PodEndpointIndex: portcache.PodEndpointIndexFunc,
portcache.PodIPIndex: portcache.PodIPIndexFunc,
}),
StartPort: defaultStartPort,
EndPort: defaultEndPort,
PortSearchStart: defaultStartPort,
PodPortRules: mockIPTables,
LocalPortOpener: mockPortOpener,
}
}

@@ -657,8 +660,8 @@ func TestMultiplePods(t *testing.T) {

// TestMultipleProtocols creates multiple Pods with multiple protocols and verifies that
// NPL annotations and iptable rules for both Pods and Protocols are updated correctly.
// In particular we make sure that a given NodePort is never used by more than one Pod,
// irrespective of which protocol is in use.
// In particular, we make sure that a given NodePort is never used by more than one Pod.
// A single Pod may use multiple NodePorts for different protocols of the same Pod port.
func TestMultipleProtocols(t *testing.T) {
tcpUdpSvcLabel := map[string]string{"tcp": "true", "udp": "true"}
udpSvcLabel := map[string]string{"tcp": "false", "udp": "true"}
@@ -702,8 +705,7 @@ func TestMultipleProtocols(t *testing.T) {
assert.True(t, testData.portTable.RuleExists(testPod2.Status.PodIP, defaultPort, protocolUDP))

// Update testSvc2 to serve TCP/80 and UDP/81 both, so pod2 is
// exposed on both TCP and UDP, with the same NodePort.

// exposed on both TCP and UDP, with different NodePorts.
testSvc2.Spec.Ports = append(testSvc2.Spec.Ports, corev1.ServicePort{
Port: 80,
Protocol: corev1.ProtocolTCP,
@@ -716,7 +718,16 @@

pod2ValueUpdate, err := testData.pollForPodAnnotation(testPod2.Name, true)
require.NoError(t, err, "Poll for annotation check failed")
expectedAnnotationsPod2.Add(&pod2Value[0].NodePort, defaultPort, protocolTCP)

// The new NodePort should be the next available port number,
// because the implementation allocates ports sequentially.
var pod2nodeport int
if pod1Value[0].NodePort > pod2Value[0].NodePort {
pod2nodeport = pod1Value[0].NodePort + 1
} else {
pod2nodeport = pod2Value[0].NodePort + 1
}
expectedAnnotationsPod2.Add(&pod2nodeport, defaultPort, protocolTCP)
expectedAnnotationsPod2.Check(t, pod2ValueUpdate)
}

@@ -761,22 +772,17 @@ var (
portTakenError = fmt.Errorf("Port taken")
)

// TestNodePortAlreadyBoundTo validates that when a port is already bound to, a different port will
// be selected for NPL.
// TestNodePortAlreadyBoundTo validates that when a port is already bound to for the TCP protocol,
// the next available port is selected for NPL for that protocol.
func TestNodePortAlreadyBoundTo(t *testing.T) {
nodePort1 := defaultStartPort
nodePort2 := nodePort1 + 1
testConfig := newTestConfig().withCustomPortOpenerExpectations(func(mockPortOpener *portcachetesting.MockLocalPortOpener) {
gomock.InOrder(
// Based on the implementation, we know that TCP is checked first...
// 1. port1 is checked for TCP availability -> success
mockPortOpener.EXPECT().OpenLocalPort(nodePort1, protocolTCP).Return(&fakeSocket{}, nil),
// 2. port1 is checked for UDP availability (even if the Service uses TCP only) -> error
mockPortOpener.EXPECT().OpenLocalPort(nodePort1, protocolUDP).Return(nil, portTakenError),
// 3. port2 is checked for TCP availability -> success
// 1. port1 is checked for TCP availability -> error
mockPortOpener.EXPECT().OpenLocalPort(nodePort1, protocolTCP).Return(nil, portTakenError),
// 2. port2 is checked for TCP availability -> success
mockPortOpener.EXPECT().OpenLocalPort(nodePort2, protocolTCP).Return(&fakeSocket{}, nil),
// 4. port2 is checked for UDP availability -> success
mockPortOpener.EXPECT().OpenLocalPort(nodePort2, protocolUDP).Return(&fakeSocket{}, nil),
)
})
customNodePort := defaultStartPort + 1
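
The updated TestMultipleProtocols above expects the new TCP rule for pod2 to land on the next port after the highest NodePort already allocated, i.e. ports are handed out sequentially. A minimal sketch of that search, under the simplifying assumption that every allocation consumes a fresh port number regardless of protocol (an illustration of the expectation, not Antrea's allocation code):

package main

import "fmt"

// nextFreeNodePort returns the first port in [start, end] that has not been
// allocated yet. In this simplified model every allocation, whatever the
// protocol, takes a new port number, which is what the test asserts when it
// expects max(existing NodePorts) + 1.
func nextFreeNodePort(allocated map[int]bool, start, end int) (int, error) {
	for p := start; p <= end; p++ {
		if !allocated[p] {
			return p, nil
		}
	}
	return 0, fmt.Errorf("no free NodePort in range %d-%d", start, end)
}

func main() {
	// Hypothetical state: pod1 and pod2 already hold UDP NodePorts.
	allocated := map[int]bool{61000: true, 61001: true}
	p, _ := nextFreeNodePort(allocated, 61000, 62000)
	fmt.Println(p) // prints 61002 -> the TCP rule for pod2 gets the next port
}
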
175 changes: 125 additions & 50 deletions pkg/agent/nodeportlocal/portcache/port_table.go
@@ -20,9 +20,16 @@ import (
"net"
"sync"

"antrea.io/antrea/pkg/agent/nodeportlocal/rules"

"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

"antrea.io/antrea/pkg/agent/nodeportlocal/rules"
const (
NodePortIndex = "nodePortIndex"
PodEndpointIndex = "podEndpointIndex"
PodIPIndex = "podIPIndex"
)

// protocolSocketState represents the state of the socket corresponding to a
@@ -36,26 +43,16 @@ type ProtocolSocketData struct {
}

type NodePortData struct {
NodePort int
PodPort int
PodIP string
Protocols []ProtocolSocketData
}

func (d *NodePortData) FindProtocol(protocol string) *ProtocolSocketData {
for idx, protocolSocketData := range d.Protocols {
if protocolSocketData.Protocol == protocol {
return &d.Protocols[idx]
}
}
return nil
NodePort int
PodPort int
PodIP string
Protocol ProtocolSocketData
}

func (d *NodePortData) ProtocolInUse(protocol string) bool {
for _, protocolSocketData := range d.Protocols {
if protocolSocketData.Protocol == protocol {
return protocolSocketData.State == stateInUse
}
protocolSocketData := d.Protocol
if protocolSocketData.Protocol == protocol {
return protocolSocketData.State == stateInUse
}
return false
}
@@ -67,25 +64,102 @@ type LocalPortOpener interface {
type localPortOpener struct{}

type PortTable struct {
NodePortTable map[string]*NodePortData
PodEndpointTable map[string]*NodePortData
StartPort int
EndPort int
PortSearchStart int
PodPortRules rules.PodPortRules
LocalPortOpener LocalPortOpener
tableLock sync.RWMutex
PortTableCache cache.Indexer
StartPort int
EndPort int
PortSearchStart int
PodPortRules rules.PodPortRules
LocalPortOpener LocalPortOpener
tableLock sync.RWMutex
}

func GetPortTableKey(obj interface{}) (string, error) {
npData := obj.(*NodePortData)
key := fmt.Sprintf("%d:%s:%d:%s", npData.NodePort, npData.PodIP, npData.PodPort, npData.Protocol.Protocol)
return key, nil
}

func (pt *PortTable) addPortTableCache(npData *NodePortData) error {
if err := pt.PortTableCache.Add(npData); err != nil {
return err
}
return nil
}

func (pt *PortTable) delPortTableCache(npData *NodePortData) error {
if err := pt.PortTableCache.Delete(npData); err != nil {
return err
}
return nil
}

func (pt *PortTable) getPortTableCacheFromNodePortIndex(index string) (*NodePortData, bool) {
objs, _ := pt.PortTableCache.ByIndex(NodePortIndex, index)
if len(objs) == 0 {
return nil, false
}
return objs[0].(*NodePortData), true
}

func (pt *PortTable) getPortTableCacheFromPodEndpointIndex(index string) (*NodePortData, bool) {
objs, _ := pt.PortTableCache.ByIndex(PodEndpointIndex, index)
if len(objs) == 0 {
return nil, false
}
return objs[0].(*NodePortData), true
}

func (pt *PortTable) getPortTableCacheFromPodIPIndex(index string) ([]*NodePortData, bool) {
var npData []*NodePortData
objs, _ := pt.PortTableCache.ByIndex(PodIPIndex, index)
if len(objs) == 0 {
return nil, false
}
for _, obj := range objs {
npData = append(npData, obj.(*NodePortData))
}
return npData, true
}

func (pt *PortTable) releaseDataFromPortTableCache() error {
for _, obj := range pt.PortTableCache.List() {
data := obj.(*NodePortData)
if err := pt.delPortTableCache(data); err != nil {
return err
}
}
return nil
}

func NodePortIndexFunc(obj interface{}) ([]string, error) {
npData := obj.(*NodePortData)
nodePortTuple := NodePortProtoFormat(npData.NodePort, npData.Protocol.Protocol)
return []string{nodePortTuple}, nil
}

func PodEndpointIndexFunc(obj interface{}) ([]string, error) {
npData := obj.(*NodePortData)
podEndpointTuple := podIPPortProtoFormat(npData.PodIP, npData.PodPort, npData.Protocol.Protocol)
return []string{podEndpointTuple}, nil
}

func PodIPIndexFunc(obj interface{}) ([]string, error) {
npData := obj.(*NodePortData)
return []string{npData.PodIP}, nil
}

func NewPortTable(start, end int) (*PortTable, error) {
ptable := PortTable{
NodePortTable: make(map[string]*NodePortData),
PodEndpointTable: make(map[string]*NodePortData),
StartPort: start,
EndPort: end,
PortSearchStart: start,
PodPortRules: rules.InitRules(),
LocalPortOpener: &localPortOpener{},
PortTableCache: cache.NewIndexer(GetPortTableKey, cache.Indexers{
NodePortIndex: NodePortIndexFunc,
PodEndpointIndex: PodEndpointIndexFunc,
PodIPIndex: PodIPIndexFunc,
}),
StartPort: start,
EndPort: end,
PortSearchStart: start,
PodPortRules: rules.InitRules(),
LocalPortOpener: &localPortOpener{},
}
if err := ptable.PodPortRules.Init(); err != nil {
return nil, err
@@ -96,34 +170,40 @@ func NewPortTable(start, end int) (*PortTable, error) {
func (pt *PortTable) CleanupAllEntries() {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
pt.NodePortTable = make(map[string]*NodePortData)
pt.PodEndpointTable = make(map[string]*NodePortData)
pt.releaseDataFromPortTableCache()
}

func (pt *PortTable) GetDataForPodIP(ip string) []NodePortData {
func (pt *PortTable) GetDataForPodIP(ip string) []*NodePortData {
pt.tableLock.RLock()
defer pt.tableLock.RUnlock()
return pt.getDataForPodIP(ip)
}

func (pt *PortTable) getDataForPodIP(ip string) []NodePortData {
var allData []NodePortData
for i := range pt.NodePortTable {
if pt.NodePortTable[i].PodIP == ip {
allData = append(allData, *pt.NodePortTable[i])
}
func (pt *PortTable) getDataForPodIP(ip string) []*NodePortData {
allData, exist := pt.getPortTableCacheFromPodIPIndex(ip)
if exist == false {
return nil
}
return allData
}

func (pt *PortTable) getEntryByPodIPPort(ip string, port int) *NodePortData {
return pt.PodEndpointTable[podIPPortFormat(ip, port)]
// podIPPortProtoFormat formats the ip, port and protocol to string ip:port:protocol.
func podIPPortProtoFormat(ip string, port int, protocol string) string {
return fmt.Sprintf("%s:%d:%s", ip, port, protocol)
}

func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData {
data, exists := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol))
if exists == false {
return nil
}
return data
}

func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool {
pt.tableLock.RLock()
defer pt.tableLock.RUnlock()
if data := pt.getEntryByPodIPPort(podIP, podPort); data != nil {
if data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol); data != nil {
return data.ProtocolInUse(protocol)
}
return false
@@ -134,11 +214,6 @@ func NodePortProtoFormat(nodeport int, protocol string) string {
return fmt.Sprintf("%d:%s", nodeport, protocol)
}

// podIPPortFormat formats the ip, port to string ip:port.
func podIPPortFormat(ip string, port int) string {
return fmt.Sprintf("%s:%d", ip, port)
}

// openLocalPort binds to the provided port.
// This is inspired by the openLocalPort function in kube-proxy:
// https://github.com/kubernetes/kubernetes/blob/86f8c3ee91b6faec437f97e3991107747d7fc5e8/pkg/proxy/iptables/proxier.go#L1664
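
For reference, the cache above is queried through three indexes whose keys follow the formats defined in this file: "nodeport:protocol" (NodePortIndex), "ip:port:protocol" (PodEndpointIndex), and the bare Pod IP (PodIPIndex). A small hypothetical sketch of building and querying the same indexer outside the PortTable, using the exported helpers from this diff (the IP, ports, and entry values are made up):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"

	"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
)

func main() {
	// Same indexer layout as PortTable.PortTableCache in NewPortTable.
	idx := cache.NewIndexer(portcache.GetPortTableKey, cache.Indexers{
		portcache.NodePortIndex:    portcache.NodePortIndexFunc,
		portcache.PodEndpointIndex: portcache.PodEndpointIndexFunc,
		portcache.PodIPIndex:       portcache.PodIPIndexFunc,
	})

	// Example entry: Pod port 8080 exposed over TCP on NodePort 61000.
	idx.Add(&portcache.NodePortData{
		NodePort: 61000,
		PodPort:  8080,
		PodIP:    "10.0.0.2",
		Protocol: portcache.ProtocolSocketData{Protocol: "tcp"},
	})

	// Pod endpoint lookups now include the protocol in the key, so TCP and
	// UDP rules for the same Pod port resolve to distinct entries.
	objs, _ := idx.ByIndex(portcache.PodEndpointIndex, "10.0.0.2:8080:tcp")
	fmt.Println(len(objs)) // prints 1
}
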
