Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix initialization of the MetricProvider #5571

Merged
merged 1 commit into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

- Deprecate `service.ConfigServiceTelemetry`, `service.ConfigServiceTelemetryLogs`, and `service.ConfigServiceTelemetryMetrics` (#5565)

### 🧰 Bug fixes 🧰

- Fix initialization of the OpenTelemetry MetricProvider. (#5571)

## v0.54.0 Beta

### 🛑 Breaking changes 🛑
Expand Down
30 changes: 13 additions & 17 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (col *Collector) Shutdown() {

// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen.
func (col *Collector) runAndWaitForShutdownEvent(ctx context.Context) error {
col.service.telemetry.Logger.Info("Everything is ready. Begin running and processing data.")
col.service.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.")

col.signalsChannel = make(chan os.Signal, 1)
// Only notify with SIGTERM and SIGINT if graceful shutdown is enabled.
Expand All @@ -139,11 +139,11 @@ LOOP:
select {
case err := <-col.set.ConfigProvider.Watch():
if err != nil {
col.service.telemetry.Logger.Error("Config watch failed", zap.Error(err))
col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err))
break LOOP
}

col.service.telemetry.Logger.Warn("Config updated, restart service")
col.service.telemetrySettings.Logger.Warn("Config updated, restart service")
col.setCollectorState(Closing)

if err = col.service.Shutdown(ctx); err != nil {
Expand All @@ -153,16 +153,16 @@ LOOP:
return fmt.Errorf("failed to setup configuration components: %w", err)
}
case err := <-col.asyncErrorChannel:
col.service.telemetry.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
break LOOP
case s := <-col.signalsChannel:
col.service.telemetry.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
break LOOP
case <-col.shutdownChan:
col.service.telemetry.Logger.Info("Received shutdown request")
col.service.telemetrySettings.Logger.Info("Received shutdown request")
break LOOP
case <-ctx.Done():
col.service.telemetry.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))
col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))

// Call shutdown with background context as the passed in context has been canceled
return col.shutdown(context.Background())
Expand All @@ -187,19 +187,14 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Config: cfg,
AsyncErrorChannel: col.asyncErrorChannel,
LoggingOptions: col.set.LoggingOptions,
telemetry: col.set.telemetry,
})
if err != nil {
return err
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.service.telemetry.Logger, cfg.Service.Telemetry.Logs.Level)
}

// TODO: Move this to the service initialization.
// It is called once because that is how it is implemented using sync.Once.
if err = col.set.telemetry.init(col.service); err != nil {
return err
telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
}

if err = col.service.Start(ctx); err != nil {
Expand All @@ -217,7 +212,7 @@ func (col *Collector) Run(ctx context.Context) error {
return err
}

col.service.telemetry.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
col.service.telemetrySettings.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
zap.String("Version", col.set.BuildInfo.Version),
zap.Int("NumCPU", runtime.NumCPU()),
)
Expand All @@ -233,7 +228,7 @@ func (col *Collector) shutdown(ctx context.Context) error {
var errs error

// Begin shutdown sequence.
col.service.telemetry.Logger.Info("Starting shutdown...")
col.service.telemetrySettings.Logger.Info("Starting shutdown...")

if err := col.set.ConfigProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
Expand All @@ -243,7 +238,8 @@ func (col *Collector) shutdown(ctx context.Context) error {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown service: %w", err))
}

if err := col.set.telemetry.shutdown(); err != nil {
// TODO: Move this as part of the service shutdown.
if err := col.service.telemetryInitializer.shutdown(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
}

Expand Down
17 changes: 4 additions & 13 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestCollectorReportError(t *testing.T) {
}

func TestCollectorFailedShutdown(t *testing.T) {
t.Skip("This test was using telemetry shutdown failure, switch to use a component that errors on shutdown.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this unnecessary now? Maybe delete it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you can see it is a TODO, to use a different design, and re-enable. Not sure how to do this without blowing the size of this PR.

factories, err := componenttest.NopFactories()
require.NoError(t, err)

Expand All @@ -143,7 +144,7 @@ func TestCollectorFailedShutdown(t *testing.T) {
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: &mockColTelemetry{},
telemetry: newColTelemetry(featuregate.NewRegistry()),
})
require.NoError(t, err)

Expand Down Expand Up @@ -265,7 +266,7 @@ func ownMetricsTestCases(version string) []ownMetricsTestCase {
}}
}

func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter, tc ownMetricsTestCase) {
func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc ownMetricsTestCase) {
factories, err := componenttest.NopFactories()
zpagesExt := zpagesextension.NewFactory()
factories.Extensions[zpagesExt.Type()] = zpagesExt
Expand All @@ -288,7 +289,7 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
// Set the metrics address to expose own metrics on.
"service::telemetry::metrics::address": metricsAddr,
}
// Also include resource attributes under the service.telemetry.resource key.
// Also include resource attributes under the service::telemetry::resource key.
for k, v := range tc.userDefinedResource {
extraCfgAsProps["service::telemetry::resource::"+k] = v
}
Expand Down Expand Up @@ -396,16 +397,6 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {
assert.Equal(t, Closed, col.GetState())
}

type mockColTelemetry struct{}

func (tel *mockColTelemetry) init(*service) error {
return nil
}

func (tel *mockColTelemetry) shutdown() error {
return errors.New("err1")
}

func assertMetrics(t *testing.T, metricsAddr string, expectedLabels map[string]labelValue) {
client := &http.Client{}
resp, err := client.Get("http://" + metricsAddr + "/metrics")
Expand Down
2 changes: 1 addition & 1 deletion service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestConfigValidate(t *testing.T) {
expected: nil,
},
{
name: "custom-service-telemetry-encoding",
name: "custom-service-telemetrySettings-encoding",
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Service.Telemetry.Logs.Encoding = "test_encoding"
Expand Down
35 changes: 25 additions & 10 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@ import (
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/extensions"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// service represents the implementation of a component.Host.
type service struct {
buildInfo component.BuildInfo
config *Config
telemetry component.TelemetrySettings
host *serviceHost
buildInfo component.BuildInfo
config *Config
telemetrySettings component.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
}

func newService(set *settings) (*service, error) {
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: component.TelemetrySettings{
telemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
Expand All @@ -54,26 +58,37 @@ func newService(set *settings) (*service, error) {
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
},
telemetryInitializer: set.telemetry,
}

var err error
if srv.telemetry.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}

if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp

extensionsSettings := extensions.Settings{
Telemetry: srv.telemetry,
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Configs: srv.config.Extensions,
Factories: srv.host.factories.Extensions,
ServiceExtensions: srv.config.Service.Extensions,
}
if srv.host.extensions, err = extensions.Build(context.Background(), extensionsSettings); err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
return nil, fmt.Errorf("failed build extensions: %w", err)
}

if srv.host.pipelines, err = pipelines.Build(context.Background(), srv.telemetrySettings, srv.buildInfo, srv.config, srv.host.factories); err != nil {
return nil, fmt.Errorf("failed build pipelines: %w", err)
}

if srv.host.pipelines, err = pipelines.Build(context.Background(), srv.telemetry, srv.buildInfo, srv.config, srv.host.factories); err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
}

return srv, nil
Expand Down
10 changes: 8 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/service/featuregate"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func TestService_GetFactory(t *testing.T) {
assert.Nil(t, srv.host.GetFactory(42, "nop"))
}

func TestService_GetExtensions(t *testing.T) {
func TestServiceGetExtensions(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)
srv := createExampleService(t, factories)
Expand All @@ -71,7 +72,7 @@ func TestService_GetExtensions(t *testing.T) {
assert.Contains(t, extMap, config.NewComponentID("nop"))
}

func TestService_GetExporters(t *testing.T) {
func TestServiceGetExporters(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)
srv := createExampleService(t, factories)
Expand All @@ -98,11 +99,16 @@ func createExampleService(t *testing.T, factories component.Factories) *service
cfg, err := configunmarshaler.New().Unmarshal(conf, factories)
require.NoError(t, err)

telemetry := newColTelemetry(featuregate.NewRegistry())
srv, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: cfg,
telemetry: telemetry,
})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, telemetry.shutdown())
})
return srv
}
5 changes: 4 additions & 1 deletion service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type settings struct {

// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option

// For testing purpose only.
telemetry *telemetryInitializer
}

// CollectorSettings holds configuration for creating a new Collector.
Expand Down Expand Up @@ -63,5 +66,5 @@ type CollectorSettings struct {
SkipSettingGRPCLogger bool

// For testing purpose only.
telemetry collectorTelemetryExporter
telemetry *telemetryInitializer
}
Loading