Skip to content

Commit

Permalink
NETOBSERV-559: use LookupAndDelete to read maps
Browse files Browse the repository at this point in the history
Keep legacy code for old kernels

Do not base support detection on kernel version

Instead, just try and fallback to legacy when relevant
  • Loading branch information
jotak committed Mar 15, 2024
1 parent 58d01d9 commit da2f5e5
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 106 deletions.
160 changes: 97 additions & 63 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,18 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
// and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
// in the map
type FlowFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
lookupAndDeleteSupported bool
}

type FlowFetcherConfig struct {
Expand Down Expand Up @@ -119,7 +120,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}

oldKernel := utils.IskernelOlderthan514()
oldKernel := utils.IsKernelOlderThan("5.14.0")
if oldKernel {
log.Infof("kernel older than 5.14.0 detected: not all hooks are supported")
}
objects, err := kernelSpecificLoadAndAssign(oldKernel, spec)
if err != nil {
return nil, err
Expand Down Expand Up @@ -165,17 +169,18 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}

return &FlowFetcher{
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cfg.CacheMaxSize,
enableIngress: cfg.EnableIngress,
enableEgress: cfg.EnableEgress,
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cfg.CacheMaxSize,
enableIngress: cfg.EnableIngress,
enableEgress: cfg.EnableEgress,
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
}, nil
}

Expand Down Expand Up @@ -404,35 +409,41 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
}

// LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
// It returns a map where the key
// For synchronization purposes, we get/delete a whole snapshot of the flows map.
// This way we avoid missing packets that could be updated on the
// ebpf side while we process/aggregate them here
// Changing this method invocation by BatchLookupAndDelete could improve performance
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
if !m.lookupAndDeleteSupported {
return m.legacyLookupAndDeleteMap(met)
}

flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var ids []BpfFlowId
var id BpfFlowId
var metrics []BpfFlowMetrics

count := 0
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
// First, get all ids and don't care about metrics (we need lookup+delete to be atomic)
for iterator.Next(&id, &metrics) {
ids = append(ids, id)
}

count := 0
// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
for i, id := range ids {
count++
if err := flowMap.Delete(id); err != nil {
if err := flowMap.LookupAndDelete(&id, &metrics); err != nil {
if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
log.WithError(err).Warnf("switching to legacy mode")
m.lookupAndDeleteSupported = false
return m.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
continue
}
// We observed that eBFP PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
// TODO: instrument how many times the keys are is repeated in the same eviction
flows[id] = append(flows[id], metrics...)
flows[id] = metrics
}
met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))
Expand All @@ -451,16 +462,21 @@ func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
monotonicTimeNow := monotime.Now()
dnsMap := m.objects.DnsFlows
var dnsKey BpfDnsFlowId
var keysToDelete []BpfDnsFlowId
var dnsVal uint64

if dnsMap != nil {
// Ideally the Lookup + Delete should be atomic, however we cannot use LookupAndDelete since the deletion is conditional
// Do not delete while iterating, as it causes severe performance degradation
iterator := dnsMap.Iterate()
for iterator.Next(&dnsKey, &dnsVal) {
if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
if err := dnsMap.Delete(dnsKey); err != nil {
log.WithError(err).WithField("dnsKey", dnsKey).
Warnf("couldn't delete DNS record entry")
}
keysToDelete = append(keysToDelete, dnsKey)
}
}
for _, dnsKey = range keysToDelete {
if err := dnsMap.Delete(dnsKey); err != nil {
log.WithError(err).WithField("dnsKey", dnsKey).Warnf("couldn't delete DNS record entry")
}
}
}
Expand Down Expand Up @@ -529,14 +545,15 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf

// It provides access to packets from the kernel space (via PerfCPU hashmap)
type PacketFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
lookupAndDeleteSupported bool
}

func NewPacketFetcher(
Expand Down Expand Up @@ -605,14 +622,15 @@ func NewPacketFetcher(
}

return &PacketFetcher{
objects: &objects,
perfReader: packets,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
objects: &objects,
perfReader: packets,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
}, nil
}

Expand Down Expand Up @@ -797,19 +815,35 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
}

func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
if !p.lookupAndDeleteSupported {
return p.legacyLookupAndDeleteMap(met)
}

packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)

var id int
var ids []int
var packet []*byte

// First, get all ids and ignore content (we need lookup+delete to be atomic)
for iterator.Next(&id, &packet) {
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).
Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
ids = append(ids, id)
}

// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
for i, id := range ids {
if err := packetMap.LookupAndDelete(&id, &packet); err != nil {
if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
log.WithError(err).Warnf("switching to legacy mode")
p.lookupAndDeleteSupported = false
return p.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
}
packets[id] = append(packets[id], packet...)
packets[id] = packet
}

return packets
}
49 changes: 49 additions & 0 deletions pkg/ebpf/tracer_legacy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ebpf

import "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"

// This file contains legacy implementations kept for old kernels

func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var id BpfFlowId
var metrics []BpfFlowMetrics
count := 0

// Deleting while iterating is really bad for performance (like, really!) as it causes seeing multiple times the same key
// This is solved in >=4.20 kernels with LookupAndDelete
for iterator.Next(&id, &metrics) {
count++
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
}
// We observed that eBFP PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
flows[id] = append(flows[id], metrics...)
}
met.BufferSizeGauge.WithBufferName("hashmap-legacy-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-legacy-unique").Set(float64(len(flows)))

return flows
}

func (p *PacketFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)

var id int
var packet []*byte
for iterator.Next(&id, &packet) {
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry").Inc()
}
packets[id] = append(packets[id], packet...)
}
return packets
}
27 changes: 13 additions & 14 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ import (
)

var (
getCurrentKernelVersion = currentKernelVersion
log = logrus.WithField("component", "utils")
kernelVersion uint32
log = logrus.WithField("component", "utils")
)

func init() {
var err error
kernelVersion, err = currentKernelVersion()
if err != nil {
log.Errorf("failed to get current kernel version: %v", err)
}
}

// GetSocket returns socket string in the correct format based on address family
func GetSocket(hostIP string, hostPort int) string {
socket := fmt.Sprintf("%s:%d", hostIP, hostPort)
Expand All @@ -26,22 +34,13 @@ func GetSocket(hostIP string, hostPort int) string {
return socket
}

func IskernelOlderthan514() bool {
kernelVersion514, err := kernelVersionFromReleaseString("5.14.0")
func IsKernelOlderThan(version string) bool {
refVersion, err := kernelVersionFromReleaseString(version)
if err != nil {
log.Warnf("failed to get kernel version from release string: %v", err)
return false
}
currentVersion, err := getCurrentKernelVersion()
if err != nil {
log.Warnf("failed to get current kernel version: %v", err)
return false
}
if currentVersion < kernelVersion514 {
log.Infof("older kernel version not all hooks will be supported")
return true
}
return false
return kernelVersion != 0 && kernelVersion < refVersion
}

var versionRegex = regexp.MustCompile(`^(\d+)\.(\d+).(\d+).*$`)
Expand Down
Loading

0 comments on commit da2f5e5

Please sign in to comment.