Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-559: use LookupAndDelete to read maps #283

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 97 additions & 63 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,18 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
// and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
// in the map
type FlowFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
lookupAndDeleteSupported bool
}

type FlowFetcherConfig struct {
Expand Down Expand Up @@ -119,7 +120,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}

oldKernel := utils.IskernelOlderthan514()
oldKernel := utils.IsKernelOlderThan("5.14.0")
if oldKernel {
log.Infof("kernel older than 5.14.0 detected: not all hooks are supported")
}
objects, err := kernelSpecificLoadAndAssign(oldKernel, spec)
if err != nil {
return nil, err
Expand Down Expand Up @@ -165,17 +169,18 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}

return &FlowFetcher{
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cfg.CacheMaxSize,
enableIngress: cfg.EnableIngress,
enableEgress: cfg.EnableEgress,
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cfg.CacheMaxSize,
enableIngress: cfg.EnableIngress,
enableEgress: cfg.EnableEgress,
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
}, nil
}

Expand Down Expand Up @@ -404,35 +409,41 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
}

// LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
// It returns a map where the key
// For synchronization purposes, we get/delete a whole snapshot of the flows map.
// This way we avoid missing packets that could be updated on the
// ebpf side while we process/aggregate them here
// Changing this method invocation by BatchLookupAndDelete could improve performance
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
if !m.lookupAndDeleteSupported {
return m.legacyLookupAndDeleteMap(met)
}

flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var ids []BpfFlowId
var id BpfFlowId
var metrics []BpfFlowMetrics

count := 0
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
// First, get all ids and don't care about metrics (we need lookup+delete to be atomic)
for iterator.Next(&id, &metrics) {
ids = append(ids, id)
}

count := 0
// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
for i, id := range ids {
count++
if err := flowMap.Delete(id); err != nil {
if err := flowMap.LookupAndDelete(&id, &metrics); err != nil {
if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
log.WithError(err).Warnf("switching to legacy mode")
m.lookupAndDeleteSupported = false
return m.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if u do it like this is bad ?

for iterator.Next(&id, &metrics) {
   if err := flowMap.LookupAndDelete(&id, &metrics); err != nil {
      log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
    }
    flows[id] = append(flows[id], metrics...)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tried that one in particular but I think the same issue will come up as there's still a delete within the iteration. In the previous code, doing the Delete within the iteration resulted in screwed up keys ending up in iterating over the full 100K map entries

met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
continue
}
// We observed that eBFP PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
// TODO: instrument how many times the keys are is repeated in the same eviction
flows[id] = append(flows[id], metrics...)
flows[id] = metrics
}
met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))
Expand All @@ -451,16 +462,21 @@ func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
monotonicTimeNow := monotime.Now()
dnsMap := m.objects.DnsFlows
var dnsKey BpfDnsFlowId
var keysToDelete []BpfDnsFlowId
var dnsVal uint64

if dnsMap != nil {
// Ideally the Lookup + Delete should be atomic, however we cannot use LookupAndDelete since the deletion is conditional
// Do not delete while iterating, as it causes severe performance degradation
iterator := dnsMap.Iterate()
for iterator.Next(&dnsKey, &dnsVal) {
if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
if err := dnsMap.Delete(dnsKey); err != nil {
log.WithError(err).WithField("dnsKey", dnsKey).
Warnf("couldn't delete DNS record entry")
}
keysToDelete = append(keysToDelete, dnsKey)
}
}
for _, dnsKey = range keysToDelete {
if err := dnsMap.Delete(dnsKey); err != nil {
log.WithError(err).WithField("dnsKey", dnsKey).Warnf("couldn't delete DNS record entry")
}
}
}
Expand Down Expand Up @@ -529,14 +545,15 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf

// It provides access to packets from the kernel space (via PerfCPU hashmap)
type PacketFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
perfReader *perf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
lookupAndDeleteSupported bool
}

func NewPacketFetcher(
Expand Down Expand Up @@ -605,14 +622,15 @@ func NewPacketFetcher(
}

return &PacketFetcher{
objects: &objects,
perfReader: packets,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
objects: &objects,
perfReader: packets,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
}, nil
}

Expand Down Expand Up @@ -797,19 +815,35 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
}

func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
if !p.lookupAndDeleteSupported {
return p.legacyLookupAndDeleteMap(met)
}

packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)

var id int
var ids []int
var packet []*byte

// First, get all ids and ignore content (we need lookup+delete to be atomic)
for iterator.Next(&id, &packet) {
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).
Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
ids = append(ids, id)
}

// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
for i, id := range ids {
if err := packetMap.LookupAndDelete(&id, &packet); err != nil {
if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
log.WithError(err).Warnf("switching to legacy mode")
p.lookupAndDeleteSupported = false
return p.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
}
packets[id] = append(packets[id], packet...)
packets[id] = packet
}

return packets
}
49 changes: 49 additions & 0 deletions pkg/ebpf/tracer_legacy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ebpf

import "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"

// This file contains legacy implementations kept for old kernels

func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var id BpfFlowId
var metrics []BpfFlowMetrics
count := 0

// Deleting while iterating is really bad for performance (like, really!) as it causes seeing multiple times the same key
// This is solved in >=4.20 kernels with LookupAndDelete
for iterator.Next(&id, &metrics) {
count++
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
}
// We observed that eBFP PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
flows[id] = append(flows[id], metrics...)
}
met.BufferSizeGauge.WithBufferName("hashmap-legacy-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-legacy-unique").Set(float64(len(flows)))

return flows
}

func (p *PacketFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
packetMap := p.objects.PacketRecord
iterator := packetMap.Iterate()
packets := make(map[int][]*byte, p.cacheMaxSize)

var id int
var packet []*byte
for iterator.Next(&id, &packet) {
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry").Inc()
}
packets[id] = append(packets[id], packet...)
}
return packets
}
27 changes: 13 additions & 14 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ import (
)

var (
getCurrentKernelVersion = currentKernelVersion
log = logrus.WithField("component", "utils")
kernelVersion uint32
log = logrus.WithField("component", "utils")
)

func init() {
var err error
kernelVersion, err = currentKernelVersion()
if err != nil {
log.Errorf("failed to get current kernel version: %v", err)
}
}

// GetSocket returns socket string in the correct format based on address family
func GetSocket(hostIP string, hostPort int) string {
socket := fmt.Sprintf("%s:%d", hostIP, hostPort)
Expand All @@ -26,22 +34,13 @@ func GetSocket(hostIP string, hostPort int) string {
return socket
}

func IskernelOlderthan514() bool {
kernelVersion514, err := kernelVersionFromReleaseString("5.14.0")
func IsKernelOlderThan(version string) bool {
refVersion, err := kernelVersionFromReleaseString(version)
if err != nil {
log.Warnf("failed to get kernel version from release string: %v", err)
return false
}
currentVersion, err := getCurrentKernelVersion()
if err != nil {
log.Warnf("failed to get current kernel version: %v", err)
return false
}
if currentVersion < kernelVersion514 {
log.Infof("older kernel version not all hooks will be supported")
return true
}
return false
return kernelVersion != 0 && kernelVersion < refVersion
}

var versionRegex = regexp.MustCompile(`^(\d+)\.(\d+).(\d+).*$`)
Expand Down
Loading
Loading