Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract reporter runloop #228

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 25 additions & 37 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ type OTLPReporter struct {
// client for the connection to the receiver.
client otlpcollector.ProfilesServiceClient

// stopSignal is the stop signal for shutting down all background tasks.
stopSignal chan libpf.Void
// runLoop handles the run loop
runLoop *runLoop

// rpcStats stores gRPC related statistics.
rpcStats *StatsHandlerImpl
Expand Down Expand Up @@ -180,15 +180,17 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) {
}

return &OTLPReporter{
config: cfg,
name: cfg.Name,
version: cfg.Version,
kernelVersion: cfg.KernelVersion,
hostName: cfg.HostName,
ipAddress: cfg.IPAddress,
samplesPerSecond: cfg.SamplesPerSecond,
hostID: strconv.FormatUint(cfg.HostID, 10),
stopSignal: make(chan libpf.Void),
config: cfg,
name: cfg.Name,
version: cfg.Version,
kernelVersion: cfg.KernelVersion,
hostName: cfg.HostName,
ipAddress: cfg.IPAddress,
samplesPerSecond: cfg.SamplesPerSecond,
hostID: strconv.FormatUint(cfg.HostID, 10),
runLoop: &runLoop{
stopSignal: make(chan libpf.Void),
},
pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout,
client: nil,
rpcStats: NewStatsHandler(),
Expand Down Expand Up @@ -346,7 +348,7 @@ func (r *OTLPReporter) ReportMetrics(_ uint32, _ []uint32, _ []int64) {}

// Stop triggers a graceful shutdown of OTLPReporter.
func (r *OTLPReporter) Stop() {
close(r.stopSignal)
r.runLoop.Stop()
}

// GetMetrics returns internal metrics of OTLPReporter.
Expand All @@ -370,41 +372,27 @@ func (r *OTLPReporter) Start(ctx context.Context) error {
otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats)
if err != nil {
cancelReporting()
close(r.stopSignal)
r.runLoop.Stop()
return err
}
r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn)

go func() {
tick := time.NewTicker(r.config.ReportInterval)
defer tick.Stop()
purgeTick := time.NewTicker(5 * time.Minute)
defer purgeTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-r.stopSignal:
return
case <-tick.C:
if err := r.reportOTLPProfile(ctx); err != nil {
log.Errorf("Request failed: %v", err)
}
tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2))
case <-purgeTick.C:
// Allow the GC to purge expired entries to avoid memory leaks.
r.executables.PurgeExpired()
r.frames.PurgeExpired()
r.cgroupv2ID.PurgeExpired()
}
r.runLoop.Start(ctx, r.config.ReportInterval, func() {
if err := r.reportOTLPProfile(ctx); err != nil {
log.Errorf("Request failed: %v", err)
}
}()
}, func() {
// Allow the GC to purge expired entries to avoid memory leaks.
r.executables.PurgeExpired()
r.frames.PurgeExpired()
r.cgroupv2ID.PurgeExpired()
})

// When Stop() is called and a signal to 'stop' is received, then:
// - cancel the reporting functions currently running (using context)
// - close the gRPC connection with collection-agent
go func() {
<-r.stopSignal
<-r.runLoop.stopSignal
cancelReporting()
if err := otlpGrpcConn.Close(); err != nil {
log.Fatalf("Stopping connection of OTLP client client failed: %v", err)
Expand Down
44 changes: 44 additions & 0 deletions reporter/runloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"

import (
"context"
"time"

"go.opentelemetry.io/ebpf-profiler/libpf"
)

// runLoop implements the run loop for all reporters
type runLoop struct {
// stopSignal is the stop signal for shutting down all background tasks.
stopSignal chan libpf.Void
}

func (t *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) {
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
go func() {
tick := time.NewTicker(reportInterval)
defer tick.Stop()
purgeTick := time.NewTicker(5 * time.Minute)
defer purgeTick.Stop()

for {
select {
case <-ctx.Done():
return
case <-t.stopSignal:
return
case <-tick.C:
run()
tick.Reset(libpf.AddJitter(reportInterval, 0.2))
case <-purgeTick.C:
purge()
}
}
}()
}

func (t *runLoop) Stop() {
close(t.stopSignal)
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
}