Skip to content

Commit

Permalink
Merge branch 'grafana:main' into helm-chart
Browse files Browse the repository at this point in the history
  • Loading branch information
Khushi Jain authored Feb 23, 2024
2 parents a901739 + 30146a3 commit e037188
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
5 changes: 4 additions & 1 deletion pkg/beyla/network_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package beyla

import (
"time"

"github.com/grafana/beyla/pkg/internal/netolly/flow"
)

type NetworkConfig struct {
Expand Down Expand Up @@ -60,6 +62,7 @@ type NetworkConfig struct {
// When enabled, it will detect duplicate flows (flows that have been detected e.g. through
// both the physical and a virtual interface).
// "firstCome" will forward only flows from the first interface the flows are received from.
// Default value: firstCome
Deduper string `yaml:"deduper" env:"BEYLA_NETWORK_DEDUPER"`
// DeduperFCExpiry specifies the expiry duration of the flows "firstCome" deduplicator. After
// a flow hasn't been received for that expiry time, the deduplicator forgets it. That means
Expand Down Expand Up @@ -92,7 +95,7 @@ var defaultNetworkConfig = NetworkConfig{
ExcludeInterfaces: []string{"lo"},
CacheMaxFlows: 5000,
CacheActiveTimeout: 5 * time.Second,
Deduper: "none",
Deduper: flow.DeduperFirstCome,
DeduperJustMark: false,
Direction: "both",
ListenInterfaces: "watch",
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/netolly/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[NetFlowId][]NetFlowMetrics {
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metrics) {
//if err := flowMap.Delete(id); err != nil {
// tlog().Warn("couldn't delete flow entry", "flowId", id)
//}
if err := flowMap.Delete(id); err != nil {
tlog().Warn("couldn't delete flow entry", "flowId", id)
}
// We observed that eBFP PerCPU map might insert multiple times the same key in the map
// (probably due to race conditions) so we need to re-join metrics again at userspace
// TODO: instrument how many times the keys are is repeated in the same eviction
Expand Down
25 changes: 23 additions & 2 deletions pkg/internal/netolly/export/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package export

import (
"context"
"fmt"
"log/slog"
"time"

Expand All @@ -19,6 +20,12 @@ import (
"github.com/grafana/beyla/pkg/internal/netolly/transform/k8s"
)

const (
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
directionIngress = 0
directionEgress = 1
)

type MetricsConfig struct {
Metrics *otel.MetricsConfig
}
Expand Down Expand Up @@ -68,14 +75,17 @@ func destinationAttrs(m *ebpf.Record) (namespace, name string) {
}

func attributes(m *ebpf.Record) []attribute.KeyValue {
res := make([]attribute.KeyValue, 0, 8+len(m.Metadata))
res := make([]attribute.KeyValue, 0, 11+len(m.Metadata))

srcNS, srcName := sourceAttrs(m)
dstNS, dstName := destinationAttrs(m)

// this will cause cardinality explosion. Discuss what to do
//res = append(res, attribute.Int("dst.port", int(m.Id.DstPort)))
res = append(res,
attribute.String("beyla.ip", m.AgentIP),
attribute.String("iface", m.Interface),
attribute.String("direction", directionStr(m.Id.Direction)),
attribute.String("src.address", m.Id.SrcIP().IP().String()),
attribute.String("dst.address", m.Id.DstIP().IP().String()),
attribute.String("src.name", srcName),
Expand All @@ -94,9 +104,20 @@ func attributes(m *ebpf.Record) []attribute.KeyValue {
return res
}

// TODO: merge with AppO11y's otel.Exporter
func directionStr(direction uint8) string {
switch direction {
case directionIngress:
return "ingress"
case directionEgress:
return "egress"
}
// should never happen. Logging received value in case of bug
return fmt.Sprint(direction)
}

func MetricsExporterProvider(cfg MetricsConfig) (node.TerminalFunc[[]*ebpf.Record], error) {
log := mlog()
log.Debug("instantiating network metrics exporter provider")
exporter, err := otel.InstantiateMetricsExporter(context.Background(), cfg.Metrics, log)
if err != nil {
log.Error("", "error", err)
Expand Down

0 comments on commit e037188

Please sign in to comment.