diff --git a/config/config.go b/config/config.go index 3c2f7ab59cb..06cdc58baa2 100644 --- a/config/config.go +++ b/config/config.go @@ -54,6 +54,11 @@ const ( errUnmarshalTopLevelStructureError ) +const ( + // ViperDelimiter is used as the default key delimiter in the default viper instance + ViperDelimiter = "::" +) + type configError struct { msg string // human readable error message. code configErrorCode // internal error code. @@ -114,7 +119,7 @@ const typeAndNameSeparator = "/" // Creates a new Viper instance with a different key-delimitor "::" instead of the // default ".". This way configs can have keys that contain ".". func NewViper() *viper.Viper { - return viper.NewWithOptions(viper.KeyDelimiter("::")) + return viper.NewWithOptions(viper.KeyDelimiter(ViperDelimiter)) } // Load loads a Config from Viper. diff --git a/go.mod b/go.mod index d1d3d863303..27d57817ffe 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/soheilhy/cmux v0.1.4 github.com/spf13/cast v1.3.1 github.com/spf13/cobra v1.1.1 + github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 github.com/tinylib/msgp v1.1.5 diff --git a/service/service.go b/service/service.go index 8b94e1f8720..99b3a305741 100644 --- a/service/service.go +++ b/service/service.go @@ -104,26 +104,37 @@ type Parameters struct { ApplicationStartInfo component.ApplicationStartInfo // ConfigFactory that creates the configuration. // If it is not provided the default factory (FileLoaderConfigFactory) is used. - // The default factory loads the configuration specified as a command line flag. + // The default factory loads the configuration file and overrides component's configuration + // properties supplied via --set command line flag. ConfigFactory ConfigFactory // LoggingOptions provides a way to change behavior of zap logging. LoggingOptions []zap.Option } // ConfigFactory creates config. -type ConfigFactory func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) - -// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file. -func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { +// The ConfigFactory implementation should call AddSetFlagProperties to enable configuration passed via `--set` flag. +// Viper and command instances are passed from the Application. +// The factories also belong to the Application and are equal to the factories passed via Parameters. +type ConfigFactory func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) + +// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file +// and from --set command line flag (if the flag is present). +func FileLoaderConfigFactory(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) { file := builder.GetConfigFile() if file == "" { return nil, errors.New("config file not specified") } + // first load the config file v.SetConfigFile(file) err := v.ReadInConfig() if err != nil { return nil, fmt.Errorf("error loading config file %q: %v", file, err) } + + // next overlay the config file with --set flags + if err := AddSetFlagProperties(v, cmd); err != nil { + return nil, fmt.Errorf("failed to process set flag: %v", err) + } return config.Load(v, factories) } @@ -172,6 +183,7 @@ func New(params Parameters) (*Application, error) { addFlags(flagSet) } rootCmd.Flags().AddGoFlagSet(flagSet) + addSetFlag(rootCmd.Flags()) app.rootCmd = rootCmd @@ -276,7 +288,7 @@ func (app *Application) setupConfigurationComponents(ctx context.Context, factor } app.logger.Info("Loading configuration...") - cfg, err := factory(app.v, app.factories) + cfg, err := factory(app.v, app.rootCmd, app.factories) if err != nil { return fmt.Errorf("cannot load configuration: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index bee323bd618..d3d1b181e59 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -19,6 +19,7 @@ import ( "bufio" "context" "errors" + "flag" "fmt" "net/http" "sort" @@ -26,8 +27,10 @@ import ( "strings" "syscall" "testing" + "time" "github.com/prometheus/common/expfmt" + "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,6 +41,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/processor/attributesprocessor" + "go.opentelemetry.io/collector/processor/batchprocessor" + "go.opentelemetry.io/collector/receiver/jaegerreceiver" + "go.opentelemetry.io/collector/service/builder" "go.opentelemetry.io/collector/service/defaultcomponents" "go.opentelemetry.io/collector/testutil" ) @@ -137,7 +144,7 @@ func TestApplication_StartAsGoRoutine(t *testing.T) { params := Parameters{ ApplicationStartInfo: componenttest.TestApplicationStartInfo(), - ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { + ConfigFactory: func(_ *viper.Viper, _ *cobra.Command, factories component.Factories) (*configmodels.Config, error) { return constructMimumalOpConfig(t, factories), nil }, Factories: factories, @@ -412,7 +419,7 @@ func createExampleApplication(t *testing.T) *Application { app, err := New(Parameters{ Factories: factories, - ConfigFactory: func(v *viper.Viper, factories component.Factories) (c *configmodels.Config, err error) { + ConfigFactory: func(_ *viper.Viper, _ *cobra.Command, factories component.Factories) (c *configmodels.Config, err error) { config := &configmodels.Config{ Receivers: map[string]configmodels.Receiver{ string(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(), @@ -511,6 +518,105 @@ func TestApplication_GetExporters(t *testing.T) { <-appDone } +func TestSetFlag(t *testing.T) { + factories, err := defaultcomponents.Components() + require.NoError(t, err) + params := Parameters{ + Factories: factories, + } + t.Run("unknown_component", func(t *testing.T) { + app, err := New(params) + require.NoError(t, err) + err = app.rootCmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.doesnotexist.timeout=2s", + }) + require.NoError(t, err) + cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories) + require.Error(t, err) + require.Nil(t, cfg) + + }) + t.Run("component_not_added_to_pipeline", func(t *testing.T) { + app, err := New(params) + require.NoError(t, err) + err = app.rootCmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.batch/foo.timeout=2s", + }) + require.NoError(t, err) + cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories) + require.NoError(t, err) + assert.NotNil(t, cfg) + err = config.ValidateConfig(cfg, zap.NewNop()) + require.NoError(t, err) + + var processors []string + for k := range cfg.Processors { + processors = append(processors, k) + } + sort.Strings(processors) + // batch/foo is not added to the pipeline + assert.Equal(t, []string{"attributes", "batch", "batch/foo", "queued_retry"}, processors) + assert.Equal(t, []string{"attributes", "batch", "queued_retry"}, cfg.Service.Pipelines["traces"].Processors) + }) + t.Run("ok", func(t *testing.T) { + app, err := New(params) + require.NoError(t, err) + + err = app.rootCmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.batch.timeout=2s", + // Arrays are overridden and object arrays cannot be indexed + // this creates actions array of size 1 + "--set=processors.attributes.actions.key=foo", + "--set=processors.attributes.actions.value=bar", + "--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345", + "--set=extensions.health_check.port=8080", + }) + require.NoError(t, err) + cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories) + require.NoError(t, err) + require.NotNil(t, cfg) + err = config.ValidateConfig(cfg, zap.NewNop()) + require.NoError(t, err) + + assert.Equal(t, 3, len(cfg.Processors)) + batch := cfg.Processors["batch"].(*batchprocessor.Config) + assert.Equal(t, time.Second*2, batch.Timeout) + jaeger := cfg.Receivers["jaeger"].(*jaegerreceiver.Config) + assert.Equal(t, "localhost:12345", jaeger.GRPC.NetAddr.Endpoint) + attributes := cfg.Processors["attributes"].(*attributesprocessor.Config) + require.Equal(t, 1, len(attributes.Actions)) + assert.Equal(t, "foo", attributes.Actions[0].Key) + assert.Equal(t, "bar", attributes.Actions[0].Value) + }) +} + +func TestSetFlag_component_does_not_exist(t *testing.T) { + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + v := config.NewViper() + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + fs := &flag.FlagSet{} + builder.Flags(fs) + cmd.Flags().AddGoFlagSet(fs) + cmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.batch.timeout=2s", + // Arrays are overridden and object arrays cannot be indexed + // this creates actions array of size 1 + "--set=processors.attributes.actions.key=foo", + "--set=processors.attributes.actions.value=bar", + "--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345", + }) + cfg, err := FileLoaderConfigFactory(v, cmd, factories) + require.NoError(t, err) + require.NotNil(t, cfg) +} + func constructMimumalOpConfig(t *testing.T, factories component.Factories) *configmodels.Config { configStr := ` receivers: diff --git a/service/set_flag.go b/service/set_flag.go new file mode 100644 index 00000000000..61c155e3efa --- /dev/null +++ b/service/set_flag.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "bytes" + "fmt" + "strings" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "go.opentelemetry.io/collector/config" +) + +const ( + setFlagName = "set" + setFlagFileType = "properties" +) + +func addSetFlag(flagSet *pflag.FlagSet) { + flagSet.StringArray(setFlagName, []string{}, "Set arbitrary component config property. The component has to be defined in the config file and the flag has a higher precedence. Array config properties are overridden and maps are joined, note that only a single (first) array property can be set e.g. -set=processors.attributes.actions.key=some_key. Example --set=processors.batch.timeout=2s") +} + +// AddSetFlagProperties overrides properties from set flag(s) in supplied viper instance. +// The implementation reads set flag(s) from the cmd and passes the content to a new viper instance as .properties file. +// Then the properties from new viper instance are read and set to the supplied viper. +func AddSetFlagProperties(v *viper.Viper, cmd *cobra.Command) error { + flagProperties, err := cmd.Flags().GetStringArray(setFlagName) + if err != nil { + return err + } + if len(flagProperties) == 0 { + return nil + } + b := &bytes.Buffer{} + for _, property := range flagProperties { + property = strings.TrimSpace(property) + if _, err := fmt.Fprintf(b, "%s\n", property); err != nil { + return err + } + } + viperFlags := config.NewViper() + viperFlags.SetConfigType(setFlagFileType) + if err := viperFlags.ReadConfig(b); err != nil { + return fmt.Errorf("failed to read set flag config: %v", err) + } + + // Viper implementation of v.MergeConfig(io.Reader) or v.MergeConfigMap(map[string]interface) + // does not work properly. This is b/c if it attempts to merge into a nil object it will fail here + // https://github.com/spf13/viper/blob/3826be313591f83193f048520482a7b3cf17d506/viper.go#L1709 + + // The workaround is to call v.Set(string, interface) on all root properties from the config file + // this will correctly preserve the original config and set them up for viper to overlay them with + // the --set params. It should also be noted that setting the root keys is important. This is + // b/c the viper .AllKeys() method does not return empty objects. + // For instance with the following yaml structure: + // a: + // b: + // c: {} + // + // viper.AllKeys() would only return a.b, but not a.c. However otel expects {} to behave + // the same as nil object in its config file. Therefore we extract and set the root keys only + // to catch both a.b and a.c. + + rootKeys := map[string]struct{}{} + for _, k := range viperFlags.AllKeys() { + keys := strings.Split(k, config.ViperDelimiter) + if len(keys) > 0 { + rootKeys[keys[0]] = struct{}{} + } + } + + for k := range rootKeys { + v.Set(k, v.Get(k)) + } + + // now that we've copied the config into the viper "overrides" copy the --set flags + // as well + for _, k := range viperFlags.AllKeys() { + v.Set(k, viperFlags.Get(k)) + } + + return nil +} diff --git a/service/set_flag_test.go b/service/set_flag_test.go new file mode 100644 index 00000000000..f6f0c63ffcc --- /dev/null +++ b/service/set_flag_test.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSetFlags(t *testing.T) { + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + + err := cmd.ParseFlags([]string{ + "--set=processors.batch.timeout=2s", + "--set=processors.batch/foo.timeout=3s", + "--set=receivers.otlp.protocols.grpc.endpoint=localhost:1818", + "--set=exporters.kafka.brokers=foo:9200,foo2:9200", + }) + require.NoError(t, err) + + v := viper.New() + err = AddSetFlagProperties(v, cmd) + require.NoError(t, err) + + settings := v.AllSettings() + assert.Equal(t, 4, len(settings)) + assert.Equal(t, "2s", v.Get("processors::batch::timeout")) + assert.Equal(t, "3s", v.Get("processors::batch/foo::timeout")) + assert.Equal(t, "foo:9200,foo2:9200", v.Get("exporters::kafka::brokers")) + assert.Equal(t, "localhost:1818", v.Get("receivers::otlp::protocols::grpc::endpoint")) +} + +func TestSetFlags_err_set_flag(t *testing.T) { + cmd := &cobra.Command{} + v := viper.New() + err := AddSetFlagProperties(v, cmd) + require.Error(t, err) +} + +func TestSetFlags_empty(t *testing.T) { + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + v := viper.New() + err := AddSetFlagProperties(v, cmd) + require.NoError(t, err) + assert.Equal(t, 0, len(v.AllSettings())) +} diff --git a/service/testdata/otelcol-config.yaml b/service/testdata/otelcol-config.yaml index 15c67a190c8..ac3bd1fdcb1 100644 --- a/service/testdata/otelcol-config.yaml +++ b/service/testdata/otelcol-config.yaml @@ -17,7 +17,7 @@ processors: batch: extensions: - health_check: + health_check: {} pprof: zpages: diff --git a/testbed/testbed/otelcol_runner.go b/testbed/testbed/otelcol_runner.go index 7ba6966beac..56fafff5211 100644 --- a/testbed/testbed/otelcol_runner.go +++ b/testbed/testbed/otelcol_runner.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/shirou/gopsutil/process" + "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -104,7 +105,7 @@ func (ipp *InProcessCollector) Start(args StartParams) error { Version: version.Version, GitHash: version.GitHash, }, - ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { + ConfigFactory: func(_ *viper.Viper, _ *cobra.Command, _ component.Factories) (*configmodels.Config, error) { return ipp.config, nil }, Factories: ipp.factories,