Skip to content

Commit

Permalink
Add ConfigUnmarshaler interface to allow mutations on the parsed Conf…
Browse files Browse the repository at this point in the history
…ig (open-telemetry#3706)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Aug 16, 2021
1 parent ffda9d5 commit cce6f70
Show file tree
Hide file tree
Showing 51 changed files with 104 additions and 56 deletions.
6 changes: 3 additions & 3 deletions config/configtest/configtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package configtest
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configloader"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/config/configunmarshaler"
)

// LoadConfig loads a config from file, and does NOT validate the configuration.
Expand All @@ -28,8 +28,8 @@ func LoadConfig(fileName string, factories component.Factories) (*config.Config,
if err != nil {
return nil, err
}
// Load the config using the given factories.
return configloader.Load(cp, factories)
// Unmarshal the config using the given factories.
return configunmarshaler.NewDefault().Unmarshal(cp, factories)
}

// LoadConfigAndValidate loads a config from the file, and validates the configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package configloader
package configunmarshaler

import (
"fmt"
Expand All @@ -26,8 +26,8 @@ import (
"go.opentelemetry.io/collector/config/configparser"
)

// These are errors that can be returned by Load(). Note that error codes are not part
// of Load()'s public API, they are for internal unit testing only.
// These are errors that can be returned by Unmarshal(). Note that error codes are not part
// of Unmarshal()'s public API, they are for internal unit testing only.
type configErrorCode int

const (
Expand Down Expand Up @@ -89,12 +89,20 @@ type pipelineSettings struct {
Exporters []string `mapstructure:"exporters"`
}

// Load loads a Config from Parser.
// After loading the config, `Validate()` must be called to validate.
func Load(v *configparser.Parser, factories component.Factories) (*config.Config, error) {
type defaultUnmarshaler struct{}

// NewDefault returns a default ConfigUnmarshaler that unmarshalls every configuration
// using the custom unmarshaler if present or default to strict
func NewDefault() ConfigUnmarshaler {
return &defaultUnmarshaler{}
}

// Unmarshal the Config from a Parser.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (*defaultUnmarshaler) Unmarshal(v *configparser.Parser, factories component.Factories) (*config.Config, error) {
var cfg config.Config

// Load the config.
// Unmarshal the config.

// Struct to validate top level sections.
var rawCfg configSettings
Expand All @@ -111,34 +119,34 @@ func Load(v *configparser.Parser, factories component.Factories) (*config.Config

// Start with the service extensions.

extensions, err := loadExtensions(cast.ToStringMap(v.Get(extensionsKeyName)), factories.Extensions)
extensions, err := unmarshalExtensions(cast.ToStringMap(v.Get(extensionsKeyName)), factories.Extensions)
if err != nil {
return nil, err
}
cfg.Extensions = extensions

// Load data components (receivers, exporters, and processors).
// Unmarshal data components (receivers, exporters, and processors).

receivers, err := loadReceivers(cast.ToStringMap(v.Get(receiversKeyName)), factories.Receivers)
receivers, err := unmarshalReceivers(cast.ToStringMap(v.Get(receiversKeyName)), factories.Receivers)
if err != nil {
return nil, err
}
cfg.Receivers = receivers

exporters, err := loadExporters(cast.ToStringMap(v.Get(exportersKeyName)), factories.Exporters)
exporters, err := unmarshalExporters(cast.ToStringMap(v.Get(exportersKeyName)), factories.Exporters)
if err != nil {
return nil, err
}
cfg.Exporters = exporters

processors, err := loadProcessors(cast.ToStringMap(v.Get(processorsKeyName)), factories.Processors)
processors, err := unmarshalProcessors(cast.ToStringMap(v.Get(processorsKeyName)), factories.Processors)
if err != nil {
return nil, err
}
cfg.Processors = processors

// Load the service and its data pipelines.
service, err := loadService(rawCfg.Service)
// Unmarshal the service and its data pipelines.
service, err := unmarshalService(rawCfg.Service)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +183,7 @@ func errorDuplicateName(component string, id config.ComponentID) error {
}
}

func loadExtensions(exts map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (config.Extensions, error) {
func unmarshalExtensions(exts map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (config.Extensions, error) {
// Prepare resulting map.
extensions := make(config.Extensions)

Expand Down Expand Up @@ -217,7 +225,7 @@ func loadExtensions(exts map[string]interface{}, factories map[config.Type]compo
return extensions, nil
}

func loadService(rawService serviceSettings) (config.Service, error) {
func unmarshalService(rawService serviceSettings) (config.Service, error) {
var ret config.Service
ret.Extensions = make([]config.ComponentID, 0, len(rawService.Extensions))
for _, extIDStr := range rawService.Extensions {
Expand All @@ -230,7 +238,7 @@ func loadService(rawService serviceSettings) (config.Service, error) {

// Process the pipelines first so in case of error on them it can be properly
// reported.
pipelines, err := loadPipelines(rawService.Pipelines)
pipelines, err := unmarshalPipelines(rawService.Pipelines)
ret.Pipelines = pipelines

return ret, err
Expand All @@ -252,7 +260,7 @@ func LoadReceiver(componentConfig *configparser.Parser, id config.ComponentID, f
return receiverCfg, nil
}

func loadReceivers(recvs map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (config.Receivers, error) {
func unmarshalReceivers(recvs map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (config.Receivers, error) {
// Prepare resulting map.
receivers := make(config.Receivers)

Expand Down Expand Up @@ -289,7 +297,7 @@ func loadReceivers(recvs map[string]interface{}, factories map[config.Type]compo
return receivers, nil
}

func loadExporters(exps map[string]interface{}, factories map[config.Type]component.ExporterFactory) (config.Exporters, error) {
func unmarshalExporters(exps map[string]interface{}, factories map[config.Type]component.ExporterFactory) (config.Exporters, error) {
// Prepare resulting map.
exporters := make(config.Exporters)

Expand Down Expand Up @@ -331,7 +339,7 @@ func loadExporters(exps map[string]interface{}, factories map[config.Type]compon
return exporters, nil
}

func loadProcessors(procs map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (config.Processors, error) {
func unmarshalProcessors(procs map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (config.Processors, error) {
// Prepare resulting map.
processors := make(config.Processors)

Expand Down Expand Up @@ -373,7 +381,7 @@ func loadProcessors(procs map[string]interface{}, factories map[config.Type]comp
return processors, nil
}

func loadPipelines(pipelinesConfig map[string]pipelineSettings) (config.Pipelines, error) {
func unmarshalPipelines(pipelinesConfig map[string]pipelineSettings) (config.Pipelines, error) {
// Prepare resulting map.
pipelines := make(config.Pipelines)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package configloader
package configunmarshaler

import (
"os"
Expand All @@ -33,7 +33,7 @@ func TestDecodeConfig(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)

// Load the config
// Unmarshal the config
cfg, err := loadConfigFile(t, path.Join(".", "testdata", "valid-config.yaml"), factories)
require.NoError(t, err, "Unable to load config")

Expand Down Expand Up @@ -186,7 +186,7 @@ func TestSimpleConfig(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)

// Load the config
// Unmarshal the config
cfg, err := loadConfigFile(t, path.Join(".", "testdata", test.name+".yaml"), factories)
require.NoError(t, err, "Unable to load config")

Expand Down Expand Up @@ -276,7 +276,7 @@ func TestEscapedEnvVars(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)

// Load the config
// Unmarshal the config
cfg, err := loadConfigFile(t, path.Join(".", "testdata", "simple-config-with-escaped-env.yaml"), factories)
require.NoError(t, err, "Unable to load config")

Expand Down Expand Up @@ -459,8 +459,8 @@ func loadConfigFile(t *testing.T, fileName string, factories component.Factories
v, err := configparser.NewParserFromFile(fileName)
require.NoError(t, err)

// Load the config from the configparser.Parser using the given factories.
return Load(v, factories)
// Unmarshal the config from the configparser.Parser using the given factories.
return NewDefault().Unmarshal(v, factories)
}

type nestedConfig struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package configloader implements configuration loading from a config.Parser.
// Package configunmarshaler implements configuration unmarshalling from a config.Parser.
// The implementation relies on registered factories that allow creating
// default configuration for each type of receiver/exporter/processor.
package configloader
package configunmarshaler
27 changes: 27 additions & 0 deletions config/configunmarshaler/unmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configunmarshaler

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configparser"
)

// ConfigUnmarshaler is the interface that unmarshalls the collector configuration from the configparser.Parser.
type ConfigUnmarshaler interface {
// Unmarshal the configuration from the given parser and factories.
Unmarshal(v *configparser.Parser, factories component.Factories) (*config.Config, error)
}
32 changes: 19 additions & 13 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configloader"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/extension/ballastextension"
Expand Down Expand Up @@ -81,7 +81,8 @@ type Collector struct {

factories component.Factories

parserProvider parserprovider.ParserProvider
parserProvider parserprovider.ParserProvider
configUnmarshaler configunmarshaler.ConfigUnmarshaler

// shutdownChan is used to terminate the collector.
shutdownChan chan struct{}
Expand All @@ -102,14 +103,26 @@ func New(set CollectorSettings) (*Collector, error) {
}

col := &Collector{
info: set.BuildInfo,
factories: set.Factories,
stateChannel: make(chan State, Closed+1),
info: set.BuildInfo,
factories: set.Factories,
stateChannel: make(chan State, Closed+1),
parserProvider: set.ParserProvider,
configUnmarshaler: set.ConfigUnmarshaler,
// We use a negative in the settings not to break the existing
// behavior. Internally, allowGracefulShutodwn is more readable.
allowGracefulShutodwn: !set.DisableGracefulShutdown,
}

if col.parserProvider == nil {
// use default provider.
col.parserProvider = parserprovider.Default()
}

if col.configUnmarshaler == nil {
// use default provider.
col.configUnmarshaler = configunmarshaler.NewDefault()
}

rootCmd := &cobra.Command{
Use: set.BuildInfo.Command,
Version: set.BuildInfo.Version,
Expand Down Expand Up @@ -147,13 +160,6 @@ func New(set CollectorSettings) (*Collector, error) {
rootCmd.Flags().AddGoFlagSet(flagSet)
col.rootCmd = rootCmd

parserProvider := set.ParserProvider
if parserProvider == nil {
// use default provider.
parserProvider = parserprovider.Default()
}
col.parserProvider = parserProvider

return col, nil
}

Expand Down Expand Up @@ -240,7 +246,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("cannot load configuration's parser: %w", err)
}

cfg, err := configloader.Load(cp, col.factories)
cfg, err := col.configUnmarshaler.Unmarshal(cp, col.factories)
if err != nil {
return fmt.Errorf("cannot load configuration: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/service/defaultcomponents"
"go.opentelemetry.io/collector/service/internal/builder"
"go.opentelemetry.io/collector/service/parserprovider"
Expand Down Expand Up @@ -329,11 +330,12 @@ func TestCollector_reloadService(t *testing.T) {
}

col := Collector{
logger: zap.NewNop(),
tracerProvider: trace.NewNoopTracerProvider(),
parserProvider: tt.parserProvider,
factories: factories,
service: tt.service,
logger: zap.NewNop(),
tracerProvider: trace.NewNoopTracerProvider(),
parserProvider: tt.parserProvider,
configUnmarshaler: configunmarshaler.NewDefault(),
factories: factories,
service: tt.service,
}

err := col.reloadService(ctx)
Expand Down
2 changes: 1 addition & 1 deletion service/internal/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterIDs []config.Compon
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)
cfg, err := configtest.LoadConfigAndValidate("testdata/pipelines_builder.yaml", factories)
// Load the config
// Unmarshal the config
require.Nil(t, err)

// BuildProcessors the pipeline
Expand Down
Loading

0 comments on commit cce6f70

Please sign in to comment.