Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganizing Beyla configuration packages #584

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/beyla/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

otelsdk "go.opentelemetry.io/otel/sdk"

"github.com/grafana/beyla/pkg/appobserv"
"github.com/grafana/beyla/pkg/beyla"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func main() {
// in two parts:
// 1st executable - Invoke FindTarget, which also mounts the BPF maps
// 2nd executable - Invoke ReadAndForward, receiving the BPF map mountpoint as argument
instr := beyla.New(config)
instr := appobserv.New(config)
if err := instr.FindAndInstrument(ctx); err != nil {
slog.Error("Beyla couldn't find target process", "error", err)
os.Exit(-1)
Expand Down
33 changes: 8 additions & 25 deletions pkg/beyla/beyla.go → pkg/appobserv/appobserv.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// Package beyla provides public access to Beyla as a library. All the other subcomponents
// Package appobserv provides public access to Beyla application observability as a library. All the other subcomponents
// of Beyla are hidden.
package beyla
package appobserv

import (
"context"
"fmt"
"io"
"log/slog"

"k8s.io/client-go/kubernetes"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/connector"
"github.com/grafana/beyla/pkg/internal/discover"
"github.com/grafana/beyla/pkg/internal/imetrics"
Expand All @@ -21,17 +21,14 @@ import (
"github.com/grafana/beyla/pkg/internal/transform/kube"
)

// Config as provided by the user to configure and run Beyla
type Config pipe.Config

func log() *slog.Logger {
return slog.With("component", "beyla.Instrumenter")
}

// Instrumenter finds and instrument a service/process, and forwards the traces as
// configured by the user
type Instrumenter struct {
config *pipe.Config
config *beyla.Config
ctxInfo *global.ContextInfo

// tracesInput is used to communicate the found traces between the ProcessFinder and
Expand All @@ -42,28 +39,14 @@ type Instrumenter struct {
}

// New Instrumenter, given a Config
func New(config *Config) *Instrumenter {
func New(config *beyla.Config) *Instrumenter {
return &Instrumenter{
config: (*pipe.Config)(config),
ctxInfo: buildContextInfo((*pipe.Config)(config)),
config: config,
ctxInfo: buildContextInfo(config),
tracesInput: make(chan []request.Span, config.ChannelBufferLen),
}
}

// LoadConfig loads and validates configuration.
// Configuration from multiple source is overridden in the following order
// (from less to most priority):
// 1 - Default configuration
// 2 - Contents of the provided file reader (nillable)
// 3 - Environment variables
func LoadConfig(reader io.Reader) (*Config, error) {
cfg, err := pipe.LoadConfig(reader)
if err != nil {
return nil, err
}
return (*Config)(cfg), nil
}

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
Expand Down Expand Up @@ -133,7 +116,7 @@ func (i *Instrumenter) ReadAndForward(ctx context.Context) error {

// buildContextInfo populates some globally shared components and properties
// from the user-provided configuration
func buildContextInfo(config *pipe.Config) *global.ContextInfo {
func buildContextInfo(config *beyla.Config) *global.ContextInfo {
promMgr := &connector.PrometheusManager{}
k8sCfg := &config.Attributes.Kubernetes
ctxInfo := &global.ContextInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/pipe/config.go → pkg/beyla/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pipe
package beyla

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pipe
package beyla

import (
"bytes"
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/cilium/ebpf/link"
"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/helpers"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/svc"
)

Expand All @@ -23,7 +23,7 @@ import (
// instrumenting the executable
type TraceAttacher struct {
log *slog.Logger
Cfg *pipe.Config
Cfg *beyla.Config
Ctx context.Context
DiscoveredTracers chan *ebpf.ProcessTracer
DeleteTracers chan *Instrumentable
Expand Down Expand Up @@ -178,10 +178,10 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) {
}
}

// pinpath must be unique for a given executable group
// BuildPinPath pinpath must be unique for a given executable group
// it will be:
// - current beyla PID
func BuildPinPath(cfg *pipe.Config) string {
func BuildPinPath(cfg *beyla.Config) string {
return path.Join(cfg.EBPF.BpfBaseDir, fmt.Sprintf("beyla-%d", os.Getpid()))
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/mariomac/pipes/pkg/graph"
"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/goruntime"
"github.com/grafana/beyla/pkg/internal/ebpf/gosql"
Expand All @@ -15,7 +16,6 @@ import (
"github.com/grafana/beyla/pkg/internal/ebpf/httpssl"
"github.com/grafana/beyla/pkg/internal/ebpf/nethttp"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
)

Expand All @@ -33,7 +33,7 @@ type ProcessFinder struct {
TraceAttacher
}

func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
processFinder := ProcessFinder{
ProcessWatcher: ProcessWatcher{Ctx: ctx, Cfg: cfg},
CriteriaMatcher: CriteriaMatcher{Cfg: cfg},
Expand All @@ -55,7 +55,7 @@ func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.Con

// Start the ProcessFinder pipeline in background. It returns a channel where each new discovered
// ebpf.ProcessTracer will be notified.
func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
func (pf *ProcessFinder) Start(cfg *beyla.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen))
graph.RegisterStart(gb, ProcessWatcherProvider)
graph.RegisterMiddle(gb, WatcherKubeEnricherProvider)
Expand All @@ -74,7 +74,7 @@ func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-
// auxiliary functions to instantiate the go and non-go tracers on diverse steps of the
// discovery pipeline

func newGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
// Each program is an eBPF source: net/http, grpc...
return []ebpf.Tracer{
nethttp.New(&cfg.EBPF, metrics),
Expand All @@ -85,10 +85,10 @@ func newGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Trace
}
}

func newNonGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newNonGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
return []ebpf.Tracer{httpfltr.New(cfg, metrics), httpssl.New(cfg, metrics)}
}

func newNonGoTracersGroupUProbes(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer {
func newNonGoTracersGroupUProbes(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
return []ebpf.Tracer{httpssl.New(cfg, metrics)}
}
6 changes: 3 additions & 3 deletions pkg/internal/discover/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/mariomac/pipes/pkg/node"
"github.com/shirou/gopsutil/process"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/pipe"
)

// CriteriaMatcher filters the processes that match the discovery criteria.
type CriteriaMatcher struct {
Cfg *pipe.Config
Cfg *beyla.Config
}

func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processAttrs], []Event[ProcessMatch]], error) {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (m *matcher) matchByAttributes(actual map[string]string, required map[strin
return true
}

func FindingCriteria(cfg *pipe.Config) services.DefinitionCriteria {
func FindingCriteria(cfg *beyla.Config) services.DefinitionCriteria {
if cfg.Discovery.SystemWide {
// will return all the executables in the system
return services.DefinitionCriteria{
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/discover/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/testutil"
)

func TestCriteriaMatcher(t *testing.T) {
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: port-only
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestCriteriaMatcher(t *testing.T) {
}

func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) {
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: all-attributes-must-match
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) {
}

func TestCriteriaMatcherMissingPort(t *testing.T) {
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: port-only
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/discover/typer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (

"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/exec"
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/svc"
)

// ExecTyper classifies the discovered executables according to the
// executable type (Go, generic...), and filters these executables
// that are not instrumentable.
type ExecTyper struct {
Cfg *pipe.Config
Cfg *beyla.Config
Metrics imetrics.Reporter
}

Expand Down Expand Up @@ -52,7 +52,7 @@ func ExecTyperProvider(ecfg ExecTyper) (node.MiddleFunc[[]Event[ProcessMatch], [
}

type typer struct {
cfg *pipe.Config
cfg *beyla.Config
metrics imetrics.Reporter
log *slog.Logger
currentPids map[int32]*exec.FileInfo
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakek8sclientset "k8s.io/client-go/kubernetes/fake"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/helpers/container"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/testutil"
)

Expand Down Expand Up @@ -125,7 +125,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
Informer: &informer,
})
require.NoError(t, err)
pipeConfig := pipe.Config{}
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
services:
- name: port-only
Expand Down
10 changes: 5 additions & 5 deletions pkg/internal/discover/watcher_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/shirou/gopsutil/net"
"github.com/shirou/gopsutil/process"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover/services"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/watcher"
"github.com/grafana/beyla/pkg/internal/pipe"
)

const (
Expand All @@ -28,7 +28,7 @@ const (
// as well as PIDs from processes that setup a new connection
type ProcessWatcher struct {
Ctx context.Context
Cfg *pipe.Config
Cfg *beyla.Config
}

type WatchEventType int
Expand Down Expand Up @@ -86,7 +86,7 @@ type pidPort struct {
// ^ This is partially done, although it's not fully async, we only use the info to reduce the overhead of port scanning.
type pollAccounter struct {
ctx context.Context
cfg *pipe.Config
cfg *beyla.Config
interval time.Duration
// last polled process:ports accessible by its pid
pids map[PID]processAttrs
Expand All @@ -98,7 +98,7 @@ type pollAccounter struct {
// injectable function
executableReady func(PID) bool
// injectable function to load the bpf program
loadBPFWatcher func(cfg *pipe.Config, events chan<- watcher.Event) error
loadBPFWatcher func(cfg *beyla.Config, events chan<- watcher.Event) error
// we use these to ensure we poll for the open ports effectively
stateMux sync.Mutex
bpfWatcherEnabled bool
Expand Down Expand Up @@ -320,7 +320,7 @@ func fetchProcessPorts(scanPorts bool) (map[PID]processAttrs, error) {
return processes, nil
}

func loadBPFWatcher(cfg *pipe.Config, events chan<- watcher.Event) error {
func loadBPFWatcher(cfg *beyla.Config, events chan<- watcher.Event) error {
wt := watcher.New(cfg, events)
return ebpf.RunUtilityTracer(wt, BuildPinPath(cfg))
}
Loading
Loading