Improve multi-protocol support for NPL
There is an edge case which is not handled well by the current NPL
controller implementation: if a given Pod port needs to be exposed for
both TCP and UDP, and the first available Node port is only available
for TCP, it will still be selected and NPL rule installation will
succeed for TCP but not for UDP.

We have 2 options:

1. reserve the port for both protocols ahead of time, even if the port
is only needed for one protocol initially.
2. support using different Node ports for different protocols, even when
the Pod port is the same.

In this patch, we go with option 1, to preserve the "nice" property that
a given Pod port maps to a unique Node port.
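
As an illustration of option 1, here is a minimal, self-contained sketch of the allocation strategy, not Antrea's actual code: a Node port is selected only if it can be bound for every protocol up front, even when the Pod port initially needs just one of them. The canOpenPort helper and the hard-coded protocol list are assumptions for the sketch; the real LocalPortOpener keeps the sockets open to hold the reservation rather than closing them immediately.

package main

import (
	"fmt"
	"net"
)

// canOpenPort reports whether a local port can be bound for the given
// protocol. Simplified stand-in: it closes the socket right away, whereas
// a real reservation would keep it open.
func canOpenPort(port int, protocol string) bool {
	addr := fmt.Sprintf(":%d", port)
	switch protocol {
	case "tcp":
		l, err := net.Listen("tcp", addr)
		if err != nil {
			return false
		}
		l.Close()
		return true
	case "udp":
		c, err := net.ListenPacket("udp", addr)
		if err != nil {
			return false
		}
		c.Close()
		return true
	default:
		return false
	}
}

// findNodePortForAllProtocols implements option 1: a Node port qualifies
// only if it is available for both TCP and UDP, so that a given Pod port
// always maps to a unique Node port.
func findNodePortForAllProtocols(start, end int) (int, error) {
	for port := start; port <= end; port++ {
		available := true
		for _, proto := range []string{"tcp", "udp"} {
			if !canOpenPort(port, proto) {
				available = false
				break
			}
		}
		if available {
			return port, nil
		}
	}
	return 0, fmt.Errorf("no Node port available for both TCP and UDP in [%d, %d]", start, end)
}

func main() {
	port, err := findNodePortForAllProtocols(61000, 62000)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("selected Node port:", port)
}

With this strategy, a port like the one in the reported edge case (free for TCP but already taken for UDP) is skipped entirely instead of being half-assigned.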

Fixes #2894

Signed-off-by: Antonin Bas <[email protected]>
antoninbas committed Oct 19, 2021
1 parent 373c25e commit d490051
Showing 5 changed files with 246 additions and 114 deletions.
16 changes: 7 additions & 9 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -484,7 +484,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
	}
	podPorts[targetPortProto] = struct{}{}
	portData := c.portTable.GetEntry(podIP, port)
-	if portData != nil && !portData.HasProtocol(protocol) {
+	if portData != nil && !portData.ProtocolInUse(protocol) {
		// If the PortTable has an entry for the Pod but does not have an
		// entry with protocol, we enforce AddRule for the missing Protocol.
		portData = nil
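
The rename from HasProtocol to ProtocolInUse presumably reflects the new model: with option 1, an entry reserves its Node port for all protocols, so the controller must ask whether a rule is actually installed for a protocol, not whether the port is merely reserved for it. A minimal sketch under that assumption; the type and field names below are hypothetical, not the actual portcache types:

package portcache

// protocolSocketData is a hypothetical per-protocol record; the real
// NodePortData in pkg/agent/nodeportlocal/portcache may differ.
type protocolSocketData struct {
	protocol string
	inUse    bool // whether an NPL rule is installed for this protocol
}

type nodePortData struct {
	nodePort  int
	podPort   int
	protocols []protocolSocketData // one record per reserved protocol
}

// protocolInUse reports whether an NPL rule is actually installed for
// protocol, as opposed to the Node port only being reserved for it.
func (d *nodePortData) protocolInUse(protocol string) bool {
	for _, p := range d.protocols {
		if p.protocol == protocol {
			return p.inUse
		}
	}
	return false
}
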
@@ -592,14 +592,12 @@ func (c *NPLController) waitForRulesInitialization() {
			klog.V(2).InfoS("Found NodePortLocal annotation for which the allocated port doesn't fall into the configured range", "pod", klog.KObj(pod))
			continue
		}
-		for _, protocol := range npl.Protocols {
-			allNPLPorts = append(allNPLPorts, rules.PodNodePort{
-				NodePort: npl.NodePort,
-				PodPort:  npl.PodPort,
-				PodIP:    pod.Status.PodIP,
-				Protocol: protocol,
-			})
-		}
+		allNPLPorts = append(allNPLPorts, rules.PodNodePort{
+			NodePort:  npl.NodePort,
+			PodPort:   npl.PodPort,
+			PodIP:     pod.Status.PodIP,
+			Protocols: npl.Protocols,
+		})
	}
}
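
This hunk implies that rules.PodNodePort was changed in the same commit to carry all protocols for a Pod port instead of one entry per protocol. A sketch of the inferred before/after shape; the field names come from the diff, everything else is assumed:

package rules

// Before this commit: one entry per (Pod port, protocol) pair, so the same
// Pod port could appear several times.
type podNodePortBefore struct {
	NodePort int
	PodPort  int
	PodIP    string
	Protocol string
}

// After: a single entry per Pod port listing every protocol it is exposed
// on, matching option 1's unique Node port per Pod port.
type podNodePortAfter struct {
	NodePort  int
	PodPort   int
	PodIP     string
	Protocols []string
}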

38 changes: 23 additions & 15 deletions pkg/agent/nodeportlocal/npl_agent_test.go
@@ -46,12 +46,15 @@ import (
)

func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener portcache.LocalPortOpener) *portcache.PortTable {
-	ptable := portcache.PortTable{StartPort: defaultStartPort, EndPort: defaultEndPort}
-	ptable.NodePortTable = make(map[int]*portcache.NodePortData)
-	ptable.PodEndpointTable = make(map[string]*portcache.NodePortData)
-	ptable.PodPortRules = mockIPTables
-	ptable.LocalPortOpener = mockPortOpener
-	return &ptable
+	return &portcache.PortTable{
+		NodePortTable:    make(map[int]*portcache.NodePortData),
+		PodEndpointTable: make(map[string]*portcache.NodePortData),
+		StartPort:        defaultStartPort,
+		EndPort:          defaultEndPort,
+		PortSearchStart:  defaultStartPort,
+		PodPortRules:     mockIPTables,
+		LocalPortOpener:  mockPortOpener,
+	}
}

const (
@@ -351,7 +354,7 @@ func TestSvcTypeUpdate(t *testing.T) {
	defer testData.tearDown()

	// Update Service type to NodePort.
-	testSvc.Spec.Type = "NodePort"
+	testSvc.Spec.Type = corev1.ServiceTypeNodePort
	testData.updateServiceOrFail(testSvc)

	// Check that annotation and the rule are removed.
@@ -360,7 +363,7 @@
	assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP))

	// Update Service type to ClusterIP.
-	testSvc.Spec.Type = "ClusterIP"
+	testSvc.Spec.Type = corev1.ServiceTypeClusterIP
	testData.updateServiceOrFail(testSvc)

	_, err = testData.pollForPodAnnotation(testPod.Name, true)
@@ -747,14 +750,19 @@ var (
// TestNodePortAlreadyBoundTo validates that when a port is already bound to, a different port will
// be selected for NPL.
func TestNodePortAlreadyBoundTo(t *testing.T) {
-	var nodePort int
+	nodePort1 := defaultStartPort
+	nodePort2 := nodePort1 + 1
	testConfig := newTestConfig().withCustomPortOpenerExpectations(func(mockPortOpener *portcachetesting.MockLocalPortOpener) {
		gomock.InOrder(
-			mockPortOpener.EXPECT().OpenLocalPort(gomock.Any(), gomock.Any()).Return(nil, portTakenError),
-			mockPortOpener.EXPECT().OpenLocalPort(gomock.Any(), protocolTCP).DoAndReturn(func(port int, protocol string) (portcache.Closeable, error) {
-				nodePort = port
-				return &fakeSocket{}, nil
-			}),
+			// Based on the implementation, we know that TCP is checked first...
+			// 1. port1 is checked for TCP availability -> success
+			mockPortOpener.EXPECT().OpenLocalPort(nodePort1, protocolTCP).Return(&fakeSocket{}, nil),
+			// 2. port1 is checked for UDP availability (even if the Service uses TCP only) -> error
+			mockPortOpener.EXPECT().OpenLocalPort(nodePort1, protocolUDP).Return(nil, portTakenError),
+			// 3. port2 is checked for TCP availability -> success
+			mockPortOpener.EXPECT().OpenLocalPort(nodePort2, protocolTCP).Return(&fakeSocket{}, nil),
+			// 4. port2 is checked for UDP availability -> success
+			mockPortOpener.EXPECT().OpenLocalPort(nodePort2, protocolUDP).Return(&fakeSocket{}, nil),
		)
	})
	customNodePort := defaultStartPort + 1
@@ -763,7 +771,7 @@ func TestNodePortAlreadyBoundTo(t *testing.T) {

	value, err := testData.pollForPodAnnotation(testPod.Name, true)
	require.NoError(t, err, "Poll for annotation check failed")
-	expectedAnnotations := newExpectedNPLAnnotations().Add(&nodePort, defaultPort, protocolTCP)
+	expectedAnnotations := newExpectedNPLAnnotations().Add(&nodePort2, defaultPort, protocolTCP)
	expectedAnnotations.Check(t, value)
}
