-
Notifications
You must be signed in to change notification settings - Fork 111
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* rebased code from hackathon * Move network config * make tests compile and pass * fix linters * Basic working * using cilium auto-generated structs * fixing integration tests * K8s decoration * basic entities and relationships * fixing namespace in services routes * fix linting and tests * Cleaning up ported code * fix flaky unit test * nonworking integration tests, probably due to limits in size of otel metrics export * fixing asserts test setup * RC to merge into main * updated go.mod
- Loading branch information
Showing
137 changed files
with
24,093 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
// Copyright Red Hat / IBM | ||
// Copyright Grafana Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// This implementation is a derivation of the code in | ||
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4 | ||
|
||
#ifndef __FLOW_H__ | ||
#define __FLOW_H__ | ||
|
||
#define TC_ACT_OK 0 | ||
#define TC_ACT_SHOT 2 | ||
#define IP_MAX_LEN 16 | ||
|
||
#define ETH_ALEN 6 /* Octets in one ethernet addr */ | ||
|
||
#define s6_addr in6_u.u6_addr8 | ||
#define ETH_P_IP 0x0800 /* Internet Protocol packet */ | ||
// ETH_P_IPV6 value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml | ||
#define ETH_P_IPV6 0x86DD /* IPv6 over bluebook */ | ||
typedef __u8 u8; | ||
typedef __u16 u16; | ||
typedef __u32 u32; | ||
typedef __u64 u64; | ||
|
||
typedef struct flow_metrics_t { | ||
u32 packets; | ||
u64 bytes; | ||
// start_mono_time_ts and end_mono_time_ts are the start and end times as system monotonic timestamps | ||
// in nanoseconds, as output from bpf_ktime_get_ns() (kernel space) | ||
// and monotime.Now() (user space) | ||
u64 start_mono_time_ns; | ||
u64 end_mono_time_ns; | ||
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt | ||
u16 flags; | ||
// The positive errno of a failed map insertion that caused a flow | ||
// to be sent via ringbuffer. | ||
// 0 otherwise | ||
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
u8 errno; | ||
} __attribute__((packed)) flow_metrics; | ||
|
||
// Attributes that uniquely identify a flow | ||
// TODO: remove attributes that won't be used in Beyla (e.g. MAC, maybe protocol...) | ||
typedef struct flow_id_t { | ||
u16 eth_protocol; | ||
u8 direction; | ||
// L2 data link layer | ||
u8 src_mac[ETH_ALEN]; | ||
u8 dst_mac[ETH_ALEN]; | ||
// L3 network layer | ||
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96 | ||
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2 | ||
struct in6_addr src_ip; | ||
struct in6_addr dst_ip; | ||
// L4 transport layer | ||
u16 src_port; | ||
u16 dst_port; | ||
u8 transport_protocol; | ||
// OS interface index | ||
u32 if_index; | ||
} __attribute__((packed)) flow_id; | ||
|
||
// Flow record is a tuple containing both flow identifier and metrics. It is used to send | ||
// a complete flow via ring buffer when only when the accounting hashmap is full. | ||
// Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct | ||
typedef struct flow_record_t { | ||
flow_id id; | ||
flow_metrics metrics; | ||
} __attribute__((packed)) flow_record; | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,287 @@ | ||
// Copyright Red Hat / IBM | ||
// Copyright Grafana Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// This implementation is a derivation of the code in | ||
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4 | ||
|
||
#include "vmlinux.h" | ||
#include <stdbool.h> | ||
|
||
#include "bpf_helpers.h" | ||
#include "bpf_endian.h" | ||
|
||
#include "flow.h" | ||
|
||
#define DISCARD 1 | ||
#define SUBMIT 0 | ||
|
||
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml | ||
#define INGRESS 0 | ||
#define EGRESS 1 | ||
|
||
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml | ||
#define FIN_FLAG 0x01 | ||
#define SYN_FLAG 0x02 | ||
#define RST_FLAG 0x04 | ||
#define PSH_FLAG 0x08 | ||
#define ACK_FLAG 0x10 | ||
#define URG_FLAG 0x20 | ||
#define ECE_FLAG 0x40 | ||
#define CWR_FLAG 0x80 | ||
// Custom flags exported | ||
#define SYN_ACK_FLAG 0x100 | ||
#define FIN_ACK_FLAG 0x200 | ||
#define RST_ACK_FLAG 0x400 | ||
|
||
// Common Ringbuffer as a conduit for ingress/egress flows to userspace | ||
struct { | ||
__uint(type, BPF_MAP_TYPE_RINGBUF); | ||
__uint(max_entries, 1 << 24); | ||
} direct_flows SEC(".maps"); | ||
|
||
// Key: the flow identifier. Value: the flow metrics for that identifier. | ||
// The userspace will aggregate them into a single flow. | ||
struct { | ||
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH); | ||
__type(key, flow_id); | ||
__type(value, flow_metrics); | ||
} aggregated_flows SEC(".maps"); | ||
|
||
// Constant definitions, to be overridden by the invoker | ||
volatile const u32 sampling = 0; | ||
volatile const u8 trace_messages = 0; | ||
|
||
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; | ||
|
||
// sets the TCP header flags for connection information | ||
static inline void set_flags(struct tcphdr *th, u16 *flags) { | ||
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake. | ||
if (th->ack && th->syn) { | ||
*flags |= SYN_ACK_FLAG; | ||
} else if (th->ack && th->fin ) { | ||
// If both ACK and FIN are set, then it is graceful termination from server. | ||
*flags |= FIN_ACK_FLAG; | ||
} else if (th->ack && th->rst ) { | ||
// If both ACK and RST are set, then it is abrupt connection termination. | ||
*flags |= RST_ACK_FLAG; | ||
} else if (th->fin) { | ||
*flags |= FIN_FLAG; | ||
} else if (th->syn) { | ||
*flags |= SYN_FLAG; | ||
} else if (th->rst) { | ||
*flags |= RST_FLAG; | ||
} else if (th->psh) { | ||
*flags |= PSH_FLAG; | ||
} else if (th->urg) { | ||
*flags |= URG_FLAG; | ||
} else if (th->ece) { | ||
*flags |= ECE_FLAG; | ||
} else if (th->cwr) { | ||
*flags |= CWR_FLAG; | ||
} | ||
} | ||
// sets flow fields from IPv4 header information | ||
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) { | ||
if ((void *)ip + sizeof(*ip) > data_end) { | ||
return DISCARD; | ||
} | ||
|
||
__builtin_memcpy(id->src_ip.s6_addr, ip4in6, sizeof(ip4in6)); | ||
__builtin_memcpy(id->dst_ip.s6_addr, ip4in6, sizeof(ip4in6)); | ||
__builtin_memcpy(id->src_ip.s6_addr + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr)); | ||
__builtin_memcpy(id->dst_ip.s6_addr + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr)); | ||
id->transport_protocol = ip->protocol; | ||
id->src_port = 0; | ||
id->dst_port = 0; | ||
switch (ip->protocol) { | ||
case IPPROTO_TCP: { | ||
struct tcphdr *tcp = (void *)ip + sizeof(*ip); | ||
if ((void *)tcp + sizeof(*tcp) <= data_end) { | ||
id->src_port = __bpf_ntohs(tcp->source); | ||
id->dst_port = __bpf_ntohs(tcp->dest); | ||
set_flags(tcp, flags); | ||
} | ||
} break; | ||
case IPPROTO_UDP: { | ||
struct udphdr *udp = (void *)ip + sizeof(*ip); | ||
if ((void *)udp + sizeof(*udp) <= data_end) { | ||
id->src_port = __bpf_ntohs(udp->source); | ||
id->dst_port = __bpf_ntohs(udp->dest); | ||
} | ||
} break; | ||
default: | ||
break; | ||
} | ||
return SUBMIT; | ||
} | ||
|
||
// sets flow fields from IPv6 header information | ||
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) { | ||
if ((void *)ip + sizeof(*ip) > data_end) { | ||
return DISCARD; | ||
} | ||
|
||
id->src_ip = ip->saddr; | ||
id->dst_ip = ip->daddr; | ||
id->transport_protocol = ip->nexthdr; | ||
id->src_port = 0; | ||
id->dst_port = 0; | ||
switch (ip->nexthdr) { | ||
case IPPROTO_TCP: { | ||
struct tcphdr *tcp = (void *)ip + sizeof(*ip); | ||
if ((void *)tcp + sizeof(*tcp) <= data_end) { | ||
id->src_port = __bpf_ntohs(tcp->source); | ||
id->dst_port = __bpf_ntohs(tcp->dest); | ||
set_flags(tcp, flags); | ||
} | ||
} break; | ||
case IPPROTO_UDP: { | ||
struct udphdr *udp = (void *)ip + sizeof(*ip); | ||
if ((void *)udp + sizeof(*udp) <= data_end) { | ||
id->src_port = __bpf_ntohs(udp->source); | ||
id->dst_port = __bpf_ntohs(udp->dest); | ||
} | ||
} break; | ||
default: | ||
break; | ||
} | ||
return SUBMIT; | ||
} | ||
// sets flow fields from Ethernet header information | ||
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) { | ||
if ((void *)eth + sizeof(*eth) > data_end) { | ||
return DISCARD; | ||
} | ||
__builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN); | ||
__builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN); | ||
id->eth_protocol = __bpf_ntohs(eth->h_proto); | ||
|
||
if (id->eth_protocol == ETH_P_IP) { | ||
struct iphdr *ip = (void *)eth + sizeof(*eth); | ||
return fill_iphdr(ip, data_end, id, flags); | ||
} else if (id->eth_protocol == ETH_P_IPV6) { | ||
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth); | ||
return fill_ip6hdr(ip6, data_end, id, flags); | ||
} else { | ||
// TODO : Need to implement other specific ethertypes if needed | ||
// For now other parts of flow id remain zero | ||
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr)); | ||
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr)); | ||
id->transport_protocol = 0; | ||
id->src_port = 0; | ||
id->dst_port = 0; | ||
} | ||
return SUBMIT; | ||
} | ||
|
||
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { | ||
// If sampling is defined, will only parse 1 out of "sampling" flows | ||
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) { | ||
return TC_ACT_OK; | ||
} | ||
void *data_end = (void *)(long)skb->data_end; | ||
void *data = (void *)(long)skb->data; | ||
|
||
flow_id id; | ||
__builtin_memset(&id, 0, sizeof(id)); | ||
u64 current_time = bpf_ktime_get_ns(); | ||
struct ethhdr *eth = data; | ||
u16 flags = 0; | ||
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) { | ||
return TC_ACT_OK; | ||
} | ||
|
||
//Set extra fields | ||
id.if_index = skb->ifindex; | ||
id.direction = direction; | ||
|
||
// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide | ||
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/ | ||
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); | ||
if (aggregate_flow != NULL) { | ||
aggregate_flow->packets += 1; | ||
aggregate_flow->bytes += skb->len; | ||
aggregate_flow->end_mono_time_ns = current_time; | ||
// it might happen that start_mono_time hasn't been set due to | ||
// the way percpu hashmap deal with concurrent map entries | ||
if (aggregate_flow->start_mono_time_ns == 0) { | ||
aggregate_flow->start_mono_time_ns = current_time; | ||
} | ||
aggregate_flow->flags |= flags; | ||
|
||
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); | ||
if (trace_messages && ret != 0) { | ||
// usually error -16 (-EBUSY) is printed here. | ||
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause | ||
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets), | ||
// which can't be deduplicated. | ||
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
bpf_printk("error updating flow %d\n", ret); | ||
} | ||
} else { | ||
// Key does not exist in the map, and will need to create a new entry. | ||
flow_metrics new_flow = { | ||
.packets = 1, | ||
.bytes = skb->len, | ||
.start_mono_time_ns = current_time, | ||
.end_mono_time_ns = current_time, | ||
.flags = flags, | ||
}; | ||
|
||
// even if we know that the entry is new, another CPU might be concurrently inserting a flow | ||
// so we need to specify BPF_ANY | ||
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY); | ||
if (ret != 0) { | ||
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here. | ||
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have | ||
// a repeated INTERSECTION of flows (different flows aggregating different packets), | ||
// which can be re-aggregated at userpace. | ||
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
if (trace_messages) { | ||
bpf_printk("error adding flow %d\n", ret); | ||
} | ||
|
||
new_flow.errno = -ret; | ||
flow_record *record = (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0); | ||
if (!record) { | ||
if (trace_messages) { | ||
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow"); | ||
} | ||
return TC_ACT_OK; | ||
} | ||
record->id = id; | ||
record->metrics = new_flow; | ||
bpf_ringbuf_submit(record, 0); | ||
} | ||
} | ||
return TC_ACT_OK; | ||
} | ||
|
||
SEC("tc_ingress") | ||
int ingress_flow_parse(struct __sk_buff *skb) { | ||
return flow_monitor(skb, INGRESS); | ||
} | ||
|
||
SEC("tc_egress") | ||
int egress_flow_parse(struct __sk_buff *skb) { | ||
return flow_monitor(skb, EGRESS); | ||
} | ||
|
||
// Force emitting structs into the ELF for automatic creation of Golang struct | ||
const flow_metrics *unused_flow_metrics __attribute__((unused)); | ||
const flow_id *unused_flow_id __attribute__((unused)); | ||
const flow_record *unused_flow_record __attribute__((unused)); | ||
|
||
char _license[] SEC("license") = "GPL"; |
Oops, something went wrong.