diff --git a/NOTICE b/NOTICE index a25dd0413..269115732 100644 --- a/NOTICE +++ b/NOTICE @@ -31,6 +31,10 @@ The Initial Developer of some parts of the product, which are copied from, deriv inspired by the DataDog Agent (https://github.com/DataDog/datadog-agent). Copyright DataDog. +The Initial Developer of some parts of the product, which are copied from, derived from, or +inspired by the New Relic Infrastructure Agent (https://github.com/newrelic/infrastructure-agent). +Copyright New Relic. + Grafana Beyla uses third-party libraries or other resources that may be distributed under licenses different than the Grafana Beyla software. The licenses for these third-party libraries are listed in the attached third_party_licenses.csv file diff --git a/go.mod b/go.mod index 2ee0fd913..4cfb2b155 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/mariomac/guara v0.0.0-20230621100729-42bd7716e524 github.com/mariomac/pipes v0.10.0 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.6.0 github.com/prometheus/common v0.48.0 @@ -34,6 +35,7 @@ require ( go.opentelemetry.io/collector/config/configgrpc v0.97.0 go.opentelemetry.io/collector/config/confighttp v0.97.0 go.opentelemetry.io/collector/config/configopaque v1.4.0 + go.opentelemetry.io/collector/config/configretry v0.97.0 go.opentelemetry.io/collector/config/configtelemetry v0.97.0 go.opentelemetry.io/collector/config/configtls v0.97.0 go.opentelemetry.io/collector/consumer v0.97.0 @@ -124,7 +126,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/rs/cors v1.10.1 // indirect @@ -140,7 +141,6 @@ require ( go.opentelemetry.io/collector/config/configauth v0.97.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.4.0 // indirect go.opentelemetry.io/collector/config/confignet v0.97.0 // indirect - go.opentelemetry.io/collector/config/configretry v0.97.0 // indirect go.opentelemetry.io/collector/config/internal v0.97.0 // indirect go.opentelemetry.io/collector/confmap v0.97.0 // indirect go.opentelemetry.io/collector/extension v0.97.0 // indirect diff --git a/pkg/beyla/config.go b/pkg/beyla/config.go index 47de0fe38..31b3b99cd 100644 --- a/pkg/beyla/config.go +++ b/pkg/beyla/config.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/beyla/pkg/internal/export/prom" "github.com/grafana/beyla/pkg/internal/filter" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/infraolly/process" "github.com/grafana/beyla/pkg/internal/traces" "github.com/grafana/beyla/pkg/services" "github.com/grafana/beyla/pkg/transform" @@ -99,6 +100,10 @@ var DefaultConfig = Config{ }, Routes: &transform.RoutesConfig{}, NetworkFlows: defaultNetworkConfig, + Processes: process.CollectConfig{ + RunMode: process.RunModePrivileged, + Interval: 5 * time.Second, + }, } type Config struct { @@ -149,6 +154,10 @@ type Config struct { ProfilePort int `yaml:"profile_port" env:"BEYLA_PROFILE_PORT"` InternalMetrics imetrics.Config `yaml:"internal_metrics"` + // Processes metrics for application. 
They will be only enabled if there is a metrics exporter enabled, + // and both the "application" and "application_process" features are enabled + Processes process.CollectConfig `yaml:"processes"` + // Grafana Agent specific configuration TracesReceiver TracesReceiverConfig `yaml:"-"` } diff --git a/pkg/beyla/config_test.go b/pkg/beyla/config_test.go index 37011e95b..c38b51a6c 100644 --- a/pkg/beyla/config_test.go +++ b/pkg/beyla/config_test.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/export/prom" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/infraolly/process" "github.com/grafana/beyla/pkg/internal/netolly/transform/cidr" "github.com/grafana/beyla/pkg/internal/traces" "github.com/grafana/beyla/pkg/transform" @@ -165,6 +166,10 @@ network: CacheLen: 1024, CacheTTL: 5 * time.Minute, }, + Processes: process.CollectConfig{ + RunMode: process.RunModePrivileged, + Interval: 5 * time.Second, + }, }, cfg) } diff --git a/pkg/internal/discover/typer.go b/pkg/internal/discover/typer.go index 871e96a38..2aafe18b7 100644 --- a/pkg/internal/discover/typer.go +++ b/pkg/internal/discover/typer.go @@ -69,7 +69,11 @@ func (t *typer) FilterClassify(evs []Event[ProcessMatch]) []Event[Instrumentable ev := &evs[i] switch evs[i].Type { case EventCreated: - svcID := svc.ID{Name: ev.Obj.Criteria.Name, Namespace: ev.Obj.Criteria.Namespace} + svcID := svc.ID{ + Name: ev.Obj.Criteria.Name, + Namespace: ev.Obj.Criteria.Namespace, + ProcPID: ev.Obj.Process.Pid, + } if elfFile, err := exec.FindExecELF(ev.Obj.Process, svcID); err != nil { t.log.Warn("error finding process ELF. Ignoring", "error", err) } else { diff --git a/pkg/internal/ebpf/common/pids.go b/pkg/internal/ebpf/common/pids.go index 03aebe52a..941fca896 100644 --- a/pkg/internal/ebpf/common/pids.go +++ b/pkg/internal/ebpf/common/pids.go @@ -214,7 +214,7 @@ func serviceInfo(pid uint32) svc.ID { name := commName(pid) lang := exec.FindProcLanguage(int32(pid), nil) - result := svc.ID{Name: name, SDKLanguage: lang} + result := svc.ID{Name: name, SDKLanguage: lang, ProcPID: int32(pid)} activePids.Add(pid, result) diff --git a/pkg/internal/export/attributes/attr_defs.go b/pkg/internal/export/attributes/attr_defs.go index 06c800534..631a7cd1b 100644 --- a/pkg/internal/export/attributes/attr_defs.go +++ b/pkg/internal/export/attributes/attr_defs.go @@ -150,6 +150,20 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { }, } + var processAttributes = AttrReportGroup{ + Attributes: map[attr.Name]Default{ + attr.ProcCommand: true, + attr.ProcCPUState: true, + attr.ProcOwner: true, + attr.ProcParentPid: true, + attr.ProcPid: true, + attr.ProcCommandLine: false, + attr.ProcCommandArgs: false, + attr.ProcExecName: false, + attr.ProcExecPath: false, + }, + } + return map[Section]AttrReportGroup{ BeylaNetworkFlow.Section: { SubGroups: []*AttrReportGroup{&networkCIDR, &networkKubeAttributes}, @@ -223,6 +237,9 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { attr.DBQueryText: false, }, }, + ProcessCPUUtilization.Section: { + SubGroups: []*AttrReportGroup{&processAttributes}, + }, } } diff --git a/pkg/internal/export/attributes/metric.go b/pkg/internal/export/attributes/metric.go index ec6697d79..3b2d269cd 100644 --- a/pkg/internal/export/attributes/metric.go +++ b/pkg/internal/export/attributes/metric.go @@ -61,6 +61,11 @@ var ( Prom: "db_client_operation_duration_seconds", OTEL: "db.client.operation.duration", } + 
ProcessCPUUtilization = Name{ + Section: "process.cpu.utilization", + Prom: "process_cpu_utilization_ratio", + OTEL: "process.cpu.utilization", + } MessagingPublishDuration = Name{ Section: "messaging.publish.duration", Prom: "messaging_publish_duration_seconds", @@ -79,7 +84,7 @@ var ( // as long as the metric name is recorgnisable. func normalizeMetric(name Section) Section { nameStr := strings.ReplaceAll(string(name), "_", ".") - for _, suffix := range []string{".bucket", ".sum", ".count", ".total"} { + for _, suffix := range []string{".ratio", ".bucket", ".sum", ".count", ".total"} { if strings.HasSuffix(nameStr, suffix) { nameStr = nameStr[:len(nameStr)-len(suffix)] break diff --git a/pkg/internal/export/attributes/names/attrs.go b/pkg/internal/export/attributes/names/attrs.go index ad6397844..f95bb3cae 100644 --- a/pkg/internal/export/attributes/names/attrs.go +++ b/pkg/internal/export/attributes/names/attrs.go @@ -96,6 +96,22 @@ var ( K8sDstNodeName = Name("k8s.dst.node.name") ) +// Process Metrics following OTEL 1.26 experimental conventions +// https://opentelemetry.io/docs/specs/semconv/resource/process/ +// https://opentelemetry.io/docs/specs/semconv/system/process-metrics/ + +const ( + ProcCommand = Name(semconv.ProcessCommandKey) + ProcCommandLine = Name(semconv.ProcessCommandLineKey) + ProcCPUState = Name("process.cpu.state") + ProcOwner = Name(semconv.ProcessOwnerKey) + ProcParentPid = Name(semconv.ProcessParentPIDKey) + ProcPid = Name(semconv.ProcessPIDKey) + ProcCommandArgs = Name(semconv.ProcessCommandArgsKey) + ProcExecName = Name(semconv.ProcessExecutableNameKey) + ProcExecPath = Name(semconv.ProcessExecutablePathKey) +) + // other beyla-specific attributes var ( // TargetInstance is a Prometheus-only attribute. diff --git a/pkg/internal/export/otel/common.go b/pkg/internal/export/otel/common.go index e0af1057c..e94707e76 100644 --- a/pkg/internal/export/otel/common.go +++ b/pkg/internal/export/otel/common.go @@ -58,7 +58,7 @@ var DefaultBuckets = Buckets{ RequestSizeHistogram: []float64{0, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, } -func getResourceAttrs(service svc.ID) *resource.Resource { +func getResourceAttrs(service *svc.ID) *resource.Resource { attrs := []attribute.KeyValue{ semconv.ServiceName(service.Name), semconv.ServiceInstanceID(service.Instance), @@ -87,7 +87,10 @@ func getResourceAttrs(service svc.ID) *resource.Resource { type ReporterPool[T any] struct { pool *simplelru.LRU[svc.UID, T] - itemConstructor func(svc.ID) (T, error) + itemConstructor func(*svc.ID) (T, error) + + lastReporter T + lastService *svc.ID } // NewReporterPool creates a ReporterPool instance given a cache length, @@ -97,7 +100,7 @@ type ReporterPool[T any] struct { func NewReporterPool[T any]( cacheLen int, callback simplelru.EvictCallback[svc.UID, T], - itemConstructor func(id svc.ID) (T, error), + itemConstructor func(id *svc.ID) (T, error), ) ReporterPool[T] { pool, _ := simplelru.NewLRU[svc.UID, T](cacheLen, callback) return ReporterPool[T]{pool: pool, itemConstructor: itemConstructor} @@ -105,14 +108,34 @@ func NewReporterPool[T any]( // For retrieves the associated item for the given service name, or // creates a new one if it does not exist -func (rp *ReporterPool[T]) For(service svc.ID) (T, error) { +func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) { + // optimization: do not query the resources' cache if the + // previously processed span belongs to the same service name + // as the current. 
+ // This will save querying OTEL resource reporters when there is + // only a single instrumented process. + // In multi-process tracing, this is likely to happen as most + // tracers group traces belonging to the same service in the same slice. + if rp.lastService == nil || service.UID != rp.lastService.UID { + lm, err := rp.get(service) + if err != nil { + var t T + return t, err + } + rp.lastService = service + rp.lastReporter = lm + } + return rp.lastReporter, nil +} + +func (rp *ReporterPool[T]) get(service *svc.ID) (T, error) { if m, ok := rp.pool.Get(service.UID); ok { return m, nil } m, err := rp.itemConstructor(service) if err != nil { var t T - return t, fmt.Errorf("creating resource for service %q: %w", &service, err) + return t, fmt.Errorf("creating resource for service %q: %w", service, err) } rp.pool.Add(service.UID, m) return m, nil diff --git a/pkg/internal/export/otel/expirer.go b/pkg/internal/export/otel/expirer.go new file mode 100644 index 000000000..b41fbc8bb --- /dev/null +++ b/pkg/internal/export/otel/expirer.go @@ -0,0 +1,153 @@ +package otel + +import ( + "context" + "fmt" + "log/slog" + "math" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/grafana/beyla/pkg/internal/export/attributes" + "github.com/grafana/beyla/pkg/internal/export/expire" +) + +var timeNow = time.Now + +func plog() *slog.Logger { + return slog.With("component", "otel.Expirer") +} + +// dataPoint implements a metric value of a given type, +// for a set of attributes +// Example of implementers: Gauge and Counter +type dataPoint[T any] interface { + // Load the current value for a given set of attributes + Load() T + // Attributes return the attributes of the current dataPoint + Attributes() attribute.Set +} + +// observer records measurements for a given metric type +type observer[T any] interface { + Observe(T, ...metric.ObserveOption) +} + +// Expirer drops metrics from labels that haven't been updated during a given timeout. +// It has multiple generic types to allow it working with different dataPoints (Gauge, Counter...) +// and different types of data (int, float...). +// Record: type of the record that holds the metric data request.Span, ebpf.Record, process.Status... +// Metric: type of the dataPoint kind: Counter, Gauge... +// VT: type of the value inside the datapoint: int, float64... +type Expirer[Record any, OT observer[VT], Metric dataPoint[VT], VT any] struct { + instancer func(set attribute.Set) Metric + attrs []attributes.Field[Record, attribute.KeyValue] + entries *expire.ExpiryMap[Metric] + log *slog.Logger +} + +// NewExpirer creates an expirer that wraps data points of a given type. Its labeled instances are dropped +// if they haven't been updated during the last timeout period. +// Arguments: +// - instancer: the constructor of each datapoint object (e.g. NewCounter, NewGauge...) 
+// - attrs: attributes for that given data point +// - clock: function that provides the current time +// - ttl: time to live of the datapoints whose attribute sets haven't been updated +func NewExpirer[Record any, OT observer[VT], Metric dataPoint[VT], VT any]( + instancer func(set attribute.Set) Metric, + attrs []attributes.Field[Record, attribute.KeyValue], + clock expire.Clock, + ttl time.Duration, +) *Expirer[Record, OT, Metric, VT] { + exp := Expirer[Record, OT, Metric, VT]{ + instancer: instancer, + attrs: attrs, + entries: expire.NewExpiryMap[Metric](clock, ttl), + } + exp.log = plog().With("type", fmt.Sprintf("%T", exp)) + return &exp +} + +// ForRecord returns the data point for the given eBPF record. If that record +// s accessed for the first time, a new data point is created. +// If not, a cached copy is returned and the "last access" cache time is updated. +func (ex *Expirer[Record, OT, Metric, VT]) ForRecord(r Record) Metric { + recordAttrs, attrValues := ex.recordAttributes(r) + return ex.entries.GetOrCreate(attrValues, func() Metric { + ex.log.With("labelValues", attrValues).Debug("storing new metric label set") + return ex.instancer(recordAttrs) + }) +} + +func (ex *Expirer[Record, OT, Metric, VT]) Collect(_ context.Context, observer OT) error { + if old := ex.entries.DeleteExpired(); len(old) > 0 { + ex.log.With("labelValues", old).Debug("deleting old OTEL metric") + } + + for _, v := range ex.entries.All() { + observer.Observe(v.Load(), metric.WithAttributeSet(v.Attributes())) + } + + return nil +} + +func (ex *Expirer[Record, OT, Metric, VT]) recordAttributes(m Record) (attribute.Set, []string) { + keyVals := make([]attribute.KeyValue, 0, len(ex.attrs)) + vals := make([]string, 0, len(ex.attrs)) + + for _, attr := range ex.attrs { + kv := attr.Get(m) + keyVals = append(keyVals, kv) + vals = append(vals, kv.Value.Emit()) + } + + return attribute.NewSet(keyVals...), vals +} + +type metricAttributes struct { + attributes attribute.Set +} + +func (g *metricAttributes) Attributes() attribute.Set { + return g.attributes +} + +// Counter data point type +type Counter struct { + metricAttributes + val atomic.Int64 +} + +func NewCounter(attributes attribute.Set) *Counter { + return &Counter{metricAttributes: metricAttributes{attributes: attributes}} +} +func (g *Counter) Load() int64 { + return g.val.Load() +} + +func (g *Counter) Add(v int64) { + g.val.Add(v) +} + +// Gauge data point type +type Gauge struct { + metricAttributes + // Go standard library does not provide atomic packages so we need to + // store the float as bytes and then convert it with the math package + floatBits uint64 +} + +func NewGauge(attributes attribute.Set) *Gauge { + return &Gauge{metricAttributes: metricAttributes{attributes: attributes}} +} + +func (g *Gauge) Load() float64 { + return math.Float64frombits(atomic.LoadUint64(&g.floatBits)) +} + +func (g *Gauge) Set(val float64) { + atomic.StoreUint64(&g.floatBits, math.Float64bits(val)) +} diff --git a/pkg/internal/netolly/export/otel/expirer_test.go b/pkg/internal/export/otel/expirer_test.go similarity index 86% rename from pkg/internal/netolly/export/otel/expirer_test.go rename to pkg/internal/export/otel/expirer_test.go index dee466105..96f6a97c5 100644 --- a/pkg/internal/netolly/export/otel/expirer_test.go +++ b/pkg/internal/export/otel/expirer_test.go @@ -9,17 +9,18 @@ import ( "github.com/mariomac/guara/pkg/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" 
"github.com/grafana/beyla/pkg/internal/export/attributes" - "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" "github.com/grafana/beyla/pkg/internal/pipe/global" "github.com/grafana/beyla/test/collector" ) -const timeout = 3 * time.Second +const timeout = 10 * time.Second func TestMetricsExpiration(t *testing.T) { + defer restoreEnvAfterExecution()() ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -29,13 +30,13 @@ func TestMetricsExpiration(t *testing.T) { now := syncedClock{now: time.Now()} timeNow = now.Now - otelExporter, err := MetricsExporterProvider( - &global.ContextInfo{}, &MetricsConfig{ - Metrics: &otel.MetricsConfig{ + otelExporter, err := NetMetricsExporterProvider( + &global.ContextInfo{}, &NetMetricsConfig{ + Metrics: &MetricsConfig{ Interval: 50 * time.Millisecond, CommonEndpoint: otlp.ServerEndpoint, - MetricsProtocol: otel.ProtocolHTTPProtobuf, - Features: []string{otel.FeatureNetwork}, + MetricsProtocol: ProtocolHTTPProtobuf, + Features: []string{FeatureNetwork}, TTL: 3 * time.Minute, }, AttributeSelectors: attributes.Selection{ attributes.BeylaNetworkFlow.Section: attributes.InclusionLists{ @@ -105,6 +106,22 @@ func TestMetricsExpiration(t *testing.T) { }) } +func TestGauge(t *testing.T) { + g := NewGauge(attribute.Set{}) + g.Set(123.456) + assert.Equal(t, 123.456, g.Load()) + g.Set(456.123) + assert.Equal(t, 456.123, g.Load()) +} + +func TestCounter(t *testing.T) { + g := NewCounter(attribute.Set{}) + g.Add(123) + assert.EqualValues(t, 123, g.Load()) + g.Add(123) + assert.EqualValues(t, 246, g.Load()) +} + type syncedClock struct { mt sync.Mutex now time.Time diff --git a/pkg/internal/export/otel/metrics.go b/pkg/internal/export/otel/metrics.go index 28d44032e..9d2d9ad45 100644 --- a/pkg/internal/export/otel/metrics.go +++ b/pkg/internal/export/otel/metrics.go @@ -55,6 +55,7 @@ const ( FeatureApplication = "application" FeatureSpan = "application_span" FeatureGraph = "application_service_graph" + FeatureProcess = "application_process" ) type MetricsConfig struct { @@ -130,23 +131,23 @@ func (m *MetricsConfig) GuessProtocol() Protocol { // Reason to disable linting: it requires to be a value despite it is considered a "heavy struct". // This method is invoked only once during startup time so it doesn't have a noticeable performance impact. // nolint:gocritic -func (m MetricsConfig) EndpointEnabled() bool { +func (m *MetricsConfig) EndpointEnabled() bool { return m.CommonEndpoint != "" || m.MetricsEndpoint != "" || m.Grafana.MetricsEnabled() } -func (m MetricsConfig) SpanMetricsEnabled() bool { +func (m *MetricsConfig) SpanMetricsEnabled() bool { return slices.Contains(m.Features, FeatureSpan) } -func (m MetricsConfig) ServiceGraphMetricsEnabled() bool { +func (m *MetricsConfig) ServiceGraphMetricsEnabled() bool { return slices.Contains(m.Features, FeatureGraph) } -func (m MetricsConfig) OTelMetricsEnabled() bool { +func (m *MetricsConfig) OTelMetricsEnabled() bool { return slices.Contains(m.Features, FeatureApplication) } -func (m MetricsConfig) Enabled() bool { +func (m *MetricsConfig) Enabled() bool { return m.EndpointEnabled() && (m.OTelMetricsEnabled() || m.SpanMetricsEnabled() || m.ServiceGraphMetricsEnabled()) } @@ -175,7 +176,7 @@ type MetricsReporter struct { // There is a Metrics instance for each service/process instrumented by Beyla. 
type Metrics struct { ctx context.Context - service svc.ID + service *svc.ID provider *metric.MeterProvider httpDuration instrument.Float64Histogram @@ -438,7 +439,7 @@ func (mr *MetricsReporter) setupGraphMeters(m *Metrics, meter instrument.Meter) return nil } -func (mr *MetricsReporter) newMetricSet(service svc.ID) (*Metrics, error) { +func (mr *MetricsReporter) newMetricSet(service *svc.ID) (*Metrics, error) { mlog := mlog().With("service", service) mlog.Debug("creating new Metrics reporter") resources := getResourceAttrs(service) @@ -602,7 +603,7 @@ func otelHistogramConfig(metricName string, buckets []float64, useExponentialHis } -func (mr *MetricsReporter) metricResourceAttributes(service svc.ID) attribute.Set { +func (mr *MetricsReporter) metricResourceAttributes(service *svc.ID) attribute.Set { attrs := []attribute.KeyValue{ request.ServiceMetric(service.Name), semconv.ServiceInstanceID(service.Instance), @@ -725,8 +726,6 @@ func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { } func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) { - var lastSvcUID svc.UID - var reporter *Metrics for spans := range input { for i := range spans { s := &spans[i] @@ -735,23 +734,11 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) { if s.IgnoreSpan == request.IgnoreMetrics { continue } - - // optimization: do not query the resources' cache if the - // previously processed span belongs to the same service name - // as the current. - // This will save querying OTEL resource reporters when there is - // only a single instrumented process. - // In multi-process tracing, this is likely to happen as most - // tracers group traces belonging to the same service in the same slice. - if s.ServiceID.UID != lastSvcUID || reporter == nil { - lm, err := mr.reporters.For(s.ServiceID) - if err != nil { - mlog().Error("unexpected error creating OTEL resource. Ignoring metric", - err, "service", s.ServiceID) - continue - } - lastSvcUID = s.ServiceID.UID - reporter = lm + reporter, err := mr.reporters.For(&s.ServiceID) + if err != nil { + mlog().Error("unexpected error creating OTEL resource. 
Ignoring metric", + err, "service", s.ServiceID) + continue } reporter.record(s, mr) } diff --git a/pkg/internal/netolly/export/otel/metrics.go b/pkg/internal/export/otel/metrics_net.go similarity index 78% rename from pkg/internal/netolly/export/otel/metrics.go rename to pkg/internal/export/otel/metrics_net.go index 9e71b9c1a..ed3577f45 100644 --- a/pkg/internal/netolly/export/otel/metrics.go +++ b/pkg/internal/export/otel/metrics_net.go @@ -17,22 +17,22 @@ import ( "github.com/grafana/beyla/pkg/internal/export/attributes" "github.com/grafana/beyla/pkg/internal/export/expire" - "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" "github.com/grafana/beyla/pkg/internal/pipe/global" ) -type MetricsConfig struct { - Metrics *otel.MetricsConfig +// NetMetricsConfig extends MetricsConfig for Network Metrics +type NetMetricsConfig struct { + Metrics *MetricsConfig AttributeSelectors attributes.Selection } -func (mc MetricsConfig) Enabled() bool { - return mc.Metrics != nil && mc.Metrics.EndpointEnabled() && slices.Contains(mc.Metrics.Features, otel.FeatureNetwork) +func (mc NetMetricsConfig) Enabled() bool { + return mc.Metrics != nil && mc.Metrics.EndpointEnabled() && slices.Contains(mc.Metrics.Features, FeatureNetwork) } -func mlog() *slog.Logger { - return slog.With("component", "flows.MetricsReporter") +func nmlog() *slog.Logger { + return slog.With("component", "otel.NetworkMetricsExporter") } func newResource() *resource.Resource { @@ -59,19 +59,19 @@ func newMeterProvider(res *resource.Resource, exporter *metric.Exporter, interva return meterProvider, nil } -type metricsExporter struct { - metrics *Expirer +type netMetricsExporter struct { + metrics *Expirer[*ebpf.Record, metric2.Int64Observer, *Counter, int64] clock *expire.CachedClock } -func MetricsExporterProvider(ctxInfo *global.ContextInfo, cfg *MetricsConfig) (pipe.FinalFunc[[]*ebpf.Record], error) { +func NetMetricsExporterProvider(ctxInfo *global.ContextInfo, cfg *NetMetricsConfig) (pipe.FinalFunc[[]*ebpf.Record], error) { if !cfg.Enabled() { // This node is not going to be instantiated. Let the pipes library just ignore it. 
return pipe.IgnoreFinal[[]*ebpf.Record](), nil } - log := mlog() + log := nmlog() log.Debug("instantiating network metrics exporter provider") - exporter, err := otel.InstantiateMetricsExporter(context.Background(), cfg.Metrics, log) + exporter, err := InstantiateMetricsExporter(context.Background(), cfg.Metrics, log) if err != nil { log.Error("", "error", err) return nil, err @@ -93,7 +93,7 @@ func MetricsExporterProvider(ctxInfo *global.ContextInfo, cfg *MetricsConfig) (p attrProv.For(attributes.BeylaNetworkFlow)) clock := expire.NewCachedClock(timeNow) - expirer := NewExpirer(attrs, clock.Time, cfg.Metrics.TTL) + expirer := NewExpirer[*ebpf.Record, metric2.Int64Observer](NewCounter, attrs, clock.Time, cfg.Metrics.TTL) ebpfEvents := provider.Meter("network_ebpf_events") _, err = ebpfEvents.Int64ObservableCounter( @@ -107,17 +107,17 @@ func MetricsExporterProvider(ctxInfo *global.ContextInfo, cfg *MetricsConfig) (p return nil, err } log.Debug("restricting attributes not in this list", "attributes", cfg.AttributeSelectors) - return (&metricsExporter{ + return (&netMetricsExporter{ metrics: expirer, clock: clock, }).Do, nil } -func (me *metricsExporter) Do(in <-chan []*ebpf.Record) { +func (me *netMetricsExporter) Do(in <-chan []*ebpf.Record) { for i := range in { me.clock.Update() for _, v := range i { - me.metrics.ForRecord(v).val.Add(int64(v.Metrics.Bytes)) + me.metrics.ForRecord(v).Add(int64(v.Metrics.Bytes)) } } } diff --git a/pkg/internal/netolly/export/otel/metrics_test.go b/pkg/internal/export/otel/metrics_net_test.go similarity index 56% rename from pkg/internal/netolly/export/otel/metrics_test.go rename to pkg/internal/export/otel/metrics_net_test.go index 3643b56bd..b1188a3cf 100644 --- a/pkg/internal/netolly/export/otel/metrics_test.go +++ b/pkg/internal/export/otel/metrics_net_test.go @@ -5,14 +5,15 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/grafana/beyla/pkg/internal/export/attributes" attr "github.com/grafana/beyla/pkg/internal/export/attributes/names" - "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" ) func TestMetricAttributes(t *testing.T) { + defer restoreEnvAfterExecution()() in := &ebpf.Record{ NetFlowRecordT: ebpf.NetFlowRecordT{ Id: ebpf.NetFlowId{ @@ -35,19 +36,19 @@ func TestMetricAttributes(t *testing.T) { in.Id.SrcIp.In6U.U6Addr8 = [16]uint8{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 12, 34, 56, 78} in.Id.DstIp.In6U.U6Addr8 = [16]uint8{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 33, 22, 11, 1} - me := &metricsExporter{metrics: &Expirer{attrs: attributes.OpenTelemetryGetters( - ebpf.RecordGetters, []attr.Name{ - attr.SrcAddress, attr.DstAddres, attr.SrcPort, attr.DstPort, attr.SrcName, attr.DstName, - attr.K8sSrcName, attr.K8sSrcNamespace, attr.K8sDstName, attr.K8sDstNamespace, - })}} - reportedAttributes, _ := me.metrics.recordAttributes(in) + me := NewExpirer[*ebpf.Record, metric.Int64Observer, *Counter, int64](NewCounter, attributes.OpenTelemetryGetters(ebpf.RecordGetters, []attr.Name{ + attr.SrcAddress, attr.DstAddres, attr.SrcPort, attr.DstPort, attr.SrcName, attr.DstName, + attr.K8sSrcName, attr.K8sSrcNamespace, attr.K8sDstName, attr.K8sDstNamespace, + }), timeNow, timeout) + record := me.ForRecord(in) + reportedAttributes := record.Attributes() for _, mustContain := range []attribute.KeyValue{ attribute.String("src.address", "12.34.56.78"), attribute.String("dst.address", "33.22.11.1"), attribute.String("src.name", 
"srcname"), attribute.String("dst.name", "dstname"), - attribute.String("src.port", "12345"), - attribute.String("dst.port", "3210"), + attribute.Int("src.port", 12345), + attribute.Int("dst.port", 3210), attribute.String("k8s.src.name", "srcname"), attribute.String("k8s.src.namespace", "srcnamespace"), @@ -62,6 +63,7 @@ func TestMetricAttributes(t *testing.T) { } func TestMetricAttributes_Filter(t *testing.T) { + defer restoreEnvAfterExecution()() in := &ebpf.Record{ NetFlowRecordT: ebpf.NetFlowRecordT{ Id: ebpf.NetFlowId{ @@ -84,12 +86,13 @@ func TestMetricAttributes_Filter(t *testing.T) { in.Id.SrcIp.In6U.U6Addr8 = [16]uint8{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 12, 34, 56, 78} in.Id.DstIp.In6U.U6Addr8 = [16]uint8{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 33, 22, 11, 1} - me := &Expirer{attrs: attributes.OpenTelemetryGetters(ebpf.RecordGetters, []attr.Name{ + me := NewExpirer[*ebpf.Record, metric.Int64Observer, *Counter, int64](NewCounter, attributes.OpenTelemetryGetters(ebpf.RecordGetters, []attr.Name{ "src.address", "k8s.src.name", "k8s.dst.name", - })} - reportedAttributes, _ := me.recordAttributes(in) + }), timeNow, timeout) + record := me.ForRecord(in) + reportedAttributes := record.Attributes() for _, mustContain := range []attribute.KeyValue{ attribute.String("src.address", "12.34.56.78"), attribute.String("k8s.src.name", "srcname"), @@ -110,23 +113,23 @@ func TestMetricAttributes_Filter(t *testing.T) { } } -func TestMetricsConfig_Enabled(t *testing.T) { - assert.True(t, MetricsConfig{Metrics: &otel.MetricsConfig{ - Features: []string{otel.FeatureApplication, otel.FeatureNetwork}, CommonEndpoint: "foo"}}.Enabled()) - assert.True(t, MetricsConfig{Metrics: &otel.MetricsConfig{ - Features: []string{otel.FeatureNetwork, otel.FeatureApplication}, MetricsEndpoint: "foo"}}.Enabled()) - assert.True(t, MetricsConfig{Metrics: &otel.MetricsConfig{ - Features: []string{otel.FeatureNetwork}, Grafana: &otel.GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}}.Enabled()) +func TestNetMetricsConfig_Enabled(t *testing.T) { + assert.True(t, NetMetricsConfig{Metrics: &MetricsConfig{ + Features: []string{FeatureApplication, FeatureNetwork}, CommonEndpoint: "foo"}}.Enabled()) + assert.True(t, NetMetricsConfig{Metrics: &MetricsConfig{ + Features: []string{FeatureNetwork, FeatureApplication}, MetricsEndpoint: "foo"}}.Enabled()) + assert.True(t, NetMetricsConfig{Metrics: &MetricsConfig{ + Features: []string{FeatureNetwork}, Grafana: &GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}}.Enabled()) } -func TestMetricsConfig_Disabled(t *testing.T) { - var fa = []string{otel.FeatureApplication} - var fn = []string{otel.FeatureNetwork} - assert.False(t, MetricsConfig{Metrics: &otel.MetricsConfig{Features: fn}}.Enabled()) - assert.False(t, MetricsConfig{Metrics: &otel.MetricsConfig{Features: fn, Grafana: &otel.GrafanaOTLP{Submit: []string{"traces"}, InstanceID: "33221"}}}.Enabled()) - assert.False(t, MetricsConfig{Metrics: &otel.MetricsConfig{Features: fn, Grafana: &otel.GrafanaOTLP{Submit: []string{"metrics"}}}}.Enabled()) +func TestNetMetricsConfig_Disabled(t *testing.T) { + var fa = []string{FeatureApplication} + var fn = []string{FeatureNetwork} + assert.False(t, NetMetricsConfig{Metrics: &MetricsConfig{Features: fn}}.Enabled()) + assert.False(t, NetMetricsConfig{Metrics: &MetricsConfig{Features: fn, Grafana: &GrafanaOTLP{Submit: []string{"traces"}, InstanceID: "33221"}}}.Enabled()) + assert.False(t, NetMetricsConfig{Metrics: &MetricsConfig{Features: fn, Grafana: 
&GrafanaOTLP{Submit: []string{"metrics"}}}}.Enabled()) // network feature is not enabled - assert.False(t, MetricsConfig{Metrics: &otel.MetricsConfig{CommonEndpoint: "foo"}}.Enabled()) - assert.False(t, MetricsConfig{Metrics: &otel.MetricsConfig{MetricsEndpoint: "foo", Features: fa}}.Enabled()) - assert.False(t, MetricsConfig{Metrics: &otel.MetricsConfig{Grafana: &otel.GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}}.Enabled()) + assert.False(t, NetMetricsConfig{Metrics: &MetricsConfig{CommonEndpoint: "foo"}}.Enabled()) + assert.False(t, NetMetricsConfig{Metrics: &MetricsConfig{MetricsEndpoint: "foo", Features: fa}}.Enabled()) + assert.False(t, NetMetricsConfig{Metrics: &MetricsConfig{Grafana: &GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}}.Enabled()) } diff --git a/pkg/internal/export/otel/metrics_proc.go b/pkg/internal/export/otel/metrics_proc.go new file mode 100644 index 000000000..462e28999 --- /dev/null +++ b/pkg/internal/export/otel/metrics_proc.go @@ -0,0 +1,169 @@ +package otel + +import ( + "context" + "fmt" + "log/slog" + "slices" + + "github.com/mariomac/pipes/pipe" + "go.opentelemetry.io/otel/attribute" + metric2 "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + + "github.com/grafana/beyla/pkg/internal/export/attributes" + "github.com/grafana/beyla/pkg/internal/export/expire" + "github.com/grafana/beyla/pkg/internal/infraolly/process" + "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/svc" +) + +// ProcMetricsConfig extends MetricsConfig for process metrics +type ProcMetricsConfig struct { + Metrics *MetricsConfig + AttributeSelectors attributes.Selection +} + +func (mc *ProcMetricsConfig) Enabled() bool { + return mc.Metrics != nil && mc.Metrics.EndpointEnabled() && mc.Metrics.OTelMetricsEnabled() && + slices.Contains(mc.Metrics.Features, FeatureProcess) +} + +func pmlog() *slog.Logger { + return slog.With("component", "otel.ProcMetricsExporter") +} + +type procMetricsExporter struct { + ctx context.Context + cfg *ProcMetricsConfig + clock *expire.CachedClock + + exporter metric.Exporter + reporters ReporterPool[*procMetrics] + + log *slog.Logger + + attrCPUTime []attributes.Field[*process.Status, attribute.KeyValue] +} + +type procMetrics struct { + ctx context.Context + service *svc.ID + provider *metric.MeterProvider + + cpuTime *Expirer[*process.Status, metric2.Float64Observer, *Gauge, float64] +} + +func ProcMetricsExporterProvider( + ctx context.Context, + ctxInfo *global.ContextInfo, + cfg *ProcMetricsConfig, +) pipe.FinalProvider[[]*process.Status] { + return func() (pipe.FinalFunc[[]*process.Status], error) { + if !cfg.Enabled() { + // This node is not going to be instantiated. Let the pipes library just ignore it. 
+ return pipe.IgnoreFinal[[]*process.Status](), nil + } + return newProcMetricsExporter(ctx, ctxInfo, cfg) + } +} + +func newProcMetricsExporter( + ctx context.Context, + ctxInfo *global.ContextInfo, + cfg *ProcMetricsConfig, +) (pipe.FinalFunc[[]*process.Status], error) { + SetupInternalOTELSDKLogger(cfg.Metrics.SDKLogLevel) + + log := pmlog() + log.Debug("instantiating process metrics exporter provider") + + // only user-provided attributes (or default set) will decorate the metrics + attrProv, err := attributes.NewAttrSelector(ctxInfo.MetricAttributeGroups, cfg.AttributeSelectors) + if err != nil { + return nil, fmt.Errorf("process OTEL exporter attributes: %w", err) + } + + mr := &procMetricsExporter{ + ctx: ctx, + cfg: cfg, + clock: expire.NewCachedClock(timeNow), + attrCPUTime: attributes.OpenTelemetryGetters( + process.OTELGetters, attrProv.For(attributes.ProcessCPUUtilization)), + log: log, + } + + mr.reporters = NewReporterPool[*procMetrics](cfg.Metrics.ReportersCacheLen, + func(id svc.UID, v *procMetrics) { + llog := log.With("service", id) + llog.Debug("evicting metrics reporter from cache") + go func() { + if err := v.provider.ForceFlush(ctx); err != nil { + llog.Warn("error flushing evicted metrics provider", "error", err) + } + }() + }, mr.newMetricSet) + + mr.exporter, err = InstantiateMetricsExporter(ctx, cfg.Metrics, log) + if err != nil { + log.Error("instantiating metrics exporter", "error", err) + return nil, err + } + + return mr.Do, nil +} + +func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, error) { + log := me.log.With("service", service) + log.Debug("creating new Metrics exporter") + resources := getResourceAttrs(service) + opts := []metric.Option{ + metric.WithResource(resources), + metric.WithReader(metric.NewPeriodicReader(me.exporter, + metric.WithInterval(me.cfg.Metrics.Interval))), + } + + m := procMetrics{ + ctx: me.ctx, + service: service, + provider: metric.NewMeterProvider(opts...), + } + + meter := m.provider.Meter(reporterName) + m.cpuTime = NewExpirer[*process.Status, metric2.Float64Observer]( + NewGauge, + me.attrCPUTime, + timeNow, + me.cfg.Metrics.TTL, + ) + if _, err := meter.Float64ObservableGauge( + attributes.ProcessCPUUtilization.OTEL, + metric2.WithDescription("TODO"), + metric2.WithUnit("1"), + metric2.WithFloat64Callback(m.cpuTime.Collect), + ); err != nil { + log.Error("creating observable gauge for "+attributes.ProcessCPUUtilization.OTEL, "error", err) + return nil, err + } + + return &m, nil +} + +// Do reads all the process status data points and create the metrics accordingly +func (me *procMetricsExporter) Do(in <-chan []*process.Status) { + for i := range in { + me.clock.Update() + for _, s := range i { + reporter, err := me.reporters.For(s.Service) + if err != nil { + me.log.Error("unexpected error creating OTEL resource. 
Ignoring metric", + err, "service", s.Service) + continue + } + me.log.Debug("reporting data for record", "record", s) + // TODO: support process.cpu.state=user/system/total + // TODO: add more process metrics https://opentelemetry.io/docs/specs/semconv/system/process-metrics/ + reporter.cpuTime.ForRecord(s).Set(s.CPUPercent / 100) + } + } +} diff --git a/pkg/internal/export/otel/metrics_test.go b/pkg/internal/export/otel/metrics_test.go index bb98c63ac..1d54642f8 100644 --- a/pkg/internal/export/otel/metrics_test.go +++ b/pkg/internal/export/otel/metrics_test.go @@ -20,8 +20,6 @@ import ( "github.com/grafana/beyla/pkg/internal/request" ) -const timeout = 10 * time.Second - var fakeMux = sync.Mutex{} func TestHTTPMetricsEndpointOptions(t *testing.T) { @@ -341,19 +339,19 @@ func TestMetricSetupHTTP_DoNotOverrideEnv(t *testing.T) { } func TestMetricsConfig_Enabled(t *testing.T) { - assert.True(t, MetricsConfig{Features: []string{FeatureApplication, FeatureNetwork}, CommonEndpoint: "foo"}.Enabled()) - assert.True(t, MetricsConfig{Features: []string{FeatureApplication}, MetricsEndpoint: "foo"}.Enabled()) - assert.True(t, MetricsConfig{Features: []string{FeatureNetwork, FeatureApplication}, Grafana: &GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}.Enabled()) + assert.True(t, (&MetricsConfig{Features: []string{FeatureApplication, FeatureNetwork}, CommonEndpoint: "foo"}).Enabled()) + assert.True(t, (&MetricsConfig{Features: []string{FeatureApplication}, MetricsEndpoint: "foo"}).Enabled()) + assert.True(t, (&MetricsConfig{Features: []string{FeatureNetwork, FeatureApplication}, Grafana: &GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}).Enabled()) } func TestMetricsConfig_Disabled(t *testing.T) { - assert.False(t, MetricsConfig{Features: []string{FeatureApplication}}.Enabled()) - assert.False(t, MetricsConfig{Features: []string{FeatureApplication}, Grafana: &GrafanaOTLP{Submit: []string{"traces"}, InstanceID: "33221"}}.Enabled()) - assert.False(t, MetricsConfig{Features: []string{FeatureApplication}, Grafana: &GrafanaOTLP{Submit: []string{"metrics"}}}.Enabled()) + assert.False(t, (&MetricsConfig{Features: []string{FeatureApplication}}).Enabled()) + assert.False(t, (&MetricsConfig{Features: []string{FeatureApplication}, Grafana: &GrafanaOTLP{Submit: []string{"traces"}, InstanceID: "33221"}}).Enabled()) + assert.False(t, (&MetricsConfig{Features: []string{FeatureApplication}, Grafana: &GrafanaOTLP{Submit: []string{"metrics"}}}).Enabled()) // application feature is not enabled - assert.False(t, MetricsConfig{CommonEndpoint: "foo"}.Enabled()) - assert.False(t, MetricsConfig{MetricsEndpoint: "foo", Features: []string{FeatureNetwork}}.Enabled()) - assert.False(t, MetricsConfig{Grafana: &GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}.Enabled()) + assert.False(t, (&MetricsConfig{CommonEndpoint: "foo"}).Enabled()) + assert.False(t, (&MetricsConfig{MetricsEndpoint: "foo", Features: []string{FeatureNetwork}}).Enabled()) + assert.False(t, (&MetricsConfig{Grafana: &GrafanaOTLP{Submit: []string{"traces", "metrics"}, InstanceID: "33221"}}).Enabled()) } func (f *fakeInternalMetrics) OTELMetricExport(len int) { diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go index 099096b09..fbf07be22 100644 --- a/pkg/internal/export/otel/traces.go +++ b/pkg/internal/export/otel/traces.go @@ -323,7 +323,7 @@ func GenerateTraces(span *request.Span, userAttrs map[attr.Name]struct{}) ptrace traces := ptrace.NewTraces() rs := 
traces.ResourceSpans().AppendEmpty() ss := rs.ScopeSpans().AppendEmpty() - resourceAttrs := attrsToMap(getResourceAttrs(span.ServiceID).Attributes()) + resourceAttrs := attrsToMap(getResourceAttrs(&span.ServiceID).Attributes()) resourceAttrs.PutStr(string(semconv.OTelLibraryNameKey), reporterName) resourceAttrs.CopyTo(rs.Resource().Attributes()) diff --git a/pkg/internal/export/prom/prom.go b/pkg/internal/export/prom/prom.go index ef1094582..24eb6e14c 100644 --- a/pkg/internal/export/prom/prom.go +++ b/pkg/internal/export/prom/prom.go @@ -124,9 +124,13 @@ func (p PrometheusConfig) ServiceGraphMetricsEnabled() bool { return slices.Contains(p.Features, otel.FeatureGraph) } +func (p PrometheusConfig) EndpointEnabled() bool { + return p.Port != 0 || p.Registry != nil +} + // nolint:gocritic func (p PrometheusConfig) Enabled() bool { - return (p.Port != 0 || p.Registry != nil) && (p.OTelMetricsEnabled() || p.SpanMetricsEnabled() || p.ServiceGraphMetricsEnabled()) + return p.EndpointEnabled() && (p.OTelMetricsEnabled() || p.SpanMetricsEnabled() || p.ServiceGraphMetricsEnabled()) } type metricsReporter struct { diff --git a/pkg/internal/filter/attribute_test.go b/pkg/internal/filter/attribute_test.go index 7dc853e2a..a2a09a799 100644 --- a/pkg/internal/filter/attribute_test.go +++ b/pkg/internal/filter/attribute_test.go @@ -19,7 +19,7 @@ func TestAttributeFilter(t *testing.T) { filterFunc, err := ByAttribute[*ebpf.Record](AttributeFamilyConfig{ "beyla.ip": MatchDefinition{Match: "148.*"}, "k8s.src.namespace": MatchDefinition{NotMatch: "debug"}, - }, ebpf.RecordGetters)() + }, ebpf.RecordStringGetters)() require.NoError(t, err) in := make(chan []*ebpf.Record, 10) @@ -93,7 +93,7 @@ func TestAttributeFilter_VerificationError(t *testing.T) { } for _, tc := range testCases { t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) { - _, err := ByAttribute[*ebpf.Record](tc, ebpf.RecordGetters)() + _, err := ByAttribute[*ebpf.Record](tc, ebpf.RecordStringGetters)() assert.Error(t, err) }) } diff --git a/pkg/internal/infraolly/process/collect.go b/pkg/internal/infraolly/process/collect.go new file mode 100644 index 000000000..0da6e0736 --- /dev/null +++ b/pkg/internal/infraolly/process/collect.go @@ -0,0 +1,109 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package process + +import ( + "context" + "log/slog" + "math" + "time" + + "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/mariomac/pipes/pipe" + + "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/svc" +) + +type CollectConfig struct { + // RunMode defaults to "privileged". A non-privileged harvester will omit some information like open FDs. + // TODO: move to an upper layer + RunMode RunMode + + // Interval between harvests + Interval time.Duration `yaml:"interval" env:"BEYLA_PROCESSES_INTERVAL"` +} + +// Collector returns runtime information about the currently running processes. +// The collector receives each application trace from the newPids internal channel, +// to know which PIDs are active. +type Collector struct { + newPids *<-chan []request.Span + ctx context.Context + cfg *CollectConfig + harvest *Harvester + cache *simplelru.LRU[int32, *linuxProcess] + log *slog.Logger +} + +// NewCollectorProvider creates and returns a new process Collector, given an agent context. 
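The collector is exposed as a pipes start node, and the exporter in metrics_proc.go consumes the []*Status batches it emits. As a rough standalone illustration of driving it directly, assuming the returned pipe.StartFunc can be invoked with an output channel (names are illustrative; in Beyla the span channel is fed by the instrumented applications, and the configuration comes from the new `processes` YAML section in pkg/beyla/config.go):

    spanCh := make(chan []request.Span)
    var spanInput <-chan []request.Span = spanCh
    provider := process.NewCollectorProvider(ctx, &spanInput, &process.CollectConfig{
        RunMode:  process.RunModePrivileged,
        Interval: 5 * time.Second, // yaml: processes.interval / env: BEYLA_PROCESSES_INTERVAL
    })
    run, err := provider()
    if err != nil {
        // handle: the collector could not be instantiated
    }
    statuses := make(chan []*process.Status, 1)
    go run(statuses)
    for batch := range statuses {
        for _, st := range batch {
            slog.Info("process sample", "pid", st.ProcessID, "cmd", st.Command, "cpu%", st.CPUPercent)
        }
    }
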
+func NewCollectorProvider(ctx context.Context, input *<-chan []request.Span, cfg *CollectConfig) pipe.StartProvider[[]*Status] { + return func() (pipe.StartFunc[[]*Status], error) { + // we purge entries explicitly so size is unbounded + cache, _ := simplelru.NewLRU[int32, *linuxProcess](math.MaxInt, nil) + harvest := newHarvester(cfg, cache) + + return (&Collector{ + ctx: ctx, + cfg: cfg, + harvest: harvest, + cache: cache, + log: pslog(), + newPids: input, + }).Run, nil + } +} + +func (ps *Collector) Run(out chan<- []*Status) { + // TODO: set app metadata as key for later decoration? (e.g. K8s metadata, svc.ID) + pids := map[int32]*svc.ID{} + collectTicker := time.NewTicker(ps.cfg.Interval) + defer collectTicker.Stop() + newPids := *ps.newPids + for { + select { + case <-ps.ctx.Done(): + ps.log.Debug("exiting") + case spans := <-newPids: + // updating PIDs map with spans information + for i := range spans { + pids[spans[i].ServiceID.ProcPID] = &spans[i].ServiceID + } + case <-collectTicker.C: + ps.log.Debug("start process collection") + procs, removed := ps.Collect(pids) + for _, rp := range removed { + delete(pids, rp) + } + out <- procs + } + } +} + +// Collect returns the status for all the running processes, decorated with Docker runtime information, if applies. +// It also returns the PIDs that have to be removed from the map, as they do not exist anymore +func (ps *Collector) Collect(pids map[int32]*svc.ID) ([]*Status, []int32) { + results := make([]*Status, 0, len(pids)) + + var removed []int32 + for pid, svcID := range pids { + status, err := ps.harvest.Harvest(svcID) + if err != nil { + ps.log.Debug("skipping process", "pid", pid, "error", err) + ps.harvest.cache.Remove(pid) + removed = append(removed, pid) + continue + } + + results = append(results, status) + } + + // remove processes from cache that haven't been collected in this iteration + // (this means they already disappeared so there is no need for caching) + for ps.cache.Len() > len(results) { + ps.cache.RemoveOldest() + } + + return results, removed +} diff --git a/pkg/internal/infraolly/process/harvest.go b/pkg/internal/infraolly/process/harvest.go new file mode 100644 index 000000000..525e8cdb2 --- /dev/null +++ b/pkg/internal/infraolly/process/harvest.go @@ -0,0 +1,167 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package process provides all the tools and functionality for sampling processes. It is divided in three main +// components: +// - Status: provides OS-level information of a process at a given spot +// - Harvester: fetches and creates actual Process Status from system +// - Collector: uses input from the application pipeline to fetch information for all the processes from +// the instrumented applications, and forwards it to the next stage of the Process' pipeline. 
+package process + +import ( + "fmt" + "log/slog" + "os" + + "github.com/hashicorp/golang-lru/v2/simplelru" + + "github.com/grafana/beyla/pkg/internal/svc" +) + +func hlog() *slog.Logger { + return slog.With("component", "process.Harvester") +} + +type RunMode string + +const ( + RunModePrivileged = "privileged" + RunModeUnprivileged = "unprivileged" +) + +// Harvester fetches processes' information from Linux +type Harvester struct { + // allows overriding the /proc filesystem location via HOST_PROC env var + procFSRoot string + privileged bool + cache *simplelru.LRU[int32, *linuxProcess] + log *slog.Logger +} + +func newHarvester(cfg *CollectConfig, cache *simplelru.LRU[int32, *linuxProcess]) *Harvester { + // we need to use the same method to override HOST_PROC that is used by gopsutil library + hostProc, ok := os.LookupEnv("HOST_PROC") + if !ok { + hostProc = "/proc" + } + + return &Harvester{ + procFSRoot: hostProc, + privileged: cfg.RunMode == RunModePrivileged, + cache: cache, + log: hlog(), + } +} + +// Harvest returns a status of a process whose PID is passed as argument. The 'elapsedSeconds' argument represents the +// time since this process was statusd for the last time. If the process has been statusd for the first time, this value +// will be ignored +func (ps *Harvester) Harvest(svcID *svc.ID) (*Status, error) { + pid := svcID.ProcPID + ps.log.Debug("harvesting pid", "pid", pid) + // Reuses process information that does not vary + cached, hasCachedEntry := ps.cache.Get(pid) + + var err error + // If cached is nil, the linux process will be created from fresh data + cached, err = getLinuxProcess(cached, ps.procFSRoot, pid, ps.privileged) + if err != nil { + return nil, fmt.Errorf("can't create process: %w", err) + } + + // Creates a fresh process status and populates it with the metrics data + status := NewStatus(pid, svcID) + + if err := ps.populateStaticData(status, cached); err != nil { + return nil, fmt.Errorf("can't populate static attributes: %w", err) + } + + // As soon as we have successfully stored the static (reusable) values, we can cache the entry + if !hasCachedEntry { + ps.cache.Add(pid, cached) + } + + if err := ps.populateGauges(status, cached); err != nil { + return nil, fmt.Errorf("can't fetch gauge data: %w", err) + } + + if err := ps.populateIOCounters(status, cached); err != nil { + return nil, fmt.Errorf("can't fetch deltas: %w", err) + } + + return status, nil +} + +// populateStaticData populates the status with the process data won't vary during the process life cycle +func (ps *Harvester) populateStaticData(status *Status, process *linuxProcess) error { + process.FetchCommandInfo() + status.Command = process.Command() + status.CommandArgs = process.commandArgs + status.CommandLine = process.commandLine + status.ExecPath = process.execPath + status.ExecName = process.execName + + status.ProcessID = process.Pid() + + var err error + if status.User, err = process.Username(); err != nil { + ps.log.Debug("can't get username for process", "pid", status.ProcessID, "error", err) + } + + status.ParentProcessID = process.Ppid() + + return nil +} + +// populateGauges populates the status with gauge data that represents the process state at a given point +func (ps *Harvester) populateGauges(status *Status, process *linuxProcess) error { + var err error + + cpuTimes, err := process.CPUTimes() + if err != nil { + return err + } + status.CPUPercent = cpuTimes.Percent + + totalCPU := cpuTimes.User + cpuTimes.System + + if totalCPU > 0 { + status.CPUUserPercent = 
(cpuTimes.User / totalCPU) * status.CPUPercent + status.CPUSystemPercent = (cpuTimes.System / totalCPU) * status.CPUPercent + } else { + status.CPUUserPercent = 0 + status.CPUSystemPercent = 0 + } + + if ps.privileged { + status.FdCount, err = process.NumFDs() + if err != nil { + return err + } + } + + // Extra status data + status.Status = process.Status() + status.ThreadCount = process.NumThreads() + status.MemoryVMSBytes = process.VMSize() + status.MemoryRSSBytes = process.VMRSS() + + return nil +} + +// populateIOCounters fills the status with the IO counters data. For the "X per second" metrics, it requires the +// last process status for comparative purposes +func (ps *Harvester) populateIOCounters(status *Status, source *linuxProcess) error { + ioCounters, err := source.IOCounters() + if err != nil { + return err + } + if ioCounters != nil { + status.IOReadCount = ioCounters.ReadCount + status.IOWriteCount = ioCounters.WriteCount + status.IOReadBytes = ioCounters.ReadBytes + status.IOWriteBytes = ioCounters.WriteBytes + } + return nil +} diff --git a/pkg/internal/infraolly/process/harvest_test.go b/pkg/internal/infraolly/process/harvest_test.go new file mode 100644 index 000000000..091c8d351 --- /dev/null +++ b/pkg/internal/infraolly/process/harvest_test.go @@ -0,0 +1,135 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//go:build linux + +package process + +import ( + "fmt" + "math" + "os" + "os/exec" + "testing" + "time" + + "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/mariomac/guara/pkg/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/beyla/pkg/internal/svc" +) + +func TestLinuxHarvester_IsPrivileged(t *testing.T) { + cases := []struct { + mode RunMode + privileged bool + }{ + {mode: RunModePrivileged, privileged: true}, + {mode: RunModeUnprivileged, privileged: false}, + } + for _, c := range cases { + t.Run(fmt.Sprint("mode ", c.mode), func(t *testing.T) { + cache, _ := simplelru.NewLRU[int32, *linuxProcess](math.MaxInt, nil) + h := newHarvester(&CollectConfig{RunMode: c.mode}, cache) + + // If not privileged, it is expected to not report neither FDs nor IO counters + status, err := h.Harvest(&svc.ID{ProcPID: int32(os.Getpid())}) + require.NoError(t, err) + if c.privileged { + assert.NotZero(t, status.FdCount) + assert.NotZero(t, status.IOReadCount) + } else { + assert.Zero(t, status.FdCount) + assert.Zero(t, status.IOReadCount) + } + }) + } +} + +func TestLinuxHarvester_Harvest(t *testing.T) { + // Given a process harvester + cache, _ := simplelru.NewLRU[int32, *linuxProcess](math.MaxInt, nil) + h := newHarvester(&CollectConfig{}, cache) + + // When retrieving for a given process status (e.g. 
the current testing executable) + status, err := h.Harvest(&svc.ID{ProcPID: int32(os.Getpid())}) + + // It returns the corresponding process status with valid data + require.NoError(t, err) + require.NotNil(t, status) + + assert.Equal(t, int32(os.Getpid()), status.ProcessID) + assert.Equal(t, "process.test", status.Command) + assert.Contains(t, status.CommandLine, os.Args[0]) + assert.NotEmpty(t, status.User) + assert.Contains(t, "RSD", status.Status, + "process status must be R (running), S (interruptible sleep) or D (uninterruptible sleep)") + assert.NotZero(t, status.MemoryVMSBytes) + assert.NotZero(t, status.MemoryRSSBytes) + assert.NotZero(t, status.ParentProcessID) + assert.NotZero(t, status.ThreadCount) +} + +func TestLinuxHarvester_Harvest_FullCommandLine(t *testing.T) { + cmd := exec.Command("/bin/sleep", "1m") + require.NoError(t, cmd.Start()) + defer func() { + _ = cmd.Process.Kill() + }() + + // Given a process harvester configured to showw the full command line + cache, _ := simplelru.NewLRU[int32, *linuxProcess](math.MaxInt, nil) + h := newHarvester(&CollectConfig{}, cache) + + test.Eventually(t, 5*time.Second, func(t require.TestingT) { + // When retrieving for a given process status (e.g. the current testing executable) + status, err := h.Harvest(&svc.ID{ProcPID: int32(cmd.Process.Pid)}) + + // It returns the corresponding Command line without stripping arguments + require.NoError(t, err) + require.NotNil(t, status) + + assert.Equal(t, "sleep", status.ExecName) + assert.Equal(t, "/bin/sleep", status.ExecPath) + assert.Equal(t, "/bin/sleep 1m", status.CommandLine) + assert.Equal(t, "sleep", status.Command) + assert.Equal(t, []string{"1m"}, status.CommandArgs) + }) +} + +func TestLinuxHarvester_Do_InvalidateCache_DifferentCmd(t *testing.T) { + currentPid := int32(os.Getpid()) + + // Given a process harvester + // That has cached an old process sharing the PID with a new process + cache, _ := simplelru.NewLRU[int32, *linuxProcess](math.MaxInt, nil) + cache.Add(currentPid, &linuxProcess{stats: procStats{command: "something old"}}) + h := newHarvester(&CollectConfig{}, cache) + + // When the process is harvested + status, err := h.Harvest(&svc.ID{ProcPID: currentPid}) + require.NoError(t, err) + + // The status is updated + assert.NotEmpty(t, status.Command) + assert.NotEqual(t, "something old", status.Command) +} + +func TestLinuxHarvester_Do_InvalidateCache_DifferentPid(t *testing.T) { + currentPid := int32(os.Getpid()) + + // Given a process harvester + // That has cached an old process sharing the PID with a new process + cache, _ := simplelru.NewLRU[int32, *linuxProcess](math.MaxInt, nil) + cache.Add(currentPid, &linuxProcess{stats: procStats{ppid: -1}}) + h := newHarvester(&CollectConfig{}, cache) + + // When the process is harvested + status, err := h.Harvest(&svc.ID{ProcPID: currentPid}) + require.NoError(t, err) + + // The status is updated + assert.NotEqual(t, -1, status.ParentProcessID) +} diff --git a/pkg/internal/infraolly/process/snapshot.go b/pkg/internal/infraolly/process/snapshot.go new file mode 100644 index 000000000..702ec460b --- /dev/null +++ b/pkg/internal/infraolly/process/snapshot.go @@ -0,0 +1,427 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package process + +import ( + "bytes" + "fmt" + "os" + "path" + "runtime" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/process" + + "github.com/grafana/beyla/pkg/internal/helpers" +) + +// CPUInfo represents CPU usage statistics at a given point +type CPUInfo struct { + // Percent is the total CPU usage percent + Percent float64 + // User is the CPU user time + User float64 + // System is the CPU system time + System float64 +} + +// linuxProcess is an implementation of the process.Snapshot interface for linux hosts. It is designed to be highly +// optimized and avoid unnecessary/duplicated system calls. +type linuxProcess struct { + // if privileged == false, some operations will be avoided: FD and IO count. + privileged bool + + stats procStats + process *process.Process + lastCPU CPUInfo + lastTime time.Time + + procFSRoot string + + // data that will be reused between harvests of the same process. + pid int32 + user string + commandInfoFetched bool + commandArgs []string + commandLine string + execPath string + execName string +} + +// needed to calculate RSS. +var pageSize int64 + +// needed to calculate CPU times. +var clockTicks int64 + +// for testing getting username from getent. +var getEntCommand = helpers.RunCommand //nolint:gochecknoglobals + +var ( + errMalformedGetentEntry = errors.New("malformed getent entry") + errInvalidUidsForProcess = errors.New("invalid uids for process") +) + +func init() { + pageSize = int64(os.Getpagesize()) + if pageSize <= 0 { + pageSize = 4096 // default value + } + + clockTicks = int64(cpu.ClocksPerSec) + if clockTicks <= 0 { + clockTicks = 100 // default value + } +} + +// getLinuxProcess returns a linux process snapshot, trying to reuse the data from a previous snapshot of the same +// process. +func getLinuxProcess(cachedCopy *linuxProcess, procFSRoot string, pid int32, privileged bool) (*linuxProcess, error) { + var gops *process.Process + var err error + + procStats, err := readProcStat(procFSRoot, pid) + if err != nil { + return nil, err + } + + // Reusing information from the last snapshot for the same process + // If the name or the PPID changed from the cachedCopy, we'll consider this sample is just + // a new process that shares the PID with an old one. + // if a process with the same Command but different CommandLine or User name + // occupies the same PID, the cache won't refresh the CommandLine and Username. 
+	if cachedCopy == nil || procStats.command != cachedCopy.Command() || procStats.ppid != cachedCopy.Ppid() {
+		gops, err = process.NewProcess(pid)
+		if err != nil {
+			return nil, err
+		}
+		return &linuxProcess{
+			privileged: privileged,
+			pid:        pid,
+			process:    gops,
+			stats:      procStats,
+			procFSRoot: procFSRoot,
+		}, nil
+	}
+
+	// Otherwise, instead of creating a new process snapshot, we just reuse the cachedCopy one, with updated data
+	cachedCopy.stats = procStats
+
+	return cachedCopy, nil
+}
+
+func (pw *linuxProcess) Pid() int32 {
+	return pw.pid
+}
+
+func (pw *linuxProcess) Username() (string, error) {
+	var err error
+	if pw.user == "" { // caching user
+		// try to get it from gopsutil and return it if ok
+		pw.user, err = pw.process.Username()
+		if err == nil {
+			return pw.user, nil
+		}
+
+		// get the uid to be retrieved from getent
+		uid, err := pw.uid()
+		if err != nil {
+			return "", err
+		}
+
+		// try to get it using getent
+		pw.user, err = usernameFromGetent(uid)
+		if err != nil {
+			return "", err
+		}
+	}
+	return pw.user, nil
+}
+
+func (pw *linuxProcess) uid() (int32, error) {
+	uuids, err := pw.process.Uids()
+	if err != nil {
+		return 0, fmt.Errorf("error getting process uids: %w", err) //nolint:wrapcheck
+	}
+
+	if len(uuids) == 0 {
+		return 0, errInvalidUidsForProcess //nolint:wrapcheck
+	}
+
+	return uuids[0], nil
+}
+
+// usernameFromGetent returns the username using getent https://man7.org/linux/man-pages/man1/getent.1.html
+// getent passwd format example:
+// deleteme:x:63367:63367:Dynamic User:/:/usr/sbin/nologin
+func usernameFromGetent(uid int32) (string, error) {
+	out, err := getEntCommand("/usr/bin/getent", "", []string{"passwd", fmt.Sprintf("%d", uid)}...)
+	if err != nil {
+		return "", err
+	}
+
+	if sepIdx := strings.Index(out, ":"); sepIdx > 0 {
+		return out[0:sepIdx], nil
+	}
+
+	return "", errMalformedGetentEntry //nolint:wrapcheck
+}
+
+func (pw *linuxProcess) IOCounters() (*process.IOCountersStat, error) {
+	if !pw.privileged {
+		return nil, nil
+	}
+	return pw.process.IOCounters()
+}
+
+// NumFDs returns the number of file descriptors. It returns -1 (and nil error) if the Agent does not have privileges to
+// access this information.
+func (pw *linuxProcess) NumFDs() (int32, error) {
+	if !pw.privileged {
+		return -1, nil
+	}
+	pid := pw.process.Pid
+	statPath := path.Join(pw.procFSRoot, strconv.Itoa(int(pid)), "fd")
+	d, err := os.Open(statPath)
+	if err != nil {
+		return 0, err
+	}
+	defer d.Close()
+	fnames, err := d.Readdirnames(-1)
+	return int32(len(fnames)), err
+}
+
+/////////////////////////////
+// Data to be derived from /proc/<pid>/stat
+/////////////////////////////
+
+type procStats struct {
+	command    string
+	ppid       int32
+	numThreads int32
+	state      string
+	vmRSS      int64
+	vmSize     int64
+	cpu        CPUInfo
+}
+
+// /proc/<pid>/stat standard field indices according to: http://man7.org/linux/man-pages/man5/proc.5.html
+// Because the first two fields are treated separately, these indices are 2 lower than in the documentation.
+const (
+	statState      = 0
+	statPPID       = 1
+	statUtime      = 11
+	statStime      = 12
+	statNumThreads = 17
+	statVsize      = 20
+	statRss        = 21
+)
+
+// readProcStat will gather information about the pid from the /proc/<pid>/stat file.
+func readProcStat(procFSRoot string, pid int32) (procStats, error) {
+	statPath := path.Join(procFSRoot, strconv.Itoa(int(pid)), "stat")
+
+	content, err := os.ReadFile(statPath)
+	if err != nil {
+		return procStats{}, err
+	}
+
+	return parseProcStat(string(content))
+}
+
+// parseProcStat is used to parse the content of the /proc/<pid>/stat file.
+func parseProcStat(content string) (procStats, error) {
+	stats := procStats{}
+
+	i := strings.Index(content, "(")
+	if i == -1 {
+		return stats, fmt.Errorf("could not find command name start symbol '(' for stats: %s", content)
+	}
+	// Drop the first field, which is the pid.
+	content = content[i+1:]
+
+	i = strings.Index(content, ")")
+	if i == -1 {
+		return stats, fmt.Errorf("could not find command name end symbol ')' for stats: %s", content)
+	}
+
+	// Command Name found as the second field inside the brackets.
+	stats.command = content[:i]
+
+	fields := strings.Fields(content[i+1:])
+
+	// Process State
+	stats.state = fields[statState]
+
+	// Parent PID
+	ppid, err := strconv.ParseInt(fields[statPPID], 10, 32)
+	if err != nil {
+		return stats, errors.Wrapf(err, "for stats: %s", content)
+	}
+	stats.ppid = int32(ppid)
+
+	// User time
+	utime, err := strconv.ParseInt(fields[statUtime], 10, 64)
+	if err != nil {
+		return stats, errors.Wrapf(err, "for stats: %s", content)
+	}
+	stats.cpu.User = float64(utime) / float64(clockTicks)
+
+	// System time
+	stime, err := strconv.ParseInt(fields[statStime], 10, 64)
+	if err != nil {
+		return stats, errors.Wrapf(err, "for stats: %s", content)
+	}
+	stats.cpu.System = float64(stime) / float64(clockTicks)
+
+	// Number of threads
+	nthreads, err := strconv.ParseInt(fields[statNumThreads], 10, 32)
+	if err != nil {
+		return stats, errors.Wrapf(err, "for stats: %s", content)
+	}
+	stats.numThreads = int32(nthreads)
+
+	// VM Memory size
+	stats.vmSize, err = strconv.ParseInt(fields[statVsize], 10, 64)
+	if err != nil {
+		return stats, errors.Wrapf(err, "for stats: %s", content)
+	}
+
+	// VM RSS size
+	stats.vmRSS, err = strconv.ParseInt(fields[statRss], 10, 64)
+	if err != nil {
+		return stats, errors.Wrapf(err, "for stats: %s", content)
+	}
+	stats.vmRSS *= pageSize
+
+	return stats, nil
+}
+
+func (pw *linuxProcess) CPUTimes() (CPUInfo, error) {
+	now := time.Now()
+
+	if pw.lastTime.IsZero() {
+		// invoked first time
+		pw.lastCPU = pw.stats.cpu
+		pw.lastTime = now
+		return pw.stats.cpu, nil
+	}
+
+	// Calculate CPU percent from user time, system time, and last harvested cpu counters
+	numcpu := runtime.NumCPU()
+	delta := (now.Sub(pw.lastTime).Seconds()) * float64(numcpu)
+	pw.stats.cpu.Percent = calculatePercent(pw.lastCPU, pw.stats.cpu, delta, numcpu)
+	pw.lastCPU = pw.stats.cpu
+	pw.lastTime = now
+
+	return pw.stats.cpu, nil
+}
+
+func calculatePercent(t1, t2 CPUInfo, delta float64, numcpu int) float64 {
+	if delta == 0 {
+		return 0
+	}
+	deltaProc := t2.User + t2.System - t1.User - t1.System
+	overallPercent := ((deltaProc / delta) * 100) * float64(numcpu)
+	return overallPercent
+}
+
+func (pw *linuxProcess) Ppid() int32 {
+	return pw.stats.ppid
+}
+
+func (pw *linuxProcess) NumThreads() int32 {
+	return pw.stats.numThreads
+}
+
+func (pw *linuxProcess) Status() string {
+	return pw.stats.state
+}
+
+func (pw *linuxProcess) VMRSS() int64 {
+	return pw.stats.vmRSS
+}
+
+func (pw *linuxProcess) VMSize() int64 {
+	return pw.stats.vmSize
+}
+
+func (pw *linuxProcess) Command() string {
+	return pw.stats.command
+}
+
+//////////////////////////
+// Data to be derived from /proc/<pid>/cmdline: command 
line and arguments +////////////////////////// + +func (pw *linuxProcess) FetchCommandInfo() { + if pw.commandInfoFetched { + return + } + pw.commandInfoFetched = true + + cmdPath := path.Join(pw.procFSRoot, strconv.Itoa(int(pw.pid)), "cmdline") + procCmdline, err := os.ReadFile(cmdPath) + if err != nil { + procCmdline = nil // we can't be sure internal libraries return nil on error + } + + if len(procCmdline) == 0 { + return // zombie process + } + + // Ignoring dash on session commands + if procCmdline[0] == '-' { + procCmdline = procCmdline[1:] + } + + fullCommandLine := strings.Builder{} + // get command + procCmdline = sanitizeCommandLine(procCmdline) + + // get command args + procCmdline, pw.execPath, _ = getNextArg(procCmdline) + pw.execName = path.Base(pw.execPath) + + fullCommandLine.WriteString(pw.execPath) + for { + var arg string + var ok bool + procCmdline, arg, ok = getNextArg(procCmdline) + if !ok { + break + } + fullCommandLine.WriteByte(' ') + fullCommandLine.WriteString(arg) + pw.commandArgs = append(pw.commandArgs, arg) + } + pw.commandLine = fullCommandLine.String() +} + +// getNextArg consumes the next found argument from a /proc/*/cmdline string +// (where arguments are separated by the zero byte) +func getNextArg(procCmdline []byte) ([]byte, string, bool) { + if len(procCmdline) == 0 { + return nil, "", false + } + var arg []byte + for len(procCmdline) > 0 && procCmdline[0] != 0 { + arg = append(arg, procCmdline[0]) + procCmdline = procCmdline[1:] + } + // ignore the zero when it's an argument separator + if len(procCmdline) > 0 { + procCmdline = procCmdline[1:] + } + return procCmdline, string(arg), true +} + +// sanitizeCommandLine cleans the command line to remove wrappers like quotation marks. +func sanitizeCommandLine(cmd []byte) []byte { + return bytes.Trim(cmd, " \t\n\v\f\r\"'`") +} diff --git a/pkg/internal/infraolly/process/snapshot_test.go b/pkg/internal/infraolly/process/snapshot_test.go new file mode 100644 index 000000000..b7b3f8c48 --- /dev/null +++ b/pkg/internal/infraolly/process/snapshot_test.go @@ -0,0 +1,207 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +//go:build linux + +package process + +import ( + "errors" + "fmt" + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/beyla/pkg/internal/helpers" +) + +func TestLinuxProcess_CmdLine(t *testing.T) { + hostProc := os.Getenv("HOST_PROC") + defer os.Setenv("HOST_PROC", hostProc) + tmpDir, err := os.MkdirTemp("", "proc") + require.NoError(t, err) + processDir := path.Join(tmpDir, "12345") + require.NoError(t, os.MkdirAll(processDir, 0o755)) + _ = os.Setenv("HOST_PROC", tmpDir) + + testCases := []struct { + rawProcCmdline []byte + expectedExec string + expectedExecPath string + expectedArgs []string + expectedCmdLine string + }{ + {[]byte{0}, "", "", nil, ""}, + {[]byte{'b', 'a', 's', 'h', 0}, "bash", "bash", nil, "bash"}, + {[]byte{'/', 'b', 'i', 'n', '/', 'b', 'a', 's', 'h', 0}, "bash", "/bin/bash", nil, "/bin/bash"}, + {[]byte{'/', 'b', 'i', 'n', '/', 'b', 'a', 's', 'h', 0, 'a', 'r', 'g', 0}, "bash", "/bin/bash", []string{"arg"}, "/bin/bash arg"}, + {[]byte{'-', '/', 'b', 'i', 'n', '/', 'b', 'a', 's', 'h', 0, 'a', 'r', 'g', 0}, "bash", "/bin/bash", []string{"arg"}, "/bin/bash arg"}, + { + []byte{'/', 'a', ' ', 'f', 'o', 'l', 'd', 'e', 'r', '/', 'c', 'm', 'd', 0, '-', 'a', 'g', 0, 'x', 'x', 0}, + "cmd", "/a folder/cmd", []string{"-ag", "xx"}, "/a folder/cmd -ag xx", + }, + } + for _, tc := range testCases { + require.NoError(t, os.WriteFile(path.Join(processDir, "cmdline"), tc.rawProcCmdline, 0o600)) + lp := linuxProcess{pid: 12345, procFSRoot: tmpDir} + lp.FetchCommandInfo() + assert.Equal(t, tc.expectedExecPath, lp.execPath) + assert.Equal(t, tc.expectedArgs, lp.commandArgs) + assert.Equal(t, tc.expectedCmdLine, lp.commandLine) + } +} + +// Test nonstandard implementations of the /proc//cmdline format, which don't use zeroes to separate nor +// end the command lines. (e.g. 
Nginx create processes whose cmdline is "nginx: master process /usr/sbin/nginx" +func TestLinuxProcess_CmdLine_NotStandard(t *testing.T) { + hostProc := os.Getenv("HOST_PROC") + defer os.Setenv("HOST_PROC", hostProc) + tmpDir, err := os.MkdirTemp("", "proc") + require.NoError(t, err) + processDir := path.Join(tmpDir, "12345") + require.NoError(t, os.MkdirAll(processDir, 0o755)) + _ = os.Setenv("HOST_PROC", tmpDir) + + testCases := []struct { + rawProcCmdline []byte + expected string + }{ + {[]byte("nginx: worker process"), "nginx: worker process"}, + {[]byte("nginx: master process /usr/sbin/nginx"), "nginx: master process /usr/sbin/nginx"}, + { + []byte("nginx: master process /usr/sbin/nginx -c /etc/nginx/nginx.conf"), + "nginx: master process /usr/sbin/nginx -c /etc/nginx/nginx.conf", + }, + } + for _, tc := range testCases { + require.NoError(t, os.WriteFile(path.Join(processDir, "cmdline"), tc.rawProcCmdline, 0o600)) + lp := linuxProcess{pid: 12345, procFSRoot: tmpDir} + + lp.FetchCommandInfo() + assert.Equal(t, tc.expected, lp.commandLine) + } +} + +func TestLinuxProcess_CmdLine_ProcessNotExist(t *testing.T) { + lp := linuxProcess{pid: 999999999} + lp.FetchCommandInfo() + assert.Empty(t, lp.execPath) + assert.Empty(t, lp.commandArgs) + assert.Empty(t, lp.commandLine) +} + +func TestParseProcStatMultipleWordsProcess(t *testing.T) { + content := `465 (node /home/ams-) S 7648 465 465 0 -1 4202496 85321 6128 0 0 378 60 9 2 20 0 11 0 6384148 1005015040 21241 18446744073709551615 4194304 36236634 140729243085280 140729243069424 140119099392231 0 0 4096 16898 18446744073709551615 0 0 17 1 0 0 0 0 0 38337168 38426896 57044992 140729243093258 140729243093333 140729243093333 140729243095018 0` + + expected := procStats{ + command: "node /home/ams-", + ppid: 7648, + numThreads: 11, + state: "S", + vmRSS: 87003136, + vmSize: 1005015040, + cpu: CPUInfo{ + Percent: 0, + User: 3.78, + System: 0.6, + }, + } + actual, err := parseProcStat(content) + assert.NoError(t, err) + + assert.Equal(t, expected, actual) +} + +func TestParseProcStatSingleWordProcess(t *testing.T) { + content := `1232 (foo-bar) S 1 1232 1232 0 -1 1077960960 4799 282681 88 142 24 15 193 94 20 0 12 0 1071 464912384 4490 18446744073709551615 1 1 0 0 0 0 0 0 2143420159 0 0 0 17 0 0 0 14 0 0 0 0 0 0 0 0 0 0` + + expected := procStats{ + command: "foo-bar", + ppid: 1, + numThreads: 12, + state: "S", + vmRSS: 18391040, + vmSize: 464912384, + cpu: CPUInfo{ + Percent: 0, + + User: 0.24, + System: 0.15, + }, + } + actual, err := parseProcStat(content) + assert.NoError(t, err) + + assert.Equal(t, expected, actual) +} + +func TestParseProcStatUntrimmedCommand(t *testing.T) { + cases := []struct { + input string + expected procStats + }{{ + input: "11155 (/usr/bin/spamd ) S 1 11155 11155 0 -1 1077944640 19696 1028 0 0 250 32 0 0 20 0 1 0 6285571 300249088 18439 18446744073709551615 4194304 4198572 140721992060048 140721992059288 139789215727443 0 0 4224 92163 18446744072271262725 0 0 17 1 0 0 0 0 0 6298944 6299796 18743296 140721992060730 140721992060807 140721992060807 140721992060905 0\n", + expected: procStats{command: "/usr/bin/spamd ", state: "S", ppid: 1, cpu: CPUInfo{User: 2.50, System: 0.32}, numThreads: 1, vmSize: 300249088, vmRSS: 18439 * pageSize}, + }, { + input: "11159 (spamd child) S 11155 11155 11155 0 -1 1077944384 459 0 0 0 1 0 0 0 20 0 1 0 6285738 300249088 17599 18446744073709551615 4194304 4198572 140721992060048 140721992059288 139789215727443 0 0 4224 2048 18446744072271262725 0 0 17 0 0 0 0 0 0 6298944 6299796 18743296 
140721992060730 140721992060807 140721992060807 140721992060905 0\n", + expected: procStats{command: "spamd child", state: "S", ppid: 11155, cpu: CPUInfo{User: 0.01, System: 0}, numThreads: 1, vmSize: 300249088, vmRSS: 17599 * pageSize}, + }, { + input: "11160 ( spamd child) S 11155 11155 11155 0 -1 1077944384 459 0 0 0 0 0 0 0 20 0 1 0 6285738 300249088 17599 18446744073709551615 4194304 4198572 140721992060048 140721992059288 139789215727443 0 0 4224 2048 18446744072271262725 0 0 17 0 0 0 0 0 0 6298944 6299796 18743296 140721992060730 140721992060807 140721992060807 140721992060905 0\n", + expected: procStats{command: " spamd child", state: "S", ppid: 11155, cpu: CPUInfo{User: 0, System: 0}, numThreads: 1, vmSize: 300249088, vmRSS: 17599 * pageSize}, + }} + + for n, c := range cases { + t.Run(fmt.Sprint("test", n), func(t *testing.T) { + actual, err := parseProcStat(c.input) + assert.NoError(t, err) + assert.Equal(t, c.expected, actual) + }) + } +} + +func Test_usernameFromGetent(t *testing.T) { //nolint:paralleltest + testCases := []struct { + name string + getEntResult string + getEntError error + expectedUsername string + expectedError error + }{ + { + name: "happy path, user exists", + getEntResult: "deleteme:x:63367:63367:Dynamic User:/:/usr/sbin/nologin", + expectedUsername: "deleteme", + }, + { + name: "getent returns error (i.e. does not exist)", + getEntError: errors.New("some error"), + expectedUsername: "", + expectedError: errors.New("some error"), + }, + { + name: "getent returns unexpected formatted entry", + getEntResult: "this is an unexpected format", + expectedUsername: "", + expectedError: errMalformedGetentEntry, + }, + } + + for i := range testCases { + testCase := testCases[i] + t.Run(testCase.name, func(t *testing.T) { + getEntCommand = func(_, _ string, _ ...string) (string, error) { + return testCase.getEntResult, testCase.getEntError + } + defer func() { + getEntCommand = helpers.RunCommand + }() + + username, err := usernameFromGetent(123) + assert.Equal(t, testCase.expectedUsername, username) + assert.Equal(t, testCase.expectedError, err) + }) + } +} diff --git a/pkg/internal/infraolly/process/status.go b/pkg/internal/infraolly/process/status.go new file mode 100644 index 000000000..4310422e6 --- /dev/null +++ b/pkg/internal/infraolly/process/status.go @@ -0,0 +1,85 @@ +package process + +import ( + "log/slog" + + "go.opentelemetry.io/otel/attribute" + + "github.com/grafana/beyla/pkg/internal/export/attributes" + attr "github.com/grafana/beyla/pkg/internal/export/attributes/names" + "github.com/grafana/beyla/pkg/internal/svc" +) + +func pslog() *slog.Logger { + return slog.With("component", "process.Collector") +} + +// Status of a process after being harvested +type Status struct { + ProcessID int32 + Command string + CommandArgs []string + CommandLine string + ExecName string + ExecPath string + + User string + MemoryRSSBytes int64 + MemoryVMSBytes int64 + CPUPercent float64 + CPUUserPercent float64 + CPUSystemPercent float64 + Status string + ParentProcessID int32 + ThreadCount int32 + FdCount int32 + IOReadCount uint64 + IOWriteCount uint64 + IOReadBytes uint64 + IOWriteBytes uint64 + + Service *svc.ID +} + +func NewStatus(pid int32, svcID *svc.ID) *Status { + return &Status{ + ProcessID: pid, + Service: svcID, + } +} + +// nolint:cyclop +func OTELGetters(name attr.Name) (attributes.Getter[*Status, attribute.KeyValue], bool) { + var g attributes.Getter[*Status, attribute.KeyValue] + switch name { + case attr.ProcCommand: + g = func(s *Status) 
attribute.KeyValue { return attribute.Key(attr.ProcCommand).String(s.Command) } + case attr.ProcCommandLine: + g = func(s *Status) attribute.KeyValue { + return attribute.Key(attr.ProcCommandLine).String(s.CommandLine) + } + case attr.ProcExecName: + g = func(status *Status) attribute.KeyValue { + return attribute.Key(attr.ProcExecName).String(status.ExecName) + } + case attr.ProcExecPath: + g = func(status *Status) attribute.KeyValue { + return attribute.Key(attr.ProcExecPath).String(status.ExecPath) + } + case attr.ProcCommandArgs: + g = func(status *Status) attribute.KeyValue { + return attribute.Key(attr.ProcCommandArgs).StringSlice(status.CommandArgs) + } + case attr.ProcOwner: + g = func(s *Status) attribute.KeyValue { return attribute.Key(attr.ProcOwner).String(s.User) } + case attr.ProcParentPid: + g = func(s *Status) attribute.KeyValue { + return attribute.Key(attr.ProcParentPid).Int(int(s.ParentProcessID)) + } + case attr.ProcPid: + g = func(s *Status) attribute.KeyValue { + return attribute.Key(attr.ProcPid).Int(int(s.ProcessID)) + } + } + return g, g != nil +} diff --git a/pkg/internal/netolly/agent/pipeline.go b/pkg/internal/netolly/agent/pipeline.go index 011957811..e18b048d4 100644 --- a/pkg/internal/netolly/agent/pipeline.go +++ b/pkg/internal/netolly/agent/pipeline.go @@ -5,10 +5,10 @@ import ( "github.com/mariomac/pipes/pipe" + "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/filter" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" "github.com/grafana/beyla/pkg/internal/netolly/export" - "github.com/grafana/beyla/pkg/internal/netolly/export/otel" "github.com/grafana/beyla/pkg/internal/netolly/export/prom" "github.com/grafana/beyla/pkg/internal/netolly/flow" "github.com/grafana/beyla/pkg/internal/netolly/transform/cidr" @@ -126,14 +126,14 @@ func (f *Flows) pipelineBuilder(ctx context.Context) (*pipe.Builder[*FlowsPipeli pipe.AddMiddleProvider(pb, rdns, func() (pipe.MiddleFunc[[]*ebpf.Record, []*ebpf.Record], error) { return flow.ReverseDNSProvider(&f.cfg.NetworkFlows.ReverseDNS) }) - pipe.AddMiddleProvider(pb, fltr, filter.ByAttribute(f.cfg.Filters.Network, ebpf.RecordGetters)) + pipe.AddMiddleProvider(pb, fltr, filter.ByAttribute(f.cfg.Filters.Network, ebpf.RecordStringGetters)) // Terminal nodes export the flow record information out of the pipeline: OTEL, Prom and printer. // Not all the nodes are mandatory here. Is the responsibility of each Provider function to decide // whether each node is going to be instantiated or just ignored. f.cfg.Attributes.Select.Normalize() pipe.AddFinalProvider(pb, otelExport, func() (pipe.FinalFunc[[]*ebpf.Record], error) { - return otel.MetricsExporterProvider(f.ctxInfo, &otel.MetricsConfig{ + return otel.NetMetricsExporterProvider(f.ctxInfo, &otel.NetMetricsConfig{ Metrics: &f.cfg.Metrics, AttributeSelectors: f.cfg.Attributes.Select, }) diff --git a/pkg/internal/netolly/ebpf/record_getters.go b/pkg/internal/netolly/ebpf/record_getters.go index daaf27122..4533363f7 100644 --- a/pkg/internal/netolly/ebpf/record_getters.go +++ b/pkg/internal/netolly/ebpf/record_getters.go @@ -1,7 +1,7 @@ package ebpf import ( - "strconv" + "go.opentelemetry.io/otel/attribute" "github.com/grafana/beyla/pkg/internal/export/attributes" attr "github.com/grafana/beyla/pkg/internal/export/attributes/names" @@ -10,35 +10,50 @@ import ( // RecordGetters returns the attributes.Getter function that returns the string value of a given // attribute name. 
-func RecordGetters(name attr.Name) (attributes.Getter[*Record, string], bool) { - var getter attributes.Getter[*Record, string] +func RecordGetters(name attr.Name) (attributes.Getter[*Record, attribute.KeyValue], bool) { + var getter attributes.Getter[*Record, attribute.KeyValue] switch name { case attr.BeylaIP: - getter = func(r *Record) string { return r.Attrs.BeylaIP } + getter = func(r *Record) attribute.KeyValue { return attribute.String(string(attr.BeylaIP), r.Attrs.BeylaIP) } case attr.Transport: - getter = func(r *Record) string { return transport.Protocol(r.Id.TransportProtocol).String() } + getter = func(r *Record) attribute.KeyValue { + return attribute.String(string(attr.Transport), transport.Protocol(r.Id.TransportProtocol).String()) + } case attr.SrcAddress: - getter = func(r *Record) string { return r.Id.SrcIP().IP().String() } + getter = func(r *Record) attribute.KeyValue { + return attribute.String(string(attr.SrcAddress), r.Id.SrcIP().IP().String()) + } case attr.DstAddres: - getter = func(r *Record) string { return r.Id.DstIP().IP().String() } + getter = func(r *Record) attribute.KeyValue { + return attribute.String(string(attr.DstAddres), r.Id.DstIP().IP().String()) + } case attr.SrcPort: - getter = func(r *Record) string { return strconv.FormatUint(uint64(r.Id.SrcPort), 10) } + getter = func(r *Record) attribute.KeyValue { return attribute.Int(string(attr.SrcPort), int(r.Id.SrcPort)) } case attr.DstPort: - getter = func(r *Record) string { return strconv.FormatUint(uint64(r.Id.DstPort), 10) } + getter = func(r *Record) attribute.KeyValue { return attribute.Int(string(attr.DstPort), int(r.Id.DstPort)) } case attr.SrcName: - getter = func(r *Record) string { return r.Attrs.SrcName } + getter = func(r *Record) attribute.KeyValue { return attribute.String(string(attr.SrcName), r.Attrs.SrcName) } case attr.DstName: - getter = func(r *Record) string { return r.Attrs.DstName } + getter = func(r *Record) attribute.KeyValue { return attribute.String(string(attr.DstName), r.Attrs.DstName) } case attr.Direction: - getter = func(r *Record) string { return directionStr(r.Id.Direction) } + getter = func(r *Record) attribute.KeyValue { + return attribute.String(string(attr.Direction), directionStr(r.Id.Direction)) + } case attr.Iface: - getter = func(r *Record) string { return r.Attrs.Interface } + getter = func(r *Record) attribute.KeyValue { return attribute.String(string(attr.Iface), r.Attrs.Interface) } default: - getter = func(r *Record) string { return r.Attrs.Metadata[name] } + getter = func(r *Record) attribute.KeyValue { return attribute.String(string(name), r.Attrs.Metadata[name]) } } return getter, getter != nil } +func RecordStringGetters(name attr.Name) (attributes.Getter[*Record, string], bool) { + if g, ok := RecordGetters(name); ok { + return func(r *Record) string { return g(r).Value.Emit() }, true + } + return nil, false +} + func directionStr(direction uint8) string { switch direction { case DirectionIngress: diff --git a/pkg/internal/netolly/export/otel/expirer.go b/pkg/internal/netolly/export/otel/expirer.go deleted file mode 100644 index 56cd16bab..000000000 --- a/pkg/internal/netolly/export/otel/expirer.go +++ /dev/null @@ -1,81 +0,0 @@ -package otel - -import ( - "context" - "log/slog" - "sync/atomic" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - - "github.com/grafana/beyla/pkg/internal/export/attributes" - "github.com/grafana/beyla/pkg/internal/export/expire" - "github.com/grafana/beyla/pkg/internal/netolly/ebpf" -) 
- -var timeNow = time.Now - -func plog() *slog.Logger { - return slog.With("component", "otel.Expirer") -} - -// Expirer drops metrics from labels that haven't been updated during a given timeout -// TODO: generify and move to a common section for using it also in AppO11y, supporting more OTEL metrics -type Expirer struct { - attrs []attributes.Field[*ebpf.Record, string] - entries *expire.ExpiryMap[*Counter] -} - -type Counter struct { - attributes attribute.Set - val atomic.Int64 -} - -// NewExpirer creates a metric that wraps a Counter. Its labeled instances are dropped -// if they haven't been updated during the last timeout period -func NewExpirer(attrs []attributes.Field[*ebpf.Record, string], clock expire.Clock, expireTime time.Duration) *Expirer { - return &Expirer{ - attrs: attrs, - entries: expire.NewExpiryMap[*Counter](clock, expireTime), - } -} - -// ForRecord returns the Counter for the given eBPF record. If that record -// s accessed for the first time, a new Counter is created. -// If not, a cached copy is returned and the "last access" cache time is updated. -func (ex *Expirer) ForRecord(m *ebpf.Record) *Counter { - recordAttrs, attrValues := ex.recordAttributes(m) - return ex.entries.GetOrCreate(attrValues, func() *Counter { - plog().With("labelValues", attrValues).Debug("storing new metric label set") - return &Counter{ - attributes: recordAttrs, - } - }) -} - -func (ex *Expirer) Collect(_ context.Context, observer metric.Int64Observer) error { - log := plog() - log.Debug("invoking metrics collection") - old := ex.entries.DeleteExpired() - log.With("labelValues", old).Debug("deleting old OTEL metric") - - for _, v := range ex.entries.All() { - observer.Observe(v.val.Load(), metric.WithAttributeSet(v.attributes)) - } - - return nil -} - -func (ex *Expirer) recordAttributes(m *ebpf.Record) (attribute.Set, []string) { - keyVals := make([]attribute.KeyValue, 0, len(ex.attrs)) - vals := make([]string, 0, len(ex.attrs)) - - for _, attr := range ex.attrs { - val := attr.Get(m) - keyVals = append(keyVals, attribute.String(attr.ExposedName, val)) - vals = append(vals, val) - } - - return attribute.NewSet(keyVals...), vals -} diff --git a/pkg/internal/netolly/export/prom/prom.go b/pkg/internal/netolly/export/prom/prom.go index 1f287eeed..647fa2378 100644 --- a/pkg/internal/netolly/export/prom/prom.go +++ b/pkg/internal/netolly/export/prom/prom.go @@ -82,7 +82,7 @@ func newReporter( } attrs := attributes.PrometheusGetters( - ebpf.RecordGetters, + ebpf.RecordStringGetters, provider.For(attributes.BeylaNetworkFlow)) labelNames := make([]string, 0, len(attrs)) diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go index 446a7c04c..e58023e5f 100644 --- a/pkg/internal/pipe/instrumenter.go +++ b/pkg/internal/pipe/instrumenter.go @@ -41,6 +41,8 @@ type nodesMap struct { Prometheus pipe.Final[[]request.Span] Printer pipe.Final[[]request.Span] Noop pipe.Final[[]request.Span] + + ProcessReport pipe.Final[[]request.Span] } // Connect must specify how the above nodes are connected. Nodes that are disabled @@ -51,7 +53,7 @@ func (n *nodesMap) Connect() { n.Routes.SendTo(n.Kubernetes) n.Kubernetes.SendTo(n.NameResolver) n.NameResolver.SendTo(n.AttributeFilter) - n.AttributeFilter.SendTo(n.AlloyTraces, n.Metrics, n.Traces, n.Prometheus, n.Printer, n.Noop) + n.AttributeFilter.SendTo(n.AlloyTraces, n.Metrics, n.Traces, n.Prometheus, n.Printer, n.Noop, n.ProcessReport) } // accessor functions to each field. 
Grouped here for code brevity during the pipeline build @@ -66,6 +68,7 @@ func otelTraces(n *nodesMap) *pipe.Final[[]request.Span] { re func printer(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Printer } func prometheus(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Prometheus } func noop(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Noop } +func processReport(n *nodesMap) *pipe.Final[[]request.Span] { return &n.ProcessReport } // builder with injectable instantiators for unit testing type graphFunctions struct { @@ -119,6 +122,10 @@ func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global. pipe.AddFinalProvider(gnb, noop, debug.NoopNode(config.Noop)) pipe.AddFinalProvider(gnb, printer, debug.PrinterNode(config.Printer)) + // process subpipeline will start another pipeline only to collect and export data + // about the processes of an instrumented application + pipe.AddFinalProvider(gnb, processReport, SubPipelineProvider(ctx, ctxInfo, config)) + // The returned builder later invokes its "Build" function that, given // the contents of the nodesMap struct, will instantiate // and interconnect each node according to the SendTo invocations in the diff --git a/pkg/internal/pipe/proc_pipeline.go b/pkg/internal/pipe/proc_pipeline.go new file mode 100644 index 000000000..a1788c5fa --- /dev/null +++ b/pkg/internal/pipe/proc_pipeline.go @@ -0,0 +1,71 @@ +package pipe + +import ( + "context" + "fmt" + "slices" + + "github.com/mariomac/pipes/pipe" + + "github.com/grafana/beyla/pkg/beyla" + "github.com/grafana/beyla/pkg/internal/export/otel" + "github.com/grafana/beyla/pkg/internal/infraolly/process" + "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/request" +) + +// processSubPipeline is actually a part of the Application Observability pipeline. +// Its management is moved here because it's only activated if the process +// metrics are activated. +type processSubPipeline struct { + Collector pipe.Start[[]*process.Status] + OtelExport pipe.Final[[]*process.Status] + // TODO: add prometheus exporter +} + +func procCollect(sp *processSubPipeline) *pipe.Start[[]*process.Status] { return &sp.Collector } +func otelExport(sp *processSubPipeline) *pipe.Final[[]*process.Status] { return &sp.OtelExport } + +func (sp *processSubPipeline) Connect() { + sp.Collector.SendTo(sp.OtelExport) +} + +// the sub-pipe is enabled only if there is a metrics exporter enabled, +// and both the "application" and "application_process" features are enabled +func isSubPipeEnabled(cfg *beyla.Config) bool { + return (cfg.Metrics.EndpointEnabled() && cfg.Metrics.OTelMetricsEnabled() && + slices.Contains(cfg.Metrics.Features, otel.FeatureProcess)) || + (cfg.Prometheus.EndpointEnabled() && cfg.Prometheus.OTelMetricsEnabled() && + slices.Contains(cfg.Prometheus.Features, otel.FeatureProcess)) +} + +// SubPipelineProvider returns a Final node that actually has a pipeline inside. 
+// It is manually connected through a channel +func SubPipelineProvider(ctx context.Context, ctxInfo *global.ContextInfo, cfg *beyla.Config) pipe.FinalProvider[[]request.Span] { + return func() (pipe.FinalFunc[[]request.Span], error) { + if !isSubPipeEnabled(cfg) { + return pipe.IgnoreFinal[[]request.Span](), nil + } + connectorChan := make(chan []request.Span, cfg.ChannelBufferLen) + var connector <-chan []request.Span = connectorChan + nb := pipe.NewBuilder(&processSubPipeline{}, pipe.ChannelBufferLen(cfg.ChannelBufferLen)) + pipe.AddStartProvider(nb, procCollect, process.NewCollectorProvider(ctx, &connector, &cfg.Processes)) + pipe.AddFinalProvider(nb, otelExport, otel.ProcMetricsExporterProvider(ctx, ctxInfo, + &otel.ProcMetricsConfig{ + Metrics: &cfg.Metrics, + AttributeSelectors: cfg.Attributes.Select, + })) + + runner, err := nb.Build() + if err != nil { + return nil, fmt.Errorf("creating process subpipeline: %w", err) + } + return func(in <-chan []request.Span) { + // connect the input channel of this final node to the input of the + // process collector + connector = in + runner.Start() + <-ctx.Done() + }, nil + } +} diff --git a/pkg/internal/svc/svc.go b/pkg/internal/svc/svc.go index 029a514b3..1b97c2e0e 100644 --- a/pkg/internal/svc/svc.go +++ b/pkg/internal/svc/svc.go @@ -64,6 +64,11 @@ type ID struct { Instance string Metadata map[attr.Name]string + + // ProcPID is the PID of the instrumented process as seen by Beyla's /proc filesystem. + // It is stored here at process discovery time, because it might differ form the + // UserPID and HostPID fields of the request.PidInfo struct. + ProcPID int32 } func (i *ID) String() string { diff --git a/test/integration/configs/instrumenter-config-java.yml b/test/integration/configs/instrumenter-config-java.yml index 0d6b7f87e..dfc7a4157 100644 --- a/test/integration/configs/instrumenter-config-java.yml +++ b/test/integration/configs/instrumenter-config-java.yml @@ -4,3 +4,7 @@ routes: unmatched: path otel_metrics_export: endpoint: http://otelcol:4318 +attributes: + select: + process_cpu_utilization: + include: ["*"] \ No newline at end of file diff --git a/test/integration/docker-compose-python.yml b/test/integration/docker-compose-python.yml index 6e97994c8..84f6ea723 100644 --- a/test/integration/docker-compose-python.yml +++ b/test/integration/docker-compose-python.yml @@ -41,6 +41,8 @@ services: BEYLA_METRICS_REPORT_PEER: "true" BEYLA_HOSTNAME: "beyla" BEYLA_BPF_HTTP_REQUEST_TIMEOUT: "5s" + BEYLA_PROCESSES_INTERVAL: "100ms" + BEYLA_OTEL_METRICS_FEATURES: "application,application_process" depends_on: testserver: condition: service_started diff --git a/test/integration/process_test.go b/test/integration/process_test.go new file mode 100644 index 000000000..4f978d07b --- /dev/null +++ b/test/integration/process_test.go @@ -0,0 +1,31 @@ +//go:build integration + +package integration + +import ( + "testing" + + "github.com/mariomac/guara/pkg/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/beyla/test/integration/components/prom" +) + +func testProcesses(attribMatcher map[string]string) func(t *testing.T) { + return func(t *testing.T) { + pq := prom.Client{HostPort: prometheusHostPort} + var results []prom.Result + test.Eventually(t, testTimeout, func(t require.TestingT) { + var err error + results, err = pq.Query(`process_cpu_utilization_ratio`) + require.NoError(t, err) + assert.NotEmpty(t, results) + for _, result := range results { + for k, v := range attribMatcher { + 
assert.Equalf(t, v, result.Metric[k], "attribute %v expected to be %v", k, v) + } + } + }) + } +} diff --git a/test/integration/suites_test.go b/test/integration/suites_test.go index 91f321967..ee57743e5 100644 --- a/test/integration/suites_test.go +++ b/test/integration/suites_test.go @@ -400,6 +400,12 @@ func TestSuite_Python(t *testing.T) { require.NoError(t, compose.Up()) t.Run("Python RED metrics", testREDMetricsPythonHTTP) t.Run("Python RED metrics with timeouts", testREDMetricsTimeoutPythonHTTP) + t.Run("Checking process metrics", testProcesses(map[string]string{ + "process_executable_name": "python", + "process_executable_path": "/usr/local/bin/python", + "process_command": "gunicorn", + "process_command_line": "/usr/local/bin/python /usr/local/bin/gunicorn -w 4 -b 0.0.0.0:8380 main:app --timeout 90", + })) t.Run("BPF pinning folder mounted", testBPFPinningMounted) require.NoError(t, compose.Close()) t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted)
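Note: the process-metrics settings exercised by these integration tests can also be expressed as a single Beyla YAML configuration. The snippet below is an illustrative sketch assembled from the options visible in this change (the "application_process" metrics feature, the processes collection interval, and the process_cpu_utilization attribute selector); it is not part of the patch, and any field name not shown elsewhere in this diff (for example "interval" under "processes") is an assumption.

otel_metrics_export:
  endpoint: http://otelcol:4318
  features:
    - application
    - application_process
processes:
  # assumed YAML key for the BEYLA_PROCESSES_INTERVAL setting shown in the docker-compose file above
  interval: 5s
attributes:
  select:
    process_cpu_utilization:
      include: ["*"]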