From 1c08399c2a71c4221ca98057f902716e6451370a Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 11 Oct 2022 14:48:10 +0200 Subject: [PATCH 1/5] NETOBSERV-613: decrease premature eviction of eBPF hashmap --- bpf/flow.h | 5 ++++ bpf/flows.c | 31 +++++++++++++++++++----- pkg/agent/agent.go | 6 +++++ pkg/ebpf/bpf_bpfeb.o | Bin 15568 -> 17424 bytes pkg/ebpf/bpf_bpfel.o | Bin 15664 -> 17520 bytes pkg/ebpf/tracer.go | 56 ++++++++++++++++++++++++++++++++----------- pkg/flow/record.go | 2 ++ 7 files changed, 80 insertions(+), 20 deletions(-) diff --git a/bpf/flow.h b/bpf/flow.h index 87ed92f9c..1248838bf 100644 --- a/bpf/flow.h +++ b/bpf/flow.h @@ -17,6 +17,11 @@ typedef struct flow_metrics_t { // as output from bpf_ktime_get_ns() u64 start_mono_time_ts; u64 end_mono_time_ts; + // The positive errno of a failed map insertion that caused a flow + // to be sent via ringbuffer. + // 0 otherwise + // https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md + u8 errno; } __attribute__((packed)) flow_metrics; // Attributes that uniquely identify a flow diff --git a/bpf/flows.c b/bpf/flows.c index 7d04c1e59..790f24b8a 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -58,6 +58,7 @@ struct { // Constant definitions, to be overridden by the invoker volatile const u32 sampling = 0; +volatile const u8 trace_messages = 0; const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; @@ -184,7 +185,15 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { aggregate_flow->start_mono_time_ts = current_time; } - bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_EXIST); + long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); + if (trace_messages && ret != 0) { + // usually error -16 (-EBUSY) is printed here. 
+ // In this case, the flow is dropped, as submitting it to the ringbuffer would cause + // a duplicated UNION of flows (two different flows with partial aggregation of the same packets), + // which can't be deduplicated. + // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md + bpf_printk("error updating flow %d\n", ret); + } } else { // Key does not exist in the map, and will need to create a new entry. flow_metrics new_flow = { @@ -196,13 +205,23 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { // even if we know that the entry is new, another CPU might be concurrently inserting a flow // so we need to specify BPF_ANY - if (bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY) != 0) { - /* - When the map is full, we directly send the flow entry to userspace via ringbuffer, - until space is available in the kernel-side maps - */ + long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY); + if (ret != 0) { + // usually error -16 (-EBUSY) or -7 (-E2BIG) is printed here. + // In this case, we send the single-packet flow via ringbuffer as in the worst case we can have + // a repeated INTERSECTION of flows (different flows aggregating different packets), + // which can be re-aggregated at userspace. + // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md + if (trace_messages) { + bpf_printk("error adding flow %d\n", ret); + } + + new_flow.errno = -ret; flow_record *record = bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0); if (!record) { + if (trace_messages) { + bpf_printk("couldn't reserve space in the ringbuf. 
Dropping flow"); + } return TC_ACT_OK; } record->id = id; diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index bfd8270c6..152307d64 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -132,7 +132,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) { ingress, egress := flowDirections(cfg) + debug := false + if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() { + debug = true + } + tracer, err := ebpf.NewFlowTracer( + debug, cfg.Sampling, cfg.CacheMaxFlows, cfg.BuffersLength, cfg.CacheActiveTimeout, ingress, egress, interfaceNamer, diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index 67851f3d3723665327b91bdd118dc61d8fb40306..ee6465770f750f986efe7ab54426cac2f1b26477 100644 GIT binary patch delta 5623 zcmbVQYiv}<6+UzCzF6-%=CO;xJTA7e4F$7n9ySnQF-{XTP9ZHsG=bOz+cBxJH}+Bt zA>CDys)R&}$)ysN_+cd4(t;F-i9l*3EY(d>m9|zv3hJL&ZBkG}RHQ~Nm5SW&+MgM?w|8-Sh);#3(Isf&P7Qx{G2Zs+>`b)FN)hPpm|jE!I-$)r8aP{sMkdW%l{V zE??j2@e$h>ZECyP>UzXlm7mdrGm_W7#EqKB6u-L0)kx)mAGW%@@25!A94*_u<8pp2 zukIk4#y($>@vg_d$?b0<<7%A=WqmeuBznUad2#Oe2z&J@+CJ1gr9d4qA)b!sP1rw6*1F_x2p9h_c+lno-x5bLX!5c*U6z?4Bj13unp0WA1Rl2GYfM3oSk;;Vt zAQix`35H!Av%5;G=bVP62u}){pP*fcCkK4aK3`gvecwmKFMb%B573t3fvDf&hp`2G zU2qEcrr=iKzY5L)|6TAP@P7mk6B%4E_cI2}OAlC~G3K~9t1J+H_iRW3Ck3|xuLFin z<$yQ1dcy*46r2KnLU1eaw*=>apAkF=yjSor@C$;+fDh1mC$R~gk;TtaBGUpsCU_9| z1Tc#8u^!yPDd2N&&z(KzJt!(*9ERe3*T^F_er+dU{k>>%(0l|ugUCM!{1>}*?xWc| z&@98_h~YMmbAujuj_@Xpf1<5r9gFxhGmgqULi=yfYP9zwu5N|c*&UCz(^KGBGUik8 zgVXAO`dC!jc7RzPxNDn<4x(!%s%9a+3fmLDPnHBF;7z?JF!gR7>DD1 z>AcDHn$izLE}kWRUw8eAzh;k>m1m2S4-o|djNjs|4jy`P-pk65!=<}t=y&tq4TZaUaO(87 zXRHnO_p3HnZ~o?YmaHdBWM79>lj+3?)3U$4uW!%u?R{2N6?@xHR>zh;m+ojuodt7lHn&m{e3znpc6_0mL5y0$N8&p#}6ZqINyw7dOa z3m>wz&Ky`9tmQ~MSqD7q_Ugk~cKzS#U{7x?|IZO!UQlGOeZM+r->Uw3asKSvZ`H&b zx%!sB~6aJ4Z8E^7dI_fHXi^&*R3_c)yD;cMF%MA)Yj4w_M z{DOybhgXE3BV!&$su05k`#vSZV&3m^{~|IfVN)lCXIX)LtR#W|+hi<5he{IoUm_Km z1wJeR%-KFF{1>F0gk!?*ka7~<%eNbk<=YJ@ARDM9Rf!6X+pa_4D5)B7vqAWuNi8b} 
zPPsf7CAA9Q9CMrS3Q`H?tu7C+|2*JMv9Bf7!2R_L-$AMgA8ON`@hk;dfiI6K74e8q zV8rHGH^AW-u_-eWe1p^(%$QH}pOHEX?$`WRq|T#Y|F9d+1z+NiWdK?ipw;UO|El)_&M4*i%69x)1Y7Ga3(_;!Tg!;4Z& z$Rf(JB6OLIkxH};uEVq-8P_nPP)hg`GCo`cJ|G;sVss%s-Uaq)h4w-5$F3Qh?|%3> z-!zV|1`kozsmM{F!*GcNTqNTL?oHvF1s&YZFQ7;#lmtbi9)HYWk&gcx8J{HZA(a7! zF`&CxW06k0f37I}3K?G@Ad>TNtci#u2%jf2!uD%o|CG#mn30GiEDDlYiB}krgkiBq zWrm6>v4M~<+Udp@^^?hqKAKI7Vt~wDct_Er1zZX!dY8;rwqFtZ=VW#*03Y^n6daWT zBPC>}E5Pr%_DDCG8IHdv{B`U|9Aw$zl6*yRsqlBmJOYPe89@{yd~+_*O!1FmL<4wb zv7GB4Z6@;sMie_K{2+EPz9Y`}!XQU@rlUV1^V;OMA>bI@$Hr@c=HV_Hmm#;-V;R)CzTAqF~@y|}iX88bo05h|var($R*WqLV z9YbNdMr~le4tjkVE)fIQ-tF<%aa`JRkH5I{6@vKGLlMqf)Ot>=W_voq(e4N@!VjR= zT+|Z~4NOJDe-JbiikUw}WUJ+zkN0&M5T9R^~qp zbwb{{UVq-H-u|6Gg*aE(-kFSNgWCZ)TqZ8q3e5Fu-Ve-OV8U2}ui=Nw(VWY{p*_J* z0rOuR+J3fxU&`Y_3VsO10{c>Og^CS*l6=OWy#fUnC>8j5fL?yHfZs0Q_wqS^kiGHF zhW7LqsL8i)4>aT}0*@8&odulV5Y9kda3H7of{KNU7MEMoXU6Pj>q}=&aqYG<^;@EO z*LM58`qz!R2D_nQeot}@H8(aUTk2Lz>ozOvgi4#Nd%)FQ+>>sbe83G~n-li&hDJLT z^V`=OD#~Mn7=>4O8ntW%PkPV$>(>^!f__)02vn zeF2JFS`n__2FNmC-&?=th59jvP=M9AW;q*|Ro?#Xp7Ca9dU9;ZlZ^?X@*gP^MqMW> W{(y&1<#_qVb*_W5-FD)4!u`Lh)Snap delta 3817 zcmaJ@eP~=&7C-mCc{49FFKslRZIX6OT20cJ&NMNJqU}qn{g8+wt-F=2YhYV9y9f=t z71E_TL;cwPA?}omAZ}q(ZK0xI>l9m1alnH8LnXQkTiD136q~YLnq6hLEDPP=nS0*k zB?}&SbAP{c&OPVcbMJj`ibvJVp<pDkT(Drq-{xoxJnYM{llQ96m#vx3(FKX2w& z-I4!2j43<;G=IRj7Eb~A)CvH|dOV7O<{zRi(Dx9#1Go%~hg$JK)J6O-P6K}?H~{`q z@Hp_C-~#YfqI_k-G|0ciPzJs!cn)ck7IO`{UT_*1%i*~y0Nx;Y9C!e@8~Ot9kk#un z@V$Zq;O&CPkzbDqDZuck;A!B;1($*M37!K!V6lTiAHq)$k&^~~Uhp*V%fKiu#`@O; z2f(KUk7H!nkb_eJhIc@)qF5RDJzz|BG3bv3r!lhJ34H+lC1Ca`0Dl2|EATY%S7x?# zd;VK!dht|5Hg1FoVsayK3iJZTKGv~_&nV+s>jRbk4)hL;|MsnJquAL!Vy7R0--0&e z;0IYvSswmBQ0gD%gSHOs_nW!4X0x`vu6-DOI8B6!{=IVK?gmGd{#G;A-fA|sdyVk$ zcZikuk*$vyZ%s$s|M5zcVlr!5yE()YV7x$<`|QBs{hud<06$$ix#ox}^`_5yX1*(M zAK0^V*KYH2_k*!tJelr-s@JUSsW%UIH=0X5hs^8-&wR4sO_T3!G=J{hns;A98_N<7 zjdb{chBZ9--LX8P(sd!vrgu46Cn<6wbM<`1ow_@A;(@-V(qH9S^du_uMaX0 z9p5B6g}n7lrR6{b~#NwD5zZeH21xBAf$W5Pphu3!h5N;R0iSCEdn+%5t}rbQ(7Gs_;B3 
znxK#b?n9)z5Ku`1_i0jI19({`FlYOW@RP{J3Fd@Cn5^)er~JkI`Yz!PHcCpEzFri4F?cibvq z+wnXlSb@9cNJYHjB3A62wi7BGD|VzKi5Ji;bSxI~6QoXpyCMIO)M*sVhnly53qHf| zlNsSC_-*EM%=2*I38>hW4kWISI){qg8!`a}mQiusi*OViPh0M3(m52E$O*rX)L!@} zq$3`4_$MZ;okSA6;5;g1hgqH=E3R;!AS9@~~n)8da? zGuuwYuNKZ%!}IYXPI~xYRTT)BtP_W~NYCIyl)Stwz z=Q4PC$pVvD&^0X3pThog0ZFvz23F`7gug-#VN3j@%j{A>@?CP;kf7ERd&p%0sW>?q zd;n`DVQL>a1K5IE=~xQu4p!qk_h>lbI63!YMGccSVCo_{JCLA33P@ciXD?p6mDjIE zA0b@0UrWy6$g8%(UHpscFz5O=<{yJAW<{l;2vzhYKNYe56IlE?b0`X}2=#nVtMZpc zeY~ZrCHmCr(On$NT*UB4YpCcK7WJw-qMumQ(|;oRXyUIU`ZJ6E3VRq9nz&5-hslcK zp2^Dm$xsUwHl?c1gnCoFt2O@&u8?#()KePhNlb}VH5M>m4}Dzpbuw~KS|WN;^Y@O5 zh|eF>sH8#uX5+&O2@~-aziyryTsy*gSqNWXs$b-Ou#C?y<+Yr)!SoLGY>OXfh({8{ zPm62F%ix?!Dva}v;w@X{-vBq8*`bZ9uH+B@#J#Hw1;>{F{JOQgrES%k*45PatNl;) z9o#>4(4x`Z+xqrTnf;r$n)91~Yqo7^zI)Rkjg5|GpUBF%so(NVGRT+>$E>(#aNI{t zU2yw7e72W}7h(4ROXI5hB<6-0GZ}QY@B9Ie1@>6~e#B$h*=Xd&v(%Z%!RFf-y@4mp zxNN5z(S=dzceVP07X_Af*0?xiB)=)AgmL?wlQ~zhOpJl<;Vt zeQVzZ>!1USTkT`{u7hmjC_1~toEhzOA6QiEF$++994eC8 z2|$*d8Qwbh%dDhzr=jBbVcc-!_y-Q$9IN!F0J2nu_6QGeR%ni{@+~MGuCHV0r`E2r Kp`VOZ_x}O8e}?k_ diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o index 10b95d09815dbd5b5abe2a9b9cb5b7eb757aa9f8..b28228617135f0ee03e3b7426a289ba71b98b12f 100644 GIT binary patch delta 5314 zcmbW44Qx}_702&;j+;1*)0p7cApyTQA#o@qc7P^vNHGBlYw3U$DmF?JnmC0H662VW z(oJ?(rcvl5qe~?=Rb7x)jSVCc!UWQ~5osx+Rkb0(1lpuE`{)|2ts2y7%d}POf9^Xk zytpt;JJR(%zjMw#_v5{L-}Ugpsl!?}q=#e279G}xkBzR>PK|sT(6mFNrTPb&wr=!a zjvIMecJx|7RbHondy;`;Mctudv6vr+VZ~=||&@MU5c*=Jlic?fd zhdb^Vm^UuF-D`U;krCp(&pDIFw3xql`g1A+^;2&Ppnl50m%2@()z5c1@4$28x8)hP zzjG`67i@!h3A|OfRI7VWa%#rEQfcN7b-Em?)tOU#IQo0{Z}aZocXIqTg$$Hz89hDc zn%*mJ-;O7f{YhhgqGxAnci*!{@1Fhx#=@RLVcK{0^vtq$_wV1+(^r==lJS9f@*8nu zAhEMMZtU(eQoG_t60y(h?`<&FB>NKytD%q??_xRZEh@a8F9els#iN(K=X595EN{O2 zmcO}*p3MYGMy;|asD^%s^&03$vjGA?W{ReGIx2%St&XP)b-A@RyrwHevb7C)cf@|o-tmcGz^Q- znF0L>>qF3wa|0t!2uen?i9tUtcg*vZ{|pvd_c2)BWG^;^cuRf{-e0pN1IrEQa}YTM z{m=3j^VX$5f@Lu*Hqjqrz>J**(EpB@23r7BeZ!p?H{9Z%_^CtvIcl^kYGglfmj6lE 
z>798Jvx3N%Dm;Z&+vnZ#^@?ER=WgXHrk#LPmK3&Hnh-8ZG0t%<^6iR18mlNUe^G>T zjXf)%Q-9|^ulX-=H08O)Dxmlgj(f0JK3o}0M_|U;^8i_!*+pZBvQFb|XPwqyH43c~jd3js zy~8w&btp7H>Z%nv(N5)zfk`OmDsu?=#2Zb0^lW8XbK-32>mOaR;9NoBo_-uU$#}|W zmw%{TRk!NPU#)Kw2FvbVBa}+wfa%;D9~jvAY<$3|t)*x=Qs-UtOrp0lfg>mN^-%Ec zr`S_{!Nn@PrtZ<%JuQ*1*Ohu^_r!VAU+lHscwuHt*4Vz-`|PvQ>f1IQYT6e+&`D=( zgIWWl-B^Nd?~DTx@~ygqX$t+{#zB$xvHU+LbTDU-Nx37GFV}^BQapS0BbPKOe_>DFcmgS&0q1U`!Q-tsS_ zcy5D}K}iMT8&QhRJCx62Czx_QZHwFO5NRO>^_jr42ykU|5ke3(B=&)6AQo4ETWq`( z9J4X`H`zD>PS}|8XP7^Ap#S<;ba9vs3$b4oW_6Ke-U#-;ri&rwgJ3T{I>R=0fiE(j zhTmDKtNRcQ_oi&=6X2W6PkS5lnF(vpDIJj67o0#1= zM&>NjL>F^4aaa=xTYM$r2bo*pceZHaF!NS$`5H}}FgY#A;D+H82dEhz*Th8|yTD`2 zM-gB4BQTbh2KZxQtmhbJPW&ry@C;UrSq=C+R*YE<7|T7S7|)*?RHDH&2Yd{6Uc?e& z=9Ey4cw7^H8@s__<^pUR*A`uLp&iAq2A6kXMQrQ_4>IFbQ8GrHwi@7pUbQy>-^^*? zM`G&0lJ(5bqxKKut!Mr|>N)2zAe-L_10->^VQa-{k)oykHLoWaCP3h`9sG!@fu#do8OP25d`|{m#0VfJx3vPsCU5Lm1`6DXawV zpOVXRRis+237uCYl&>7+1cz8WX5)GA)8-{1KfOh?8A+`8t2U;1+G`|MJnbdsp!$zS zn}-d88qi*`xJ0NHV@_O@X_pF<<^tQaLH?_zU zE%W;$4~Y#O9g)taY6Aj4WXn|~rmgt|A$w#BL+AbN1 z`drf-5|YPS1KNjjB0>{A3!sho=Xy*;!&Az5B$1?1Ik$Vi*dJH1p{#3Y~Q zw0nJWN85w0iHA(jusqfl&>oZf+I)JpS-#U&t9?zuP_rytQLBAG;2P$?yIHw;MRR?u t#caEYd&P&E{?n^M9_x&da%6?imF2(gg>rK=p#4VPg6#)vlhV^3_z&>Rw?Y5_ delta 3493 zcmZ9Oe{2(F7{}lDx-jVfs@Q4!sje|?S+)DHQwE&6+!J5Pwbkh)RU8bF~ZipwEu7EcZIVYRM&apo0x z=79{f*Pz4b2hmPgne%I*EeV0;$8h-oj%tjZgU&%$peH-aIs*L_YXkZl*1gcvtOud5 zNs%#&M*)9wAP0S$^)z%oHo}|$zGBu9=qPIgx}J3}bTf1ub%W5Yc0T$x)&~0Ztb4VL z5E~f=(ci~<6#Z7#IrQ6EPov+%THwhXfUd)25$MCLN1O*UzixBfy;)3}^yHCAaS+0&$dP*s!gTrkc-sDp|7goFPhg(xe z7gp3!d>byPT>vRo(FI1uk6OiZiTC@mTKKTc9*InNR7c!!uj}dT-mHF&Z^(OWXCww! 
zgW6M9tWL)RDpr3$wKaIu;)ZwB<%WQ2U6RTi!9M2D$Y|g4ao4n7vIN>z2RDLU)|x0R zn$Zf-sD}2FEC3xx^X+n(F`M0B8mrq%GxHvC;Yf&nZ8q^8BzO;Mg3cWk66;`Eh|LvX z+Gv}bz=ned-s*b&C|)*jKsG3HKia6}WM%>CeiPZQ$~ z_JYqapFrFl)XWbYE#NY7ND~9tQsN5ibTFccF`F}Rl)^F2j(fn~m}Y)ys9-<1#3RKd z$BDh*Y38@UZhTH>n13P;N#Vispn5yr!)(XLm@_eaLfsK5Zn5KO;u+h{YK}4nG`YoM7JZF)7YC z*bB}vKa2d*Q7}CU_+BS|3yeD_{uo?6h8<%z7d(y~V>TCj3Oi;po<9|YP;iSAeg?bW z#}?wwX+U$uIVpk;_Jb458jg*(OB21Qqx@CivInsv4)%iwnDMHZmLtyC1(&d5X94)a z9R)!QOcU7F!MqQB@D$#9W`(|J91C*93lZ;Qw(AGL85(c`q9%v8o)e@OcXU=0Bg|27 z)j3UEW$pxr&m)eW>SyE3cKjA|Kl;i`n&`>5@+rO;^}YEx|1^MIu!|E;BB5>)=N>=S z6t@R3nC<+lcf@1(INAArOqgK4g7Fg5xIyM9=J8(F#H0_;p9=QDQH3LLffHT=mkF1c zX13!Uek=by#QhS$~9p5wfYGzUGYF$dF#Ic8cN&3N9yrQj=$d|jZ7lboKwE4fDPHlkJbD`+ji`qyD=>hDr}4P8qF)R)F0*=a(T1s|zoO855BAADGW z+LQ{*H`U&ht_=p%3#mo2o4`B72{AQggjHY4qi&@N0^{s`n)cB)bu(4066s)|F>bl& zS7O#EDzj>DS`P%5Szdz=AMIhGdNp0|?cx82U1};FmMv-`t!raRmEXQdt~23GQf&Zy zM&La+n?2m#8V@#FqmJ^d_)M99!uUNhpA=>6Q8(Ll@0nIBdyX2$jh|G>wYv8c_9fM( Hwc&pOvT&a& diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 98163b2b6..3215327d1 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "time" "github.com/cilium/ebpf/ringbuf" @@ -28,6 +29,7 @@ const ( qdiscType = "clsact" // constants defined in flows.c as "volatile const" constSampling = "sampling" + constTraceMessages = "trace_messages" aggregatedFlowsMap = "aggregated_flows" ) @@ -50,10 +52,17 @@ type FlowTracer struct { cacheMaxSize int enableIngress bool enableEgress bool + // ringBuf supports atomic logging of ringBuffer metrics + ringBuf struct { + isForwarding int32 + forwardedFlows int32 + mapFullErrs int32 + } } // TODO: decouple flowtracer logic from eBPF maps access so we can inject mocks for testing func NewFlowTracer( + traceMessages bool, sampling, cacheMaxSize, buffersLength int, evictionTimeout time.Duration, ingress, egress bool, @@ -73,8 +82,13 @@ func NewFlowTracer( // Resize aggregated flows map according to 
user-provided configuration spec.Maps[aggregatedFlowsMap].MaxEntries = uint32(cacheMaxSize) + traceMsgs := 0 + if traceMessages { + traceMsgs = 1 + } if err := spec.RewriteConstants(map[string]interface{}{ - constSampling: uint32(sampling), + constSampling: uint32(sampling), + constTraceMessages: uint8(traceMsgs), }); err != nil { return nil, fmt.Errorf("rewriting BPF constants definition: %w", err) } @@ -376,8 +390,7 @@ func (m *FlowTracer) Trace(ctx context.Context, forwardFlows chan<- []*flow.Reco func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlows chan<- []*flow.Record) { flowAccount := make(chan *flow.RawRecord, m.buffersLength) go m.accounter.Account(flowAccount, forwardFlows) - isForwarding := int32(0) - forwardedFlows := int32(0) + debugging := logrus.IsLevelEnabled(logrus.DebugLevel) for { select { case <-ctx.Done(): @@ -399,11 +412,15 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow log.WithError(err).Warn("reading ringbuf event") continue } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - m.logRingBufferFlows(&forwardedFlows, &isForwarding) + mapFullError := readFlow.Errno == uint8(syscall.E2BIG) + if debugging { + m.logRingBufferFlows(mapFullError) } + // if the flow was received due to lack of space in the eBPF map // forces a flow's eviction to leave room for new flows in the ebpf cache - m.flowsEvictor.Broadcast() + if mapFullError { + m.flowsEvictor.Broadcast() + } // Will need to send it to accounter anyway to account regardless of complete/ongoing flow flowAccount <- readFlow @@ -413,17 +430,28 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow // logRingBufferFlows avoids flooding logs on long series of evicted flows by grouping how // many flows are forwarded -func (m *FlowTracer) logRingBufferFlows(forwardedFlows, isForwarding *int32) { - atomic.AddInt32(forwardedFlows, 1) - if atomic.CompareAndSwapInt32(isForwarding, 0, 1) { +func (m *FlowTracer) 
logRingBufferFlows(mapFullErr bool) { + atomic.AddInt32(&m.ringBuf.forwardedFlows, 1) + if mapFullErr { + atomic.AddInt32(&m.ringBuf.mapFullErrs, 1) + } + if atomic.CompareAndSwapInt32(&m.ringBuf.isForwarding, 0, 1) { go func() { time.Sleep(m.evictionTimeout) - log.WithFields(logrus.Fields{ - "flows": atomic.LoadInt32(forwardedFlows), + mfe := atomic.LoadInt32(&m.ringBuf.mapFullErrs) + l := log.WithFields(logrus.Fields{ + "flows": atomic.LoadInt32(&m.ringBuf.forwardedFlows), + "mapFullErrs": mfe, "cacheMaxFlows": m.cacheMaxSize, - }).Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value") - atomic.StoreInt32(forwardedFlows, 0) - atomic.StoreInt32(isForwarding, 0) + }) + if mfe == 0 { + l.Debug("received flows via ringbuffer") + } else { + l.Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value") + } + atomic.StoreInt32(&m.ringBuf.forwardedFlows, 0) + atomic.StoreInt32(&m.ringBuf.isForwarding, 0) + atomic.StoreInt32(&m.ringBuf.mapFullErrs, 0) }() } } diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 0ea127118..951a03caf 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -65,6 +65,8 @@ type RecordMetrics struct { // and monotime.Now() (user space) StartMonoTimeNs uint64 EndMonoTimeNs uint64 + + Errno uint8 } // record structure as parsed from eBPF From ab3b8478e305129dae0c6069c97e539cfa73d7a6 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 11 Oct 2022 15:01:24 +0200 Subject: [PATCH 2/5] fix tests --- pkg/flow/record_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/flow/record_test.go b/pkg/flow/record_test.go index a3d465ae2..e26618aab 100644 --- a/pkg/flow/record_test.go +++ b/pkg/flow/record_test.go @@ -26,6 +26,7 @@ func TestRecordBinaryEncoding(t *testing.T) { 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_start_time 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 
flow_end_time + 0x33, // u8 errno })) require.NoError(t, err) @@ -53,6 +54,7 @@ func TestRecordBinaryEncoding(t *testing.T) { Bytes: 0x1a19181716151413, StartMonoTimeNs: 0x1a19181716151413, EndMonoTimeNs: 0x1a19181716151413, + Errno: 0x33, }, }, *fr) // assert that IP addresses are interpreted as IPv4 addresses From 41c2ab1cc3a60045ce930eee1298b5ced9c08a76 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 11 Oct 2022 17:39:01 +0200 Subject: [PATCH 3/5] Fix e2e tests for out-of-order reporting --- e2e/basic/flow_test.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/e2e/basic/flow_test.go b/e2e/basic/flow_test.go index 085eb0237..985c8b960 100644 --- a/e2e/basic/flow_test.go +++ b/e2e/basic/flow_test.go @@ -4,6 +4,7 @@ package basic import ( "context" + "fmt" "os" "path" "strconv" @@ -99,17 +100,15 @@ func TestSinglePacketFlows(t *testing.T) { require.NoError(t, err) logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Debug("ping sent") - sent, recv := getPingFlows(t, latestFlowMS) + sent, recv := getPingFlows(t, latestFlowMS, pktLen+ipIcmpHeadersLen) logrus.Debugf("ping request flow: %#v", sent) logrus.Debugf("ping response flow: %#v", recv) assert.Equal(t, pingerIP, sent["SrcAddr"]) assert.Equal(t, serverPodIP, sent["DstAddr"]) - assert.EqualValues(t, pktLen+ipIcmpHeadersLen, sent["Bytes"]) assert.EqualValues(t, 1, sent["Packets"]) assert.Equal(t, pingerIP, recv["DstAddr"]) assert.Equal(t, serverPodIP, recv["SrcAddr"]) - assert.EqualValues(t, pktLen+ipIcmpHeadersLen, recv["Bytes"]) assert.EqualValues(t, 1, recv["Packets"]) if t.Failed() { @@ -126,15 +125,24 @@ func TestSinglePacketFlows(t *testing.T) { ).Feature()) } -func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]interface{}) { +func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, recv map[string]interface{}) { logrus.Debug("Verifying that the request/return ICMP packets have been 
captured individually") var query *tester.LokiQueryResponse var err error - test.Eventually(t, testTimeout, func(t require.TestingT) { + // We need to check a reasonable number of flows before + // due to the chance of out-of-order submission (e.g. a later flow received via ringbuffer + // is forwarded before an older flow stored in the hashmap) + test.Eventually(t, 20*time.Second, func(t require.TestingT) { query, err = testCluster.Loki(). - Query(1, `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP + Query(1, fmt.Sprintf( + `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}`+ + `|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP + `|~"\"Bytes\":%d[,}]"`, expectedBytes)) require.NoError(t, err) require.NotNil(t, query) + if query == nil { + return + } require.NotEmpty(t, query.Data.Result) if len(query.Data.Result) > 0 { sent, err = query.Data.Result[0].Values[0].FlowData() @@ -144,11 +152,16 @@ func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]inte } }, test.Interval(time.Second)) - test.Eventually(t, testTimeout, func(t require.TestingT) { + test.Eventually(t, 20*time.Second, func(t require.TestingT) { query, err = testCluster.Loki(). 
- Query(1, `{DstK8S_OwnerName="pinger",SrcK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP + Query(1, fmt.Sprintf(`{SrcK8S_OwnerName="server",DstK8S_OwnerName="pinger"}`+ + `|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP + `|~"\"Bytes\":%d[,}]"`, expectedBytes)) require.NoError(t, err) require.NotNil(t, query) + if query == nil { + return + } require.Len(t, query.Data.Result, 1) if len(query.Data.Result) > 0 { recv, err = query.Data.Result[0].Values[0].FlowData() From 0a2446fdcc37c7b01529622f3c9fb4ca10bb6509 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 11 Oct 2022 17:40:02 +0200 Subject: [PATCH 4/5] rm unneeded comment --- e2e/basic/flow_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/e2e/basic/flow_test.go b/e2e/basic/flow_test.go index 985c8b960..e35dbc4c7 100644 --- a/e2e/basic/flow_test.go +++ b/e2e/basic/flow_test.go @@ -129,9 +129,7 @@ func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, r logrus.Debug("Verifying that the request/return ICMP packets have been captured individually") var query *tester.LokiQueryResponse var err error - // We need to check a reasonable number of flows before - // due to the chance of out-of-order submission (e.g. a later flow received via ringbuffer - // is forwarded before an older flow stored in the hashmap) + test.Eventually(t, 20*time.Second, func(t require.TestingT) { query, err = testCluster.Loki(). 
Query(1, fmt.Sprintf( From 214445b11fd6a0696a695926be42254d32d9ef09 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 11 Oct 2022 17:40:24 +0200 Subject: [PATCH 5/5] extend eventually time just in case --- e2e/basic/flow_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/basic/flow_test.go b/e2e/basic/flow_test.go index e35dbc4c7..05559466e 100644 --- a/e2e/basic/flow_test.go +++ b/e2e/basic/flow_test.go @@ -130,7 +130,7 @@ func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, r var query *tester.LokiQueryResponse var err error - test.Eventually(t, 20*time.Second, func(t require.TestingT) { + test.Eventually(t, time.Minute, func(t require.TestingT) { query, err = testCluster.Loki(). Query(1, fmt.Sprintf( `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}`+ @@ -150,7 +150,7 @@ func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, r } }, test.Interval(time.Second)) - test.Eventually(t, 20*time.Second, func(t require.TestingT) { + test.Eventually(t, time.Minute, func(t require.TestingT) { query, err = testCluster.Loki(). Query(1, fmt.Sprintf(`{SrcK8S_OwnerName="server",DstK8S_OwnerName="pinger"}`+ `|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP