Skip to content

Commit

Permalink
operator: expose receivers by default
Browse files Browse the repository at this point in the history
Signed-off-by: Benedikt Bongartz <[email protected]>
  • Loading branch information
frzifus committed Oct 4, 2024
1 parent 91759b0 commit 9cd5174
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 26 deletions.
2 changes: 1 addition & 1 deletion apis/v1beta1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error {
if len(otelcol.Spec.ManagementState) == 0 {
otelcol.Spec.ManagementState = ManagementStateManaged
}
return nil
return otelcol.Spec.Config.ApplyDefaults(c.logger)
}

func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
Expand Down
33 changes: 33 additions & 0 deletions apis/v1beta1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,39 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
expected v1beta1.OpenTelemetryCollector
shouldFailSar bool
}{
{
name: "update config defaults",
otelcol: v1beta1.OpenTelemetryCollector{
Spec: v1beta1.OpenTelemetryCollectorSpec{
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":null,"http":null}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
expected: v1beta1.OpenTelemetryCollector{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{},
},
Spec: v1beta1.OpenTelemetryCollectorSpec{
OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{
Args: map[string]string{"feature-gates": "-component.UseLocalHostAsDefaultHost"},
ManagementState: v1beta1.ManagementStateManaged,
Replicas: &one,
},
Mode: v1beta1.ModeDeployment,
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
Config: func() v1beta1.Config {
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
var cfg v1beta1.Config
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
return cfg
}(),
},
},
},
{
name: "all fields default",
otelcol: v1beta1.OpenTelemetryCollector{},
Expand Down
70 changes: 70 additions & 0 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"reflect"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -225,6 +226,71 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
return ports, nil
}

// getPortsForComponentKinds gets the ports for the given ComponentKind(s).
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
enabledComponents := c.GetEnabledComponents()
for _, componentKind := range componentKinds {
var retriever components.ParserRetriever
var cfg AnyConfig
switch componentKind {
case KindReceiver:
retriever = receivers.ReceiverFor
cfg = c.Receivers
case KindExporter:
continue
case KindProcessor:
continue
case KindExtension:
continue
}
for componentName := range enabledComponents[componentKind] {
parser := retriever(componentName)
if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil {
return err
} else {
// NOTE: The internally used parser returns components.SingleEndpointConfig
// as a map value. The following lines normalize this value..
got, err := yaml.Marshal(newCfg)
if err != nil {
return err
}
out := make(map[string]interface{}, 0)
if err := yaml.Unmarshal(got, out); err != nil {
return err
}
// NOTE: The underlying struct compoents.SingleEndpointConfig adds this listenaddress
// field. It is marshalled due to internal use. To avoid adding invalid fields to the

Check failure on line 262 in apis/v1beta1/config.go

View workflow job for this annotation

GitHub Actions / Code standards (linting)

`marshalled` is a misspelling of `marshaled` (misspell)
// collector config, this temporary workaround removes this field.
removeKeysRecursively(out, "listenaddress")
cfg.Object[componentName] = out
}
}
}

return nil
}

func removeKeysRecursively(m map[string]interface{}, keysToRemove ...string) {
for k, v := range m {
if slices.Contains(keysToRemove, k) {
delete(m, k)
continue
}

if nm, ok := v.(map[string]interface{}); ok {
removeKeysRecursively(nm, keysToRemove...)
}

if ns, ok := v.([]interface{}); ok {
for _, item := range ns {
if nestedMap, ok := item.(map[string]interface{}); ok {
removeKeysRecursively(nestedMap, keysToRemove...)
}
}
}
}
}

func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentKinds(logger, KindReceiver)
}
Expand All @@ -241,6 +307,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
}

func (c *Config) ApplyDefaults(logger logr.Logger) error {
return c.applyDefaultForComponentKinds(logger, KindReceiver)
}

// GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled
// that provides the hinting for the liveness probe.
func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) {
Expand Down
46 changes: 30 additions & 16 deletions internal/components/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType])

type Settings[ComponentConfigType any] struct {
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
defaultRecAddr string
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
defaultsApplier Defaulter[ComponentConfigType]
}

func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] {
Expand Down Expand Up @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build
o.appProtocol = appProtocol
})
}
func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultRecAddr = defaultRecAddr
})
}
func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.targetPort = intstr.FromInt32(targetPort)
Expand Down Expand Up @@ -118,19 +125,26 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat
})
}

func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultsApplier = defaultsApplier
})
}

func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) {
o := NewEmptySettings[ComponentConfigType]()
o.Apply(b...)
if len(o.name) == 0 {
return nil, fmt.Errorf("invalid settings struct, no name specified")
}
return &GenericParser[ComponentConfigType]{
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
settings: o,
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
defaultsApplier: o.defaultsApplier,
settings: o,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config
// It's expected that type Config is the configuration used by a parser.
type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error)

// Defaulter is a function that applies given defaults to the passed Config.
// It's expected that type Config is the configuration used by a parser.
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, port int32, config ComponentConfigType) (ComponentConfigType, error)

// ComponentType returns the type for a given component name.
// components have a name like:
// - mycomponent/custom
Expand Down Expand Up @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) {
type ParserRetriever func(string) Parser

type Parser interface {
// GetDefaultConfig .. TODO
GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error)

// Ports returns the service ports parsed based on the component's configuration where name is the component's name
// of the form "name" or "type/name"
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)
Expand Down
29 changes: 23 additions & 6 deletions internal/components/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,29 @@ var (
// GenericParser serves as scaffolding for custom parsing logic by isolating
// functionality to idempotent functions.
type GenericParser[T any] struct {
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
defaultsApplier Defaulter[T]
}

func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
if g.settings == nil || g.defaultsApplier == nil {
return config, nil
}

if g.settings.defaultRecAddr == "" {
return config, nil
}

var parsed T
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.defaultsApplier(logger, g.settings.defaultRecAddr, g.settings.port, parsed)
}

func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
Expand Down
51 changes: 50 additions & 1 deletion internal/components/multi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type MultiPortOption func(parser *MultiPortReceiver)
type MultiPortReceiver struct {
name string

addrMappings map[string]string
portMappings map[string]*corev1.ServicePort
}

Expand Down Expand Up @@ -72,6 +73,37 @@ func (m *MultiPortReceiver) ParserName() string {
return fmt.Sprintf("__%s", m.name)
}

func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
multiProtoEndpointCfg := &MultiProtocolEndpointConfig{}
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
return nil, err
}
tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols))
for protocol, ec := range multiProtoEndpointCfg.Protocols {
var port int32
if defaultSvc, ok := m.portMappings[protocol]; ok {
port = defaultSvc.Port
if ec != nil {
port = ec.GetPortNumOrDefault(logger, port)
}
}
var addr string
if defaultAddr, ok := m.addrMappings[protocol]; ok {
addr = defaultAddr
}
res, err := AddressDefaulter(logger, addr, port, ec)
if err != nil {
return nil, err
}
tmp[protocol] = res
}

for protocol, ec := range tmp {
multiProtoEndpointCfg.Protocols[protocol] = ec
}
return config, mapstructure.Decode(multiProtoEndpointCfg, &config)

}
func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
return nil, nil
}
Expand All @@ -91,7 +123,7 @@ func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEnd
}

func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] {
return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port)
return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port).WithDefaultsApplier(MultiAddressDefaulter)
}

func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] {
Expand All @@ -104,6 +136,7 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err
}
multiReceiver := &MultiPortReceiver{
name: mp[0].MustBuild().name,
addrMappings: map[string]string{},
portMappings: map[string]*corev1.ServicePort{},
}
for _, bu := range mp[1:] {
Expand All @@ -112,6 +145,9 @@ func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, err
return nil, err
}
multiReceiver.portMappings[built.name] = built.settings.GetServicePort()
if built.settings != nil {
multiReceiver.addrMappings[built.name] = built.settings.defaultRecAddr
}
}
return multiReceiver, nil
}
Expand All @@ -123,3 +159,16 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver {
return p
}
}

// MultiAddressDefaulter ...
func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, port int32, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) {
for protocol, ec := range config.Protocols {
res, err := AddressDefaulter(logger, defaultRecAddr, port, ec)
if err != nil {
return nil, err
}
// TODO: can it be nil?
config.Protocols[protocol].Endpoint = res.Endpoint
}
return config, nil
}
2 changes: 2 additions & 0 deletions internal/components/receivers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ var (
components.NewMultiPortReceiverBuilder("otlp").
AddPortMapping(components.NewProtocolBuilder("grpc", 4317).
WithAppProtocol(&components.GrpcProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4317)).
AddPortMapping(components.NewProtocolBuilder("http", 4318).
WithAppProtocol(&components.HttpProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4318)).
MustBuild(),
components.NewMultiPortReceiverBuilder("skywalking").
Expand Down
Loading

0 comments on commit 9cd5174

Please sign in to comment.