Skip to content

Commit

Permalink
Update incorporating changes in NPL for review comments - part14
Browse files Browse the repository at this point in the history
  • Loading branch information
monotosh-avi committed Jan 6, 2021
1 parent 1aafc92 commit 1cdac8b
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 30 deletions.
4 changes: 3 additions & 1 deletion cmd/antrea-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type AgentConfig struct {
// Flow export frequency should be greater than or equal to 1.
// Defaults to "12".
FlowExportFrequency uint `yaml:"flowExportFrequency,omitempty"`
// Provide the port range used by NodePortLocal for programming IPTABLES rules. This is only used if NodePortLocal is enabled.
// Provide the port range used by NodePortLocal. When the NodePortLocal feature is enabled, a port from that range will be assigned
// whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports),
// and all Node traffic directed to that Pod will be forwarded to the Pod.
NPLPortRange string `yaml:"nplPortRange,omitempty"`
}
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/k8s/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,6 @@ func (c *Controller) updatePodAnnotation(pod *corev1.Pod) error {
klog.Warningf("Unable to update Pod %s with annotation: %+v", pod.Name, err)
return err
}
klog.V(2).Infof("Successfully updated Pod %s/%s annotation", pod.Namespace, pod.Name)
klog.V(2).Infof("Successfully updated annotation for Pod %s/%s", pod.Namespace, pod.Name)
return nil
}
16 changes: 10 additions & 6 deletions pkg/agent/nodeportlocal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func NewNPLController(kubeClient clientset.Interface, pt *portcache.PortTable) *
return &nplCtrl
}

// podKeyFunc builds the workqueue key for the given Pod in the
// canonical "namespace/name" form used throughout the controller.
func podKeyFunc(pod *corev1.Pod) string {
	return fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}

// Run starts to watch and process Pod updates for the Node where Antrea Agent is running.
// It starts a queue and a fixed number of workers to process the objects from the queue.
func (c *Controller) Run(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -102,7 +106,7 @@ func (c *Controller) EnqueueObjAdd(obj interface{}) {
return
}
}
podKey := pod.Namespace + "/" + pod.Name
podKey := podKeyFunc(pod)
c.queue.Add(podKey)

}
Expand All @@ -117,7 +121,7 @@ func (c *Controller) EnqueueObjUpdate(oldObj, newObj interface{}) {
return
}
}
podKey := pod.Namespace + "/" + pod.Name
podKey := podKeyFunc(pod)
c.queue.Add(podKey)

}
Expand All @@ -132,7 +136,7 @@ func (c *Controller) EnqueueObjDel(obj interface{}) {
return
}
}
podKey := pod.Namespace + "/" + pod.Name
podKey := podKeyFunc(pod)
c.queue.Add(podKey)

}
Expand Down Expand Up @@ -186,7 +190,7 @@ func (c *Controller) addRuleForPod(pod *corev1.Pod) error {
return nil
}

// HandleDeletePod Removes rules from port table and
// HandleDeletePod removes rules from port table and
// rules programmed in the system based on implementation type (e.g. IPTABLES)
func (c *Controller) HandleDeletePod(key string) error {
klog.Infof("Got delete event for Pod: %s", key)
Expand Down Expand Up @@ -228,7 +232,7 @@ func (c *Controller) HandleAddUpdatePod(key string, obj interface{}) error {
if !c.portTable.RuleExists(podIP, int(cport.ContainerPort)) {
err = c.addRuleForPod(newPod)
if err != nil {
return fmt.Errorf("Failed to add rule for Pod %s/%s: %s", newPod.Namespace, newPod.Name, err.Error())
return fmt.Errorf("failed to add rule for Pod %s/%s: %s", newPod.Namespace, newPod.Name, err.Error())
}
updatePodAnnotation = true
}
Expand All @@ -248,7 +252,7 @@ func (c *Controller) HandleAddUpdatePod(key string, obj interface{}) error {
removeFromPodAnnotation(newPod, data.PodPort)
err := c.portTable.DeleteRule(podIP, int(data.PodPort))
if err != nil {
return fmt.Errorf("Failed to delete rule for Pod IP %s, Pod Port %d: %s", podIP, data.PodPort, err.Error())
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d: %s", podIP, data.PodPort, err.Error())
}
updatePodAnnotation = true
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/npl_agent_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
const resyncPeriod = 60 * time.Minute

// InitializeNPLAgent initializes the NodePortLocal (NPL) agent.
// It initializes the port table cache to keep rack of Node ports available for use by NPL,
// It initializes the port table cache to keep track of Node ports available for use by NPL,
// sets up event handlers to handle Pod add, update and delete events.
// When a Pod gets created, a free Node port is obtained from the port table cache and a DNAT rule is added to NAT traffic to the Pod's ip:port.
func InitializeNPLAgent(kubeClient clientset.Interface, portRange, nodeName string) (*k8s.Controller, error) {
Expand Down
42 changes: 24 additions & 18 deletions pkg/agent/nodeportlocal/nplagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,21 @@ func TestPodAdd(t *testing.T) {
var found bool

a := assert.New(t)
r := require.New(t)

testPod := getTestPod()
p, err := kubeClient.CoreV1().Pods(defaultNS).Create(context.TODO(), &testPod, metav1.CreateOptions{})
require.Nil(t, err, "Pod creation failed")
r.Nil(err, "Pod creation failed")
t.Logf("successfully created Pod: %v", p)

err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
updatedPod, err := kubeClient.CoreV1().Pods(defaultNS).Get(context.TODO(), defaultPodName, metav1.GetOptions{})
require.Nil(t, err, "Failed to get Pod")
r.Nil(err, "Failed to get Pod")
ann = updatedPod.GetAnnotations()
data, found = ann[k8s.NPLAnnotationStr]
return found, nil
})
require.Nil(t, err, "Poll for annotation check failed")
r.Nil(err, "Poll for annotation check failed")

var nplData []k8s.NPLAnnotation
json.Unmarshal([]byte(data), &nplData)
Expand All @@ -142,17 +143,18 @@ func TestPodUpdate(t *testing.T) {
var data string

a := assert.New(t)
r := require.New(t)

testPod := getTestPod()
testPod.Spec.Containers[0].Ports[0].ContainerPort = 8080
testPod.ResourceVersion = "2"
p, err := kubeClient.CoreV1().Pods(defaultNS).Update(context.TODO(), &testPod, metav1.UpdateOptions{})
require.Nil(t, err, "Pod creation failed")
r.Nil(err, "Pod creation failed")
t.Logf("successfully created Pod: %v", p)

err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
updatedPod, err := kubeClient.CoreV1().Pods(defaultNS).Get(context.TODO(), defaultPodName, metav1.GetOptions{})
require.Nil(t, err, "Failed to get Pod")
r.Nil(err, "Failed to get Pod")
ann = updatedPod.GetAnnotations()
data, _ = ann[k8s.NPLAnnotationStr]
json.Unmarshal([]byte(data), &nplData)
Expand All @@ -165,22 +167,24 @@ func TestPodUpdate(t *testing.T) {
}
return false, nil
})
require.Nil(t, err, "Poll for annotation check failed")
r.Nil(err, "Poll for annotation check failed")

a.Equal(portTable.RuleExists(defaultPodIP, defaultPort), false)
a.Equal(portTable.RuleExists(defaultPodIP, 8080), true)
}

// Make sure that when a pod gets deleted, corresponding entry gets deleted from local port cache also
func TestPodDel(t *testing.T) {
r := require.New(t)

err := kubeClient.CoreV1().Pods(defaultNS).Delete(context.TODO(), defaultPodName, metav1.DeleteOptions{})
require.Nil(t, err, "Pod deletion failed")
r.Nil(err, "Pod deletion failed")
t.Logf("successfully deleted Pod: %s", defaultPodName)

err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
return !portTable.RuleExists(defaultPodIP, 8080), nil
})
require.Nil(t, err, "Poll for rule check failed")
r.Nil(err, "Poll for rule check failed")
}

// Create a Pod with multiple ports and verify that Pod annotation and local port cache are updated correctly
Expand All @@ -190,22 +194,23 @@ func TestPodAddMultiPort(t *testing.T) {
var found bool

a := assert.New(t)
r := require.New(t)

testPod := getTestPod()
newPort := corev1.ContainerPort{ContainerPort: 90}
testPod.Spec.Containers[0].Ports = append(testPod.Spec.Containers[0].Ports, newPort)
p, err := kubeClient.CoreV1().Pods(defaultNS).Create(context.TODO(), &testPod, metav1.CreateOptions{})
require.Nil(t, err, "Pod creation failed")
r.Nil(err, "Pod creation failed")
t.Logf("successfully created Pod: %v", p)

err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
updatedPod, err := kubeClient.CoreV1().Pods(defaultNS).Get(context.TODO(), defaultPodName, metav1.GetOptions{})
require.Nil(t, err, "Failed to get Pod")
r.Nil(err, "Failed to get Pod")
ann = updatedPod.GetAnnotations()
data, found = ann[k8s.NPLAnnotationStr]
return found, nil
})
require.Nil(t, err, "Poll for annotation check failed")
r.Nil(err, "Poll for annotation check failed")

var nplData []k8s.NPLAnnotation
err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
Expand All @@ -215,7 +220,7 @@ func TestPodAddMultiPort(t *testing.T) {
}
return false, nil
})
require.Nil(t, err, "Poll for annotation length check failed")
r.Nil(err, "Poll for annotation length check failed")

a.Equal(nplData[0].NodeIP, defaultHostIP)
a.Equal(nplData[0].PodPort, defaultPort)
Expand All @@ -234,29 +239,30 @@ func TestAddMultiplePods(t *testing.T) {
var found bool

a := assert.New(t)
r := require.New(t)

testPod1 := getTestPod()
testPod1.Name = "pod1"
testPod1.Status.PodIP = "10.10.10.1"
p, err := kubeClient.CoreV1().Pods(defaultNS).Create(context.TODO(), &testPod1, metav1.CreateOptions{})
require.Nil(t, err, "Pod creation failed")
r.Nil(err, "Pod creation failed")
t.Logf("successfully created Pod: %v", p)

testPod2 := getTestPod()
testPod2.Name = "pod2"
testPod2.Status.PodIP = "10.10.10.2"
p, err = kubeClient.CoreV1().Pods(defaultNS).Create(context.TODO(), &testPod2, metav1.CreateOptions{})
require.Nil(t, err, "Pod creation failed")
r.Nil(err, "Pod creation failed")
t.Logf("successfully created Pod: %v", p)

err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
updatedPod, err := kubeClient.CoreV1().Pods(defaultNS).Get(context.TODO(), testPod1.Name, metav1.GetOptions{})
require.Nil(t, err, "Failed to get Pod")
r.Nil(err, "Failed to get Pod")
ann = updatedPod.GetAnnotations()
data, found = ann[k8s.NPLAnnotationStr]
return found, nil
})
require.Nil(t, err, "Poll for annotation check failed")
r.Nil(err, "Poll for annotation check failed")

var nplData []k8s.NPLAnnotation
json.Unmarshal([]byte(data), &nplData)
Expand All @@ -268,12 +274,12 @@ func TestAddMultiplePods(t *testing.T) {

wait.Poll(time.Second, 20*time.Second, func() (bool, error) {
updatedPod, err := kubeClient.CoreV1().Pods(defaultNS).Get(context.TODO(), testPod2.Name, metav1.GetOptions{})
require.Nil(t, err, "Failed to get Pod")
r.Nil(err, "Failed to get Pod")
ann = updatedPod.GetAnnotations()
data, found = ann[k8s.NPLAnnotationStr]
return found, nil
})
require.Nil(t, err, "Poll for annotation check failed")
r.Nil(err, "Poll for annotation check failed")

json.Unmarshal([]byte(data), &nplData)

Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/nodeportlocal/rules/iptable_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ func (ipt *IPTableRules) CreateChains() error {
}

// AddRule appends a DNAT rule in NodePortLocalChain chain of NAT table
func (ipt *IPTableRules) AddRule(port int, podip string) error {
func (ipt *IPTableRules) AddRule(port int, podIP string) error {
ruleSpec := []string{
"-p", "tcp", "-m", "tcp", "--dport",
fmt.Sprint(port), "-j", "DNAT", "--to-destination", podip,
fmt.Sprint(port), "-j", "DNAT", "--to-destination", podIP,
}
err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec)
if err != nil {
return fmt.Errorf("IPTABLES rule creation failed for NPL with error: %v", err)
}
klog.Infof("successfully added rule for pod %s: %d", podip, port)
klog.Infof("successfully added rule for Pod %s: %d", podIP, port)
return nil
}

Expand Down

0 comments on commit 1cdac8b

Please sign in to comment.