From 2a48e3223d94b30adf4777aea4bdafa64b288ab9 Mon Sep 17 00:00:00 2001 From: Yuya Tajima Date: Wed, 25 Oct 2023 13:40:24 +0000 Subject: [PATCH] Add getting IOAM timestamp from xdp into meter --- cmd/fluvia/main.go | 7 +- go.mod | 8 +- go.sum | 3 +- internal/config/config.go | 1 + pkg/bpf/bpf.go | 88 ++++++++------ pkg/bpf/xdp_bpfeb.go | 6 +- pkg/bpf/xdp_bpfeb.o | Bin 7368 -> 6240 bytes pkg/bpf/xdp_bpfel.go | 6 +- pkg/bpf/xdp_bpfel.o | Bin 7368 -> 6288 bytes pkg/client/client.go | 18 ++- pkg/client/exporter.go | 2 +- pkg/client/meter.go | 249 ++++++++++++++++++++++++++++++-------- 12 files changed, 286 insertions(+), 102 deletions(-) diff --git a/cmd/fluvia/main.go b/cmd/fluvia/main.go index 1541761..f0851bd 100644 --- a/cmd/fluvia/main.go +++ b/cmd/fluvia/main.go @@ -51,5 +51,10 @@ func main() { ingressIfName = c.Ipfix.IngressInterface } - client.New(ingressIfName, raddr) + interval := c.Ipfix.Interval + if interval <= 0 { + interval = 1 + } + + client.New(ingressIfName, raddr, interval) } diff --git a/go.mod b/go.mod index c3d0b70..e390a5b 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,9 @@ go 1.20 require ( github.com/cilium/ebpf v0.11.0 github.com/google/gopacket v1.1.19 - github.com/pkg/errors v0.9.1 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 + golang.org/x/sys v0.10.0 gopkg.in/yaml.v3 v3.0.1 ) -require ( - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect - golang.org/x/sys v0.10.0 // indirect -) +require golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect diff --git a/go.sum b/go.sum index e5f5bf2..18d73c7 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -17,6 +15,7 @@ golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/config/config.go b/internal/config/config.go index 69de831..de82e21 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ type Ipfix struct { Address string `yaml:"address"` Port string `yaml:"port"` IngressInterface string `yaml:"ingress-interface"` + Interval int `yaml:"interval"` } type Config struct { diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 4936e2a..fe5a2db 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -7,45 +7,78 @@ package bpf import ( - "fmt" + "errors" "net" "github.com/cilium/ebpf" - "github.com/pkg/errors" + "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf/perf" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -cc $BPF_CLANG -cflags $BPF_CFLAGS xdp ../../src/main.c -- -I../../src -type XdpProbeData struct { - H_dest [6]uint8 - H_source [6]uint8 - H_proto uint16 - _ [2]byte - V6Srcaddr struct{ In6U struct{ U6Addr8 [16]uint8 } } - V6Dstaddr struct{ In6U struct{ U6Addr8 [16]uint8 } } - NextHdr uint8 - HdrExtLen uint8 - RoutingType uint8 - SegmentsLeft uint8 - LastEntry uint8 - Flags uint8 - Tag uint16 - Segments [10]struct{ In6U struct{ U6Addr8 [16]uint8 } } +type XdpMetaData struct { + ReceivedNano uint64 + SentSec uint32 + SentSubsec uint32 } -func ReadXdpObjects(ops *ebpf.CollectionOptions) (*xdpObjects, error) { +type Xdp struct { + objs *xdpObjects + link link.Link +} + +func ReadXdpObjects(ops *ebpf.CollectionOptions) (*Xdp, error) { obj := &xdpObjects{} err := loadXdpObjects(obj, ops) if err != nil { - return nil, errors.WithStack(err) + return nil, err } // TODO: BPF log level remove hardcoding. yaml in config if err != nil { - return nil, errors.WithStack(err) + return nil, err + } + + return &Xdp{ + objs: obj, + }, nil +} + +func (x *Xdp) Attach(iface *net.Interface) error { + l, err := link.AttachXDP(link.XDPOptions{ + Program: x.objs.XdpProg, + Interface: iface.Index, + Flags: link.XDPGenericMode, + }) + if err != nil { + return err } - return obj, nil + x.link = l + + return nil +} + +func (x *Xdp) NewPerfReader() (*perf.Reader, error) { + return perf.NewReader(x.objs.PacketProbePerf, 4096) +} + +func (x *Xdp) Close() error { + errs := []error{} + if err := x.objs.Close(); err != nil { + errs = append(errs, err) + } + + if err := x.link.Close(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + + return nil } const ( @@ -55,16 +88,3 @@ const ( XDP_TX XDP_REDIRECT ) - -func PrintEntrys(entry XdpProbeData, count uint64) { - mac := func(mac [6]uint8) string { - return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]) - } - saddr := net.IP(entry.V6Srcaddr.In6U.U6Addr8[:]).String() - daddr := net.IP(entry.V6Dstaddr.In6U.U6Addr8[:]).String() - - fmt.Printf( - "H_dest: %s, H_source: %v, H_proto: %v, V6Dstaddr: %v, V6Srcaddr: %v -> count: %v\n", - mac(entry.H_dest), mac(entry.H_source), entry.H_proto, daddr, saddr, count) - -} diff --git a/pkg/bpf/xdp_bpfeb.go b/pkg/bpf/xdp_bpfeb.go index 993d162..b6c41db 100644 --- a/pkg/bpf/xdp_bpfeb.go +++ b/pkg/bpf/xdp_bpfeb.go @@ -60,7 +60,7 @@ type xdpProgramSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type xdpMapSpecs struct { - IpfixProbeMap *ebpf.MapSpec `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.MapSpec `ebpf:"packet_probe_perf"` } // xdpObjects contains all objects after they have been loaded into the kernel. @@ -82,12 +82,12 @@ func (o *xdpObjects) Close() error { // // It can be passed to loadXdpObjects or ebpf.CollectionSpec.LoadAndAssign. type xdpMaps struct { - IpfixProbeMap *ebpf.Map `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.Map `ebpf:"packet_probe_perf"` } func (m *xdpMaps) Close() error { return _XdpClose( - m.IpfixProbeMap, + m.PacketProbePerf, ) } diff --git a/pkg/bpf/xdp_bpfeb.o b/pkg/bpf/xdp_bpfeb.o index 027530192909721a863c2c4a86f0aca0718c3329..93101db9105c26eb1e4cbd49ace4078fda38f439 100644 GIT binary patch literal 6240 zcmbtYOKepR()DEK+B1sh z;dxk2(^5;MRS}iQ0;EWw);tzrffQEF0%^QpQ6v^#MhKAE6|wOGl?oM^?>~>3GsiJm zaO89U^ZoDtzvte2#&1tdUGja8n!}@iVQn#*5-bi1iQ@XYkGw`&t||SlrpvU4S0D8% zeCOxepYiG)1v~DnZEx|i(We+)O7Gc^6!c@yQNYWkKH4W~l=d=R5PYTX=@%p=qdzO? z2GP5Jar_M%IsDDC#Zr_GFg1MGB^ftH_)63@D9MPC$Ij3z%xOd=pSpykXPfdP^eyHr zjY~cqjH{kCzOHcNq@vod*R(rB(qd`MvD5m-UdP+hv^zrQ*uum!{Mk621$zkm2+J-0 zFyn^KuPe@o_8NKYGRcUrllQujl5*-wNc!nDMYl~IJ>OLF#!Q=DE5BZ*=u2$C+i)9$*F2W?lh1Uvm!1XL^R*R5*nee7L|<>a{`8XMQ$y|s z?4!7pZ#BNG_n*v!QfmlIPg`p1Xzw&|zH zTf-xZ+4+5z$pX*|EIp}(IzR=>oSClNFf!LSYuC@H5OWV6H)tc@T9jXnpRPbYs=s10D9mdW*Uyt{bYI zbqKYovI8Ho&N~)=KX^R$QNCECAh?iBUbqp=TzPdONZm+G1Oe4lwgw<-0pU)Sw;R;n&f~9&` z4+90mc4-jQPo1QCv6^2fhFQUBWoj!j20||If(A|u$iHeObE1&W7Y8z&zTiAqCt9sl z>KTO#S#cEh{Uk7!OL;)fQ=Au3h5Dn5rQ$+AbYi0{dD+qAVs_VNsj|>9xndarfzOLO zZz*9RJ5UKTVSYKx2E}x-R1GsF>Go!MKDbrO7s6lxJuOzF{mLLeFQUx^!&*)p6GQ!C zRIqD~sp?@Z2xFr;uDezeFPsP@Hl^78QRURfF&Eru(# zT(-h)uOt%5>C|*EIh~kJq-LJrFjp#<%Kt0VX6W+nagUb8c@B1UI7m%h4H6eFqU<~J zmbA4bUrHB-cXq{6RT{BTyPZ*5P8{UwTHCDF>%`79RgX_D+o8Fz1MHHsoMcg?;5}{dRh#7PZw*Y++y@-R6fn( zXn%7v@65elZ~3YThw^z7Kh1?vR~;~3ZiTmFqgA}C0^476t!lNDOwy`7tybvW{KRK~ zw`X#CHZ`H1f@&p)l+8;Ah#wiN9&z@u>mv?fY_w9U*YGAy*;{!gIi5_Z zZF;(2oH(H#9Gi>8W|EWg`N6FiJ}u@K(+gE_#}b!g3-}FJa%$-Fh<$5iOS0?uIj?cky6+>@x_EnmX7P5SU-l9w{U7=tx zp9zcAkOm6ra+M4+X9pp_NF>k2I4wnfhq+fcv%HhDS5+)o?#we(kD@9Fw|_*-H9k(T~l#&HMhe)u1dcOPdjFQ?^k5r?*xBU$tC7^?JVgHU9gyq#J43WiZbq&6M1pg!b<}OD)UeWg>CXrXExyNb&A{F;`X0wO z<3%*k@gKPk;aS`WB;57$xKDPgL2X8od)4^NqSe9-1^Lib*aPXjm;|@+Zc+SCP2d_H# zo`csM{LsPc4&HF^rh~WIvHbOD)i2*8HWrTlpo8NMPB?hZ!DR=pI{2Q0*Bt!N!Rrp* zaPX#sx7x9MjI4U)d%(uR(I0eh+`$P4&pEj4;8h3TbMTsjA3Av5!5a?VG`OdCKMmCI zyNK|MZgF5Nb!k8Xd6Hok|2{Q8L}_4gak&trv)KxM&OYInbL%98pO3ZlO&X})E=a1K z)7045U@)S{$st9DhLFwwKT>evy#I{T13%TPyN07F-%~SqZLs@Zj2k6~n)e^G`F!;h zd-*=LU36Q3&p-Gk$r0C7limkr0m}b#F?AaIUhr^UXvQ~kuy{o7D=N>6dMSyqXE23n z@^|e`{C(1@bus$P$zj^KrA3iP3Ur$@t&!RN%YwOE{o0-cbhv*0_V6(IbN&3ktX<+i z^G{YrAGQ>2UhVkobGP`9l|^J7l;aycJN~iK_LS_~Diqqs0n>k{uWk9h4jcV%yLM}$ Ro=-Gq9h7S{-*Xec@4ql@RW$$r literal 7368 zcmb_gYiL}@6+Tyc^+@xu+}f?2^l|N`W@Sn1m0}dNvdzlwDprxLwIm1IaeB4VUP)U^ zyX$=z$xf&m`YQrC9WD$N@#A2Ewi@ipAvpBw5MbWHU={ zS2|kl)YuqR_*6$1-D2rNCq1dKkDg$hit(G8hK?$vQ!)A-(f9viwC_gy->?PR?_fL? zr5k`Y!|pKbe#P~$>tTf5b%1?K$3YWsmLISl_>tg-;D>_m3%)0~F8HqCeZhAG?+Lyw zcvtX_;F{oDg0}^43a$wLT9D)A_?88i1g{FdA^20lMZwnvuLyoua6#}D!8yUAU`}vG za9S`WI3{>b@U-BlU_x+E@R;D&1!IB-1$zV!2tH)I{ASr?tnjZ&{Y%2XCS0vIZqe_EG0X2z9&XBf~)let`J&|x9Tji5MaRcgs(ep_b&BXI7FWNe6^&!sepO{h_lNO1X8Tn& zn?$!_s+}eN>bA1;``byR*73Z=BXL`oC7CaU|H4(Ai!#|K8~gI}t~E!sG=j4tpr z4lq}nNcU+;7dFu0Iu!^D+Klthd%NDZgY4k(eKfHAXd7-;vEe!01r6`1`*@zkqs|pz zo5(zayvKMSj^W{T#%pB(&kI^s5e!2{JQnMD^|5>fm=bvrm=pO8;Hx4p0e>L!b>NTi zARcQ4_+vbugS-a(g<625&Ox0)R6wuPfpKz9FeJ4OZ%7*+-%XKu{rp~X#_4Xx^G7sb z70Z7X1R%LOg{@ zZ0Fz!w*L$$>-h=lc}`fTz7w*JcEQ&+@%uF7oic7##qe+gZsVXX43uiC12TrrHWr^l zynPoS9|H2aU4wj7ZA7AHz*Rfx6X&os0;jzGU~BPO;PIhYE?z7q)sO>ou^To#PCSiW689c(m^Stzanh7N z4;>c=`o4hv%Sy}b)0rZ*9)oH+m5Gc`J&^QhoW&3c() zVbGf_(oC{YjAx4Zd77L_P8Fz_oT?Zgs8SD=GKKV1#!ER9)5$zk<6dVs*Wg{LDokhd zMU11{RsFM2qLOCtpapKArRTbo9|a}6D}Hn)O5F#ls+nwN%HbCkIr!P+oQvS|X|F)% zy?MHroGE#f&P}H0I3>u}Y%)i4shm5TB9%IRp^0YhtcPqO6M3&taMP3NOv;<1{G5BC zrTL#IaLRhNZ4A_y!sZT@1n zJ2HH9cvLUz1Fg>f{cvJvk%p1sV`?mgV!kv{bgHZ0!9=6}@~b){SbNn(BVk zu0&YDxzMh;zPejsq7z%-4O3$wo48_;sxJEtX{kiO()C&RL^FgM6a>mz!ekWDL)=uP$hdDrUf zbHZu8g*M%t92XAGO;)C#&6m)+OlIV9&48>1lLc}#Y`k&+C z#(~&~d06l7*0#1@-0$>9XeK@3WeOfeFzN#N3?Edv2jg3-3V-MEgzyaxMMa12xOD@v z|9y5K<36Qy^2@{eE%+Uxb9}>Rd*=Tn8sqOf+k)5O?vvwtp=0o{@EFl6d@VQ@;B21| zo+Wx6_fQ%OaIT*g{&S+MeE-S?IM*)<{{zu&_K$n7iibJZU)MZ@Ys?+^uZsz9C%TJ! zc%8%(0=|v=Slyb`nQsS|_(Ffj^_BB_Ti4sT2fYh!Hwj1X z-a~s^@?|qedmB2A*X}2JA33z00LMMl4hzSxu@Bi_G{A9>wR?pBglGfzH9HpIxHH=r zm-7D>(MRk*5#a3qwD5JJ2W&6nvza5`wv5kyi0d=fj-3;G=D73QbDI14wXbMizk@6r z`LnMJ?MC-!u5S!8dBT+Gc_I4>cS0j=`e_kNJFC z^LC09@k4?ld2c3iWVAn`i28=Jiho-fx0dP;ugJ{I#aZ?95OQZb^EtsTL5}hNQHJ5fIEQ?d Z7B=dDt8L3{$BO7U4$is1KK=e%{tM~-T!#Pv diff --git a/pkg/bpf/xdp_bpfel.go b/pkg/bpf/xdp_bpfel.go index 12b957d..ceb20b7 100644 --- a/pkg/bpf/xdp_bpfel.go +++ b/pkg/bpf/xdp_bpfel.go @@ -60,7 +60,7 @@ type xdpProgramSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type xdpMapSpecs struct { - IpfixProbeMap *ebpf.MapSpec `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.MapSpec `ebpf:"packet_probe_perf"` } // xdpObjects contains all objects after they have been loaded into the kernel. @@ -82,12 +82,12 @@ func (o *xdpObjects) Close() error { // // It can be passed to loadXdpObjects or ebpf.CollectionSpec.LoadAndAssign. type xdpMaps struct { - IpfixProbeMap *ebpf.Map `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.Map `ebpf:"packet_probe_perf"` } func (m *xdpMaps) Close() error { return _XdpClose( - m.IpfixProbeMap, + m.PacketProbePerf, ) } diff --git a/pkg/bpf/xdp_bpfel.o b/pkg/bpf/xdp_bpfel.o index f9363568f4bbeb303be957f9a8a7813fdb5f1832..872c945eaa6535b37fc1391b359edcf1745ea870 100644 GIT binary patch literal 6288 zcmbuDOKcm*8OMjTQi+MJhauUL5v1GHZb@0B9#$2#wggJDTp3j>mZIz=4zOC2E0Qpu z%cUSkPNO(Zo1g`%Lksv2v^wPA9tx-@`$iplP|+NGGzzy+Z!LP!DK&}|?*E$yxg1IH zp#u$Pe)BzNc4i-Xe|G+g-{)hh_}IT$gVZQvv7Sah%DYi^0-Ci`V%*BeHr>KI52cg! zGtZvWz(=Srp{~EY&$;#ijWX5>y>_$@QT?dlvV~v9${!pT?cEc+O+(m*-oi)fDxx0|FqWto0aUwCT+ev+8-->v&l65<0QrO9OYJ92XA{&OJJc>Xl zjd7X{)K!Wpfol80R(D38j#MOv&C7wHS=T}Mk7O3kBV5kUMM(UM+Me${?1pg8|Q z?>btt@0b?)wX>onIsQC(h>4f)y}=c%3qsF&U9?ZJ4hFi({xA8yd5URr{+`%Z2W6x1YmrA({iYQCIcAodwf5wreTSXqd zSU#`*!f7F|eI{CpzYm3)CAe3(6M)}ZMRXrxJv-6;2(=GY<@IYJKJa_FYUPaS#mj@= zkMWK8+kg2OM!jZ=wAJ6{U^#{V#d9c}~dj1YM3#Q3I%3s2Dp=cbv4CEWa~ipQ!23GO`X- zS|`%W3B7ijHz`%YXi0_dFKS7)?iuf*K@@&x%NAT_np5%k)LZ7_)wgHO#9Ogh(`4@L zlFjngnrRnYCu>(&$x7a`U9;pAZ`x+bc2?M0s$}LVC|ADF?GjC%kTSz;0rDA4R zvW1kr#+)^?T(zsVDbQ|~nr8LF2&)z<*>u598I)GiUE?tjGEE&88I171ijy47TiHS+ zi6E{)d5})9;yTr&;C#yHhkb|{D67RRkn%JpjG&-UFjp+3L(mCNlH{VMNs~ERo20@* z#-thZ00=x`tiLCO`BcQQlXiC1PMHO(P^{R=B6oYUv|`?Jvw7Q0V@(T{U`QBbSBzji zLEFt3=Z)czF=>!%iYewpS`daOGc<3vXnbSBAdeZh2v@U-IWuNnja?fHHPq`)v!&Is zqrH+eJXx^U+)T~Y0 zH8VDK8Pi_pm!xS)vPCODc5qcZszM_?iJVN@!(tNB@70B=AA0>N<+>u|$be2jEIvCm zKff@e)E(Q!g)Lc51sj;MFkQz=+GfVKQnnLp8X7Q^W6C0D7mTnmGGKfwpBiXfNx6Oa zju1xz{;}#(fBfKv$nsSddq`TGH%7g?!$6i*2jPaL_v+0ib#H*tBLiqgFN)#a(*?Jf z$p!m^{4^W=q55V%n0rWW`HBjM>C+~Dkqf0RRzP{VW#0}@R&cK}z5eQbF;|b0N?I&W zV=3frKJ*OG_KYtqC1%Abs5lvvt5$y=ogQ^e@q!Qf9M#=6l`{TxraJ^r*4&#~WOOg#kYeVv|ZG)B*D zD!)hFp$>lNy^8qnsOLTxy2P)*L1>rpSi}?Ho+pe2(ISIWm z=Sv&_?@7J^?taI|o_RO`X85Cc?~{JU$1sH^rgun^bNe;S-@^g$xL5z4k0E5k9vqYW z8~E?L?_&tT<-`GSN%Eh-T@SE69u9y%miz_DANd$exIJ+Iyd(J>_QQpzKDI0Q4!C=R zu_uy01)u(fSXY`4aSwRkBR>mngI^^NfIB7sh3xM!M(@ZZ|1a@<#taYlfP)_XHaP0x z0C-L^T<{W;Jc*aK_D#m#l>8HL7he7{lK%+q!)7RH`fHN^3Hhnd7~7ECh8NqDnB1m^ z1K|6TRs08%-++GjOCQ^oTmYY9ezxo30C-RGuOM$f?q~n-Z~**F^4}mI?e()xtP`yl z@j0*|S;Y@Z?!!CiORxIbvSb_V3&Pn8$}#b}-=yVoI>vo@1$jB%i)-#JcjS0X8-E?+ zCq4c%$c+kpYj>$H5%fE05akD`UG$|O%L&;INjq{+Qrg2*aLG7tdzijI21tlPauP*R zsVJs<3>C%8oEluxctc}sGhUSamd4u}?`Zs3<2{Y{HRhARn2OiwnJ;w>jfXUjYOJo4 z^0%zXOB!!zys7b)#@ib2X#7~?J&pG@#*O)G~Up7Q{yd-w>93;__4-&8t-e| z(b>f!F20)>zW8#H>BN-?2mYo*L-hTsK2%vGms`!7Rx0J-bN7%h>&BZBJ}+JCCW};V z=Xoogv-#=iA#+?bBg3K{rhml!pCti%IkhHFf2Nl*lzXD7|H@;~#5C5pLkdx}uF>3~ z>+db}|C+C|SNCUS)22GIJ%g71mlBn%n%z#(lMVl0B+XOVt8tnp#n;#1fgDhJ!dt^$ z@%$0?kEFeNUMPKi{W8*C$yK{6?Juf?r9fZ56vhvm{}Y+GA?=Q%rRR`xr{+cf>(X-~ zlNK_dq6O5D`#dcEGZ{dgD{@DAdi?VkJ2wBkh-L$|UUudF)31x3>pQTJx2C!+b=qY7 Q9XU#NcwMQ-@0H{K0|&QPE&u=k literal 7368 zcmbtYZ){W76~7LFBr5?z9LOl$9?(S&NlZvXni4>U&NM-RSc$11rF(Jgm&C)d9lvLk zK&xfdwxX(&(rKH}gw$wb6PkvWO-P`rQlg3vZ0ZLz^#dQeDVzGCQY$UeIt^(^?04=x z*M7OAY}&4T{X4&N&OP_ubN{?&e=FL5ydn?~x&*{OMVV=>5SPDPmLpP(h+23iwpL1( zs8jaEdWBW%gxCbJZO<*Kn5y5XFjy;b9L=gB4RlH>PFMa`eD6;}_}h2E)VAl>(qXDT zDDm{V8=?i0-|vX@`z8Bzv)^?=BfEuhP)Qzq@JQk3j4Ox|bJ=NYduUSa$p;~eALjF%a|!#KrGu8lso+y`o%Ft#YWR}m191uB z{D}_st19h=9-nMh>UXL^ZM1QOqY+JQ zsZjl?@PhPPpc#m&cFExo$6b9MrW({aDfh4Pl?d*Z4XJ+%QljB2l54+9Cse)@!5xO* zHp5S?Ln64>@M|*s)Osd@9fn`n@KgI>DoCeQ2=|2ACsV=anV;o(s&na{7-s$g+h1XR zkh$7#Qw=?e55Fqy>4rm+pCA5)?}_m{gqZIocW2FAaglBN0l% zw$$I$cz+6lQiW|1!gw30k*|CgGO13>TNC5{g)&xQ+mip{&zPe>YmN4cqso2@!=^YC z4_yhfAML06YY+&CGQ;Dcypec`^GUZi#rYBHe^XBO#l^dy21Fw}Uqg7Red;)pwt{-( z?gEr$t3=gcs6W!vNk=-VXg|deL!D+xJ&{R%sRDQ=qy*SRvFJ>2Xk7E^>U7M;afE`ya|s9RxmP zxdB*#!JwFHD%;a>qxMv}NIk6y>C{s}S}JI5H>+6HJ0Lf3TI3!`T}IynzN{fA=AEHLVZy=Z+aTC5+Pvdbn6a5f&5pJ?=WPV9~GNr3-ScZ5y%CW zMs&wPe3h;%*`%{lpCrY%5T$d$mG-m z4wRSQPH-v~>QSG54=GWoy+~(0VcSOs2ale$hx&%2cKmECYTF`T$c;L7D&Zx>gq?C+ z58&pCg`@)kMlUBWblGkpnMkDyB&J+Xidkpc>xF6}RftY|{Z3XCaz!tl9golC9pO6T zlTOxi`<*dQWD>3y&3c6yF_uY;yTVJ1mmDytVz($}-Sl|YNmL;;t6QE*(>PLHLtDQ8*~ zrtPVsQ*>-aoz>Q)H!TAq{5T0Bye}+2$o#&^L^>Nzikb_#bjsSR`?3yOA*E>!(b$_2 zqwR>c_W0eu*tJJkDCsdPREp(z6IP40zu7us>0r$zFBsd7@W!xq9kxdEWA=oX%etXx zyw{G|eX*Cjn#=B`py~XDt}l*C*0v+Gqp+JOY+o!kI1nF@;RXjz$D`$tZUH0kZ8hco za+f_c_~Kw(ZR^fvYu`QuF|3|0c{&CrZd>;Sm?=QekRm}wqbp2TCFBdk~&7M7G>sWfAm!Ilbh4wD>qimXsY^= zrxL^RuZ0Rt_VQ_ki%P6Z2WFKAAL5cnYPAQSZJ1@YnU*7z&b;<0DatmAjcB)$qU<9t7;TrNY{RvqEfAG{ELRBOLa@?@+k33^p_50?*hA5mPDW3~ z`}>CCHgK>nIs{Woi>3D9W9Nkq3AVPIzUbt}LcFoCHpofyV|rSUH4RAgntFZTHCubF zP+A?KHBTpv3m4}aJ5y)#mGrCFUvfoPsaB7?)FE$Ix$%)VI_bFf>`X3qzL>Y2j58T( zGVVl##}?#O4o!jn=W$VE z-`E84kUHPZEiFBG#ZwQVOgiagT}Om5YgcH7M=8ArTl7ptp?3tTJMqMBmrrf=j&uVL zXMAL_@J;Viq}wkwLi`5u9n_|C%Afe(QFo6hU*bFP(omCE&vic!AqMdX?3-Q_qSwc3 zz+=pFuy21`z5kLw@fz?6w*MG>^HueJOLpQl;B#z$AAHkoA+Gp%4fu8D8!_MQchoqj zeJl7=cQGEcBi;+X{sX*sF@KZzJ@^C3{ywnrLyXVI>%b?N{}s6Nz7V&WSK>ux=SRpF zklHr^8$U+Ae7p|4k@<1pj!%$3AFl@wG5;oT=Vw?qA8!EfX8r@nj7|I`iY;wfG^p z!2HkPHO~gbisAo&IhIY9YWPw9bi8)vU3mGeZ4ZbT^Bj0hXF!ZF*X^g7f1B*x0Wr_~ z58!o&0%D1I72ZW_BLQ)j`P0Pv17ewZ3%#qJ!9e~)Stqaoj8c$%os8u-ooD?#=Ik$T zus(#o{pBL-X^j5T0kYNC?+V*@LtiJ#&K5W7lX@h8i|)RgU1^q>uAtme(VTMU=T~{E z)Smor`gk>XD>`#{Gu|llYt8>Q&)?#9G=IA9_4xJtDg0Xf zpW{FEVn$-j{wd~K{omL9b9?Ha{LKF8-}-g^k8*%n)KYoC`_Jh9v9S)L1t0lRTM`y% RVNvcYknJd?x_=HY{teM&HADaa diff --git a/pkg/client/client.go b/pkg/client/client.go index ddde3f4..0175ad0 100755 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -7,11 +7,12 @@ package client import ( "net" + "time" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" ) -func New(ingressIfName string, raddr *net.UDPAddr) ClientError { +func New(ingressIfName string, raddr *net.UDPAddr, interval int) ClientError { ch := make(chan []ipfix.FieldValue) errChan := make(chan ClientError) @@ -25,7 +26,18 @@ func New(ingressIfName string, raddr *net.UDPAddr) ClientError { } } }() - go NewMeter(ingressIfName, ch) + + m := NewMeter(ingressIfName) + go func() { + err := m.Run(ch, time.Duration(interval)) + if err != nil { + errChan <- ClientError{ + Component: "meter", + Error: err, + } + } + m.Close() + }() for { clientError := <-errChan diff --git a/pkg/client/exporter.go b/pkg/client/exporter.go index 69a41e4..fa2fdfb 100644 --- a/pkg/client/exporter.go +++ b/pkg/client/exporter.go @@ -10,7 +10,7 @@ import ( "net" "os" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" ) const OBSERVATION_ID uint32 = 61166 diff --git a/pkg/client/meter.go b/pkg/client/meter.go index fe79628..6cbffce 100644 --- a/pkg/client/meter.go +++ b/pkg/client/meter.go @@ -7,91 +7,240 @@ package client import ( - "fmt" + "bytes" + "context" + "encoding/binary" + "errors" "log" "net" "net/netip" + "sync" "time" + "unsafe" - "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf" "github.com/nttcom/fluvia/pkg/bpf" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" + "github.com/nttcom/fluvia/pkg/packet" + "golang.org/x/sync/errgroup" + "golang.org/x/sys/unix" ) -func NewMeter(ingressIfName string, ch chan []ipfix.FieldValue) { +type Stats struct { + Count int64 + DelayMean int64 + DelayMin int64 + DelayMax int64 + DelaySum int64 +} + +type StatsMap struct { + Mu sync.RWMutex + Db map[packet.ProbeData]*Stats +} + +type Meter struct { + statsMap *StatsMap + bootTime time.Time + xdp *bpf.Xdp +} + +func NewMeter(ingressIfName string) *Meter { + bootTime, err := getSystemBootTime() + if err != nil { + log.Fatalf("Could not get boot time: %s", err) + } + + statsMap := StatsMap{Db: make(map[packet.ProbeData]*Stats)} + iface, err := net.InterfaceByName(ingressIfName) if err != nil { log.Fatalf("lookup network iface %q: %s", ingressIfName, err) } // Load the XDP program - objs, err := bpf.ReadXdpObjects(nil) + xdp, err := bpf.ReadXdpObjects(&ebpf.CollectionOptions{ + Programs: ebpf.ProgramOptions{ + LogLevel: ebpf.LogLevelInstruction, + LogSize: ebpf.DefaultVerifierLogSize * 256, + }, + }) if err != nil { - log.Fatalf("Could not load XDP program: %s", err) + var ve *ebpf.VerifierError + if errors.As(err, &ve) { + log.Fatalf("Could not load XDP program: %+v\n", ve) + } } - defer objs.Close() // Attach the XDP program. - l, err := link.AttachXDP(link.XDPOptions{ - Program: objs.XdpProg, - Interface: iface.Index, - Flags: link.XDPGenericMode, - }) - if err != nil { + if err = xdp.Attach(iface); err != nil { log.Fatalf("Could not attach XDP program: %s", err) } - defer l.Close() log.Printf("Attached XDP program to iface %q (index %d)", iface.Name, iface.Index) log.Printf("Press Ctrl-C to exit and remove the program") - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - mapLogs := map[bpf.XdpProbeData]uint64{} - for range ticker.C { - var entry bpf.XdpProbeData - var count uint64 + return &Meter{ + statsMap: &statsMap, + bootTime: bootTime, + xdp: xdp, + } +} + +func (m *Meter) Run(flowChan chan []ipfix.FieldValue, interval time.Duration) error { + eg, ctx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + return m.Read(ctx) + }) + eg.Go(func() error { + return m.Send(ctx, flowChan, interval) + }) + + if err := eg.Wait(); err != nil { + return err + } + + return nil +} + +func (m *Meter) Read(ctx context.Context) error { + perfEvent, err := m.xdp.NewPerfReader() + if err != nil { + log.Fatalf("Could not obtain perf reader: %s", err) + } + + var metadata bpf.XdpMetaData + for { + select { + case <-ctx.Done(): + return nil + default: + eventData, err := perfEvent.Read() + if err != nil { + log.Fatalf("Could not read from bpf perf map:") + } - iter := objs.IpfixProbeMap.Iterate() + reader := bytes.NewReader(eventData.RawSample) - for iter.Next(&entry, &count) { - if _, ok := mapLogs[entry]; !ok { - mapLogs[entry] = 0 + if err := binary.Read(reader, binary.LittleEndian, &metadata); err != nil { + log.Fatalf("Could not read from reader: %s", err) } - dCnt := uint64(count - mapLogs[entry]) + metadata_size := unsafe.Sizeof(metadata) + if len(eventData.RawSample)-int(metadata_size) <= 0 { + continue + } - mapLogs[entry] = count + receivedNano := m.bootTime.Add(time.Duration(metadata.ReceivedNano) * time.Nanosecond) + SentNano := time.Unix(int64(metadata.SentSec), int64(metadata.SentSubsec)) - sl := []ipfix.SRHSegmentIPv6{} - for _, binSeg := range entry.Segments { - ipSeg, _ := netip.AddrFromSlice(binSeg.In6U.U6Addr8[:]) + delay := receivedNano.Sub(SentNano) - // Ignore zero values received from bpf map - if ipSeg == netip.IPv6Unspecified() { - break - } - seg := ipfix.SRHSegmentIPv6{Val: ipSeg} - sl = append(sl, seg) + probeData, err := packet.Parse(eventData.RawSample[metadata_size:]) + if err != nil { + log.Fatalf("Could not parse the packet: %s", err) } - actSeg, _ := netip.AddrFromSlice(entry.Segments[entry.SegmentsLeft].In6U.U6Addr8[:]) - - f := []ipfix.FieldValue{ - &ipfix.PacketDeltaCount{Val: dCnt}, - &ipfix.SRHActiveSegmentIPv6{Val: actSeg}, - &ipfix.SRHSegmentsIPv6Left{Val: entry.SegmentsLeft}, - &ipfix.SRHFlagsIPv6{Val: entry.Flags}, - &ipfix.SRHTagIPv6{Val: entry.Tag}, - &ipfix.SRHSegmentIPv6BasicList{ - SegmentList: sl, - }, + delayMicro := delay.Microseconds() + + m.statsMap.Mu.Lock() + if value, ok := m.statsMap.Db[*probeData]; !ok { + m.statsMap.Db[*probeData] = &Stats{ + Count: 1, + DelayMean: delayMicro, + DelayMin: delayMicro, + DelayMax: delayMicro, + DelaySum: delayMicro, + } + } else { + value.Count = value.Count + 1 + + if delayMicro < value.DelayMin { + value.DelayMin = delayMicro + } + + if delayMicro > value.DelayMax { + value.DelayMax = delayMicro + } + + value.DelaySum = value.DelaySum + delayMicro + value.DelayMean = value.DelaySum / value.Count } - // Throw to channel - ch <- f + m.statsMap.Mu.Unlock() } - if err := iter.Err(); err != nil { - fmt.Printf("Failed to iterate map: %v\n", err) + } +} + +func (m *Meter) Send(ctx context.Context, flowChan chan []ipfix.FieldValue, intervalSec time.Duration) error { + ticker := time.NewTicker(intervalSec * time.Second) + defer ticker.Stop() + + for range ticker.C { + select { + case <-ctx.Done(): + return nil + default: + m.statsMap.Mu.Lock() + for probeData, stat := range m.statsMap.Db { + dCnt := uint64(stat.Count) + + sl := []ipfix.SRHSegmentIPv6{} + for _, seg := range probeData.Segments { + if seg == "" { + break + } + ipSeg, _ := netip.ParseAddr(seg) + + // Ignore zero values received from bpf map + if ipSeg == netip.IPv6Unspecified() { + break + } + seg := ipfix.SRHSegmentIPv6{Val: ipSeg} + sl = append(sl, seg) + } + + actSeg, _ := netip.ParseAddr(probeData.Segments[probeData.SegmentsLeft]) + + f := []ipfix.FieldValue{ + &ipfix.PacketDeltaCount{Val: dCnt}, + &ipfix.SRHActiveSegmentIPv6{Val: actSeg}, + &ipfix.SRHSegmentsIPv6Left{Val: probeData.SegmentsLeft}, + &ipfix.SRHFlagsIPv6{Val: probeData.Flags}, + &ipfix.SRHTagIPv6{Val: probeData.Tag}, + &ipfix.SRHSegmentIPv6BasicList{ + SegmentList: sl, + }, + &ipfix.PathDelayMeanDeltaMicroseconds{Val: uint32(stat.DelayMean)}, + &ipfix.PathDelayMinDeltaMicroseconds{Val: uint32(stat.DelayMin)}, + &ipfix.PathDelayMaxDeltaMicroseconds{Val: uint32(stat.DelayMax)}, + &ipfix.PathDelaySumDeltaMicroseconds{Val: uint32(stat.DelaySum)}, + } + // Throw to channel + flowChan <- f + + // Stats (e.g., DelayMean) are based on packets received over a fixed duration + // These need to be cleared out for the next calculation of statistics + delete(m.statsMap.Db, probeData) + } + m.statsMap.Mu.Unlock() } } + + return nil +} + +func (m *Meter) Close() error { + if err := m.xdp.Close(); err != nil { + return err + } + + return nil +} + +func getSystemBootTime() (time.Time, error) { + var ts unix.Timespec + if err := unix.ClockGettime(unix.CLOCK_MONOTONIC, &ts); err != nil { + return time.Time{}, err + } + return time.Now().Add(-time.Duration(ts.Nano())), nil }