Skip to content

Commit

Permalink
[NPM] Add cilium lb conntracker (DataDog#31842)
Browse files Browse the repository at this point in the history
  • Loading branch information
hmahmood authored Dec 20, 2024
1 parent 0c83327 commit 7f64411
Show file tree
Hide file tree
Showing 18 changed files with 852 additions and 386 deletions.
18 changes: 9 additions & 9 deletions cmd/system-probe/modules/network_tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ func TestDecode(t *testing.T) {
BufferedData: network.BufferedData{
Conns: []network.ConnectionStats{
{ConnectionTuple: network.ConnectionTuple{
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("10.2.2.2"),
Pid: 6000,
NetNS: 7,
SPort: 1000,
DPort: 9000,
Type: network.UDP,
Family: network.AFINET6,
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("10.2.2.2"),
Pid: 6000,
NetNS: 7,
SPort: 1000,
DPort: 9000,
Type: network.UDP,
Family: network.AFINET6,
Direction: network.LOCAL,
},
Monotonic: network.StatCounters{
SentBytes: 1,
Expand All @@ -54,7 +55,6 @@ func TestDecode(t *testing.T) {
ReplSrcPort: 40,
ReplDstPort: 70,
},
Direction: network.LOCAL,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions pkg/config/setup/system_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnvAndSetDefault(join(netNS, "conntrack_init_timeout"), 10*time.Second)
cfg.BindEnvAndSetDefault(join(netNS, "allow_netlink_conntracker_fallback"), true)
cfg.BindEnvAndSetDefault(join(netNS, "enable_ebpf_conntracker"), true)
cfg.BindEnvAndSetDefault(join(netNS, "enable_cilium_lb_conntracker"), false)

cfg.BindEnvAndSetDefault(join(spNS, "source_excludes"), map[string][]string{})
cfg.BindEnvAndSetDefault(join(spNS, "dest_excludes"), map[string][]string{})
Expand Down
23 changes: 23 additions & 0 deletions pkg/ebpf/maps/generic_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,35 @@ func validateValueTypeForMapType[V any](t ebpf.MapType) error {
return nil
}

func validateKeyValueSizes[K, V any](m *ebpf.Map) error {
var k K
tk := reflect.TypeOf(k)
if tk.Size() != uintptr(m.KeySize()) {
return fmt.Errorf("map key size (%d) does not match key type (%T) size (%d)", m.KeySize(), k, tk.Size())
}
var v V
tv := reflect.TypeOf(v)
tvSize := tv.Size()
if isPerCPU(m.Type()) {
tvSize = tv.Elem().Size()
}
if tvSize != uintptr(m.ValueSize()) {
return fmt.Errorf("map value size (%d) does not match value type (%T) size (%d)", m.ValueSize(), v, tvSize)
}

return nil
}

// Map creates a new GenericMap from an existing ebpf.Map
func Map[K any, V any](m *ebpf.Map) (*GenericMap[K, V], error) {
if err := validateValueTypeForMapType[V](m.Type()); err != nil {
return nil, err
}

if err := validateKeyValueSizes[K, V](m); err != nil {
return nil, err
}

// See if we can perform binary.Read on the key type. If we can't we can't use the batch API
// for this map
var kval K
Expand Down
24 changes: 24 additions & 0 deletions pkg/ebpf/maps/generic_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,30 @@ func TestIterateWithPointerKey(t *testing.T) {
require.Equal(t, expectedNumbers, actualNumbers)
}

func TestValidateMapKeyValueSize(t *testing.T) {
m, err := ebpf.NewMap(&ebpf.MapSpec{
Type: ebpf.Hash,
MaxEntries: 1,
KeySize: 8,
ValueSize: 8,
})

require.NoError(t, err, "could not create map")
t.Cleanup(func() { m.Close() })

gm, err := Map[uint32, uint64](m)
assert.Error(t, err)
assert.Nil(t, gm)

gm2, err := Map[uint64, uint32](m)
assert.Error(t, err)
assert.Nil(t, gm2)

gm3, err := Map[uint64, uint64](m)
assert.NoError(t, err)
assert.NotNil(t, gm3)
}

func TestGenericHashMapCanUseBatchAPI(t *testing.T) {
hash, err := ebpf.NewMap(&ebpf.MapSpec{
Type: ebpf.Hash,
Expand Down
6 changes: 5 additions & 1 deletion pkg/network/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,12 @@ type Config struct {
// default is true
EnableConntrackAllNamespaces bool

// EnableEbpfConntracker enables the ebpf based network conntracker. Used only for testing at the moment
// EnableEbpfConntracker enables the ebpf based network conntracker
EnableEbpfConntracker bool

// EnableCiliumLBConntracker enables the cilium load balancer conntracker
EnableCiliumLBConntracker bool

// ClosedChannelSize specifies the size for closed channel for the tracer
ClosedChannelSize int

Expand Down Expand Up @@ -360,6 +363,7 @@ func New() *Config {
IgnoreConntrackInitFailure: cfg.GetBool(sysconfig.FullKeyPath(netNS, "ignore_conntrack_init_failure")),
ConntrackInitTimeout: cfg.GetDuration(sysconfig.FullKeyPath(netNS, "conntrack_init_timeout")),
EnableEbpfConntracker: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_ebpf_conntracker")),
EnableCiliumLBConntracker: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_cilium_lb_conntracker")),

EnableGatewayLookup: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_gateway_lookup")),

Expand Down
32 changes: 16 additions & 16 deletions pkg/network/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,15 @@ func TestSerialization(t *testing.T) {
BufferedData: network.BufferedData{
Conns: []network.ConnectionStats{
{ConnectionTuple: network.ConnectionTuple{
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("10.2.2.2"),
Pid: 6000,
NetNS: 7,
SPort: 1000,
DPort: 9000,
Type: network.TCP,
Family: network.AFINET6,
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("10.2.2.2"),
Pid: 6000,
NetNS: 7,
SPort: 1000,
DPort: 9000,
Type: network.TCP,
Family: network.AFINET6,
Direction: network.LOCAL,
},
Monotonic: network.StatCounters{
SentBytes: 1,
Expand All @@ -222,7 +223,6 @@ func TestSerialization(t *testing.T) {
ReplDstPort: 80,
},

Direction: network.LOCAL,
Via: &network.Via{
Subnet: network.Subnet{
Alias: "subnet-foo",
Expand All @@ -231,14 +231,14 @@ func TestSerialization(t *testing.T) {
ProtocolStack: protocols.Stack{Application: protocols.HTTP},
},
{ConnectionTuple: network.ConnectionTuple{
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("8.8.8.8"),
SPort: 1000,
DPort: 53,
Type: network.UDP,
Family: network.AFINET6,
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("8.8.8.8"),
SPort: 1000,
DPort: 53,
Type: network.UDP,
Family: network.AFINET6,
Direction: network.LOCAL,
},
Direction: network.LOCAL,
StaticTags: tagOpenSSL | tagTLS,
ProtocolStack: protocols.Stack{Application: protocols.HTTP2},
DNSStats: map[dns.Hostname]map[dns.QueryType]dns.Stats{
Expand Down
14 changes: 7 additions & 7 deletions pkg/network/encoding/marshal/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestFormatConnectionDNS(t *testing.T) {
BufferedData: network.BufferedData{
Conns: []network.ConnectionStats{
{ConnectionTuple: network.ConnectionTuple{
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("8.8.8.8"),
SPort: 1000,
DPort: 53,
Type: network.UDP,
Family: network.AFINET6,
},
Source: util.AddressFromString("10.1.1.1"),
Dest: util.AddressFromString("8.8.8.8"),
SPort: 1000,
DPort: 53,
Type: network.UDP,
Family: network.AFINET6,
Direction: network.LOCAL,
},
DNSStats: map[dns.Hostname]map[dns.QueryType]dns.Stats{
dns.ToHostname("foo.com"): {
dns.TypeA: {
Expand Down
18 changes: 9 additions & 9 deletions pkg/network/event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,15 @@ type StatCookie = uint64

// ConnectionTuple represents the unique network key for a connection
type ConnectionTuple struct {
Source util.Address
Dest util.Address
Pid uint32
NetNS uint32
SPort uint16
DPort uint16
Type ConnectionType
Family ConnectionFamily
Source util.Address
Dest util.Address
Pid uint32
NetNS uint32
SPort uint16
DPort uint16
Type ConnectionType
Family ConnectionFamily
Direction ConnectionDirection
}

func (c ConnectionTuple) String() string {
Expand Down Expand Up @@ -285,7 +286,6 @@ type ConnectionStats struct {
ProtocolStack protocols.Stack

// keep these fields last because they are 1 byte each and otherwise inflate the struct size due to alignment
Direction ConnectionDirection
SPortIsEphemeral EphemeralPortType
IntraHost bool
IsAssured bool
Expand Down
18 changes: 9 additions & 9 deletions pkg/network/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func TestBeautifyKey(t *testing.T) {
},
{
ConnectionTuple: ConnectionTuple{
Pid: 32065,
Type: 0,
Family: AFINET,
Source: util.AddressFromString("172.21.148.124"),
Dest: util.AddressFromString("130.211.21.187"),
SPort: 52012,
DPort: 443,
Pid: 32065,
Type: 0,
Family: AFINET,
Source: util.AddressFromString("172.21.148.124"),
Dest: util.AddressFromString("130.211.21.187"),
SPort: 52012,
DPort: 443,
Direction: 2,
},
Direction: 2,
Cookie: 2,
Cookie: 2,
},
} {
bk := c.ByteKey(buf)
Expand Down
6 changes: 2 additions & 4 deletions pkg/network/netlink/conntracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ import (
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/client_golang/prometheus"
"github.com/syndtr/gocapability/capability"
"golang.org/x/sys/unix"

"github.com/hashicorp/golang-lru/v2/simplelru"

telemetryComp "github.com/DataDog/datadog-agent/comp/core/telemetry"

"github.com/DataDog/datadog-agent/pkg/network"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
Expand All @@ -49,7 +47,7 @@ type Conntracker interface {
// Collect returns the current state of all metrics of the collector
Collect(metrics chan<- prometheus.Metric)
GetTranslationForConn(*network.ConnectionTuple) *network.IPTranslation
// GetType returns a string describing whether the conntracker is "ebpf" or "netlink"
// GetType returns a string describing the conntracker type
GetType() string
DeleteTranslation(*network.ConnectionTuple)
DumpCachedTable(context.Context) (map[uint32][]DebugConntrackEntry, error)
Expand Down
9 changes: 8 additions & 1 deletion pkg/network/netlink/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,14 @@ func (c *Consumer) dumpTable(family uint8, output chan Event, ns netns.NsHandle)

// LoadNfConntrackKernelModule requests a dummy connection tuple from netlink conntrack which is discarded but has
// the side effect of loading the nf_conntrack_netlink module
func LoadNfConntrackKernelModule(ns netns.NsHandle) error {
func LoadNfConntrackKernelModule(cfg *config.Config) error {
ns, err := cfg.GetRootNetNs()
if err != nil {
return fmt.Errorf("error fetching root net namespace, will not attempt to load nf_conntrack_netlink module: %w", err)
}

defer ns.Close()

sock, err := NewSocket(ns)
if err != nil {
ino, errIno := kernel.GetInoForNs(ns)
Expand Down
Loading

0 comments on commit 7f64411

Please sign in to comment.