diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index c76aadddab6..80061a9991b 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -77,6 +77,19 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM return nil } +// ForAllConnectionsDoWithoutLock execute the callback for each connection in connection +// map, without grabbing the lock. Caller is expected to grab lock. +func (cs *connectionStore) ForAllConnectionsDoWithoutLock(callback flowexporter.ConnectionMapCallBack) error { + for k, v := range cs.connections { + err := callback(k, v) + if err != nil { + klog.Errorf("Callback execution failed for flow with key: %v, conn: %v, k, v: %v", k, v, err) + return err + } + } + return nil +} + // AddConnToMap adds the connection to connections map given connection key. // This is used only for unit tests. func (cs *connectionStore) AddConnToMap(connKey *flowexporter.ConnectionKey, conn *flowexporter.Connection) { diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index a573057cb6e..dc307f78478 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -116,7 +116,10 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) { return nil } - if err := cs.ForAllConnectionsDo(deleteIfStaleOrResetConn); err != nil { + // Hold the lock until verify whether the connection exist in conntrack table. + cs.AcquireConnStoreLock() + + if err := cs.ForAllConnectionsDoWithoutLock(deleteIfStaleOrResetConn); err != nil { return []int{}, err } @@ -149,6 +152,9 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) { } connsLens = append(connsLens, len(filteredConnsList)) } + + cs.ReleaseConnStoreLock() + metrics.TotalConnectionsInConnTrackTable.Set(float64(totalConns)) maxConns, err := cs.connDumper.GetMaxConnections() if err != nil { @@ -203,13 +209,12 @@ func (cs *ConntrackConnectionStore) addNetworkPolicyMetadata(conn *flowexporter. // AddOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc., // or adds a new connection with the resolved K8s metadata. func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection) { + conn.IsPresent = true connKey := flowexporter.NewConnectionKey(conn) - cs.mutex.Lock() - defer cs.mutex.Unlock() existingConn, exists := cs.connections[connKey] if exists { - existingConn.IsPresent = true + existingConn.IsPresent = conn.IsPresent if flowexporter.IsConnectionDying(existingConn) { return } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 206e224e55e..95c82be976b 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -126,6 +126,7 @@ const ( nginxLBService = "nginx-loadbalancer" + connectionStorePollingInterval = 1 * time.Second exporterActiveFlowExportTimeout = 2 * time.Second exporterIdleFlowExportTimeout = 1 * time.Second aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond @@ -694,7 +695,7 @@ func (data *TestData) enableAntreaFlowExporter(ipfixCollector string) error { // Enable flow exporter feature and add related config params to antrea agent configmap. ac := func(config *agentconfig.AgentConfig) { config.FeatureGates["FlowExporter"] = true - config.FlowPollInterval = "1s" + config.FlowPollInterval = connectionStorePollingInterval.String() config.ActiveFlowExportTimeout = exporterActiveFlowExportTimeout.String() config.IdleFlowExportTimeout = exporterIdleFlowExportTimeout.String() if ipfixCollector != "" {