NETOBSERV-613: decrease premature eviction of eBPF hashmap #61

Merged (5 commits, Oct 18, 2022)
5 changes: 5 additions & 0 deletions bpf/flow.h
@@ -17,6 +17,11 @@ typedef struct flow_metrics_t {
// as output from bpf_ktime_get_ns()
u64 start_mono_time_ts;
u64 end_mono_time_ts;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
} __attribute__((packed)) flow_metrics;

// Attributes that uniquely identify a flow
31 changes: 25 additions & 6 deletions bpf/flows.c
@@ -58,6 +58,7 @@ struct {

// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 trace_messages = 0;

const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

@@ -184,7 +185,15 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
aggregate_flow->start_mono_time_ts = current_time;
}

bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_EXIST);
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
Member commented:
What's the purpose of changing BPF_EXIST to BPF_ANY?

Author replied:
I realized that the PerCPU map implementation sometimes complains about the assumption of existence of the flows, and we might lose packets.

E.g. it can't assume that an entry does not exist, because another thread could have inserted the bucket in the map (which is common to all the CPUs despite the "PerCPU" prefix of the map); nor can it assume that an entry exists, because the userspace might be deleting it.
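For reference, the three update flags differ only in how they treat a key that is (or isn't) already present. A toy Go model of the flag contract (illustrative only — the real `bpf_map_update_elem` operates on kernel eBPF maps and can additionally fail with `-EBUSY` under contention):

```go
package main

import (
	"errors"
	"fmt"
)

// Userspace toy model of bpf_map_update_elem flag semantics.
const (
	BPF_ANY     = 0 // create the entry, or update it if it exists
	BPF_NOEXIST = 1 // create only; fails with EEXIST if the key is present
	BPF_EXIST   = 2 // update only; fails with ENOENT if the key is absent
)

var (
	ErrExist = errors.New("EEXIST")
	ErrNoEnt = errors.New("ENOENT")
)

func update(m map[string]int, key string, val int, flags int) error {
	_, present := m[key]
	switch flags {
	case BPF_NOEXIST:
		if present {
			return ErrExist
		}
	case BPF_EXIST:
		if !present {
			return ErrNoEnt
		}
	}
	m[key] = val
	return nil
}

func main() {
	m := map[string]int{}
	// BPF_EXIST fails because nothing created the entry yet; BPF_ANY always succeeds.
	fmt.Println(update(m, "flow", 1, BPF_EXIST)) // ENOENT
	fmt.Println(update(m, "flow", 1, BPF_ANY))   // <nil>
}
```

This is why BPF_ANY is the only safe choice when concurrent CPUs may insert the same key, or userspace may delete it, between the lookup and the update.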

if (trace_messages && ret != 0) {
// usually error -16 (-EBUSY) is printed here.
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
// which can't be deduplicated.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
bpf_printk("error updating flow %d\n", ret);
}
} else {
// Key does not exist in the map; we will need to create a new entry.
flow_metrics new_flow = {
@@ -196,13 +205,23 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
if (bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY) != 0) {
/*
When the map is full, we directly send the flow entry to userspace via ringbuffer,
until space is available in the kernel-side maps
*/
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (ret != 0) {
// usually error -16 (-EBUSY) or -7 (-E2BIG) is printed here.
Collaborator commented:
This is an interesting observation.

// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated in userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
bpf_printk("error adding flow %d\n", ret);
}

new_flow.errno = -ret;
flow_record *record = bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
}
return TC_ACT_OK;
}
record->id = id;
27 changes: 19 additions & 8 deletions e2e/basic/flow_test.go
@@ -4,6 +4,7 @@ package basic

import (
"context"
"fmt"
"os"
"path"
"strconv"
@@ -99,17 +100,15 @@ func TestSinglePacketFlows(t *testing.T) {
require.NoError(t, err)
logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Debug("ping sent")

sent, recv := getPingFlows(t, latestFlowMS)
sent, recv := getPingFlows(t, latestFlowMS, pktLen+ipIcmpHeadersLen)
logrus.Debugf("ping request flow: %#v", sent)
logrus.Debugf("ping response flow: %#v", recv)

assert.Equal(t, pingerIP, sent["SrcAddr"])
assert.Equal(t, serverPodIP, sent["DstAddr"])
assert.EqualValues(t, pktLen+ipIcmpHeadersLen, sent["Bytes"])
assert.EqualValues(t, 1, sent["Packets"])
assert.Equal(t, pingerIP, recv["DstAddr"])
assert.Equal(t, serverPodIP, recv["SrcAddr"])
assert.EqualValues(t, pktLen+ipIcmpHeadersLen, recv["Bytes"])
assert.EqualValues(t, 1, recv["Packets"])

if t.Failed() {
@@ -126,15 +125,22 @@
).Feature())
}

func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]interface{}) {
func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, recv map[string]interface{}) {
logrus.Debug("Verifying that the request/return ICMP packets have been captured individually")
var query *tester.LokiQueryResponse
var err error
test.Eventually(t, testTimeout, func(t require.TestingT) {

test.Eventually(t, time.Minute, func(t require.TestingT) {
query, err = testCluster.Loki().
Query(1, `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
Query(1, fmt.Sprintf(
`{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}`+
`|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP
`|~"\"Bytes\":%d[,}]"`, expectedBytes))
require.NoError(t, err)
require.NotNil(t, query)
if query == nil {
return
}
require.NotEmpty(t, query.Data.Result)
if len(query.Data.Result) > 0 {
sent, err = query.Data.Result[0].Values[0].FlowData()
@@ -144,11 +150,16 @@ func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]inte
}
}, test.Interval(time.Second))

test.Eventually(t, testTimeout, func(t require.TestingT) {
test.Eventually(t, time.Minute, func(t require.TestingT) {
query, err = testCluster.Loki().
Query(1, `{DstK8S_OwnerName="pinger",SrcK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
Query(1, fmt.Sprintf(`{SrcK8S_OwnerName="server",DstK8S_OwnerName="pinger"}`+
`|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP
`|~"\"Bytes\":%d[,}]"`, expectedBytes))
require.NoError(t, err)
require.NotNil(t, query)
if query == nil {
return
}
require.Len(t, query.Data.Result, 1)
if len(query.Data.Result) > 0 {
recv, err = query.Data.Result[0].Values[0].FlowData()
6 changes: 6 additions & 0 deletions pkg/agent/agent.go
@@ -132,7 +132,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {

ingress, egress := flowDirections(cfg)

debug := false
if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() {
debug = true
}

tracer, err := ebpf.NewFlowTracer(
debug,
cfg.Sampling, cfg.CacheMaxFlows, cfg.BuffersLength, cfg.CacheActiveTimeout,
ingress, egress,
interfaceNamer,
Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file modified pkg/ebpf/bpf_bpfel.o
56 changes: 42 additions & 14 deletions pkg/ebpf/tracer.go
@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/cilium/ebpf/ringbuf"
@@ -28,6 +29,7 @@ const (
qdiscType = "clsact"
// constants defined in flows.c as "volatile const"
constSampling = "sampling"
constTraceMessages = "trace_messages"
aggregatedFlowsMap = "aggregated_flows"
)

@@ -50,10 +52,17 @@ type FlowTracer struct {
cacheMaxSize int
enableIngress bool
enableEgress bool
// ringBuf supports atomic logging of ringBuffer metrics
ringBuf struct {
isForwarding int32
forwardedFlows int32
mapFullErrs int32
}
}

// TODO: decouple flowtracer logic from eBPF maps access so we can inject mocks for testing
func NewFlowTracer(
traceMessages bool,
sampling, cacheMaxSize, buffersLength int,
evictionTimeout time.Duration,
ingress, egress bool,
@@ -73,8 +82,13 @@
// Resize aggregated flows map according to user-provided configuration
spec.Maps[aggregatedFlowsMap].MaxEntries = uint32(cacheMaxSize)

traceMsgs := 0
if traceMessages {
traceMsgs = 1
}
if err := spec.RewriteConstants(map[string]interface{}{
constSampling: uint32(sampling),
constSampling: uint32(sampling),
constTraceMessages: uint8(traceMsgs),
}); err != nil {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}
@@ -376,8 +390,7 @@ func (m *FlowTracer) Trace(ctx context.Context, forwardFlows chan<- []*flow.Reco
func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlows chan<- []*flow.Record) {
flowAccount := make(chan *flow.RawRecord, m.buffersLength)
go m.accounter.Account(flowAccount, forwardFlows)
isForwarding := int32(0)
forwardedFlows := int32(0)
debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
for {
select {
case <-ctx.Done():
Expand All @@ -399,11 +412,15 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
log.WithError(err).Warn("reading ringbuf event")
continue
}
if logrus.IsLevelEnabled(logrus.DebugLevel) {
m.logRingBufferFlows(&forwardedFlows, &isForwarding)
mapFullError := readFlow.Errno == uint8(syscall.E2BIG)
if debugging {
m.logRingBufferFlows(mapFullError)
}
// if the flow was received due to lack of space in the eBPF map,
// force an eviction to leave room for new flows in the eBPF cache
m.flowsEvictor.Broadcast()
if mapFullError {
m.flowsEvictor.Broadcast()
}

// Will need to send it to accounter anyway to account regardless of complete/ongoing flow
flowAccount <- readFlow
@@ -413,17 +430,28 @@

// logRingBufferFlows avoids flooding logs on long series of evicted flows by grouping how
// many flows are forwarded
func (m *FlowTracer) logRingBufferFlows(forwardedFlows, isForwarding *int32) {
atomic.AddInt32(forwardedFlows, 1)
if atomic.CompareAndSwapInt32(isForwarding, 0, 1) {
func (m *FlowTracer) logRingBufferFlows(mapFullErr bool) {
atomic.AddInt32(&m.ringBuf.forwardedFlows, 1)
if mapFullErr {
atomic.AddInt32(&m.ringBuf.mapFullErrs, 1)
}
if atomic.CompareAndSwapInt32(&m.ringBuf.isForwarding, 0, 1) {
go func() {
time.Sleep(m.evictionTimeout)
log.WithFields(logrus.Fields{
"flows": atomic.LoadInt32(forwardedFlows),
mfe := atomic.LoadInt32(&m.ringBuf.mapFullErrs)
l := log.WithFields(logrus.Fields{
"flows": atomic.LoadInt32(&m.ringBuf.forwardedFlows),
"mapFullErrs": mfe,
"cacheMaxFlows": m.cacheMaxSize,
}).Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value")
atomic.StoreInt32(forwardedFlows, 0)
atomic.StoreInt32(isForwarding, 0)
})
if mfe == 0 {
l.Debug("received flows via ringbuffer")
} else {
l.Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value")
}
atomic.StoreInt32(&m.ringBuf.forwardedFlows, 0)
atomic.StoreInt32(&m.ringBuf.isForwarding, 0)
atomic.StoreInt32(&m.ringBuf.mapFullErrs, 0)
Collaborator commented:
Could these metrics be made accessible to the operator, apart from the logs? That would help indicate that the map size is too small to handle the volume of traffic, if it's getting full often.

Member replied:
+1, maybe for a later task / PR ?

}()
}
}
2 changes: 2 additions & 0 deletions pkg/flow/record.go
@@ -65,6 +65,8 @@ type RecordMetrics struct {
// and monotime.Now() (user space)
StartMonoTimeNs uint64
EndMonoTimeNs uint64

Errno uint8
}

// record structure as parsed from eBPF
2 changes: 2 additions & 0 deletions pkg/flow/record_test.go
@@ -26,6 +26,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_start_time
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time
0x33, // u8 errno
}))
require.NoError(t, err)

@@ -53,6 +54,7 @@
Bytes: 0x1a19181716151413,
StartMonoTimeNs: 0x1a19181716151413,
EndMonoTimeNs: 0x1a19181716151413,
Errno: 0x33,
},
}, *fr)
// assert that IP addresses are interpreted as IPv4 addresses