Skip to content

Commit

Permalink
Fix initialization of the MetricProvider
Browse files Browse the repository at this point in the history
The problem was that the MetricProvider was set on "service.telemetry.MetricProvider" only after the components had been created. This was not a trivial change: the process telemetry initialization requires the ballast size, which is only available after the extensions are initialized. Because of that, I split the initialization of the MetricProvider/oc.Registry from the initialization of the process telemetry.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jun 29, 2022
1 parent bb45d00 commit e8005ac
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 103 deletions.
30 changes: 13 additions & 17 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (col *Collector) Shutdown() {

// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen.
func (col *Collector) runAndWaitForShutdownEvent(ctx context.Context) error {
col.service.telemetry.Logger.Info("Everything is ready. Begin running and processing data.")
col.service.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.")

col.signalsChannel = make(chan os.Signal, 1)
// Only notify with SIGTERM and SIGINT if graceful shutdown is enabled.
Expand All @@ -139,11 +139,11 @@ LOOP:
select {
case err := <-col.set.ConfigProvider.Watch():
if err != nil {
col.service.telemetry.Logger.Error("Config watch failed", zap.Error(err))
col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err))
break LOOP
}

col.service.telemetry.Logger.Warn("Config updated, restart service")
col.service.telemetrySettings.Logger.Warn("Config updated, restart service")
col.setCollectorState(Closing)

if err = col.service.Shutdown(ctx); err != nil {
Expand All @@ -153,16 +153,16 @@ LOOP:
return fmt.Errorf("failed to setup configuration components: %w", err)
}
case err := <-col.asyncErrorChannel:
col.service.telemetry.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
break LOOP
case s := <-col.signalsChannel:
col.service.telemetry.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
break LOOP
case <-col.shutdownChan:
col.service.telemetry.Logger.Info("Received shutdown request")
col.service.telemetrySettings.Logger.Info("Received shutdown request")
break LOOP
case <-ctx.Done():
col.service.telemetry.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))
col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))

// Call shutdown with background context as the passed in context has been canceled
return col.shutdown(context.Background())
Expand All @@ -187,19 +187,14 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Config: cfg,
AsyncErrorChannel: col.asyncErrorChannel,
LoggingOptions: col.set.LoggingOptions,
telemetry: col.set.telemetry,
})
if err != nil {
return err
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.service.telemetry.Logger, cfg.Service.Telemetry.Logs.Level)
}

// TODO: Move this to the service initialization.
// It is called once because that is how it is implemented using sync.Once.
if err = col.set.telemetry.init(col.service); err != nil {
return err
telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
}

if err = col.service.Start(ctx); err != nil {
Expand All @@ -217,7 +212,7 @@ func (col *Collector) Run(ctx context.Context) error {
return err
}

col.service.telemetry.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
col.service.telemetrySettings.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
zap.String("Version", col.set.BuildInfo.Version),
zap.Int("NumCPU", runtime.NumCPU()),
)
Expand All @@ -233,7 +228,7 @@ func (col *Collector) shutdown(ctx context.Context) error {
var errs error

// Begin shutdown sequence.
col.service.telemetry.Logger.Info("Starting shutdown...")
col.service.telemetrySettings.Logger.Info("Starting shutdown...")

if err := col.set.ConfigProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
Expand All @@ -243,7 +238,8 @@ func (col *Collector) shutdown(ctx context.Context) error {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown service: %w", err))
}

if err := col.set.telemetry.shutdown(); err != nil {
// TODO: Move this as part of the service shutdown.
if err := col.service.telemetryInitializer.shutdown(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
}

Expand Down
17 changes: 4 additions & 13 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestCollectorReportError(t *testing.T) {
}

func TestCollectorFailedShutdown(t *testing.T) {
t.Skip("This test was using telemetry shutdown failure, switch to use a component that errors on shutdown.")
factories, err := componenttest.NopFactories()
require.NoError(t, err)

Expand All @@ -143,7 +144,7 @@ func TestCollectorFailedShutdown(t *testing.T) {
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: &mockColTelemetry{},
telemetry: newColTelemetry(featuregate.NewRegistry()),
})
require.NoError(t, err)

Expand Down Expand Up @@ -265,7 +266,7 @@ func ownMetricsTestCases(version string) []ownMetricsTestCase {
}}
}

func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter, tc ownMetricsTestCase) {
func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc ownMetricsTestCase) {
factories, err := componenttest.NopFactories()
zpagesExt := zpagesextension.NewFactory()
factories.Extensions[zpagesExt.Type()] = zpagesExt
Expand All @@ -288,7 +289,7 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
// Set the metrics address to expose own metrics on.
"service::telemetry::metrics::address": metricsAddr,
}
// Also include resource attributes under the service.telemetry.resource key.
// Also include resource attributes under the service::telemetry::resource key.
for k, v := range tc.userDefinedResource {
extraCfgAsProps["service::telemetry::resource::"+k] = v
}
Expand Down Expand Up @@ -396,16 +397,6 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {
assert.Equal(t, Closed, col.GetState())
}

type mockColTelemetry struct{}

func (tel *mockColTelemetry) init(*service) error {
return nil
}

func (tel *mockColTelemetry) shutdown() error {
return errors.New("err1")
}

func assertMetrics(t *testing.T, metricsAddr string, expectedLabels map[string]labelValue) {
client := &http.Client{}
resp, err := client.Get("http://" + metricsAddr + "/metrics")
Expand Down
2 changes: 1 addition & 1 deletion service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestConfigValidate(t *testing.T) {
expected: nil,
},
{
name: "custom-service-telemetry-encoding",
name: "custom-service-telemetrySettings-encoding",
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Service.Telemetry.Logs.Encoding = "test_encoding"
Expand Down
35 changes: 25 additions & 10 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@ import (
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/extensions"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// service represents the implementation of a component.Host.
type service struct {
buildInfo component.BuildInfo
config *Config
telemetry component.TelemetrySettings
host *serviceHost
buildInfo component.BuildInfo
config *Config
telemetrySettings component.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
}

func newService(set *settings) (*service, error) {
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: component.TelemetrySettings{
telemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
Expand All @@ -54,26 +58,37 @@ func newService(set *settings) (*service, error) {
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
},
telemetryInitializer: set.telemetry,
}

var err error
if srv.telemetry.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}

if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp

extensionsSettings := extensions.Settings{
Telemetry: srv.telemetry,
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Configs: srv.config.Extensions,
Factories: srv.host.factories.Extensions,
ServiceExtensions: srv.config.Service.Extensions,
}
if srv.host.extensions, err = extensions.Build(context.Background(), extensionsSettings); err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
return nil, fmt.Errorf("failed build extensions: %w", err)
}

if srv.host.pipelines, err = pipelines.Build(context.Background(), srv.telemetrySettings, srv.buildInfo, srv.config, srv.host.factories); err != nil {
return nil, fmt.Errorf("failed build pipelines: %w", err)
}

if srv.host.pipelines, err = pipelines.Build(context.Background(), srv.telemetry, srv.buildInfo, srv.config, srv.host.factories); err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
}

return srv, nil
Expand Down
10 changes: 8 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/service/featuregate"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func TestService_GetFactory(t *testing.T) {
assert.Nil(t, srv.host.GetFactory(42, "nop"))
}

func TestService_GetExtensions(t *testing.T) {
func TestServiceGetExtensions(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)
srv := createExampleService(t, factories)
Expand All @@ -71,7 +72,7 @@ func TestService_GetExtensions(t *testing.T) {
assert.Contains(t, extMap, config.NewComponentID("nop"))
}

func TestService_GetExporters(t *testing.T) {
func TestServiceGetExporters(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)
srv := createExampleService(t, factories)
Expand All @@ -98,11 +99,16 @@ func createExampleService(t *testing.T, factories component.Factories) *service
cfg, err := configunmarshaler.New().Unmarshal(conf, factories)
require.NoError(t, err)

telemetry := newColTelemetry(featuregate.NewRegistry())
srv, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: cfg,
telemetry: telemetry,
})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, telemetry.shutdown())
})
return srv
}
5 changes: 4 additions & 1 deletion service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type settings struct {

// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option

// For testing purpose only.
telemetry *telemetryInitializer
}

// CollectorSettings holds configuration for creating a new Collector.
Expand Down Expand Up @@ -63,5 +66,5 @@ type CollectorSettings struct {
SkipSettingGRPCLogger bool

// For testing purpose only.
telemetry collectorTelemetryExporter
telemetry *telemetryInitializer
}
Loading

0 comments on commit e8005ac

Please sign in to comment.