Skip to content

Commit

Permalink
Flexible Pipeline (#3058)
Browse files Browse the repository at this point in the history
Flexible Pipeline is framework to generate OVS pipelines with dynamic
table IDs. There are some abstractions introduced in this framework:

- **feature** is the interface to program a major function in Antrea
  data path
- **stage** in FlexiblePipeline is used to group tables which implement
  similar functions in a pipeline
- **pipeline** is used to implement a major function in Antrea data
  path
- **Table** is the basic unit to build OVS pipelines. A Table can be
  referenced by one or more features, but its member struct ofTable
  will be initialized and realized on OVS only when it is referenced
  by any activated features

At this moment, we have the following features:

- featurePodConnectivity, implementation of connectivity for Pods,
  activated by default
- featureNetworkPolicy, implementation of K8s NetworkPolicy and Antrea
  NetworkPolicy, activated by default
- featureService, implementation of K8s Service, activated by default
- featureEgress, implementation of Egress, activation is determined by
  feature gate Egress
- featureMulticast, implementation of multicast, activation is
  determined by feature gate Multicast

At this moment, we have the following stages:

- stageStart is only used to initialize PipelineClassifierTable
- stageClassifier is used to classify packets "category" (tunnel, local
  gateway or local Pod, etc)
- stageValidation is used to validate packets
- stageConntrackState is used to transform committed packets in CT zones
- stagePreRouting is similar to PREROUTING chain of nat table in iptables
  DNAT for Service connections is performed in this stage
- stageEgressSecurity is used to install egress rules for K8s
  NetworkPolicy and Antrea NetworkPolicy
- stageRouting is used to implement L3 Forwarding of packets.
- stagePostRouting is similar to POSTROUTING chain of nat table in
  iptables. SNAT for Service connections is performed in this stage
- stageSwitching is used to implement L2 Forwarding of packets
- stageIngressSecurity is used to install ingress rules for K8s
  NetworkPolicy and Antrea NetworkPolicy
- stageConntrack is used to commit non-Service connections
- stageOutput is used to output packets to target port

At this moment, we have the following pipelines:

- pipelineRoot is only used to initialize PipelineClassifierTable
- pipelineARP is used to process ARP packets
- pipelineIP is used to process IPv4/IPv6 packets
- pipelineMulticast is used to process multicast packets

After refactoring, PipelineClassifierTable is table 0. It's the only
fixed table ID. Packets are forwarded to different pipelines in this
table.

OVS pipelineARP is used to process ARP packets. Stages and tables
in this pipeline:

- stageValidation
  - ARPSpoofGuardTable, ARP-spoofing part of original SpoofGuardTable
- stageOutput
  - ARPResponderTable, renamed from arpResponderTable

OVS pipelineIP is used to process IPv4/IPv6 packets. Stages and
tables in this pipelines

- stageClassifier
  - ClassifierTable, original ClassifierTable (0)
- stageValidation
  - SpoofGuardTable, part of original SpoofGuardTable (10)
  - IPv6Table, original IPv6Table (21)
  - IPClassifierTable, new added for multicast
- stageConntrackState
  - SNATConntrackTable, original ServiceConntrackTable (35)
  - ConntrackTable, original ConntrackTable (30)
  - ConntrackStateTable, original ConntrackStateTable (31)
- stagePreRouting
  - PreRoutingClassifierTable, new added
  - NodePortMarkTable, original ServiceClassifierTable (35)
  - SessionAffinityTable, original SessionAffinityTable (41)
  - ServiceLBTable, original ServiceLBTable (41)
  - EndpointDNATTable, original EndpointDNATTable (42)
  - DNATTable, original DNATTable (40)
- stageEgressSecurity
  - AntreaPolicyEgressRuleTable, original AntreaPolicyEgressRuleTable
   (45)
  - EgressRuleTable, original EgressRuleTable (50)
  - EgressDefaultTable, original EgressDefaultTable (60)
  - EgressMetricTable, original EgressMetricTable (61)
- stageRouting
  - L3ForwardingTable, original L3ForwardingTable (70)
  - EgressMarkTable, original SNATTable (71)
  - L3DecTTLTable, original L3DecTTLTable (72)
- stagePostRouting
  - ServiceMarkTable, new added
  - SNATConntrackCommitTable, origin ServiceConntrackCommitTable
    (105)
- stageSwitching
  - L2ForwardingCalcTable, original L2ForwardingCalcTable (80)
- stageIngressSecurity
  - IngressSecurityClassifierTable, new added
  - AntreaPolicyIngressRuleTable, original AntreaPolicyIngressRuleTable
    (85)
  - IngressRuleTable, original IngressRuleTable (90)
  - IngressDefaultTable, original IngressDefaultTable (100)
  - IngressMetricTable, original IngressDefaultTable (101)
- stageConntrack
  - ConntrackCommitTable, original ConntrackCommitTable (105)
- stageOutput
  - L2ForwardingOutTable, original L2ForwardingOutTable (110)

OVS pipelineMulticast is used to process multicast packets.
Stages and tables in this pipeline:

- stageRouting
  - MulticastTable, original MulticastTable (22)

Removed tables:

- original ServiceHairpinTable (22)
- original DefaultTierEgressRuleTable (49)
- original HairpinSNATTable (108)

For hairpin connection, SNAT is performed by CT operation instead of
modifying source IP stateless. Another change is to use different IPs
to perform SNAT:

- Hairpin Service connection initiated through a local Pod, and SNAT
  is performed with the Antrea gateway IP.
- Hairpin Service connection initiated through the Antrea gateway,
  and SNAT is performed with a virtual IP.

Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl authored Mar 22, 2022
1 parent 7764255 commit 66208c5
Show file tree
Hide file tree
Showing 37 changed files with 4,201 additions and 2,407 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
c.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonNP), "dnsresponse", c.fqdnController)
}
}
c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled)
c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled)
c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, groupIDUpdates)
if statusManagerEnabled {
c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,20 @@ func newNetworkPolicyWithMultipleRules(name string, uid types.UID, from, to, app
}
}

func prepareMockTables() {
openflow.InitMockTables(
map[*openflow.Table]uint8{
openflow.AntreaPolicyEgressRuleTable: uint8(5),
openflow.EgressRuleTable: uint8(6),
openflow.EgressDefaultTable: uint8(7),
openflow.AntreaPolicyIngressRuleTable: uint8(12),
openflow.IngressRuleTable: uint8(13),
openflow.IngressDefaultTable: uint8(14),
})
}

func TestAddSingleGroupRule(t *testing.T) {
prepareMockTables()
controller, clientset, reconciler := newTestController()
addressGroupWatcher := watch.NewFake()
appliedToGroupWatcher := watch.NewFake()
Expand Down Expand Up @@ -280,6 +293,7 @@ func TestAddSingleGroupRule(t *testing.T) {
}

func TestAddMultipleGroupsRule(t *testing.T) {
prepareMockTables()
controller, clientset, reconciler := newTestController()
addressGroupWatcher := watch.NewFake()
appliedToGroupWatcher := watch.NewFake()
Expand Down Expand Up @@ -359,6 +373,7 @@ func TestAddMultipleGroupsRule(t *testing.T) {
}

func TestDeleteRule(t *testing.T) {
prepareMockTables()
controller, clientset, reconciler := newTestController()
addressGroupWatcher := watch.NewFake()
appliedToGroupWatcher := watch.NewFake()
Expand Down Expand Up @@ -406,6 +421,7 @@ func TestDeleteRule(t *testing.T) {
}

func TestAddNetworkPolicyWithMultipleRules(t *testing.T) {
prepareMockTables()
controller, clientset, reconciler := newTestController()
addressGroupWatcher := watch.NewFake()
appliedToGroupWatcher := watch.NewFake()
Expand Down Expand Up @@ -488,6 +504,7 @@ func TestAddNetworkPolicyWithMultipleRules(t *testing.T) {
}

func TestNetworkPolicyMetrics(t *testing.T) {
prepareMockTables()
// Initialize NetworkPolicy metrics (prometheus)
metrics.InitializeNetworkPolicyMetrics()
controller, clientset, reconciler := newTestController()
Expand Down
19 changes: 11 additions & 8 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,19 @@ func newReconciler(ofClient openflow.Client,
groupCounters []proxytypes.GroupCounter,
v4Enabled bool,
v6Enabled bool,
antreaPolicyEnabled bool,
) *reconciler {
priorityAssigners := map[uint8]*tablePriorityAssigner{}
for _, table := range openflow.GetAntreaPolicyBaselineTierTables() {
priorityAssigners[table.GetID()] = &tablePriorityAssigner{
assigner: newPriorityAssigner(true),
if antreaPolicyEnabled {
for _, table := range openflow.GetAntreaPolicyBaselineTierTables() {
priorityAssigners[table.GetID()] = &tablePriorityAssigner{
assigner: newPriorityAssigner(true),
}
}
}
for _, table := range openflow.GetAntreaPolicyMultiTierTables() {
priorityAssigners[table.GetID()] = &tablePriorityAssigner{
assigner: newPriorityAssigner(false),
for _, table := range openflow.GetAntreaPolicyMultiTierTables() {
priorityAssigners[table.GetID()] = &tablePriorityAssigner{
assigner: newPriorityAssigner(false),
}
}
}
reconciler := &reconciler{
Expand Down Expand Up @@ -297,7 +300,7 @@ func (r *reconciler) getOFRuleTable(rule *CompletedRule) uint8 {
}
return openflow.EgressRuleTable.GetID()
}
var ruleTables []binding.Table
var ruleTables []*openflow.Table
if rule.Direction == v1beta2.DirectionIn {
ruleTables = openflow.GetAntreaPolicyIngressTables()
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ func newTestReconciler(t *testing.T, controller *gomock.Controller, ifaceStore i
ch := make(chan string, 100)
groupIDAllocator := openflow.NewGroupAllocator(v6Enabled)
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch)}
r := newReconciler(ofClient, ifaceStore, newIDAllocator(testAsyncDeleteInterval), f, groupCounters, v4Enabled, v6Enabled)
r := newReconciler(ofClient, ifaceStore, newIDAllocator(testAsyncDeleteInterval), f, groupCounters, v4Enabled, v6Enabled, true)
return r
}

func TestReconcilerForget(t *testing.T) {
prepareMockTables()
tests := []struct {
name string
lastRealizeds map[string]*lastRealized
Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,17 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
obs = append(obs, *ob)
}

// Collect Service DNAT and SNAT.
// Collect Service connections.
// - For packet is DNATed only, the final state is that ipDst != ctNwDst (in DNAT CT zone).
// - For packet is both DNATed and SNATed, the first state is also ipDst != ctNwDst (in DNAT CT zone), but the final
// state is that ipSrc != ctNwSrc (in SNAT CT zone). The state in DNAT CT zone cannot be recognized in SNAT CT zone.
if !tfState.receiverOnly {
if isValidCtNw(ctNwDst) && ipDst != ctNwDst {
if isValidCtNw(ctNwDst) && ipDst != ctNwDst || isValidCtNw(ctNwSrc) && ipSrc != ctNwSrc {
ob := &crdv1alpha1.Observation{
Component: crdv1alpha1.ComponentLB,
Action: crdv1alpha1.ActionForwarded,
TranslatedDstIP: ipDst,
}
// Service SNAT can only happen alongside DNAT
// and only for hairpinned packets at the moment.
if isValidCtNw(ctNwSrc) && ipSrc != ctNwSrc {
ob.TranslatedSrcIP = ipSrc
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/agent/controller/traceflow/packetin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,24 @@ import (
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

func prepareMockTables() {
openflow.InitMockTables(
map[*openflow.Table]uint8{
openflow.AntreaPolicyEgressRuleTable: uint8(5),
openflow.EgressRuleTable: uint8(6),
openflow.EgressDefaultTable: uint8(7),
openflow.EgressMetricTable: uint8(8),
openflow.AntreaPolicyIngressRuleTable: uint8(12),
openflow.IngressRuleTable: uint8(13),
openflow.IngressDefaultTable: uint8(14),
openflow.IngressMetricTable: uint8(15),
openflow.L2ForwardingOutTable: uint8(17),
})
}

func Test_getNetworkPolicyObservation(t *testing.T) {
prepareMockTables()

type args struct {
tableID uint8
ingress bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
ofclient "antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
agenttypes "antrea.io/antrea/pkg/agent/types"
cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
Expand Down Expand Up @@ -83,7 +82,7 @@ var (
Priority: nil,
Name: "",
FlowID: uint32(0),
TableID: ofclient.IngressRuleTable.GetID(),
TableID: uint8(10),
PolicyRef: &np1,
EnableLogging: false,
}
Expand Down
Loading

0 comments on commit 66208c5

Please sign in to comment.