Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Config Provider as an option to set the initial configuration and update it. #1010

Merged
merged 9 commits into from
Aug 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
- Support `golang.org/x/net` `v0.28.0`. ([#988](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/988))
- Support `google.golang.org/grpc` `1.67.0-dev`. ([#1007](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1007))
- Support Go `1.23.0`. ([#1007](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1007))
- Introduce `config.Provider` as an option to set the initial configuration and update it in runtime. ([#1010](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1010))

### Fixed

Expand Down
73 changes: 73 additions & 0 deletions config/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package config

import (
"context"

"go.opentelemetry.io/otel/trace"
)

// InstrumentationLibraryID is used to identify an instrumentation library.
type InstrumentationLibraryID struct {
// Name of the instrumentation pkg (e.g. "net/http").
InstrumentedPkg string
// SpanKind is the relevant span kind for the instrumentation.
// This can be used to configure server-only, client-only spans.
// If not set, the identifier is assumed to be applicable to all span kinds relevant to the instrumented package.
SpanKind trace.SpanKind
}

// InstrumentationLibrary is used to configure instrumentation for a specific library.
type InstrumentationLibrary struct {
// TracesEnabled determines whether traces are enabled for the instrumentation library.
// if nil - take DefaultTracesDisabled value.
TracesEnabled *bool
}

// InstrumentationConfig is used to configure instrumentation.
type InstrumentationConfig struct {
// InstrumentationLibraryConfigs defines library-specific configuration.
// If a package is referenced by more than one key, the most specific key is used.
// For example, if ("net/http", unspecified) and ("net/http", client) are both present,
// the configuration for ("net/http", client) is used for client spans and the configuration for ("net/http", unspecified) is used for server spans.
InstrumentationLibraryConfigs map[InstrumentationLibraryID]InstrumentationLibrary

// DefaultTracesDisabled determines whether traces are disabled by default.
// If set to true, traces are disabled by default for all libraries, unless the library is explicitly enabled.
// If set to false, traces are enabled by default for all libraries, unless the library is explicitly disabled.
// default is false - traces are enabled by default.
DefaultTracesDisabled bool
}

// Provider provides the initial configuration and updates to the instrumentation configuration.
type Provider interface {
// InitialConfig returns the initial instrumentation configuration.
InitialConfig(ctx context.Context) InstrumentationConfig
// Watch returns a channel that receives updates to the instrumentation configuration.
Watch() <-chan InstrumentationConfig
// Shutdown releases any resources held by the provider.
Shutdown(ctx context.Context) error
}

type noopProvider struct{}

// NewNoopProvider returns a provider that does not provide any updates and provide the default configuration as the initial one.
func NewNoopProvider() Provider {
return &noopProvider{}
}

func (p *noopProvider) InitialConfig(_ context.Context) InstrumentationConfig {
return InstrumentationConfig{}
}

func (p *noopProvider) Watch() <-chan InstrumentationConfig {
c := make(chan InstrumentationConfig)
close(c)
return c
}

func (p *noopProvider) Shutdown(_ context.Context) error {
return nil
}
25 changes: 21 additions & 4 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

"go.opentelemetry.io/auto/config"
"go.opentelemetry.io/auto/internal/pkg/instrumentation"
"go.opentelemetry.io/auto/internal/pkg/opentelemetry"
"go.opentelemetry.io/auto/internal/pkg/process"
Expand Down Expand Up @@ -74,11 +75,11 @@ func newLogger(logLevel LogLevel) logr.Logger {
level, _ = zap.ParseAtomicLevel(LogLevelInfo.String())
}

config := zap.NewProductionConfig()
c := zap.NewProductionConfig()

config.Level.SetLevel(level.Level())
c.Level.SetLevel(level.Level())

zapLog, err := config.Build()
zapLog, err := c.Build()

var logger logr.Logger
if err != nil {
Expand Down Expand Up @@ -130,7 +131,7 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
return nil, err
}

mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator)
mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator, c.cp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -221,6 +222,7 @@ type instConfig struct {
globalImpl bool
loadIndicator chan struct{}
logLevel LogLevel
cp config.Provider
}

func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfig, error) {
Expand Down Expand Up @@ -255,6 +257,10 @@ func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfi
c.logLevel = LogLevelInfo
}

if c.cp == nil {
c.cp = config.NewNoopProvider()
}

return c, err
}

Expand Down Expand Up @@ -562,3 +568,14 @@ func WithLogLevel(level LogLevel) InstrumentationOption {
return c, nil
})
}

// WithConfigProvider returns an [InstrumentationOption] that will configure
// an [Instrumentation] to use the provided config.Provider. The config.Provider
// is used to provide the initial configuration and update the configuration of
// the instrumentation in runtime.
func WithConfigProvider(cp config.Provider) InstrumentationOption {
return fnOpt(func(_ context.Context, c instConfig) (instConfig, error) {
c.cp = cp
return c, nil
})
}
146 changes: 129 additions & 17 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/cilium/ebpf/rlimit"
"github.com/go-logr/logr"

"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/auto/config"
dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql"
kafkaConsumer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer"
kafkaProducer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer"
Expand Down Expand Up @@ -42,17 +45,25 @@ type Manager struct {
otelController *opentelemetry.Controller
globalImpl bool
loadedIndicator chan struct{}
cp config.Provider
exe *link.Executable
td *process.TargetDetails
runningProbesWG sync.WaitGroup
eventCh chan *probe.Event
currentConfig config.InstrumentationConfig
}

// NewManager returns a new [Manager].
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}) (*Manager, error) {
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}, cp config.Provider) (*Manager, error) {
logger = logger.WithName("Manager")
m := &Manager{
logger: logger,
probes: make(map[probe.ID]probe.Probe),
otelController: otelController,
globalImpl: globalImpl,
loadedIndicator: loadIndicator,
cp: cp,
eventCh: make(chan *probe.Event),
}

err := m.registerProbes()
Expand Down Expand Up @@ -133,43 +144,136 @@ func (m *Manager) FilterUnusedProbes(target *process.TargetDetails) {
}
}

func getProbeConfig(id probe.ID, c config.InstrumentationConfig) (config.InstrumentationLibrary, bool) {
libKindID := config.InstrumentationLibraryID{
InstrumentedPkg: id.InstrumentedPkg,
SpanKind: id.SpanKind,
}

if lib, ok := c.InstrumentationLibraryConfigs[libKindID]; ok {
return lib, true
}

libID := config.InstrumentationLibraryID{
InstrumentedPkg: id.InstrumentedPkg,
SpanKind: trace.SpanKindUnspecified,
}

if lib, ok := c.InstrumentationLibraryConfigs[libID]; ok {
return lib, true
}

return config.InstrumentationLibrary{}, false
}

func isProbeEnabled(id probe.ID, c config.InstrumentationConfig) bool {
if pc, ok := getProbeConfig(id, c); ok && pc.TracesEnabled != nil {
return *pc.TracesEnabled
}
return !c.DefaultTracesDisabled
}

func (m *Manager) applyConfig(c config.InstrumentationConfig) error {
if m.td == nil {
return errors.New("failed to apply config: target details not set")
}
if m.exe == nil {
return errors.New("failed to apply config: executable not set")
}

var err error

for id, p := range m.probes {
currentlyEnabled := isProbeEnabled(id, m.currentConfig)
newEnabled := isProbeEnabled(id, c)

if currentlyEnabled && !newEnabled {
m.logger.Info("Disabling probe", "id", id)
err = errors.Join(err, p.Close())
continue
}

if !currentlyEnabled && newEnabled {
m.logger.Info("Enabling probe", "id", id)
err = errors.Join(err, p.Load(m.exe, m.td))
if err == nil {
m.runProbe(p)
}
continue
}
}

return nil
}

func (m *Manager) runProbe(p probe.Probe) {
m.runningProbesWG.Add(1)
go func(ap probe.Probe) {
defer m.runningProbesWG.Done()
ap.Run(m.eventCh)
}(p)
}

func (m *Manager) ConfigLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case c, ok := <-m.cp.Watch():
if !ok {
m.logger.Info("Configuration provider closed, configuration updates will no longer be received")
return
}
err := m.applyConfig(c)
if err != nil {
m.logger.Error(err, "Failed to apply config")
continue
}
m.currentConfig = c
}
}
}

// Run runs the event processing loop for all managed probes.
func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error {
if len(m.probes) == 0 {
return errors.New("no instrumentation for target process")
}
if m.cp == nil {
return errors.New("no config provider set")
}

m.currentConfig = m.cp.InitialConfig(ctx)

err := m.load(target)
if err != nil {
return err
}

eventCh := make(chan *probe.Event)
var wg sync.WaitGroup
for _, i := range m.probes {
wg.Add(1)
go func(p probe.Probe) {
defer wg.Done()
p.Run(eventCh)
}(i)
for id, p := range m.probes {
if isProbeEnabled(id, m.currentConfig) {
m.runProbe(p)
}
}

if m.loadedIndicator != nil {
close(m.loadedIndicator)
}

go m.ConfigLoop(ctx)

for {
select {
case <-ctx.Done():
m.logger.V(1).Info("Shutting down all probes")
err := m.cleanup(target)

// Wait for all probes to stop before closing the chan they send on.
wg.Wait()
close(eventCh)
m.runningProbesWG.Wait()
close(m.eventCh)

return errors.Join(err, ctx.Err())
case e := <-eventCh:
case e := <-m.eventCh:
m.otelController.Trace(e)
}
}
Expand All @@ -185,18 +289,25 @@ func (m *Manager) load(target *process.TargetDetails) error {
if err != nil {
return err
}
m.exe = exe

if m.td == nil {
m.td = target
}

if err := m.mount(target); err != nil {
return err
}

// Load probes
for name, i := range m.probes {
m.logger.V(0).Info("loading probe", "name", name)
err := i.Load(exe, target)
if err != nil {
m.logger.Error(err, "error while loading probes, cleaning up", "name", name)
return errors.Join(err, m.cleanup(target))
if isProbeEnabled(name, m.currentConfig) {
m.logger.V(0).Info("loading probe", "name", name)
err := i.Load(exe, target)
if err != nil {
m.logger.Error(err, "error while loading probes, cleaning up", "name", name)
return errors.Join(err, m.cleanup(target))
}
}
}

Expand All @@ -215,6 +326,7 @@ func (m *Manager) mount(target *process.TargetDetails) error {

func (m *Manager) cleanup(target *process.TargetDetails) error {
var err error
err = errors.Join(err, m.cp.Shutdown(context.Background()))
for _, i := range m.probes {
err = errors.Join(err, i.Close())
}
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/instrumentation/manager_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-logr/stdr"
"github.com/hashicorp/go-version"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/auto/config"
"go.opentelemetry.io/auto/internal/pkg/inject"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/testutils"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
Expand Down Expand Up @@ -49,7 +50,7 @@ func fakeManager(t *testing.T) *Manager {
logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))
logger = logger.WithName("Instrumentation")

m, err := NewManager(logger, nil, true, nil)
m, err := NewManager(logger, nil, true, nil, config.NewNoopProvider())
assert.NoError(t, err)
assert.NotNil(t, m)

Expand Down
Loading
Loading