Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ConfigUnmarshaler interface to allow mutations on the parsed Config #3706

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/configtest/configtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package configtest
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configloader"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/config/configunmarshaler"
)

// LoadConfig loads a config from file, and does NOT validate the configuration.
Expand All @@ -28,8 +28,8 @@ func LoadConfig(fileName string, factories component.Factories) (*config.Config,
if err != nil {
return nil, err
}
// Load the config using the given factories.
return configloader.Load(cp, factories)
// Unmarshal the config using the given factories.
return configunmarshaler.NewDefault().Unmarshal(cp, factories)
}

// LoadConfigAndValidate loads a config from the file, and validates the configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package configloader
package configunmarshaler

import (
"fmt"
Expand All @@ -26,8 +26,8 @@ import (
"go.opentelemetry.io/collector/config/configparser"
)

// These are errors that can be returned by Load(). Note that error codes are not part
// of Load()'s public API, they are for internal unit testing only.
// These are errors that can be returned by Unmarshal(). Note that error codes are not part
// of Unmarshal()'s public API, they are for internal unit testing only.
type configErrorCode int

const (
Expand Down Expand Up @@ -89,12 +89,20 @@ type pipelineSettings struct {
Exporters []string `mapstructure:"exporters"`
}

// Load loads a Config from Parser.
// After loading the config, `Validate()` must be called to validate.
func Load(v *configparser.Parser, factories component.Factories) (*config.Config, error) {
type defaultUnmarshaler struct{}

// NewDefault returns a default ConfigUnmarshaler that unmarshalls every configuration
// using the custom unmarshaler if present or default to strict
func NewDefault() ConfigUnmarshaler {
return &defaultUnmarshaler{}
}

// Unmarshal the Config from a Parser.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (*defaultUnmarshaler) Unmarshal(v *configparser.Parser, factories component.Factories) (*config.Config, error) {
var cfg config.Config

// Load the config.
// Unmarshal the config.

// Struct to validate top level sections.
var rawCfg configSettings
Expand All @@ -111,34 +119,34 @@ func Load(v *configparser.Parser, factories component.Factories) (*config.Config

// Start with the service extensions.

extensions, err := loadExtensions(cast.ToStringMap(v.Get(extensionsKeyName)), factories.Extensions)
extensions, err := unmarshalExtensions(cast.ToStringMap(v.Get(extensionsKeyName)), factories.Extensions)
if err != nil {
return nil, err
}
cfg.Extensions = extensions

// Load data components (receivers, exporters, and processors).
// Unmarshal data components (receivers, exporters, and processors).

receivers, err := loadReceivers(cast.ToStringMap(v.Get(receiversKeyName)), factories.Receivers)
receivers, err := unmarshalReceivers(cast.ToStringMap(v.Get(receiversKeyName)), factories.Receivers)
if err != nil {
return nil, err
}
cfg.Receivers = receivers

exporters, err := loadExporters(cast.ToStringMap(v.Get(exportersKeyName)), factories.Exporters)
exporters, err := unmarshalExporters(cast.ToStringMap(v.Get(exportersKeyName)), factories.Exporters)
if err != nil {
return nil, err
}
cfg.Exporters = exporters

processors, err := loadProcessors(cast.ToStringMap(v.Get(processorsKeyName)), factories.Processors)
processors, err := unmarshalProcessors(cast.ToStringMap(v.Get(processorsKeyName)), factories.Processors)
if err != nil {
return nil, err
}
cfg.Processors = processors

// Load the service and its data pipelines.
service, err := loadService(rawCfg.Service)
// Unmarshal the service and its data pipelines.
service, err := unmarshalService(rawCfg.Service)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +183,7 @@ func errorDuplicateName(component string, id config.ComponentID) error {
}
}

func loadExtensions(exts map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (config.Extensions, error) {
func unmarshalExtensions(exts map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (config.Extensions, error) {
// Prepare resulting map.
extensions := make(config.Extensions)

Expand Down Expand Up @@ -217,7 +225,7 @@ func loadExtensions(exts map[string]interface{}, factories map[config.Type]compo
return extensions, nil
}

func loadService(rawService serviceSettings) (config.Service, error) {
func unmarshalService(rawService serviceSettings) (config.Service, error) {
var ret config.Service
ret.Extensions = make([]config.ComponentID, 0, len(rawService.Extensions))
for _, extIDStr := range rawService.Extensions {
Expand All @@ -230,7 +238,7 @@ func loadService(rawService serviceSettings) (config.Service, error) {

// Process the pipelines first so in case of error on them it can be properly
// reported.
pipelines, err := loadPipelines(rawService.Pipelines)
pipelines, err := unmarshalPipelines(rawService.Pipelines)
ret.Pipelines = pipelines

return ret, err
Expand All @@ -252,7 +260,7 @@ func LoadReceiver(componentConfig *configparser.Parser, id config.ComponentID, f
return receiverCfg, nil
}

func loadReceivers(recvs map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (config.Receivers, error) {
func unmarshalReceivers(recvs map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (config.Receivers, error) {
// Prepare resulting map.
receivers := make(config.Receivers)

Expand Down Expand Up @@ -289,7 +297,7 @@ func loadReceivers(recvs map[string]interface{}, factories map[config.Type]compo
return receivers, nil
}

func loadExporters(exps map[string]interface{}, factories map[config.Type]component.ExporterFactory) (config.Exporters, error) {
func unmarshalExporters(exps map[string]interface{}, factories map[config.Type]component.ExporterFactory) (config.Exporters, error) {
// Prepare resulting map.
exporters := make(config.Exporters)

Expand Down Expand Up @@ -331,7 +339,7 @@ func loadExporters(exps map[string]interface{}, factories map[config.Type]compon
return exporters, nil
}

func loadProcessors(procs map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (config.Processors, error) {
func unmarshalProcessors(procs map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (config.Processors, error) {
// Prepare resulting map.
processors := make(config.Processors)

Expand Down Expand Up @@ -373,7 +381,7 @@ func loadProcessors(procs map[string]interface{}, factories map[config.Type]comp
return processors, nil
}

func loadPipelines(pipelinesConfig map[string]pipelineSettings) (config.Pipelines, error) {
func unmarshalPipelines(pipelinesConfig map[string]pipelineSettings) (config.Pipelines, error) {
// Prepare resulting map.
pipelines := make(config.Pipelines)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package configloader
package configunmarshaler

import (
"os"
Expand All @@ -33,7 +33,7 @@ func TestDecodeConfig(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)

// Load the config
// Unmarshal the config
cfg, err := loadConfigFile(t, path.Join(".", "testdata", "valid-config.yaml"), factories)
require.NoError(t, err, "Unable to load config")

Expand Down Expand Up @@ -186,7 +186,7 @@ func TestSimpleConfig(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)

// Load the config
// Unmarshal the config
cfg, err := loadConfigFile(t, path.Join(".", "testdata", test.name+".yaml"), factories)
require.NoError(t, err, "Unable to load config")

Expand Down Expand Up @@ -276,7 +276,7 @@ func TestEscapedEnvVars(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)

// Load the config
// Unmarshal the config
cfg, err := loadConfigFile(t, path.Join(".", "testdata", "simple-config-with-escaped-env.yaml"), factories)
require.NoError(t, err, "Unable to load config")

Expand Down Expand Up @@ -459,8 +459,8 @@ func loadConfigFile(t *testing.T, fileName string, factories component.Factories
v, err := configparser.NewParserFromFile(fileName)
require.NoError(t, err)

// Load the config from the configparser.Parser using the given factories.
return Load(v, factories)
// Unmarshal the config from the configparser.Parser using the given factories.
return NewDefault().Unmarshal(v, factories)
}

type nestedConfig struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package configloader implements configuration loading from a config.Parser.
// Package configunmarshaler implements configuration unmarshalling from a config.Parser.
// The implementation relies on registered factories that allow creating
// default configuration for each type of receiver/exporter/processor.
package configloader
package configunmarshaler
27 changes: 27 additions & 0 deletions config/configunmarshaler/unmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configunmarshaler

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configparser"
)

// ConfigUnmarshaler is the interface that unmarshalls the collector configuration from the configparser.Parser.
type ConfigUnmarshaler interface {
// Unmarshal the configuration from the given parser and factories.
Unmarshal(v *configparser.Parser, factories component.Factories) (*config.Config, error)
}
32 changes: 19 additions & 13 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configloader"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/extension/ballastextension"
Expand Down Expand Up @@ -81,7 +81,8 @@ type Collector struct {

factories component.Factories

parserProvider parserprovider.ParserProvider
parserProvider parserprovider.ParserProvider
configUnmarshaler configunmarshaler.ConfigUnmarshaler

// shutdownChan is used to terminate the collector.
shutdownChan chan struct{}
Expand All @@ -102,14 +103,26 @@ func New(set CollectorSettings) (*Collector, error) {
}

col := &Collector{
info: set.BuildInfo,
factories: set.Factories,
stateChannel: make(chan State, Closed+1),
info: set.BuildInfo,
factories: set.Factories,
stateChannel: make(chan State, Closed+1),
parserProvider: set.ParserProvider,
configUnmarshaler: set.ConfigUnmarshaler,
// We use a negative in the settings not to break the existing
// behavior. Internally, allowGracefulShutodwn is more readable.
allowGracefulShutodwn: !set.DisableGracefulShutdown,
}

if col.parserProvider == nil {
// use default provider.
col.parserProvider = parserprovider.Default()
}

if col.configUnmarshaler == nil {
// use default provider.
col.configUnmarshaler = configunmarshaler.NewDefault()
}

rootCmd := &cobra.Command{
Use: set.BuildInfo.Command,
Version: set.BuildInfo.Version,
Expand Down Expand Up @@ -147,13 +160,6 @@ func New(set CollectorSettings) (*Collector, error) {
rootCmd.Flags().AddGoFlagSet(flagSet)
col.rootCmd = rootCmd

parserProvider := set.ParserProvider
if parserProvider == nil {
// use default provider.
parserProvider = parserprovider.Default()
}
col.parserProvider = parserProvider

return col, nil
}

Expand Down Expand Up @@ -240,7 +246,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("cannot load configuration's parser: %w", err)
}

cfg, err := configloader.Load(cp, col.factories)
cfg, err := col.configUnmarshaler.Unmarshal(cp, col.factories)
if err != nil {
return fmt.Errorf("cannot load configuration: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/service/defaultcomponents"
"go.opentelemetry.io/collector/service/internal/builder"
"go.opentelemetry.io/collector/service/parserprovider"
Expand Down Expand Up @@ -329,11 +330,12 @@ func TestCollector_reloadService(t *testing.T) {
}

col := Collector{
logger: zap.NewNop(),
tracerProvider: trace.NewNoopTracerProvider(),
parserProvider: tt.parserProvider,
factories: factories,
service: tt.service,
logger: zap.NewNop(),
tracerProvider: trace.NewNoopTracerProvider(),
parserProvider: tt.parserProvider,
configUnmarshaler: configunmarshaler.NewDefault(),
factories: factories,
service: tt.service,
}

err := col.reloadService(ctx)
Expand Down
2 changes: 1 addition & 1 deletion service/internal/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterIDs []config.Compon
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)
cfg, err := configtest.LoadConfigAndValidate("testdata/pipelines_builder.yaml", factories)
// Load the config
// Unmarshal the config
require.Nil(t, err)

// BuildProcessors the pipeline
Expand Down
Loading