Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in ConntrackConnectionStore and FlowExporter #3655

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,20 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
for k, v := range cs.connections {
err := callback(k, v)
if err != nil {
klog.Errorf("Callback execution failed for flow with key: %v, conn: %v, k, v: %v", k, v, err)
klog.ErrorS(err, "Callback execution failed for flow", "key", k, "conn", v)
return err
}
}
return nil
}

// ForAllConnectionsDoWithoutLock execute the callback for each connection in connection
// map, without grabbing the lock. Caller is expected to grab lock.
func (cs *connectionStore) ForAllConnectionsDoWithoutLock(callback flowexporter.ConnectionMapCallBack) error {
for k, v := range cs.connections {
err := callback(k, v)
if err != nil {
klog.ErrorS(err, "Callback execution failed for flow", "key", k, "conn", v)
return err
}
}
Expand Down
73 changes: 42 additions & 31 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,6 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) {
// TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export.
func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
klog.V(2).Infof("Polling conntrack")
// Reset IsPresent flag for all connections in connection map before dumping
// flows in conntrack module. If the connection does not exist in conntrack
// table and has been exported, then we will delete it from connection map.
// In addition, if the connection was not exported for a specific time period,
// then we consider it to be stale and delete it.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
}
} else {
conn.IsPresent = false
}
return nil
}

if err := cs.ForAllConnectionsDo(deleteIfStaleOrResetConn); err != nil {
return []int{}, err
}

var zones []uint16
var connsLens []int
Expand All @@ -137,18 +114,53 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
}
}
var totalConns int
var filteredConnsList []*flowexporter.Connection
for _, zone := range zones {
filteredConnsList, totalConnsPerZone, err := cs.connDumper.DumpFlows(zone)
filteredConnsListPerZone, totalConnsPerZone, err := cs.connDumper.DumpFlows(zone)
if err != nil {
return []int{}, err
}
totalConns += totalConnsPerZone
// Update only the Connection store. IPFIX records are generated based on Connection store.
for _, conn := range filteredConnsList {
cs.AddOrUpdateConn(conn)
}
filteredConnsList = append(filteredConnsList, filteredConnsListPerZone...)
connsLens = append(connsLens, len(filteredConnsList))
}

// Reset IsPresent flag for all connections in connection map before updating
// the dumped flows information in connection map. If the connection does not
// exist in conntrack table and has been exported, then we will delete it from
// connection map. In addition, if the connection was not exported for a specific
// time period, then we consider it to be stale and delete it.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
}
} else {
conn.IsPresent = false
}
return nil
}

// Hold the lock until we verify whether the connection exist in conntrack table,
// and finish updating the connection store.
cs.AcquireConnStoreLock()

if err := cs.ForAllConnectionsDoWithoutLock(deleteIfStaleOrResetConn); err != nil {
cs.ReleaseConnStoreLock()
return []int{}, err
heanlan marked this conversation as resolved.
Show resolved Hide resolved
}

// Update only the Connection store. IPFIX records are generated based on Connection store.
for _, conn := range filteredConnsList {
cs.AddOrUpdateConn(conn)
}

cs.ReleaseConnStoreLock()

metrics.TotalConnectionsInConnTrackTable.Set(float64(totalConns))
maxConns, err := cs.connDumper.GetMaxConnections()
if err != nil {
Expand Down Expand Up @@ -203,13 +215,12 @@ func (cs *ConntrackConnectionStore) addNetworkPolicyMetadata(conn *flowexporter.
// AddOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc.,
// or adds a new connection with the resolved K8s metadata.
func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection) {
conn.IsPresent = true
connKey := flowexporter.NewConnectionKey(conn)
cs.mutex.Lock()
defer cs.mutex.Unlock()

existingConn, exists := cs.connections[connKey]
if exists {
existingConn.IsPresent = true
existingConn.IsPresent = conn.IsPresent
if flowexporter.IsConnectionDying(existingConn) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
FlowKey: tuple1,
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
Mark: openflow.ServiceCTMark.GetValue(),
IsPresent: true,
IsActive: true,
DestinationPodName: "pod1",
DestinationPodNamespace: "ns1",
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ const (

nginxLBService = "nginx-loadbalancer"

exporterFlowPollInterval = 1 * time.Second
exporterActiveFlowExportTimeout = 2 * time.Second
exporterIdleFlowExportTimeout = 1 * time.Second
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
Expand Down Expand Up @@ -694,7 +695,7 @@ func (data *TestData) enableAntreaFlowExporter(ipfixCollector string) error {
// Enable flow exporter feature and add related config params to antrea agent configmap.
ac := func(config *agentconfig.AgentConfig) {
config.FeatureGates["FlowExporter"] = true
config.FlowPollInterval = "1s"
config.FlowPollInterval = exporterFlowPollInterval.String()
config.ActiveFlowExportTimeout = exporterActiveFlowExportTimeout.String()
config.IdleFlowExportTimeout = exporterIdleFlowExportTimeout.String()
if ipfixCollector != "" {
Expand Down