From e33e66ce6bac6e25ba42e1dacc477c8a1e77aa1e Mon Sep 17 00:00:00 2001 From: Ilya Gavrilov Date: Tue, 22 Oct 2024 21:19:45 +0200 Subject: [PATCH 1/5] Rework eBPF capture backend --- bpf/include/maps.h | 19 +- bpf/packet_sniffer.c | 350 ++++++++++++++------------ pkg/bpf/bpf.go | 2 +- pkg/discoverer/discoverer.go | 32 ++- pkg/discoverer/pids.go | 36 +-- pkg/hooks/packet/packet_sniffer.go | 143 +---------- pkg/poller/log/bpf_logger_messages.go | 1 + pkg/poller/packets/packets_poller.go | 27 +- tracer.go | 44 ++-- 9 files changed, 285 insertions(+), 369 deletions(-) diff --git a/bpf/include/maps.h b/bpf/include/maps.h index 36f012b..5bd3f60 100644 --- a/bpf/include/maps.h +++ b/bpf/include/maps.h @@ -18,6 +18,7 @@ Copyright (C) Kubeshark #define MAX_ENTRIES_HASH (1 << 12) // 4096 #define MAX_ENTRIES_PERF_OUTPUT (1 << 10) // 1024 #define MAX_ENTRIES_LRU_HASH (1 << 14) // 16384 +#define MAX_ENTRIES_LRU_HASH_BIG (1 << 20) // 1M // The same struct can be found in chunk.go // @@ -96,8 +97,10 @@ struct pkt { __u64 timestamp; __u64 cgroup_id; __u64 id; + __u32 len; + __u32 tot_len; + __u32 counter; __u16 num; - __u16 len; __u16 last; __u8 direction; unsigned char buf[PKT_PART_LEN]; @@ -116,11 +119,13 @@ struct { __type(value, __u64); } pkt_id SEC(".maps"); -struct pkt_data { +struct socket_cookie_data { __u64 cgroup_id; - __u32 pad1; - __u16 rewrite_src_port; - __u16 pad2; + __u32 src_ip; + __u32 dst_ip; + __u16 src_port; + __u16 dst_port; + __u8 side; // 0 - received, 1 - sent }; #define CONFIGURATION_FLAG_CAPTURE_STOPPED (1 << 0) @@ -144,6 +149,8 @@ struct configuration { #define BPF_LRU_HASH(_name, _key_type, _value_type) \ BPF_MAP(_name, BPF_MAP_TYPE_LRU_HASH, _key_type, _value_type, MAX_ENTRIES_LRU_HASH) +#define BPF_LRU_HASH_BIG(_name, _key_type, _value_type) \ + BPF_MAP(_name, BPF_MAP_TYPE_LRU_HASH, _key_type, _value_type, MAX_ENTRIES_LRU_HASH_BIG) #define BPF_ARRAY(_name, _key_type, _value_type, _max_entries) \ BPF_MAP(_name, BPF_MAP_TYPE_ARRAY, _key_type, _value_type, _max_entries) @@ -169,6 +176,6 @@ BPF_LRU_HASH(go_kernel_write_context, __u64, __u32); BPF_LRU_HASH(go_kernel_read_context, __u64, __u32); BPF_LRU_HASH(go_user_kernel_write_context, __u64, struct address_info); BPF_LRU_HASH(go_user_kernel_read_context, __u64, struct address_info); -BPF_LRU_HASH(pkt_context, __u64, struct pkt_data); +BPF_LRU_HASH_BIG(socket_cookies, __u64, struct socket_cookie_data); #endif /* __MAPS__ */ diff --git a/bpf/packet_sniffer.c b/bpf/packet_sniffer.c index 4e51747..e447013 100644 --- a/bpf/packet_sniffer.c +++ b/bpf/packet_sniffer.c @@ -48,7 +48,6 @@ cgroup_skb/ingress hook│ │cgroup_skb/egress #include "include/logger_messages.h" #include "include/common.h" - const volatile __u64 DISABLE_EBPF_CAPTURE = 0; /* @@ -59,134 +58,124 @@ const volatile __u64 DISABLE_EBPF_CAPTURE = 0; // #define ENABLE_TRACE_PACKETS #ifdef ENABLE_TRACE_PACKETS -#define TRACE_PACKET(NAME, IS_CGROUP, LOCAL_IP, REMOTE_IP, LOCAL_PORT, REMOTE_PORT, CGROUP_ID) \ - bpf_printk("PKT "NAME" skb: %p len: %d ret: %d, cgroup: %d cookie:0x%x", skb, (IS_CGROUP?(skb->len+14):skb->len), ret, CGROUP_ID, bpf_get_socket_cookie(skb)); \ - bpf_printk("PKT "NAME" ip_local: %pi4 ip_remote: %pi4", &(LOCAL_IP), &(REMOTE_IP)); \ - {__u32 __port_local = bpf_ntohl(LOCAL_PORT); __u32 __port_remote= bpf_ntohl(REMOTE_PORT);bpf_printk("PKT "NAME" port_local: 0x%x port_remote: 0x%x", __port_local, __port_remote);} \ - bpf_printk("PKT "NAME" ip_src: %pi4 ip_dst:%pi4", &(src_ip), &(dst_ip)); \ - {__u32 __src_port = bpf_ntohl(src_port); __u32 __dst_port= 
bpf_ntohl(dst_port);bpf_printk("PKT "NAME" port_src: 0x%x port_dst: 0x%x", __src_port, __dst_port); } +#define TRACE_PACKET(NAME, IS_CGROUP, LOCAL_IP, REMOTE_IP, LOCAL_PORT, REMOTE_PORT, CGROUP_ID) \ + bpf_printk("PKT " NAME " skb: %p len: %d ret: %d, cgroup: %d cookie:0x%x", skb, (IS_CGROUP ? (skb->len + 14) : skb->len), ret, CGROUP_ID, bpf_get_socket_cookie(skb)); \ + bpf_printk("PKT " NAME " ip_local: %pi4 ip_remote: %pi4", &(LOCAL_IP), &(REMOTE_IP)); \ + { \ + __u32 __port_local = bpf_ntohl(LOCAL_PORT); \ + __u32 __port_remote = bpf_ntohl(REMOTE_PORT); \ + bpf_printk("PKT " NAME " port_local: 0x%x port_remote: 0x%x", __port_local, __port_remote); \ + } \ + bpf_printk("PKT " NAME " ip_src: %pi4 ip_dst:%pi4", &(src_ip), &(dst_ip)); \ + { \ + __u32 __src_port = bpf_ntohl(src_port); \ + __u32 __dst_port = bpf_ntohl(dst_port); \ + bpf_printk("PKT " NAME " port_src: 0x%x port_dst: 0x%x", __src_port, __dst_port); \ + } #define TRACE_PACKET_SENT(NAME) \ - bpf_printk("PKT "NAME" sent"); + bpf_printk("PKT " NAME " sent"); #else #define TRACE_PACKET(NAME, IS_CGROUP, LOCAL_IP, REMOTE_IP, LOCAL_PORT, REMOTE_PORT, CGROUP_ID) \ - src_ip; dst_ip; src_port; dst_port; + src_ip; \ + dst_ip; \ + src_port; \ + dst_port; #define TRACE_PACKET_SENT(NAME) #endif -#define ETH_P_IP 0x0800 +#define ETH_P_IP 0x0800 -static __always_inline void save_packet(struct __sk_buff* skb, __u32 offset, __u32 rewrite_ip_src, __u16 rewrite_port_src, __u32 rewrite_ip_dst, __u16 rewrite_port_dst, __u64 cgroup_id, __u8 direction); -static __always_inline int parse_packet(struct __sk_buff* skb, int is_tc, __u32* src_ip4, __u16* src_port, __u32* dst_ip4, __u16* dst_port, __u8* ipp); +static __always_inline void save_packet(struct __sk_buff *skb, __u32 offset, __u32 rewrite_ip_src, __u16 rewrite_port_src, __u32 rewrite_ip_dst, __u16 rewrite_port_dst, __u64 cgroup_id, __u8 direction); +static __always_inline int parse_packet(struct __sk_buff *skb, int is_tc, __u32 *src_ip4, __u16 *src_port, __u32 *dst_ip4, __u16 *dst_port, __u8 *ipp); -SEC("cgroup_skb/ingress") -int filter_ingress_packets(struct __sk_buff* skb) { - if (DISABLE_EBPF_CAPTURE) - return 1; - if (capture_disabled()) - return 1; +// TODO: remove cookies - __u32 src_ip = 0; - __u16 src_port = 0; - __u32 dst_ip = 0; - __u16 dst_port = 0; - int ret = parse_packet(skb, 0, &src_ip, &src_port, &dst_ip, &dst_port, NULL); - if (ret) { - TRACE_PACKET("cg/in", true, skb->local_ip4, skb->remote_ip4, skb->local_port & 0xffff, skb->remote_port & 0xffff, bpf_skb_cgroup_id(skb)); - save_packet(skb, 0, 0, 0, 0, 0, bpf_skb_cgroup_id(skb), PACKET_DIRECTION_RECEIVED); - TRACE_PACKET_SENT("cg/in"); +int reported_cookie_error = 0; +static __always_inline __u64 get_socket_cookie(struct __sk_buff *skb) +{ + __u64 cookie = bpf_get_socket_cookie(skb); + if (!cookie && !reported_cookie_error) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 100, 0l, 0l); + reported_cookie_error = 1; } - return 1; + + return cookie; } -SEC("cgroup_skb/egress") -int filter_egress_packets(struct __sk_buff* skb) { +static __always_inline int filter_packets(struct __sk_buff *skb, __u8 side) +{ if (DISABLE_EBPF_CAPTURE) return 1; - if (capture_disabled()) return 1; + __u64 cookie = get_socket_cookie(skb); + if (!cookie) + return 1; + __u32 src_ip = 0; __u16 src_port = 0; - __u8 ip_proto = 0; __u32 dst_ip = 0; __u16 dst_port = 0; - int ret = parse_packet(skb, 0, &src_ip, &src_port, &dst_ip, &dst_port, &ip_proto); - if (ret) { - TRACE_PACKET("cg/eg", true, skb->local_ip4, skb->remote_ip4, bpf_htons(skb->local_port & 0xffff), 
skb->remote_port & 0xffff, bpf_skb_cgroup_id(skb)); - struct pkt_data data = { - .cgroup_id = bpf_skb_cgroup_id(skb), - .rewrite_src_port = bpf_htons(skb->local_port & 0xffff), - }; - - __u64 key_skb = (__u64)skb; - bpf_map_update_elem(&pkt_context, &key_skb, &data, BPF_ANY); + int ret = parse_packet(skb, 0, &src_ip, &src_port, &dst_ip, &dst_port, NULL); + TRACE_PACKET("cg/in", true, skb->local_ip4, skb->remote_ip4, skb->local_port & 0xffff, skb->remote_port & 0xffff, bpf_skb_cgroup_id(skb)); + if (!ret) + { + return 1; } - return 1; -} -SEC("tc/ingress") -int packet_pull_ingress(struct __sk_buff* skb) -{ - if (DISABLE_EBPF_CAPTURE) - return 1; - if (capture_disabled()) + struct socket_cookie_data init_data = { + .cgroup_id = bpf_skb_cgroup_id(skb), + .src_ip = src_ip, + .dst_ip = dst_ip, + .src_port = src_port, + .dst_port = dst_port, + .side = side, + }; + bpf_map_update_elem(&socket_cookies, &cookie, &init_data, BPF_NOEXIST); + struct socket_cookie_data *data = bpf_map_lookup_elem(&socket_cookies, &cookie); + if (!data) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 101, 0l, 0l); return 1; + } - bpf_skb_pull_data(skb, skb->len); - - __u32 src_ip = 0; - __u16 src_port = 0; - __u32 dst_ip = 0; - __u16 dst_port = 0; - __u8 ip_proto = 0; - int ret = parse_packet(skb, 1, &src_ip, &src_port, &dst_ip, &dst_port, &ip_proto); - if (ret) { - TRACE_PACKET("tc/in", false, dst_ip, src_ip, dst_port, src_port, 0); - - // in some cases packet after "cgroup_skb/egress" misses "tc/egress" part and get passed here to "tc/ingress" - __u64 key_skb = (__u64)skb; - struct pkt_data* data = bpf_map_lookup_elem(&pkt_context, &key_skb); - if (data) { - save_packet(skb, sizeof(struct ethhdr), 0, data->rewrite_src_port, 0, 0, data->cgroup_id, PACKET_DIRECTION_RECEIVED); - bpf_map_delete_elem(&pkt_context, &key_skb); - TRACE_PACKET_SENT("tc/in"); - } + if (data->side == side) + { + src_ip = data->src_ip; + src_port = data->src_port; + dst_ip = data->dst_ip; + dst_port = data->dst_port; } - return 0; //TC_ACT_OK + else + { + src_ip = data->dst_ip; + src_port = data->dst_port; + dst_ip = data->src_ip; + dst_port = data->src_port; + } + + save_packet(skb, 0, src_ip, src_port, dst_ip, dst_port, bpf_skb_cgroup_id(skb), side); + + return 1; } -SEC("tc/egress") -int packet_pull_egress(struct __sk_buff* skb) +SEC("cgroup_skb/ingress") +int filter_ingress_packets(struct __sk_buff *skb) { - if (DISABLE_EBPF_CAPTURE) - return 1; - if (capture_disabled()) - return 1; + return filter_packets(skb, PACKET_DIRECTION_RECEIVED); +} - bpf_skb_pull_data(skb, skb->len); - __u32 src_ip = 0; - __u16 src_port = 0; - __u32 dst_ip = 0; - __u16 dst_port = 0; - __u8 ip_proto = 0; - int ret = parse_packet(skb, 1, &src_ip, &src_port, &dst_ip, &dst_port, &ip_proto); - if (ret) { - TRACE_PACKET("tc/eg", false, src_ip, dst_ip, src_port, dst_port, bpf_skb_cgroup_id(skb)); - - __u64 key_skb = (__u64)skb; - struct pkt_data* data = bpf_map_lookup_elem(&pkt_context, &key_skb); - if (data) { - save_packet(skb, sizeof(struct ethhdr), 0, data->rewrite_src_port, 0, 0, data->cgroup_id, PACKET_DIRECTION_SENT); - bpf_map_delete_elem(&pkt_context, &key_skb); - TRACE_PACKET_SENT("tc/eg"); - } - } - return 0; // TC_ACT_OK +SEC("cgroup_skb/egress") +int filter_egress_packets(struct __sk_buff *skb) +{ + return filter_packets(skb, PACKET_DIRECTION_SENT); } -struct pkt_sniffer_ctx { - struct __sk_buff* skb; +struct pkt_sniffer_ctx +{ + struct __sk_buff *skb; __u32 offset; __u32 rewrite_ip_src; __u16 rewrite_port_src; @@ -196,8 +185,9 @@ struct pkt_sniffer_ctx { __u8 
direction; }; -static __noinline void _save_packet(struct pkt_sniffer_ctx* ctx); -static __always_inline void save_packet(struct __sk_buff* skb, __u32 offset, __u32 rewrite_ip_src, __u16 rewrite_port_src, __u32 rewrite_ip_dst, __u16 rewrite_port_dst, __u64 cgroup_id, __u8 direction) { +static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx); +static __always_inline void save_packet(struct __sk_buff *skb, __u32 offset, __u32 rewrite_ip_src, __u16 rewrite_port_src, __u32 rewrite_ip_dst, __u16 rewrite_port_dst, __u64 cgroup_id, __u8 direction) +{ struct pkt_sniffer_ctx ctx = { .skb = skb, .offset = offset, @@ -211,9 +201,11 @@ static __always_inline void save_packet(struct __sk_buff* skb, __u32 offset, __u return _save_packet(&ctx); } -// mark _save_packet as _noinline to make BPF-to-BPF call -static __noinline void _save_packet(struct pkt_sniffer_ctx* ctx) { - struct __sk_buff* skb = ctx->skb; +// TODO: remove offset: +// mark _save_packet as _noinline to make BPF-to-BPF call +static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx) +{ + struct __sk_buff *skb = ctx->skb; __u32 offset = ctx->offset; __u32 rewrite_ip_src = ctx->rewrite_ip_src; __u16 rewrite_port_src = ctx->rewrite_port_src; @@ -221,86 +213,112 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx* ctx) { __u16 rewrite_port_dst = ctx->rewrite_port_dst; __u64 cgroup_id = ctx->cgroup_id; __u8 direction = ctx->direction; + int zero = 0; + + struct pkt *p = bpf_map_lookup_elem(&pkt_heap, &zero); + if (p == NULL) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 3, 0l, 0l); + return; + } - void* data = (void*)(long)skb->data; - __u32 pkt_len = skb->len; + // void *data = (void *)(long)skb->data; + p->tot_len = skb->len; + p->counter = skb->len; - if (pkt_len < offset) { + if (p->counter < offset) + { return; } - data += offset; - pkt_len -= offset; + // data += offset; + // pkt_len -= offset; - if (pkt_len == 0) { + if (p->counter == 0) + { log_error(skb, LOG_ERROR_PKT_SNIFFER, 1, 0l, 0l); return; } - if (pkt_len > PKT_MAX_LEN) { + if (p->counter > PKT_MAX_LEN) + { log_error(skb, LOG_ERROR_PKT_SNIFFER, 2, 0l, 0l); return; } - int zero = 0; - struct pkt* p = bpf_map_lookup_elem(&pkt_heap, &zero); - if (p == NULL) { - log_error(skb, LOG_ERROR_PKT_SNIFFER, 3, 0l, 0l); - return; - } - __u64* pkt_id_ptr = bpf_map_lookup_elem(&pkt_id, &zero); - if (pkt_id_ptr == NULL) { + __u64 *pkt_id_ptr = bpf_map_lookup_elem(&pkt_id, &zero); + if (pkt_id_ptr == NULL) + { log_error(skb, LOG_ERROR_PKT_SNIFFER, 4, 0l, 0l); return; } p->timestamp = compat_get_uprobe_timestamp(); + //p->timestamp = 0; p->cgroup_id = cgroup_id; p->direction = direction; p->id = *pkt_id_ptr; + p->num = 0; + p->len = 0; + p->last = 0; (*pkt_id_ptr)++; - __u32 read_len = 0; - - for (__u32 i = 0; (i < PKT_MAX_LEN / PKT_PART_LEN) && pkt_len; i++) { - read_len = (pkt_len < PKT_PART_LEN) ? pkt_len : PKT_PART_LEN; - if (read_len < 0) { + for (__u32 i = 0; (i < PKT_MAX_LEN / PKT_PART_LEN) && p->counter; i++) + { + p->len = (p->counter < PKT_PART_LEN) ? p->counter : PKT_PART_LEN; + if (p->len < 0) + { log_error(skb, LOG_ERROR_PKT_SNIFFER, 5, 0l, 0l); + bpf_printk("!!! TAIL NOT SENT0: %d", p->len); return; } long err = 0; p->num = i; - p->len = read_len; + p->counter -= p->len; + p->last = (p->counter == 0) ? 
1 : 0; - - if (p->len == sizeof(p->buf)) { - err = bpf_probe_read_kernel(p->buf, sizeof(p->buf), data + i * PKT_PART_LEN); - } else { - read_len &= (sizeof(p->buf) - 1); // Buffer must be N^2 - err = bpf_probe_read_kernel(p->buf, read_len, data + i * PKT_PART_LEN); + if (p->len == PKT_PART_LEN) + { + err = bpf_skb_load_bytes(skb, i * PKT_PART_LEN, &p->buf[0], PKT_PART_LEN); + } + else + { + __s32 p_len = p->len; + for (int j = 0; j < 4096; j++) { + if(bpf_skb_load_bytes(skb, i * PKT_PART_LEN+j, &p->buf[j], 1)) break; + p_len--; + } + if (p_len != 0) { + err = -1; + } } - if (err != 0) { + if (err != 0) + { log_error(skb, LOG_ERROR_PKT_SNIFFER, 6, 0l, 0l); + bpf_printk("ERROR 2"); return; } - pkt_len -= read_len; - p->last = (pkt_len == 0) ? 1 : 0; - struct iphdr* ip = (struct iphdr*)p->buf; + struct iphdr *ip = (struct iphdr *)p->buf; if (rewrite_ip_src) ip->saddr = rewrite_ip_src; if (rewrite_ip_dst) ip->daddr = rewrite_ip_dst; - if (ip->protocol == IPPROTO_TCP || ip->protocol == IPPROTO_UDP) { + if (ip->protocol == IPPROTO_TCP || ip->protocol == IPPROTO_UDP) + { int hdrsize = ip->ihl * 4; - __u16* src_dst = (__u16*)(&p->buf[0] + hdrsize); + __u16 *src_dst = (__u16 *)(&p->buf[0] + hdrsize); if (rewrite_port_src) *src_dst = rewrite_port_src; if (rewrite_port_dst) *(src_dst + 1) = rewrite_port_dst; } - bpf_perf_event_output(skb, &pkts_buffer, BPF_F_CURRENT_CPU, p, sizeof(struct pkt)); + if (bpf_perf_event_output(skb, &pkts_buffer, BPF_F_CURRENT_CPU, p, sizeof(struct pkt))) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 7, 0l, 0l); + bpf_printk("ERROR 3"); + } } } @@ -309,30 +327,35 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx* ctx) { 0 in case packet has TCP source or destination port equal to 443 - in this case packet is treated as TLS and not going to be processed not 0 in other cases */ -static __always_inline int parse_packet(struct __sk_buff* skb, int is_tc, __u32* src_ip4, __u16* src_port, __u32* dst_ip4, __u16* dst_port, __u8* ipp) { - void* data = (void*)(long)skb->data; - void* data_end = (void*)(long)skb->data_end; - void* cursor = data; - - if (is_tc) { - struct ethhdr* eth = (struct ethhdr*)cursor; - if (eth + 1 > (struct ethhdr*)data_end) +static __always_inline int parse_packet(struct __sk_buff *skb, int is_tc, __u32 *src_ip4, __u16 *src_port, __u32 *dst_ip4, __u16 *dst_port, __u8 *ipp) +{ + void *data = (void *)(long)skb->data; + void *data_end = (void *)(long)skb->data_end; + void *cursor = data; + + if (is_tc) + { + struct ethhdr *eth = (struct ethhdr *)cursor; + if (eth + 1 > (struct ethhdr *)data_end) return 1; cursor += sizeof(struct ethhdr); } __u8 ip_proto = 0; - if (skb->protocol == bpf_htons(ETH_P_IP)) { + if (skb->protocol == bpf_htons(ETH_P_IP)) + { - struct iphdr* ip = (struct iphdr*)cursor; - if (ip + 1 > (struct iphdr*)data_end) + struct iphdr *ip = (struct iphdr *)cursor; + if (ip + 1 > (struct iphdr *)data_end) return 2; - if (src_ip4) { + if (src_ip4) + { *src_ip4 = ip->saddr; } - if (dst_ip4) { + if (dst_ip4) + { *dst_ip4 = ip->daddr; } @@ -340,29 +363,33 @@ static __always_inline int parse_packet(struct __sk_buff* skb, int is_tc, __u32* if (hdrsize < sizeof(struct iphdr)) return 3; - if ((void*)ip + hdrsize > data_end) + if ((void *)ip + hdrsize > data_end) return 4; cursor += hdrsize; ip_proto = ip->protocol; - if (ipp) { + if (ipp) + { *ipp = ip_proto; } if (ip_proto == IPPROTO_TCP) { - struct tcphdr* tcp = (struct tcphdr*)cursor; - if (tcp + 1 > (struct tcphdr*)data_end) + struct tcphdr *tcp = (struct tcphdr *)cursor; + if (tcp + 1 > 
(struct tcphdr *)data_end) return 5; - if (src_port) { + if (src_port) + { *src_port = tcp->source; } - if (dst_port) { + if (dst_port) + { *dst_port = tcp->dest; } cursor += tcp->doff * 4; - if (tcp->dest == bpf_htons(443) || tcp->source == bpf_htons(443)) { + if (tcp->dest == bpf_htons(443) || tcp->source == bpf_htons(443)) + { // skip only packets with tcp port 443 to support previous bpf filter return 0; } @@ -370,17 +397,18 @@ static __always_inline int parse_packet(struct __sk_buff* skb, int is_tc, __u32* if (ip_proto == IPPROTO_UDP) { - struct udphdr* udp = (struct udphdr*)cursor; - if (udp + 1 > (struct udphdr*)data_end) + struct udphdr *udp = (struct udphdr *)cursor; + if (udp + 1 > (struct udphdr *)data_end) return 5; - if (src_port) { + if (src_port) + { *src_port = udp->source; } - if (dst_port) { + if (dst_port) + { *dst_port = udp->dest; } } - } return 6; diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index ad107f4..71f84f7 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -50,7 +50,7 @@ func (objs *BpfObjectsImpl) loadBpfObjects(bpfConstants map[string]uint64, reade if errors.As(err, &ve) { errStr := fmt.Sprintf("%+v", ve) if len(errStr) > 2048 { - errStr = "(truncated) " + errStr[len(errStr)-1024:] + //XXX errStr = "(truncated) " + errStr[len(errStr)-1024:] } log.Warn().Msg(fmt.Sprintf("Got verifier error: %v", errStr)) } diff --git a/pkg/discoverer/discoverer.go b/pkg/discoverer/discoverer.go index f757716..3462ee6 100644 --- a/pkg/discoverer/discoverer.go +++ b/pkg/discoverer/discoverer.go @@ -31,7 +31,7 @@ type CgroupData struct { type InternalEventsDiscoverer interface { Start() error CgroupsInfo() *lru.Cache[CgroupID, ContainerID] - ContainersInfo() *lru.Cache[ContainerID, CgroupData] + ContainersInfo() *lru.Cache[ContainerID, []CgroupData] TargetCgroup(cgroupId uint64) UntargetCgroup(cgroupId uint64) } @@ -46,7 +46,7 @@ type InternalEventsDiscovererImpl struct { readerFoundCgroup *perf.Reader cgroupsInfo *lru.Cache[CgroupID, ContainerID] - containersInfo *lru.Cache[ContainerID, CgroupData] + containersInfo *lru.Cache[ContainerID, []CgroupData] pids *pids } @@ -62,7 +62,7 @@ func NewInternalEventsDiscoverer(procfs string, bpfObjects *bpf.BpfObjects) Inte if impl.cgroupsInfo, err = lru.New[CgroupID, ContainerID](16384); err != nil { return nil } - if impl.containersInfo, err = lru.New[ContainerID, CgroupData](16384); err != nil { + if impl.containersInfo, err = lru.New[ContainerID, []CgroupData](16384); err != nil { return nil } impl.pids, err = newPids(procfs, bpfObjects, impl.containersInfo) @@ -116,7 +116,7 @@ func (e *InternalEventsDiscovererImpl) CgroupsInfo() *lru.Cache[CgroupID, Contai return e.cgroupsInfo } -func (e *InternalEventsDiscovererImpl) ContainersInfo() *lru.Cache[ContainerID, CgroupData] { +func (e *InternalEventsDiscovererImpl) ContainersInfo() *lru.Cache[ContainerID, []CgroupData] { return e.containersInfo } @@ -160,7 +160,15 @@ func (e *InternalEventsDiscovererImpl) scanExistingCgroups(isCgroupsV2 bool) { } e.cgroupsInfo.Add(CgroupID(cgroupId), ContainerID(contId)) - e.containersInfo.Add(ContainerID(contId), CgroupData{CgroupPath: s, CgroupID: CgroupID(cgroupId)}) + + item := CgroupData{CgroupPath: s, CgroupID: CgroupID(cgroupId)} + if !e.containersInfo.Contains(ContainerID(contId)) { + e.containersInfo.Add(ContainerID(contId), []CgroupData{item}) + } else { + v, _ := e.containersInfo.Get(ContainerID(contId)) + v = append(v, item) + e.containersInfo.Add(ContainerID(contId), v) + } log.Debug().Uint64("Cgroup ID", cgroupId).Str("Container ID", 
contId).Msg("Initial cgroup is detected") return nil @@ -277,17 +285,21 @@ func (e *InternalEventsDiscovererImpl) handleFoundCgroup(isCgroupsV2 bool) { } contId, _ := GetContainerIdFromCgroupPath(cgroupPath) if contId != "" { - if _, ok := e.containersInfo.Get(ContainerID(contId)); ok { - continue - } - cgroupId, err := GetCgroupIdByPath(cgroupPath) if err != nil { log.Warn().Str("Path", cgroupPath).Msg("Can not find out cgroup id by path") continue } e.cgroupsInfo.Add(CgroupID(cgroupId), ContainerID(contId)) - e.containersInfo.Add(ContainerID(contId), CgroupData{CgroupPath: cgroupPath, CgroupID: CgroupID(cgroupId)}) + item := CgroupData{CgroupPath: cgroupPath, CgroupID: CgroupID(cgroupId)} + if !e.containersInfo.Contains(ContainerID(contId)) { + e.containersInfo.Add(ContainerID(contId), []CgroupData{item}) + } else { + v, _ := e.containersInfo.Get(ContainerID(contId)) + v = append(v, item) + e.containersInfo.Add(ContainerID(contId), v) + } + log.Debug().Uint64("Cgroup ID", cgroupId).Str("Container ID", contId).Str("Cgroup Path", cgroupPath).Msg("New cgroup is detected") } } diff --git a/pkg/discoverer/pids.go b/pkg/discoverer/pids.go index 1603f3e..d5ac0df 100644 --- a/pkg/discoverer/pids.go +++ b/pkg/discoverer/pids.go @@ -35,7 +35,7 @@ type pidInfo struct { type pids struct { procfs string bpfObjs *bpf.BpfObjects - containersInfo *lru.Cache[ContainerID, CgroupData] + containersInfo *lru.Cache[ContainerID, []CgroupData] readerFoundPid *perf.Reader discoveredPIDs *lru.Cache[uint32, *pidInfo] targetedPIDs *lru.Cache[uint32, *pidInfo] @@ -43,7 +43,7 @@ type pids struct { scanGolangQueue chan foundPidEvent } -func newPids(procfs string, bpfObjs *bpf.BpfObjects, containersInfo *lru.Cache[ContainerID, CgroupData]) (*pids, error) { +func newPids(procfs string, bpfObjs *bpf.BpfObjects, containersInfo *lru.Cache[ContainerID, []CgroupData]) (*pids, error) { discoveredPids, err := lru.New[uint32, *pidInfo](16384) if err != nil { @@ -105,7 +105,7 @@ func (p *pids) targetCgroup(cgroupId uint64) { err = hook.InstallHooks(p.bpfObjs, ex, offsets) if err != nil { - log.Warn().Err(err).Uint32("pid", pid).Uint64("cgroup", cgroupId).Msg("install go hook failed") + log.Debug().Err(err).Uint32("pid", pid).Uint64("cgroup", cgroupId).Msg("install go hook failed") return } pi.goHook = &hook @@ -240,7 +240,7 @@ func (p *pids) installGoHook(e foundPidEvent) (*goHooks.GoHooks, string) { err = hook.InstallHooks(p.bpfObjs, ex, offsets) if err != nil { - log.Warn().Err(err).Uint32("pid", e.pid).Uint64("cgroup", e.cgroup).Msg("install go hook failed") + log.Debug().Err(err).Uint32("pid", e.pid).Uint64("cgroup", e.cgroup).Msg("install go hook failed") return nil, "" } @@ -304,17 +304,19 @@ func (p *pids) scanPidsV2() error { if id == "" { continue } - ci, ok := p.containersInfo.Get(ContainerID(id)) + cis, ok := p.containersInfo.Get(ContainerID(id)) if !ok { continue } - pEvent := foundPidEvent{ - cgroup: uint64(ci.CgroupID), - pid: uint32(n), - } + for _, ci := range cis { + pEvent := foundPidEvent{ + cgroup: uint64(ci.CgroupID), + pid: uint32(n), + } - p.newPidFound(&pEvent) + p.newPidFound(&pEvent) + } } return nil @@ -369,17 +371,19 @@ func (p *pids) scanPidsV1() error { continue } - ci, ok := p.containersInfo.Get(ContainerID(id)) + cis, ok := p.containersInfo.Get(ContainerID(id)) if !ok { continue } - pEvent := foundPidEvent{ - cgroup: uint64(ci.CgroupID), - pid: uint32(n), - } + for _, ci := range cis { + pEvent := foundPidEvent{ + cgroup: uint64(ci.CgroupID), + pid: uint32(n), + } - p.newPidFound(&pEvent) + 
p.newPidFound(&pEvent) + } } return nil diff --git a/pkg/hooks/packet/packet_sniffer.go b/pkg/hooks/packet/packet_sniffer.go index 7f77981..7bd60a0 100644 --- a/pkg/hooks/packet/packet_sniffer.go +++ b/pkg/hooks/packet/packet_sniffer.go @@ -3,18 +3,11 @@ package packet import ( "errors" "fmt" - "runtime" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" - "os" - "path/filepath" - "strconv" - "github.com/rs/zerolog/log" - "github.com/vishvananda/netlink" - "github.com/vishvananda/netns" ) type podLinks struct { @@ -25,26 +18,21 @@ type podLinks struct { type PacketFilter struct { ingressFilterProgram *ebpf.Program egressFilterProgram *ebpf.Program - ingressPullProgram *ebpf.Program - egressPullProgram *ebpf.Program traceCgroupConnect *ebpf.Program cgroupHashMap *ebpf.Map attachedPods map[string]*podLinks tcClient TcClient } -func NewPacketFilter(procfs string, ingressFilterProgram, egressFilterProgram, pullIngress, pullEgress, traceCgroupConnect *ebpf.Program, cgroupHash *ebpf.Map) (*PacketFilter, error) { +func NewPacketFilter(procfs string, ingressFilterProgram, egressFilterProgram, traceCgroupConnect *ebpf.Program, cgroupHash *ebpf.Map) (*PacketFilter, error) { pf := &PacketFilter{ ingressFilterProgram: ingressFilterProgram, egressFilterProgram: egressFilterProgram, - ingressPullProgram: pullIngress, - egressPullProgram: pullEgress, traceCgroupConnect: traceCgroupConnect, cgroupHashMap: cgroupHash, attachedPods: make(map[string]*podLinks), tcClient: &TcClientImpl{}, } - pf.UpdateTCPrograms(procfs) return pf, nil } @@ -52,108 +40,6 @@ func (p *PacketFilter) Close() error { return p.tcClient.CleanTC() } -func (p *PacketFilter) UpdateTCPrograms(procfs string) { - links, err := netlink.LinkList() - if err != nil { - log.Error().Err(err).Msg("Get link list failed:") - return - } - - for _, l := range links { - if err := p.tcClient.SetupTC(l, p.ingressPullProgram, p.egressPullProgram); err != nil { - log.Error().Str("link", l.Attrs().Name).Err(err).Msg("Setup TC failed:") - continue - } - - log.Info().Str("link name", l.Attrs().Name).Int("link", l.Attrs().Index).Msg("Attached TC programs:") - } - - pids, err := getPIDsWithNetNamespace(procfs) - if err != nil { - log.Error().Err(err).Msg("Get pids list failed:") - return - } - - nsHandles := make(map[netns.NsHandle]struct{}) - for _, pid := range pids { - fname := fmt.Sprintf("%v/%v/ns/net", procfs, pid) - if nsh, err := netns.GetFromPath(fname); err != nil { - log.Warn().Uint32("pid", pid).Str("file", fname).Err(err).Msg("Get netns failed:") - } else { - nsHandles[nsh] = struct{}{} - } - } - var totalPrograms int - for h := range nsHandles { - done := make(chan bool) - errors := make(chan error) - - go func(nsh netns.NsHandle, done chan<- bool) { - // Setting a netns should be done from a dedicated OS thread. 
- // - // goroutines are not really OS threads, we try to mimic the issue by - // locking the OS thread to this goroutine - // - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - oldnetns, err := netns.Get() - - if err != nil { - errors <- fmt.Errorf("Unable to get netns of current thread %v", err) - return - } - - if err := netns.Set(nsh); err != nil { - errors <- fmt.Errorf("Unable to set netns of handle %v - %v", h, err) - return - } - - lo := -1 - links, err := netlink.LinkList() - if err != nil { - errors <- fmt.Errorf("Get link list in netns %v failed: %v", h, err) - return - } - var lnk netlink.Link - for _, link := range links { - if link.Attrs().Name == "lo" { - lo = link.Attrs().Index - lnk = link - break - } - } - if lo == -1 { - errors <- fmt.Errorf("Can not get lo id for netns %v", h) - return - } - - if err := p.tcClient.SetupTC(lnk, p.ingressPullProgram, p.egressPullProgram); err != nil { - log.Error().Int("link", lo).Err(err).Msg("Setup TC failed:") - errors <- fmt.Errorf("Unable to setup tc netns: %v iface: %v error: %v", h, lo, err) - return - } - - log.Debug().Int("netns", int(h)).Int("link", lo).Msg("Attached netns TC lo programs:") - totalPrograms++ - - if err := netns.Set(oldnetns); err != nil { - errors <- fmt.Errorf("Unable to set back netns of current thread %v", err) - return - } - - done <- true - }(h, done) - - select { - case err := <-errors: - log.Error().Err(err).Msg("Setup netns program failed:") - case <-done: - } - } - log.Info().Int("attached", totalPrograms).Msg("Attached netns TC programs:") -} - func (t *PacketFilter) AttachPod(uuid, cgroupV2Path string, cgoupIDs []uint64) error { log.Info().Str("pod", uuid).Str("path", cgroupV2Path).Msg("Attaching pod:") @@ -215,30 +101,3 @@ func (t *PacketFilter) DetachPod(uuid string) error { delete(t.attachedPods, uuid) return nil } - -func getPIDsWithNetNamespace(procfs string) ([]uint32, error) { - var pids []uint32 - - err := filepath.Walk(procfs, func(path string, info os.FileInfo, err error) error { - if err != nil { - return nil - } - - if info.IsDir() { - pid, err := strconv.Atoi(info.Name()) - if err == nil { - netNsPath := filepath.Join(path, "ns/net") - if _, err := os.Stat(netNsPath); err == nil { - pids = append(pids, uint32(pid)) - } - } - } - return nil - }) - - if err != nil { - return nil, err - } - - return pids, nil -} diff --git a/pkg/poller/log/bpf_logger_messages.go b/pkg/poller/log/bpf_logger_messages.go index cbfa076..5e628bf 100644 --- a/pkg/poller/log/bpf_logger_messages.go +++ b/pkg/poller/log/bpf_logger_messages.go @@ -27,4 +27,5 @@ var bpfLogMessages = []string{ /*0022*/ "[%d] Unable to get go user-kernel context [fd: %d]]", /*0023*/ "[%d] Unable to get go tcp connection fd info]", /*0024*/ "[%d] Packet sniffer error]", + /*0025*/ "[%d] File probes error]", } diff --git a/pkg/poller/packets/packets_poller.go b/pkg/poller/packets/packets_poller.go index fa2fd78..3891004 100644 --- a/pkg/poller/packets/packets_poller.go +++ b/pkg/poller/packets/packets_poller.go @@ -21,8 +21,10 @@ type tracerPacketsData struct { Timestamp uint64 CgroupID uint64 ID uint64 + Len uint32 + TotLen uint32 + Counter uint32 Num uint16 - Len uint16 Last uint16 Direction uint8 Data [4096]uint8 @@ -78,22 +80,23 @@ func (p *PacketsPoller) Start() { func (p *PacketsPoller) poll() { // tracerPktsChunk is generated by bpf2go. 
- chunks := make(chan *tracerPktChunk) + chunks := make(chan tracerPktChunk) go p.pollChunksPerfBuffer(chunks) for chunk := range chunks { if err := p.handlePktChunk(chunk); err != nil { + log.Error().Err(err).Msg("ERROR") //XXX utils.LogError(err) } } } -func (p *PacketsPoller) handlePktChunk(chunk *tracerPktChunk) error { +func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { p.mtx.Lock() defer p.mtx.Unlock() - const expectedChunkSize = 4132 + const expectedChunkSize = 4148 data := chunk.buf if len(data) != expectedChunkSize { return fmt.Errorf("bad pkt chunk: size %v expected: %v", len(data), expectedChunkSize) @@ -107,20 +110,24 @@ func (p *PacketsPoller) handlePktChunk(chunk *tracerPktChunk) error { } if ptr.Num != pkts.num { if ptr.ID == pkts.id { - return fmt.Errorf("Lost packet message from %v to %v", pkts.id, ptr.ID) + return fmt.Errorf("lost packet message(1) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v tot_len: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction, ptr.TotLen) } + // ID is changed, so new packet is started: pkts.len = 0 pkts.num = 0 pkts.id = ptr.ID if ptr.Num != pkts.num { - return fmt.Errorf("Lost packet message from %v to %v", pkts.num, ptr.Num) + return fmt.Errorf("lost packet message(2) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v tot_len: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction, ptr.TotLen) } } copy(pkts.buf[pkts.len:], ptr.Data[:ptr.Len]) pkts.len += uint32(ptr.Len) + if ptr.Last != 0 { - err := p.sorter.WritePlanePacket(ptr.Timestamp, ptr.CgroupID, ptr.Direction, layers.LayerTypeEthernet, p.ethhdr, gopacket.Payload(pkts.buf[:pkts.len])) + //XXX: + //err := p.sorter.WritePlanePacket(ptr.Timestamp, ptr.CgroupID, ptr.Direction, layers.LayerTypeEthernet, p.ethhdr, gopacket.Payload(pkts.buf[:pkts.len])) + err := p.sorter.WritePlanePacket(0, ptr.CgroupID, ptr.Direction, layers.LayerTypeEthernet, p.ethhdr, gopacket.Payload(pkts.buf[:pkts.len])) if err != nil { return err } @@ -135,7 +142,7 @@ func (p *PacketsPoller) handlePktChunk(chunk *tracerPktChunk) error { return nil } -func (p *PacketsPoller) pollChunksPerfBuffer(chunks chan<- *tracerPktChunk) { +func (p *PacketsPoller) pollChunksPerfBuffer(chunks chan<- tracerPktChunk) { log.Info().Msg("Start polling for tls events") for { @@ -161,7 +168,7 @@ func (p *PacketsPoller) pollChunksPerfBuffer(chunks chan<- *tracerPktChunk) { buf: record.RawSample, } - chunks <- &chunk - + //chunks <- chunk + p.handlePktChunk(chunk) //XXX } } diff --git a/tracer.go b/tracer.go index 9eac9fe..f936b57 100644 --- a/tracer.go +++ b/tracer.go @@ -83,7 +83,7 @@ func (t *Tracer) Init( if !*disableEbpfCapture { //TODO: for cgroup V2 only - t.packetFilter, err = packetHooks.NewPacketFilter(procfs, t.bpfObjects.BpfObjs.FilterIngressPackets, t.bpfObjects.BpfObjs.FilterEgressPackets, t.bpfObjects.BpfObjs.PacketPullIngress, t.bpfObjects.BpfObjs.PacketPullEgress, t.bpfObjects.BpfObjs.TraceCgroupConnect4, t.bpfObjects.BpfObjs.CgroupIds) + t.packetFilter, err = packetHooks.NewPacketFilter(procfs, t.bpfObjects.BpfObjs.FilterIngressPackets, t.bpfObjects.BpfObjs.FilterEgressPackets, t.bpfObjects.BpfObjs.TraceCgroupConnect4, t.bpfObjects.BpfObjs.CgroupIds) if err != nil { return err } @@ -128,39 +128,37 @@ func (t *Tracer) updateTargets(addPods, removePods []*v1.Pod, settings uint32) e for _, pod := range addPods { pd := t.runningPods[pod.UID] for _, containerId := range getContainerIDs(pod) { - value, ok := 
t.eventsDiscoverer.ContainersInfo().Get(discoverer.ContainerID(containerId)) + values, ok := t.eventsDiscoverer.ContainersInfo().Get(discoverer.ContainerID(containerId)) if !ok { // pod can be on a different node continue } - cInfo := containerInfo{ - cgroupPath: value.CgroupPath, - cgroupID: uint64(value.CgroupID), - } - pd.containers = append(pd.containers, cInfo) - - if t.packetFilter != nil { - if err := t.packetFilter.AttachPod(string(pod.UID), cInfo.cgroupPath, []uint64{cInfo.cgroupID}); err != nil { - log.Error().Err(err).Uint64("Cgroup ID", cInfo.cgroupID).Str("Cgroup path", cInfo.cgroupPath).Str("pod", pod.Name).Msg("Attach pod to cgroup failed:") - return err + for _, value := range values { + cInfo := containerInfo{ + cgroupPath: value.CgroupPath, + cgroupID: uint64(value.CgroupID), } - log.Info().Str("pod", pod.Name).Msg("Attached pod to cgroup:") - } else { - if err := t.bpfObjects.BpfObjs.CgroupIds.Update(cInfo.cgroupID, uint32(0), ebpf.UpdateNoExist); err != nil { - log.Error().Err(err).Uint64("Cgroup ID", cInfo.cgroupID).Msg("Cgroup IDs update failed") - return err + pd.containers = append(pd.containers, cInfo) + + if t.packetFilter != nil { + if err := t.packetFilter.AttachPod(string(pod.UID), cInfo.cgroupPath, []uint64{cInfo.cgroupID}); err != nil { + log.Error().Err(err).Uint64("Cgroup ID", cInfo.cgroupID).Str("Cgroup path", cInfo.cgroupPath).Str("pod", pod.Name).Msg("Attach pod to cgroup failed:") + return err + } + log.Info().Str("pod", pod.Name).Msg("Attached pod to cgroup:") + } else { + if err := t.bpfObjects.BpfObjs.CgroupIds.Update(cInfo.cgroupID, uint32(0), ebpf.UpdateNoExist); err != nil { + log.Error().Err(err).Uint64("Cgroup ID", cInfo.cgroupID).Msg("Cgroup IDs update failed") + return err + } } + t.eventsDiscoverer.TargetCgroup(cInfo.cgroupID) + log.Info().Str("Container ID", containerId).Uint64("Cgroup ID", cInfo.cgroupID).Msg("Cgroup has been targeted") } - t.eventsDiscoverer.TargetCgroup(cInfo.cgroupID) - log.Info().Str("Container ID", containerId).Uint64("Cgroup ID", cInfo.cgroupID).Msg("Cgroup has been targeted") } t.runningPods[pod.UID] = pd } - if t.packetFilter != nil { - t.packetFilter.UpdateTCPrograms(t.procfs) - } - return nil } From 377a4bc8cbb22992461162906ddf4ab86d4c806f Mon Sep 17 00:00:00 2001 From: Ilya Gavrilov Date: Wed, 23 Oct 2024 14:42:58 +0200 Subject: [PATCH 2/5] Fix error logging on pods not tracked --- bpf/packet_sniffer.c | 20 ++++++-------------- pkg/bpf/bpf.go | 2 +- pkg/discoverer/discoverer.go | 12 +++++++++--- pkg/hooks/packet/packet_sniffer.go | 7 ++++++- pkg/poller/packets/packets_poller.go | 25 ++++++-------------------- tracer.go | 4 ++++ 6 files changed, 32 insertions(+), 38 deletions(-) diff --git a/bpf/packet_sniffer.c b/bpf/packet_sniffer.c index e447013..63fae5c 100644 --- a/bpf/packet_sniffer.c +++ b/bpf/packet_sniffer.c @@ -2,12 +2,12 @@ ------------------------------------------------------------------------------- Simplified packet flow diagram - eth0 ingress│ ▲ eth0 egress + │ ▲ + │ │ │ │ │ │ │ │ │ │ - tc/ingress hook│ │tc/egress hook │ │ │ │ │ │ @@ -22,20 +22,11 @@ cgroup_skb/ingress hook│ │cgroup_skb/egress -------------------------------------------------------------------------------- - Two types of hooks are in use: - 1. tc/ to hook on each kubernetes network interface - 2. cgroup_skb/ to hook on each targeted cgroup + cgroup_skb/ to hook on each targeted cgroup + socket cookies mechanism to track packets Each hook type is attached to both ingress and egress parts.
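Patch 2 (below) also serializes updates to the discoverer's containersInfo LRU, which since patch 1 maps a container ID to a slice of cgroups. The Contains/Get/append/Add sequence is a read-modify-write, so two goroutines interleaving it can lose an entry. A minimal, self-contained sketch of the locked pattern, assuming the hashicorp/golang-lru/v2 API; the containerIndex type and the add helper are illustrative, not tracer code:

package main

import (
	"fmt"
	"sync"

	lru "github.com/hashicorp/golang-lru/v2"
)

// CgroupData mirrors the discoverer's value type: one container may map to
// several cgroups, hence the slice.
type CgroupData struct {
	CgroupPath string
	CgroupID   uint64
}

type containerIndex struct {
	mtx   sync.Mutex // serializes the read-modify-write below
	cache *lru.Cache[string, []CgroupData]
}

// add appends a cgroup to the container's slice; without the lock, two
// concurrent callers could both Get the old slice and one append would be lost.
func (ci *containerIndex) add(containerID string, item CgroupData) {
	ci.mtx.Lock()
	defer ci.mtx.Unlock()
	if !ci.cache.Contains(containerID) {
		ci.cache.Add(containerID, []CgroupData{item})
		return
	}
	v, _ := ci.cache.Get(containerID)
	ci.cache.Add(containerID, append(v, item))
}

func main() {
	cache, err := lru.New[string, []CgroupData](16384)
	if err != nil {
		panic(err)
	}
	idx := &containerIndex{cache: cache}
	idx.add("cont-1", CgroupData{CgroupPath: "/sys/fs/cgroup/pod/cont-1", CgroupID: 42})
	idx.add("cont-1", CgroupData{CgroupPath: "/sys/fs/cgroup/pod/cont-1-restarted", CgroupID: 43})
	v, _ := idx.cache.Get("cont-1")
	fmt.Println(v)
}

Holding the mutex across the whole sequence is what makes the append atomic; the cache's own internal lock only protects each individual call.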
- cgroup_skb programs : - - cgroup_skb/ingress program exports incoming packet with 'received' flag onto perf buffer - - cgroup_skb/egress program saves ip/port information into the map, the packet expected to be exported into perf buffer once it get into tc/egress - - tc programs : - - use bpf_skb_pull_data bpf helper to load whole payload into sk_buf. Without that call kernel loads only first 1500 bytes - - export packets with overrided (from cgroup_skb/) ports into perf buffers - References: https://docs.cilium.io/en/stable/bpf/#bpf-guide @@ -88,7 +79,8 @@ const volatile __u64 DISABLE_EBPF_CAPTURE = 0; static __always_inline void save_packet(struct __sk_buff *skb, __u32 offset, __u32 rewrite_ip_src, __u16 rewrite_port_src, __u32 rewrite_ip_dst, __u16 rewrite_port_dst, __u64 cgroup_id, __u8 direction); static __always_inline int parse_packet(struct __sk_buff *skb, int is_tc, __u32 *src_ip4, __u16 *src_port, __u32 *dst_ip4, __u16 *dst_port, __u8 *ipp); -// TODO: remove cookies +// TODO: remove cookies from the socket_cookies map on socket close, + until then, this LRU performs the cleanup int reported_cookie_error = 0; static __always_inline __u64 get_socket_cookie(struct __sk_buff *skb) diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 71f84f7..ad107f4 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -50,7 +50,7 @@ func (objs *BpfObjectsImpl) loadBpfObjects(bpfConstants map[string]uint64, reade if errors.As(err, &ve) { errStr := fmt.Sprintf("%+v", ve) if len(errStr) > 2048 { - //XXX errStr = "(truncated) " + errStr[len(errStr)-1024:] + errStr = "(truncated) " + errStr[len(errStr)-1024:] } log.Warn().Msg(fmt.Sprintf("Got verifier error: %v", errStr)) } diff --git a/pkg/discoverer/discoverer.go b/pkg/discoverer/discoverer.go index 3462ee6..a785d89 100644 --- a/pkg/discoverer/discoverer.go +++ b/pkg/discoverer/discoverer.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strings" + "sync" "unsafe" @@ -45,9 +46,10 @@ type InternalEventsDiscovererImpl struct { readerFoundOpenssl *perf.Reader readerFoundCgroup *perf.Reader - cgroupsInfo *lru.Cache[CgroupID, ContainerID] - containersInfo *lru.Cache[ContainerID, []CgroupData] - pids *pids + cgroupsInfo *lru.Cache[CgroupID, ContainerID] + containersInfo *lru.Cache[ContainerID, []CgroupData] + containersInfoMtx sync.Mutex + pids *pids } func NewInternalEventsDiscoverer(procfs string, bpfObjects *bpf.BpfObjects) InternalEventsDiscoverer { @@ -162,6 +164,7 @@ func (e *InternalEventsDiscovererImpl) scanExistingCgroups(isCgroupsV2 bool) { e.cgroupsInfo.Add(CgroupID(cgroupId), ContainerID(contId)) item := CgroupData{CgroupPath: s, CgroupID: CgroupID(cgroupId)} + e.containersInfoMtx.Lock() if !e.containersInfo.Contains(ContainerID(contId)) { e.containersInfo.Add(ContainerID(contId), []CgroupData{item}) } else { @@ -169,6 +172,7 @@ func (e *InternalEventsDiscovererImpl) scanExistingCgroups(isCgroupsV2 bool) { v, _ := e.containersInfo.Get(ContainerID(contId)) v = append(v, item) e.containersInfo.Add(ContainerID(contId), v) } + e.containersInfoMtx.Unlock() log.Debug().Uint64("Cgroup ID", cgroupId).Str("Container ID", contId).Msg("Initial cgroup is detected") return nil @@ -292,6 +296,7 @@ func (e *InternalEventsDiscovererImpl) handleFoundCgroup(isCgroupsV2 bool) { } e.cgroupsInfo.Add(CgroupID(cgroupId), ContainerID(contId)) item := CgroupData{CgroupPath: cgroupPath, CgroupID: CgroupID(cgroupId)} + e.containersInfoMtx.Lock() if !e.containersInfo.Contains(ContainerID(contId)) { e.containersInfo.Add(ContainerID(contId), []CgroupData{item}) } else { @@ -299,6 +304,7 @@ func (e *InternalEventsDiscovererImpl)
handleFoundCgroup(isCgroupsV2 bool) { v = append(v, item) e.containersInfo.Add(ContainerID(contId), v) } + e.containersInfoMtx.Unlock() log.Debug().Uint64("Cgroup ID", cgroupId).Str("Container ID", contId).Str("Cgroup Path", cgroupPath).Msg("New cgroup is detected") } diff --git a/pkg/hooks/packet/packet_sniffer.go b/pkg/hooks/packet/packet_sniffer.go index 7bd60a0..6bc3463 100644 --- a/pkg/hooks/packet/packet_sniffer.go +++ b/pkg/hooks/packet/packet_sniffer.go @@ -82,7 +82,7 @@ func (t *PacketFilter) AttachPod(uuid, cgroupV2Path string, cgoupIDs []uint64) e func (t *PacketFilter) DetachPod(uuid string) error { log.Info().Str("pod", uuid).Msg("Detaching pod:") - p, ok := t.attachedPods[uuid] + p, ok := t.GetAttachedPod(uuid) if !ok { return fmt.Errorf("pod not attached") } @@ -101,3 +101,8 @@ func (t *PacketFilter) DetachPod(uuid string) error { delete(t.attachedPods, uuid) return nil } + +func (t *PacketFilter) GetAttachedPod(uuid string) (p *podLinks, ok bool) { + p, ok = t.attachedPods[uuid] + return +} diff --git a/pkg/poller/packets/packets_poller.go b/pkg/poller/packets/packets_poller.go index 3891004..e6da0b2 100644 --- a/pkg/poller/packets/packets_poller.go +++ b/pkg/poller/packets/packets_poller.go @@ -80,16 +80,8 @@ func (p *PacketsPoller) Start() { func (p *PacketsPoller) poll() { // tracerPktsChunk is generated by bpf2go. - chunks := make(chan tracerPktChunk) - go p.pollChunksPerfBuffer(chunks) - - for chunk := range chunks { - if err := p.handlePktChunk(chunk); err != nil { - log.Error().Err(err).Msg("ERROR") //XXX - utils.LogError(err) - } - } + go p.pollChunksPerfBuffer() } func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { @@ -110,14 +102,14 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { } if ptr.Num != pkts.num { if ptr.ID == pkts.id { - return fmt.Errorf("lost packet message(1) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v tot_len: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction, ptr.TotLen) + return fmt.Errorf("lost packet message(1) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction) } // ID is changed, so new packet is started: pkts.len = 0 pkts.num = 0 pkts.id = ptr.ID if ptr.Num != pkts.num { - return fmt.Errorf("lost packet message(2) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v tot_len: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction, ptr.TotLen) + return fmt.Errorf("lost packet message(2) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction) } } @@ -125,9 +117,7 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { pkts.len += uint32(ptr.Len) if ptr.Last != 0 { - //XXX: - //err := p.sorter.WritePlanePacket(ptr.Timestamp, ptr.CgroupID, ptr.Direction, layers.LayerTypeEthernet, p.ethhdr, gopacket.Payload(pkts.buf[:pkts.len])) - err := p.sorter.WritePlanePacket(0, ptr.CgroupID, ptr.Direction, layers.LayerTypeEthernet, p.ethhdr, gopacket.Payload(pkts.buf[:pkts.len])) + err := p.sorter.WritePlanePacket(ptr.Timestamp, ptr.CgroupID, ptr.Direction, layers.LayerTypeEthernet, p.ethhdr, gopacket.Payload(pkts.buf[:pkts.len])) if err != nil { return err } @@ -142,15 +132,13 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { return nil } -func (p *PacketsPoller) pollChunksPerfBuffer(chunks chan<- tracerPktChunk) { +func (p *PacketsPoller) pollChunksPerfBuffer() { log.Info().Msg("Start polling for tls 
events") for { record, err := p.chunksReader.Read() if err != nil { - close(chunks) - if errors.Is(err, perf.ErrClosed) { return } @@ -168,7 +156,6 @@ func (p *PacketsPoller) pollChunksPerfBuffer(chunks chan<- tracerPktChunk) { buf: record.RawSample, } - //chunks <- chunk - p.handlePktChunk(chunk) //XXX + p.handlePktChunk(chunk) } } diff --git a/tracer.go b/tracer.go index f936b57..bca051e 100644 --- a/tracer.go +++ b/tracer.go @@ -100,6 +100,10 @@ func (t *Tracer) updateTargets(addPods, removePods []*v1.Pod, settings uint32) e for _, pod := range removePods { if t.packetFilter != nil { + if _, ok := t.packetFilter.GetAttachedPod(string(pod.UID)); !ok { + // pod can be on a different node + continue + } if err := t.packetFilter.DetachPod(string(pod.UID)); err == nil { log.Info().Str("pod", pod.Name).Msg("Detached pod from cgroup:") } else { From 56fb5d1a847e2dc67752c87b6b58d12a4887c1f7 Mon Sep 17 00:00:00 2001 From: Ilya Gavrilov Date: Wed, 23 Oct 2024 17:40:50 +0200 Subject: [PATCH 3/5] rewrite packet capture eBPF code for preemptive kernels --- bpf/include/maps.h | 16 +++++- bpf/packet_sniffer.c | 78 ++++++++++++++++------------ pkg/poller/packets/packets_poller.go | 31 +++++------ 3 files changed, 72 insertions(+), 53 deletions(-) diff --git a/bpf/include/maps.h b/bpf/include/maps.h index 5bd3f60..a563b3e 100644 --- a/bpf/include/maps.h +++ b/bpf/include/maps.h @@ -112,13 +112,25 @@ struct { __type(key, int); __type(value, struct pkt); } pkt_heap SEC(".maps"); + +struct pkt_id_t { + __u64 id; + struct bpf_spin_lock lock; +}; struct { - __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(type, BPF_MAP_TYPE_ARRAY); __uint(max_entries, 1); __type(key, int); - __type(value, __u64); + __type(value, struct pkt_id_t); } pkt_id SEC(".maps"); +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_HASH); + __uint(max_entries, 4096); + __type(key, __u64); + __type(value, struct pkt); +} packet_context SEC(".maps"); + struct socket_cookie_data { __u64 cgroup_id; __u32 src_ip; diff --git a/bpf/packet_sniffer.c b/bpf/packet_sniffer.c index 63fae5c..7e75681 100644 --- a/bpf/packet_sniffer.c +++ b/bpf/packet_sniffer.c @@ -2,7 +2,7 @@ ------------------------------------------------------------------------------- Simplified packet flow diagram - │ ▲ + │ ▲ │ │ │ │ │ │ @@ -27,6 +27,8 @@ cgroup_skb/ingress hook│ │cgroup_skb/egress Each hook type attached into ingress and egrees parts. 
+ Since preemption was introduced into eBPF starting from kernel 5.11, all functions should be thread-safe + References: https://docs.cilium.io/en/stable/bpf/#bpf-guide @@ -214,7 +216,6 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx) return; } - // void *data = (void *)(long)skb->data; p->tot_len = skb->len; p->counter = skb->len; @@ -222,8 +223,6 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx) { return; } - // data += offset; - // pkt_len -= offset; if (p->counter == 0) { @@ -237,58 +236,69 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx) return; } - - __u64 *pkt_id_ptr = bpf_map_lookup_elem(&pkt_id, &zero); + struct pkt_id_t *pkt_id_ptr = bpf_map_lookup_elem(&pkt_id, &zero); if (pkt_id_ptr == NULL) { log_error(skb, LOG_ERROR_PKT_SNIFFER, 4, 0l, 0l); return; } + __u64 packet_id = 0; + bpf_spin_lock(&pkt_id_ptr->lock); + packet_id = pkt_id_ptr->id++; + bpf_spin_unlock(&pkt_id_ptr->lock); + + if (bpf_map_update_elem(&packet_context, &packet_id, p, BPF_NOEXIST)) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 5, 0l, 0l); + return; + } + p = bpf_map_lookup_elem(&packet_context, &packet_id); + if (!p) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 6, 0l, 0l); + return; + } + p->timestamp = compat_get_uprobe_timestamp(); - //p->timestamp = 0; p->cgroup_id = cgroup_id; p->direction = direction; - p->id = *pkt_id_ptr; + p->id = packet_id; p->num = 0; p->len = 0; p->last = 0; - (*pkt_id_ptr)++; +#pragma unroll for (__u32 i = 0; (i < PKT_MAX_LEN / PKT_PART_LEN) && p->counter; i++) { p->len = (p->counter < PKT_PART_LEN) ? p->counter : PKT_PART_LEN; - if (p->len < 0) - { - log_error(skb, LOG_ERROR_PKT_SNIFFER, 5, 0l, 0l); - bpf_printk("!!! TAIL NOT SENT0: %d", p->len); - return; - } - long err = 0; p->num = i; p->counter -= p->len; p->last = (p->counter == 0) ? 
1 : 0; if (p->len == PKT_PART_LEN) { - err = bpf_skb_load_bytes(skb, i * PKT_PART_LEN, &p->buf[0], PKT_PART_LEN); + if (bpf_skb_load_bytes(skb, i * PKT_PART_LEN, &p->buf[0], PKT_PART_LEN) != 0) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 6, 0l, 0l); + goto save_end; + } } else { - __s32 p_len = p->len; - for (int j = 0; j < 4096; j++) { - if(bpf_skb_load_bytes(skb, i * PKT_PART_LEN+j, &p->buf[j], 1)) break; - p_len--; - } - if (p_len != 0) { - err = -1; - } - } + /* + FIXME: below loop should be simplified - if (err != 0) - { - log_error(skb, LOG_ERROR_PKT_SNIFFER, 6, 0l, 0l); - bpf_printk("ERROR 2"); - return; + so far next code can not pass verifier: + + p_len &= 0xFFF; + bpf_skb_load_bytes(skb, i * PKT_PART_LEN, &p->buf[0], p_len); + */ + + for (int j = 0; j < PKT_PART_LEN; j++) + { + if (bpf_skb_load_bytes(skb, i * PKT_PART_LEN + j, &p->buf[j], 1)) + break; + } } struct iphdr *ip = (struct iphdr *)p->buf; @@ -309,9 +319,13 @@ static __noinline void _save_packet(struct pkt_sniffer_ctx *ctx) if (bpf_perf_event_output(skb, &pkts_buffer, BPF_F_CURRENT_CPU, p, sizeof(struct pkt))) { log_error(skb, LOG_ERROR_PKT_SNIFFER, 7, 0l, 0l); - bpf_printk("ERROR 3"); } } +save_end: + if (bpf_map_delete_elem(&packet_context, &packet_id)) + { + log_error(skb, LOG_ERROR_PKT_SNIFFER, 100, 0l, 0l); + } } /* parse_packet identifies TLS packet diff --git a/pkg/poller/packets/packets_poller.go b/pkg/poller/packets/packets_poller.go index e6da0b2..c4b04d8 100644 --- a/pkg/poller/packets/packets_poller.go +++ b/pkg/poller/packets/packets_poller.go @@ -47,7 +47,7 @@ type PacketsPoller struct { mtx sync.Mutex chunksReader *perf.Reader sorter *bpf.PacketSorter - pktsMap map[int]*pktBuffer // CPU to packet + pktsMap map[uint64]*pktBuffer // packet id to packet } func NewPacketsPoller( @@ -58,7 +58,7 @@ func NewPacketsPoller( poller := &PacketsPoller{ ethhdr: ethernet.NewEthernetLayer(layers.EthernetTypeIPv4), sorter: sorter, - pktsMap: make(map[int]*pktBuffer), + pktsMap: make(map[uint64]*pktBuffer), } poller.chunksReader, err = perf.NewReader(bpfObjs.BpfObjs.PktsBuffer, os.Getpagesize()*10000) @@ -93,24 +93,16 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { if len(data) != expectedChunkSize { return fmt.Errorf("bad pkt chunk: size %v expected: %v", len(data), expectedChunkSize) } + ptr := (*tracerPacketsData)(unsafe.Pointer(&data[0])) - pkts, ok := p.pktsMap[chunk.cpu] + pkts, ok := p.pktsMap[ptr.ID] if !ok { - p.pktsMap[chunk.cpu] = &pktBuffer{} - pkts = p.pktsMap[chunk.cpu] + p.pktsMap[ptr.ID] = &pktBuffer{} + pkts = p.pktsMap[ptr.ID] } if ptr.Num != pkts.num { - if ptr.ID == pkts.id { - return fmt.Errorf("lost packet message(1) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction) - } - // ID is changed, so new packet is started: - pkts.len = 0 - pkts.num = 0 - pkts.id = ptr.ID - if ptr.Num != pkts.num { - return fmt.Errorf("lost packet message(2) id: (%v %v) num: (%v %v) len: %v last: %v dir: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction) - } + return fmt.Errorf("lost packet message id: (%v %v) num: (%v %v) len: %v last: %v dir: %v tot_len: %v cpu: %v", pkts.id, ptr.ID, pkts.num, ptr.Num, ptr.Len, ptr.Last, ptr.Direction, ptr.TotLen, chunk.cpu) } copy(pkts.buf[pkts.len:], ptr.Data[:ptr.Len]) @@ -122,12 +114,11 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { return err } - pkts.len = 0 - pkts.num = 0 - pkts.id++ + delete(p.pktsMap, ptr.ID) } else { pkts.num++ } + 
//TODO: check p.pktsMap size and garbage collect return nil } @@ -156,6 +147,8 @@ func (p *PacketsPoller) pollChunksPerfBuffer() { buf: record.RawSample, } - p.handlePktChunk(chunk) + if err = p.handlePktChunk(chunk); err != nil { + log.Error().Err(err).Msg("handle chunk failed") + } } } From 158d63107f3b590c43543fd1507667861cffd39e Mon Sep 17 00:00:00 2001 From: Ilya Gavrilov Date: Wed, 23 Oct 2024 18:02:19 +0200 Subject: [PATCH 4/5] memory checks --- pkg/poller/packets/packets_poller.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/pkg/poller/packets/packets_poller.go b/pkg/poller/packets/packets_poller.go index c4b04d8..54494d8 100644 --- a/pkg/poller/packets/packets_poller.go +++ b/pkg/poller/packets/packets_poller.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "sync" + "time" "unsafe" "github.com/cilium/ebpf/perf" @@ -82,6 +83,7 @@ func (p *PacketsPoller) poll() { // tracerPktsChunk is generated by bpf2go. go p.pollChunksPerfBuffer() + go p.checkBuffers() } @@ -118,7 +120,6 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error { } else { pkts.num++ } - //TODO: check p.pktsMap size and garbage collect return nil } @@ -152,3 +153,25 @@ func (p *PacketsPoller) pollChunksPerfBuffer() { } } } + +func (p *PacketsPoller) checkBuffers() { + for { + p.mtx.Lock() + plen := len(p.pktsMap) + p.mtx.Unlock() + + log.Debug().Int("size", plen).Msg("packets map size") + if plen > 1024 { + log.Error().Int("size", plen).Msg("packets map is too big, removing elements") + p.mtx.Lock() + for i := range p.pktsMap { + delete(p.pktsMap, i) + if len(p.pktsMap) <= 1024 { + break + } + } + p.mtx.Unlock() + } + time.Sleep(5 * time.Second) + } +} From 00152c501745c600fac30967b672e8fde915a50d Mon Sep 17 00:00:00 2001 From: Ilya Gavrilov Date: Wed, 23 Oct 2024 18:07:38 +0200 Subject: [PATCH 5/5] memory checks --- pkg/poller/packets/packets_poller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/poller/packets/packets_poller.go b/pkg/poller/packets/packets_poller.go index 54494d8..beb7471 100644 --- a/pkg/poller/packets/packets_poller.go +++ b/pkg/poller/packets/packets_poller.go @@ -155,6 +155,8 @@ func (p *PacketsPoller) pollChunksPerfBuffer() { } func (p *PacketsPoller) checkBuffers() { + // only a bug in the eBPF code can cause pktsMap overflow + for { p.mtx.Lock() plen := len(p.pktsMap)
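Taken together, patches 3 through 5 leave the userspace poller reassembling each packet from numbered parts keyed by packet ID, with checkBuffers as a safety net for buffers whose final part never arrives. A condensed, self-contained Go sketch of that reassembly contract; the chunk and reassembler types are illustrative, while the real poller decodes the bpf2go-generated struct from record.RawSample:

package main

import "fmt"

// chunk mirrors the fields the poller uses from each perf record: the packet
// id, the part number, the payload, and the "last part" flag.
type chunk struct {
	ID   uint64
	Num  uint16
	Last bool
	Data []byte
}

type buffer struct {
	next uint16 // next expected part number
	data []byte
}

type reassembler struct {
	inflight map[uint64]*buffer // packet id -> partially assembled packet
}

// feed returns the full packet once the part flagged Last arrives, and drops
// the buffer on a sequence gap, much like the reworked handlePktChunk.
func (r *reassembler) feed(c chunk) ([]byte, error) {
	b, ok := r.inflight[c.ID]
	if !ok {
		b = &buffer{}
		r.inflight[c.ID] = b
	}
	if c.Num != b.next {
		delete(r.inflight, c.ID)
		return nil, fmt.Errorf("lost packet message id %d: got part %d, expected %d", c.ID, c.Num, b.next)
	}
	b.data = append(b.data, c.Data...)
	if c.Last {
		delete(r.inflight, c.ID)
		return b.data, nil
	}
	b.next++
	return nil, nil
}

func main() {
	r := &reassembler{inflight: map[uint64]*buffer{}}
	if _, err := r.feed(chunk{ID: 7, Num: 0, Data: []byte("hel")}); err != nil {
		panic(err)
	}
	pkt, err := r.feed(chunk{ID: 7, Num: 1, Last: true, Data: []byte("lo")})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", pkt) // prints "hello"
}

As the patch 5 comment notes, only a bug on the eBPF side should make the in-flight map grow past the bound that checkBuffers enforces, so the periodic sweep is a backstop rather than part of the normal data path.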