Skip to content

Commit

Permalink
usm: map cleaner: Use batch operations
Browse files Browse the repository at this point in the history
Using batch operations allowing us to cut allocations, memory pressure and general runtime by 50%.
  • Loading branch information
guyarb committed Nov 16, 2023
1 parent aaecb26 commit 404495b
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 134 deletions.
177 changes: 82 additions & 95 deletions pkg/ebpf/map_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,67 @@
package ebpf

import (
"bytes"
"encoding"
"encoding/binary"
"errors"
"fmt"
"reflect"
"sync"
"time"
"unsafe"

cebpf "github.com/cilium/ebpf"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/native"
)

// MapCleaner is responsible for periodically sweeping an eBPF map
// and deleting entries that satisfy a certain predicate function supplied by the user
type MapCleaner struct {
emap *cebpf.Map
key interface{}
val interface{}
once sync.Once
type MapCleaner[K any, V any] struct {
emap *cebpf.Map
keyBatch []K
valuesBatch []V
keysToDelete []K

// we resort to unsafe.Pointers because by doing so the underlying eBPF
// library avoids marshaling the key/value variables while traversing the map
keyPtr unsafe.Pointer
valPtr unsafe.Pointer
once sync.Once

// termination
stopOnce sync.Once
done chan struct{}
}

// NewMapCleaner instantiates a new MapCleaner
func NewMapCleaner(emap *cebpf.Map, key, val interface{}) (*MapCleaner, error) {
// we force types to be of pointer kind because of the reasons mentioned above
if reflect.ValueOf(key).Kind() != reflect.Ptr {
return nil, fmt.Errorf("%T is not a pointer kind", key)
func NewMapCleaner[K any, V any](emap *cebpf.Map, defaultBatchSize uint32) (*MapCleaner[K, V], error) {
batchSize := defaultBatchSize
if defaultBatchSize > emap.MaxEntries() {
batchSize = emap.MaxEntries()
}
if reflect.ValueOf(val).Kind() != reflect.Ptr {
return nil, fmt.Errorf("%T is not a pointer kind", val)
if batchSize == 0 {
batchSize = 1
}

return &MapCleaner{
emap: emap,
key: key,
val: val,
keyPtr: unsafe.Pointer(reflect.ValueOf(key).Elem().Addr().Pointer()),
valPtr: unsafe.Pointer(reflect.ValueOf(val).Elem().Addr().Pointer()),
done: make(chan struct{}),
return &MapCleaner[K, V]{
emap: emap,
keyBatch: make([]K, batchSize),
valuesBatch: make([]V, batchSize),
keysToDelete: make([]K, 0, batchSize),
done: make(chan struct{}),
}, nil
}

// Clean eBPF map
// `interval` determines how often the eBPF map is scanned;
// `shouldClean` is a predicate method that determines whether or not a certain
// `shouldClean` is a predicate method that determines whether a certain
// map entry should be deleted. the callback argument `nowTS` can be directly
// compared to timestamps generated using the `bpf_ktime_get_ns()` helper;
func (mc *MapCleaner) Clean(interval time.Duration, shouldClean func(nowTS int64, k, v interface{}) bool) {
func (mc *MapCleaner[K, V]) Clean(interval time.Duration, shouldClean func(nowTS int64, k K, v V) bool, useTimeStamps bool) {
if mc == nil {
return
}

cleaner := mc.cleanWithoutBatches
if version, err := kernel.HostVersion(); err == nil && version >= kernel.VersionCode(5, 6, 0) {
cleaner = mc.cleanWithBatches
}

var err error
now := int64(0)
mc.once.Do(func() {
ticker := time.NewTicker(interval)
go func() {
Expand All @@ -80,22 +77,23 @@ func (mc *MapCleaner) Clean(interval time.Duration, shouldClean func(nowTS int64
for {
select {
case <-ticker.C:
now, err := NowNanoseconds()
if err != nil {
break
if useTimeStamps {
now, err = NowNanoseconds()
if err != nil {
break
}
}
mc.clean(now, shouldClean)
cleaner(now, shouldClean)
case <-mc.done:
return
}
}

}()
})
}

// Stop stops the map cleaner
func (mc *MapCleaner) Stop() {
func (mc *MapCleaner[K, V]) Stop() {
if mc == nil {
return
}
Expand All @@ -107,85 +105,74 @@ func (mc *MapCleaner) Stop() {
})
}

func (mc *MapCleaner) clean(nowTS int64, shouldClean func(nowTS int64, k, v interface{}) bool) {
keySize := int(mc.emap.KeySize())
keysToDelete := make([][]byte, 0, 128)
totalCount, deletedCount := 0, 0
func (mc *MapCleaner[K, V]) cleanWithBatches(nowTS int64, shouldClean func(nowTS int64, k K, v V) bool) {
now := time.Now()

entries := mc.emap.Iterate()
for entries.Next(mc.keyPtr, mc.valPtr) {
totalCount++

if !shouldClean(nowTS, mc.key, mc.val) {
continue
mc.keysToDelete = mc.keysToDelete[:0]
totalCount, deletedCount := 0, 0
var next K
var n int
for {
n, _ = mc.emap.BatchLookup(next, &next, mc.keyBatch, mc.valuesBatch, nil)
if n == 0 {
break
}

marshalledKey, err := marshalBytes(mc.key, keySize)
if err != nil {
continue
totalCount += n
for i := 0; i < n; i++ {
if !shouldClean(nowTS, mc.keyBatch[i], mc.valuesBatch[i]) {
continue
}
mc.keysToDelete = append(mc.keysToDelete, mc.keyBatch[i])
}

// we accumulate alll keys to delete because it isn't safe to delete map
// entries during the traversal. the main downside of doing so is that all
// fields from the key type must be exported in order to be marshaled (unless
// the key type implements the `encoding.BinaryMarshaler` interface)
keysToDelete = append(keysToDelete, marshalledKey)
}

for _, key := range keysToDelete {
err := mc.emap.Delete(key)
if err == nil {
deletedCount++
if len(mc.keysToDelete) > 0 {
count, err := mc.emap.BatchDelete(mc.keysToDelete, nil)
if err != nil {
log.Debugf("failed to delete map entries: %v", err)
return
}
deletedCount += count
}

iterationErr := entries.Err()
elapsed := time.Since(now)
log.Debugf(
"finished cleaning map=%s entries_checked=%d entries_deleted=%d iteration_error=%v elapsed=%s",
"finished cleaning map=%s entries_checked=%d entries_deleted=%d elapsed=%s",
mc.emap,
totalCount,
deletedCount,
iterationErr,
elapsed,
)
}

// marshalBytes converts an arbitrary value into a byte buffer.
//
// Returns an error if the given value isn't representable in exactly
// length bytes.
//
// copied from: https://github.com/cilium/ebpf/blob/master/marshalers.go
func marshalBytes(data interface{}, length int) (buf []byte, err error) {
if data == nil {
return nil, errors.New("can't marshal a nil value")
}
func (mc *MapCleaner[K, V]) cleanWithoutBatches(nowTS int64, shouldClean func(nowTS int64, k K, v V) bool) {
now := time.Now()

switch value := data.(type) {
case encoding.BinaryMarshaler:
buf, err = value.MarshalBinary()
case string:
buf = []byte(value)
case []byte:
buf = value
case unsafe.Pointer:
err = errors.New("can't marshal from unsafe.Pointer")
default:
var wr bytes.Buffer
err = binary.Write(&wr, native.Endian, value)
if err != nil {
err = fmt.Errorf("encoding %T: %v", value, err)
mc.keysToDelete = mc.keysToDelete[:0]
totalCount, deletedCount := 0, 0

entries := mc.emap.Iterate()
for entries.Next(unsafe.Pointer(&mc.keyBatch[0]), unsafe.Pointer(&mc.valuesBatch[0])) {
totalCount++
if !shouldClean(nowTS, mc.keyBatch[0], mc.valuesBatch[0]) {
continue
}
buf = wr.Bytes()
}
if err != nil {
return nil, err
mc.keysToDelete = append(mc.keysToDelete, mc.keyBatch[0])
}

if len(buf) != length {
return nil, fmt.Errorf("%T doesn't marshal to %d bytes", data, length)
for _, key := range mc.keysToDelete {
err := mc.emap.Delete(unsafe.Pointer(&key))
if err == nil {
deletedCount++
}
}
return buf, nil

elapsed := time.Since(now)
log.Debugf(
"finished cleaning map=%s entries_checked=%d entries_deleted=%d elapsed=%s",
mc.emap,
totalCount,
deletedCount,
elapsed,
)
}
Loading

0 comments on commit 404495b

Please sign in to comment.