Skip to content

Commit

Permalink
Fixes process metrics when the application has not sent/received any …
Browse files Browse the repository at this point in the history
…request (#1225)

* Send ProcessAlive signal to instrument processes

* integration tests

* Rename SpanSignalsShortcut

* Add failing integration test for K8s metadata decoration

* Fix kubernetes decoration
  • Loading branch information
mariomac committed Oct 9, 2024
1 parent 98352a7 commit d572793
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 32 deletions.
4 changes: 3 additions & 1 deletion pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,9 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
for spans := range input {
for i := range spans {
s := &spans[i]

if s.InternalSignal() {
continue
}
// If we are ignoring this span because of route patterns, don't do anything
if s.IgnoreMetrics() {
continue
Expand Down
3 changes: 3 additions & 0 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
for spans := range in {
for i := range spans {
span := &spans[i]
if span.InternalSignal() {
continue
}
if tr.spanDiscarded(span) {
continue
}
Expand Down
1 change: 0 additions & 1 deletion pkg/export/prom/expirer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (ex *Expirer[T]) Describe(descs chan<- *prometheus.Desc) {
// Collect wraps the prometheus.Collector Collect method
func (ex *Expirer[T]) Collect(metrics chan<- prometheus.Metric) {
log := plog()
log.Debug("invoking metrics collection")
for _, old := range ex.entries.DeleteExpired() {
ex.wrapped.DeleteLabelValues(old.labelVals...)
log.With("labelValues", old).Debug("deleting old Prometheus metric")
Expand Down
3 changes: 3 additions & 0 deletions pkg/export/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ func (r *metricsReporter) otelSpanObserved(span *request.Span) bool {

// nolint:cyclop
func (r *metricsReporter) observe(span *request.Span) {
if span.InternalSignal() {
return
}
t := span.Timings()
r.beylaInfo.WithLabelValues(span.ServiceID.SDKLanguage.String()).metric.Set(1.0)
duration := t.End.Sub(t.RequestStart).Seconds()
Expand Down
6 changes: 1 addition & 5 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type Instrumenter struct {

// tracesInput is used to communicate the found traces between the ProcessFinder and
// the ProcessTracer.
// TODO: When we split beyla into two executables, probably the BPF map
// should be the traces' communication mechanism instead of a native channel
tracesInput chan []request.Span
}

Expand All @@ -47,7 +45,7 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config)
// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument() error {
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo)
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo, i.tracesInput)
foundProcesses, deletedProcesses, err := finder.Start()
if err != nil {
return fmt.Errorf("couldn't start Process Finder: %w", err)
Expand Down Expand Up @@ -95,8 +93,6 @@ func (i *Instrumenter) ReadAndForward() error {
log := log()
log.Debug("creating instrumentation pipeline")

// TODO: when we split the executable, tracer should be reconstructed somehow
// from this instance
bp, err := pipe.Build(i.ctx, i.config, i.ctxInfo, i.tracesInput)
if err != nil {
return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
Expand Down
39 changes: 34 additions & 5 deletions pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/helpers/maps"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)

Expand All @@ -38,6 +39,13 @@ type TraceAttacher struct {
// keeps a copy of all the tracers for a given executable path
existingTracers map[uint64]*ebpf.ProcessTracer
reusableTracer *ebpf.ProcessTracer

// Usually, only ebpf.Tracer implementations send span data to the read decorator.
// However, for each new process, we also send a "process alive" span to the read decorator. Its
// sole purpose is to notify other parts of the system that the process is active, even
// if no spans have been detected. This allows, for example, the Process metrics pipeline
// to start instrumenting the process even before it sends or receives any request.
SpanSignalsShortcut chan<- []request.Span
}

func TraceAttacherProvider(ta *TraceAttacher) pipe.FinalProvider[[]Event[Instrumentable]] {
Expand Down Expand Up @@ -95,10 +103,10 @@ func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, boo
"exec", ie.FileInfo.CmdExePath)
ie.FileInfo.Service.SDKLanguage = ie.Type
// allowing the tracer to forward traces from the new PID and its children processes
monitorPIDs(tracer, ie)
ta.monitorPIDs(tracer, ie)
ta.Metrics.InstrumentProcess(ie.FileInfo.ExecutableName())
if tracer.Type == ebpf.Generic {
monitorPIDs(ta.reusableTracer, ie)
ta.monitorPIDs(ta.reusableTracer, ie)
}
ta.log.Debug(".done")
return nil, false
Expand Down Expand Up @@ -170,11 +178,11 @@ func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, boo
"child", ie.ChildPids,
"exec", ie.FileInfo.CmdExePath)
// allowing the tracer to forward traces from the discovered PID and its children processes
monitorPIDs(tracer, ie)
ta.monitorPIDs(tracer, ie)
ta.existingTracers[ie.FileInfo.Ino] = tracer
if tracer.Type == ebpf.Generic {
if ta.reusableTracer != nil {
monitorPIDs(ta.reusableTracer, ie)
ta.monitorPIDs(ta.reusableTracer, ie)
} else {
ta.reusableTracer = tracer
}
Expand All @@ -191,7 +199,7 @@ func (ta *TraceAttacher) genericTracers() []ebpf.Tracer {
return newNonGoTracersGroup(ta.Cfg, ta.Metrics)
}

func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) {
func (ta *TraceAttacher) monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) {
// If the user does not override the service name via configuration
// the service name is the name of the found executable
// Unless the case of system-wide tracing, where the name of the
Expand All @@ -208,6 +216,27 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) {
for _, pid := range ie.ChildPids {
tracer.AllowPID(pid, ie.FileInfo.Ns, &ie.FileInfo.Service)
}
if ta.SpanSignalsShortcut != nil {
spans := make([]request.Span, 0, len(ie.ChildPids)+1)
// the forwarded signal must include:
// - ServiceID, which carries metadata about the process
// - PID namespace, to allow further Kubernetes decoration
spans = append(spans, request.Span{
Type: request.EventTypeProcessAlive,
ServiceID: ie.FileInfo.Service,
Pid: request.PidInfo{Namespace: ie.FileInfo.Ns},
})
for _, pid := range ie.ChildPids {
service := ie.FileInfo.Service
service.ProcPID = int32(pid)
spans = append(spans, request.Span{
Type: request.EventTypeProcessAlive,
ServiceID: service,
Pid: request.PidInfo{Namespace: ie.FileInfo.Ns},
})
}
ta.SpanSignalsShortcut <- spans
}
}

// BuildPinPath pinpath must be unique for a given executable group
Expand Down
23 changes: 13 additions & 10 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/grafana/beyla/pkg/internal/ebpf/sarama"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
)

type ProcessFinder struct {
ctx context.Context
cfg *beyla.Config
ctxInfo *global.ContextInfo
ctx context.Context
cfg *beyla.Config
ctxInfo *global.ContextInfo
tracesInput chan<- []request.Span
}

// nodesMap stores ProcessFinder pipeline architecture
Expand Down Expand Up @@ -60,8 +62,8 @@ func containerDBUpdater(pf *nodesMap) *pipe.Middle[[]Event[Instrumentable], []Ev
}
func traceAttacher(pf *nodesMap) *pipe.Final[[]Event[Instrumentable]] { return &pf.TraceAttacher }

func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
return &ProcessFinder{ctx: ctx, cfg: cfg, ctxInfo: ctxInfo}
func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo, tracesInput chan<- []request.Span) *ProcessFinder {
return &ProcessFinder{ctx: ctx, cfg: cfg, ctxInfo: ctxInfo, tracesInput: tracesInput}
}

// Start the ProcessFinder pipeline in background. It returns a channel where each new discovered
Expand All @@ -79,11 +81,12 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.ProcessTracer, <-chan *Instrument
pipe.AddMiddleProvider(gb, containerDBUpdater,
ContainerDBUpdaterProvider(pf.ctxInfo.K8sInformer.IsKubeEnabled(), pf.ctxInfo.AppO11y.K8sDatabase))
pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{
Cfg: pf.cfg,
Ctx: pf.ctx,
DiscoveredTracers: discoveredTracers,
DeleteTracers: deleteTracers,
Metrics: pf.ctxInfo.Metrics,
Cfg: pf.cfg,
Ctx: pf.ctx,
DiscoveredTracers: discoveredTracers,
DeleteTracers: deleteTracers,
Metrics: pf.ctxInfo.Metrics,
SpanSignalsShortcut: pf.tracesInput,
}))
pipeline, err := gb.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/infraolly/process/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (ps *Collector) Run(out chan<- []*Status) {
for {
select {
case <-ps.ctx.Done():
ps.log.Debug("exiting")
// exiting
case spans := <-newPids:
// updating PIDs map with spans information
for i := range spans {
Expand Down
12 changes: 11 additions & 1 deletion pkg/internal/request/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ type EventType uint8
// The following consts need to coincide with some C identifiers:
// EVENT_HTTP_REQUEST, EVENT_GRPC_REQUEST, EVENT_HTTP_CLIENT, EVENT_GRPC_CLIENT, EVENT_SQL_CLIENT
const (
EventTypeHTTP EventType = iota + 1
// EventTypeProcessAlive is an internal signal. It is ignored by the metrics and traces exporters.
EventTypeProcessAlive EventType = iota
EventTypeHTTP
EventTypeGRPC
EventTypeHTTPClient
EventTypeGRPCClient
Expand All @@ -41,6 +43,8 @@ const (

func (t EventType) String() string {
switch t {
case EventTypeProcessAlive:
return "ProcessAlive"
case EventTypeHTTP:
return "HTTP"
case EventTypeGRPC:
Expand Down Expand Up @@ -150,6 +154,12 @@ func (s *Span) Inside(parent *Span) bool {
return s.RequestStart >= parent.RequestStart && s.End <= parent.End
}

// InternalSignal reports whether the span is an internal message sent through
// the pipeline (currently only EventTypeProcessAlive) rather than a request
// that should be exported as a metric or a trace.
func (s *Span) InternalSignal() bool {
	switch s.Type {
	case EventTypeProcessAlive:
		return true
	default:
		return false
	}
}

// helper attribute functions used by JSON serialization
type SpanAttributes map[string]string

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ scrape_configs:
- 'beyla-testserver:8999'
- 'beyla-pinger:8999'
- 'beyla-netolly:8999'
- 'beyla-promscrape:8999'
2 changes: 1 addition & 1 deletion test/integration/docker-compose-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
BEYLA_HOSTNAME: "beyla"
BEYLA_BPF_HTTP_REQUEST_TIMEOUT: "5s"
BEYLA_PROCESSES_INTERVAL: "100ms"
BEYLA_OTEL_METRICS_FEATURES: "application,application_process"
BEYLA_OTEL_METRICS_FEATURES: "application,application_process,application_span,application_service_graph"
depends_on:
testserver:
condition: service_started
Expand Down
100 changes: 100 additions & 0 deletions test/integration/k8s/manifests/06-beyla-all-processes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: beyla-config
data:
beyla-config.yml: |
prometheus_export:
port: 8999
features:
- application
- application_process
attributes:
select:
process_cpu_utilization:
include: ["*"]
exclude: ["cpu_mode"]
process_cpu_time:
include: ["*"]
exclude: ["cpu_mode"]
process_memory_usage:
include: ["*"]
process_memory_virtual:
include: ["*"]
process_disk_io:
include: ["*"]
process_network_io:
include: ["*"]
kubernetes:
enable: true
cluster_name: beyla
trace_printer: text
log_level: debug
discovery:
services:
- k8s_deployment_name: (otherinstance|testserver)
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: beyla
spec:
selector:
matchLabels:
instrumentation: beyla
template:
metadata:
labels:
instrumentation: beyla
# this label will trigger a deletion of beyla pods before tearing down
# kind, to force Beyla to write the coverage data
teardown: delete
spec:
hostPID: true #important!
serviceAccountName: beyla
volumes:
- name: beyla-config
configMap:
name: beyla-config
- name: testoutput
persistentVolumeClaim:
claimName: testoutput
containers:
- name: beyla
image: beyla:dev
imagePullPolicy: Never # loaded into Kind from localhost
args: ["--config=/config/beyla-config.yml"]
securityContext:
privileged: true
runAsUser: 0
volumeMounts:
- mountPath: /config
name: beyla-config
- mountPath: /testoutput
name: testoutput
env:
- name: GOCOVERDIR
value: "/testoutput"
- name: BEYLA_DISCOVERY_POLL_INTERVAL
value: "500ms"
- name: BEYLA_METRICS_INTERVAL
value: "10ms"
- name: BEYLA_BPF_BATCH_TIMEOUT
value: "10ms"
ports:
- containerPort: 8999
name: prometheus
protocol: TCP
---
---
kind: Service
apiVersion: v1
metadata:
name: beyla-promscrape
spec:
selector:
instrumentation: beyla
ports:
- port: 8999
name: prometheus
protocol: TCP
Loading

0 comments on commit d572793

Please sign in to comment.