Skip to content

Commit

Permalink
Split config unmarshaler per config section (#6390)
Browse files Browse the repository at this point in the history
This change will allow to break circular dependency (service | service/internal/pipelines | service/internal/configunmarshaler | config.Config) which blocks removing of the deprecated `config.Config`

Signed-off-by: Bogdan <[email protected]>

Signed-off-by: Bogdan <[email protected]>
  • Loading branch information
bogdandrutu authored Oct 24, 2022
1 parent 1ffae25 commit 9a43339
Show file tree
Hide file tree
Showing 41 changed files with 868 additions and 673 deletions.
2 changes: 1 addition & 1 deletion service/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories
}

var cfg *Config
if cfg, err = configunmarshaler.New().Unmarshal(retMap, factories); err != nil {
if cfg, err = configunmarshaler.Unmarshal(retMap, factories); err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
}

Expand Down
206 changes: 18 additions & 188 deletions service/internal/configunmarshaler/defaultunmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ const (
_ configErrorCode = iota

errUnmarshalTopLevelStructure
errUnmarshalExtension
errUnmarshalReceiver
errUnmarshalProcessor
errUnmarshalExporter
)

type configError struct {
Expand All @@ -50,42 +46,23 @@ type configError struct {
code configErrorCode
}

// YAML top-level configuration keys.
const (
// extensionsKeyName is the configuration key name for extensions section.
extensionsKeyName = "extensions"

// receiversKeyName is the configuration key name for receivers section.
receiversKeyName = "receivers"

// exportersKeyName is the configuration key name for exporters section.
exportersKeyName = "exporters"

// processorsKeyName is the configuration key name for processors section.
processorsKeyName = "processors"
)

type configSettings struct {
Receivers map[config.ComponentID]map[string]interface{} `mapstructure:"receivers"`
Processors map[config.ComponentID]map[string]interface{} `mapstructure:"processors"`
Exporters map[config.ComponentID]map[string]interface{} `mapstructure:"exporters"`
Extensions map[config.ComponentID]map[string]interface{} `mapstructure:"extensions"`
Service config.Service `mapstructure:"service"`
}

type ConfigUnmarshaler struct{}

// New returns a default ConfigUnmarshaler that unmarshalls every component's configuration
// using the custom unmarshaler if present or default strict unmarshaler otherwise.
func New() ConfigUnmarshaler {
return ConfigUnmarshaler{}
Receivers *Receivers `mapstructure:"receivers"`
Processors *Processors `mapstructure:"processors"`
Exporters *Exporters `mapstructure:"exporters"`
Extensions *Extensions `mapstructure:"extensions"`
Service config.Service `mapstructure:"service"`
}

// Unmarshal the config.Config from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (ConfigUnmarshaler) Unmarshal(v *confmap.Conf, factories component.Factories) (*config.Config, error) {
func Unmarshal(v *confmap.Conf, factories component.Factories) (*config.Config, error) {
// Unmarshal top level sections and validate.
rawCfg := configSettings{
Receivers: NewReceivers(factories.Receivers),
Processors: NewProcessors(factories.Processors),
Exporters: NewExporters(factories.Exporters),
Extensions: NewExtensions(factories.Extensions),
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
Service: config.Service{
Telemetry: telemetry.Config{
Expand Down Expand Up @@ -113,166 +90,19 @@ func (ConfigUnmarshaler) Unmarshal(v *confmap.Conf, factories component.Factorie
}
}

var cfg config.Config
var err error
if cfg.Extensions, err = unmarshalExtensions(rawCfg.Extensions, factories.Extensions); err != nil {
return nil, configError{
error: err,
code: errUnmarshalExtension,
}
}

if cfg.Receivers, err = unmarshalReceivers(rawCfg.Receivers, factories.Receivers); err != nil {
return nil, configError{
error: err,
code: errUnmarshalReceiver,
}
}

if cfg.Processors, err = unmarshalProcessors(rawCfg.Processors, factories.Processors); err != nil {
return nil, configError{
error: err,
code: errUnmarshalProcessor,
}
}

if cfg.Exporters, err = unmarshalExporters(rawCfg.Exporters, factories.Exporters); err != nil {
return nil, configError{
error: err,
code: errUnmarshalExporter,
}
}

cfg.Service = rawCfg.Service

return &cfg, nil
}

func unmarshalExtensions(exts map[config.ComponentID]map[string]interface{}, factories map[config.Type]component.ExtensionFactory) (map[config.ComponentID]config.Extension, error) {
// Prepare resulting map.
extensions := make(map[config.ComponentID]config.Extension)

// Iterate over extensions and create a config for each.
for id, value := range exts {
// Find extension factory based on "type" that we read from config source.
factory, ok := factories[id.Type()]
if !ok {
return nil, errorUnknownType(extensionsKeyName, id, reflect.ValueOf(factories).MapKeys())
}

// Create the default config for this extension.
extensionCfg := factory.CreateDefaultConfig()
extensionCfg.SetIDName(id.Name())

// Now that the default config struct is created we can Unmarshal into it,
// and it will apply user-defined config on top of the default.
if err := config.UnmarshalExtension(confmap.NewFromStringMap(value), extensionCfg); err != nil {
return nil, errorUnmarshalError(extensionsKeyName, id, err)
}

extensions[id] = extensionCfg
}

return extensions, nil
}

// LoadReceiver loads a receiver config from componentConfig using the provided factories.
func LoadReceiver(componentConfig *confmap.Conf, id config.ComponentID, factory component.ReceiverFactory) (config.Receiver, error) {
// Create the default config for this receiver.
receiverCfg := factory.CreateDefaultConfig()
receiverCfg.SetIDName(id.Name())

// Now that the default config struct is created we can Unmarshal into it,
// and it will apply user-defined config on top of the default.
if err := config.UnmarshalReceiver(componentConfig, receiverCfg); err != nil {
return nil, errorUnmarshalError(receiversKeyName, id, err)
}

return receiverCfg, nil
}

func unmarshalReceivers(recvs map[config.ComponentID]map[string]interface{}, factories map[config.Type]component.ReceiverFactory) (map[config.ComponentID]config.Receiver, error) {
// Prepare resulting map.
receivers := make(map[config.ComponentID]config.Receiver)

// Iterate over input map and create a config for each.
for id, value := range recvs {
// Find receiver factory based on "type" that we read from config source.
factory := factories[id.Type()]
if factory == nil {
return nil, errorUnknownType(receiversKeyName, id, reflect.ValueOf(factories).MapKeys())
}

receiverCfg, err := LoadReceiver(confmap.NewFromStringMap(value), id, factory)
if err != nil {
// LoadReceiver already wraps the error.
return nil, err
}

receivers[id] = receiverCfg
}

return receivers, nil
}

func unmarshalExporters(exps map[config.ComponentID]map[string]interface{}, factories map[config.Type]component.ExporterFactory) (map[config.ComponentID]config.Exporter, error) {
// Prepare resulting map.
exporters := make(map[config.ComponentID]config.Exporter)

// Iterate over Exporters and create a config for each.
for id, value := range exps {
// Find exporter factory based on "type" that we read from config source.
factory := factories[id.Type()]
if factory == nil {
return nil, errorUnknownType(exportersKeyName, id, reflect.ValueOf(factories).MapKeys())
}

// Create the default config for this exporter.
exporterCfg := factory.CreateDefaultConfig()
exporterCfg.SetIDName(id.Name())

// Now that the default config struct is created we can Unmarshal into it,
// and it will apply user-defined config on top of the default.
if err := config.UnmarshalExporter(confmap.NewFromStringMap(value), exporterCfg); err != nil {
return nil, errorUnmarshalError(exportersKeyName, id, err)
}

exporters[id] = exporterCfg
}

return exporters, nil
}

func unmarshalProcessors(procs map[config.ComponentID]map[string]interface{}, factories map[config.Type]component.ProcessorFactory) (map[config.ComponentID]config.Processor, error) {
// Prepare resulting map.
processors := make(map[config.ComponentID]config.Processor)

// Iterate over processors and create a config for each.
for id, value := range procs {
// Find processor factory based on "type" that we read from config source.
factory := factories[id.Type()]
if factory == nil {
return nil, errorUnknownType(processorsKeyName, id, reflect.ValueOf(factories).MapKeys())
}

// Create the default config for this processor.
processorCfg := factory.CreateDefaultConfig()
processorCfg.SetIDName(id.Name())

// Now that the default config struct is created we can Unmarshal into it,
// and it will apply user-defined config on top of the default.
if err := config.UnmarshalProcessor(confmap.NewFromStringMap(value), processorCfg); err != nil {
return nil, errorUnmarshalError(processorsKeyName, id, err)
}

processors[id] = processorCfg
cfg := &config.Config{
Receivers: rawCfg.Receivers.GetReceivers(),
Processors: rawCfg.Processors.GetProcessors(),
Exporters: rawCfg.Exporters.GetExporters(),
Extensions: rawCfg.Extensions.GetExtensions(),
Service: rawCfg.Service,
}

return processors, nil
return cfg, nil
}

func errorUnknownType(component string, id config.ComponentID, factories []reflect.Value) error {
return fmt.Errorf("unknown %s type %q for %q (valid values: %v)", component, id.Type(), id, factories)
return fmt.Errorf("unknown %s type: %q for id: %q (valid values: %v)", component, id.Type(), id, factories)
}

func errorUnmarshalError(component string, id config.ComponentID, err error) error {
Expand Down
Loading

0 comments on commit 9a43339

Please sign in to comment.