Skip to content

Commit

Permalink
Split Collector from the cobra.Command.
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Sep 20, 2021
1 parent 1f5dd9f commit 124b27a
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 68 deletions.
9 changes: 5 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

- Move ValidateConfig from configcheck to configtest (#3956)
- Remove AttributeMessageType (#4020)
- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005).
- Remove `AttributeHTTPStatusText` const, replaced with `"http.status_text"` (#4015, contrib/#5182).
- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063).
- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063).
- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005)
- Remove `AttributeHTTPStatusText` const (#4015)
- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063)
- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063)
- Split `service.Collector` from the `cobra.Command` (#4074)

## v0.35.0 Beta

Expand Down
4 changes: 2 additions & 2 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func runInteractive(settings service.CollectorSettings) error {
return fmt.Errorf("failed to construct the collector server: %w", err)
}

err = app.Run()
if err != nil {
cmd := service.NewCommand(app)
if err = cmd.Execute(); err != nil {
return fmt.Errorf("collector server run finished with error: %w", err)
}

Expand Down
65 changes: 11 additions & 54 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package service
import (
"context"
"errors"
"flag"
"fmt"
"os"
"os/signal"
"runtime"
"syscall"

"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
Expand All @@ -36,12 +34,10 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/parserprovider"
Expand All @@ -59,20 +55,19 @@ const (

// (Internal note) Collector Lifecycle:
// - New constructs a new Collector.
// - Run starts the collector and calls (*Collector).execute.
// - execute calls setupConfigurationComponents to handle configuration.
// - Run starts the collector.
// - Run calls setupConfigurationComponents to handle configuration.
// If configuration parser fails, collector's config can be reloaded.
// Collector can be shutdown if parser gets a shutdown error.
// - execute runs runAndWaitForShutdownEvent and waits for a shutdown event.
// - Run runs runAndWaitForShutdownEvent and waits for a shutdown event.
// SIGINT and SIGTERM, errors, and (*Collector).Shutdown can trigger the shutdown events.
// - Upon shutdown, pipelines are notified, then pipelines and extensions are shut down.
// - Users can call (*Collector).Shutdown anytime to shutdown the collector.
// - Users can call (*Collector).Shutdown anytime to shut down the collector.

// Collector represents a server providing the OpenTelemetry Collector service.
type Collector struct {
set CollectorSettings
rootCmd *cobra.Command
logger *zap.Logger
set CollectorSettings
logger *zap.Logger

tracerProvider trace.TracerProvider
meterProvider metric.MeterProvider
Expand Down Expand Up @@ -107,57 +102,17 @@ func New(set CollectorSettings) (*Collector, error) {
set.ConfigUnmarshaler = configunmarshaler.NewDefault()
}

col := &Collector{
return &Collector{
set: set,
stateChannel: make(chan State, Closed+1),
}

rootCmd := &cobra.Command{
Use: set.BuildInfo.Command,
Version: set.BuildInfo.Version,
RunE: func(cmd *cobra.Command, args []string) error {
return col.execute(cmd.Context())
},
}

// TODO: coalesce this code and expose this information to other components.
flagSet := new(flag.FlagSet)
addFlagsFns := []func(*flag.FlagSet){
configtelemetry.Flags,
parserprovider.Flags,
telemetry.Flags,
telemetrylogs.Flags,
}
for _, addFlags := range addFlagsFns {
addFlags(flagSet)
}
rootCmd.Flags().AddGoFlagSet(flagSet)
col.rootCmd = rootCmd

return col, nil
}

// Run starts the collector according to the command and configuration
// given by the user, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called
// once a collector is shut down.
func (col *Collector) Run() error {
// From this point on do not show usage in case of error.
col.rootCmd.SilenceUsage = true

return col.rootCmd.Execute()
}, nil
}

// GetStateChannel returns state channel of the collector server.
func (col *Collector) GetStateChannel() chan State {
return col.stateChannel
}

// Command returns Collector's root command.
func (col *Collector) Command() *cobra.Command {
return col.rootCmd
}

// GetLogger returns logger used by the Collector.
// The logger is initialized after collector server start.
func (col *Collector) GetLogger() *zap.Logger {
Expand Down Expand Up @@ -246,7 +201,9 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return nil
}

func (col *Collector) execute(ctx context.Context) error {
// Run starts the collector according to the given configuration given, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
func (col *Collector) Run(ctx context.Context) error {
col.zPagesSpanProcessor = zpages.NewSpanProcessor()
col.tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(internal.AlwaysRecord()),
Expand Down
13 changes: 7 additions & 6 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func TestCollector_Start(t *testing.T) {
LoggingOptions: []zap.Option{zap.Hooks(hook)},
})
require.NoError(t, err)
assert.Equal(t, col.rootCmd, col.Command())

const testPrefix = "a_test"
metricsPort := testutil.GetAvailablePort(t)
col.rootCmd.SetArgs([]string{
cmd := NewCommand(col)
cmd.SetArgs([]string{
"--config=testdata/otelcol-config.yaml",
"--metrics-addr=localhost:" + strconv.FormatUint(uint64(metricsPort), 10),
"--metrics-prefix=" + testPrefix,
Expand All @@ -71,7 +71,7 @@ func TestCollector_Start(t *testing.T) {
colDone := make(chan struct{})
go func() {
defer close(colDone)
assert.NoError(t, col.Run())
assert.NoError(t, cmd.Execute())
}()

assert.Equal(t, Starting, <-col.GetStateChannel())
Expand Down Expand Up @@ -123,12 +123,13 @@ func TestCollector_ReportError(t *testing.T) {
col, err := New(CollectorSettings{BuildInfo: component.DefaultBuildInfo(), Factories: factories})
require.NoError(t, err)

col.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})
cmd := NewCommand(col)
cmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})

colDone := make(chan struct{})
go func() {
defer close(colDone)
assert.EqualError(t, col.Run(), "failed to shutdown collector telemetry: err1")
assert.EqualError(t, cmd.Execute(), "failed to shutdown collector telemetry: err1")
}()

assert.Equal(t, Starting, <-col.GetStateChannel())
Expand All @@ -154,7 +155,7 @@ func TestCollector_StartAsGoRoutine(t *testing.T) {
colDone := make(chan struct{})
go func() {
defer close(colDone)
colErr := col.Run()
colErr := col.Run(context.Background())
if colErr != nil {
err = colErr
}
Expand Down
7 changes: 5 additions & 2 deletions service/collector_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ func (s *WindowsService) start(elog *eventlog.Log, colErrorChannel chan error) e
return err
}

// col.Start blocks until receiving a SIGTERM signal, so needs to be started
// col.Run blocks until receiving a SIGTERM signal, so needs to be started
// asynchronously, but it will exit early if an error occurs on startup
go func() { colErrorChannel <- s.col.Run() }()
go func() {
cmd := NewCommand(s.col)
colErrorChannel <- cmd.Execute()
}()

// wait until the collector server is in the Running state
go func() {
Expand Down
54 changes: 54 additions & 0 deletions service/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

import (
"flag"

"github.com/spf13/cobra"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/parserprovider"
)

// NewCommand constructs a new cobra.Command using the given Collector.
// TODO: Make this independent of the collector internals.
func NewCommand(col *Collector) *cobra.Command {
rootCmd := &cobra.Command{
Use: col.set.BuildInfo.Command,
Version: col.set.BuildInfo.Version,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
return col.Run(cmd.Context())
},
}

// TODO: coalesce this code and expose this information to other components.
flagSet := new(flag.FlagSet)
addFlagsFns := []func(*flag.FlagSet){
configtelemetry.Flags,
parserprovider.Flags,
telemetry.Flags,
telemetrylogs.Flags,
}
for _, addFlags := range addFlagsFns {
addFlags(flagSet)
}

rootCmd.Flags().AddGoFlagSet(flagSet)
return rootCmd
}

0 comments on commit 124b27a

Please sign in to comment.