Skip to content

Commit

Permalink
Modify OF table name
Browse files Browse the repository at this point in the history
1. Use Multipart messages to query OVS table features and modify the table name property if the table is defined in Antrea pipeline
2. Use openflow.Table in the ofClient under pkg/agent/openflow directly, and remove the pipeline map and FlowTables in pipeline.go
3. Remove the typed type TableIDType, and use uint8 for table id directly.

Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Sep 28, 2021
1 parent 0946ca6 commit 766b38e
Show file tree
Hide file tree
Showing 35 changed files with 814 additions and 683 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.17

require (
antrea.io/libOpenflow v0.2.0
antrea.io/ofnet v0.1.0
antrea.io/libOpenflow v0.5.2
antrea.io/ofnet v0.2.1
github.com/Mellanox/sriovnet v1.0.2
github.com/Microsoft/go-winio v0.4.16-0.20201130162521-d1ffc52c7331
github.com/Microsoft/hcsshim v0.8.9
Expand Down
9 changes: 5 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
antrea.io/libOpenflow v0.2.0 h1:bBNT3CI8q2FMQRdphP0dynImRK1LBDmA+cQOu7JULj4=
antrea.io/libOpenflow v0.2.0/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/ofnet v0.1.0 h1:r5c/TM5pa8xSVd5xEUj1L2vYfc4EjIzCWs6cHbeuVFc=
antrea.io/ofnet v0.1.0/go.mod h1:fLmHHD9XWeVza2pz/HEdLkGyA7pNutxlXCqodlwWQsA=
antrea.io/libOpenflow v0.3.1/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/libOpenflow v0.5.2 h1:EFTyAHlG6UH8ZHpiPi6QPVPETqoIk0eB2B6i88VqacM=
antrea.io/libOpenflow v0.5.2/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/ofnet v0.2.1 h1:klAQOV0r+vXzN6VBjpyRcQBU3v6QHwwnXreDpIB18jA=
antrea.io/ofnet v0.2.1/go.mod h1:wtADZGRIRApPAv5BAHY8l8KLLnAY3LKD2p33zTTB5Gs=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/apiserver/handlers/ovsflows/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ func dumpMatchedFlows(aq agentquerier.AgentQuerier, flowKeys []string) ([]Respon
return resps, nil
}

func dumpFlows(aq agentquerier.AgentQuerier, table binding.TableIDType) ([]Response, error) {
func dumpFlows(aq agentquerier.AgentQuerier, table uint8) ([]Response, error) {
resps := []Response{}
var flowStrs []string
var err error
if table != binding.TableIDAll {
flowStrs, err = aq.GetOVSCtlClient().DumpTableFlows(uint8(table))
flowStrs, err = aq.GetOVSCtlClient().DumpTableFlows(table)
} else {
flowStrs, err = aq.GetOVSCtlClient().DumpFlows()
}
Expand Down Expand Up @@ -89,16 +89,16 @@ func getTableFlows(aq agentquerier.AgentQuerier, tables string) ([]Response, err
var resps []Response
for _, tableSeg := range strings.Split(tables, ",") {
tableSeg = strings.TrimSpace(tableSeg)
var tableNumber binding.TableIDType
var tableNumber uint8
// Table nubmer is a 8-bit unsigned integer.
n, err := strconv.ParseUint(tableSeg, 10, 8)
if err == nil {
tableNumber = binding.TableIDType(n)
tableNumber = uint8(n)
if openflow.GetFlowTableName(tableNumber) == "" {
return nil, nil
}
} else {
tableNumber = openflow.GetFlowTableNumber(tableSeg)
tableNumber = openflow.GetFlowTableID(tableSeg)
if tableNumber == binding.TableIDAll {
return nil, nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/apiserver/handlers/ovsflows/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (

"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/openflow"
oftest "antrea.io/antrea/pkg/agent/openflow/testing"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
agentquerier "antrea.io/antrea/pkg/agent/querier"
aqtest "antrea.io/antrea/pkg/agent/querier/testing"
cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing"
"antrea.io/antrea/pkg/querier"
queriertest "antrea.io/antrea/pkg/querier/testing"
Expand Down Expand Up @@ -131,6 +133,11 @@ func TestServiceFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

// Create openflow.Client to ensure the OVS tables are added into the cache.
bridgeName := "testbr"
bridgeMgmtAddr := binding.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName)
openflow.NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, true)

testcases := []testCase{
{
test: "Existing Service",
Expand Down
22 changes: 11 additions & 11 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ func getMatchRegField(matchers *ofctrl.Matchers, field *binding.RegField) *ofctr

// getMatch receives ofctrl matchers and table id, match field.
// Modifies match field to Ingress/Egress register based on tableID.
func getMatch(matchers *ofctrl.Matchers, tableID binding.TableIDType, disposition uint32) *ofctrl.MatchField {
func getMatch(matchers *ofctrl.Matchers, tableID uint8, disposition uint32) *ofctrl.MatchField {
// Get match from CNPDenyConjIDReg if disposition is not allow.
if disposition != openflow.DispositionAllow {
return getMatchRegField(matchers, openflow.CNPDenyConjIDField)
}
// Get match from ingress/egress reg if disposition is allow
for _, table := range append(openflow.GetAntreaPolicyEgressTables(), openflow.EgressRuleTable) {
if tableID == table {
if tableID == table.GetID() {
return getMatchRegField(matchers, openflow.TFEgressConjIDField)
}
}
for _, table := range append(openflow.GetAntreaPolicyIngressTables(), openflow.IngressRuleTable) {
if tableID == table {
if tableID == table.GetID() {
return getMatchRegField(matchers, openflow.TFIngressConjIDField)
}
}
Expand All @@ -130,7 +130,7 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) er
matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table name
tableID := binding.TableIDType(pktIn.TableId)
tableID := pktIn.TableId
ob.tableName = openflow.GetFlowTableName(tableID)

// Get disposition Allow or Drop
Expand Down Expand Up @@ -326,7 +326,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table ID
tableID := binding.TableIDType(pktIn.TableId)
tableID := pktIn.TableId
// Get disposition Allow, Drop or Reject
match = getMatchRegField(matchers, openflow.APDispositionField)
id, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange())
Expand Down Expand Up @@ -363,10 +363,10 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
}
} else {
// For K8s NetworkPolicy implicit drop action, we cannot get name/namespace.
if tableID == openflow.IngressDefaultTable {
if tableID == openflow.IngressDefaultTable.GetID() {
denyConn.IngressNetworkPolicyType = registry.PolicyTypeK8sNetworkPolicy
denyConn.IngressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(disposition)
} else if tableID == openflow.EgressDefaultTable {
} else if tableID == openflow.EgressDefaultTable.GetID() {
denyConn.EgressNetworkPolicyType = registry.PolicyTypeK8sNetworkPolicy
denyConn.EgressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(disposition)
}
Expand All @@ -375,18 +375,18 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
return nil
}

func isAntreaPolicyIngressTable(tableID binding.TableIDType) bool {
func isAntreaPolicyIngressTable(tableID uint8) bool {
for _, table := range openflow.GetAntreaPolicyIngressTables() {
if table == tableID {
if table.GetID() == tableID {
return true
}
}
return false
}

func isAntreaPolicyEgressTable(tableID binding.TableIDType) bool {
func isAntreaPolicyEgressTable(tableID uint8) bool {
for _, table := range openflow.GetAntreaPolicyEgressTables() {
if table == tableID {
if table.GetID() == tableID {
return true
}
}
Expand Down
36 changes: 18 additions & 18 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ type reconciler struct {
idAllocator *idAllocator

// priorityAssigners provides interfaces to manage OF priorities for each OVS table.
priorityAssigners map[binding.TableIDType]*tablePriorityAssigner
priorityAssigners map[uint8]*tablePriorityAssigner
// ipv4Enabled tells if IPv4 is supported on this Node or not.
ipv4Enabled bool
// ipv6Enabled tells is IPv6 is supported on this Node or not.
Expand All @@ -208,14 +208,14 @@ func newReconciler(ofClient openflow.Client,
idAllocator *idAllocator,
fqdnController *fqdnController,
) *reconciler {
priorityAssigners := map[binding.TableIDType]*tablePriorityAssigner{}
priorityAssigners := map[uint8]*tablePriorityAssigner{}
for _, table := range openflow.GetAntreaPolicyBaselineTierTables() {
priorityAssigners[table] = &tablePriorityAssigner{
priorityAssigners[table.GetID()] = &tablePriorityAssigner{
assigner: newPriorityAssigner(true),
}
}
for _, table := range openflow.GetAntreaPolicyMultiTierTables() {
priorityAssigners[table] = &tablePriorityAssigner{
priorityAssigners[table.GetID()] = &tablePriorityAssigner{
assigner: newPriorityAssigner(false),
}
}
Expand Down Expand Up @@ -279,28 +279,28 @@ func (r *reconciler) Reconcile(rule *CompletedRule) error {
// getOFRuleTable retreives the OpenFlow table to install the CompletedRule.
// The decision is made based on whether the rule is created for a CNP/ANP, and
// the Tier of that NetworkPolicy.
func (r *reconciler) getOFRuleTable(rule *CompletedRule) binding.TableIDType {
func (r *reconciler) getOFRuleTable(rule *CompletedRule) uint8 {
if !rule.isAntreaNetworkPolicyRule() {
if rule.Direction == v1beta2.DirectionIn {
return openflow.IngressRuleTable
return openflow.IngressRuleTable.GetID()
}
return openflow.EgressRuleTable
return openflow.EgressRuleTable.GetID()
}
var ruleTables []binding.TableIDType
var ruleTables []binding.Table
if rule.Direction == v1beta2.DirectionIn {
ruleTables = openflow.GetAntreaPolicyIngressTables()
} else {
ruleTables = openflow.GetAntreaPolicyEgressTables()
}
if *rule.TierPriority != baselineTierPriority {
return ruleTables[0]
return ruleTables[0].GetID()
}
return ruleTables[1]
return ruleTables[1].GetID()
}

// getOFPriority retrieves the OFPriority for the input CompletedRule to be installed,
// and re-arranges installed priorities on OVS if necessary.
func (r *reconciler) getOFPriority(rule *CompletedRule, table binding.TableIDType, pa *tablePriorityAssigner) (*uint16, bool, error) {
func (r *reconciler) getOFPriority(rule *CompletedRule, tableID uint8, pa *tablePriorityAssigner) (*uint16, bool, error) {
if !rule.isAntreaNetworkPolicyRule() {
klog.V(2).Infof("Assigning default priority for k8s NetworkPolicy.")
return nil, true, nil
Expand All @@ -326,7 +326,7 @@ func (r *reconciler) getOFPriority(rule *CompletedRule, table binding.TableIDTyp
}
// Re-assign installed priorities on OVS
if len(priorityUpdates) > 0 {
err := r.ofClient.ReassignFlowPriorities(priorityUpdates, table)
err := r.ofClient.ReassignFlowPriorities(priorityUpdates, tableID)
if err != nil {
revertFunc()
return nil, registered, err
Expand All @@ -344,7 +344,7 @@ func (r *reconciler) getOFPriority(rule *CompletedRule, table binding.TableIDTyp
func (r *reconciler) BatchReconcile(rules []*CompletedRule) error {
var rulesToInstall []*CompletedRule
var priorities []*uint16
prioritiesByTable := map[binding.TableIDType][]*uint16{}
prioritiesByTable := map[uint8][]*uint16{}
for _, rule := range rules {
if _, exists := r.lastRealizeds.Load(rule.ID); exists {
klog.Errorf("rule %s already realized during the initialization phase", rule.ID)
Expand Down Expand Up @@ -382,7 +382,7 @@ func (r *reconciler) BatchReconcile(rules []*CompletedRule) error {
// registerOFPriorities constructs a Priority type for each CompletedRule in the input list,
// and registers those Priorities with appropriate tablePriorityAssigner based on Tier.
func (r *reconciler) registerOFPriorities(rules []*CompletedRule) error {
prioritiesToRegister := map[binding.TableIDType][]types.Priority{}
prioritiesToRegister := map[uint8][]types.Priority{}
for _, rule := range rules {
if rule.isAntreaNetworkPolicyRule() {
ruleTable := r.getOFRuleTable(rule)
Expand All @@ -403,7 +403,7 @@ func (r *reconciler) registerOFPriorities(rules []*CompletedRule) error {
}

// add converts CompletedRule to PolicyRule(s) and invokes installOFRule to install them.
func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16, table binding.TableIDType) error {
func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16, table uint8) error {
klog.V(2).Infof("Adding new rule %v", rule)
ofRuleByServicesMap, lastRealized := r.computeOFRulesForAdd(rule, ofPriority, table)
for svcKey, ofRule := range ofRuleByServicesMap {
Expand All @@ -424,7 +424,7 @@ func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16, table binding.
return nil
}

func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint16, table binding.TableIDType) (
func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint16, table uint8) (
map[servicesKey]*types.PolicyRule, *lastRealized) {
lastRealized := newLastRealized(rule)
// TODO: Handle the case that the following processing fails or partially succeeds.
Expand Down Expand Up @@ -570,7 +570,7 @@ func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) er

// update calculates the difference of Addresses between oldRule and newRule,
// and invokes Openflow client's methods to reconcile them.
func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule, ofPriority *uint16, table binding.TableIDType) error {
func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule, ofPriority *uint16, table uint8) error {
klog.V(2).Infof("Updating existing rule %v", newRule)
// staleOFIDs tracks servicesKey that are no long needed.
// Firstly fill it with the last realized ofIDs.
Expand Down Expand Up @@ -758,7 +758,7 @@ func (r *reconciler) updateOFRule(ofID uint32, addedFrom []types.Address, addedT
return nil
}

func (r *reconciler) uninstallOFRule(ofID uint32, table binding.TableIDType) error {
func (r *reconciler) uninstallOFRule(ofID uint32, table uint8) error {
klog.V(2).Infof("Uninstalling ofRule %d", ofID)
stalePriorities, err := r.ofClient.UninstallPolicyRuleFlows(ofID)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,23 +611,23 @@ func TestReconcileWithTransientError(t *testing.T) {
To: ipsToOFAddresses(sets.NewString("1.1.1.1")),
Service: []v1beta2.Service{serviceTCP80, serviceTCP8080},
PolicyRef: &np1,
TableID: openflow.EgressRuleTable,
TableID: openflow.EgressRuleTable.GetID(),
},
{
Direction: v1beta2.DirectionOut,
From: ipsToOFAddresses(sets.NewString("2.2.2.2")),
To: ipsToOFAddresses(sets.NewString("1.1.1.2")),
Service: []v1beta2.Service{serviceTCP443, serviceTCP8080},
PolicyRef: &np1,
TableID: openflow.EgressRuleTable,
TableID: openflow.EgressRuleTable.GetID(),
},
{
Direction: v1beta2.DirectionOut,
From: ipsToOFAddresses(sets.NewString("2.2.2.2")),
To: append(ipsToOFAddresses(sets.NewString("1.1.1.3")), openflow.NewIPNetAddress(ipNet)),
Service: []v1beta2.Service{serviceTCP8080},
PolicyRef: &np1,
TableID: openflow.EgressRuleTable,
TableID: openflow.EgressRuleTable.GetID(),
},
}
for _, policyRule := range policyRules {
Expand Down
32 changes: 20 additions & 12 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
}

// Get drop table.
if tableID == uint8(openflow.EgressMetricTable) || tableID == uint8(openflow.IngressMetricTable) {
ob := getNetworkPolicyObservation(tableID, tableID == uint8(openflow.IngressMetricTable))
if tableID == openflow.EgressMetricTable.GetID() || tableID == openflow.IngressMetricTable.GetID() {
ob := getNetworkPolicyObservation(tableID, tableID == openflow.IngressMetricTable.GetID())
if match := getMatchRegField(matchers, openflow.CNPDenyConjIDField); match != nil {
notAllowConjInfo, err := getRegValue(match, nil)
if err != nil {
Expand All @@ -222,13 +222,13 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
}
}
obs = append(obs, *ob)
} else if tableID == uint8(openflow.EgressDefaultTable) || tableID == uint8(openflow.IngressDefaultTable) {
ob := getNetworkPolicyObservation(tableID, tableID == uint8(openflow.IngressDefaultTable))
} else if tableID == openflow.EgressDefaultTable.GetID() || tableID == openflow.IngressDefaultTable.GetID() {
ob := getNetworkPolicyObservation(tableID, tableID == openflow.IngressDefaultTable.GetID())
obs = append(obs, *ob)
}

// Get output table.
if tableID == uint8(openflow.L2ForwardingOutTable) {
if tableID == openflow.L2ForwardingOutTable.GetID() {
ob := new(crdv1alpha1.Observation)
tunnelDstIP := ""
isIPv6 := c.nodeConfig.NodeIPv6Addr != nil
Expand Down Expand Up @@ -262,7 +262,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
// Output port is Pod port, packet is delivered.
ob.Action = crdv1alpha1.ActionDelivered
}
ob.ComponentInfo = openflow.GetFlowTableName(binding.TableIDType(tableID))
ob.ComponentInfo = openflow.L2ForwardingOutTable.GetName()
ob.Component = crdv1alpha1.ComponentForwarding
obs = append(obs, *ob)
}
Expand Down Expand Up @@ -340,22 +340,30 @@ func getNetworkPolicyObservation(tableID uint8, ingress bool) *crdv1alpha1.Obser
ob.Component = crdv1alpha1.ComponentNetworkPolicy
if ingress {
switch tableID {
case uint8(openflow.IngressMetricTable), uint8(openflow.IngressDefaultTable):
case openflow.IngressMetricTable.GetID():
// Packet dropped by ANP/default drop rule
ob.ComponentInfo = openflow.GetFlowTableName(binding.TableIDType(tableID))
ob.ComponentInfo = openflow.IngressMetricTable.GetName()
ob.Action = crdv1alpha1.ActionDropped
case openflow.IngressDefaultTable.GetID():
// Packet dropped by ANP/default drop rule
ob.ComponentInfo = openflow.IngressDefaultTable.GetName()
ob.Action = crdv1alpha1.ActionDropped
default:
ob.ComponentInfo = openflow.GetFlowTableName(openflow.IngressRuleTable)
ob.ComponentInfo = openflow.IngressRuleTable.GetName()
ob.Action = crdv1alpha1.ActionForwarded
}
} else {
switch tableID {
case uint8(openflow.EgressMetricTable), uint8(openflow.EgressDefaultTable):
case openflow.EgressMetricTable.GetID():
// Packet dropped by ANP/default drop rule
ob.ComponentInfo = openflow.EgressMetricTable.GetName()
ob.Action = crdv1alpha1.ActionDropped
case openflow.EgressDefaultTable.GetID():
// Packet dropped by ANP/default drop rule
ob.ComponentInfo = openflow.GetFlowTableName(binding.TableIDType(tableID))
ob.ComponentInfo = openflow.EgressDefaultTable.GetName()
ob.Action = crdv1alpha1.ActionDropped
default:
ob.ComponentInfo = openflow.GetFlowTableName(openflow.EgressRuleTable)
ob.ComponentInfo = openflow.EgressRuleTable.GetName()
ob.Action = crdv1alpha1.ActionForwarded
}
}
Expand Down
Loading

0 comments on commit 766b38e

Please sign in to comment.