From ec757bd76e1f14fce9cb32d956a0253e880712d5 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 30 Jan 2024 09:39:25 +0100 Subject: [PATCH] Reorganizing Beyla configuration (#584) --- cmd/beyla/main.go | 3 +- .../beyla.go => appobserv/appobserv.go} | 33 +++++-------------- pkg/{internal/pipe => beyla}/config.go | 2 +- pkg/{internal/pipe => beyla}/config_test.go | 2 +- pkg/internal/discover/attacher.go | 8 ++--- pkg/internal/discover/finder.go | 12 +++---- pkg/internal/discover/matcher.go | 6 ++-- pkg/internal/discover/matcher_test.go | 8 ++--- pkg/internal/discover/typer.go | 6 ++-- pkg/internal/discover/watcher_kube_test.go | 4 +-- pkg/internal/discover/watcher_proc.go | 10 +++--- pkg/internal/discover/watcher_proc_test.go | 10 +++--- pkg/internal/ebpf/httpfltr/httpfltr.go | 6 ++-- pkg/internal/ebpf/httpssl/httpssl.go | 6 ++-- pkg/internal/ebpf/watcher/watcher.go | 6 ++-- pkg/internal/pipe/instrumenter.go | 9 ++--- pkg/internal/pipe/instrumenter_test.go | 17 +++++----- 17 files changed, 67 insertions(+), 81 deletions(-) rename pkg/{beyla/beyla.go => appobserv/appobserv.go} (87%) rename pkg/{internal/pipe => beyla}/config.go (99%) rename pkg/{internal/pipe => beyla}/config_test.go (99%) diff --git a/cmd/beyla/main.go b/cmd/beyla/main.go index caec00700..d2d8142c5 100644 --- a/cmd/beyla/main.go +++ b/cmd/beyla/main.go @@ -15,6 +15,7 @@ import ( otelsdk "go.opentelemetry.io/otel/sdk" + "github.com/grafana/beyla/pkg/appobserv" "github.com/grafana/beyla/pkg/beyla" ) @@ -60,7 +61,7 @@ func main() { // in two parts: // 1st executable - Invoke FindTarget, which also mounts the BPF maps // 2nd executable - Invoke ReadAndForward, receiving the BPF map mountpoint as argument - instr := beyla.New(config) + instr := appobserv.New(config) if err := instr.FindAndInstrument(ctx); err != nil { slog.Error("Beyla couldn't find target process", "error", err) os.Exit(-1) diff --git a/pkg/beyla/beyla.go b/pkg/appobserv/appobserv.go similarity index 87% rename from pkg/beyla/beyla.go rename to pkg/appobserv/appobserv.go index c956ab954..6949a03a7 100644 --- a/pkg/beyla/beyla.go +++ b/pkg/appobserv/appobserv.go @@ -1,15 +1,15 @@ -// Package beyla provides public access to Beyla as a library. All the other subcomponents +// Package appobserv provides public access to Beyla application observability as a library. All the other subcomponents // of Beyla are hidden. -package beyla +package appobserv import ( "context" "fmt" - "io" "log/slog" "k8s.io/client-go/kubernetes" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/connector" "github.com/grafana/beyla/pkg/internal/discover" "github.com/grafana/beyla/pkg/internal/imetrics" @@ -21,9 +21,6 @@ import ( "github.com/grafana/beyla/pkg/internal/transform/kube" ) -// Config as provided by the user to configure and run Beyla -type Config pipe.Config - func log() *slog.Logger { return slog.With("component", "beyla.Instrumenter") } @@ -31,7 +28,7 @@ func log() *slog.Logger { // Instrumenter finds and instrument a service/process, and forwards the traces as // configured by the user type Instrumenter struct { - config *pipe.Config + config *beyla.Config ctxInfo *global.ContextInfo // tracesInput is used to communicate the found traces between the ProcessFinder and @@ -42,28 +39,14 @@ type Instrumenter struct { } // New Instrumenter, given a Config -func New(config *Config) *Instrumenter { +func New(config *beyla.Config) *Instrumenter { return &Instrumenter{ - config: (*pipe.Config)(config), - ctxInfo: buildContextInfo((*pipe.Config)(config)), + config: config, + ctxInfo: buildContextInfo(config), tracesInput: make(chan []request.Span, config.ChannelBufferLen), } } -// LoadConfig loads and validates configuration. -// Configuration from multiple source is overridden in the following order -// (from less to most priority): -// 1 - Default configuration -// 2 - Contents of the provided file reader (nillable) -// 3 - Environment variables -func LoadConfig(reader io.Reader) (*Config, error) { - cfg, err := pipe.LoadConfig(reader) - if err != nil { - return nil, err - } - return (*Config)(cfg), nil -} - // FindAndInstrument searches in background for any new executable matching the // selection criteria. func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { @@ -133,7 +116,7 @@ func (i *Instrumenter) ReadAndForward(ctx context.Context) error { // buildContextInfo populates some globally shared components and properties // from the user-provided configuration -func buildContextInfo(config *pipe.Config) *global.ContextInfo { +func buildContextInfo(config *beyla.Config) *global.ContextInfo { promMgr := &connector.PrometheusManager{} k8sCfg := &config.Attributes.Kubernetes ctxInfo := &global.ContextInfo{ diff --git a/pkg/internal/pipe/config.go b/pkg/beyla/config.go similarity index 99% rename from pkg/internal/pipe/config.go rename to pkg/beyla/config.go index ad5b53c26..109ea49c2 100644 --- a/pkg/internal/pipe/config.go +++ b/pkg/beyla/config.go @@ -1,4 +1,4 @@ -package pipe +package beyla import ( "fmt" diff --git a/pkg/internal/pipe/config_test.go b/pkg/beyla/config_test.go similarity index 99% rename from pkg/internal/pipe/config_test.go rename to pkg/beyla/config_test.go index c9a7be4cb..e916420d3 100644 --- a/pkg/internal/pipe/config_test.go +++ b/pkg/beyla/config_test.go @@ -1,4 +1,4 @@ -package pipe +package beyla import ( "bytes" diff --git a/pkg/internal/discover/attacher.go b/pkg/internal/discover/attacher.go index a0dfb19e8..586efe985 100644 --- a/pkg/internal/discover/attacher.go +++ b/pkg/internal/discover/attacher.go @@ -10,11 +10,11 @@ import ( "github.com/cilium/ebpf/link" "github.com/mariomac/pipes/pkg/node" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/ebpf" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/helpers" "github.com/grafana/beyla/pkg/internal/imetrics" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -23,7 +23,7 @@ import ( // instrumenting the executable type TraceAttacher struct { log *slog.Logger - Cfg *pipe.Config + Cfg *beyla.Config Ctx context.Context DiscoveredTracers chan *ebpf.ProcessTracer DeleteTracers chan *Instrumentable @@ -178,10 +178,10 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) { } } -// pinpath must be unique for a given executable group +// BuildPinPath pinpath must be unique for a given executable group // it will be: // - current beyla PID -func BuildPinPath(cfg *pipe.Config) string { +func BuildPinPath(cfg *beyla.Config) string { return path.Join(cfg.EBPF.BpfBaseDir, fmt.Sprintf("beyla-%d", os.Getpid())) } diff --git a/pkg/internal/discover/finder.go b/pkg/internal/discover/finder.go index f6b4291fc..2f19716b8 100644 --- a/pkg/internal/discover/finder.go +++ b/pkg/internal/discover/finder.go @@ -7,6 +7,7 @@ import ( "github.com/mariomac/pipes/pkg/graph" "github.com/mariomac/pipes/pkg/node" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/ebpf" "github.com/grafana/beyla/pkg/internal/ebpf/goruntime" "github.com/grafana/beyla/pkg/internal/ebpf/gosql" @@ -15,7 +16,6 @@ import ( "github.com/grafana/beyla/pkg/internal/ebpf/httpssl" "github.com/grafana/beyla/pkg/internal/ebpf/nethttp" "github.com/grafana/beyla/pkg/internal/imetrics" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/pipe/global" ) @@ -33,7 +33,7 @@ type ProcessFinder struct { TraceAttacher } -func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.ContextInfo) *ProcessFinder { +func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder { processFinder := ProcessFinder{ ProcessWatcher: ProcessWatcher{Ctx: ctx, Cfg: cfg}, CriteriaMatcher: CriteriaMatcher{Cfg: cfg}, @@ -55,7 +55,7 @@ func NewProcessFinder(ctx context.Context, cfg *pipe.Config, ctxInfo *global.Con // Start the ProcessFinder pipeline in background. It returns a channel where each new discovered // ebpf.ProcessTracer will be notified. -func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) { +func (pf *ProcessFinder) Start(cfg *beyla.Config) (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) { gb := graph.NewBuilder(node.ChannelBufferLen(cfg.ChannelBufferLen)) graph.RegisterStart(gb, ProcessWatcherProvider) graph.RegisterMiddle(gb, WatcherKubeEnricherProvider) @@ -74,7 +74,7 @@ func (pf *ProcessFinder) Start(cfg *pipe.Config) (<-chan *ebpf.ProcessTracer, <- // auxiliary functions to instantiate the go and non-go tracers on diverse steps of the // discovery pipeline -func newGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer { +func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer { // Each program is an eBPF source: net/http, grpc... return []ebpf.Tracer{ nethttp.New(&cfg.EBPF, metrics), @@ -85,10 +85,10 @@ func newGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Trace } } -func newNonGoTracersGroup(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer { +func newNonGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer { return []ebpf.Tracer{httpfltr.New(cfg, metrics), httpssl.New(cfg, metrics)} } -func newNonGoTracersGroupUProbes(cfg *pipe.Config, metrics imetrics.Reporter) []ebpf.Tracer { +func newNonGoTracersGroupUProbes(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer { return []ebpf.Tracer{httpssl.New(cfg, metrics)} } diff --git a/pkg/internal/discover/matcher.go b/pkg/internal/discover/matcher.go index 896236288..010ea9e35 100644 --- a/pkg/internal/discover/matcher.go +++ b/pkg/internal/discover/matcher.go @@ -11,13 +11,13 @@ import ( "github.com/mariomac/pipes/pkg/node" "github.com/shirou/gopsutil/process" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/discover/services" - "github.com/grafana/beyla/pkg/internal/pipe" ) // CriteriaMatcher filters the processes that match the discovery criteria. type CriteriaMatcher struct { - Cfg *pipe.Config + Cfg *beyla.Config } func CriteriaMatcherProvider(cm CriteriaMatcher) (node.MiddleFunc[[]Event[processAttrs], []Event[ProcessMatch]], error) { @@ -161,7 +161,7 @@ func (m *matcher) matchByAttributes(actual map[string]string, required map[strin return true } -func FindingCriteria(cfg *pipe.Config) services.DefinitionCriteria { +func FindingCriteria(cfg *beyla.Config) services.DefinitionCriteria { if cfg.Discovery.SystemWide { // will return all the executables in the system return services.DefinitionCriteria{ diff --git a/pkg/internal/discover/matcher_test.go b/pkg/internal/discover/matcher_test.go index 5fdf8781d..e16c60179 100644 --- a/pkg/internal/discover/matcher_test.go +++ b/pkg/internal/discover/matcher_test.go @@ -7,13 +7,13 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/discover/services" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/testutil" ) func TestCriteriaMatcher(t *testing.T) { - pipeConfig := pipe.Config{} + pipeConfig := beyla.Config{} require.NoError(t, yaml.Unmarshal([]byte(`discovery: services: - name: port-only @@ -74,7 +74,7 @@ func TestCriteriaMatcher(t *testing.T) { } func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) { - pipeConfig := pipe.Config{} + pipeConfig := beyla.Config{} require.NoError(t, yaml.Unmarshal([]byte(`discovery: services: - name: all-attributes-must-match @@ -135,7 +135,7 @@ func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) { } func TestCriteriaMatcherMissingPort(t *testing.T) { - pipeConfig := pipe.Config{} + pipeConfig := beyla.Config{} require.NoError(t, yaml.Unmarshal([]byte(`discovery: services: - name: port-only diff --git a/pkg/internal/discover/typer.go b/pkg/internal/discover/typer.go index 4e3ac5ae4..cb2250531 100644 --- a/pkg/internal/discover/typer.go +++ b/pkg/internal/discover/typer.go @@ -6,10 +6,10 @@ import ( "github.com/mariomac/pipes/pkg/node" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -17,7 +17,7 @@ import ( // executable type (Go, generic...), and filters these executables // that are not instrumentable. type ExecTyper struct { - Cfg *pipe.Config + Cfg *beyla.Config Metrics imetrics.Reporter } @@ -52,7 +52,7 @@ func ExecTyperProvider(ecfg ExecTyper) (node.MiddleFunc[[]Event[ProcessMatch], [ } type typer struct { - cfg *pipe.Config + cfg *beyla.Config metrics imetrics.Reporter log *slog.Logger currentPids map[int32]*exec.FileInfo diff --git a/pkg/internal/discover/watcher_kube_test.go b/pkg/internal/discover/watcher_kube_test.go index 50a0e51cf..aedad80ea 100644 --- a/pkg/internal/discover/watcher_kube_test.go +++ b/pkg/internal/discover/watcher_kube_test.go @@ -16,10 +16,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakek8sclientset "k8s.io/client-go/kubernetes/fake" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/discover/services" "github.com/grafana/beyla/pkg/internal/helpers/container" "github.com/grafana/beyla/pkg/internal/kube" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/testutil" ) @@ -125,7 +125,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { Informer: &informer, }) require.NoError(t, err) - pipeConfig := pipe.Config{} + pipeConfig := beyla.Config{} require.NoError(t, yaml.Unmarshal([]byte(`discovery: services: - name: port-only diff --git a/pkg/internal/discover/watcher_proc.go b/pkg/internal/discover/watcher_proc.go index b2bcc8627..f3b084fb7 100644 --- a/pkg/internal/discover/watcher_proc.go +++ b/pkg/internal/discover/watcher_proc.go @@ -14,10 +14,10 @@ import ( "github.com/shirou/gopsutil/net" "github.com/shirou/gopsutil/process" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/discover/services" "github.com/grafana/beyla/pkg/internal/ebpf" "github.com/grafana/beyla/pkg/internal/ebpf/watcher" - "github.com/grafana/beyla/pkg/internal/pipe" ) const ( @@ -28,7 +28,7 @@ const ( // as well as PIDs from processes that setup a new connection type ProcessWatcher struct { Ctx context.Context - Cfg *pipe.Config + Cfg *beyla.Config } type WatchEventType int @@ -86,7 +86,7 @@ type pidPort struct { // ^ This is partially done, although it's not fully async, we only use the info to reduce the overhead of port scanning. type pollAccounter struct { ctx context.Context - cfg *pipe.Config + cfg *beyla.Config interval time.Duration // last polled process:ports accessible by its pid pids map[PID]processAttrs @@ -98,7 +98,7 @@ type pollAccounter struct { // injectable function executableReady func(PID) bool // injectable function to load the bpf program - loadBPFWatcher func(cfg *pipe.Config, events chan<- watcher.Event) error + loadBPFWatcher func(cfg *beyla.Config, events chan<- watcher.Event) error // we use these to ensure we poll for the open ports effectively stateMux sync.Mutex bpfWatcherEnabled bool @@ -320,7 +320,7 @@ func fetchProcessPorts(scanPorts bool) (map[PID]processAttrs, error) { return processes, nil } -func loadBPFWatcher(cfg *pipe.Config, events chan<- watcher.Event) error { +func loadBPFWatcher(cfg *beyla.Config, events chan<- watcher.Event) error { wt := watcher.New(cfg, events) return ebpf.RunUtilityTracer(wt, BuildPinPath(cfg)) } diff --git a/pkg/internal/discover/watcher_proc_test.go b/pkg/internal/discover/watcher_proc_test.go index 5209f2fe3..45dd14c38 100644 --- a/pkg/internal/discover/watcher_proc_test.go +++ b/pkg/internal/discover/watcher_proc_test.go @@ -12,8 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/ebpf/watcher" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/testutil" ) @@ -52,7 +52,7 @@ func TestWatcher_Poll(t *testing.T) { executableReady: func(PID) bool { return true }, - loadBPFWatcher: func(*pipe.Config, chan<- watcher.Event) error { + loadBPFWatcher: func(*beyla.Config, chan<- watcher.Event) error { return nil }, } @@ -132,7 +132,7 @@ func TestProcessNotReady(t *testing.T) { executableReady: func(pid PID) bool { return pid >= 3 }, - loadBPFWatcher: func(*pipe.Config, chan<- watcher.Event) error { + loadBPFWatcher: func(*beyla.Config, chan<- watcher.Event) error { return nil }, } @@ -163,7 +163,7 @@ func TestPortsFetchRequired(t *testing.T) { userConfig := bytes.NewBufferString("channel_buffer_len: 33") require.NoError(t, os.Setenv("BEYLA_OPEN_PORT", "8080-8089")) - cfg, err := pipe.LoadConfig(userConfig) + cfg, err := beyla.LoadConfig(userConfig) require.NoError(t, err) channelReturner := make(chan chan<- watcher.Event) @@ -181,7 +181,7 @@ func TestPortsFetchRequired(t *testing.T) { executableReady: func(pid PID) bool { return true }, - loadBPFWatcher: func(cfg *pipe.Config, events chan<- watcher.Event) error { + loadBPFWatcher: func(cfg *beyla.Config, events chan<- watcher.Event) error { channelReturner <- events return nil }, diff --git a/pkg/internal/ebpf/httpfltr/httpfltr.go b/pkg/internal/ebpf/httpfltr/httpfltr.go index 78027a150..6499dd5cc 100644 --- a/pkg/internal/ebpf/httpfltr/httpfltr.go +++ b/pkg/internal/ebpf/httpfltr/httpfltr.go @@ -16,11 +16,11 @@ import ( "github.com/cilium/ebpf/ringbuf" lru "github.com/hashicorp/golang-lru/v2" + "github.com/grafana/beyla/pkg/beyla" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -55,7 +55,7 @@ type PidsFilter interface { type Tracer struct { pidsFilter PidsFilter - cfg *pipe.Config + cfg *beyla.Config metrics imetrics.Reporter bpfObjects bpfObjects closers []io.Closer @@ -63,7 +63,7 @@ type Tracer struct { Service *svc.ID } -func New(cfg *pipe.Config, metrics imetrics.Reporter) *Tracer { +func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { log := slog.With("component", "httpfltr.Tracer") var filter PidsFilter if cfg.Discovery.SystemWide { diff --git a/pkg/internal/ebpf/httpssl/httpssl.go b/pkg/internal/ebpf/httpssl/httpssl.go index 3f67c14fa..3e687e7e5 100644 --- a/pkg/internal/ebpf/httpssl/httpssl.go +++ b/pkg/internal/ebpf/httpssl/httpssl.go @@ -8,12 +8,12 @@ import ( "github.com/cilium/ebpf" + "github.com/grafana/beyla/pkg/beyla" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" "github.com/grafana/beyla/pkg/internal/ebpf/httpfltr" "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -41,7 +41,7 @@ type HTTPInfo struct { type Tracer struct { pidsFilter httpfltr.PidsFilter - cfg *pipe.Config + cfg *beyla.Config metrics imetrics.Reporter bpfObjects bpfObjects closers []io.Closer @@ -49,7 +49,7 @@ type Tracer struct { Service *svc.ID } -func New(cfg *pipe.Config, metrics imetrics.Reporter) *Tracer { +func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { log := slog.With("component", "httpfltr.Tracer") var filter httpfltr.PidsFilter if cfg.Discovery.SystemWide { diff --git a/pkg/internal/ebpf/watcher/watcher.go b/pkg/internal/ebpf/watcher/watcher.go index e8b3328f9..c12df65a1 100644 --- a/pkg/internal/ebpf/watcher/watcher.go +++ b/pkg/internal/ebpf/watcher/watcher.go @@ -10,8 +10,8 @@ import ( "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" + "github.com/grafana/beyla/pkg/beyla" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" - "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -22,7 +22,7 @@ import ( type BPFWatchInfo bpfWatchInfoT type Watcher struct { - cfg *pipe.Config + cfg *beyla.Config bpfObjects bpfObjects closers []io.Closer log *slog.Logger @@ -41,7 +41,7 @@ type Event struct { Payload uint32 // this will be either port or pid } -func New(cfg *pipe.Config, events chan<- Event) *Watcher { +func New(cfg *beyla.Config, events chan<- Event) *Watcher { log := slog.With("component", "watcher.Tracer") return &Watcher{ log: log, diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go index b5c660eda..c38de561a 100644 --- a/pkg/internal/pipe/instrumenter.go +++ b/pkg/internal/pipe/instrumenter.go @@ -7,6 +7,7 @@ import ( "github.com/mariomac/pipes/pkg/graph" "github.com/mariomac/pipes/pkg/node" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/export/debug" "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/export/prom" @@ -35,7 +36,7 @@ type nodesMap struct { Noop debug.NoopEnabled } -func configToNodesMap(cfg *Config) *nodesMap { +func configToNodesMap(cfg *beyla.Config) *nodesMap { return &nodesMap{ TracesReader: traces.ReadDecorator{InstanceID: cfg.Attributes.InstanceID}, Routes: cfg.Routes, @@ -52,7 +53,7 @@ func configToNodesMap(cfg *Config) *nodesMap { type graphFunctions struct { ctx context.Context - config *Config + config *beyla.Config builder *graph.Builder ctxInfo *global.ContextInfo @@ -64,7 +65,7 @@ type graphFunctions struct { // Build instantiates the whole instrumentation --> processing --> submit // pipeline graph and returns it as a startable item -func Build(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) (*Instrumenter, error) { +func Build(ctx context.Context, config *beyla.Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) (*Instrumenter, error) { if err := config.Validate(); err != nil { return nil, fmt.Errorf("validating configuration: %w", err) } @@ -74,7 +75,7 @@ func Build(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tra // private constructor that can be instantiated from tests to override the node providers // and offsets inspector -func newGraphBuilder(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) *graphFunctions { +func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) *graphFunctions { // This is how the github.com/mariomac/pipes library, works: // https://github.com/mariomac/pipes/tree/main/docs/tutorial/b-highlevel/01-basic-nodes diff --git a/pkg/internal/pipe/instrumenter_test.go b/pkg/internal/pipe/instrumenter_test.go index eabb467a1..030054919 100644 --- a/pkg/internal/pipe/instrumenter_test.go +++ b/pkg/internal/pipe/instrumenter_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.19.0" + "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" @@ -41,7 +42,7 @@ func TestBasicPipeline(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Metrics: otel.MetricsConfig{ MetricsEndpoint: tc.ServerEndpoint, ReportTarget: true, ReportPeerInfo: true, Interval: 10 * time.Millisecond, @@ -85,7 +86,7 @@ func TestTracerPipeline(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Traces: otel.TracesConfig{ BatchTimeout: 10 * time.Millisecond, TracesEndpoint: tc.ServerEndpoint, @@ -121,7 +122,7 @@ func TestTracerPipelineBadTimestamps(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Traces: otel.TracesConfig{ BatchTimeout: 10 * time.Millisecond, TracesEndpoint: tc.ServerEndpoint, @@ -153,7 +154,7 @@ func TestRouteConsolidation(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Metrics: otel.MetricsConfig{ ReportPeerInfo: false, // no peer info MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond, @@ -228,7 +229,7 @@ func TestGRPCPipeline(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Metrics: otel.MetricsConfig{ MetricsEndpoint: tc.ServerEndpoint, ReportTarget: true, ReportPeerInfo: true, Interval: time.Millisecond, ReportersCacheLen: 16, @@ -270,7 +271,7 @@ func TestTraceGRPCPipeline(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Traces: otel.TracesConfig{ TracesEndpoint: tc.ServerEndpoint, BatchTimeout: time.Millisecond, ReportersCacheLen: 16, @@ -467,7 +468,7 @@ func TestBasicPipelineInfo(t *testing.T) { require.NoError(t, err) tracesInput := make(chan []request.Span, 10) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Metrics: otel.MetricsConfig{ MetricsEndpoint: tc.ServerEndpoint, ReportTarget: true, ReportPeerInfo: true, Interval: 10 * time.Millisecond, ReportersCacheLen: 16, @@ -502,7 +503,7 @@ func TestTracerPipelineInfo(t *testing.T) { tc, err := collector.Start(ctx) require.NoError(t, err) - gb := newGraphBuilder(ctx, &Config{ + gb := newGraphBuilder(ctx, &beyla.Config{ Traces: otel.TracesConfig{TracesEndpoint: tc.ServerEndpoint, ReportersCacheLen: 16}, }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data