Skip to content

Commit

Permalink
Interface name wasn't populated yet so add func to find ifname
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Jan 10, 2024
1 parent cd878fd commit 58c5255
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 21 deletions.
14 changes: 10 additions & 4 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
out["Packets"] = flow.Packets
}
var interfaces []interface{}
var directions []interface{}
for _, entry := range flow.GetDupList() {
out["Interfaces"] = append([]interface{}{entry.Interface}, interfaces...)
out["FlowDirections"] = append([]interface{}{int(entry.Direction.Number())}, directions...)
var flowDirections []interface{}

if len(flow.GetDupList()) != 0 {
for _, entry := range flow.GetDupList() {
interfaces = append(interfaces, entry.Interface)
flowDirections = append(flowDirections, entry.Direction)
}
out["Interfaces"] = interfaces
out["FlowDirections"] = flowDirections
}

ethType := ethernet.EtherType(flow.EthProtocol)
if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 {
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
Expand Down
3 changes: 2 additions & 1 deletion pkg/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestPBFlowToMap(t *testing.T) {
delete(out, "TimeReceived")
assert.Equal(t, config.GenericMap{
"FlowDirection": 1,
"FlowDirections": []interface{}{1},
"FlowDirections": []interface{}{pbflow.Direction(1)},
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
Expand Down
32 changes: 20 additions & 12 deletions pkg/exporter/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,16 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
if fr.Metrics.DnsRecord.Latency != 0 {
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
}
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
for _, m := range fr.DupList {
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
Interface: fr.Interface,
Direction: pbflow.Direction(m[fr.Interface]),
})
if len(fr.DupList) != 0 {
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
for _, m := range fr.DupList {
for key, value := range m {
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
Interface: key,
Direction: pbflow.Direction(value),
})
}
}
}
return &pbflowRecord
}
Expand Down Expand Up @@ -142,12 +146,16 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
if fr.Metrics.DnsRecord.Latency != 0 {
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
}
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
for _, m := range fr.DupList {
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
Interface: fr.Interface,
Direction: pbflow.Direction(m[fr.Interface]),
})
if len(fr.DupList) != 0 {
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
for _, m := range fr.DupList {
for key, value := range m {
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
Interface: key,
Direction: pbflow.Direction(value),
})
}
}
}
return &pbflowRecord
}
Expand Down
31 changes: 27 additions & 4 deletions pkg/flow/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package flow

import (
"container/list"
"reflect"
"time"

"github.com/sirupsen/logrus"

"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
)

var dlog = logrus.WithField("component", "flow/Deduper")
Expand Down Expand Up @@ -89,8 +91,17 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
*fwd = append(*fwd, r)
}
if mergeDup {
mergeEntry[r.Interface] = r.Id.Direction
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
ifName := utils.GetInterfaceName(r.Id.IfIndex)
mergeEntry[ifName] = r.Id.Direction
if dupEntryNew(*fEntry.dupList, mergeEntry) {
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
dlog.Debugf("merge list entries dump:")
for _, entry := range *fEntry.dupList {
for k, v := range entry {
dlog.Debugf("interface %s dir %d", k, v)
}
}
}
}
return
}
Expand All @@ -106,22 +117,34 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
expiryTime: timeNow().Add(c.expire),
}
if mergeDup {
mergeEntry[r.Interface] = r.Id.Direction
ifName := utils.GetInterfaceName(r.Id.IfIndex)
mergeEntry[ifName] = r.Id.Direction
r.DupList = append(r.DupList, mergeEntry)
e.dupList = &r.DupList
}
c.ifaces[rk] = c.entries.PushFront(&e)
*fwd = append(*fwd, r)
}

func dupEntryNew(dupList []map[string]uint8, mergeEntry map[string]uint8) bool {
for _, entry := range dupList {
if reflect.DeepEqual(entry, mergeEntry) {
return false
}
}
return true
}

func (c *deduperCache) removeExpired() {
now := timeNow()
ele := c.entries.Back()
evicted := 0
for ele != nil && now.After(ele.Value.(*entry).expiryTime) {
evicted++
c.entries.Remove(ele)
delete(c.ifaces, *ele.Value.(*entry).key)
fEntry := ele.Value.(*entry)
fEntry.dupList = nil
delete(c.ifaces, *fEntry.key)
ele = c.entries.Back()
}
if evicted > 0 {
Expand Down
14 changes: 14 additions & 0 deletions pkg/flow/deduper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
)

var (
Expand Down Expand Up @@ -133,6 +134,19 @@ func TestDedupeMerge(t *testing.T) {
deduped := receiveTimeout(t, output)
assert.Equal(t, []*Record{oneIf2}, deduped)
assert.Equal(t, 2, len(oneIf2.DupList))

expectedMap := []map[string]uint8{
{
utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction,
},
{
utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction,
},
}

for k, v := range oneIf2.DupList {
assert.Equal(t, expectedMap[k], v)
}
}

type timerMock struct {
Expand Down
8 changes: 8 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,11 @@ func utsnameStr[T int8 | uint8](in []T) string {
}
return string(out)
}

func GetInterfaceName(ifIndex uint32) string {
iface, err := net.InterfaceByIndex(int(ifIndex))
if err != nil {
return ""
}
return iface.Name
}

0 comments on commit 58c5255

Please sign in to comment.