Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Collector from the cobra.Command. #4074

Merged
merged 1 commit into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

- Move ValidateConfig from configcheck to configtest (#3956)
- Remove AttributeMessageType (#4020)
- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005).
- Remove `AttributeHTTPStatusText` const, replaced with `"http.status_text"` (#4015, contrib/#5182).
- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063).
- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063).
- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005)
- Remove `AttributeHTTPStatusText` const (#4015)
- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063)
- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063)
- Split `service.Collector` from the `cobra.Command` (#4074)

## v0.35.0 Beta

Expand Down
4 changes: 2 additions & 2 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func runInteractive(settings service.CollectorSettings) error {
return fmt.Errorf("failed to construct the collector server: %w", err)
}

err = app.Run()
if err != nil {
cmd := service.NewCommand(app)
if err = cmd.Execute(); err != nil {
return fmt.Errorf("collector server run finished with error: %w", err)
}

Expand Down
65 changes: 11 additions & 54 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package service
import (
"context"
"errors"
"flag"
"fmt"
"os"
"os/signal"
"runtime"
"syscall"

"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
Expand All @@ -36,12 +34,10 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/parserprovider"
Expand All @@ -59,20 +55,19 @@ const (

// (Internal note) Collector Lifecycle:
// - New constructs a new Collector.
// - Run starts the collector and calls (*Collector).execute.
// - execute calls setupConfigurationComponents to handle configuration.
// - Run starts the collector.
// - Run calls setupConfigurationComponents to handle configuration.
// If configuration parser fails, collector's config can be reloaded.
// Collector can be shutdown if parser gets a shutdown error.
// - execute runs runAndWaitForShutdownEvent and waits for a shutdown event.
// - Run runs runAndWaitForShutdownEvent and waits for a shutdown event.
// SIGINT and SIGTERM, errors, and (*Collector).Shutdown can trigger the shutdown events.
// - Upon shutdown, pipelines are notified, then pipelines and extensions are shut down.
// - Users can call (*Collector).Shutdown anytime to shutdown the collector.
// - Users can call (*Collector).Shutdown anytime to shut down the collector.

// Collector represents a server providing the OpenTelemetry Collector service.
type Collector struct {
set CollectorSettings
rootCmd *cobra.Command
logger *zap.Logger
set CollectorSettings
logger *zap.Logger

tracerProvider trace.TracerProvider
meterProvider metric.MeterProvider
Expand Down Expand Up @@ -107,57 +102,17 @@ func New(set CollectorSettings) (*Collector, error) {
set.ConfigUnmarshaler = configunmarshaler.NewDefault()
}

col := &Collector{
return &Collector{
set: set,
stateChannel: make(chan State, Closed+1),
}

rootCmd := &cobra.Command{
Use: set.BuildInfo.Command,
Version: set.BuildInfo.Version,
RunE: func(cmd *cobra.Command, args []string) error {
return col.execute(cmd.Context())
},
}

// TODO: coalesce this code and expose this information to other components.
flagSet := new(flag.FlagSet)
addFlagsFns := []func(*flag.FlagSet){
configtelemetry.Flags,
parserprovider.Flags,
telemetry.Flags,
telemetrylogs.Flags,
}
for _, addFlags := range addFlagsFns {
addFlags(flagSet)
}
rootCmd.Flags().AddGoFlagSet(flagSet)
col.rootCmd = rootCmd

return col, nil
}

// Run starts the collector according to the command and configuration
// given by the user, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called
// once a collector is shut down.
func (col *Collector) Run() error {
// From this point on do not show usage in case of error.
col.rootCmd.SilenceUsage = true

return col.rootCmd.Execute()
}, nil
}

// GetStateChannel returns state channel of the collector server.
func (col *Collector) GetStateChannel() chan State {
return col.stateChannel
}

// Command returns Collector's root command.
func (col *Collector) Command() *cobra.Command {
return col.rootCmd
}

// GetLogger returns logger used by the Collector.
// The logger is initialized after collector server start.
func (col *Collector) GetLogger() *zap.Logger {
Expand Down Expand Up @@ -246,7 +201,9 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return nil
}

func (col *Collector) execute(ctx context.Context) error {
// Run starts the collector according to the given configuration given, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
func (col *Collector) Run(ctx context.Context) error {
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
col.zPagesSpanProcessor = zpages.NewSpanProcessor()
col.tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(internal.AlwaysRecord()),
Expand Down
13 changes: 7 additions & 6 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func TestCollector_Start(t *testing.T) {
LoggingOptions: []zap.Option{zap.Hooks(hook)},
})
require.NoError(t, err)
assert.Equal(t, col.rootCmd, col.Command())

const testPrefix = "a_test"
metricsPort := testutil.GetAvailablePort(t)
col.rootCmd.SetArgs([]string{
cmd := NewCommand(col)
cmd.SetArgs([]string{
"--config=testdata/otelcol-config.yaml",
"--metrics-addr=localhost:" + strconv.FormatUint(uint64(metricsPort), 10),
"--metrics-prefix=" + testPrefix,
Expand All @@ -71,7 +71,7 @@ func TestCollector_Start(t *testing.T) {
colDone := make(chan struct{})
go func() {
defer close(colDone)
assert.NoError(t, col.Run())
assert.NoError(t, cmd.Execute())
}()

assert.Equal(t, Starting, <-col.GetStateChannel())
Expand Down Expand Up @@ -123,12 +123,13 @@ func TestCollector_ReportError(t *testing.T) {
col, err := New(CollectorSettings{BuildInfo: component.DefaultBuildInfo(), Factories: factories})
require.NoError(t, err)

col.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})
cmd := NewCommand(col)
cmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})

colDone := make(chan struct{})
go func() {
defer close(colDone)
assert.EqualError(t, col.Run(), "failed to shutdown collector telemetry: err1")
assert.EqualError(t, cmd.Execute(), "failed to shutdown collector telemetry: err1")
}()

assert.Equal(t, Starting, <-col.GetStateChannel())
Expand All @@ -154,7 +155,7 @@ func TestCollector_StartAsGoRoutine(t *testing.T) {
colDone := make(chan struct{})
go func() {
defer close(colDone)
colErr := col.Run()
colErr := col.Run(context.Background())
if colErr != nil {
err = colErr
}
Expand Down
7 changes: 5 additions & 2 deletions service/collector_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ func (s *WindowsService) start(elog *eventlog.Log, colErrorChannel chan error) e
return err
}

// col.Start blocks until receiving a SIGTERM signal, so needs to be started
// col.Run blocks until receiving a SIGTERM signal, so needs to be started
// asynchronously, but it will exit early if an error occurs on startup
go func() { colErrorChannel <- s.col.Run() }()
go func() {
cmd := NewCommand(s.col)
colErrorChannel <- cmd.Execute()
}()

// wait until the collector server is in the Running state
go func() {
Expand Down
54 changes: 54 additions & 0 deletions service/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

import (
"flag"

"github.com/spf13/cobra"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/parserprovider"
)

// NewCommand constructs a new cobra.Command using the given Collector.
// TODO: Make this independent of the collector internals.
func NewCommand(col *Collector) *cobra.Command {
rootCmd := &cobra.Command{
Use: col.set.BuildInfo.Command,
Version: col.set.BuildInfo.Version,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
return col.Run(cmd.Context())
},
}

// TODO: coalesce this code and expose this information to other components.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO still needed? If so is there an issue open for it?

flagSet := new(flag.FlagSet)
addFlagsFns := []func(*flag.FlagSet){
configtelemetry.Flags,
parserprovider.Flags,
telemetry.Flags,
telemetrylogs.Flags,
}
for _, addFlags := range addFlagsFns {
addFlags(flagSet)
}

rootCmd.Flags().AddGoFlagSet(flagSet)
return rootCmd
}