diff --git a/pkg/export/otel/metrics.go b/pkg/export/otel/metrics.go index 2a0720ac8..bbed747c3 100644 --- a/pkg/export/otel/metrics.go +++ b/pkg/export/otel/metrics.go @@ -848,7 +848,9 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) { for spans := range input { for i := range spans { s := &spans[i] - + if s.InternalSignal() { + continue + } // If we are ignoring this span because of route patterns, don't do anything if s.IgnoreMetrics() { continue diff --git a/pkg/export/otel/traces.go b/pkg/export/otel/traces.go index 0f2a4e807..4782422a8 100644 --- a/pkg/export/otel/traces.go +++ b/pkg/export/otel/traces.go @@ -203,6 +203,9 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err for spans := range in { for i := range spans { span := &spans[i] + if span.InternalSignal() { + continue + } if tr.spanDiscarded(span) { continue } diff --git a/pkg/export/prom/expirer.go b/pkg/export/prom/expirer.go index 9c2149eca..0b6d2a787 100644 --- a/pkg/export/prom/expirer.go +++ b/pkg/export/prom/expirer.go @@ -61,7 +61,6 @@ func (ex *Expirer[T]) Describe(descs chan<- *prometheus.Desc) { // Collect wraps prometheus.Collector Wrap method func (ex *Expirer[T]) Collect(metrics chan<- prometheus.Metric) { log := plog() - log.Debug("invoking metrics collection") for _, old := range ex.entries.DeleteExpired() { ex.wrapped.DeleteLabelValues(old.labelVals...) log.With("labelValues", old).Debug("deleting old Prometheus metric") diff --git a/pkg/export/prom/prom.go b/pkg/export/prom/prom.go index e72826d9e..89916b717 100644 --- a/pkg/export/prom/prom.go +++ b/pkg/export/prom/prom.go @@ -583,6 +583,9 @@ func (r *metricsReporter) otelSpanObserved(span *request.Span) bool { // nolint:cyclop func (r *metricsReporter) observe(span *request.Span) { + if span.InternalSignal() { + return + } t := span.Timings() r.beylaInfo.WithLabelValues(span.ServiceID.SDKLanguage.String()).metric.Set(1.0) duration := t.End.Sub(t.RequestStart).Seconds() diff --git a/pkg/internal/appolly/appolly.go b/pkg/internal/appolly/appolly.go index 6b19f366a..4528e7786 100644 --- a/pkg/internal/appolly/appolly.go +++ b/pkg/internal/appolly/appolly.go @@ -28,8 +28,6 @@ type Instrumenter struct { // tracesInput is used to communicate the found traces between the ProcessFinder and // the ProcessTracer. - // TODO: When we split beyla into two executables, probably the BPF map - // should be the traces' communication mechanism instead of a native channel tracesInput chan []request.Span } @@ -47,7 +45,7 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) // FindAndInstrument searches in background for any new executable matching the // selection criteria. func (i *Instrumenter) FindAndInstrument() error { - finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo) + finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo, i.tracesInput) foundProcesses, deletedProcesses, err := finder.Start() if err != nil { return fmt.Errorf("couldn't start Process Finder: %w", err) @@ -95,8 +93,6 @@ func (i *Instrumenter) ReadAndForward() error { log := log() log.Debug("creating instrumentation pipeline") - // TODO: when we split the executable, tracer should be reconstructed somehow - // from this instance bp, err := pipe.Build(i.ctx, i.config, i.ctxInfo, i.tracesInput) if err != nil { return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err) diff --git a/pkg/internal/discover/attacher.go b/pkg/internal/discover/attacher.go index 828ac3469..9bdff8a62 100644 --- a/pkg/internal/discover/attacher.go +++ b/pkg/internal/discover/attacher.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/helpers/maps" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -38,6 +39,13 @@ type TraceAttacher struct { // keeps a copy of all the tracers for a given executable path existingTracers map[uint64]*ebpf.ProcessTracer reusableTracer *ebpf.ProcessTracer + + // Usually, only ebpf.Tracer implementations will send spans data to the read decorator. + // But on each new process, we will send a "process alive" span type to the read decorator, whose + // unique purpose is to notify other parts of the system that this process is active, even + // if no spans are detected. This would allow, for example, to start instrumenting this process + // from the Process metrics pipeline even before it starts to do/receive requests. + SpanSignalsShortcut chan<- []request.Span } func TraceAttacherProvider(ta *TraceAttacher) pipe.FinalProvider[[]Event[Instrumentable]] { @@ -95,10 +103,10 @@ func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, boo "exec", ie.FileInfo.CmdExePath) ie.FileInfo.Service.SDKLanguage = ie.Type // allowing the tracer to forward traces from the new PID and its children processes - monitorPIDs(tracer, ie) + ta.monitorPIDs(tracer, ie) ta.Metrics.InstrumentProcess(ie.FileInfo.ExecutableName()) if tracer.Type == ebpf.Generic { - monitorPIDs(ta.reusableTracer, ie) + ta.monitorPIDs(ta.reusableTracer, ie) } ta.log.Debug(".done") return nil, false @@ -170,11 +178,11 @@ func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, boo "child", ie.ChildPids, "exec", ie.FileInfo.CmdExePath) // allowing the tracer to forward traces from the discovered PID and its children processes - monitorPIDs(tracer, ie) + ta.monitorPIDs(tracer, ie) ta.existingTracers[ie.FileInfo.Ino] = tracer if tracer.Type == ebpf.Generic { if ta.reusableTracer != nil { - monitorPIDs(ta.reusableTracer, ie) + ta.monitorPIDs(ta.reusableTracer, ie) } else { ta.reusableTracer = tracer } @@ -191,7 +199,7 @@ func (ta *TraceAttacher) genericTracers() []ebpf.Tracer { return newNonGoTracersGroup(ta.Cfg, ta.Metrics) } -func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) { +func (ta *TraceAttacher) monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) { // If the user does not override the service name via configuration // the service name is the name of the found executable // Unless the case of system-wide tracing, where the name of the @@ -208,6 +216,27 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) { for _, pid := range ie.ChildPids { tracer.AllowPID(pid, ie.FileInfo.Ns, &ie.FileInfo.Service) } + if ta.SpanSignalsShortcut != nil { + spans := make([]request.Span, 0, len(ie.ChildPids)+1) + // the forwarded signal must include + // - ServiceID, which includes several metadata about the process + // - PID namespace, to allow further kubernetes decoration + spans = append(spans, request.Span{ + Type: request.EventTypeProcessAlive, + ServiceID: ie.FileInfo.Service, + Pid: request.PidInfo{Namespace: ie.FileInfo.Ns}, + }) + for _, pid := range ie.ChildPids { + service := ie.FileInfo.Service + service.ProcPID = int32(pid) + spans = append(spans, request.Span{ + Type: request.EventTypeProcessAlive, + ServiceID: service, + Pid: request.PidInfo{Namespace: ie.FileInfo.Ns}, + }) + } + ta.SpanSignalsShortcut <- spans + } } // BuildPinPath pinpath must be unique for a given executable group diff --git a/pkg/internal/discover/finder.go b/pkg/internal/discover/finder.go index 2834f677e..089628ff3 100644 --- a/pkg/internal/discover/finder.go +++ b/pkg/internal/discover/finder.go @@ -19,12 +19,14 @@ import ( "github.com/grafana/beyla/pkg/internal/ebpf/sarama" "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/request" ) type ProcessFinder struct { - ctx context.Context - cfg *beyla.Config - ctxInfo *global.ContextInfo + ctx context.Context + cfg *beyla.Config + ctxInfo *global.ContextInfo + tracesInput chan<- []request.Span } // nodesMap stores ProcessFinder pipeline architecture @@ -60,8 +62,8 @@ func containerDBUpdater(pf *nodesMap) *pipe.Middle[[]Event[Instrumentable], []Ev } func traceAttacher(pf *nodesMap) *pipe.Final[[]Event[Instrumentable]] { return &pf.TraceAttacher } -func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder { - return &ProcessFinder{ctx: ctx, cfg: cfg, ctxInfo: ctxInfo} +func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo, tracesInput chan<- []request.Span) *ProcessFinder { + return &ProcessFinder{ctx: ctx, cfg: cfg, ctxInfo: ctxInfo, tracesInput: tracesInput} } // Start the ProcessFinder pipeline in background. It returns a channel where each new discovered @@ -79,11 +81,12 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.ProcessTracer, <-chan *Instrument pipe.AddMiddleProvider(gb, containerDBUpdater, ContainerDBUpdaterProvider(pf.ctxInfo.K8sInformer.IsKubeEnabled(), pf.ctxInfo.AppO11y.K8sDatabase)) pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{ - Cfg: pf.cfg, - Ctx: pf.ctx, - DiscoveredTracers: discoveredTracers, - DeleteTracers: deleteTracers, - Metrics: pf.ctxInfo.Metrics, + Cfg: pf.cfg, + Ctx: pf.ctx, + DiscoveredTracers: discoveredTracers, + DeleteTracers: deleteTracers, + Metrics: pf.ctxInfo.Metrics, + SpanSignalsShortcut: pf.tracesInput, })) pipeline, err := gb.Build() if err != nil { diff --git a/pkg/internal/infraolly/process/collect.go b/pkg/internal/infraolly/process/collect.go index e9f90e9c5..a48081803 100644 --- a/pkg/internal/infraolly/process/collect.go +++ b/pkg/internal/infraolly/process/collect.go @@ -78,7 +78,7 @@ func (ps *Collector) Run(out chan<- []*Status) { for { select { case <-ps.ctx.Done(): - ps.log.Debug("exiting") + // exiting case spans := <-newPids: // updating PIDs map with spans information for i := range spans { diff --git a/pkg/internal/request/span.go b/pkg/internal/request/span.go index cc511febe..1b39df211 100644 --- a/pkg/internal/request/span.go +++ b/pkg/internal/request/span.go @@ -21,7 +21,9 @@ type EventType uint8 // The following consts need to coincide with some C identifiers: // EVENT_HTTP_REQUEST, EVENT_GRPC_REQUEST, EVENT_HTTP_CLIENT, EVENT_GRPC_CLIENT, EVENT_SQL_CLIENT const ( - EventTypeHTTP EventType = iota + 1 + // EventTypeProcessAlive is an internal signal. It will be ignored by the metrics exporters. + EventTypeProcessAlive EventType = iota + EventTypeHTTP EventTypeGRPC EventTypeHTTPClient EventTypeGRPCClient @@ -41,6 +43,8 @@ const ( func (t EventType) String() string { switch t { + case EventTypeProcessAlive: + return "ProcessAlive" case EventTypeHTTP: return "HTTP" case EventTypeGRPC: @@ -150,6 +154,12 @@ func (s *Span) Inside(parent *Span) bool { return s.RequestStart >= parent.RequestStart && s.End <= parent.End } +// InternalSignal returns whether a span is not aimed to be exported as a metric +// or a trace, because it's used to internally send messages through the pipeline. +func (s *Span) InternalSignal() bool { + return s.Type == EventTypeProcessAlive +} + // helper attribute functions used by JSON serialization type SpanAttributes map[string]string diff --git a/test/integration/configs/prometheus-config-promscrape-k8s-test.yml b/test/integration/configs/prometheus-config-promscrape-k8s-test.yml index 422452dda..db0c356c3 100644 --- a/test/integration/configs/prometheus-config-promscrape-k8s-test.yml +++ b/test/integration/configs/prometheus-config-promscrape-k8s-test.yml @@ -11,3 +11,4 @@ scrape_configs: - 'beyla-testserver:8999' - 'beyla-pinger:8999' - 'beyla-netolly:8999' + - 'beyla-promscrape:8999' \ No newline at end of file diff --git a/test/integration/docker-compose-python.yml b/test/integration/docker-compose-python.yml index 527afe5cb..68c41ae61 100644 --- a/test/integration/docker-compose-python.yml +++ b/test/integration/docker-compose-python.yml @@ -42,7 +42,7 @@ services: BEYLA_HOSTNAME: "beyla" BEYLA_BPF_HTTP_REQUEST_TIMEOUT: "5s" BEYLA_PROCESSES_INTERVAL: "100ms" - BEYLA_OTEL_METRICS_FEATURES: "application,application_process" + BEYLA_OTEL_METRICS_FEATURES: "application,application_process,application_span,application_service_graph" depends_on: testserver: condition: service_started diff --git a/test/integration/k8s/manifests/06-beyla-all-processes.yml b/test/integration/k8s/manifests/06-beyla-all-processes.yml new file mode 100644 index 000000000..950b34985 --- /dev/null +++ b/test/integration/k8s/manifests/06-beyla-all-processes.yml @@ -0,0 +1,100 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: beyla-config +data: + beyla-config.yml: | + prometheus_export: + port: 8999 + features: + - application + - application_process + attributes: + select: + process_cpu_utilization: + include: ["*"] + exclude: ["cpu_mode"] + process_cpu_time: + include: ["*"] + exclude: ["cpu_mode"] + process_memory_usage: + include: ["*"] + process_memory_virtual: + include: ["*"] + process_disk_io: + include: ["*"] + process_network_io: + include: ["*"] + kubernetes: + enable: true + cluster_name: beyla + trace_printer: text + log_level: debug + discovery: + services: + - k8s_deployment_name: (otherinstance|testserver) +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: beyla +spec: + selector: + matchLabels: + instrumentation: beyla + template: + metadata: + labels: + instrumentation: beyla + # this label will trigger a deletion of beyla pods before tearing down + # kind, to force Beyla writing the coverage data + teardown: delete + spec: + hostPID: true #important! + serviceAccountName: beyla + volumes: + - name: beyla-config + configMap: + name: beyla-config + - name: testoutput + persistentVolumeClaim: + claimName: testoutput + containers: + - name: beyla + image: beyla:dev + imagePullPolicy: Never # loaded into Kind from localhost + args: ["--config=/config/beyla-config.yml"] + securityContext: + privileged: true + runAsUser: 0 + volumeMounts: + - mountPath: /config + name: beyla-config + - mountPath: /testoutput + name: testoutput + env: + - name: GOCOVERDIR + value: "/testoutput" + - name: BEYLA_DISCOVERY_POLL_INTERVAL + value: "500ms" + - name: BEYLA_METRICS_INTERVAL + value: "10ms" + - name: BEYLA_BPF_BATCH_TIMEOUT + value: "10ms" + ports: + - containerPort: 8999 + name: prometheus + protocol: TCP +--- +--- +kind: Service +apiVersion: v1 +metadata: + name: beyla-promscrape +spec: + selector: + instrumentation: beyla + ports: + - port: 8999 + name: prometheus + protocol: TCP \ No newline at end of file diff --git a/test/integration/k8s/process_notraces/k8s_process_notraces_test.go b/test/integration/k8s/process_notraces/k8s_process_notraces_test.go new file mode 100644 index 000000000..597ec0cf5 --- /dev/null +++ b/test/integration/k8s/process_notraces/k8s_process_notraces_test.go @@ -0,0 +1,78 @@ +//go:build integration_k8s + +package prom + +import ( + "context" + "log/slog" + "os" + "testing" + "time" + + "github.com/mariomac/guara/pkg/test" + "github.com/stretchr/testify/require" + "sigs.k8s.io/e2e-framework/pkg/envconf" + "sigs.k8s.io/e2e-framework/pkg/features" + + "github.com/grafana/beyla/test/integration/components/docker" + "github.com/grafana/beyla/test/integration/components/kube" + "github.com/grafana/beyla/test/integration/components/prom" + k8s "github.com/grafana/beyla/test/integration/k8s/common" + "github.com/grafana/beyla/test/tools" +) + +var cluster *kube.Kind + +// TestMain is run once before all the tests in the package. If you need to mount a different cluster for +// a different test suite, you should add a new TestMain in a new package together with the new test suite +func TestMain(m *testing.M) { + if err := docker.Build(os.Stdout, tools.ProjectDir(), + docker.ImageBuild{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer}, + docker.ImageBuild{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla}, + docker.ImageBuild{Tag: "grpcpinger:dev", Dockerfile: k8s.DockerfilePinger}, + docker.ImageBuild{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger}, + docker.ImageBuild{Tag: "quay.io/prometheus/prometheus:v2.53.0"}, + ); err != nil { + slog.Error("can't build docker images", "error", err) + os.Exit(-1) + } + + cluster = kube.NewKind("test-kind-cluster-process-notraces", + kube.ExportLogs(k8s.PathKindLogs), + kube.KindConfig(k8s.PathManifests+"/00-kind.yml"), + kube.LocalImage("testserver:dev"), + kube.LocalImage("beyla:dev"), + kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"), + kube.Deploy(k8s.PathManifests+"/01-volumes.yml"), + kube.Deploy(k8s.PathManifests+"/01-serviceaccount.yml"), + kube.Deploy(k8s.PathManifests+"/02-prometheus-promscrape.yml"), + kube.Deploy(k8s.PathManifests+"/05-uninstrumented-service.yml"), + kube.Deploy(k8s.PathManifests+"/06-beyla-all-processes.yml"), + ) + + cluster.Run(m) +} + +// will test that process metrics are decorated correctly with all the metadata, even when +// Beyla hasn't instrumented still any single trace +func TestProcessMetrics_NoTraces(t *testing.T) { + cluster.TestEnv().Test(t, + waitForSomeMetrics(), + k8s.FeatureProcessMetricsDecoration(nil)) +} + +const prometheusHostPort = "localhost:39090" + +func waitForSomeMetrics() features.Feature { + return features.New("wait for some metrics to appear before starting the actual test"). + Assess("smoke test", func(ctx context.Context, t *testing.T, config *envconf.Config) context.Context { + pq := prom.Client{HostPort: prometheusHostPort} + // timeout needs to be high, as the K8s cluster needs to be spinned up at this right moment + test.Eventually(t, 5*time.Minute, func(t require.TestingT) { + results, err := pq.Query("process_cpu_time_seconds_total") + require.NoError(t, err) + require.NotEmpty(t, results) + }) + return ctx + }).Feature() +} diff --git a/test/integration/suites_test.go b/test/integration/suites_test.go index 49d2899db..68584a884 100644 --- a/test/integration/suites_test.go +++ b/test/integration/suites_test.go @@ -225,17 +225,19 @@ func TestSuite_PrometheusScrape(t *testing.T) { require.NoError(t, err) require.NoError(t, compose.Up()) - t.Run("RED metrics", testREDMetricsHTTP) - t.Run("GRPC RED metrics", testREDMetricsGRPC) - t.Run("Internal Prometheus metrics", testInternalPrometheusExport) - t.Run("Testing Beyla Build Info metric", testPrometheusBeylaBuildInfo) - t.Run("Testing for no Beyla self metrics", testPrometheusNoBeylaEvents) + // checking process metrics before any other test lets us verify that even if an application + // hasn't received any request, their processes are still instrumented t.Run("Testing process-level metrics", testProcesses(map[string]string{ "process_executable_name": "testserver", "process_executable_path": "/testserver", "process_command": "testserver", "process_command_line": "/testserver", })) + t.Run("RED metrics", testREDMetricsHTTP) + t.Run("GRPC RED metrics", testREDMetricsGRPC) + t.Run("Internal Prometheus metrics", testInternalPrometheusExport) + t.Run("Testing Beyla Build Info metric", testPrometheusBeylaBuildInfo) + t.Run("Testing for no Beyla self metrics", testPrometheusNoBeylaEvents) t.Run("BPF pinning folder mounted", testBPFPinningMounted) require.NoError(t, compose.Close()) @@ -414,14 +416,16 @@ func TestSuite_Python(t *testing.T) { compose.Env = append(compose.Env, `BEYLA_OPEN_PORT=8380`, `BEYLA_EXECUTABLE_NAME=`, `TEST_SERVICE_PORTS=8381:8380`) require.NoError(t, err) require.NoError(t, compose.Up()) - t.Run("Python RED metrics", testREDMetricsPythonHTTP) - t.Run("Python RED metrics with timeouts", testREDMetricsTimeoutPythonHTTP) + // checking process metrics before any other test lets us verify that even if an application + // hasn't received any request, their processes are still instrumented t.Run("Checking process metrics", testProcesses(map[string]string{ "process_executable_name": "python", "process_executable_path": "/usr/local/bin/python", "process_command": "gunicorn", "process_command_line": "/usr/local/bin/python /usr/local/bin/gunicorn -w 4 -b 0.0.0.0:8380 main:app --timeout 90", })) + t.Run("Python RED metrics", testREDMetricsPythonHTTP) + t.Run("Python RED metrics with timeouts", testREDMetricsTimeoutPythonHTTP) t.Run("BPF pinning folder mounted", testBPFPinningMounted) require.NoError(t, compose.Close()) t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted)