diff --git a/bpf/flow.h b/bpf/flow.h index ad009c2b5..383d347b2 100644 --- a/bpf/flow.h +++ b/bpf/flow.h @@ -46,6 +46,8 @@ typedef struct flow_metrics_t { u16 flags; // direction of the flow EGRESS / INGRESS u8 direction; + // who initiated of the connection: INITIATOR_SRC or INITIATOR_DST + u8 initiator; // The positive errno of a failed map insertion that caused a flow // to be sent via ringbuffer. // 0 otherwise diff --git a/bpf/flows.c b/bpf/flows.c index 23e671a87..4c7ea5a8e 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -195,6 +195,7 @@ static inline int flow_monitor(struct __sk_buff *skb) { .end_mono_time_ns = current_time, .flags = flags, .direction = UNKNOWN, + .initiator = INITIATOR_UNKNOWN, }; u8 *direction = (u8 *)bpf_map_lookup_elem(&flow_directions, &id); @@ -225,6 +226,8 @@ static inline int flow_monitor(struct __sk_buff *skb) { new_flow.direction = *direction; } + new_flow.initiator = get_connection_initiator(&id, flags); + // even if we know that the entry is new, another CPU might be concurrently inserting a flow // so we need to specify BPF_ANY long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY); diff --git a/bpf/flows_common.h b/bpf/flows_common.h index 59fa3f508..8ed3ac8dc 100644 --- a/bpf/flows_common.h +++ b/bpf/flows_common.h @@ -31,6 +31,16 @@ #define FIN_ACK_FLAG 0x200 #define RST_ACK_FLAG 0x400 +// In conn_initiator_key, which sorted ip:port inititated the connection +#define INITIATOR_LOW 1 +#define INITIATOR_HIGH 2 + +// In flow_metrics, who initiated the connection +#define INITIATOR_SRC 1 +#define INITIATOR_DST 2 + +#define INITIATOR_UNKNOWN 0 + // Common Ringbuffer as a conduit for ingress/egress flows to userspace struct { __uint(type, BPF_MAP_TYPE_RINGBUF); @@ -47,16 +57,138 @@ struct { // Key: the flow identifier. Value: the flow direction. struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); - __type(key, flow_id); - __type(value, u8); + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, flow_id); + __type(value, u8); } flow_directions SEC(".maps"); +// To know who initiated each connection, we store the src/dst ip:ports but ordered +// by numeric value of the IP (and port as secondary criteria), so the key is consistent +// for either client and server flows. +typedef struct conn_initiator_key_t { + struct in6_addr low_ip; + struct in6_addr high_ip; + u16 low_ip_port; + u16 high_ip_port; +} __attribute__((packed)) conn_initiator_key; + +// Key: the flow identifier. +// Value: the connection initiator index (INITIATOR_LOW, INITIATOR_HIGH). +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, conn_initiator_key); + __type(value, u8); +} conn_initiators SEC(".maps"); + const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; // Constant definitions, to be overridden by the invoker volatile const u32 sampling = 0; volatile const u8 trace_messages = 0; +// we can safely assume that the passed address is IPv6 as long as we encode IPv4 +// as IPv6 during the creation of the flow_id. +static inline s32 compare_ipv6(flow_id *fid) { + for (int i = 0; i < 4; i++) { + s32 diff = fid->src_ip.in6_u.u6_addr32[i] - fid->dst_ip.in6_u.u6_addr32[i]; + if (diff != 0) { + return diff; + } + } + return 0; +} + +// creates a key that is consistent for both requests and responses, by +// ordering endpoints (ip:port) numerically into a lower and a higher endpoint. +// returns true if the lower address corresponds to the source address +// (false if the lower address corresponds to the destination address) +static inline u8 fill_conn_initiator_key(flow_id *id, conn_initiator_key *key) { + s32 cmp = compare_ipv6(id); + if (cmp < 0) { + __builtin_memcpy(&key->low_ip, &id->src_ip, sizeof(struct in6_addr)); + key->low_ip_port = id->src_port; + __builtin_memcpy(&key->high_ip, &id->dst_ip, sizeof(struct in6_addr)); + key->high_ip_port = id->dst_port; + return 1; + } + // if the IPs are equal (cmp == 0) we will use the ports as secondary order criteria + __builtin_memcpy(&key->high_ip, &id->src_ip, sizeof(struct in6_addr)); + __builtin_memcpy(&key->low_ip, &id->dst_ip, sizeof(struct in6_addr)); + if (cmp > 0 || id->src_port > id->dst_port) { + key->high_ip_port = id->src_port; + key->low_ip_port = id->dst_port; + return 0; + } + key->low_ip_port = id->src_port; + key->high_ip_port = id->dst_port; + return 1; +} + +// returns INITIATOR_SRC or INITIATOR_DST, but might return INITIATOR_UNKNOWN +// if the connection initiator couldn't be found. The user-space Beyla pipeline +// will handle this last case heuristically +static inline u8 get_connection_initiator(flow_id *id, u16 flags) { + conn_initiator_key initiator_key; + // from the initiator_key with sorted ip/ports, know the index of the + // endpoint that that initiated the connection, which might be the low or the high address + u8 low_is_src = fill_conn_initiator_key(id, &initiator_key); + u8 *initiator = (u8 *)bpf_map_lookup_elem(&conn_initiators, &initiator_key); + u8 initiator_index = INITIATOR_UNKNOWN; + if (initiator == NULL) { + // SYN and ACK is sent from the server to the client + // The initiator is the destination address + if ((flags & (SYN_FLAG | ACK_FLAG)) == (SYN_FLAG | ACK_FLAG)) { + if (low_is_src) { + initiator_index = INITIATOR_HIGH; + } else { + initiator_index = INITIATOR_LOW; + } + } + // SYN is sent from the client to the server. + // The initiator is the source address + else if (flags & SYN_FLAG) { + if (low_is_src) { + initiator_index = INITIATOR_LOW; + } else { + initiator_index = INITIATOR_HIGH; + } + } + + if (initiator_index != INITIATOR_UNKNOWN) { + bpf_map_update_elem(&conn_initiators, &initiator_key, &initiator_index, BPF_NOEXIST); + } + } else { + initiator_index = *initiator; + } + + // when flow receives FIN or RST, clean flow_directions + if (flags & FIN_FLAG || flags & RST_FLAG || flags & FIN_ACK_FLAG || flags & RST_ACK_FLAG) { + bpf_map_delete_elem(&conn_initiators, &initiator_key); + } + + u8 flow_initiator = INITIATOR_UNKNOWN; + // at this point, we should know the index of the endpoint that initiated the connection. + // Then we accordingly set whether the initiator is the source or the destination address. + // If not, we forward the unknown status and the userspace will take + // heuristic actions to guess who is + switch (initiator_index) { + case INITIATOR_LOW: + if (low_is_src) { + flow_initiator = INITIATOR_SRC; + } else { + flow_initiator = INITIATOR_DST; + } + break; + case INITIATOR_HIGH: + if (low_is_src) { + flow_initiator = INITIATOR_DST; + } else { + flow_initiator = INITIATOR_SRC; + } + break; + } + + return flow_initiator; +} #endif //__FLOW_HELPERS_H__ \ No newline at end of file diff --git a/bpf/flows_sock.c b/bpf/flows_sock.c index 31d0f4acb..6fdccdc1b 100644 --- a/bpf/flows_sock.c +++ b/bpf/flows_sock.c @@ -241,6 +241,8 @@ int socket__http_filter(struct __sk_buff *skb) { new_flow.direction = *direction; } + new_flow.initiator = get_connection_initiator(&id, flags); + // even if we know that the entry is new, another CPU might be concurrently inserting a flow // so we need to specify BPF_ANY long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY); diff --git a/pkg/internal/ebpf/goredis/bpf_bpfel_arm64.o b/pkg/internal/ebpf/goredis/bpf_bpfel_arm64.o index e4b03f7d8..90d9cf8ea 100644 Binary files a/pkg/internal/ebpf/goredis/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/goredis/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/goredis/bpf_bpfel_x86.o b/pkg/internal/ebpf/goredis/bpf_bpfel_x86.o index 0412e3148..4fb7cf2ca 100644 Binary files a/pkg/internal/ebpf/goredis/bpf_bpfel_x86.o and b/pkg/internal/ebpf/goredis/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/goredis/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/goredis/bpf_debug_bpfel_arm64.o index f19c8e1b1..8bb18019b 100644 Binary files a/pkg/internal/ebpf/goredis/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/goredis/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/goredis/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/goredis/bpf_debug_bpfel_x86.o index 7e7b3f29d..1f4ee8e6f 100644 Binary files a/pkg/internal/ebpf/goredis/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/goredis/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/goruntime/bpf_bpfel_arm64.o b/pkg/internal/ebpf/goruntime/bpf_bpfel_arm64.o index 9aa83483d..c2c4cac93 100644 Binary files a/pkg/internal/ebpf/goruntime/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/goruntime/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/goruntime/bpf_bpfel_x86.o b/pkg/internal/ebpf/goruntime/bpf_bpfel_x86.o index 58b647d25..99380eb0d 100644 Binary files a/pkg/internal/ebpf/goruntime/bpf_bpfel_x86.o and b/pkg/internal/ebpf/goruntime/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_arm64.o index 63b51cc12..915d0346d 100644 Binary files a/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_x86.o index 8e323f9cd..1ae72f4c2 100644 Binary files a/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/goruntime/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_bpfel_arm64.o b/pkg/internal/ebpf/grpc/bpf_bpfel_arm64.o index b0088b9e0..030790b13 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/grpc/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_bpfel_x86.o b/pkg/internal/ebpf/grpc/bpf_bpfel_x86.o index 0bce887cf..6fabd4da2 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_bpfel_x86.o and b/pkg/internal/ebpf/grpc/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/grpc/bpf_debug_bpfel_arm64.o index afa82931c..d72b4bf00 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/grpc/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/grpc/bpf_debug_bpfel_x86.o index 3d51486aa..6eebf9d4e 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/grpc/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_tp_bpfel_arm64.o b/pkg/internal/ebpf/grpc/bpf_tp_bpfel_arm64.o index 7e8f1f714..9e9706afe 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_tp_bpfel_arm64.o and b/pkg/internal/ebpf/grpc/bpf_tp_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_tp_bpfel_x86.o b/pkg/internal/ebpf/grpc/bpf_tp_bpfel_x86.o index 89d1222f2..d6860a69d 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_tp_bpfel_x86.o and b/pkg/internal/ebpf/grpc/bpf_tp_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_arm64.o b/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_arm64.o index 3ceeafbb7..6797f3b1d 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_arm64.o and b/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_x86.o b/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_x86.o index 510180665..9e962e8d5 100644 Binary files a/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_x86.o and b/pkg/internal/ebpf/grpc/bpf_tp_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o index c669318d1..99e4e57c3 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o index 08ea0d0d5..69706208e 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o index bc6a0b1e8..7b0f09b86 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o index 19e59200f..814b4bd0b 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o index 3675f6ebb..767f59dd0 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o index daf72e974..6ac319cfa 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o index c5c190778..f9ca07fa4 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o index 69ba3c43a..8336a9806 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o index eb4d22735..f4251274c 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o index f604516d7..fb7ef0ad8 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o index fdaf56fc3..753be0985 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o index 98e4bd260..b8ba85cd5 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o index a4aeb25e5..32f5a6a6a 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o index bc9a4f902..9a12af143 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o index 26e9d4bd9..4caabc39c 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o index 62a74df48..598fee11d 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/kafkago/bpf_bpfel_arm64.o b/pkg/internal/ebpf/kafkago/bpf_bpfel_arm64.o index 2a3257e0f..872e30cd6 100644 Binary files a/pkg/internal/ebpf/kafkago/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/kafkago/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/kafkago/bpf_bpfel_x86.o b/pkg/internal/ebpf/kafkago/bpf_bpfel_x86.o index 013a332cc..751a9d7a9 100644 Binary files a/pkg/internal/ebpf/kafkago/bpf_bpfel_x86.o and b/pkg/internal/ebpf/kafkago/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_arm64.o index 4bbf4f4e8..41b1bc7c2 100644 Binary files a/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_x86.o index c4d709b79..6321c864a 100644 Binary files a/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/kafkago/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_bpfel_arm64.o b/pkg/internal/ebpf/nethttp/bpf_bpfel_arm64.o index fed524eee..2bc3a93ad 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/nethttp/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_bpfel_x86.o b/pkg/internal/ebpf/nethttp/bpf_bpfel_x86.o index 6c38674ef..3193fcbe5 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_bpfel_x86.o and b/pkg/internal/ebpf/nethttp/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_arm64.o index 97e99caf9..0f8ee3cf6 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_x86.o index 19a9fb674..80a68ae7e 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/nethttp/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_arm64.o b/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_arm64.o index 87ff6e8bc..f59841340 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_arm64.o and b/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_x86.o b/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_x86.o index 22b31db40..ed9524a97 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_x86.o and b/pkg/internal/ebpf/nethttp/bpf_tp_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_arm64.o b/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_arm64.o index 31eb71c8c..9ea26ca6d 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_arm64.o and b/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_x86.o b/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_x86.o index f9af4655c..1d52c2e91 100644 Binary files a/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_x86.o and b/pkg/internal/ebpf/nethttp/bpf_tp_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/nodejs/bpf_bpfel_arm64.o b/pkg/internal/ebpf/nodejs/bpf_bpfel_arm64.o index 36fd902ee..d6876db2c 100644 Binary files a/pkg/internal/ebpf/nodejs/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/nodejs/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/nodejs/bpf_bpfel_x86.o b/pkg/internal/ebpf/nodejs/bpf_bpfel_x86.o index d461bcd32..6638478ba 100644 Binary files a/pkg/internal/ebpf/nodejs/bpf_bpfel_x86.o and b/pkg/internal/ebpf/nodejs/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_arm64.o index 506813940..2dcf0c075 100644 Binary files a/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_x86.o index 5866757a7..e9bac526a 100644 Binary files a/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/nodejs/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/sarama/bpf_bpfel_arm64.o b/pkg/internal/ebpf/sarama/bpf_bpfel_arm64.o index de95d9dfc..1a71bdf56 100644 Binary files a/pkg/internal/ebpf/sarama/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/sarama/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/sarama/bpf_bpfel_x86.o b/pkg/internal/ebpf/sarama/bpf_bpfel_x86.o index 4926978c8..7e281ab4a 100644 Binary files a/pkg/internal/ebpf/sarama/bpf_bpfel_x86.o and b/pkg/internal/ebpf/sarama/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/sarama/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/sarama/bpf_debug_bpfel_arm64.o index 66299b005..4a9c67dc1 100644 Binary files a/pkg/internal/ebpf/sarama/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/sarama/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/sarama/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/sarama/bpf_debug_bpfel_x86.o index d13aa40c3..1ad76d778 100644 Binary files a/pkg/internal/ebpf/sarama/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/sarama/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/watcher/bpf_bpfel_arm64.o b/pkg/internal/ebpf/watcher/bpf_bpfel_arm64.o index 30f6586d1..d9d6511b4 100644 Binary files a/pkg/internal/ebpf/watcher/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/watcher/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/watcher/bpf_bpfel_x86.o b/pkg/internal/ebpf/watcher/bpf_bpfel_x86.o index 6627abbd8..a25519472 100644 Binary files a/pkg/internal/ebpf/watcher/bpf_bpfel_x86.o and b/pkg/internal/ebpf/watcher/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/watcher/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/watcher/bpf_debug_bpfel_arm64.o index 1a0c46364..7d56d5f22 100644 Binary files a/pkg/internal/ebpf/watcher/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/watcher/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/watcher/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/watcher/bpf_debug_bpfel_x86.o index b0c26a4d2..cc4783675 100644 Binary files a/pkg/internal/ebpf/watcher/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/watcher/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/export/attributes/attr_defs.go b/pkg/internal/export/attributes/attr_defs.go index 09182110b..7368b7ef3 100644 --- a/pkg/internal/export/attributes/attr_defs.go +++ b/pkg/internal/export/attributes/attr_defs.go @@ -204,6 +204,8 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { attr.DstPort: false, attr.SrcName: false, attr.DstName: false, + attr.ServerPort: false, + attr.ClientPort: false, attr.Direction: Default(ifaceDirEnabled), attr.Iface: Default(ifaceDirEnabled), }, diff --git a/pkg/internal/export/attributes/attr_selector_test.go b/pkg/internal/export/attributes/attr_selector_test.go index 66852321a..2e04091fe 100644 --- a/pkg/internal/export/attributes/attr_selector_test.go +++ b/pkg/internal/export/attributes/attr_selector_test.go @@ -108,7 +108,9 @@ func TestFor_GlobEntries_Order(t *testing.T) { require.NoError(t, err) assert.Equal(t, []attr.Name{ "beyla.ip", + "client.port", "dst.name", + "server.port", "src.address", "src.name", "src.port", diff --git a/pkg/internal/export/attributes/names/attrs.go b/pkg/internal/export/attributes/names/attrs.go index ec03e5095..d58afd9c8 100644 --- a/pkg/internal/export/attributes/names/attrs.go +++ b/pkg/internal/export/attributes/names/attrs.go @@ -81,6 +81,8 @@ const ( SrcCIDR = Name("src.cidr") DstCIDR = Name("dst.cidr") + ClientPort = Name("client.port") + K8sSrcOwnerName = Name("k8s.src.owner.name") K8sSrcNamespace = Name("k8s.src.namespace") K8sDstOwnerName = Name("k8s.dst.owner.name") diff --git a/pkg/internal/netolly/ebpf/net_bpfel_arm64.go b/pkg/internal/netolly/ebpf/net_bpfel_arm64.go index 4cc37f16e..7ede6d0d6 100644 --- a/pkg/internal/netolly/ebpf/net_bpfel_arm64.go +++ b/pkg/internal/netolly/ebpf/net_bpfel_arm64.go @@ -12,6 +12,13 @@ import ( "github.com/cilium/ebpf" ) +type NetConnInitiatorKey struct { + LowIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + HighIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + LowIpPort uint16 + HighIpPort uint16 +} + type NetFlowId NetFlowIdT type NetFlowIdT struct { @@ -33,6 +40,7 @@ type NetFlowMetricsT struct { EndMonoTimeNs uint64 Flags uint16 Direction uint8 + Initiator uint8 Errno uint8 } @@ -91,6 +99,7 @@ type NetProgramSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type NetMapSpecs struct { AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.MapSpec `ebpf:"conn_initiators"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` FlowDirections *ebpf.MapSpec `ebpf:"flow_directions"` } @@ -115,6 +124,7 @@ func (o *NetObjects) Close() error { // It can be passed to LoadNetObjects or ebpf.CollectionSpec.LoadAndAssign. type NetMaps struct { AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.Map `ebpf:"conn_initiators"` DirectFlows *ebpf.Map `ebpf:"direct_flows"` FlowDirections *ebpf.Map `ebpf:"flow_directions"` } @@ -122,6 +132,7 @@ type NetMaps struct { func (m *NetMaps) Close() error { return _NetClose( m.AggregatedFlows, + m.ConnInitiators, m.DirectFlows, m.FlowDirections, ) diff --git a/pkg/internal/netolly/ebpf/net_bpfel_arm64.o b/pkg/internal/netolly/ebpf/net_bpfel_arm64.o index e9142547d..6d280fa69 100644 Binary files a/pkg/internal/netolly/ebpf/net_bpfel_arm64.o and b/pkg/internal/netolly/ebpf/net_bpfel_arm64.o differ diff --git a/pkg/internal/netolly/ebpf/net_bpfel_x86.go b/pkg/internal/netolly/ebpf/net_bpfel_x86.go index 4f1ff12c6..093b4e82c 100644 --- a/pkg/internal/netolly/ebpf/net_bpfel_x86.go +++ b/pkg/internal/netolly/ebpf/net_bpfel_x86.go @@ -12,6 +12,13 @@ import ( "github.com/cilium/ebpf" ) +type NetConnInitiatorKey struct { + LowIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + HighIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + LowIpPort uint16 + HighIpPort uint16 +} + type NetFlowId NetFlowIdT type NetFlowIdT struct { @@ -33,6 +40,7 @@ type NetFlowMetricsT struct { EndMonoTimeNs uint64 Flags uint16 Direction uint8 + Initiator uint8 Errno uint8 } @@ -91,6 +99,7 @@ type NetProgramSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type NetMapSpecs struct { AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.MapSpec `ebpf:"conn_initiators"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` FlowDirections *ebpf.MapSpec `ebpf:"flow_directions"` } @@ -115,6 +124,7 @@ func (o *NetObjects) Close() error { // It can be passed to LoadNetObjects or ebpf.CollectionSpec.LoadAndAssign. type NetMaps struct { AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.Map `ebpf:"conn_initiators"` DirectFlows *ebpf.Map `ebpf:"direct_flows"` FlowDirections *ebpf.Map `ebpf:"flow_directions"` } @@ -122,6 +132,7 @@ type NetMaps struct { func (m *NetMaps) Close() error { return _NetClose( m.AggregatedFlows, + m.ConnInitiators, m.DirectFlows, m.FlowDirections, ) diff --git a/pkg/internal/netolly/ebpf/net_bpfel_x86.o b/pkg/internal/netolly/ebpf/net_bpfel_x86.o index 840ce9b68..b0c3d53df 100644 Binary files a/pkg/internal/netolly/ebpf/net_bpfel_x86.o and b/pkg/internal/netolly/ebpf/net_bpfel_x86.o differ diff --git a/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.go b/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.go index 78e210d38..a790a4d31 100644 --- a/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.go +++ b/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.go @@ -12,6 +12,13 @@ import ( "github.com/cilium/ebpf" ) +type NetSkConnInitiatorKey struct { + LowIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + HighIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + LowIpPort uint16 + HighIpPort uint16 +} + type NetSkFlowId NetSkFlowIdT type NetSkFlowIdT struct { @@ -33,6 +40,7 @@ type NetSkFlowMetricsT struct { EndMonoTimeNs uint64 Flags uint16 Direction uint8 + Initiator uint8 Errno uint8 } @@ -90,6 +98,7 @@ type NetSkProgramSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type NetSkMapSpecs struct { AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.MapSpec `ebpf:"conn_initiators"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` FlowDirections *ebpf.MapSpec `ebpf:"flow_directions"` } @@ -114,6 +123,7 @@ func (o *NetSkObjects) Close() error { // It can be passed to LoadNetSkObjects or ebpf.CollectionSpec.LoadAndAssign. type NetSkMaps struct { AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.Map `ebpf:"conn_initiators"` DirectFlows *ebpf.Map `ebpf:"direct_flows"` FlowDirections *ebpf.Map `ebpf:"flow_directions"` } @@ -121,6 +131,7 @@ type NetSkMaps struct { func (m *NetSkMaps) Close() error { return _NetSkClose( m.AggregatedFlows, + m.ConnInitiators, m.DirectFlows, m.FlowDirections, ) diff --git a/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.o b/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.o index 4f313a47a..497328871 100644 Binary files a/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.o and b/pkg/internal/netolly/ebpf/netsk_bpfel_arm64.o differ diff --git a/pkg/internal/netolly/ebpf/netsk_bpfel_x86.go b/pkg/internal/netolly/ebpf/netsk_bpfel_x86.go index 2886a268c..c1b5493e5 100644 --- a/pkg/internal/netolly/ebpf/netsk_bpfel_x86.go +++ b/pkg/internal/netolly/ebpf/netsk_bpfel_x86.go @@ -12,6 +12,13 @@ import ( "github.com/cilium/ebpf" ) +type NetSkConnInitiatorKey struct { + LowIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + HighIp struct{ In6U struct{ U6Addr8 [16]uint8 } } + LowIpPort uint16 + HighIpPort uint16 +} + type NetSkFlowId NetSkFlowIdT type NetSkFlowIdT struct { @@ -33,6 +40,7 @@ type NetSkFlowMetricsT struct { EndMonoTimeNs uint64 Flags uint16 Direction uint8 + Initiator uint8 Errno uint8 } @@ -90,6 +98,7 @@ type NetSkProgramSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type NetSkMapSpecs struct { AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.MapSpec `ebpf:"conn_initiators"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` FlowDirections *ebpf.MapSpec `ebpf:"flow_directions"` } @@ -114,6 +123,7 @@ func (o *NetSkObjects) Close() error { // It can be passed to LoadNetSkObjects or ebpf.CollectionSpec.LoadAndAssign. type NetSkMaps struct { AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` + ConnInitiators *ebpf.Map `ebpf:"conn_initiators"` DirectFlows *ebpf.Map `ebpf:"direct_flows"` FlowDirections *ebpf.Map `ebpf:"flow_directions"` } @@ -121,6 +131,7 @@ type NetSkMaps struct { func (m *NetSkMaps) Close() error { return _NetSkClose( m.AggregatedFlows, + m.ConnInitiators, m.DirectFlows, m.FlowDirections, ) diff --git a/pkg/internal/netolly/ebpf/netsk_bpfel_x86.o b/pkg/internal/netolly/ebpf/netsk_bpfel_x86.o index 044aefd29..eee3287b1 100644 Binary files a/pkg/internal/netolly/ebpf/netsk_bpfel_x86.o and b/pkg/internal/netolly/ebpf/netsk_bpfel_x86.o differ diff --git a/pkg/internal/netolly/ebpf/record.go b/pkg/internal/netolly/ebpf/record.go index f92408268..211095086 100644 --- a/pkg/internal/netolly/ebpf/record.go +++ b/pkg/internal/netolly/ebpf/record.go @@ -75,6 +75,7 @@ func (fm *NetFlowMetrics) Accumulate(src *NetFlowMetrics) { fm.StartMonoTimeNs = src.StartMonoTimeNs // set Direction here, because the correct value is in the first packet only fm.Direction = src.Direction + fm.Initiator = src.Initiator } if fm.EndMonoTimeNs == 0 || fm.EndMonoTimeNs < src.EndMonoTimeNs { fm.EndMonoTimeNs = src.EndMonoTimeNs diff --git a/pkg/internal/netolly/ebpf/record_getters.go b/pkg/internal/netolly/ebpf/record_getters.go index 5703f732c..36b8106e0 100644 --- a/pkg/internal/netolly/ebpf/record_getters.go +++ b/pkg/internal/netolly/ebpf/record_getters.go @@ -10,6 +10,7 @@ import ( // RecordGetters returns the attributes.Getter function that returns the string value of a given // attribute name. +// nolint:cyclop func RecordGetters(name attr.Name) (attributes.Getter[*Record, attribute.KeyValue], bool) { var getter attributes.Getter[*Record, attribute.KeyValue] switch name { @@ -41,6 +42,34 @@ func RecordGetters(name attr.Name) (attributes.Getter[*Record, attribute.KeyValu } case attr.Iface: getter = func(r *Record) attribute.KeyValue { return attribute.String(string(attr.Iface), r.Attrs.Interface) } + case attr.ClientPort: + getter = func(r *Record) attribute.KeyValue { + var clientPort uint16 + switch r.Metrics.Initiator { + case InitiatorDst: + clientPort = r.Id.DstPort + case InitiatorSrc: + clientPort = r.Id.SrcPort + default: + // guess it, assuming that ephemeral ports for clients would be usually higher + clientPort = max(r.Id.DstPort, r.Id.SrcPort) + } + return attribute.Int(string(attr.ClientPort), int(clientPort)) + } + case attr.ServerPort: + getter = func(r *Record) attribute.KeyValue { + var serverPort uint16 + switch r.Metrics.Initiator { + case InitiatorDst: + serverPort = r.Id.SrcPort + case InitiatorSrc: + serverPort = r.Id.DstPort + default: + // guess it, assuming that ephemeral ports for clients would be usually higher + serverPort = min(r.Id.DstPort, r.Id.SrcPort) + } + return attribute.Int(string(attr.ServerPort), int(serverPort)) + } default: getter = func(r *Record) attribute.KeyValue { return attribute.String(string(name), r.Attrs.Metadata[name]) } } diff --git a/pkg/internal/netolly/ebpf/sock_tracer.go b/pkg/internal/netolly/ebpf/sock_tracer.go index fe9c52544..2899f26ac 100644 --- a/pkg/internal/netolly/ebpf/sock_tracer.go +++ b/pkg/internal/netolly/ebpf/sock_tracer.go @@ -66,6 +66,7 @@ func NewSockFlowFetcher( // Resize aggregated flows and flow directions maps according to user-provided configuration spec.Maps[aggregatedFlowsMap].MaxEntries = uint32(cacheMaxSize) spec.Maps[flowDirectionsMap].MaxEntries = uint32(cacheMaxSize) + spec.Maps[connInitiatorsMap].MaxEntries = uint32(cacheMaxSize) traceMsgs := 0 if tlog.Enabled(context.TODO(), slog.LevelDebug) { diff --git a/pkg/internal/netolly/ebpf/tracer.go b/pkg/internal/netolly/ebpf/tracer.go index aea472047..31050be60 100644 --- a/pkg/internal/netolly/ebpf/tracer.go +++ b/pkg/internal/netolly/ebpf/tracer.go @@ -45,6 +45,7 @@ const ( constSampling = "sampling" constTraceMessages = "trace_messages" aggregatedFlowsMap = "aggregated_flows" + connInitiatorsMap = "conn_initiators" flowDirectionsMap = "flow_directions" ) @@ -86,6 +87,7 @@ func NewFlowFetcher( // Resize aggregated flows and flow directions maps according to user-provided configuration spec.Maps[aggregatedFlowsMap].MaxEntries = uint32(cacheMaxSize) spec.Maps[flowDirectionsMap].MaxEntries = uint32(cacheMaxSize) + spec.Maps[connInitiatorsMap].MaxEntries = uint32(cacheMaxSize) traceMsgs := 0 if tlog.Enabled(context.TODO(), slog.LevelDebug) { diff --git a/pkg/internal/netolly/ebpf/tracer_common.go b/pkg/internal/netolly/ebpf/tracer_common.go index fe5e82405..6bb09a3f7 100644 --- a/pkg/internal/netolly/ebpf/tracer_common.go +++ b/pkg/internal/netolly/ebpf/tracer_common.go @@ -7,5 +7,9 @@ const ( DirectionIngress = 0 DirectionEgress = 1 + // InitiatorSrc and InitiatorDst values set accordingly to flows_common.h definition + InitiatorSrc = 1 + InitiatorDst = 2 + InterfaceUnset = 0xFFFFFFFF ) diff --git a/test/integration/configs/instrumenter-config-netolly-direction.yml b/test/integration/configs/instrumenter-config-netolly-direction.yml index e35249900..be4899a3f 100644 --- a/test/integration/configs/instrumenter-config-netolly-direction.yml +++ b/test/integration/configs/instrumenter-config-netolly-direction.yml @@ -15,3 +15,5 @@ attributes: - dst.cidr - iface - direction + - client.port + - server.port diff --git a/test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml b/test/integration/k8s/manifests/06-beyla-netolly-tc-promexport.yml similarity index 98% rename from test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml rename to test/integration/k8s/manifests/06-beyla-netolly-tc-promexport.yml index 789c6ccd3..82240346c 100644 --- a/test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml +++ b/test/integration/k8s/manifests/06-beyla-netolly-tc-promexport.yml @@ -89,7 +89,7 @@ spec: - name: BEYLA_NETWORK_METRICS value: "true" - name: BEYLA_NETWORK_SOURCE - value: "socket_filter" + value: "tc" - name: BEYLA_NETWORK_CACHE_ACTIVE_TIMEOUT value: "100ms" - name: BEYLA_NETWORK_CACHE_MAX_FLOWS diff --git a/test/integration/k8s/netolly/k8s_netolly_network_metrics.go b/test/integration/k8s/netolly/k8s_netolly_network_metrics.go index 0af0c44d7..57d93cec5 100644 --- a/test/integration/k8s/netolly/k8s_netolly_network_metrics.go +++ b/test/integration/k8s/netolly/k8s_netolly_network_metrics.go @@ -56,6 +56,8 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assert.Equal(t, "Service", metric["k8s_dst_type"]) assert.Contains(t, podSubnets, metric["src_cidr"], metric) assert.Contains(t, svcSubnets, metric["dst_cidr"], metric) + assert.Equal(t, "8080", metric["server_port"]) + assert.NotEqual(t, "8080", metric["client_port"]) // services don't have host IP or name }) // testing request flows (to testserver as Pod) @@ -89,6 +91,8 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assertIsIP(t, metric["k8s_dst_node_ip"]) assert.Contains(t, podSubnets, metric["src_cidr"], metric) assert.Contains(t, podSubnets, metric["dst_cidr"], metric) + assert.Equal(t, "8080", metric["server_port"]) + assert.NotEqual(t, "8080", metric["client_port"]) }) // testing response flows (from testserver Pod) @@ -122,6 +126,8 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assert.Contains(t, podSubnets, metric["src_cidr"], metric) assert.Contains(t, podSubnets, metric["dst_cidr"], metric) assert.Equal(t, "TCP", metric["transport"]) + assert.Equal(t, "8080", metric["server_port"]) + assert.NotEqual(t, "8080", metric["client_port"]) }) // testing response flows (from testserver Service) @@ -151,6 +157,8 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assertIsIP(t, metric["k8s_dst_node_ip"]) assert.Contains(t, svcSubnets, metric["src_cidr"], metric) assert.Contains(t, podSubnets, metric["dst_cidr"], metric) + assert.Equal(t, "8080", metric["server_port"]) + assert.NotEqual(t, "8080", metric["client_port"]) }) // check that there aren't captured flows if there is no communication diff --git a/test/integration/k8s/netolly_sk_promexport/k8s_netolly_prom_main_test.go b/test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go similarity index 97% rename from test/integration/k8s/netolly_sk_promexport/k8s_netolly_prom_main_test.go rename to test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go index 1347633d0..c48fa2858 100644 --- a/test/integration/k8s/netolly_sk_promexport/k8s_netolly_prom_main_test.go +++ b/test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go @@ -42,7 +42,7 @@ func TestMain(m *testing.M) { kube.Deploy(k8s.PathManifests+"/01-serviceaccount.yml"), kube.Deploy(k8s.PathManifests+"/02-prometheus-promscrape.yml"), kube.Deploy(k8s.PathManifests+"/05-uninstrumented-service.yml"), - kube.Deploy(k8s.PathManifests+"/06-beyla-netolly-sk-promexport.yml"), + kube.Deploy(k8s.PathManifests+"/06-beyla-netolly-tc-promexport.yml"), ) cluster.Run(m) diff --git a/test/integration/suites_network_test.go b/test/integration/suites_network_test.go index 15e96d900..439193789 100644 --- a/test/integration/suites_network_test.go +++ b/test/integration/suites_network_test.go @@ -94,7 +94,7 @@ func TestNetwork_AllowedAttributes(t *testing.T) { func TestNetwork_Direction(t *testing.T) { compose, err := docker.ComposeSuite("docker-compose-netolly-direction.yml", path.Join(pathOutput, "test-suite-netolly-direction.log")) - compose.Env = append(compose.Env, "BEYLA_NETWORK_DEDUPER=first_come", "BEYLA_EXECUTABLE_NAME=", `BEYLA_CONFIG_SUFFIX=-direction`) + compose.Env = append(compose.Env, "BEYLA_NETWORK_DEDUPER=first_come", "BEYLA_NETWORK_SOURCE=tc", "BEYLA_EXECUTABLE_NAME=", `BEYLA_CONFIG_SUFFIX=-direction`) require.NoError(t, err) require.NoError(t, compose.Up()) @@ -103,11 +103,16 @@ func TestNetwork_Direction(t *testing.T) { require.Contains(t, f.Metric, "direction") } - // test correct direction labels + // test correct direction labels and client/server ports client := results[slices.IndexFunc(results, func(result prom.Result) bool { return result.Metric["dst_port"] == "8080" })] - require.Equal(t, client.Metric["direction"], "egress") + require.Equal(t, "egress", client.Metric["direction"]) + require.Equal(t, "7000", client.Metric["client_port"]) + require.Equal(t, "8080", client.Metric["server_port"]) + server := results[slices.IndexFunc(results, func(result prom.Result) bool { return result.Metric["src_port"] == "8080" })] - require.Equal(t, server.Metric["direction"], "ingress") + require.Equal(t, "ingress", server.Metric["direction"], "ingress") + require.Equal(t, "7000", client.Metric["client_port"]) + require.Equal(t, "8080", client.Metric["server_port"]) require.NoError(t, compose.Close()) } @@ -123,11 +128,17 @@ func TestNetwork_Direction_Use_Socket_Filter(t *testing.T) { require.Contains(t, f.Metric, "direction") } - // test correct direction labels + // test correct direction labels and client/server ports client := results[slices.IndexFunc(results, func(result prom.Result) bool { return result.Metric["dst_port"] == "8080" })] - require.Equal(t, client.Metric["direction"], "egress") + require.Equal(t, "egress", client.Metric["direction"]) + require.Equal(t, "7000", client.Metric["client_port"]) + require.Equal(t, "8080", client.Metric["server_port"]) + server := results[slices.IndexFunc(results, func(result prom.Result) bool { return result.Metric["src_port"] == "8080" })] require.Equal(t, server.Metric["direction"], "ingress") + require.Equal(t, "ingress", server.Metric["direction"], "ingress") + require.Equal(t, "7000", client.Metric["client_port"]) + require.Equal(t, "8080", client.Metric["server_port"]) require.NoError(t, compose.Close()) }