diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 1dcace29..b140f412 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -104,8 +104,8 @@ type OTLPReporter struct { // client for the connection to the receiver. client otlpcollector.ProfilesServiceClient - // stopSignal is the stop signal for shutting down all background tasks. - stopSignal chan libpf.Void + // runLoop handles the run loop + runLoop *runLoop // rpcStats stores gRPC related statistics. rpcStats *StatsHandlerImpl @@ -182,15 +182,17 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { } return &OTLPReporter{ - config: cfg, - name: cfg.Name, - version: cfg.Version, - kernelVersion: cfg.KernelVersion, - hostName: cfg.HostName, - ipAddress: cfg.IPAddress, - samplesPerSecond: cfg.SamplesPerSecond, - hostID: strconv.FormatUint(cfg.HostID, 10), - stopSignal: make(chan libpf.Void), + config: cfg, + name: cfg.Name, + version: cfg.Version, + kernelVersion: cfg.KernelVersion, + hostName: cfg.HostName, + ipAddress: cfg.IPAddress, + samplesPerSecond: cfg.SamplesPerSecond, + hostID: strconv.FormatUint(cfg.HostID, 10), + runLoop: &runLoop{ + stopSignal: make(chan libpf.Void), + }, pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, client: nil, rpcStats: NewStatsHandler(), @@ -348,7 +350,7 @@ func (r *OTLPReporter) ReportMetrics(_ uint32, _ []uint32, _ []int64) {} // Stop triggers a graceful shutdown of OTLPReporter. func (r *OTLPReporter) Stop() { - close(r.stopSignal) + r.runLoop.Stop() } // GetMetrics returns internal metrics of OTLPReporter. @@ -372,41 +374,27 @@ func (r *OTLPReporter) Start(ctx context.Context) error { otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats) if err != nil { cancelReporting() - close(r.stopSignal) + r.runLoop.Stop() return err } r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) - go func() { - tick := time.NewTicker(r.config.ReportInterval) - defer tick.Stop() - purgeTick := time.NewTicker(5 * time.Minute) - defer purgeTick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-r.stopSignal: - return - case <-tick.C: - if err := r.reportOTLPProfile(ctx); err != nil { - log.Errorf("Request failed: %v", err) - } - tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2)) - case <-purgeTick.C: - // Allow the GC to purge expired entries to avoid memory leaks. - r.executables.PurgeExpired() - r.frames.PurgeExpired() - r.cgroupv2ID.PurgeExpired() - } + r.runLoop.Start(ctx, r.config.ReportInterval, func() { + if err := r.reportOTLPProfile(ctx); err != nil { + log.Errorf("Request failed: %v", err) } - }() + }, func() { + // Allow the GC to purge expired entries to avoid memory leaks. + r.executables.PurgeExpired() + r.frames.PurgeExpired() + r.cgroupv2ID.PurgeExpired() + }) // When Stop() is called and a signal to 'stop' is received, then: // - cancel the reporting functions currently running (using context) // - close the gRPC connection with collection-agent go func() { - <-r.stopSignal + <-r.runLoop.stopSignal cancelReporting() if err := otlpGrpcConn.Close(); err != nil { log.Fatalf("Stopping connection of OTLP client client failed: %v", err) diff --git a/reporter/runloop.go b/reporter/runloop.go new file mode 100644 index 00000000..0014d634 --- /dev/null +++ b/reporter/runloop.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" + +import ( + "context" + "time" + + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +// runLoop implements the run loop for all reporters +type runLoop struct { + // stopSignal is the stop signal for shutting down all background tasks. + stopSignal chan libpf.Void +} + +func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) { + go func() { + tick := time.NewTicker(reportInterval) + defer tick.Stop() + purgeTick := time.NewTicker(5 * time.Minute) + defer purgeTick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-rl.stopSignal: + return + case <-tick.C: + run() + tick.Reset(libpf.AddJitter(reportInterval, 0.2)) + case <-purgeTick.C: + purge() + } + } + }() +} + +func (rl *runLoop) Stop() { + close(rl.stopSignal) +}