From 124b27ae582908cb6e918bff07e37f0d44108b2a Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 20 Sep 2021 02:01:21 -0700 Subject: [PATCH] Split Collector from the cobra.Command. Signed-off-by: Bogdan Drutu --- CHANGELOG.md | 9 ++--- cmd/otelcol/main.go | 4 +-- service/collector.go | 65 ++++++------------------------------ service/collector_test.go | 13 ++++---- service/collector_windows.go | 7 ++-- service/command.go | 54 ++++++++++++++++++++++++++++++ 6 files changed, 84 insertions(+), 68 deletions(-) create mode 100644 service/command.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b1813f40833..f3f77a934f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,11 @@ - Move ValidateConfig from configcheck to configtest (#3956) - Remove AttributeMessageType (#4020) -- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005). -- Remove `AttributeHTTPStatusText` const, replaced with `"http.status_text"` (#4015, contrib/#5182). -- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063). -- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063). +- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005) +- Remove `AttributeHTTPStatusText` const (#4015) +- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063) +- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063) +- Split `service.Collector` from the `cobra.Command` (#4074) ## v0.35.0 Beta diff --git a/cmd/otelcol/main.go b/cmd/otelcol/main.go index 581876cb78f..2c38a1b9047 100644 --- a/cmd/otelcol/main.go +++ b/cmd/otelcol/main.go @@ -48,8 +48,8 @@ func runInteractive(settings service.CollectorSettings) error { return fmt.Errorf("failed to construct the collector server: %w", err) } - err = app.Run() - if err != nil { + cmd := service.NewCommand(app) + if err = cmd.Execute(); err != nil { return fmt.Errorf("collector server run finished with error: %w", err) } diff --git a/service/collector.go b/service/collector.go index 80672e65ed6..957f9fb3c97 100644 --- a/service/collector.go +++ b/service/collector.go @@ -19,14 +19,12 @@ package service import ( "context" "errors" - "flag" "fmt" "os" "os/signal" "runtime" "syscall" - "github.com/spf13/cobra" "go.opentelemetry.io/contrib/zpages" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" @@ -36,12 +34,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/config/configunmarshaler" "go.opentelemetry.io/collector/config/experimental/configsource" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/extension/ballastextension" - "go.opentelemetry.io/collector/internal/collector/telemetry" "go.opentelemetry.io/collector/service/internal" "go.opentelemetry.io/collector/service/internal/telemetrylogs" "go.opentelemetry.io/collector/service/parserprovider" @@ -59,20 +55,19 @@ const ( // (Internal note) Collector Lifecycle: // - New constructs a new Collector. -// - Run starts the collector and calls (*Collector).execute. -// - execute calls setupConfigurationComponents to handle configuration. +// - Run starts the collector. +// - Run calls setupConfigurationComponents to handle configuration. // If configuration parser fails, collector's config can be reloaded. // Collector can be shutdown if parser gets a shutdown error. -// - execute runs runAndWaitForShutdownEvent and waits for a shutdown event. +// - Run runs runAndWaitForShutdownEvent and waits for a shutdown event. // SIGINT and SIGTERM, errors, and (*Collector).Shutdown can trigger the shutdown events. // - Upon shutdown, pipelines are notified, then pipelines and extensions are shut down. -// - Users can call (*Collector).Shutdown anytime to shutdown the collector. +// - Users can call (*Collector).Shutdown anytime to shut down the collector. // Collector represents a server providing the OpenTelemetry Collector service. type Collector struct { - set CollectorSettings - rootCmd *cobra.Command - logger *zap.Logger + set CollectorSettings + logger *zap.Logger tracerProvider trace.TracerProvider meterProvider metric.MeterProvider @@ -107,45 +102,10 @@ func New(set CollectorSettings) (*Collector, error) { set.ConfigUnmarshaler = configunmarshaler.NewDefault() } - col := &Collector{ + return &Collector{ set: set, stateChannel: make(chan State, Closed+1), - } - - rootCmd := &cobra.Command{ - Use: set.BuildInfo.Command, - Version: set.BuildInfo.Version, - RunE: func(cmd *cobra.Command, args []string) error { - return col.execute(cmd.Context()) - }, - } - - // TODO: coalesce this code and expose this information to other components. - flagSet := new(flag.FlagSet) - addFlagsFns := []func(*flag.FlagSet){ - configtelemetry.Flags, - parserprovider.Flags, - telemetry.Flags, - telemetrylogs.Flags, - } - for _, addFlags := range addFlagsFns { - addFlags(flagSet) - } - rootCmd.Flags().AddGoFlagSet(flagSet) - col.rootCmd = rootCmd - - return col, nil -} - -// Run starts the collector according to the command and configuration -// given by the user, and waits for it to complete. -// Consecutive calls to Run are not allowed, Run shouldn't be called -// once a collector is shut down. -func (col *Collector) Run() error { - // From this point on do not show usage in case of error. - col.rootCmd.SilenceUsage = true - - return col.rootCmd.Execute() + }, nil } // GetStateChannel returns state channel of the collector server. @@ -153,11 +113,6 @@ func (col *Collector) GetStateChannel() chan State { return col.stateChannel } -// Command returns Collector's root command. -func (col *Collector) Command() *cobra.Command { - return col.rootCmd -} - // GetLogger returns logger used by the Collector. // The logger is initialized after collector server start. func (col *Collector) GetLogger() *zap.Logger { @@ -246,7 +201,9 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return nil } -func (col *Collector) execute(ctx context.Context) error { +// Run starts the collector according to the given configuration given, and waits for it to complete. +// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. +func (col *Collector) Run(ctx context.Context) error { col.zPagesSpanProcessor = zpages.NewSpanProcessor() col.tracerProvider = sdktrace.NewTracerProvider( sdktrace.WithSampler(internal.AlwaysRecord()), diff --git a/service/collector_test.go b/service/collector_test.go index 1a9b9d877a2..0fc062350af 100644 --- a/service/collector_test.go +++ b/service/collector_test.go @@ -58,11 +58,11 @@ func TestCollector_Start(t *testing.T) { LoggingOptions: []zap.Option{zap.Hooks(hook)}, }) require.NoError(t, err) - assert.Equal(t, col.rootCmd, col.Command()) const testPrefix = "a_test" metricsPort := testutil.GetAvailablePort(t) - col.rootCmd.SetArgs([]string{ + cmd := NewCommand(col) + cmd.SetArgs([]string{ "--config=testdata/otelcol-config.yaml", "--metrics-addr=localhost:" + strconv.FormatUint(uint64(metricsPort), 10), "--metrics-prefix=" + testPrefix, @@ -71,7 +71,7 @@ func TestCollector_Start(t *testing.T) { colDone := make(chan struct{}) go func() { defer close(colDone) - assert.NoError(t, col.Run()) + assert.NoError(t, cmd.Execute()) }() assert.Equal(t, Starting, <-col.GetStateChannel()) @@ -123,12 +123,13 @@ func TestCollector_ReportError(t *testing.T) { col, err := New(CollectorSettings{BuildInfo: component.DefaultBuildInfo(), Factories: factories}) require.NoError(t, err) - col.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"}) + cmd := NewCommand(col) + cmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"}) colDone := make(chan struct{}) go func() { defer close(colDone) - assert.EqualError(t, col.Run(), "failed to shutdown collector telemetry: err1") + assert.EqualError(t, cmd.Execute(), "failed to shutdown collector telemetry: err1") }() assert.Equal(t, Starting, <-col.GetStateChannel()) @@ -154,7 +155,7 @@ func TestCollector_StartAsGoRoutine(t *testing.T) { colDone := make(chan struct{}) go func() { defer close(colDone) - colErr := col.Run() + colErr := col.Run(context.Background()) if colErr != nil { err = colErr } diff --git a/service/collector_windows.go b/service/collector_windows.go index 814a453ad6a..372bf42e2d2 100644 --- a/service/collector_windows.go +++ b/service/collector_windows.go @@ -87,9 +87,12 @@ func (s *WindowsService) start(elog *eventlog.Log, colErrorChannel chan error) e return err } - // col.Start blocks until receiving a SIGTERM signal, so needs to be started + // col.Run blocks until receiving a SIGTERM signal, so needs to be started // asynchronously, but it will exit early if an error occurs on startup - go func() { colErrorChannel <- s.col.Run() }() + go func() { + cmd := NewCommand(s.col) + colErrorChannel <- cmd.Execute() + }() // wait until the collector server is in the Running state go func() { diff --git a/service/command.go b/service/command.go new file mode 100644 index 00000000000..91e64244ea1 --- /dev/null +++ b/service/command.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "flag" + + "github.com/spf13/cobra" + + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/internal/collector/telemetry" + "go.opentelemetry.io/collector/service/internal/telemetrylogs" + "go.opentelemetry.io/collector/service/parserprovider" +) + +// NewCommand constructs a new cobra.Command using the given Collector. +// TODO: Make this independent of the collector internals. +func NewCommand(col *Collector) *cobra.Command { + rootCmd := &cobra.Command{ + Use: col.set.BuildInfo.Command, + Version: col.set.BuildInfo.Version, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + return col.Run(cmd.Context()) + }, + } + + // TODO: coalesce this code and expose this information to other components. + flagSet := new(flag.FlagSet) + addFlagsFns := []func(*flag.FlagSet){ + configtelemetry.Flags, + parserprovider.Flags, + telemetry.Flags, + telemetrylogs.Flags, + } + for _, addFlags := range addFlagsFns { + addFlags(flagSet) + } + + rootCmd.Flags().AddGoFlagSet(flagSet) + return rootCmd +}