Skip to content

Commit

Permalink
NETOBSERV-1112: This patch fixes a bug where RTT was not visible for …
Browse files Browse the repository at this point in the history
…flow logs at times. (#159)

* Make RTT values show up on both flow directions

* Fix RTT calculation error for container hooks.

Signed-off-by: Dushyant Behl <[email protected]>
Signed-off-by: Dushyant Behl <[email protected]>

---------

Signed-off-by: Dushyant Behl <[email protected]>
Signed-off-by: Dushyant Behl <[email protected]>
  • Loading branch information
dushyantbehl authored Aug 4, 2023
1 parent af7d59d commit 6d9d2e7
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 74 deletions.
2 changes: 1 addition & 1 deletion bpf/dns_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static inline int trace_dns(struct sk_buff *skb) {
if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
u64 ts = bpf_ktime_get_ns();
u64 ts = bpf_ktime_get_ns();
bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
}
id.direction = EGRESS;
Expand Down
33 changes: 14 additions & 19 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}

// Record the current time first.
u64 current_time = bpf_ktime_get_ns();
pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

flow_id id;
__builtin_memset(&id, 0, sizeof(id));

pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
pkt.id = &id;
pkt.current_ts = current_time;

void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
Expand All @@ -60,30 +57,28 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}

if (enable_rtt) {
// This is currently gated as its not to be enabled by default.
calculate_flow_rtt(&pkt, direction, data_end);
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;

// We calculate the RTT before looking up the aggregated_flows map because we want
// the critical section between the map lookup and the update to take as little time as possible.
if (enable_rtt) {
// This is currently not to be enabled by default.
calculate_flow_rtt(&pkt, direction, data_end);
}

// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = current_time;
aggregate_flow->end_mono_time_ts = pkt.current_ts;
aggregate_flow->flags |= pkt.flags;

// Does not matter the gate. Will be zero if not enabled.
if (pkt.rtt > 0) {
/* Since RTT is calculated for few packets we need to check if it is non zero value then only we update
* the flow. If we remove this check a packet which fails to calculate RTT will override the previous valid
* RTT with 0.
*/
if (pkt.rtt > aggregate_flow->flow_rtt) {
aggregate_flow->flow_rtt = pkt.rtt;
}

Expand All @@ -101,8 +96,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics new_flow = {
.packets = 1,
.bytes = skb->len,
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.flow_rtt = pkt.rtt
};
Expand Down
128 changes: 92 additions & 36 deletions bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include "utils.h"
#include "maps_definition.h"

static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, u8 reversed) {
const u64 MIN_RTT = 50000; //50 micro seconds

static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, bool reverse) {
flow_id *id = pkt->id;
if (reversed) {
if (reverse) {
__builtin_memcpy(seq_id->src_ip, id->dst_ip, IP_MAX_LEN);
__builtin_memcpy(seq_id->dst_ip, id->src_ip, IP_MAX_LEN);
seq_id->src_port = id->dst_port;
Expand All @@ -23,49 +25,104 @@ static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt,
seq_id->src_port = id->src_port;
seq_id->dst_port = id->dst_port;
}
seq_id->transport_protocol = id->transport_protocol;
seq_id->seq_id = seq;
seq_id->if_index = id->if_index;
}

static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
return;
}
/*
 * Fill *dst with the flow_id of the opposite direction of *src.
 *
 * MAC addresses, IP addresses and ports are swapped between source and
 * destination and the direction is flipped; the ethernet protocol, transport
 * protocol and interface index are carried over unchanged.
 * ICMP fields are not written here: they can be ignored for now because only
 * TCP packets are handled.
 */
static __always_inline void reverse_flow_id_struct(flow_id *src, flow_id *dst) {
    // The two endpoints trade places: link-layer addresses ...
    __builtin_memcpy(dst->src_mac, src->dst_mac, ETH_ALEN);
    __builtin_memcpy(dst->dst_mac, src->src_mac, ETH_ALEN);
    // ... network-layer addresses ...
    __builtin_memcpy(dst->src_ip, src->dst_ip, IP_MAX_LEN);
    __builtin_memcpy(dst->dst_ip, src->src_ip, IP_MAX_LEN);
    // ... and transport-layer ports.
    dst->src_port = src->dst_port;
    dst->dst_port = src->src_port;

    // The reverse flow is observed in the opposite direction.
    if (src->direction == INGRESS) {
        dst->direction = EGRESS;
    } else {
        dst->direction = INGRESS;
    }

    // Everything below is identical for both directions of the flow.
    dst->if_index = src->if_index;
    dst->transport_protocol = src->transport_protocol;
    dst->eth_protocol = src->eth_protocol;
}

switch (direction) {
case EGRESS: {
if (IS_SYN_PACKET(pkt)) {
// Record the outgoing syn sequence number
u32 seq = bpf_ntohl(tcp->seq);
fill_flow_seq_id(seq_id, pkt, seq, 0);
static __always_inline void update_reverse_flow_rtt(pkt_info *pkt, u32 seq) {
flow_id rev_flow_id;
__builtin_memset(&rev_flow_id, 0, sizeof(rev_flow_id));
reverse_flow_id_struct(pkt->id, &rev_flow_id);

long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_ANY);
flow_metrics *reverse_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &rev_flow_id);
if (reverse_flow != NULL) {
if (pkt->rtt > reverse_flow->flow_rtt) {
reverse_flow->flow_rtt = pkt->rtt;
long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_EXIST);
if (trace_messages && ret != 0) {
bpf_printk("err saving flow sequence record %d", ret);
bpf_printk("error updating rtt value in flow %d\n", ret);
}
}
break;
}
case INGRESS: {
if (IS_ACK_PACKET(pkt)) {
// Stored sequence should be ack_seq - 1
u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
// check reversed flow
fill_flow_seq_id(seq_id, pkt, seq, 1);

u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
if (prev_ts != NULL) {
pkt->rtt = pkt->current_ts - *prev_ts;
// Delete the flow from flow sequence map so if it
// restarts we have a new RTT calculation.
long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
if (trace_messages && ret != 0) {
bpf_printk("error evicting flow sequence: %d", ret);
}
}
}

static __always_inline void __calculate_tcp_rtt(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
// Stored sequence should be ack_seq - 1
u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
// check reversed flow
fill_flow_seq_id(seq_id, pkt, seq, true);

u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
if (prev_ts != NULL) {
u64 rtt = pkt->current_ts - *prev_ts;
/**
* FIXME: Because of the way SAMPLING is done, if we miss one of SYN/SYN+ACK/ACK
* we can get RTT values that reflect the process response time rather than the actual RTT.
* The check below filters those values out, but it should be replaced with a better solution,
* or the RTT algorithm should be changed so that it does not interact with SAMPLING like this.
*/
if (rtt < MIN_RTT) {
return;
}
break;
pkt->rtt = rtt;
// Delete the flow from flow sequence map so if it
// restarts we have a new RTT calculation.
long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
if (trace_messages && ret != 0) {
bpf_printk("error evicting flow sequence: %d", ret);
}
// This is an ACK packet with valid sequence id so a SYN must
// have been sent. We can safely update the reverse flow RTT here.
update_reverse_flow_rtt(pkt, seq);
}
return;
}

/*
 * Record the timestamp of a SYN-carrying segment (the caller invokes this for
 * packets with the SYN bit set).
 *
 * The flow_sequences key is built from the packet's own (non-reversed) flow
 * tuple plus its TCP sequence number in host byte order; the value is the
 * packet timestamp. The entry is inserted with BPF_NOEXIST, and a non-zero
 * return from the map update is only reported when trace_messages is set.
 */
static __always_inline void __store_tcp_ts(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
    fill_flow_seq_id(seq_id, pkt, bpf_ntohl(tcp->seq), false);

    long err = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_NOEXIST);
    if (trace_messages && err != 0) {
        bpf_printk("err saving flow sequence record %d", err);
    }
}

/*
 * TCP part of the RTT tracking.
 *
 * pkt      - packet being processed; pkt->l4_hdr is read as the TCP header.
 * direction - currently not used by this function.
 * data_end - end of the packet data, used to bounds-check the TCP header.
 * seq_id   - scratch key that the helpers fill in before touching the
 *            flow_sequences map.
 *
 * RTT is calculated for both the SYN/SYN+ACK and the SYN+ACK/ACK exchanges and
 * the maximum of both is taken:
 *   - ACK bit set: try to compute the RTT for the segment being acknowledged;
 *   - SYN bit set: store the timestamp of this segment.
 * A SYN+ACK packet therefore does both, the RTT calculation first.
 */
static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
    struct tcphdr *hdr = (struct tcphdr *) pkt->l4_hdr;
    if (hdr == NULL) {
        return;
    }
    // The whole TCP header must lie inside the packet data.
    if ((void *)(hdr + 1) > data_end) {
        return;
    }

    // Read both flags up front, before any helper runs.
    const bool has_syn = hdr->syn;
    const bool has_ack = hdr->ack;

    if (has_ack) {
        __calculate_tcp_rtt(pkt, hdr, seq_id);
    }
    if (has_syn) {
        __store_tcp_ts(pkt, hdr, seq_id);
    }
}

Expand All @@ -83,5 +140,4 @@ static __always_inline void calculate_flow_rtt(pkt_info *pkt, u8 direction, void
}
}

#endif /* __RTT_TRACKER_H__ */

#endif /* __RTT_TRACKER_H__ */
7 changes: 3 additions & 4 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400

#define IS_SYN_PACKET(pkt) ((pkt->flags & SYN_FLAG) || (pkt->flags & SYN_ACK_FLAG))
#define IS_ACK_PACKET(pkt) ((pkt->flags & ACK_FLAG) || (pkt->flags & SYN_ACK_FLAG))

#if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && \
__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
#define bpf_ntohs(x) __builtin_bswap16(x)
Expand Down Expand Up @@ -124,14 +121,16 @@ typedef struct flow_id_t {
// Force emitting struct flow_id into the ELF.
const struct flow_id_t *unused2 __attribute__((unused));

// Standard 4 tuple and a sequence identifier.
// Standard 4 tuple, transport protocol and a sequence identifier.
// No need to emit this struct. It's used only in kernel space
// Key of the flow_sequences map used by the RTT tracking code.
typedef struct flow_seq_id_t {
// Source and destination transport ports
u16 src_port;
u16 dst_port;
// Source and destination IP addresses, IP_MAX_LEN bytes each
u8 src_ip[IP_MAX_LEN];
u8 dst_ip[IP_MAX_LEN];
// TCP sequence number identifying the tracked segment (stored in host byte order)
u32 seq_id;
// Transport protocol, copied from the flow_id of the packet
u8 transport_protocol;
u32 if_index; // OS interface index
// packed: no padding between fields.
// NOTE(review): presumably so the map key contains no uninitialized padding bytes — confirm.
} __attribute__((packed)) flow_seq_id;

// Flow record is a tuple containing both flow identifier and metrics. It is used to send
Expand Down
8 changes: 4 additions & 4 deletions examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -91,10 +91,10 @@ func main() {
record.GetDnsId(),
record.GetDnsFlags(),
record.DnsLatency.AsDuration().Milliseconds(),
record.TimeFlowRtt.AsDuration().Microseconds(),
record.TimeFlowRtt.AsDuration().Nanoseconds(),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -113,7 +113,7 @@ func main() {
record.GetDnsId(),
record.GetDnsFlags(),
record.DnsLatency.AsDuration().Milliseconds(),
record.TimeFlowRtt.AsDuration().Microseconds(),
record.TimeFlowRtt.AsDuration().Nanoseconds(),
)
}
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
12 changes: 7 additions & 5 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
2 changes: 2 additions & 0 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
if enableRtt == 0 {
// Cannot set the size of map to be 0 so set it to 1.
spec.Maps[flowSequencesMap].MaxEntries = uint32(1)
} else {
log.Debugf("RTT calculations are enabled")
}

if !cfg.DNSTracker {
Expand Down

0 comments on commit 6d9d2e7

Please sign in to comment.