Skip to content

Commit

Permalink
components: apply MultiAddressDefaulter
Browse files Browse the repository at this point in the history
Signed-off-by: Benedikt Bongartz <[email protected]>
  • Loading branch information
frzifus committed Oct 7, 2024
1 parent 6df1e40 commit edf8554
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 52 deletions.
22 changes: 4 additions & 18 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
return ports, nil
}

// getPortsForComponentKinds gets the ports for the given ComponentKind(s).
// applyDefaultForComponentKinds applies defaults to the endpoints for the given ComponentKind(s).
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
enabledComponents := c.GetEnabledComponents()
for _, componentKind := range componentKinds {
Expand All @@ -244,25 +244,11 @@ func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKind
}
for componentName := range enabledComponents[componentKind] {
parser := retriever(componentName)
if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil {
newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName])
if err != nil {
return err
} else {
// NOTE: The internally used parser returns components.SingleEndpointConfig
// as a map value. The following lines normalize this value..
got, err := yaml.Marshal(newCfg)
if err != nil {
return err
}
out := make(map[string]interface{}, 0)
if err := yaml.Unmarshal(got, out); err != nil {
return err
}
// NOTE: The underlying struct compoents.SingleEndpointConfig adds this listenaddress
// field. It is marshaled due to internal use. To avoid adding invalid fields to the
// collector config, this temporary workaround removes this field.
// TODO: Try to get rid of it or move it into the parser.GetDefaultConfig method.
cfg.Object[componentName] = out
}
cfg.Object[componentName] = newCfg
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type PortRetriever interface {
GetPortNumOrDefault(logr.Logger, int32) int32
}

type AddressProvider = func(name string) (address string, port int32)

// PortParser is a function that returns a list of servicePorts given a config of type Config.
type PortParser[ComponentConfigType any] func(logger logr.Logger, name string, defaultPort *corev1.ServicePort, config ComponentConfigType) ([]corev1.ServicePort, error)

Expand All @@ -51,7 +53,7 @@ type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config Com

// Defaulter is a function that applies given defaults to the passed Config.
// It's expected that type Config is the configuration used by a parser.
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, port int32, config ComponentConfigType) (ComponentConfigType, error)
type Defaulter[ComponentConfigType any] func(logger logr.Logger, addrProv AddressProvider, config ComponentConfigType) (map[string]interface{}, error)

// ComponentType returns the type for a given component name.
// components have a name like:
Expand Down
2 changes: 1 addition & 1 deletion internal/components/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.GetServicePort().Port, parsed)
return g.defaultsApplier(logger, func(string) (string, int32) { return g.settings.defaultRecAddr, g.settings.GetServicePort().Port }, parsed)
}

func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
Expand Down
69 changes: 48 additions & 21 deletions internal/components/multi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type MultiPortOption func(parser *MultiPortReceiver)
type MultiPortReceiver struct {
name string

addrMappings map[string]string
portMappings map[string]*corev1.ServicePort
addrMappings map[string]string
portMappings map[string]*corev1.ServicePort
defaultsApplier Defaulter[any]
}

func (m *MultiPortReceiver) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) {
Expand Down Expand Up @@ -78,11 +79,12 @@ func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interfac
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
return nil, err
}
tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols))
for protocol, ec := range multiProtoEndpointCfg.Protocols {

defaulter := func(protocol string) (string, int32) {
var port int32
if defaultSvc, ok := m.portMappings[protocol]; ok {
port = defaultSvc.Port
ec := multiProtoEndpointCfg.Protocols[protocol]
if ec != nil {
port = ec.GetPortNumOrDefault(logger, port)
}
Expand All @@ -91,19 +93,12 @@ func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interfac
if defaultAddr, ok := m.addrMappings[protocol]; ok {
addr = defaultAddr
}
res, err := AddressDefaulter(logger, addr, port, ec)
if err != nil {
return nil, err
}
tmp[protocol] = res
}

for protocol, ec := range tmp {
multiProtoEndpointCfg.Protocols[protocol] = ec
return addr, port
}
return config, mapstructure.Decode(multiProtoEndpointCfg, &config)

return m.defaultsApplier(logger, defaulter, multiProtoEndpointCfg)
}

func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
return nil, nil
}
Expand Down Expand Up @@ -134,16 +129,20 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err
if len(mp) < 1 {
return nil, fmt.Errorf("must provide at least one port mapping")
}

mp0Defaulter := mp[0].MustBuild().defaultsApplier
multiReceiver := &MultiPortReceiver{
name: mp[0].MustBuild().name,
addrMappings: map[string]string{},
portMappings: map[string]*corev1.ServicePort{},
name: mp[0].MustBuild().name,
defaultsApplier: createMultiAddressDefaulter(mp0Defaulter),
addrMappings: map[string]string{},
portMappings: map[string]*corev1.ServicePort{},
}
for _, bu := range mp[1:] {
built, err := bu.Build()
if err != nil {
return nil, err
}
multiReceiver.defaultsApplier = createMultiAddressDefaulter(built.defaultsApplier)
multiReceiver.portMappings[built.name] = built.settings.GetServicePort()
if built.settings != nil {
multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr
Expand All @@ -160,13 +159,41 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver {
}
}

func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) {
func createMultiAddressDefaulter[ComponentConfigType any](defaultsApplier Defaulter[ComponentConfigType]) Defaulter[any] {
return func(logger logr.Logger, addrProv AddressProvider, config any) (map[string]interface{}, error) {
tc, ok := config.(ComponentConfigType)
if !ok {
return nil, fmt.Errorf("invalid config type, expected ComponentConfigType")
}

result, err := defaultsApplier(logger, addrProv, tc)
if err != nil {
return nil, err
}

return result, nil
}
}

func MultiAddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *MultiProtocolEndpointConfig) (map[string]interface{}, error) {
root := make(map[string]interface{})
if err := mapstructure.Decode(config, &root); err != nil {
return nil, err
}

proto, ok := root["protocols"].(map[string]interface{})
if !ok {
proto = make(map[string]interface{})
root["protocols"] = proto
}

for protocol, ec := range config.Protocols {
res, err := AddressDefaulter(logger, defaultRecAddr, port, ec)
res, err := AddressDefaulter(logger, func(string) (string, int32) { return addrProv(protocol) }, ec)
if err != nil {
return nil, err
}
config.Protocols[protocol].Endpoint = res.Endpoint
proto[protocol] = res
}
return config, nil

return root, nil
}
22 changes: 11 additions & 11 deletions internal/components/single_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/naming"
Expand Down Expand Up @@ -90,22 +91,21 @@ func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEn
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter)
}

func AddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *SingleEndpointConfig) (*SingleEndpointConfig, error) {
func AddressDefaulter(logger logr.Logger, addrProv AddressProvider, config *SingleEndpointConfig) (map[string]interface{}, error) {
if config == nil {
config = &SingleEndpointConfig{}
}
defaultRecAddr, port := addrProv("")

if config.Endpoint == "" {
config.Endpoint = fmt.Sprintf("%s:%d", defaultRecAddr, port)
return config, nil
}

v := strings.Split(config.Endpoint, ":")
if len(v) < 2 {
return config, nil
} else {
v := strings.Split(config.Endpoint, ":")
if len(v) < 2 || v[0] == "" {
config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[len(v)-1])
}
}

if v[0] == "" {
config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1])
}
return config, nil
res := make(map[string]interface{})
return res, mapstructure.Decode(config, &res)
}

0 comments on commit edf8554

Please sign in to comment.