Skip to content

Commit

Permalink
conntrack: Add _IsFirst field (#328)
Browse files Browse the repository at this point in the history
* Add isFirst field to connections

* Fix tests

* Rename variable "test" to avoid collision with imported package name

* Add test for short and long connections with isFirst

* Extend buildMockConnTrackConfig() to accept updateConnectionInterval & endConnectionTimeout

* Update README.md
  • Loading branch information
ronensc authored Oct 24, 2022
1 parent 15efbb8 commit 45e8801
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 54 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ A possible output would look like:
{
"_RecordType": "endConnection",
"_HashId": "3e8ba98164baecaf",
"_IsFirst": true,
"SrcAddr": "10.0.0.1",
"SrcPort": 1234,
"DstAddr": "10.0.0.2",
Expand Down Expand Up @@ -648,6 +649,11 @@ Output fields that set `splitAB: true` (like in `Bytes`) are split into 2 fields
aggregate values separately based on direction A->B and B->A respectively.
When `splitAB` is absent, its default value is `false`.
The boolean field `_IsFirst` exists only in records of type `newConnection`, `updateConnection` and `endConnection`.
It is set to true only on the first record of the connection.
The `_IsFirst` fields is useful in cases where `newConnection` records are not outputted (to reduce the number output records)
and there is a need to count the total number of connections: simply counting `_IsFirst=true`
### Timebased TopK
It is sometimes desirable to return only a subset of records, such as those connections that use the most bandwidth.
Expand Down
1 change: 1 addition & 0 deletions pkg/api/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
HashIdFieldName = "_HashId"
RecordTypeFieldName = "_RecordType"
IsFirstFieldName = "_IsFirst"
)

type ConnTrack struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/conntrack_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestConnTrack(t *testing.T) {
"TimeFlowStart": 1_637_501_079.0,
"_HashId": "d28db42bcd8aea8f",
"_RecordType": "endConnection",
"_IsFirst": false,
"numFlowLogs": 5.0,
}
// Wait for the record to be eventually forwarded to the writer
Expand Down
16 changes: 14 additions & 2 deletions pkg/pipeline/extract/conntrack/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type connection interface {
getNextUpdateReportTime() time.Time
toGenericMap() config.GenericMap
getHash() totalHashType
// markReported marks the connection as has been reported. That is, at least one connection record has been emitted
// for this connection (i.e. newConnection, updateConnection, endConnection).
// It returns true on the first invocation to indicate the first report. Otherwise, it returns false.
markReported() bool
}

type connType struct {
Expand All @@ -44,6 +48,7 @@ type connType struct {
aggFields map[string]float64
expiryTime time.Time
nextUpdateReportTime time.Time
isReported bool
}

func (c *connType) addAgg(fieldName string, initValue float64) {
Expand Down Expand Up @@ -95,15 +100,22 @@ func (c *connType) getHash() totalHashType {
return c.hash
}

func (c *connType) markReported() bool {
isFirst := !c.isReported
c.isReported = true
return isFirst
}

type connBuilder struct {
conn *connType
}

func NewConnBuilder() *connBuilder {
return &connBuilder{
conn: &connType{
aggFields: make(map[string]float64),
keys: config.GenericMap{},
aggFields: make(map[string]float64),
keys: config.GenericMap{},
isReported: false,
},
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/pipeline/extract/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
record := conn.toGenericMap()
addHashField(record, computedHash.hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection"))
isFirst := conn.markReported()
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
ct.metrics.outputRecords.WithLabelValues("newConnection").Inc()
}
Expand Down Expand Up @@ -123,6 +125,11 @@ func (ct *conntrackImpl) popEndConnections() []config.GenericMap {
record := conn.toGenericMap()
addHashField(record, conn.getHash().hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("EndConnection"))
var isFirst bool
if ct.shouldOutputEndConnection {
isFirst = conn.markReported()
}
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
shouldDelete, shouldStop = true, false
} else {
Expand All @@ -144,6 +151,11 @@ func (ct *conntrackImpl) prepareUpdateConnectionRecords() []config.GenericMap {
record := conn.toGenericMap()
addHashField(record, conn.getHash().hashTotal)
addTypeField(record, api.ConnTrackOutputRecordTypeName("UpdateConnection"))
var isFirst bool
if ct.shouldOutputUpdateConnection {
isFirst = conn.markReported()
}
addIsFirstField(record, isFirst)
outputRecords = append(outputRecords, record)
newNextUpdate := ct.clock.Now().Add(ct.config.UpdateConnectionInterval.Duration)
ct.connStore.updateNextReportTime(conn.getHash().hashTotal, newNextUpdate)
Expand Down Expand Up @@ -236,3 +248,7 @@ func addHashField(record config.GenericMap, hashId uint64) {
func addTypeField(record config.GenericMap, recordType string) {
record[api.RecordTypeFieldName] = recordType
}

func addIsFirstField(record config.GenericMap, isFirst bool) {
record[api.IsFirstFieldName] = isFirst
}
Loading

0 comments on commit 45e8801

Please sign in to comment.