Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing network metrics' bytes and adding some useful fields #641

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/beyla/network_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package beyla

import (
"time"

"github.com/grafana/beyla/pkg/internal/netolly/flow"
)

type NetworkConfig struct {
Expand Down Expand Up @@ -60,6 +62,7 @@ type NetworkConfig struct {
// When enabled, it will detect duplicate flows (flows that have been detected e.g. through
// both the physical and a virtual interface).
// "firstCome" will forward only flows from the first interface the flows are received from.
// Default value: firstCome
Deduper string `yaml:"deduper" env:"BEYLA_NETWORK_DEDUPER"`
// DeduperFCExpiry specifies the expiry duration of the flows "firstCome" deduplicator. After
// a flow hasn't been received for that expiry time, the deduplicator forgets it. That means
Expand Down Expand Up @@ -92,7 +95,7 @@ var defaultNetworkConfig = NetworkConfig{
ExcludeInterfaces: []string{"lo"},
CacheMaxFlows: 5000,
CacheActiveTimeout: 5 * time.Second,
Deduper: "none",
Deduper: flow.DeduperFirstCome,
DeduperJustMark: false,
Direction: "both",
ListenInterfaces: "watch",
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/netolly/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[NetFlowId][]NetFlowMetrics {
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metrics) {
//if err := flowMap.Delete(id); err != nil {
// tlog().Warn("couldn't delete flow entry", "flowId", id)
//}
if err := flowMap.Delete(id); err != nil {
tlog().Warn("couldn't delete flow entry", "flowId", id)
}
// We observed that an eBPF PerCPU map might insert the same key multiple times in the map
// (probably due to race conditions), so we need to re-join the metrics again in userspace
// TODO: instrument how many times the keys are repeated in the same eviction
Expand Down
25 changes: 23 additions & 2 deletions pkg/internal/netolly/export/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package export

import (
"context"
"fmt"
"log/slog"
"time"

Expand All @@ -19,6 +20,12 @@ import (
"github.com/grafana/beyla/pkg/internal/netolly/transform/k8s"
)

const (
// Flow direction codes, according to field 61 in
// https://www.iana.org/assignments/ipfix/ipfix.xhtml
// directionStr renders these values as the "direction" metric attribute.

// directionIngress is the code that directionStr renders as "ingress".
directionIngress = 0
// directionEgress is the code that directionStr renders as "egress".
directionEgress = 1
)

// MetricsConfig groups the settings consumed by MetricsExporterProvider.
type MetricsConfig struct {
// Metrics is the OpenTelemetry metrics configuration that MetricsExporterProvider
// passes to otel.InstantiateMetricsExporter.
Metrics *otel.MetricsConfig
}
Expand Down Expand Up @@ -68,14 +75,17 @@ func destinationAttrs(m *ebpf.Record) (namespace, name string) {
}

func attributes(m *ebpf.Record) []attribute.KeyValue {
res := make([]attribute.KeyValue, 0, 8+len(m.Metadata))
res := make([]attribute.KeyValue, 0, 11+len(m.Metadata))

srcNS, srcName := sourceAttrs(m)
dstNS, dstName := destinationAttrs(m)

// this will cause cardinality explosion. Discuss what to do
//res = append(res, attribute.Int("dst.port", int(m.Id.DstPort)))
res = append(res,
attribute.String("beyla.ip", m.AgentIP),
attribute.String("iface", m.Interface),
attribute.String("direction", directionStr(m.Id.Direction)),
attribute.String("src.address", m.Id.SrcIP().IP().String()),
attribute.String("dst.address", m.Id.DstIP().IP().String()),
attribute.String("src.name", srcName),
Expand All @@ -94,9 +104,20 @@ func attributes(m *ebpf.Record) []attribute.KeyValue {
return res
}

// TODO: merge with AppO11y's otel.Exporter
// directionStr converts a numeric flow direction code into the label used for
// the "direction" metric attribute: "ingress" for directionIngress and
// "egress" for directionEgress.
func directionStr(direction uint8) string {
	if direction == directionIngress {
		return "ingress"
	}
	if direction == directionEgress {
		return "egress"
	}
	// Not expected to be reached: any other code is rendered as its decimal
	// value so that an unexpected direction stays visible if there is a bug.
	return fmt.Sprint(direction)
}

func MetricsExporterProvider(cfg MetricsConfig) (node.TerminalFunc[[]*ebpf.Record], error) {
log := mlog()
log.Debug("instantiating network metrics exporter provider")
exporter, err := otel.InstantiateMetricsExporter(context.Background(), cfg.Metrics, log)
if err != nil {
log.Error("", "error", err)
Expand Down
Loading