Skip to content

Commit

Permalink
Reorganizing Beyla configuration (#584)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Jan 30, 2024
1 parent 78e18f4 commit ec757bd
Show file tree
Hide file tree
Showing 17 changed files with 67 additions and 81 deletions.
3 changes: 2 additions & 1 deletion cmd/beyla/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

otelsdk "go.opentelemetry.io/otel/sdk"

"github.com/grafana/beyla/pkg/appobserv"
"github.com/grafana/beyla/pkg/beyla"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func main() {
// in two parts:
// 1st executable - Invoke FindTarget, which also mounts the BPF maps
// 2nd executable - Invoke ReadAndForward, receiving the BPF map mountpoint as argument
instr := beyla.New(config)
instr := appobserv.New(config)
if err := instr.FindAndInstrument(ctx); err != nil {
slog.Error("Beyla couldn't find target process", "error", err)
os.Exit(-1)
Expand Down
33 changes: 8 additions & 25 deletions pkg/beyla/beyla.go → pkg/appobserv/appobserv.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// Package beyla provides public access to Beyla as a library. All the other subcomponents
// Package appobserv provides public access to Beyla application observability as a library. All the other subcomponents
// of Beyla are hidden.
package beyla
package appobserv

import (
"context"
"fmt"
"io"
"log/slog"

"k8s.io/client-go/kubernetes"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/connector"
"github.com/grafana/beyla/pkg/internal/discover"
"github.com/grafana/beyla/pkg/internal/imetrics"
Expand All @@ -21,17 +21,14 @@ import (
"github.com/grafana/beyla/pkg/internal/transform/kube"
)

// Config as provided by the user to configure and run Beyla
type Config pipe.Config

func log() *slog.Logger {
return slog.With("component", "beyla.Instrumenter")
}

// Instrumenter finds and instrument a service/process, and forwards the traces as
// configured by the user
type Instrumenter struct {
config *pipe.Config
config *beyla.Config
ctxInfo *global.ContextInfo

// tracesInput is used to communicate the found traces between the ProcessFinder and
Expand All @@ -42,28 +39,14 @@ type Instrumenter struct {
}

// New Instrumenter, given a Config
func New(config *Config) *Instrumenter {
func New(config *beyla.Config) *Instrumenter {
return &Instrumenter{
config: (*pipe.Config)(config),
ctxInfo: buildContextInfo((*pipe.Config)(config)),
config: config,
ctxInfo: buildContextInfo(config),
tracesInput: make(chan []request.Span, config.ChannelBufferLen),
}
}

// LoadConfig loads and validates configuration.
// Configuration from multiple source is overridden in the following order
// (from less to most priority):
// 1 - Default configuration
// 2 - Contents of the provided file reader (nillable)
// 3 - Environment variables
func LoadConfig(reader io.Reader) (*Config, error) {
cfg, err := pipe.LoadConfig(reader)
if err != nil {
return nil, err
}
return (*Config)(cfg), nil
}

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
Expand Down Expand Up @@ -133,7 +116,7 @@ func (i *Instrumenter) ReadAndForward(ctx context.Context) error {

// buildContextInfo populates some globally shared components and properties
// from the user-provided configuration
func buildContextInfo(config *pipe.Config) *global.ContextInfo {
func buildContextInfo(config *beyla.Config) *global.ContextInfo {
promMgr := &connector.PrometheusManager{}
k8sCfg := &config.Attributes.Kubernetes
ctxInfo := &global.ContextInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/pipe/config.go → pkg/beyla/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pipe
package beyla

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pipe
package beyla

import (
"bytes"
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/cilium/ebpf/link"
"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/helpers"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/svc"
)

Expand All @@ -23,7 +23,7 @@ import (
// instrumenting the executable
type TraceAttacher struct {
log *slog.Logger
Cfg *pipe.Config
Cfg *beyla.Config
Ctx context.Context
DiscoveredTracers chan *ebpf.ProcessTracer
DeleteTracers chan *Instrumentable
Expand Down Expand Up @@ -178,10 +178,10 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) {
}
}

// pinpath must be unique for a given executable group
// BuildPinPath pinpath must be unique for a given executable group
// it will be:
// - current beyla PID
func BuildPinPath(cfg *pipe.Config) string {
func BuildPinPath(cfg *beyla.Config) string {
return path.Join(cfg.EBPF.BpfBaseDir, fmt.Sprintf("beyla-%d", os.Getpid()))
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/mariomac/pipes/pkg/graph"
"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/goruntime"
"github.com/grafana/beyla/pkg/internal/ebpf/gosql"
Expand All @@ -15,7 +16,6 @@ import (
"github.com/grafana/beyla/pkg/internal/ebpf/httpssl"
"github.com/grafana/beyla/pkg/internal/ebpf/nethttp"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
)

Expand All @@ -33,7 +33,7 @@ type ProcessFinder struct {
TraceAttacher
}

func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
processFinder := ProcessFinder{
ProcessWatcher: ProcessWatcher{Ctx: ctx, Cfg: cfg},
CriteriaMatcher: CriteriaMatcher{Cfg: cfg},
Expand All @@ -55,7 +55,7 @@ func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.Con

// Start the ProcessFinder pipeline in background. It returns a channel where each new discovered
// ebpf.ProcessTracer will be notified.
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
func (pf *ProcessFinder) Start(cfg *beyla.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
graph.RegisterStart(gb, ProcessWatcherProvider)
graph.RegisterMiddle(gb, WatcherKubeEnricherProvider)
Expand All @@ -74,7 +74,7 @@ func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-
// auxiliary functions to instantiate the go and non-go tracers on diverse steps of the
// discovery pipeline

func newGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
// Each program is an eBPF source: net/http, grpc...
return []ebpf.Tracer{
nethttp.New(&cfg.EBPF, metrics),
Expand All @@ -85,10 +85,10 @@ func newGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Trace
}
}

func newNonGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newNonGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
return []ebpf.Tracer{httpfltr.New(cfg, metrics), httpssl.New(cfg, metrics)}
}

func newNonGoTracersGroupUProbes(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newNonGoTracersGroupUProbes(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
return []ebpf.Tracer{httpssl.New(cfg, metrics)}
}
6 changes: 3 additions & 3 deletions pkg/internal/discover/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/mariomac/pipes/pkg/node"
"github.com/shirou/gopsutil/process"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/pipe"
)

// CriteriaMatcher filters the processes that match the discovery criteria.
type CriteriaMatcher struct {
Cfg *pipe.Config
Cfg *beyla.Config
}

func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processAttrs], []Event[ProcessMatch]], error) {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (m *matcher) matchByAttributes(actual map[string]string, required map[strin
return true
}

func FindingCriteria(cfg *pipe.Config) services.DefinitionCriteria {
func FindingCriteria(cfg *beyla.Config) services.DefinitionCriteria {
if cfg.Discovery.SystemWide {
// will return all the executables in the system
return services.DefinitionCriteria{
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/discover/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/testutil"
)

func TestCriteriaMatcher(t *testing.T) {
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: port-only
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestCriteriaMatcher(t *testing.T) {
}

func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) {
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: all-attributes-must-match
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) {
}

func TestCriteriaMatcherMissingPort(t *testing.T) {
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: port-only
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/discover/typer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (

"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/exec"
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/svc"
)

// ExecTyper classifies the discovered executables according to the
// executable type (Go, generic...), and filters these executables
// that are not instrumentable.
type ExecTyper struct {
Cfg *pipe.Config
Cfg *beyla.Config
Metrics imetrics.Reporter
}

Expand Down Expand Up @@ -52,7 +52,7 @@ func ExecTyperProvider(ecfg ExecTyper) (node.MiddleFunc[[]Event[ProcessMatch], [
}

type typer struct {
cfg *pipe.Config
cfg *beyla.Config
metrics imetrics.Reporter
log *slog.Logger
currentPids map[int32]*exec.FileInfo
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakek8sclientset "k8s.io/client-go/kubernetes/fake"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/testutil"
)

Expand Down Expand Up @@ -125,7 +125,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
Informer: &informer,
})
require.NoError(t, err)
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: port-only
Expand Down
10 changes: 5 additions & 5 deletions pkg/internal/discover/watcher_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/shirou/gopsutil/net"
"github.com/shirou/gopsutil/process"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/watcher"
"github.com/grafana/beyla/pkg/internal/pipe"
)

const (
Expand All @@ -28,7 +28,7 @@ const (
// as well as PIDs from processes that setup a new connection
type ProcessWatcher struct {
Ctx context.Context
Cfg *pipe.Config
Cfg *beyla.Config
}

type WatchEventType int
Expand Down Expand Up @@ -86,7 +86,7 @@ type pidPort struct {
// ^ This is partially done, although it's not fully async, we only use the info to reduce the overhead of port scanning.
type pollAccounter struct {
ctx context.Context
cfg *pipe.Config
cfg *beyla.Config
interval time.Duration
// last polled process:ports accessible by its pid
pids map[PID]processAttrs
Expand All @@ -98,7 +98,7 @@ type pollAccounter struct {
// injectable function
executableReady func(PID) bool
// injectable function to load the bpf program
loadBPFWatcher func(cfg *pipe.Config, events chan<- watcher.Event) error
loadBPFWatcher func(cfg *beyla.Config, events chan<- watcher.Event) error
// we use these to ensure we poll for the open ports effectively
stateMux sync.Mutex
bpfWatcherEnabled bool
Expand Down Expand Up @@ -320,7 +320,7 @@ func fetchProcessPorts(scanPorts bool) (map[PID]processAttrs, error) {
return processes, nil
}

func loadBPFWatcher(cfg *pipe.Config, events chan<- watcher.Event) error {
func loadBPFWatcher(cfg *beyla.Config, events chan<- watcher.Event) error {
wt := watcher.New(cfg, events)
return ebpf.RunUtilityTracer(wt, BuildPinPath(cfg))
}
Loading

0 comments on commit ec757bd

Please sign in to comment.