Skip to content

Commit

Permalink
Fixes process metrics when the application has not sent/received any …
Browse files Browse the repository at this point in the history
…request (#1225)

* Send ProcessAlive signal to instrument processes

* integration tests

* Rename SpanSignalsShortcut

* Add failing integration test for K8s metadata decoration

* Fix kubernetes decoration
  • Loading branch information
mariomac committed Oct 8, 2024
1 parent 98352a7 commit b7c1684
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 27 deletions.
4 changes: 3 additions & 1 deletion pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,9 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
for spans := range input {
for i := range spans {
s := &spans[i]

if s.InternalSignal() {
continue
}
// If we are ignoring this span because of route patterns, don't do anything
if s.IgnoreMetrics() {
continue
Expand Down
3 changes: 3 additions & 0 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
for spans := range in {
for i := range spans {
span := &spans[i]
if span.InternalSignal() {
continue
}
if tr.spanDiscarded(span) {
continue
}
Expand Down
1 change: 0 additions & 1 deletion pkg/export/prom/expirer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (ex *Expirer[T]) Describe(descs chan<- *prometheus.Desc) {
// Collect wraps prometheus.Collector Collect method
func (ex *Expirer[T]) Collect(metrics chan<- prometheus.Metric) {
log := plog()
log.Debug("invoking metrics collection")
for _, old := range ex.entries.DeleteExpired() {
ex.wrapped.DeleteLabelValues(old.labelVals...)
log.With("labelValues", old).Debug("deleting old Prometheus metric")
Expand Down
3 changes: 3 additions & 0 deletions pkg/export/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ func (r *metricsReporter) otelSpanObserved(span *request.Span) bool {

// nolint:cyclop
func (r *metricsReporter) observe(span *request.Span) {
if span.InternalSignal() {
return
}
t := span.Timings()
r.beylaInfo.WithLabelValues(span.ServiceID.SDKLanguage.String()).metric.Set(1.0)
duration := t.End.Sub(t.RequestStart).Seconds()
Expand Down
6 changes: 1 addition & 5 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type Instrumenter struct {

// tracesInput is used to communicate the found traces between the ProcessFinder and
// the ProcessTracer.
// TODO: When we split beyla into two executables, probably the BPF map
// should be the traces' communication mechanism instead of a native channel
tracesInput chan []request.Span
}

Expand All @@ -47,7 +45,7 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config)
// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument() error {
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo)
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo, i.tracesInput)
foundProcesses, deletedProcesses, err := finder.Start()
if err != nil {
return fmt.Errorf("couldn't start Process Finder: %w", err)
Expand Down Expand Up @@ -95,8 +93,6 @@ func (i *Instrumenter) ReadAndForward() error {
log := log()
log.Debug("creating instrumentation pipeline")

// TODO: when we split the executable, tracer should be reconstructed somehow
// from this instance
bp, err := pipe.Build(i.ctx, i.config, i.ctxInfo, i.tracesInput)
if err != nil {
return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/helpers/maps"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)

Expand All @@ -38,6 +39,13 @@ type TraceAttacher struct {
// keeps a copy of all the tracers for a given executable path
existingTracers map[uint64]*ebpf.ProcessTracer
reusableTracer *ebpf.ProcessTracer

// Usually, only ebpf.Tracer implementations send span data to the read decorator.
// However, for each new process we also send a "process alive" span to the read decorator. Its
// sole purpose is to notify other parts of the system that this process is active, even
// if no spans have been detected yet. This allows, for example, the Process metrics pipeline
// to start instrumenting this process even before it sends or receives any request.
SpanSignalsShortcut chan<- []request.Span
}

func TraceAttacherProvider(ta *TraceAttacher) pipe.FinalProvider[[]Event[Instrumentable]] {
Expand Down
23 changes: 13 additions & 10 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/grafana/beyla/pkg/internal/ebpf/sarama"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
)

type ProcessFinder struct {
ctx context.Context
cfg *beyla.Config
ctxInfo *global.ContextInfo
ctx context.Context
cfg *beyla.Config
ctxInfo *global.ContextInfo
tracesInput chan<- []request.Span
}

// nodesMap stores ProcessFinder pipeline architecture
Expand Down Expand Up @@ -60,8 +62,8 @@ func containerDBUpdater(pf *nodesMap) *pipe.Middle[[]Event[Instrumentable], []Ev
}
// traceAttacher is the node accessor for the final stage of the ProcessFinder
// pipeline: it returns a pointer to the TraceAttacher field of the given nodesMap.
func traceAttacher(pf *nodesMap) *pipe.Final[[]Event[Instrumentable]] {
	node := &pf.TraceAttacher
	return node
}

func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
return &ProcessFinder{ctx: ctx, cfg: cfg, ctxInfo: ctxInfo}
// NewProcessFinder returns a ProcessFinder that keeps the passed context,
// configuration and global context information. The tracesInput channel is
// stored as well; Start hands it to the TraceAttacher as its SpanSignalsShortcut.
func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo, tracesInput chan<- []request.Span) *ProcessFinder {
	finder := ProcessFinder{
		ctx:         ctx,
		cfg:         cfg,
		ctxInfo:     ctxInfo,
		tracesInput: tracesInput,
	}
	return &finder
}

// Start the ProcessFinder pipeline in background. It returns a channel where each new discovered
Expand All @@ -79,11 +81,12 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.ProcessTracer, <-chan *Instrument
pipe.AddMiddleProvider(gb, containerDBUpdater,
ContainerDBUpdaterProvider(pf.ctxInfo.K8sInformer.IsKubeEnabled(), pf.ctxInfo.AppO11y.K8sDatabase))
pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{
Cfg: pf.cfg,
Ctx: pf.ctx,
DiscoveredTracers: discoveredTracers,
DeleteTracers: deleteTracers,
Metrics: pf.ctxInfo.Metrics,
Cfg: pf.cfg,
Ctx: pf.ctx,
DiscoveredTracers: discoveredTracers,
DeleteTracers: deleteTracers,
Metrics: pf.ctxInfo.Metrics,
SpanSignalsShortcut: pf.tracesInput,
}))
pipeline, err := gb.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/infraolly/process/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (ps *Collector) Run(out chan<- []*Status) {
for {
select {
case <-ps.ctx.Done():
ps.log.Debug("exiting")
// exiting
case spans := <-newPids:
// updating PIDs map with spans information
for i := range spans {
Expand Down
12 changes: 11 additions & 1 deletion pkg/internal/request/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ type EventType uint8
// The following consts need to coincide with some C identifiers:
// EVENT_HTTP_REQUEST, EVENT_GRPC_REQUEST, EVENT_HTTP_CLIENT, EVENT_GRPC_CLIENT, EVENT_SQL_CLIENT
const (
EventTypeHTTP EventType = iota + 1
// EventTypeProcessAlive is an internal signal. It will be ignored by the metrics and traces exporters.
EventTypeProcessAlive EventType = iota
EventTypeHTTP
EventTypeGRPC
EventTypeHTTPClient
EventTypeGRPCClient
Expand All @@ -41,6 +43,8 @@ const (

func (t EventType) String() string {
switch t {
case EventTypeProcessAlive:
return "ProcessAlive"
case EventTypeHTTP:
return "HTTP"
case EventTypeGRPC:
Expand Down Expand Up @@ -150,6 +154,12 @@ func (s *Span) Inside(parent *Span) bool {
return s.RequestStart >= parent.RequestStart && s.End <= parent.End
}

// InternalSignal reports whether the span is an internal pipeline message
// rather than real telemetry. Such spans are not meant to be exported as
// metrics or traces. Currently only EventTypeProcessAlive spans qualify.
func (s *Span) InternalSignal() bool {
	switch s.Type {
	case EventTypeProcessAlive:
		return true
	default:
		return false
	}
}

// helper attribute functions used by JSON serialization
type SpanAttributes map[string]string

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ scrape_configs:
- 'beyla-testserver:8999'
- 'beyla-pinger:8999'
- 'beyla-netolly:8999'
- 'beyla-promscrape:8999'
2 changes: 1 addition & 1 deletion test/integration/docker-compose-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
BEYLA_HOSTNAME: "beyla"
BEYLA_BPF_HTTP_REQUEST_TIMEOUT: "5s"
BEYLA_PROCESSES_INTERVAL: "100ms"
BEYLA_OTEL_METRICS_FEATURES: "application,application_process"
BEYLA_OTEL_METRICS_FEATURES: "application,application_process,application_span,application_service_graph"
depends_on:
testserver:
condition: service_started
Expand Down
100 changes: 100 additions & 0 deletions test/integration/k8s/manifests/06-beyla-all-processes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: beyla-config
data:
beyla-config.yml: |
prometheus_export:
port: 8999
features:
- application
- application_process
attributes:
select:
process_cpu_utilization:
include: ["*"]
exclude: ["cpu_mode"]
process_cpu_time:
include: ["*"]
exclude: ["cpu_mode"]
process_memory_usage:
include: ["*"]
process_memory_virtual:
include: ["*"]
process_disk_io:
include: ["*"]
process_network_io:
include: ["*"]
kubernetes:
enable: true
cluster_name: beyla
trace_printer: text
log_level: debug
discovery:
services:
- k8s_deployment_name: (otherinstance|testserver)
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: beyla
spec:
selector:
matchLabels:
instrumentation: beyla
template:
metadata:
labels:
instrumentation: beyla
# this label will trigger a deletion of beyla pods before tearing down
# kind, to force Beyla writing the coverage data
teardown: delete
spec:
hostPID: true #important!
serviceAccountName: beyla
volumes:
- name: beyla-config
configMap:
name: beyla-config
- name: testoutput
persistentVolumeClaim:
claimName: testoutput
containers:
- name: beyla
image: beyla:dev
imagePullPolicy: Never # loaded into Kind from localhost
args: ["--config=/config/beyla-config.yml"]
securityContext:
privileged: true
runAsUser: 0
volumeMounts:
- mountPath: /config
name: beyla-config
- mountPath: /testoutput
name: testoutput
env:
- name: GOCOVERDIR
value: "/testoutput"
- name: BEYLA_DISCOVERY_POLL_INTERVAL
value: "500ms"
- name: BEYLA_METRICS_INTERVAL
value: "10ms"
- name: BEYLA_BPF_BATCH_TIMEOUT
value: "10ms"
ports:
- containerPort: 8999
name: prometheus
protocol: TCP
---
---
kind: Service
apiVersion: v1
metadata:
name: beyla-promscrape
spec:
selector:
instrumentation: beyla
ports:
- port: 8999
name: prometheus
protocol: TCP
78 changes: 78 additions & 0 deletions test/integration/k8s/process_notraces/k8s_process_notraces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//go:build integration_k8s

package prom

import (
"context"
"log/slog"
"os"
"testing"
"time"

"github.com/mariomac/guara/pkg/test"
"github.com/stretchr/testify/require"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
"github.com/grafana/beyla/test/integration/components/prom"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
"github.com/grafana/beyla/test/tools"
)

var cluster *kube.Kind

// TestMain runs once before all the tests in this package. It builds the
// required container images, declares the Kind cluster with the manifests of
// this test suite and then runs the tests against it.
// If you need a different cluster for a different test suite, add a new
// TestMain in a new package together with that test suite.
func TestMain(m *testing.M) {
	images := []docker.ImageBuild{
		{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer},
		{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla},
		{Tag: "grpcpinger:dev", Dockerfile: k8s.DockerfilePinger},
		{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger},
		// no Dockerfile is given for the Prometheus image
		{Tag: "quay.io/prometheus/prometheus:v2.53.0"},
	}
	err := docker.Build(os.Stdout, tools.ProjectDir(), images...)
	if err != nil {
		slog.Error("can't build docker images", "error", err)
		os.Exit(-1)
	}

	cluster = kube.NewKind(
		"test-kind-cluster-process-notraces",
		kube.ExportLogs(k8s.PathKindLogs),
		kube.KindConfig(k8s.PathManifests+"/00-kind.yml"),
		// images made available to the cluster
		kube.LocalImage("testserver:dev"),
		kube.LocalImage("beyla:dev"),
		kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"),
		// manifests to deploy
		kube.Deploy(k8s.PathManifests+"/01-volumes.yml"),
		kube.Deploy(k8s.PathManifests+"/01-serviceaccount.yml"),
		kube.Deploy(k8s.PathManifests+"/02-prometheus-promscrape.yml"),
		kube.Deploy(k8s.PathManifests+"/05-uninstrumented-service.yml"),
		kube.Deploy(k8s.PathManifests+"/06-beyla-all-processes.yml"),
	)

	cluster.Run(m)
}

// TestProcessMetrics_NoTraces verifies that process metrics are correctly
// decorated with all the metadata, even when Beyla has not yet instrumented
// a single trace.
func TestProcessMetrics_NoTraces(t *testing.T) {
	env := cluster.TestEnv()
	env.Test(t, waitForSomeMetrics(), k8s.FeatureProcessMetricsDecoration(nil))
}

const prometheusHostPort = "localhost:39090"

// waitForSomeMetrics returns a feature whose single assessment keeps querying
// Prometheus until the process_cpu_time_seconds_total metric returns at least
// one result, so later features start with metrics already available.
func waitForSomeMetrics() features.Feature {
	smokeTest := func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
		client := prom.Client{HostPort: prometheusHostPort}
		// the timeout needs to be high, as the K8s cluster is being spun up at this very moment
		test.Eventually(t, 5*time.Minute, func(t require.TestingT) {
			results, err := client.Query("process_cpu_time_seconds_total")
			require.NoError(t, err)
			require.NotEmpty(t, results)
		})
		return ctx
	}

	builder := features.New("wait for some metrics to appear before starting the actual test")
	return builder.Assess("smoke test", smokeTest).Feature()
}
Loading

0 comments on commit b7c1684

Please sign in to comment.