Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown panic #980

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http

## [Unreleased]

### Fixed

- Fix dirty shutdown caused by panic. ([#980](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/980))

## [v0.14.0-alpha] - 2024-07-15

### Added
Expand Down
44 changes: 42 additions & 2 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
"github.com/go-logr/stdr"
Expand Down Expand Up @@ -57,6 +58,10 @@ type Instrumentation struct {
target *process.TargetDetails
analyzer *process.Analyzer
manager *instrumentation.Manager

stopMu sync.Mutex
stop context.CancelFunc
stopped chan struct{}
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}

// Error message returned when instrumentation is launched without a valid target
Expand Down Expand Up @@ -158,13 +163,48 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
}

// Run starts the instrumentation.
//
// This function will not return until either ctx is done, an unrecoverable
// error is encountered, or Close is called.
//
// Run returns an error immediately, without starting the manager, if a stop
// function is already registered (the instrumentation is already running).
// A manager error matching context.Canceled or context.DeadlineExceeded is
// reported as nil.
func (i *Instrumentation) Run(ctx context.Context) error {
// NOTE(review): the next line appears to be the pre-change body that the
// diff view renders next to its replacement; the statements after it are
// the current implementation.
return i.manager.Run(ctx, i.target)
// Wrap ctx so Close can cancel this run; this also registers i.stop and
// i.stopped under stopMu.
ctx, err := i.newStop(ctx)
if err != nil {
return err
}

err = i.manager.Run(ctx, i.target)
// Unblock Close, which waits on i.stopped after canceling the context.
close(i.stopped)
// Cancellation and deadline expiry are the expected ways to stop, so they
// are not surfaced to the caller as errors.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil
}
return err
}

// newStop derives a cancelable context from parent and records, under
// stopMu, both its cancel function (i.stop) and a fresh channel (i.stopped)
// on the Instrumentation.
//
// If a stop function is already recorded, nothing is changed and parent is
// returned unchanged together with an error.
func (i *Instrumentation) newStop(parent context.Context) (context.Context, error) {
	i.stopMu.Lock()
	defer i.stopMu.Unlock()

	if running := i.stop != nil; running {
		return parent, errors.New("instrumentation already running")
	}

	runCtx, cancel := context.WithCancel(parent)
	i.stopped = make(chan struct{})
	i.stop = cancel
	return runCtx, nil
}

// Close closes the Instrumentation, cleaning up all used resources.
//
// If a run is registered, Close cancels its context and blocks until Run
// has closed i.stopped. If nothing is registered, Close does nothing. The
// returned error is always nil.
func (i *Instrumentation) Close() error {
// NOTE(review): the next line appears to be the pre-change body that the
// diff view renders next to its replacement; the statements after it are
// the current implementation.
return i.manager.Close()
// stopMu is held for the whole call, so Close is serialized with newStop
// and with other Close calls.
i.stopMu.Lock()
defer i.stopMu.Unlock()

if i.stop != nil {
// Cancel the context handed to the manager, then wait for Run to signal
// that the manager has returned.
i.stop()
<-i.stopped

// Clear the registration so newStop accepts a subsequent Run.
i.stop, i.stopped = nil, nil
}
return nil
}

// InstrumentationOption applies a configuration option to [Instrumentation].
Expand Down
63 changes: 26 additions & 37 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ import (
"go.opentelemetry.io/auto/internal/pkg/process"
)

// Function variables overridden in testing.
//
// Manager code calls these indirections rather than the package functions
// directly; tests swap in stubs and restore the originals afterwards.
var (
// openExecutable is called by load to open the target's /proc/<pid>/exe.
openExecutable = link.OpenExecutable
// rlimitRemoveMemlock is called by load to remove resource limits for
// kernels <5.11.
rlimitRemoveMemlock = rlimit.RemoveMemlock
// bpffsMount is called by mount for the target process.
bpffsMount = bpffs.Mount
// bpffsCleanup is called by cleanup after the probes have been closed.
bpffsCleanup = bpffs.Cleanup
)

// Manager handles the management of [probe.Probe] instances.
type Manager struct {
logger logr.Logger
probes map[probe.ID]probe.Probe
done chan bool
incomingEvents chan *probe.Event
otelController *opentelemetry.Controller
globalImpl bool
wg sync.WaitGroup
closingErrors chan error
loadedIndicator chan struct{}
}

Expand All @@ -46,11 +50,8 @@ func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, gl
m := &Manager{
logger: logger,
probes: make(map[probe.ID]probe.Probe),
done: make(chan bool, 1),
incomingEvents: make(chan *probe.Event),
otelController: otelController,
globalImpl: globalImpl,
closingErrors: make(chan error, 1),
loadedIndicator: loadIndicator,
}

Expand Down Expand Up @@ -135,22 +136,21 @@ func (m *Manager) FilterUnusedProbes(target *process.TargetDetails) {
// Run runs the event processing loop for all managed probes.
func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error {
if len(m.probes) == 0 {
err := errors.New("no instrumentation for target process")
close(m.closingErrors)
return err
return errors.New("no instrumentation for target process")
}

err := m.load(target)
if err != nil {
close(m.closingErrors)
return err
}

m.wg.Add(len(m.probes))
eventCh := make(chan *probe.Event)
var wg sync.WaitGroup
for _, i := range m.probes {
wg.Add(1)
go func(p probe.Probe) {
defer m.wg.Done()
p.Run(m.incomingEvents)
defer wg.Done()
p.Run(eventCh)
}(i)
}

Expand All @@ -161,29 +161,27 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error
for {
select {
case <-ctx.Done():
m.logger.V(1).Info("shutting down all probes due to context cancellation")
err := m.cleanup(target)
err = errors.Join(err, ctx.Err())
m.closingErrors <- err
return nil
case <-m.done:
m.logger.V(1).Info("shutting down all probes due to signal")
m.logger.V(1).Info("Shutting down all probes")
err := m.cleanup(target)
m.closingErrors <- err
return nil
case e := <-m.incomingEvents:

// Wait for all probes to stop before closing the chan they send on.
wg.Wait()
close(eventCh)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

return errors.Join(err, ctx.Err())
case e := <-eventCh:
m.otelController.Trace(e)
}
}
}

func (m *Manager) load(target *process.TargetDetails) error {
// Remove resource limits for kernels <5.11.
if err := rlimit.RemoveMemlock(); err != nil {
if err := rlimitRemoveMemlock(); err != nil {
return err
}

exe, err := link.OpenExecutable(fmt.Sprintf("/proc/%d/exe", target.PID))
exe, err := openExecutable(fmt.Sprintf("/proc/%d/exe", target.PID))
if err != nil {
return err
}
Expand Down Expand Up @@ -212,26 +210,17 @@ func (m *Manager) mount(target *process.TargetDetails) error {
} else {
m.logger.V(1).Info("Mounting bpffs")
}
return bpffs.Mount(target)
return bpffsMount(target)
}

func (m *Manager) cleanup(target *process.TargetDetails) error {
var err error
close(m.incomingEvents)
for _, i := range m.probes {
err = errors.Join(err, i.Close())
}

m.logger.V(1).Info("Cleaning bpffs")
return errors.Join(err, bpffs.Cleanup(target))
}

// Close closes m.
func (m *Manager) Close() error {
m.done <- true
err := <-m.closingErrors
m.wg.Wait()
return err
return errors.Join(err, bpffsCleanup(target))
}

func (m *Manager) registerProbes() error {
Expand Down
87 changes: 87 additions & 0 deletions internal/pkg/instrumentation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package instrumentation

import (
"context"
"log"
"os"
"testing"
"time"

"github.com/cilium/ebpf/link"
"github.com/go-logr/stdr"
"github.com/hashicorp/go-version"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -186,3 +189,87 @@ func fakeManager(t *testing.T) *Manager {

return m
}

// TestRunStopping checks that Manager.Run does not panic when its context is
// canceled while a probe is still blocked in Close, and that Run then returns
// an error matching context.Canceled.
func TestRunStopping(t *testing.T) {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// probeStop keeps the probe's Close blocked until the test closes it.
probeStop := make(chan struct{})
p := newSlowProbe(probeStop)

logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))
logger = logger.WithName("Instrumentation")

// Build the Manager directly with only the fields this test sets; the
// single probe is registered under the zero-value probe.ID.
m := &Manager{
logger: logger.WithName("Manager"),
probes: map[probe.ID]probe.Probe{{}: p},
}

// Replace every package-level function variable with a no-op stub and
// restore the original when the test finishes.
origOpenExecutable := openExecutable
openExecutable = func(string) (*link.Executable, error) { return nil, nil }
t.Cleanup(func() { openExecutable = origOpenExecutable })

origRlimitRemoveMemlock := rlimitRemoveMemlock
rlimitRemoveMemlock = func() error { return nil }
t.Cleanup(func() { rlimitRemoveMemlock = origRlimitRemoveMemlock })

origBpffsMount := bpffsMount
bpffsMount = func(*process.TargetDetails) error { return nil }
t.Cleanup(func() { bpffsMount = origBpffsMount })

origBpffsCleanup := bpffsCleanup
bpffsCleanup = func(*process.TargetDetails) error { return nil }
t.Cleanup(func() { bpffsCleanup = origBpffsCleanup })

// Run the manager in the background; errCh is buffered so the goroutine
// can finish even if the test stops waiting.
ctx, stopCtx := context.WithCancel(context.Background())
errCh := make(chan error, 1)
go func() { errCh <- m.Run(ctx, &process.TargetDetails{PID: 1000}) }()

assert.NotPanics(t, func() {
// Cancel the run, wait until the probe's Close has been entered (it
// sends on closeSignal), and only then release it.
stopCtx()
assert.Eventually(t, func() bool {
select {
case <-p.closeSignal:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
close(probeStop)
})

// Poll for Run's result instead of blocking so a hung Run fails the test
// within the timeout.
var err error
assert.Eventually(t, func() bool {
select {
case err = <-errCh:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
assert.ErrorIs(t, err, context.Canceled, "Stopping Run error")
}

// slowProbe is a test probe whose Close blocks until the test releases it.
//
// It embeds probe.Probe so it can be stored as a probe.Probe; newSlowProbe
// leaves the embedded value nil, and Load, Run, and Close are defined on
// slowProbe itself.
type slowProbe struct {
probe.Probe

// closeSignal receives one value each time Close is entered.
closeSignal chan struct{}
// stop releases a blocked Close once a receive on it succeeds.
stop chan struct{}
}

// newSlowProbe returns a slowProbe whose Close blocks on stop. The probe
// gets its own unbuffered closeSignal channel; the embedded probe.Probe is
// left at its zero value.
func newSlowProbe(stop chan struct{}) slowProbe {
	var p slowProbe
	p.stop = stop
	p.closeSignal = make(chan struct{})
	return p
}

// Load does nothing and always reports success.
func (slowProbe) Load(_ *link.Executable, _ *process.TargetDetails) error {
	return nil
}

// Run returns immediately without sending any event on the channel.
func (slowProbe) Run(_ chan<- *probe.Event) {}

// Close announces that it has been entered by sending on closeSignal, then
// blocks until a receive on stop succeeds. It always returns nil.
func (p slowProbe) Close() error {
	var entered struct{}
	p.closeSignal <- entered

	// Hold the caller here until the test releases the probe.
	<-p.stop
	return nil
}