Skip to content

Commit

Permalink
Fix race condition in ConntrackConnectionStore and FlowExporter
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <[email protected]>
  • Loading branch information
heanlan committed Apr 18, 2022
1 parent bd82ef6 commit e00f8c1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
13 changes: 13 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
return nil
}

// ForAllConnectionsDoWithoutLock executes the callback for each connection in
// the connections map without acquiring the connection store lock. The caller
// is expected to hold the lock for the duration of the call.
//
// Iteration stops at the first callback that returns a non-nil error; that
// error is logged together with the offending key and connection, and then
// returned to the caller. It returns nil when every callback succeeds.
func (cs *connectionStore) ForAllConnectionsDoWithoutLock(callback flowexporter.ConnectionMapCallBack) error {
	for k, v := range cs.connections {
		if err := callback(k, v); err != nil {
			// The three verbs map to key, connection and error, in that order.
			klog.Errorf("Callback execution failed for flow with key: %v, conn: %v: %v", k, v, err)
			return err
		}
	}
	return nil
}

// AddConnToMap adds the connection to connections map given connection key.
// This is used only for unit tests.
func (cs *connectionStore) AddConnToMap(connKey *flowexporter.ConnectionKey, conn *flowexporter.Connection) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
return nil
}

if err := cs.ForAllConnectionsDo(deleteIfStaleOrResetConn); err != nil {
// Hold the lock until we have verified whether each connection exists in the conntrack table.
cs.AcquireConnStoreLock()

if err := cs.ForAllConnectionsDoWithoutLock(deleteIfStaleOrResetConn); err != nil {
return []int{}, err
}

Expand Down Expand Up @@ -149,6 +152,9 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
}
connsLens = append(connsLens, len(filteredConnsList))
}

cs.ReleaseConnStoreLock()

metrics.TotalConnectionsInConnTrackTable.Set(float64(totalConns))
maxConns, err := cs.connDumper.GetMaxConnections()
if err != nil {
Expand Down Expand Up @@ -203,13 +209,12 @@ func (cs *ConntrackConnectionStore) addNetworkPolicyMetadata(conn *flowexporter.
// AddOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc.,
// or adds a new connection with the resolved K8s metadata.
func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection) {
conn.IsPresent = true
connKey := flowexporter.NewConnectionKey(conn)
cs.mutex.Lock()
defer cs.mutex.Unlock()

existingConn, exists := cs.connections[connKey]
if exists {
existingConn.IsPresent = true
existingConn.IsPresent = conn.IsPresent
if flowexporter.IsConnectionDying(existingConn) {
return
}
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ const (

nginxLBService = "nginx-loadbalancer"

connectionStorePollingInterval = 1 * time.Second
exporterActiveFlowExportTimeout = 2 * time.Second
exporterIdleFlowExportTimeout = 1 * time.Second
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
Expand Down Expand Up @@ -694,7 +695,7 @@ func (data *TestData) enableAntreaFlowExporter(ipfixCollector string) error {
// Enable flow exporter feature and add related config params to antrea agent configmap.
ac := func(config *agentconfig.AgentConfig) {
config.FeatureGates["FlowExporter"] = true
config.FlowPollInterval = "1s"
config.FlowPollInterval = connectionStorePollingInterval.String()
config.ActiveFlowExportTimeout = exporterActiveFlowExportTimeout.String()
config.IdleFlowExportTimeout = exporterIdleFlowExportTimeout.String()
if ipfixCollector != "" {
Expand Down

0 comments on commit e00f8c1

Please sign in to comment.