Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usm: map cleaner: Use batch operations #20907

Merged
185 changes: 93 additions & 92 deletions pkg/ebpf/map_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,70 @@
package ebpf

import (
"bytes"
"encoding"
"encoding/binary"
"errors"
"fmt"
"reflect"
"sync"
"time"
"unsafe"

cebpf "github.com/cilium/ebpf"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/native"
)

// MapCleaner is responsible for periodically sweeping an eBPF map
// and deleting entries that satisfy a certain predicate function supplied by the user
type MapCleaner struct {
emap *cebpf.Map
key interface{}
val interface{}
once sync.Once
type MapCleaner[K any, V any] struct {
emap *cebpf.Map
keyBatch []K
valuesBatch []V
keysToDelete []K
brycekahle marked this conversation as resolved.
Show resolved Hide resolved

// we resort to unsafe.Pointers because by doing so the underlying eBPF
// library avoids marshaling the key/value variables while traversing the map
Comment on lines -35 to -36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please re-add the comment (not necessarily here), as it is still relevant: you're still passing an unsafe.Pointer to MapIterator.Next().
This was originally done to avoid the marshalling cost, as noted in the comment.

keyPtr unsafe.Pointer
valPtr unsafe.Pointer
once sync.Once

// termination
stopOnce sync.Once
done chan struct{}
}

// NewMapCleaner instantiates a new MapCleaner
func NewMapCleaner(emap *cebpf.Map, key, val interface{}) (*MapCleaner, error) {
// we force types to be of pointer kind because of the reasons mentioned above
if reflect.ValueOf(key).Kind() != reflect.Ptr {
return nil, fmt.Errorf("%T is not a pointer kind", key)
func NewMapCleaner[K any, V any](emap *cebpf.Map, defaultBatchSize uint32) (*MapCleaner[K, V], error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/suggestion: let's try to be stricter with the type constraints here. For example, we don't want pointers, interfaces, etc., since these would cause issues with the code downstream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find a way to do that, but couldn't find one (to exclude interfaces / pointers).
If you know of a way, I'd be happy to learn.
Having said that, we still have a guard: the unit tests.

batchSize := defaultBatchSize
if defaultBatchSize > emap.MaxEntries() {
batchSize = emap.MaxEntries()
}
if reflect.ValueOf(val).Kind() != reflect.Ptr {
return nil, fmt.Errorf("%T is not a pointer kind", val)
if batchSize == 0 {
batchSize = 1
}

return &MapCleaner{
emap: emap,
key: key,
val: val,
keyPtr: unsafe.Pointer(reflect.ValueOf(key).Elem().Addr().Pointer()),
valPtr: unsafe.Pointer(reflect.ValueOf(val).Elem().Addr().Pointer()),
Comment on lines -59 to -60
Copy link
Member

@p-lambert p-lambert Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's really nice to see a bunch of hacks go away now that we have generics :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the use of generics brings another improvement: we no longer need marshalBytes!

done: make(chan struct{}),
return &MapCleaner[K, V]{
emap: emap,
keyBatch: make([]K, batchSize),
valuesBatch: make([]V, batchSize),
keysToDelete: make([]K, 0, batchSize),
done: make(chan struct{}),
}, nil
}

// Clean eBPF map
// `interval` determines how often the eBPF map is scanned;
// `shouldClean` is a predicate method that determines whether or not a certain
// `shouldClean` is a predicate method that determines whether a certain
// map entry should be deleted. the callback argument `nowTS` can be directly
// compared to timestamps generated using the `bpf_ktime_get_ns()` helper;
func (mc *MapCleaner) Clean(interval time.Duration, shouldClean func(nowTS int64, k, v interface{}) bool) {
func (mc *MapCleaner[K, V]) Clean(interval time.Duration, shouldClean func(nowTS int64, k K, v V) bool) {
if mc == nil {
return
}

// Since kernel 5.6, the eBPF library supports batch operations on maps, which reduces the number of syscalls
// required to clean the map. We use the new batch operations if the kernel version is >= 5.6, and fall back to
// the old method otherwise. The new API is also more efficient because it minimizes the number of allocations.
cleaner := mc.cleanWithoutBatches
if version, err := kernel.HostVersion(); err == nil && version >= kernel.VersionCode(5, 6, 0) {
cleaner = mc.cleanWithBatches
}
p-lambert marked this conversation as resolved.
Show resolved Hide resolved

var err error
now := int64(0)
mc.once.Do(func() {
ticker := time.NewTicker(interval)
go func() {
Expand All @@ -80,22 +80,21 @@ func (mc *MapCleaner) Clean(interval time.Duration, shouldClean func(nowTS int64
for {
select {
case <-ticker.C:
now, err := NowNanoseconds()
now, err = NowNanoseconds()
if err != nil {
break
}
mc.clean(now, shouldClean)
cleaner(now, shouldClean)
case <-mc.done:
return
}
}

}()
})
}

// Stop stops the map cleaner
func (mc *MapCleaner) Stop() {
func (mc *MapCleaner[K, V]) Stop() {
if mc == nil {
return
}
Expand All @@ -107,85 +106,87 @@ func (mc *MapCleaner) Stop() {
})
}

func (mc *MapCleaner) clean(nowTS int64, shouldClean func(nowTS int64, k, v interface{}) bool) {
keySize := int(mc.emap.KeySize())
keysToDelete := make([][]byte, 0, 128)
totalCount, deletedCount := 0, 0
func (mc *MapCleaner[K, V]) cleanWithBatches(nowTS int64, shouldClean func(nowTS int64, k K, v V) bool) {
now := time.Now()

entries := mc.emap.Iterate()
for entries.Next(mc.keyPtr, mc.valPtr) {
totalCount++

if !shouldClean(nowTS, mc.key, mc.val) {
continue
mc.keysToDelete = mc.keysToDelete[:0]
totalCount, deletedCount := 0, 0
var next K
var n int
for {
Copy link
Member

@p-lambert p-lambert Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to execute this in a loop? I think it's not that unlikely that, in a real-world workload, this could run forever, which would cause the MapCleaner to hang during termination, no?
In this pathological scenario you would also supply a stale timestamp to the callback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about running forever, or about adding a timed context here.
Instead, we can "halt" once we have processed mc.emap.MaxEntries entries.
What do you think about that?

Alongside aborting the loop when we get 0 entries from the batch API, this guarantees we won't be stuck forever.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds reasonable to me 👍

n, _ = mc.emap.BatchLookup(next, &next, mc.keyBatch, mc.valuesBatch, nil)
if n == 0 {
break
}

marshalledKey, err := marshalBytes(mc.key, keySize)
if err != nil {
continue
totalCount += n
for i := 0; i < n; i++ {
if !shouldClean(nowTS, mc.keyBatch[i], mc.valuesBatch[i]) {
continue
}
mc.keysToDelete = append(mc.keysToDelete, mc.keyBatch[i])
}

// we accumulate all keys to delete because it isn't safe to delete map
// entries during the traversal. the main downside of doing so is that all
// fields from the key type must be exported in order to be marshaled (unless
// the key type implements the `encoding.BinaryMarshaler` interface)
keysToDelete = append(keysToDelete, marshalledKey)
}

for _, key := range keysToDelete {
err := mc.emap.Delete(key)
if err == nil {
deletedCount++
if len(mc.keysToDelete) > 0 {
count, err := mc.emap.BatchDelete(mc.keysToDelete, nil)
if err != nil {
log.Debugf("failed to delete map entries: %v", err)
return
}
deletedCount += count
}

iterationErr := entries.Err()
elapsed := time.Since(now)
log.Debugf(
"finished cleaning map=%s entries_checked=%d entries_deleted=%d iteration_error=%v elapsed=%s",
"finished cleaning map=%s entries_checked=%d entries_deleted=%d elapsed=%s",
mc.emap,
totalCount,
deletedCount,
iterationErr,
elapsed,
)
mc.shrinkKeysToDelete()
}

// marshalBytes converts an arbitrary value into a byte buffer.
//
// Returns an error if the given value isn't representable in exactly
// length bytes.
//
// copied from: https://github.com/cilium/ebpf/blob/master/marshalers.go
func marshalBytes(data interface{}, length int) (buf []byte, err error) {
if data == nil {
return nil, errors.New("can't marshal a nil value")
}
func (mc *MapCleaner[K, V]) cleanWithoutBatches(nowTS int64, shouldClean func(nowTS int64, k K, v V) bool) {
now := time.Now()

switch value := data.(type) {
case encoding.BinaryMarshaler:
buf, err = value.MarshalBinary()
case string:
buf = []byte(value)
case []byte:
buf = value
case unsafe.Pointer:
err = errors.New("can't marshal from unsafe.Pointer")
default:
var wr bytes.Buffer
err = binary.Write(&wr, native.Endian, value)
if err != nil {
err = fmt.Errorf("encoding %T: %v", value, err)
mc.keysToDelete = mc.keysToDelete[:0]
totalCount, deletedCount := 0, 0

entries := mc.emap.Iterate()
// we resort to unsafe.Pointers because by doing so the underlying eBPF
// library avoids marshaling the key/value variables while traversing the map
for entries.Next(unsafe.Pointer(&mc.keyBatch[0]), unsafe.Pointer(&mc.valuesBatch[0])) {
totalCount++
if !shouldClean(nowTS, mc.keyBatch[0], mc.valuesBatch[0]) {
continue
}
buf = wr.Bytes()
mc.keysToDelete = append(mc.keysToDelete, mc.keyBatch[0])
}
if err != nil {
return nil, err

for _, key := range mc.keysToDelete {
err := mc.emap.Delete(unsafe.Pointer(&key))
if err == nil {
deletedCount++
}
}

if len(buf) != length {
return nil, fmt.Errorf("%T doesn't marshal to %d bytes", data, length)
elapsed := time.Since(now)
log.Debugf(
"finished cleaning map=%s entries_checked=%d entries_deleted=%d elapsed=%s",
mc.emap,
totalCount,
deletedCount,
elapsed,
)

mc.shrinkKeysToDelete()
}

func (mc *MapCleaner[K, V]) shrinkKeysToDelete() {
// If more keys were collected than the batch size, clip the slice's capacity back to the batch size
// and reset its length to 0 via `mc.keysToDelete[:0:len(mc.keyBatch)]` (similar to slices.Clip).
if len(mc.keysToDelete) > len(mc.keyBatch) {
mc.keysToDelete = mc.keysToDelete[:0:len(mc.keyBatch)]
brycekahle marked this conversation as resolved.
Show resolved Hide resolved
}
return buf, nil
}
Loading
Loading