Skip to content

Commit

Permalink
Network metrics: filter by transport protocol (#752)
Browse files Browse the repository at this point in the history
* move transport.Protocol to its own package

* protocol filter node

* Integrate protocol filter into pipeline

* optimized and cleaned up allower/excluder interfacing
  • Loading branch information
mariomac authored Apr 18, 2024
1 parent 1a6538e commit 8236939
Show file tree
Hide file tree
Showing 10 changed files with 381 additions and 77 deletions.
5 changes: 5 additions & 0 deletions pkg/beyla/network_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type NetworkConfig struct {
// If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression,
// otherwise it will be matched as a case-sensitive string.
ExcludeInterfaces []string `yaml:"exclude_interfaces" env:"BEYLA_NETWORK_EXCLUDE_INTERFACES" envSeparator:","`
// Protocols causes Beyla to drop flows whose transport protocol is not in this list.
Protocols []string `yaml:"protocols" env:"BEYLA_NETWORK_PROTOCOLS" envSeparator:","`
// ExcludeProtocols causes Beyla to drop flows whose transport protocol is in this list.
// If the Protocols list is already defined, ExcludeProtocols has no effect.
ExcludeProtocols []string `yaml:"exclude_protocols" env:"BEYLA_NETWORK_EXCLUDE_PROTOCOLS" envSeparator:","`
// CacheMaxFlows specifies how many flows can be accumulated in the accounting cache before
// being flushed for its later export. Default value is 5000.
// Decrease it if you see the "received message larger than max" error in Beyla logs.
Expand Down
20 changes: 13 additions & 7 deletions pkg/internal/netolly/agent/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ type FlowsPipeline struct {
MapTracer pipe.Start[[]*ebpf.Record]
RingBufTracer pipe.Start[[]*ebpf.Record]

Deduper pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
Kubernetes pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
ReverseDNS pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
CIDRs pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
Decorator pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
ProtoFilter pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
Deduper pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
Kubernetes pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
ReverseDNS pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
CIDRs pipe.Middle[[]*ebpf.Record, []*ebpf.Record]
Decorator pipe.Middle[[]*ebpf.Record, []*ebpf.Record]

OTEL pipe.Final[[]*ebpf.Record]
Prom pipe.Final[[]*ebpf.Record]
Expand All @@ -33,9 +34,10 @@ type FlowsPipeline struct {

// Connect specifies how the pipeline nodes are connected
func (fp *FlowsPipeline) Connect() {
fp.MapTracer.SendTo(fp.Deduper)
fp.RingBufTracer.SendTo(fp.Deduper)
fp.MapTracer.SendTo(fp.ProtoFilter)
fp.RingBufTracer.SendTo(fp.ProtoFilter)

fp.ProtoFilter.SendTo(fp.Deduper)
fp.Deduper.SendTo(fp.Kubernetes)
fp.Kubernetes.SendTo(fp.ReverseDNS)
fp.ReverseDNS.SendTo(fp.CIDRs)
Expand All @@ -48,6 +50,7 @@ func (fp *FlowsPipeline) Connect() {
func mapTracer(fp *FlowsPipeline) *pipe.Start[[]*ebpf.Record] { return &fp.MapTracer }
func ringBufTracer(fp *FlowsPipeline) *pipe.Start[[]*ebpf.Record] { return &fp.RingBufTracer }

func prtFltr(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.ProtoFilter }
func deduper(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.Deduper }
func kube(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.Kubernetes }
func rdns(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.ReverseDNS }
Expand Down Expand Up @@ -79,6 +82,9 @@ func (f *Flows) buildPipeline(ctx context.Context) (*pipe.Runner, error) {
// Middle nodes: transforming flow records and passing them to the next stage in the pipeline.
// Many of the nodes here are not mandatory. It's decision of each Provider function to decide
// whether the node needs to be instantiated or just bypassed.
pipe.AddMiddleProvider(pb, prtFltr,
flow.ProtocolFilterProvider(f.cfg.NetworkFlows.Protocols, f.cfg.NetworkFlows.ExcludeProtocols))

pipe.AddMiddleProvider(pb, deduper, func() (pipe.MiddleFunc[[]*ebpf.Record, []*ebpf.Record], error) {
var deduperExpireTime = f.cfg.NetworkFlows.DeduperFCTTL
if deduperExpireTime <= 0 {
Expand Down
68 changes: 2 additions & 66 deletions pkg/internal/netolly/export/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
"github.com/grafana/beyla/pkg/internal/netolly/flow/transport"
)

// Attribute stores how to expose a metric attribute: its exposed name and how to
Expand Down Expand Up @@ -50,7 +51,7 @@ func attributeFor(exposedName, internalName string) Attribute {
case "beyla.ip":
getter = func(r *ebpf.Record) string { return r.Attrs.BeylaIP }
case "transport":
getter = func(r *ebpf.Record) string { return l4TransportStr(r.Id.TransportProtocol) }
getter = func(r *ebpf.Record) string { return transport.Protocol(r.Id.TransportProtocol).String() }
case "src.address":
getter = func(r *ebpf.Record) string { return r.Id.SrcIP().IP().String() }
case "dst.address":
Expand Down Expand Up @@ -83,68 +84,3 @@ func directionStr(direction uint8) string {
return ""
}
}

// values taken from the list of "Standard well-defined IP protocols" from uapi/linux/in.h
// nolint:cyclop
func l4TransportStr(proto uint8) string {
switch proto {
case 0:
return "IP"
case 1:
return "ICMP"
case 2:
return "IGMP"
case 4:
return "IPIP"
case 6:
return "TCP"
case 8:
return "EGP"
case 12:
return "PUP"
case 17:
return "UDP"
case 22:
return "IDP"
case 29:
return "TP"
case 33:
return "DCCP"
case 41:
return "IPV6"
case 46:
return "RSVP"
case 47:
return "GRE"
case 50:
return "ESP"
case 51:
return "AH"
case 92:
return "MTP"
case 94:
return "BEETPH"
case 98:
return "ENCAP"
case 103:
return "PIM"
case 108:
return "COMP"
case 115:
return "L2TP"
case 132:
return "SCTP"
case 136:
return "UDPLITE"
case 137:
return "MPLS"
case 143:
return "ETHERNET"
case 255:
return "RAW"
// TODO: consider adding an extra byte to TransportProtocol to support this protocol
// case 262:
// return "MPTCP"
}
return strconv.Itoa(int(proto))
}
101 changes: 101 additions & 0 deletions pkg/internal/netolly/flow/protocol_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package flow

import (
"fmt"

"github.com/mariomac/pipes/pipe"

"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
"github.com/grafana/beyla/pkg/internal/netolly/flow/transport"
)

// ProtocolFilterProvider allows selecting which protocols are going to be instrumented.
// It drops any flow not appearing in the "allowed" list.
// If the Allowed list is empty, it drops any flow appearing in the "excluded" list.
func ProtocolFilterProvider(allowed, excluded []string) pipe.MiddleProvider[[]*ebpf.Record, []*ebpf.Record] {
return func() (pipe.MiddleFunc[[]*ebpf.Record, []*ebpf.Record], error) {
if len(allowed) == 0 && len(excluded) == 0 {
// user did not configure any filter. Ignore this node
return pipe.Bypass[[]*ebpf.Record](), nil
}
pf, err := newFilter(allowed, excluded)
if err != nil {
return nil, err
}
return pf.nodeLoop, nil
}
}

type protocolFilter struct {
isAllowed func(r *ebpf.Record) bool
}

func newFilter(allowed, excluded []string) (*protocolFilter, error) {
// if the allowed list has items, only interfaces in that list are allowed
if len(allowed) > 0 {
allow, err := allower(allowed)
if err != nil {
return nil, err
}
return &protocolFilter{isAllowed: allow}, nil
}
// if the allowed list is empty, any interface is allowed except if it matches the exclusion list
exclude, err := excluder(excluded)
if err != nil {
return nil, err
}
return &protocolFilter{isAllowed: exclude}, nil
}

func (pf *protocolFilter) nodeLoop(in <-chan []*ebpf.Record, out chan<- []*ebpf.Record) {
for records := range in {
if filtered := pf.filter(records); len(filtered) > 0 {
out <- filtered
}
}
}

func (pf *protocolFilter) filter(input []*ebpf.Record) []*ebpf.Record {
writeIdx := 0
for readIdx := range input {
if pf.isAllowed(input[readIdx]) {
input[writeIdx] = input[readIdx]
writeIdx++
}
}
return input[:writeIdx]
}

func allower(allowed []string) (func(r *ebpf.Record) bool, error) {
allow, err := protocolsMap(allowed)
if err != nil {
return nil, fmt.Errorf("in network protocols: %w", err)
}
return func(r *ebpf.Record) bool {
_, ok := allow[transport.Protocol(r.Id.TransportProtocol)]
return ok
}, nil
}

func excluder(excluded []string) (func(r *ebpf.Record) bool, error) {
exclude, err := protocolsMap(excluded)
if err != nil {
return nil, fmt.Errorf("in network excluded protocols: %w", err)
}
return func(r *ebpf.Record) bool {
_, excluded := exclude[transport.Protocol(r.Id.TransportProtocol)]
return !excluded
}, nil
}

func protocolsMap(entries []string) (map[transport.Protocol]struct{}, error) {
protoMap := map[transport.Protocol]struct{}{}
for _, aStr := range entries {
if atp, err := transport.ParseProtocol(aStr); err == nil {
protoMap[atp] = struct{}{}
} else {
return nil, err
}
}
return protoMap, nil
}
79 changes: 79 additions & 0 deletions pkg/internal/netolly/flow/protocol_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package flow

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
"github.com/grafana/beyla/pkg/internal/netolly/flow/transport"
"github.com/grafana/beyla/pkg/internal/testutil"
)

var tcp1 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 1, TransportProtocol: uint8(transport.TCP)}}}
var tcp2 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 2, TransportProtocol: uint8(transport.TCP)}}}
var tcp3 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 3, TransportProtocol: uint8(transport.TCP)}}}
var udp1 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 4, TransportProtocol: uint8(transport.UDP)}}}
var udp2 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 5, TransportProtocol: uint8(transport.UDP)}}}
var icmp1 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 7, TransportProtocol: uint8(transport.ICMP)}}}
var icmp2 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 8, TransportProtocol: uint8(transport.ICMP)}}}
var icmp3 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 9, TransportProtocol: uint8(transport.ICMP)}}}

func TestProtocolFilter_Allow(t *testing.T) {
protocolFilter, err := ProtocolFilterProvider([]string{"TCP"}, nil)()
require.NoError(t, err)
input, output := make(chan []*ebpf.Record, 10), make(chan []*ebpf.Record, 10)
defer close(input)
go protocolFilter(input, output)

input <- []*ebpf.Record{}
input <- []*ebpf.Record{tcp1, tcp2, tcp3}
input <- []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3}
input <- []*ebpf.Record{icmp2, tcp1, udp1, icmp1, tcp2, udp2, tcp3, icmp3}

filtered := testutil.ReadChannel(t, output, timeout)
assert.Equal(t, []*ebpf.Record{tcp1, tcp2, tcp3}, filtered)
filtered = testutil.ReadChannel(t, output, timeout)
assert.Equal(t, []*ebpf.Record{tcp1, tcp2, tcp3}, filtered)
// no more slices are sent (the second was completely filtered)
select {
case o := <-output:
require.Failf(t, "unexpected flows!", "%v", o)
default:
// ok!!
}
}

func TestProtocolFilter_Exclude(t *testing.T) {
protocolFilter, err := ProtocolFilterProvider(nil, []string{"TCP"})()
require.NoError(t, err)
input, output := make(chan []*ebpf.Record, 10), make(chan []*ebpf.Record, 10)
defer close(input)
go protocolFilter(input, output)

input <- []*ebpf.Record{tcp1, tcp2, tcp3}
input <- []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3}
input <- []*ebpf.Record{}
input <- []*ebpf.Record{icmp2, tcp1, udp1, icmp1, tcp2, udp2, tcp3, icmp3}

filtered := testutil.ReadChannel(t, output, timeout)
assert.Equal(t, []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3}, filtered)
filtered = testutil.ReadChannel(t, output, timeout)
assert.Equal(t, []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3}, filtered)
// no more slices are sent (the first was completely filtered)
select {
case o := <-output:
require.Failf(t, "unexpected flows!", "%v", o)
default:
// ok!!
}
}
func TestProtocolFilter_ParsingErrors(t *testing.T) {
_, err := ProtocolFilterProvider([]string{"TCP", "tralara"}, nil)()
assert.Error(t, err)
_, err = ProtocolFilterProvider([]string{"TCP", "tralara"}, []string{"UDP"})()
assert.Error(t, err)
_, err = ProtocolFilterProvider(nil, []string{"TCP", "tralara"})()
assert.Error(t, err)
}
Loading

0 comments on commit 8236939

Please sign in to comment.