Skip to content

Commit

Permalink
[ExternalNode] Handle ExternalNode from Antrea agent side
Browse files Browse the repository at this point in the history
1. Provide an example RBAC yaml file for Antrea
agent running on VM with definitions of ClusterRole, ServiceAccount
and ClusterRoleBinding.

2. Add ExternalNodeController to monitor ExternalNode CRUD, invoke
interfaces to operate OVS and update interface store with ExternalEntitiyInterface.

3. Defined the interfaces for OVS operations.

4. Add a channel for receiving ExternalNode updates from ExternalNodeController
and notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes.
This is to handle the case when NetworkPolicyController reconciles rules before ExternalEntitityInterface
is realized in the interface store.

5. Update NetworkPolicy reconciler to invoke GetInterfacesByEntity() and GetContainerInterfacesByPod()
for ExternalEntity and Pod separately.

Signed-off-by: Mengdie Song <[email protected]>
  • Loading branch information
mengdie-song committed May 17, 2022
1 parent 7242689 commit 004ba82
Show file tree
Hide file tree
Showing 15 changed files with 677 additions and 29 deletions.
101 changes: 101 additions & 0 deletions build/yamls/externalnode/vm-agent-rbac.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: vm-agent
namespace: vm-demo # Change the namespace to where vm-agent is expected to run.
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: vm-agent
rules:
# antrea-controller distributes the CA certificate as a ConfigMap named `antrea-ca` in the Antrea deployment Namespace.
# vm-agent needs to access `antrea-ca` to connect with antrea-controller.
- apiGroups:
- ""
resources:
- configmaps
resourceNames:
- antrea-ca
verbs:
- get
- watch
- list
# This is the content of built-in role kube-system/extension-apiserver-authentication-reader.
# But it doesn't have list/watch permission before K8s v1.17.0 so the extension apiserver (vm-agent) will
# have permission issue after bumping up apiserver library to a version that supports dynamic authentication.
# See https://github.com/kubernetes/kubernetes/pull/85375
# To support K8s clusters older than v1.17.0, we grant the required permissions directly instead of relying on
# the extension-apiserver-authentication role.
- apiGroups:
- ""
resourceNames:
- extension-apiserver-authentication
resources:
- configmaps
verbs:
- get
- list
- watch
- apiGroups:
- crd.antrea.io
resourceNames:
- VM1 # Change the ExternalNode name which vm-agent is expected to update.
resources:
- antreaagentinfos
verbs:
- get
- update
- apiGroups:
- controlplane.antrea.io
resources:
- networkpolicies
- appliedtogroups
- addressgroups
verbs:
- get
- watch
- list
- apiGroups:
- controlplane.antrea.io
resources:
- nodestatssummaries
verbs:
- create
- apiGroups:
- controlplane.antrea.io
resources:
- networkpolicies/status
verbs:
- create
- get
- apiGroups:
- crd.antrea.io
resources:
- externalentities
verbs:
- get
- watch
- list
- apiGroups:
- crd.antrea.io
resources:
- externalnodes
verbs:
- get
- watch
- list
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: vm-agent
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: vm-agent
subjects:
- kind: ServiceAccount
name: vm-agent
namespace: vm-demo # Change the namespace to where vm-agent is expected to run.
24 changes: 23 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/controller/serviceexternalip"
"antrea.io/antrea/pkg/agent/controller/traceflow"
"antrea.io/antrea/pkg/agent/externalnode"
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/exporter"
"antrea.io/antrea/pkg/agent/interfacestore"
Expand Down Expand Up @@ -277,7 +278,16 @@ func run(o *Options) error {
// podUpdateChannel is a channel for receiving Pod updates from CNIServer and
// notifying NetworkPolicyController and EgressController to reconcile rules
// related to the updated Pods.
podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)
var podUpdateChannel *channel.SubscribableChannel
// externalNodeUpdateChannel is a channel for receiving ExternalNode updates from ExternalNodeController and
// notifying NetworkPolicyController to reconcile rules related to the updated ExternalNodes.
var externalNodeUpdateChannel *channel.SubscribableChannel
if o.nodeType == config.K8sNode {
podUpdateChannel = channel.NewSubscribableChannel("PodUpdate", 100)
} else {
externalNodeUpdateChannel = channel.NewSubscribableChannel("ExternalNodeUpdate", 100)
}

// We set flow poll interval as the time interval for rule deletion in the async
// rule cache, which is implemented as part of the idAllocator. This is to preserve
// the rule info for populating NetworkPolicy fields in the Flow Exporter even
Expand All @@ -296,6 +306,7 @@ func run(o *Options) error {
ifaceStore,
nodeConfig.Name,
podUpdateChannel,
externalNodeUpdateChannel,
groupCounters,
groupIDUpdates,
antreaPolicyEnabled,
Expand Down Expand Up @@ -367,6 +378,7 @@ func run(o *Options) error {

var cniServer *cniserver.CNIServer
var cniPodInfoStore cnipodcache.CNIPodInfoStore
var externalNodeController *externalnode.ExternalNodeController
if o.nodeType == config.K8sNode {
isChaining := false
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
Expand Down Expand Up @@ -395,6 +407,13 @@ func run(o *Options) error {
return fmt.Errorf("error initializing CNI server: %v", err)
}
}
} else {
externalNodeInformer := crdInformerFactory.Crd().V1alpha1().ExternalNodes()
externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, externalNodeInformer,
ifaceStore, externalNodeUpdateChannel, o.config.Namespace)
if err != nil {
return fmt.Errorf("error creating ExternalNode controller: %v", err)
}
}

var traceflowController *traceflow.Controller
Expand Down Expand Up @@ -487,6 +506,9 @@ func run(o *Options) error {
go podUpdateChannel.Run(stopCh)
go cniServer.Run(stopCh)
go nodeRouteController.Run(stopCh)
} else {
go externalNodeUpdateChannel.Run(stopCh)
go externalNodeController.Run(stopCh)
}

go antreaClientProvider.Run(stopCh)
Expand Down
3 changes: 3 additions & 0 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,4 +452,7 @@ func (o *Options) setExternalNodeDefaultOptions() {
o.config.EnablePrometheusMetrics = new(bool)
*o.config.EnablePrometheusMetrics = false
}
if o.config.Namespace == "" {
o.config.Namespace = "default"
}
}
15 changes: 12 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/externalnode"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/openflow/cookie"
Expand Down Expand Up @@ -275,9 +276,17 @@ func (i *Initializer) initInterfaceStore() error {
case interfacestore.AntreaTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
case interfacestore.AntreaHost:
// Not load the host interface, because it is configured on the OVS bridge port, and we don't need a
// specific interface in the interfaceStore.
intf = nil
if port.Name == i.ovsBridge {
// Not load the host interface, because it is configured on the OVS bridge port, and we don't need a
// specific interface in the interfaceStore.
intf = nil
} else {
var err error
intf, err = externalnode.ParseHostInterfaceConfig(i.ovsBridgeClient, port, ovsPort)
if err != nil {
return err
}
}
case interfacestore.AntreaContainer:
// The port should be for a container interface.
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true)
Expand Down
38 changes: 36 additions & 2 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/externalnode"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -336,7 +337,8 @@ func toServicesIndexFunc(obj interface{}) ([]string, error) {
}

// newRuleCache returns a new *ruleCache.
func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, serviceGroupIDUpdate <-chan string) *ruleCache {
func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Subscriber, externalNodeUpdateSubscriber channel.Subscriber,
serviceGroupIDUpdate <-chan string) *ruleCache {
rules := cache.NewIndexer(
ruleKeyFunc,
cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, toServicesIndex: toServicesIndexFunc},
Expand All @@ -350,7 +352,14 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub
groupIDUpdates: serviceGroupIDUpdate,
}
// Subscribe Pod update events from CNIServer.
podUpdateSubscriber.Subscribe(cache.processPodUpdate)
if podUpdateSubscriber != nil {
podUpdateSubscriber.Subscribe(cache.processPodUpdate)
}
// Subscribe ExternalNode update events from ExternalNodeController
if externalNodeUpdateSubscriber != nil {
externalNodeUpdateSubscriber.Subscribe(cache.processExternalNodeUpdate)
}

go cache.processGroupIDUpdates()
return cache
}
Expand Down Expand Up @@ -379,6 +388,31 @@ func (c *ruleCache) processPodUpdate(e interface{}) {
}
}

// processExternalNodeUpdate will be called when ExternalNodeController publishes an ExternalNode update event.
// It finds out AppliedToGroups that contains this ExternalNode converted ExternalEntity and trigger reconciling
// of related rules.
// It can enforce NetworkPolicies to ExternalEntities after ExternalEntityInterface is realised in the interface store.
func (c *ruleCache) processExternalNodeUpdate(e interface{}) {
externalNode := e.(*crdv1alpha1.ExternalNode)
eeName := externalnode.GenExternalEntityName(externalNode)
c.appliedToSetLock.RLock()
defer c.appliedToSetLock.RUnlock()
externalEntityEquals := func(ee *v1beta.ExternalEntityReference, namespace string, name string) bool {
if ee.Namespace == namespace && ee.Name == name {
return true
}
return false
}
for group, memberSet := range c.appliedToSetByGroup {
for _, member := range memberSet.Items() {
if externalEntityEquals(member.ExternalEntity, externalNode.Namespace, eeName) {
c.onAppliedToGroupUpdate(group)
break
}
}
}
}

// processGroupIDUpdates is an infinite loop that takes Service groupID
// update events from the channel, finds out rules that refer this Service in
// ToServices field and use dirtyRuleHandler to re-queue these rules.
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, *channel.SubscribableCh
recorder := newDirtyRuleRecorder()
podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)
ch2 := make(chan string, 100)
c := newRuleCache(recorder.Record, podUpdateChannel, ch2)
c := newRuleCache(recorder.Record, podUpdateChannel, nil, ch2)
return c, recorder, podUpdateChannel
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
ifaceStore interfacestore.InterfaceStore,
nodeName string,
podUpdateSubscriber channel.Subscriber,
externalNodeUpdateSubscriber channel.Subscriber,
groupCounters []proxytypes.GroupCounter,
groupIDUpdates <-chan string,
antreaPolicyEnabled bool,
Expand Down Expand Up @@ -144,7 +145,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
}
}
c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled)
c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates)
c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalNodeUpdateSubscriber, groupIDUpdates)
if statusManagerEnabled {
c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
ch2 := make(chan string, 100)
groupIDAllocator := openflow.NewGroupAllocator(false)
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, groupCounters, ch2,
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2,
true, true, true, true, testAsyncDeleteInterval, "8.8.8.8:53", true, false)
reconciler := newMockReconciler()
controller.reconciler = reconciler
Expand Down
28 changes: 16 additions & 12 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,20 +873,22 @@ func (r *reconciler) GetRuleByFlowID(ruleFlowID uint32) (*types.PolicyRule, bool
func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 {
ofPorts := sets.NewInt32()
for _, m := range members {
var entityName, ns string
var ifaces []*interfacestore.InterfaceConfig
var name, ns string
if m.Pod != nil {
entityName, ns = m.Pod.Name, m.Pod.Namespace
name, ns = m.Pod.Name, m.Pod.Namespace
ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns)
} else if m.ExternalEntity != nil {
entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace
name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace
ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns)
}
ifaces := r.ifaceStore.GetInterfacesByEntity(entityName, ns)
if len(ifaces) == 0 {
// This might be because the container has been deleted during realization or hasn't been set up yet.
klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName)
klog.Infof("Can't find interface for %s/%s, skipping", ns, name)
continue
}
for _, iface := range ifaces {
klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, entityName)
klog.V(2).Infof("Got OFPort %v for %s/%s", iface.OFPort, ns, name)
ofPorts.Insert(iface.OFPort)
}
}
Expand All @@ -896,22 +898,24 @@ func (r *reconciler) getOFPorts(members v1beta2.GroupMemberSet) sets.Int32 {
func (r *reconciler) getIPs(members v1beta2.GroupMemberSet) sets.String {
ips := sets.NewString()
for _, m := range members {
var entityName, ns string
var ifaces []*interfacestore.InterfaceConfig
var name, ns string
if m.Pod != nil {
entityName, ns = m.Pod.Name, m.Pod.Namespace
name, ns = m.Pod.Name, m.Pod.Namespace
ifaces = r.ifaceStore.GetContainerInterfacesByPod(name, ns)
} else if m.ExternalEntity != nil {
entityName, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace
name, ns = m.ExternalEntity.Name, m.ExternalEntity.Namespace
ifaces = r.ifaceStore.GetInterfacesByEntity(name, ns)
}
ifaces := r.ifaceStore.GetInterfacesByEntity(entityName, ns)
if len(ifaces) == 0 {
// This might be because the container has been deleted during realization or hasn't been set up yet.
klog.Infof("Can't find interface for %s/%s, skipping", ns, entityName)
klog.Infof("Can't find interface for %s/%s, skipping", ns, name)
continue
}
for _, iface := range ifaces {
for _, ipAddr := range iface.IPs {
if ipAddr != nil {
klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, entityName)
klog.V(2).Infof("Got IP %v for %s/%s", iface.IPs, ns, name)
ips.Insert(ipAddr.String())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *fakeNetworkPolicyControl) getNetworkPolicyStatus() *v1beta2.NetworkPoli
}

func newTestStatusController() (*StatusController, *ruleCache, *fakeNetworkPolicyControl) {
ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), make(chan string, 100))
ruleCache := newRuleCache(func(s string) {}, channel.NewSubscribableChannel("PodUpdate", 100), nil, make(chan string, 100))
statusControl := &fakeNetworkPolicyControl{}
statusController := newStatusController(nil, testNode1, ruleCache)
statusController.statusControlInterface = statusControl
Expand Down
Loading

0 comments on commit 004ba82

Please sign in to comment.