From 404495b3f6a74fa4ad7c1819d73dd5cfcba29104 Mon Sep 17 00:00:00 2001 From: Guy Arbitman Date: Thu, 16 Nov 2023 10:07:41 +0200 Subject: [PATCH] usm: map cleaner: Use batch operations Using batch operations allows us to cut allocations, memory pressure and general runtime by 50%. --- pkg/ebpf/map_cleaner.go | 177 +++++++++++------------- pkg/ebpf/map_cleaner_test.go | 113 ++++++++++++++- pkg/network/protocols/http/protocol.go | 17 +-- pkg/network/protocols/http2/protocol.go | 17 +-- pkg/network/usm/ebpf_main.go | 18 +-- 5 files changed, 208 insertions(+), 134 deletions(-) diff --git a/pkg/ebpf/map_cleaner.go b/pkg/ebpf/map_cleaner.go index 8f2cce3879b82d..4c87923a46fde1 100644 --- a/pkg/ebpf/map_cleaner.go +++ b/pkg/ebpf/map_cleaner.go @@ -8,34 +8,25 @@ package ebpf import ( - "bytes" - "encoding" - "encoding/binary" - "errors" - "fmt" - "reflect" "sync" "time" "unsafe" cebpf "github.com/cilium/ebpf" + "github.com/DataDog/datadog-agent/pkg/util/kernel" "github.com/DataDog/datadog-agent/pkg/util/log" - "github.com/DataDog/datadog-agent/pkg/util/native" ) // MapCleaner is responsible for periodically sweeping an eBPF map // and deleting entries that satisfy a certain predicate function supplied by the user -type MapCleaner struct { - emap *cebpf.Map - key interface{} - val interface{} - once sync.Once +type MapCleaner[K any, V any] struct { + emap *cebpf.Map + keyBatch []K + valuesBatch []V + keysToDelete []K - // we resort to unsafe.Pointers because by doing so the underlying eBPF - // library avoids marshaling the key/value variables while traversing the map - keyPtr unsafe.Pointer - valPtr unsafe.Pointer + once sync.Once // termination stopOnce sync.Once @@ -43,35 +34,41 @@ type MapCleaner struct { } // NewMapCleaner instantiates a new MapCleaner -func NewMapCleaner(emap *cebpf.Map, key, val interface{}) (*MapCleaner, error) { - // we force types to be of pointer kind because of the reasons mentioned above - if reflect.ValueOf(key).Kind() != reflect.Ptr { -
return nil, fmt.Errorf("%T is not a pointer kind", key) +func NewMapCleaner[K any, V any](emap *cebpf.Map, defaultBatchSize uint32) (*MapCleaner[K, V], error) { + batchSize := defaultBatchSize + if defaultBatchSize > emap.MaxEntries() { + batchSize = emap.MaxEntries() } - if reflect.ValueOf(val).Kind() != reflect.Ptr { - return nil, fmt.Errorf("%T is not a pointer kind", val) + if batchSize == 0 { + batchSize = 1 } - return &MapCleaner{ - emap: emap, - key: key, - val: val, - keyPtr: unsafe.Pointer(reflect.ValueOf(key).Elem().Addr().Pointer()), - valPtr: unsafe.Pointer(reflect.ValueOf(val).Elem().Addr().Pointer()), - done: make(chan struct{}), + return &MapCleaner[K, V]{ + emap: emap, + keyBatch: make([]K, batchSize), + valuesBatch: make([]V, batchSize), + keysToDelete: make([]K, 0, batchSize), + done: make(chan struct{}), }, nil } // Clean eBPF map // `interval` determines how often the eBPF map is scanned; -// `shouldClean` is a predicate method that determines whether or not a certain +// `shouldClean` is a predicate method that determines whether a certain // map entry should be deleted. 
the callback argument `nowTS` can be directly // compared to timestamps generated using the `bpf_ktime_get_ns()` helper; -func (mc *MapCleaner) Clean(interval time.Duration, shouldClean func(nowTS int64, k, v interface{}) bool) { +func (mc *MapCleaner[K, V]) Clean(interval time.Duration, shouldClean func(nowTS int64, k K, v V) bool, useTimeStamps bool) { if mc == nil { return } + cleaner := mc.cleanWithoutBatches + if version, err := kernel.HostVersion(); err == nil && version >= kernel.VersionCode(5, 6, 0) { + cleaner = mc.cleanWithBatches + } + + var err error + now := int64(0) mc.once.Do(func() { ticker := time.NewTicker(interval) go func() { @@ -80,22 +77,23 @@ func (mc *MapCleaner) Clean(interval time.Duration, shouldClean func(nowTS int64 for { select { case <-ticker.C: - now, err := NowNanoseconds() - if err != nil { - break + if useTimeStamps { + now, err = NowNanoseconds() + if err != nil { + break + } } - mc.clean(now, shouldClean) + cleaner(now, shouldClean) case <-mc.done: return } } - }() }) } // Stop stops the map cleaner -func (mc *MapCleaner) Stop() { +func (mc *MapCleaner[K, V]) Stop() { if mc == nil { return } @@ -107,85 +105,74 @@ func (mc *MapCleaner) Stop() { }) } -func (mc *MapCleaner) clean(nowTS int64, shouldClean func(nowTS int64, k, v interface{}) bool) { - keySize := int(mc.emap.KeySize()) - keysToDelete := make([][]byte, 0, 128) - totalCount, deletedCount := 0, 0 +func (mc *MapCleaner[K, V]) cleanWithBatches(nowTS int64, shouldClean func(nowTS int64, k K, v V) bool) { now := time.Now() - entries := mc.emap.Iterate() - for entries.Next(mc.keyPtr, mc.valPtr) { - totalCount++ - - if !shouldClean(nowTS, mc.key, mc.val) { - continue + mc.keysToDelete = mc.keysToDelete[:0] + totalCount, deletedCount := 0, 0 + var next K + var n int + for { + n, _ = mc.emap.BatchLookup(next, &next, mc.keyBatch, mc.valuesBatch, nil) + if n == 0 { + break } - marshalledKey, err := marshalBytes(mc.key, keySize) - if err != nil { - continue + totalCount += n + for 
i := 0; i < n; i++ { + if !shouldClean(nowTS, mc.keyBatch[i], mc.valuesBatch[i]) { + continue + } + mc.keysToDelete = append(mc.keysToDelete, mc.keyBatch[i]) } - - // we accumulate alll keys to delete because it isn't safe to delete map - // entries during the traversal. the main downside of doing so is that all - // fields from the key type must be exported in order to be marshaled (unless - // the key type implements the `encoding.BinaryMarshaler` interface) - keysToDelete = append(keysToDelete, marshalledKey) } - - for _, key := range keysToDelete { - err := mc.emap.Delete(key) - if err == nil { - deletedCount++ + if len(mc.keysToDelete) > 0 { + count, err := mc.emap.BatchDelete(mc.keysToDelete, nil) + if err != nil { + log.Debugf("failed to delete map entries: %v", err) + return } + deletedCount += count } - iterationErr := entries.Err() elapsed := time.Since(now) log.Debugf( - "finished cleaning map=%s entries_checked=%d entries_deleted=%d iteration_error=%v elapsed=%s", + "finished cleaning map=%s entries_checked=%d entries_deleted=%d elapsed=%s", mc.emap, totalCount, deletedCount, - iterationErr, elapsed, ) } -// marshalBytes converts an arbitrary value into a byte buffer. -// -// Returns an error if the given value isn't representable in exactly -// length bytes. 
-// -// copied from: https://github.com/cilium/ebpf/blob/master/marshalers.go -func marshalBytes(data interface{}, length int) (buf []byte, err error) { - if data == nil { - return nil, errors.New("can't marshal a nil value") - } +func (mc *MapCleaner[K, V]) cleanWithoutBatches(nowTS int64, shouldClean func(nowTS int64, k K, v V) bool) { + now := time.Now() - switch value := data.(type) { - case encoding.BinaryMarshaler: - buf, err = value.MarshalBinary() - case string: - buf = []byte(value) - case []byte: - buf = value - case unsafe.Pointer: - err = errors.New("can't marshal from unsafe.Pointer") - default: - var wr bytes.Buffer - err = binary.Write(&wr, native.Endian, value) - if err != nil { - err = fmt.Errorf("encoding %T: %v", value, err) + mc.keysToDelete = mc.keysToDelete[:0] + totalCount, deletedCount := 0, 0 + + entries := mc.emap.Iterate() + for entries.Next(unsafe.Pointer(&mc.keyBatch[0]), unsafe.Pointer(&mc.valuesBatch[0])) { + totalCount++ + if !shouldClean(nowTS, mc.keyBatch[0], mc.valuesBatch[0]) { + continue } - buf = wr.Bytes() - } - if err != nil { - return nil, err + mc.keysToDelete = append(mc.keysToDelete, mc.keyBatch[0]) } - if len(buf) != length { - return nil, fmt.Errorf("%T doesn't marshal to %d bytes", data, length) + for _, key := range mc.keysToDelete { + err := mc.emap.Delete(unsafe.Pointer(&key)) + if err == nil { + deletedCount++ + } } - return buf, nil + + elapsed := time.Since(now) + log.Debugf( + "finished cleaning map=%s entries_checked=%d entries_deleted=%d elapsed=%s", + mc.emap, + totalCount, + deletedCount, + elapsed, + ) } diff --git a/pkg/ebpf/map_cleaner_test.go b/pkg/ebpf/map_cleaner_test.go index f3249584f47d49..bd86c17fb468fa 100644 --- a/pkg/ebpf/map_cleaner_test.go +++ b/pkg/ebpf/map_cleaner_test.go @@ -8,15 +8,28 @@ package ebpf import ( + "os" "testing" "time" + "github.com/cihub/seelog" cebpf "github.com/cilium/ebpf" "github.com/cilium/ebpf/rlimit" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/util/log" ) +func TestMain(m *testing.M) { + logLevel := os.Getenv("DD_LOG_LEVEL") + if logLevel == "" { + logLevel = "warn" + } + log.SetupLogger(seelog.Default, logLevel) + os.Exit(m.Run()) +} + func TestMapCleaner(t *testing.T) { const numMapEntries = 100 @@ -36,7 +49,7 @@ func TestMapCleaner(t *testing.T) { }) require.NoError(t, err) - cleaner, err := NewMapCleaner(m, key, val) + cleaner, err := NewMapCleaner[int64, int64](m, 10) require.NoError(t, err) for i := 0; i < numMapEntries; i++ { *key = int64(i) @@ -45,10 +58,9 @@ func TestMapCleaner(t *testing.T) { } // Clean all the even entries - cleaner.Clean(100*time.Millisecond, func(now int64, k, v interface{}) bool { - key := k.(*int64) - return *key%2 == 0 - }) + cleaner.Clean(100*time.Millisecond, func(now int64, k int64, v int64) bool { + return k%2 == 0 + }, false) time.Sleep(1 * time.Second) cleaner.Stop() @@ -66,3 +78,94 @@ func TestMapCleaner(t *testing.T) { } } } + +func benchmarkBatchCleaner(b *testing.B, numMapEntries, batchSize uint32) { + var ( + key = new(int64) + val = new(int64) + ) + + err := rlimit.RemoveMemlock() + require.NoError(b, err) + + m, err := cebpf.NewMap(&cebpf.MapSpec{ + Type: cebpf.Hash, + KeySize: 8, + ValueSize: 8, + MaxEntries: numMapEntries, + }) + require.NoError(b, err) + + cleaner, err := NewMapCleaner[int64, int64](m, batchSize) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for i := uint32(0); i < numMapEntries; i++ { + *key = int64(i) + err := m.Put(key, val) + assert.Nilf(b, err, "can't put key=%d: %s", i, err) + } + + // Clean all the even entries + if batchSize == 0 { + cleaner.cleanWithoutBatches(0, func(now int64, k int64, v int64) bool { + return k%2 == 0 + }) + } else { + cleaner.cleanWithBatches(0, func(now int64, k int64, v int64) bool { + return k%2 == 0 + }) + } + for i := uint32(0); i < numMapEntries; i++ { + *key = int64(i) + err 
:= m.Lookup(key, val) + + // If the entry is even, it should have been deleted + // otherwise it should be present + if i%2 == 0 { + assert.NotNilf(b, err, "entry key=%d should not be present", i) + } else { + assert.Nil(b, err) + } + } + } +} + +func BenchmarkBatchCleaner1000Entries10PerBatch(b *testing.B) { + benchmarkBatchCleaner(b, 1000, 10) +} + +func BenchmarkBatchCleaner1000Entries100PerBatch(b *testing.B) { + benchmarkBatchCleaner(b, 1000, 100) +} + +func BenchmarkBatchCleaner10000Entries100PerBatch(b *testing.B) { + benchmarkBatchCleaner(b, 10000, 100) +} + +func BenchmarkBatchCleaner10000Entries1000PerBatch(b *testing.B) { + benchmarkBatchCleaner(b, 10000, 1000) +} + +func BenchmarkBatchCleaner100000Entries100PerBatch(b *testing.B) { + benchmarkBatchCleaner(b, 100000, 100) +} + +func BenchmarkBatchCleaner100000Entries1000PerBatch(b *testing.B) { + benchmarkBatchCleaner(b, 100000, 1000) +} + +func BenchmarkCleaner1000Entries(b *testing.B) { + benchmarkBatchCleaner(b, 1000, 0) +} + +func BenchmarkCleaner10000Entries(b *testing.B) { + benchmarkBatchCleaner(b, 10000, 0) +} + +func BenchmarkCleaner100000Entries(b *testing.B) { + benchmarkBatchCleaner(b, 100000, 0) +} diff --git a/pkg/network/protocols/http/protocol.go b/pkg/network/protocols/http/protocol.go index 77f4cb6d235139..88e87fce0e0ac6 100644 --- a/pkg/network/protocols/http/protocol.go +++ b/pkg/network/protocols/http/protocol.go @@ -32,7 +32,7 @@ type protocol struct { cfg *config.Config telemetry *Telemetry statkeeper *StatKeeper - mapCleaner *ddebpf.MapCleaner + mapCleaner *ddebpf.MapCleaner[netebpf.ConnTuple, EbpfTx] eventsConsumer *events.Consumer } @@ -175,26 +175,21 @@ func (p *protocol) setupMapCleaner(mgr *manager.Manager) { log.Errorf("error getting http_in_flight map: %s", err) return } - mapCleaner, err := ddebpf.NewMapCleaner(httpMap, new(netebpf.ConnTuple), new(EbpfTx)) + mapCleaner, err := ddebpf.NewMapCleaner[netebpf.ConnTuple, EbpfTx](httpMap, 1024) if err != nil { log.Errorf("error 
creating map cleaner: %s", err) return } ttl := p.cfg.HTTPIdleConnectionTTL.Nanoseconds() - mapCleaner.Clean(p.cfg.HTTPMapCleanerInterval, func(now int64, key, val interface{}) bool { - httpTxn, ok := val.(*EbpfTx) - if !ok { - return false - } - - if updated := int64(httpTxn.Response_last_seen); updated > 0 { + mapCleaner.Clean(p.cfg.HTTPMapCleanerInterval, func(now int64, key netebpf.ConnTuple, val EbpfTx) bool { + if updated := int64(val.Response_last_seen); updated > 0 { return (now - updated) > ttl } - started := int64(httpTxn.Request_started) + started := int64(val.Request_started) return started > 0 && (now-started) > ttl - }) + }, true) p.mapCleaner = mapCleaner } diff --git a/pkg/network/protocols/http2/protocol.go b/pkg/network/protocols/http2/protocol.go index de59c21235b2b8..392e120187c825 100644 --- a/pkg/network/protocols/http2/protocol.go +++ b/pkg/network/protocols/http2/protocol.go @@ -35,7 +35,7 @@ type protocol struct { telemetry *http.Telemetry // TODO: Do we need to duplicate? 
statkeeper *http.StatKeeper - mapCleaner *ddebpf.MapCleaner + mapCleaner *ddebpf.MapCleaner[http2StreamKey, EbpfTx] eventsConsumer *events.Consumer } @@ -233,26 +233,21 @@ func (p *protocol) setupMapCleaner(mgr *manager.Manager) { log.Errorf("error getting %q map: %s", inFlightMap, err) return } - mapCleaner, err := ddebpf.NewMapCleaner(http2Map, new(http2StreamKey), new(EbpfTx)) + mapCleaner, err := ddebpf.NewMapCleaner[http2StreamKey, EbpfTx](http2Map, 1024) if err != nil { log.Errorf("error creating map cleaner: %s", err) return } ttl := p.cfg.HTTPIdleConnectionTTL.Nanoseconds() - mapCleaner.Clean(p.cfg.HTTPMapCleanerInterval, func(now int64, key, val interface{}) bool { - http2Txn, ok := val.(*EbpfTx) - if !ok { - return false - } - - if updated := int64(http2Txn.Response_last_seen); updated > 0 { + mapCleaner.Clean(p.cfg.HTTPMapCleanerInterval, func(now int64, key http2StreamKey, val EbpfTx) bool { + if updated := int64(val.Response_last_seen); updated > 0 { return (now - updated) > ttl } - started := int64(http2Txn.Request_started) + started := int64(val.Request_started) return started > 0 && (now-started) > ttl - }) + }, true) p.mapCleaner = mapCleaner } diff --git a/pkg/network/usm/ebpf_main.go b/pkg/network/usm/ebpf_main.go index 1800134adaf252..d73ac8c16e83e3 100644 --- a/pkg/network/usm/ebpf_main.go +++ b/pkg/network/usm/ebpf_main.go @@ -78,7 +78,7 @@ type ebpfProgram struct { disabledProtocols []*protocols.ProtocolSpec // Used for connection_protocol data expiration - mapCleaner *ddebpf.MapCleaner + mapCleaner *ddebpf.MapCleaner[netebpf.ConnTuple, netebpf.ProtocolStackWrapper] buildMode buildmode.Type } @@ -403,22 +403,16 @@ func (e *ebpfProgram) init(buf bytecode.AssetReader, options manager.Options) er const connProtoTTL = 3 * time.Minute const connProtoCleaningInterval = 5 * time.Minute -func (e *ebpfProgram) setupMapCleaner() (*ddebpf.MapCleaner, error) { - mapCleaner, err := ddebpf.NewMapCleaner(e.connectionProtocolMap, new(netebpf.ConnTuple), 
new(netebpf.ProtocolStackWrapper)) +func (e *ebpfProgram) setupMapCleaner() (*ddebpf.MapCleaner[netebpf.ConnTuple, netebpf.ProtocolStackWrapper], error) { + mapCleaner, err := ddebpf.NewMapCleaner[netebpf.ConnTuple, netebpf.ProtocolStackWrapper](e.connectionProtocolMap, 1024) if err != nil { return nil, err } ttl := connProtoTTL.Nanoseconds() - mapCleaner.Clean(connProtoCleaningInterval, func(now int64, key, val interface{}) bool { - protoStack, ok := val.(*netebpf.ProtocolStackWrapper) - if !ok { - return false - } - - updated := int64(protoStack.Updated) - return (now - updated) > ttl - }) + mapCleaner.Clean(connProtoCleaningInterval, func(now int64, key netebpf.ConnTuple, val netebpf.ProtocolStackWrapper) bool { + return (now - int64(val.Updated)) > ttl + }, true) return mapCleaner, nil }