Skip to content

Commit

Permalink
WIP: refactor network events to return list of struct instead of strings
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Nov 13, 2024
1 parent d5792b0 commit dadc2d4
Show file tree
Hide file tree
Showing 83 changed files with 8,971 additions and 2,019 deletions.
16 changes: 10 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/containernetworking/cni v1.1.2 // indirect
github.com/containernetworking/plugins v1.2.0 // indirect
github.com/coreos/go-iptables v0.6.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
Expand Down Expand Up @@ -109,16 +109,17 @@ require (
github.com/safchain/ethtool v0.3.1-0.20231027162144-83e5e0097c91 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/urfave/cli/v2 v2.2.0 // indirect
github.com/urfave/cli/v2 v2.27.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.25.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/sdk v1.29.0 // indirect
Expand All @@ -141,13 +142,16 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/component-base v0.30.2 // indirect
k8s.io/component-base v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
sigs.k8s.io/controller-runtime v0.18.4 // indirect
sigs.k8s.io/controller-runtime v0.19.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

// HACK
replace github.com/ovn-org/ovn-kubernetes/go-controller => github.com/jotak/ovn-kubernetes/go-controller v0.0.0-20241113125023-088b4a3228a8
67 changes: 46 additions & 21 deletions go.sum

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,7 @@ func RecordToMap(fr *model.Record) config.GenericMap {
}

if len(fr.NetworkMonitorEventsMD) != 0 {
var metadata []string
for _, md := range fr.NetworkMonitorEventsMD {
if md != "" {
metadata = append(metadata, md)
}
}
out["NetworkEvents"] = metadata
out["NetworkEvents"] = fr.NetworkMonitorEventsMD
}
return out
}
Expand Down
52 changes: 52 additions & 0 deletions pkg/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ func TestPBFlowToMap(t *testing.T) {
DnsFlags: 0x80,
DnsErrno: 0,
TimeFlowRtt: durationpb.New(someDuration),
NetworkEventsMetadata: []*pbflow.NetworkEvent{
{
Events: map[string]string{"Name": "test1"},
},
{
Events: map[string]string{"Feature": "NetworkPolicy"},
},
{
Events: map[string]string{"Namespace": "test-namespace"},
},
{
Events: map[string]string{"Direction": "ingress"},
},
{
Events: map[string]string{"Name": "test2"},
},
{
Events: map[string]string{"Feature": "NetworkPolicy"},
},
{
Events: map[string]string{"Namespace": "test-namespace"},
},
{
Events: map[string]string{"Direction": "egress"},
},
},
}

out := PBFlowToMap(flow)
Expand Down Expand Up @@ -103,6 +129,32 @@ func TestPBFlowToMap(t *testing.T) {
"DnsFlagsResponseCode": "NoError",
"DnsErrno": uint8(0),
"TimeFlowRttNs": someDuration.Nanoseconds(),
"NetworkEvents": []config.GenericMap{
{
"Name": "test1",
},
{
"Feature": "NetworkPolicy",
},
{
"Namespace": "test-namespace",
},
{
"Direction": "ingress",
},
{
"Name": "test2",
},
{
"Feature": "NetworkPolicy",
},
{
"Namespace": "test-namespace",
},
{
"Direction": "egress",
},
},
}, out)

}
9 changes: 5 additions & 4 deletions pkg/flow/account_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flow

import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"testing"
"time"

Expand Down Expand Up @@ -112,7 +113,7 @@ func TestEvict_MaxEntries(t *testing.T) {
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]string, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
},
k2: {
RawRecord: model.RawRecord{
Expand All @@ -124,7 +125,7 @@ func TestEvict_MaxEntries(t *testing.T) {
TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond),
DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]string, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
},
}, received)
}
Expand Down Expand Up @@ -194,7 +195,7 @@ func TestEvict_Period(t *testing.T) {
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]string, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
Expand All @@ -212,7 +213,7 @@ func TestEvict_Period(t *testing.T) {
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]string, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
}, *records[0])

// no more flows are evicted
Expand Down
3 changes: 2 additions & 1 deletion pkg/flow/deduper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flow

import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"net"
"testing"
"time"
Expand Down Expand Up @@ -78,7 +79,7 @@ var (
DstMac: model.MacAddr{0x2}, SrcMac: model.MacAddr{0x2}, IfIndex: 2,
}, Metrics: ebpf.BpfFlowMetrics{
Packets: 2, Bytes: 456, Flags: 1, FlowRtt: 100,
}}, Interface: "123456789", NetworkMonitorEventsMD: []string{"test netpol1"}}
}}, Interface: "123456789", NetworkMonitorEventsMD: []config.GenericMap{{"Name": "test netpol1"}}}
)

func TestDedupe(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/model/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package model
import (
"encoding/binary"
"fmt"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"io"
"net"
"reflect"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Record struct {
// Calculated RTT which is set when record is created by calling NewRecord
TimeFlowRtt time.Duration
DupList []map[string]uint8
NetworkMonitorEventsMD []string
NetworkMonitorEventsMD []config.GenericMap
}

func NewRecord(
Expand All @@ -86,7 +87,7 @@ func NewRecord(
record.DNSLatency = time.Duration(metrics.DnsRecord.Latency)
}
record.DupList = make([]map[string]uint8, 0)
record.NetworkMonitorEventsMD = make([]string, 0)
record.NetworkMonitorEventsMD = make([]config.GenericMap, 0)
return &record
}

Expand Down Expand Up @@ -151,7 +152,7 @@ func IP(ia IPAddr) net.IP {
}

// IntEncodeV4 encodes an IPv4 address as an integer (in network encoding, big endian).
// It assumes that the passed IP is already IPv4. Otherwise it would just encode the
// It assumes that the passed IP is already IPv4. Otherwise, it would just encode the
// last 4 bytes of an IPv6 address
func IntEncodeV4(ia [net.IPv6len]uint8) uint32 {
return binary.BigEndian.Uint32(ia[net.IPv6len-net.IPv4len : net.IPv6len])
Expand Down
Loading

0 comments on commit dadc2d4

Please sign in to comment.