From 99ce613f1006e0c4f0c46d7050615e1a7cd60a54 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Fri, 8 Nov 2024 10:20:54 +0100 Subject: [PATCH 1/4] extract reporter runloop --- reporter/otlp_reporter.go | 62 ++++++++++++++++----------------------- reporter/runloop.go | 44 +++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 37 deletions(-) create mode 100644 reporter/runloop.go diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 1cf6201c..1c00fbcf 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -102,8 +102,8 @@ type OTLPReporter struct { // client for the connection to the receiver. client otlpcollector.ProfilesServiceClient - // stopSignal is the stop signal for shutting down all background tasks. - stopSignal chan libpf.Void + // runLoop handles the run loop + runLoop *runLoop // rpcStats stores gRPC related statistics. rpcStats *StatsHandlerImpl @@ -180,15 +180,17 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { } return &OTLPReporter{ - config: cfg, - name: cfg.Name, - version: cfg.Version, - kernelVersion: cfg.KernelVersion, - hostName: cfg.HostName, - ipAddress: cfg.IPAddress, - samplesPerSecond: cfg.SamplesPerSecond, - hostID: strconv.FormatUint(cfg.HostID, 10), - stopSignal: make(chan libpf.Void), + config: cfg, + name: cfg.Name, + version: cfg.Version, + kernelVersion: cfg.KernelVersion, + hostName: cfg.HostName, + ipAddress: cfg.IPAddress, + samplesPerSecond: cfg.SamplesPerSecond, + hostID: strconv.FormatUint(cfg.HostID, 10), + runLoop: &runLoop{ + stopSignal: make(chan libpf.Void), + }, pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, client: nil, rpcStats: NewStatsHandler(), @@ -346,7 +348,7 @@ func (r *OTLPReporter) ReportMetrics(_ uint32, _ []uint32, _ []int64) {} // Stop triggers a graceful shutdown of OTLPReporter. func (r *OTLPReporter) Stop() { - close(r.stopSignal) + r.runLoop.Stop() } // GetMetrics returns internal metrics of OTLPReporter. @@ -370,41 +372,27 @@ func (r *OTLPReporter) Start(ctx context.Context) error { otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats) if err != nil { cancelReporting() - close(r.stopSignal) + r.runLoop.Stop() return err } r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) - go func() { - tick := time.NewTicker(r.config.ReportInterval) - defer tick.Stop() - purgeTick := time.NewTicker(5 * time.Minute) - defer purgeTick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-r.stopSignal: - return - case <-tick.C: - if err := r.reportOTLPProfile(ctx); err != nil { - log.Errorf("Request failed: %v", err) - } - tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2)) - case <-purgeTick.C: - // Allow the GC to purge expired entries to avoid memory leaks. - r.executables.PurgeExpired() - r.frames.PurgeExpired() - r.cgroupv2ID.PurgeExpired() - } + r.runLoop.Start(ctx, r.config.ReportInterval, func() { + if err := r.reportOTLPProfile(ctx); err != nil { + log.Errorf("Request failed: %v", err) } - }() + }, func() { + // Allow the GC to purge expired entries to avoid memory leaks. + r.executables.PurgeExpired() + r.frames.PurgeExpired() + r.cgroupv2ID.PurgeExpired() + }) // When Stop() is called and a signal to 'stop' is received, then: // - cancel the reporting functions currently running (using context) // - close the gRPC connection with collection-agent go func() { - <-r.stopSignal + <-r.runLoop.stopSignal cancelReporting() if err := otlpGrpcConn.Close(); err != nil { log.Fatalf("Stopping connection of OTLP client client failed: %v", err) diff --git a/reporter/runloop.go b/reporter/runloop.go new file mode 100644 index 00000000..0e5caf49 --- /dev/null +++ b/reporter/runloop.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" + +import ( + "context" + "time" + + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +// runLoop implements the run loop for all reporters +type runLoop struct { + // stopSignal is the stop signal for shutting down all background tasks. + stopSignal chan libpf.Void +} + +func (t *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) { + go func() { + tick := time.NewTicker(reportInterval) + defer tick.Stop() + purgeTick := time.NewTicker(5 * time.Minute) + defer purgeTick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.stopSignal: + return + case <-tick.C: + run() + tick.Reset(libpf.AddJitter(reportInterval, 0.2)) + case <-purgeTick.C: + purge() + } + } + }() +} + +func (t *runLoop) Stop() { + close(t.stopSignal) +} From 88cf798ef21a68c3fcccf832ecb147e430ff5a7d Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Tue, 19 Nov 2024 18:53:00 +0100 Subject: [PATCH 2/4] Update reporter/runloop.go Co-authored-by: Christos Kalkanis --- reporter/runloop.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reporter/runloop.go b/reporter/runloop.go index 0e5caf49..4f7a3eb9 100644 --- a/reporter/runloop.go +++ b/reporter/runloop.go @@ -16,7 +16,7 @@ type runLoop struct { stopSignal chan libpf.Void } -func (t *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) { +func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) { go func() { tick := time.NewTicker(reportInterval) defer tick.Stop() From f72b742f1ccd6038f8c3570a0210529393284f44 Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Tue, 19 Nov 2024 18:53:06 +0100 Subject: [PATCH 3/4] Update reporter/runloop.go Co-authored-by: Christos Kalkanis --- reporter/runloop.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reporter/runloop.go b/reporter/runloop.go index 4f7a3eb9..ea7cf518 100644 --- a/reporter/runloop.go +++ b/reporter/runloop.go @@ -39,6 +39,6 @@ func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, }() } -func (t *runLoop) Stop() { - close(t.stopSignal) +func (rl *runLoop) Stop() { + close(rl.stopSignal) } From e4dc46d22c5fd723d425c030001ff505df5e7d58 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Tue, 19 Nov 2024 18:55:53 +0100 Subject: [PATCH 4/4] one more rename --- reporter/runloop.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reporter/runloop.go b/reporter/runloop.go index ea7cf518..0014d634 100644 --- a/reporter/runloop.go +++ b/reporter/runloop.go @@ -27,7 +27,7 @@ func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, select { case <-ctx.Done(): return - case <-t.stopSignal: + case <-rl.stopSignal: return case <-tick.C: run()