diff --git a/cmd/system-probe/modules/network_tracer_test.go b/cmd/system-probe/modules/network_tracer_test.go index bbe67d3d3df99..8a7a31cde06dd 100644 --- a/cmd/system-probe/modules/network_tracer_test.go +++ b/cmd/system-probe/modules/network_tracer_test.go @@ -28,14 +28,15 @@ func TestDecode(t *testing.T) { BufferedData: network.BufferedData{ Conns: []network.ConnectionStats{ {ConnectionTuple: network.ConnectionTuple{ - Source: util.AddressFromString("10.1.1.1"), - Dest: util.AddressFromString("10.2.2.2"), - Pid: 6000, - NetNS: 7, - SPort: 1000, - DPort: 9000, - Type: network.UDP, - Family: network.AFINET6, + Source: util.AddressFromString("10.1.1.1"), + Dest: util.AddressFromString("10.2.2.2"), + Pid: 6000, + NetNS: 7, + SPort: 1000, + DPort: 9000, + Type: network.UDP, + Family: network.AFINET6, + Direction: network.LOCAL, }, Monotonic: network.StatCounters{ SentBytes: 1, @@ -54,7 +55,6 @@ func TestDecode(t *testing.T) { ReplSrcPort: 40, ReplDstPort: 70, }, - Direction: network.LOCAL, }, }, }, diff --git a/pkg/config/setup/system_probe.go b/pkg/config/setup/system_probe.go index 26ea84571d050..12a94b923c7b7 100644 --- a/pkg/config/setup/system_probe.go +++ b/pkg/config/setup/system_probe.go @@ -217,6 +217,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) { cfg.BindEnvAndSetDefault(join(netNS, "conntrack_init_timeout"), 10*time.Second) cfg.BindEnvAndSetDefault(join(netNS, "allow_netlink_conntracker_fallback"), true) cfg.BindEnvAndSetDefault(join(netNS, "enable_ebpf_conntracker"), true) + cfg.BindEnvAndSetDefault(join(netNS, "enable_cilium_lb_conntracker"), false) cfg.BindEnvAndSetDefault(join(spNS, "source_excludes"), map[string][]string{}) cfg.BindEnvAndSetDefault(join(spNS, "dest_excludes"), map[string][]string{}) diff --git a/pkg/ebpf/maps/generic_map.go b/pkg/ebpf/maps/generic_map.go index 5f77ba9c4e958..02036ce8a7605 100644 --- a/pkg/ebpf/maps/generic_map.go +++ b/pkg/ebpf/maps/generic_map.go @@ -124,12 +124,35 @@ func validateValueTypeForMapType[V any](t ebpf.MapType) error { return nil } +func validateKeyValueSizes[K, V any](m *ebpf.Map) error { + var k K + tk := reflect.TypeOf(k) + if tk.Size() != uintptr(m.KeySize()) { + return fmt.Errorf("map key size (%d) does not match key type (%T) size (%d)", m.KeySize(), k, tk.Size()) + } + var v V + tv := reflect.TypeOf(v) + tvSize := tv.Size() + if isPerCPU(m.Type()) { + tvSize = tv.Elem().Size() + } + if tvSize != uintptr(m.ValueSize()) { + return fmt.Errorf("map value size (%d) does not match value type (%T) size (%d)", m.ValueSize(), v, tvSize) + } + + return nil +} + // Map creates a new GenericMap from an existing ebpf.Map func Map[K any, V any](m *ebpf.Map) (*GenericMap[K, V], error) { if err := validateValueTypeForMapType[V](m.Type()); err != nil { return nil, err } + if err := validateKeyValueSizes[K, V](m); err != nil { + return nil, err + } + // See if we can perform binary.Read on the key type. If we can't we can't use the batch API // for this map var kval K diff --git a/pkg/ebpf/maps/generic_map_test.go b/pkg/ebpf/maps/generic_map_test.go index aa760871e4d15..837932b518cce 100644 --- a/pkg/ebpf/maps/generic_map_test.go +++ b/pkg/ebpf/maps/generic_map_test.go @@ -573,6 +573,30 @@ func TestIterateWithPointerKey(t *testing.T) { require.Equal(t, expectedNumbers, actualNumbers) } +func TestValidateMapKeyValueSize(t *testing.T) { + m, err := ebpf.NewMap(&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 1, + KeySize: 8, + ValueSize: 8, + }) + + require.NoError(t, err, "could not create map") + t.Cleanup(func() { m.Close() }) + + gm, err := Map[uint32, uint64](m) + assert.Error(t, err) + assert.Nil(t, gm) + + gm2, err := Map[uint64, uint32](m) + assert.Error(t, err) + assert.Nil(t, gm2) + + gm3, err := Map[uint64, uint64](m) + assert.NoError(t, err) + assert.NotNil(t, gm3) +} + func TestGenericHashMapCanUseBatchAPI(t *testing.T) { hash, err := ebpf.NewMap(&ebpf.MapSpec{ Type: ebpf.Hash, diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index 47f8c85cda215..bf0e4ad498786 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -209,9 +209,12 @@ type Config struct { // default is true EnableConntrackAllNamespaces bool - // EnableEbpfConntracker enables the ebpf based network conntracker. Used only for testing at the moment + // EnableEbpfConntracker enables the ebpf based network conntracker EnableEbpfConntracker bool + // EnableCiliumLBConntracker enables the cilium load balancer conntracker + EnableCiliumLBConntracker bool + // ClosedChannelSize specifies the size for closed channel for the tracer ClosedChannelSize int @@ -360,6 +363,7 @@ func New() *Config { IgnoreConntrackInitFailure: cfg.GetBool(sysconfig.FullKeyPath(netNS, "ignore_conntrack_init_failure")), ConntrackInitTimeout: cfg.GetDuration(sysconfig.FullKeyPath(netNS, "conntrack_init_timeout")), EnableEbpfConntracker: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_ebpf_conntracker")), + EnableCiliumLBConntracker: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_cilium_lb_conntracker")), EnableGatewayLookup: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_gateway_lookup")), diff --git a/pkg/network/encoding/encoding_test.go b/pkg/network/encoding/encoding_test.go index e0d0eacfde10c..c34183e192695 100644 --- a/pkg/network/encoding/encoding_test.go +++ b/pkg/network/encoding/encoding_test.go @@ -192,14 +192,15 @@ func TestSerialization(t *testing.T) { BufferedData: network.BufferedData{ Conns: []network.ConnectionStats{ {ConnectionTuple: network.ConnectionTuple{ - Source: util.AddressFromString("10.1.1.1"), - Dest: util.AddressFromString("10.2.2.2"), - Pid: 6000, - NetNS: 7, - SPort: 1000, - DPort: 9000, - Type: network.TCP, - Family: network.AFINET6, + Source: util.AddressFromString("10.1.1.1"), + Dest: util.AddressFromString("10.2.2.2"), + Pid: 6000, + NetNS: 7, + SPort: 1000, + DPort: 9000, + Type: network.TCP, + Family: network.AFINET6, + Direction: network.LOCAL, }, Monotonic: network.StatCounters{ SentBytes: 1, @@ -222,7 +223,6 @@ func TestSerialization(t *testing.T) { ReplDstPort: 80, }, - Direction: network.LOCAL, Via: &network.Via{ Subnet: network.Subnet{ Alias: "subnet-foo", @@ -231,14 +231,14 @@ func TestSerialization(t *testing.T) { ProtocolStack: protocols.Stack{Application: protocols.HTTP}, }, {ConnectionTuple: network.ConnectionTuple{ - Source: util.AddressFromString("10.1.1.1"), - Dest: util.AddressFromString("8.8.8.8"), - SPort: 1000, - DPort: 53, - Type: network.UDP, - Family: network.AFINET6, + Source: util.AddressFromString("10.1.1.1"), + Dest: util.AddressFromString("8.8.8.8"), + SPort: 1000, + DPort: 53, + Type: network.UDP, + Family: network.AFINET6, + Direction: network.LOCAL, }, - Direction: network.LOCAL, StaticTags: tagOpenSSL | tagTLS, ProtocolStack: protocols.Stack{Application: protocols.HTTP2}, DNSStats: map[dns.Hostname]map[dns.QueryType]dns.Stats{ diff --git a/pkg/network/encoding/marshal/dns_test.go b/pkg/network/encoding/marshal/dns_test.go index e1ffed438d9dc..24852d91f722e 100644 --- a/pkg/network/encoding/marshal/dns_test.go +++ b/pkg/network/encoding/marshal/dns_test.go @@ -27,14 +27,14 @@ func TestFormatConnectionDNS(t *testing.T) { BufferedData: network.BufferedData{ Conns: []network.ConnectionStats{ {ConnectionTuple: network.ConnectionTuple{ - Source: util.AddressFromString("10.1.1.1"), - Dest: util.AddressFromString("8.8.8.8"), - SPort: 1000, - DPort: 53, - Type: network.UDP, - Family: network.AFINET6, - }, + Source: util.AddressFromString("10.1.1.1"), + Dest: util.AddressFromString("8.8.8.8"), + SPort: 1000, + DPort: 53, + Type: network.UDP, + Family: network.AFINET6, Direction: network.LOCAL, + }, DNSStats: map[dns.Hostname]map[dns.QueryType]dns.Stats{ dns.ToHostname("foo.com"): { dns.TypeA: { diff --git a/pkg/network/event_common.go b/pkg/network/event_common.go index d8cf93f592535..6eea961c2aa3c 100644 --- a/pkg/network/event_common.go +++ b/pkg/network/event_common.go @@ -234,14 +234,15 @@ type StatCookie = uint64 // ConnectionTuple represents the unique network key for a connection type ConnectionTuple struct { - Source util.Address - Dest util.Address - Pid uint32 - NetNS uint32 - SPort uint16 - DPort uint16 - Type ConnectionType - Family ConnectionFamily + Source util.Address + Dest util.Address + Pid uint32 + NetNS uint32 + SPort uint16 + DPort uint16 + Type ConnectionType + Family ConnectionFamily + Direction ConnectionDirection } func (c ConnectionTuple) String() string { @@ -285,7 +286,6 @@ type ConnectionStats struct { ProtocolStack protocols.Stack // keep these fields last because they are 1 byte each and otherwise inflate the struct size due to alignment - Direction ConnectionDirection SPortIsEphemeral EphemeralPortType IntraHost bool IsAssured bool diff --git a/pkg/network/event_test.go b/pkg/network/event_test.go index 60ac66900964b..9370bf3315972 100644 --- a/pkg/network/event_test.go +++ b/pkg/network/event_test.go @@ -52,16 +52,16 @@ func TestBeautifyKey(t *testing.T) { }, { ConnectionTuple: ConnectionTuple{ - Pid: 32065, - Type: 0, - Family: AFINET, - Source: util.AddressFromString("172.21.148.124"), - Dest: util.AddressFromString("130.211.21.187"), - SPort: 52012, - DPort: 443, + Pid: 32065, + Type: 0, + Family: AFINET, + Source: util.AddressFromString("172.21.148.124"), + Dest: util.AddressFromString("130.211.21.187"), + SPort: 52012, + DPort: 443, + Direction: 2, }, - Direction: 2, - Cookie: 2, + Cookie: 2, }, } { bk := c.ByteKey(buf) diff --git a/pkg/network/netlink/conntracker.go b/pkg/network/netlink/conntracker.go index 16e9ff3a4f8c6..0bbfe2891abff 100644 --- a/pkg/network/netlink/conntracker.go +++ b/pkg/network/netlink/conntracker.go @@ -16,14 +16,12 @@ import ( "sync" "time" + "github.com/hashicorp/golang-lru/v2/simplelru" "github.com/prometheus/client_golang/prometheus" "github.com/syndtr/gocapability/capability" "golang.org/x/sys/unix" - "github.com/hashicorp/golang-lru/v2/simplelru" - telemetryComp "github.com/DataDog/datadog-agent/comp/core/telemetry" - "github.com/DataDog/datadog-agent/pkg/network" "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/process/util" @@ -49,7 +47,7 @@ type Conntracker interface { // Collect returns the current state of all metrics of the collector Collect(metrics chan<- prometheus.Metric) GetTranslationForConn(*network.ConnectionTuple) *network.IPTranslation - // GetType returns a string describing whether the conntracker is "ebpf" or "netlink" + // GetType returns a string describing the conntracker type GetType() string DeleteTranslation(*network.ConnectionTuple) DumpCachedTable(context.Context) (map[uint32][]DebugConntrackEntry, error) diff --git a/pkg/network/netlink/consumer.go b/pkg/network/netlink/consumer.go index c912db22715e5..942c0b1e7b771 100644 --- a/pkg/network/netlink/consumer.go +++ b/pkg/network/netlink/consumer.go @@ -339,7 +339,14 @@ func (c *Consumer) dumpTable(family uint8, output chan Event, ns netns.NsHandle) // LoadNfConntrackKernelModule requests a dummy connection tuple from netlink conntrack which is discarded but has // the side effect of loading the nf_conntrack_netlink module -func LoadNfConntrackKernelModule(ns netns.NsHandle) error { +func LoadNfConntrackKernelModule(cfg *config.Config) error { + ns, err := cfg.GetRootNetNs() + if err != nil { + return fmt.Errorf("error fetching root net namespace, will not attempt to load nf_conntrack_netlink module: %w", err) + } + + defer ns.Close() + sock, err := NewSocket(ns) if err != nil { ino, errIno := kernel.GetInoForNs(ns) diff --git a/pkg/network/resolver_test.go b/pkg/network/resolver_test.go index c0977141ee34b..0d4aa826b64c8 100644 --- a/pkg/network/resolver_test.go +++ b/pkg/network/resolver_test.go @@ -19,15 +19,15 @@ import ( func TestResolveLocalConnections(t *testing.T) { conns := []ConnectionStats{ {ConnectionTuple: ConnectionTuple{ - Pid: 8579, - Source: util.AddressFromString("172.29.132.189"), - SPort: 37432, - Dest: util.AddressFromString("172.29.168.124"), - DPort: 8080, - NetNS: 4026533024, - Type: TCP, - }, + Pid: 8579, + Source: util.AddressFromString("172.29.132.189"), + SPort: 37432, + Dest: util.AddressFromString("172.29.168.124"), + DPort: 8080, + NetNS: 4026533024, + Type: TCP, Direction: OUTGOING, + }, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -37,15 +37,15 @@ func TestResolveLocalConnections(t *testing.T) { IntraHost: true, }, {ConnectionTuple: ConnectionTuple{ - Pid: 8576, - Source: util.AddressFromString("172.29.132.189"), - SPort: 46822, - Dest: util.AddressFromString("172.29.168.124"), - DPort: 8080, - NetNS: 4026533024, - Type: TCP, - }, + Pid: 8576, + Source: util.AddressFromString("172.29.132.189"), + SPort: 46822, + Dest: util.AddressFromString("172.29.168.124"), + DPort: 8080, + NetNS: 4026533024, + Type: TCP, Direction: OUTGOING, + }, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -55,15 +55,15 @@ func TestResolveLocalConnections(t *testing.T) { IntraHost: true, }, {ConnectionTuple: ConnectionTuple{ - Pid: 1342852, - Source: util.AddressFromString("172.29.168.124"), - SPort: 8080, - Dest: util.AddressFromString("172.29.132.189"), - DPort: 46822, - NetNS: 4026533176, - Type: TCP, - }, + Pid: 1342852, + Source: util.AddressFromString("172.29.168.124"), + SPort: 8080, + Dest: util.AddressFromString("172.29.132.189"), + DPort: 46822, + NetNS: 4026533176, + Type: TCP, Direction: INCOMING, + }, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -73,15 +73,15 @@ func TestResolveLocalConnections(t *testing.T) { IntraHost: true, }, {ConnectionTuple: ConnectionTuple{ - Pid: 1344818, - Source: util.AddressFromString("172.29.168.124"), - SPort: 8080, - Dest: util.AddressFromString("172.29.132.189"), - DPort: 37432, - NetNS: 4026533176, - Type: TCP, - }, + Pid: 1344818, + Source: util.AddressFromString("172.29.168.124"), + SPort: 8080, + Dest: util.AddressFromString("172.29.132.189"), + DPort: 37432, + NetNS: 4026533176, + Type: TCP, Direction: INCOMING, + }, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -116,12 +116,13 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr resolution with nat", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 1, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1234, - Dest: util.AddressFromString("10.1.1.2"), - DPort: 1234, - NetNS: 1, + Pid: 1, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1234, + Dest: util.AddressFromString("10.1.1.2"), + DPort: 1234, + NetNS: 1, + Direction: INCOMING, }, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("127.0.0.1"), @@ -129,7 +130,6 @@ func TestResolveLoopbackConnections(t *testing.T) { ReplSrcIP: util.AddressFromString("10.1.1.2"), ReplSrcPort: 1234, }, - Direction: INCOMING, IntraHost: true, ContainerID: struct { Source *intern.Value @@ -143,12 +143,13 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr resolution with nat to localhost", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 2, - NetNS: 1, - Source: util.AddressFromString("10.1.1.2"), - SPort: 1234, - Dest: util.AddressFromString("10.1.1.1"), - DPort: 1234, + Pid: 2, + NetNS: 1, + Source: util.AddressFromString("10.1.1.2"), + SPort: 1234, + Dest: util.AddressFromString("10.1.1.1"), + DPort: 1234, + Direction: OUTGOING, }, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("10.1.1.2"), @@ -156,7 +157,6 @@ func TestResolveLoopbackConnections(t *testing.T) { ReplSrcIP: util.AddressFromString("127.0.0.1"), ReplSrcPort: 1234, }, - Direction: OUTGOING, IntraHost: true, ContainerID: struct { Source *intern.Value @@ -170,15 +170,15 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr failed localhost resolution", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 3, - NetNS: 3, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1235, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1234, + Pid: 3, + NetNS: 3, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1235, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1234, + Direction: INCOMING, }, IntraHost: true, - Direction: INCOMING, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -191,15 +191,15 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr resolution within same netns (3)", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 5, - NetNS: 3, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1240, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1235, + Pid: 5, + NetNS: 3, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1240, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1235, + Direction: OUTGOING, }, IntraHost: true, - Direction: OUTGOING, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -212,15 +212,15 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr resolution within same netns (1)", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 3, - NetNS: 3, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1235, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1240, + Pid: 3, + NetNS: 3, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1235, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1240, + Direction: INCOMING, }, IntraHost: true, - Direction: INCOMING, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -233,15 +233,15 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr resolution within same netns (2)", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 5, - NetNS: 3, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1240, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1235, + Pid: 5, + NetNS: 3, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1240, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1235, + Direction: OUTGOING, }, IntraHost: true, - Direction: OUTGOING, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -254,15 +254,15 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "raddr failed resolution, known address in different netns", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 5, - NetNS: 4, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1240, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1235, + Pid: 5, + NetNS: 4, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1240, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1235, + Direction: OUTGOING, }, IntraHost: true, - Direction: OUTGOING, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -275,14 +275,14 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "failed laddr and raddr resolution", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 10, - NetNS: 10, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1234, - Dest: util.AddressFromString("10.1.1.1"), - DPort: 1235, - }, + Pid: 10, + NetNS: 10, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1234, + Dest: util.AddressFromString("10.1.1.1"), + DPort: 1235, Direction: OUTGOING, + }, IntraHost: false, }, expectedRaddrID: "", @@ -290,14 +290,14 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "failed resolution: unknown pid for laddr, raddr address in different netns from known address", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 11, - NetNS: 10, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1250, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1240, - }, + Pid: 11, + NetNS: 10, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1250, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1240, Direction: OUTGOING, + }, IntraHost: true, }, expectedRaddrID: "", @@ -305,14 +305,14 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "localhost resolution within same netns 1/2", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 6, - NetNS: 7, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1260, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1250, - }, + Pid: 6, + NetNS: 7, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1260, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1250, Direction: OUTGOING, + }, IntraHost: true, ContainerID: struct { Source *intern.Value @@ -326,14 +326,14 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "localhost resolution within same netns 2/2", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 7, - NetNS: 7, - Source: util.AddressFromString("127.0.0.1"), - SPort: 1250, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1260, - }, + Pid: 7, + NetNS: 7, + Source: util.AddressFromString("127.0.0.1"), + SPort: 1250, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1260, Direction: INCOMING, + }, IntraHost: true, ContainerID: struct { Source *intern.Value @@ -347,14 +347,14 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "zero src netns failed resolution", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 22, - NetNS: 0, - Source: util.AddressFromString("127.0.0.1"), - SPort: 8282, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 1250, - }, + Pid: 22, + NetNS: 0, + Source: util.AddressFromString("127.0.0.1"), + SPort: 8282, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 1250, Direction: OUTGOING, + }, ContainerID: struct { Source *intern.Value Dest *intern.Value @@ -367,14 +367,14 @@ func TestResolveLoopbackConnections(t *testing.T) { { name: "zero src and dst netns failed resolution", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Pid: 21, - NetNS: 0, - Source: util.AddressFromString("127.0.0.1"), - SPort: 8181, - Dest: util.AddressFromString("127.0.0.1"), - DPort: 8282, - }, + Pid: 21, + NetNS: 0, + Source: util.AddressFromString("127.0.0.1"), + SPort: 8181, + Dest: util.AddressFromString("127.0.0.1"), + DPort: 8282, Direction: OUTGOING, + }, ContainerID: struct { Source *intern.Value Dest *intern.Value diff --git a/pkg/network/state_test.go b/pkg/network/state_test.go index 2e6c77c8f12f7..4dfff03a85747 100644 --- a/pkg/network/state_test.go +++ b/pkg/network/state_test.go @@ -1918,13 +1918,13 @@ func TestDetermineConnectionIntraHost(t *testing.T) { { name: "local connection with nat on both sides (outgoing)", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("169.254.169.254"), - SPort: 12345, - DPort: 80, - NetNS: 1212, - }, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("169.254.169.254"), + SPort: 12345, + DPort: 80, + NetNS: 1212, Direction: OUTGOING, + }, IPTranslation: &IPTranslation{ ReplSrcIP: util.AddressFromString("127.0.0.1"), ReplDstIP: util.AddressFromString("1.1.1.1"), @@ -1937,13 +1937,13 @@ func TestDetermineConnectionIntraHost(t *testing.T) { { name: "local connection with nat on both sides (incoming)", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("127.0.0.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 8181, - DPort: 12345, - NetNS: 1233, - }, + Source: util.AddressFromString("127.0.0.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 8181, + DPort: 12345, + NetNS: 1233, Direction: INCOMING, + }, IPTranslation: &IPTranslation{ ReplSrcIP: util.AddressFromString("1.1.1.1"), ReplDstIP: util.AddressFromString("169.254.169.254"), @@ -1956,13 +1956,13 @@ func TestDetermineConnectionIntraHost(t *testing.T) { { name: "remote connection with source translation (redirect)", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("4.4.4.4"), - Dest: util.AddressFromString("2.2.2.2"), - SPort: 12345, - DPort: 80, - NetNS: 2, - }, + Source: util.AddressFromString("4.4.4.4"), + Dest: util.AddressFromString("2.2.2.2"), + SPort: 12345, + DPort: 80, + NetNS: 2, Direction: INCOMING, + }, IPTranslation: &IPTranslation{ ReplSrcIP: util.AddressFromString("2.2.2.2"), ReplDstIP: util.AddressFromString("127.0.0.1"), @@ -1975,66 +1975,66 @@ func TestDetermineConnectionIntraHost(t *testing.T) { { name: "local connection, same network ns", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("2.2.2.2"), - SPort: 12345, - DPort: 80, - NetNS: 1, - }, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("2.2.2.2"), + SPort: 12345, + DPort: 80, + NetNS: 1, Direction: OUTGOING, }, + }, intraHost: true, }, { name: "local connection, same network ns", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("2.2.2.2"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 80, - DPort: 12345, - NetNS: 1, - }, + Source: util.AddressFromString("2.2.2.2"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 80, + DPort: 12345, + NetNS: 1, Direction: INCOMING, - }, + }}, + intraHost: true, }, { name: "local connection, different network ns", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("2.2.2.2"), - SPort: 12345, - DPort: 80, - NetNS: 1, - }, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("2.2.2.2"), + SPort: 12345, + DPort: 80, + NetNS: 1, Direction: OUTGOING, }, + }, intraHost: true, }, { name: "local connection, different network ns", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("2.2.2.2"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 80, - DPort: 12345, - NetNS: 2, - }, + Source: util.AddressFromString("2.2.2.2"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 80, + DPort: 12345, + NetNS: 2, Direction: INCOMING, }, + }, intraHost: true, }, { name: "remote connection", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("3.3.3.3"), - SPort: 12345, - DPort: 80, - NetNS: 1, - }, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("3.3.3.3"), + SPort: 12345, + DPort: 80, + NetNS: 1, Direction: OUTGOING, }, + }, intraHost: false, }, } @@ -2071,81 +2071,81 @@ func TestIntraHostFixDirection(t *testing.T) { { name: "outgoing both non-ephemeral", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 123, - DPort: 456, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 123, + DPort: 456, + Direction: OUTGOING, }, IntraHost: true, - Direction: OUTGOING, }, direction: OUTGOING, }, { name: "outgoing non ephemeral to ephemeral", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 123, - DPort: 49612, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 123, + DPort: 49612, + Direction: OUTGOING, }, IntraHost: true, - Direction: OUTGOING, }, direction: INCOMING, }, { name: "outgoing ephemeral to non ephemeral", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 49612, - DPort: 123, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 49612, + DPort: 123, + Direction: OUTGOING, }, IntraHost: true, - Direction: OUTGOING, }, direction: OUTGOING, }, { name: "incoming udp non ephemeral to ephemeral", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Type: UDP, - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 49612, - DPort: 123, + Type: UDP, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 49612, + DPort: 123, + Direction: INCOMING, }, IntraHost: true, - Direction: INCOMING, }, direction: OUTGOING, }, { name: "incoming udp ephemeral to non ephemeral", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Type: UDP, - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 123, - DPort: 49612, + Type: UDP, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 123, + DPort: 49612, + Direction: INCOMING, }, IntraHost: true, - Direction: INCOMING, }, direction: INCOMING, }, { name: "incoming tcp non ephemeral to ephemeral", conn: ConnectionStats{ConnectionTuple: ConnectionTuple{ - Type: TCP, - Source: util.AddressFromString("1.1.1.1"), - Dest: util.AddressFromString("1.1.1.1"), - SPort: 49612, - DPort: 123, + Type: TCP, + Source: util.AddressFromString("1.1.1.1"), + Dest: util.AddressFromString("1.1.1.1"), + SPort: 49612, + DPort: 123, + Direction: INCOMING, }, IntraHost: true, - Direction: INCOMING, }, direction: INCOMING, }, @@ -2395,17 +2395,17 @@ func TestKafkaStatsWithMultipleClients(t *testing.T) { func TestConnectionRollup(t *testing.T) { conns := []ConnectionStats{ {ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("172.29.141.26"), - SPort: 50010, - Family: AFINET, - NetNS: 4026532341, - Pid: 28385, - Dest: util.AddressFromString("10.100.0.10"), - DPort: 53, - Type: UDP, + Source: util.AddressFromString("172.29.141.26"), + SPort: 50010, + Family: AFINET, + NetNS: 4026532341, + Pid: 28385, + Dest: util.AddressFromString("10.100.0.10"), + DPort: 53, + Type: UDP, + Direction: OUTGOING, }, // should be rolled up with next connection - Direction: OUTGOING, IntraHost: false, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("172.29.141.26"), @@ -2429,17 +2429,17 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("172.29.141.26"), - SPort: 49155, - Family: AFINET, - NetNS: 4026532341, - Pid: 28385, - Dest: util.AddressFromString("10.100.0.10"), - DPort: 53, - Type: UDP, + Source: util.AddressFromString("172.29.141.26"), + SPort: 49155, + Family: AFINET, + NetNS: 4026532341, + Pid: 28385, + Dest: util.AddressFromString("10.100.0.10"), + DPort: 53, + Type: UDP, + Direction: OUTGOING, }, // should be rolled up with previous connection - Direction: OUTGOING, IntraHost: false, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("172.29.141.26"), @@ -2463,17 +2463,17 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Family: AFINET, - Source: util.AddressFromString("172.29.141.26"), - SPort: 52907, - NetNS: 4026532341, - Pid: 28385, - Dest: util.AddressFromString("10.100.0.10"), - DPort: 53, - Type: UDP, + Family: AFINET, + Source: util.AddressFromString("172.29.141.26"), + SPort: 52907, + NetNS: 4026532341, + Pid: 28385, + Dest: util.AddressFromString("10.100.0.10"), + DPort: 53, + Type: UDP, + Direction: OUTGOING, }, // should be rolled up with next connection - Direction: OUTGOING, IntraHost: false, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("172.29.141.26"), @@ -2497,17 +2497,17 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Family: AFINET, - Source: util.AddressFromString("172.29.141.26"), - SPort: 52904, - NetNS: 4026532341, - Pid: 28385, - Dest: util.AddressFromString("10.100.0.10"), - DPort: 53, - Type: UDP, + Family: AFINET, + Source: util.AddressFromString("172.29.141.26"), + SPort: 52904, + NetNS: 4026532341, + Pid: 28385, + Dest: util.AddressFromString("10.100.0.10"), + DPort: 53, + Type: UDP, + Direction: OUTGOING, }, // should be rolled up with previous connection - Direction: OUTGOING, IntraHost: false, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("172.29.141.26"), @@ -2531,17 +2531,17 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Family: AFINET, - Source: util.AddressFromString("172.29.141.26"), - SPort: 37240, - NetNS: 4026532341, - Pid: 28385, - Dest: util.AddressFromString("10.100.0.10"), - DPort: 53, - Type: UDP, + Family: AFINET, + Source: util.AddressFromString("172.29.141.26"), + SPort: 37240, + NetNS: 4026532341, + Pid: 28385, + Dest: util.AddressFromString("10.100.0.10"), + DPort: 53, + Type: UDP, + Direction: OUTGOING, }, // this should not be rolled up as the duration is > 2 mins - Direction: OUTGOING, IntraHost: false, IPTranslation: &IPTranslation{ ReplDstIP: util.AddressFromString("172.29.141.26"), @@ -2565,14 +2565,15 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Pid: 5652, - Source: util.AddressFromString("172.29.160.125"), - SPort: 8443, - Dest: util.AddressFromString("172.29.166.243"), - DPort: 38633, - Family: AFINET, - Type: TCP, - NetNS: 4026531992, + Pid: 5652, + Source: util.AddressFromString("172.29.160.125"), + SPort: 8443, + Dest: util.AddressFromString("172.29.166.243"), + DPort: 38633, + Family: AFINET, + Type: TCP, + NetNS: 4026531992, + Direction: INCOMING, }, ContainerID: struct{ Source, Dest *intern.Value }{Source: intern.GetByString("403ca32ba9b1c3955ba79a84039c9de34d81c83aa3a27ece70b19b3df84c9460")}, SPortIsEphemeral: EphemeralFalse, @@ -2584,7 +2585,6 @@ func TestConnectionRollup(t *testing.T) { RecvPackets: 3, TCPEstablished: 1, }, - Direction: INCOMING, RTT: 262, RTTVar: 131, IntraHost: false, @@ -2593,14 +2593,15 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Pid: 5652, - Source: util.AddressFromString("172.29.160.125"), - SPort: 8443, - Dest: util.AddressFromString("172.29.154.189"), - DPort: 60509, - Family: AFINET, - Type: TCP, - NetNS: 4026531992, + Pid: 5652, + Source: util.AddressFromString("172.29.160.125"), + SPort: 8443, + Dest: util.AddressFromString("172.29.154.189"), + DPort: 60509, + Family: AFINET, + Type: TCP, + NetNS: 4026531992, + Direction: INCOMING, }, ContainerID: struct{ Source, Dest *intern.Value }{Source: intern.GetByString("403ca32ba9b1c3955ba79a84039c9de34d81c83aa3a27ece70b19b3df84c9460")}, SPortIsEphemeral: EphemeralFalse, @@ -2612,7 +2613,6 @@ func TestConnectionRollup(t *testing.T) { RecvPackets: 3, TCPEstablished: 1, }, - Direction: INCOMING, RTT: 254, RTTVar: 127, IntraHost: false, @@ -2621,14 +2621,15 @@ func TestConnectionRollup(t *testing.T) { IsClosed: true, }, {ConnectionTuple: ConnectionTuple{ - Pid: 5652, - Source: util.AddressFromString("172.29.160.125"), - SPort: 8443, - Dest: util.AddressFromString("172.29.166.243"), - DPort: 34715, - Family: AFINET, - Type: TCP, - NetNS: 4026531992, + Pid: 5652, + Source: util.AddressFromString("172.29.160.125"), + SPort: 8443, + Dest: util.AddressFromString("172.29.166.243"), + DPort: 34715, + Family: AFINET, + Type: TCP, + NetNS: 4026531992, + Direction: INCOMING, }, ContainerID: struct{ Source, Dest *intern.Value }{Source: intern.GetByString("403ca32ba9b1c3955ba79a84039c9de34d81c83aa3a27ece70b19b3df84c9460")}, SPortIsEphemeral: EphemeralFalse, @@ -2640,7 +2641,6 @@ func TestConnectionRollup(t *testing.T) { RecvPackets: 8, TCPEstablished: 1, }, - Direction: INCOMING, RTT: 250, RTTVar: 66, IntraHost: false, @@ -2816,31 +2816,31 @@ func TestFilterConnections(t *testing.T) { func TestDNSPIDCollision(t *testing.T) { conns := []ConnectionStats{ {ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("10.1.1.1"), - Dest: util.AddressFromString("8.8.8.8"), - Pid: 1, - SPort: 1000, - DPort: 53, - Type: UDP, - Family: AFINET, - }, + Source: util.AddressFromString("10.1.1.1"), + Dest: util.AddressFromString("8.8.8.8"), + Pid: 1, + SPort: 1000, + DPort: 53, + Type: UDP, + Family: AFINET, Direction: LOCAL, - Cookie: 1, + }, + Cookie: 1, Monotonic: StatCounters{ RecvBytes: 2, }, }, {ConnectionTuple: ConnectionTuple{ - Source: util.AddressFromString("10.1.1.1"), - Dest: util.AddressFromString("8.8.8.8"), - Pid: 2, - SPort: 1000, - DPort: 53, - Type: UDP, - Family: AFINET, - }, + Source: util.AddressFromString("10.1.1.1"), + Dest: util.AddressFromString("8.8.8.8"), + Pid: 2, + SPort: 1000, + DPort: 53, + Type: UDP, + Family: AFINET, Direction: LOCAL, - Cookie: 2, + }, + Cookie: 2, Monotonic: StatCounters{ RecvBytes: 2, }, diff --git a/pkg/network/tracer/chain_conntracker.go b/pkg/network/tracer/chain_conntracker.go new file mode 100644 index 0000000000000..9725b1df2143e --- /dev/null +++ b/pkg/network/tracer/chain_conntracker.go @@ -0,0 +1,106 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux_bpf + +// Package tracer implements the functionality of the network tracer +package tracer + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/DataDog/datadog-agent/pkg/network" + "github.com/DataDog/datadog-agent/pkg/network/netlink" +) + +type chainConntracker struct { + ctks []netlink.Conntracker +} + +func chainConntrackers(ctks ...netlink.Conntracker) netlink.Conntracker { + if len(ctks) == 1 { + return ctks[0] + } + + // filter out no-ops + filtered := ctks[:0] + for _, ctk := range ctks { + if ctk != nil && ctk.GetType() != "" { + filtered = append(filtered, ctk) + } + } + + if len(filtered) == 0 { + return netlink.NewNoOpConntracker() + } + + if len(filtered) == 1 { + return filtered[0] + } + + return &chainConntracker{ + ctks: filtered, + } +} + +// Describe returns all descriptions of the collector +func (ct *chainConntracker) Describe(descs chan<- *prometheus.Desc) { + for _, ctk := range ct.ctks { + ctk.Describe(descs) + } +} + +// Collect returns the current state of all metrics of the collector +func (ct *chainConntracker) Collect(metrics chan<- prometheus.Metric) { + for _, ctk := range ct.ctks { + ctk.Collect(metrics) + } +} + +func (ct *chainConntracker) GetTranslationForConn(c *network.ConnectionTuple) *network.IPTranslation { + for _, ctk := range ct.ctks { + if trans := ctk.GetTranslationForConn(c); trans != nil { + return trans + } + } + + return nil +} + +// GetType returns a string describing whether the conntracker is "ebpf" or "netlink" +func (ct *chainConntracker) GetType() string { + return "chain" +} + +func (ct *chainConntracker) DeleteTranslation(c *network.ConnectionTuple) { + for _, ctk := range ct.ctks { + ctk.DeleteTranslation(c) + } +} + +func (ct *chainConntracker) DumpCachedTable(ctx context.Context) (map[uint32][]netlink.DebugConntrackEntry, error) { + res := map[uint32][]netlink.DebugConntrackEntry{} + for _, ctk := range ct.ctks { + var m map[uint32][]netlink.DebugConntrackEntry + var err error + if m, err = ctk.DumpCachedTable(ctx); err != nil { + return res, err + } + + for k, v := range m { + res[k] = append(res[k], v...) + } + } + + return res, nil +} + +func (ct *chainConntracker) Close() { + for _, ctk := range ct.ctks { + ctk.Close() + } +} diff --git a/pkg/network/tracer/cilium_lb.go b/pkg/network/tracer/cilium_lb.go new file mode 100644 index 0000000000000..ca22db0ce5ab7 --- /dev/null +++ b/pkg/network/tracer/cilium_lb.go @@ -0,0 +1,295 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux_bpf + +package tracer + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "net" + "sync" + "time" + "unsafe" + + "github.com/cilium/ebpf" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sys/unix" + + "github.com/DataDog/datadog-agent/pkg/ebpf/maps" + "github.com/DataDog/datadog-agent/pkg/network" + "github.com/DataDog/datadog-agent/pkg/network/config" + "github.com/DataDog/datadog-agent/pkg/network/netlink" + "github.com/DataDog/datadog-agent/pkg/process/util" + "github.com/DataDog/datadog-agent/pkg/telemetry" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const ( + //revive:disable + // from github.com/cilium/cilium/pkg/maps/ctmap/lookup.go + TUPLE_F_OUT = 0 + TUPLE_F_IN = 1 + TUPLE_F_RELATED = 2 + TUPLE_F_SERVICE = 4 + //revive:enable +) + +const ciliumConntrackerModuleName = "network_tracer__cilium_conntracker" + +var ciliumConntrackerTelemetry = struct { + getsDuration telemetry.Histogram + getsTotal telemetry.Counter +}{ + telemetry.NewHistogram(ciliumConntrackerModuleName, "gets_duration_nanoseconds", []string{}, "Histogram measuring the time spent retrieving connection tuples from the EBPF map", defaultBuckets), + telemetry.NewCounter(ciliumConntrackerModuleName, "gets_total", []string{}, "Counter measuring the total number of attempts to get connection tuples from the EBPF map"), +} + +type tupleKey4 struct { + DestAddr [4]byte + SourceAddr [4]byte + SourcePort uint16 + DestPort uint16 + NextHeader uint8 + Flags uint8 +} + +type ctEntry struct { + Reserved0 uint64 + BackendID uint64 + Packets uint64 + Bytes uint64 + Lifetime uint32 + Flags uint16 + // RevNAT is in network byte order + RevNAT uint16 + IfIndex uint16 + TxFlagsSeen uint8 + RxFlagsSeen uint8 + SourceSecurityID uint32 + LastTxReport uint32 + LastRxReport uint32 +} + +type backend4KeyV3 struct { + ID uint32 +} + +type backend4ValueV3 struct { + Address [4]byte + Port uint16 + Proto uint8 + Flags uint8 + ClusterID uint16 + Zone uint8 + Pad uint8 +} + +type backend struct { + addr util.Address + port uint16 +} + +type ciliumLoadBalancerConntracker struct { + m sync.Mutex + backends *maps.GenericMap[backend4KeyV3, backend4ValueV3] + ctTCP, ctUDP *maps.GenericMap[tupleKey4, ctEntry] + backendIDToBackend map[uint32]backend + stop chan struct{} + closeOnce sync.Once +} + +func newCiliumLoadBalancerConntracker(cfg *config.Config) (netlink.Conntracker, error) { + if !cfg.EnableCiliumLBConntracker { + return netlink.NewNoOpConntracker(), nil + } + + ctTCP, err := ebpf.LoadPinnedMap("/sys/fs/bpf/tc/globals/cilium_ct4_global", &ebpf.LoadPinOptions{ + ReadOnly: true, + }) + if err != nil { + return nil, fmt.Errorf("error loading pinned ct TCP map: %w", err) + } + + ctUDP, err := ebpf.LoadPinnedMap("/sys/fs/bpf/tc/globals/cilium_ct_any4_global", &ebpf.LoadPinOptions{ + ReadOnly: true, + }) + if err != nil { + return nil, fmt.Errorf("error loading pinned ct UDP map: %w", err) + } + + backends, err := ebpf.LoadPinnedMap("/sys/fs/bpf/tc/globals/cilium_lb4_backends_v3", &ebpf.LoadPinOptions{ + ReadOnly: true, + }) + if err != nil { + return nil, fmt.Errorf("error loading pinned backends map: %w", err) + } + + clb := &ciliumLoadBalancerConntracker{ + backendIDToBackend: make(map[uint32]backend), + } + if clb.ctTCP, err = maps.Map[tupleKey4, ctEntry](ctTCP); err != nil { + return nil, fmt.Errorf("could not make generic map for ct TCP map: %w", err) + } + if clb.ctUDP, err = maps.Map[tupleKey4, ctEntry](ctUDP); err != nil { + return nil, fmt.Errorf("could not make generic map for ct UDP map: %w", err) + } + if clb.backends, err = maps.Map[backend4KeyV3, backend4ValueV3](backends); err != nil { + return nil, fmt.Errorf("could not make generic map for backends map: %w", err) + } + + clb.stop = make(chan struct{}) + go func() { + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() + for { + select { + case <-tick.C: + clb.updateBackends() + case <-clb.stop: + close(clb.stop) + return + } + } + }() + + return clb, nil +} + +func ntohs(n uint16) uint16 { + return binary.BigEndian.Uint16([]byte{byte(n), byte(n >> 8)}) +} + +func htons(n uint16) uint16 { + b := make([]byte, 2) + binary.BigEndian.PutUint16(b, n) + return *(*uint16)(unsafe.Pointer(&b[0])) +} + +func (clb *ciliumLoadBalancerConntracker) updateBackends() { + clb.m.Lock() + defer clb.m.Unlock() + + it := clb.backends.Iterate() + var k backend4KeyV3 + var v backend4ValueV3 + for it.Next(&k, &v) { + clb.backendIDToBackend[k.ID] = backend{ + addr: util.AddressFromNetIP(net.IPv4(v.Address[0], v.Address[1], v.Address[2], v.Address[3])), + port: ntohs(v.Port), + } + } + + if it.Err() != nil { + log.Warnf("error iterating lb backends map: %s", it.Err()) + } +} + +// Describe returns all descriptions of the collector +func (clb *ciliumLoadBalancerConntracker) Describe(chan<- *prometheus.Desc) { +} + +// Collect returns the current state of all metrics of the collector +func (clb *ciliumLoadBalancerConntracker) Collect(chan<- prometheus.Metric) { +} + +// GetTranslationForConn returns the network address translation for a given connection tuple +func (clb *ciliumLoadBalancerConntracker) GetTranslationForConn(c *network.ConnectionTuple) *network.IPTranslation { + // TODO: add ipv6 support + if c.Family != network.AFINET { + return nil + } + + if c.Direction != network.OUTGOING { + return nil + } + + if c.Dest.IsLoopback() { + return nil + } + + startTime := time.Now() + defer func() { + ciliumConntrackerTelemetry.getsTotal.Inc() + ciliumConntrackerTelemetry.getsDuration.Observe(float64(time.Since(startTime).Nanoseconds())) + }() + + queryMap := clb.ctTCP + t := tupleKey4{ + Flags: TUPLE_F_OUT | TUPLE_F_SERVICE, + NextHeader: uint8(unix.IPPROTO_TCP), + SourcePort: htons(c.SPort), + DestPort: htons(c.DPort), + SourceAddr: c.Source.As4(), + DestAddr: c.Dest.As4(), + } + if c.Type == network.UDP { + t.NextHeader = unix.IPPROTO_UDP + queryMap = clb.ctUDP + } + + log.TraceFunc(func() string { + return fmt.Sprintf("looking up tuple %+v in ct map", t) + }) + + var ctEntry ctEntry + var err error + if err = queryMap.Lookup(&t, &ctEntry); err != nil { + if !errors.Is(err, ebpf.ErrKeyNotExist) { + log.Warnf("error looking up %+v in ct map: %s", t, err) + } + + log.TraceFunc(func() string { + return fmt.Sprintf("lookup failed for %+v in ct map", t) + }) + + return nil + } + + log.TraceFunc(func() string { + return fmt.Sprintf("found ct entry for %+v: %+v", t, ctEntry) + }) + + clb.m.Lock() + defer clb.m.Unlock() + + if b, ok := clb.backendIDToBackend[uint32(ctEntry.BackendID)]; ok && (b.addr != c.Dest || b.port != c.DPort) { + return &network.IPTranslation{ + ReplDstIP: c.Source, + ReplDstPort: c.SPort, + ReplSrcIP: b.addr, + ReplSrcPort: b.port, + } + } + + return nil +} + +// GetType returns a string describing the conntracker type +func (clb *ciliumLoadBalancerConntracker) GetType() string { + return "cilium_lb" +} + +// DeleteTranslation delete the network address translation for a tuple +func (clb *ciliumLoadBalancerConntracker) DeleteTranslation(*network.ConnectionTuple) { +} + +// DumpCachedTable dumps the in-memory address translation table +func (clb *ciliumLoadBalancerConntracker) DumpCachedTable(context.Context) (map[uint32][]netlink.DebugConntrackEntry, error) { + return nil, nil +} + +// Close closes the conntracker +func (clb *ciliumLoadBalancerConntracker) Close() { + clb.closeOnce.Do(func() { + clb.stop <- struct{}{} + <-clb.stop + clb.ctTCP.Map().Close() + clb.backends.Map().Close() + }) +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go index b7d525bce1531..44def44a2860c 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go @@ -109,16 +109,16 @@ func makeTCPStates(synPkt testCapture) *network.ConnectionStats { } return &network.ConnectionStats{ ConnectionTuple: network.ConnectionTuple{ - Source: util.AddressFromNetIP(srcIP), - Dest: util.AddressFromNetIP(dstIP), - Pid: 0, // packet capture does not have PID information. - NetNS: defaultNsID, - SPort: uint16(synPkt.tcp.SrcPort), - DPort: uint16(synPkt.tcp.DstPort), - Type: network.TCP, - Family: family, + Source: util.AddressFromNetIP(srcIP), + Dest: util.AddressFromNetIP(dstIP), + Pid: 0, // packet capture does not have PID information. + NetNS: defaultNsID, + SPort: uint16(synPkt.tcp.SrcPort), + DPort: uint16(synPkt.tcp.DstPort), + Type: network.TCP, + Family: family, + Direction: direction, }, - Direction: direction, TCPFailures: make(map[uint16]uint32), } } diff --git a/pkg/network/tracer/offsetguess_test.go b/pkg/network/tracer/offsetguess_test.go index 43c27a25cc6e6..cf0d3d9ee84d5 100644 --- a/pkg/network/tracer/offsetguess_test.go +++ b/pkg/network/tracer/offsetguess_test.go @@ -15,13 +15,12 @@ import ( "testing" "time" + manager "github.com/DataDog/ebpf-manager" "github.com/cilium/ebpf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" - manager "github.com/DataDog/ebpf-manager" - "github.com/DataDog/datadog-agent/pkg/ebpf/bytecode/runtime" "github.com/DataDog/datadog-agent/pkg/ebpf/ebpftest" "github.com/DataDog/datadog-agent/pkg/ebpf/maps" @@ -38,7 +37,7 @@ import ( //go:generate $GOPATH/bin/include_headers pkg/network/ebpf/c/runtime/offsetguess-test.c pkg/ebpf/bytecode/build/runtime/offsetguess-test.c pkg/ebpf/c pkg/ebpf/c/protocols pkg/network/ebpf/c/runtime pkg/network/ebpf/c //go:generate $GOPATH/bin/integrity pkg/ebpf/bytecode/build/runtime/offsetguess-test.c pkg/ebpf/bytecode/runtime/offsetguess-test.go runtime -type offsetT int +type offsetT uint32 const ( offsetSaddr offsetT = iota diff --git a/pkg/network/tracer/tracer.go b/pkg/network/tracer/tracer.go index 56eaf657a802f..cc10d988a4f48 100644 --- a/pkg/network/tracer/tracer.go +++ b/pkg/network/tracer/tracer.go @@ -250,46 +250,55 @@ func (t *Tracer) start() error { return nil } +func loadEbpfConntracker(cfg *config.Config, telemetryComponent telemetryComponent.Component) (netlink.Conntracker, error) { + if !cfg.EnableEbpfConntracker { + log.Info("ebpf conntracker disabled") + return nil, nil + } + + if err := netlink.LoadNfConntrackKernelModule(cfg); err != nil { + log.Warnf("failed to load conntrack kernel module, though it may already be loaded: %s", err) + } + + return NewEBPFConntracker(cfg, telemetryComponent) +} + func newConntracker(cfg *config.Config, telemetryComponent telemetryComponent.Component) (netlink.Conntracker, error) { if !cfg.EnableConntrack { return netlink.NewNoOpConntracker(), nil } + var clb netlink.Conntracker var c netlink.Conntracker var err error - if !cfg.EnableEbpfless { - ns, err := cfg.GetRootNetNs() - if err != nil { - log.Warnf("error fetching root net namespace, will not attempt to load nf_conntrack_netlink module: %s", err) - } else { - defer ns.Close() - if err = netlink.LoadNfConntrackKernelModule(ns); err != nil { - log.Warnf("failed to load conntrack kernel module, though it may already be loaded: %s", err) - } - } - if cfg.EnableEbpfConntracker { - if c, err = NewEBPFConntracker(cfg, telemetryComponent); err == nil { - return c, nil - } + if c, err = loadEbpfConntracker(cfg, telemetryComponent); err != nil { log.Warnf("error initializing ebpf conntracker: %s", err) - } else { - log.Info("ebpf conntracker disabled") + log.Info("falling back to netlink conntracker") } - log.Info("falling back to netlink conntracker") + if clb, err = newCiliumLoadBalancerConntracker(cfg); err != nil { + log.Warnf("cilium lb conntracker is enabled, but failed to load: %s", err) + } } - if c, err = netlink.NewConntracker(cfg, telemetryComponent); err == nil { - return c, nil + if c == nil { + if c, err = netlink.NewConntracker(cfg, telemetryComponent); err != nil { + if errors.Is(err, netlink.ErrNotPermitted) || cfg.IgnoreConntrackInitFailure { + log.Warnf("could not initialize netlink conntracker: %s", err) + } else { + return nil, fmt.Errorf("error initializing conntracker: %s. set network_config.ignore_conntrack_init_failure to true to ignore conntrack failures on startup", err) + } + } } - if errors.Is(err, netlink.ErrNotPermitted) || cfg.IgnoreConntrackInitFailure { - log.Warnf("could not initialize conntrack, tracer will continue without NAT tracking: %s", err) - return netlink.NewNoOpConntracker(), nil + c = chainConntrackers(c, clb) + if c.GetType() == "" { + // no-op conntracker + log.Warnf("connection tracking is disabled") } - return nil, fmt.Errorf("error initializing conntracker: %s. set network_config.ignore_conntrack_init_failure to true to ignore conntrack failures on startup", err) + return c, nil } func newReverseDNS(c *config.Config, telemetrycomp telemetryComponent.Component) dns.ReverseDNS {