diff --git a/internal/components/component.go b/internal/components/component.go new file mode 100644 index 0000000000..e704c39e25 --- /dev/null +++ b/internal/components/component.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "errors" + "regexp" + "strconv" + "strings" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +var ( + GrpcProtocol = "grpc" + HttpProtocol = "http" + UnsetPort int32 = 0 + PortNotFoundErr = errors.New("port should not be empty") +) + +type PortRetriever interface { + GetPortNum() (int32, error) + GetPortNumOrDefault(logr.Logger, int32) int32 +} + +type PortBuilderOption func(*corev1.ServicePort) + +func WithTargetPort(targetPort int32) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.TargetPort = intstr.FromInt32(targetPort) + } +} + +func WithAppProtocol(proto *string) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.AppProtocol = proto + } +} + +func WithProtocol(proto corev1.Protocol) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.Protocol = proto + } +} + +// ComponentType returns the type for a given component name. +// components have a name like: +// - mycomponent/custom +// - mycomponent +// we extract the "mycomponent" part and see if we have a parser for the component. +func ComponentType(name string) string { + if strings.Contains(name, "/") { + return name[:strings.Index(name, "/")] + } + return name +} + +func PortFromEndpoint(endpoint string) (int32, error) { + var err error + var port int64 + + r := regexp.MustCompile(":[0-9]+") + + if r.MatchString(endpoint) { + portStr := r.FindString(endpoint) + cleanedPortStr := strings.Replace(portStr, ":", "", -1) + port, err = strconv.ParseInt(cleanedPortStr, 10, 32) + + if err != nil { + return 0, err + } + } + + if port == 0 { + return 0, PortNotFoundErr + } + + return int32(port), err +} + +type ComponentPortParser interface { + // Ports returns the service ports parsed based on the exporter's configuration + Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) + + // ParserType returns the name of this parser + ParserType() string + + // ParserName is an internal name for the parser + ParserName() string +} + +func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.ServicePort { + return corev1.ServicePort{ + Name: current.Name, + Port: port, + TargetPort: current.TargetPort, + NodePort: current.NodePort, + AppProtocol: current.AppProtocol, + Protocol: current.Protocol, + } +} diff --git a/internal/components/component_test.go b/internal/components/component_test.go new file mode 100644 index 0000000000..4671e98087 --- /dev/null +++ b/internal/components/component_test.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +func TestComponentType(t *testing.T) { + for _, tt := range []struct { + desc string + name string + expected string + }{ + {"regular case", "myreceiver", "myreceiver"}, + {"named instance", "myreceiver/custom", "myreceiver"}, + } { + t.Run(tt.desc, func(t *testing.T) { + // test and verify + assert.Equal(t, tt.expected, components.ComponentType(tt.name)) + }) + } +} + +func TestReceiverParsePortFromEndpoint(t *testing.T) { + for _, tt := range []struct { + desc string + endpoint string + expected int + errorExpected bool + }{ + {"regular case", "http://localhost:1234", 1234, false}, + {"absolute with path", "http://localhost:1234/server-status?auto", 1234, false}, + {"no protocol", "0.0.0.0:1234", 1234, false}, + {"just port", ":1234", 1234, false}, + {"no port at all", "http://localhost", 0, true}, + {"overflow", "0.0.0.0:2147483648", 0, true}, + } { + t.Run(tt.desc, func(t *testing.T) { + // test + val, err := components.PortFromEndpoint(tt.endpoint) + if tt.errorExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + assert.EqualValues(t, tt.expected, val, "wrong port from endpoint %s: %d", tt.endpoint, val) + }) + } +} diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go new file mode 100644 index 0000000000..304d92d521 --- /dev/null +++ b/internal/components/multi_endpoint.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "fmt" + + "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +var _ ComponentPortParser = &MultiPortReceiver{} + +// MultiProtocolEndpointConfig represents the minimal struct for a given YAML configuration input containing a map to +// a struct with either endpoint or listen_address. +type MultiProtocolEndpointConfig struct { + Protocols map[string]*SingleEndpointConfig `mapstructure:"protocols"` +} + +// MultiPortOption allows the setting of options for a MultiPortReceiver. +type MultiPortOption func(parser *MultiPortReceiver) + +// MultiPortReceiver is a special parser for components with endpoints for each protocol. +type MultiPortReceiver struct { + name string + + portMappings map[string]*corev1.ServicePort +} + +func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { + multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} + if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { + return nil, err + } + var ports []corev1.ServicePort + for protocol, ec := range multiProtoEndpointCfg.Protocols { + if defaultSvc, ok := m.portMappings[protocol]; ok { + port := defaultSvc.Port + if ec != nil { + port = ec.GetPortNumOrDefault(logger, port) + defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", m.name, protocol), port) + } + ports = append(ports, ConstructServicePort(defaultSvc, port)) + } else { + return nil, fmt.Errorf("unknown protocol set: %s", protocol) + } + } + return ports, nil +} + +func (m *MultiPortReceiver) ParserType() string { + return ComponentType(m.name) +} + +func (m *MultiPortReceiver) ParserName() string { + return fmt.Sprintf("__%s", m.name) +} + +func NewMultiPortReceiver(name string, opts ...MultiPortOption) *MultiPortReceiver { + multiReceiver := &MultiPortReceiver{ + name: name, + portMappings: map[string]*corev1.ServicePort{}, + } + for _, opt := range opts { + opt(multiReceiver) + } + return multiReceiver +} + +func WithPortMapping(name string, port int32, opts ...PortBuilderOption) MultiPortOption { + return func(parser *MultiPortReceiver) { + servicePort := &corev1.ServicePort{ + Name: naming.PortName(fmt.Sprintf("%s-%s", parser.name, name), port), + Port: port, + } + for _, opt := range opts { + opt(servicePort) + } + parser.portMappings[name] = servicePort + } +} diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go new file mode 100644 index 0000000000..8009b8e9f3 --- /dev/null +++ b/internal/components/multi_endpoint_test.go @@ -0,0 +1,329 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +var ( + httpConfig = map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{}, + }, + } + httpAndGrpcConfig = map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{}, + "grpc": map[string]interface{}{}, + }, + } +) + +func TestMultiPortReceiver_ParserName(t *testing.T) { + type fields struct { + name string + opts []components.MultiPortOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "__receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + want: "__receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) + assert.Equalf(t, tt.want, m.ParserName(), "ParserName()") + }) + } +} + +func TestMultiPortReceiver_ParserType(t *testing.T) { + type fields struct { + name string + opts []components.MultiPortOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2/test", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + want: "receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) + assert.Equalf(t, tt.want, m.ParserType(), "ParserType()") + }) + } +} + +func TestMultiPortReceiver_Ports(t *testing.T) { + type fields struct { + name string + opts []components.MultiPortOption + } + type args struct { + config interface{} + } + tests := []struct { + name string + fields fields + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + args: args{ + config: nil, + }, + want: nil, + wantErr: assert.NoError, + }, + { + name: "single port mapping without builder options", + fields: fields{ + name: "receiver2", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver2-http", + Port: 80, + }, + }, + wantErr: assert.NoError, + }, + { + name: "port mapping with target port", + fields: fields{ + name: "receiver3", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80, components.WithTargetPort(8080)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver3-http", + Port: 80, + TargetPort: intstr.FromInt32(8080), + }, + }, + wantErr: assert.NoError, + }, + { + name: "port mapping with app protocol", + fields: fields{ + name: "receiver4", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80, components.WithAppProtocol(&components.HttpProtocol)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver4-http", + Port: 80, + AppProtocol: &components.HttpProtocol, + }, + }, + wantErr: assert.NoError, + }, + { + name: "port mapping with protocol", + fields: fields{ + name: "receiver5", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80, components.WithProtocol(corev1.ProtocolTCP)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver5-http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + wantErr: assert.NoError, + }, + { + name: "multiple port mappings", + fields: fields{ + name: "receiver6", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + components.WithPortMapping("grpc", 4317, + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol)), + }, + }, + args: args{ + config: httpAndGrpcConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver6-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + Protocol: corev1.ProtocolTCP, + AppProtocol: &components.GrpcProtocol, + }, + { + Name: "receiver6-http", + Port: 80, + }, + }, + wantErr: assert.NoError, + }, + { + name: "multiple port mappings only one enabled", + fields: fields{ + name: "receiver6", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + components.WithPortMapping("grpc", 4317, + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver6-http", + Port: 80, + }, + }, + wantErr: assert.NoError, + }, + { + name: "error unmarshalling configuration", + fields: fields{ + name: "receiver1", + opts: nil, + }, + args: args{ + config: "invalid config", // Simulate an invalid config that causes LoadMap to fail + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "error marshaling configuration", + fields: fields{ + name: "receiver1", + opts: nil, + }, + args: args{ + config: func() {}, // Simulate an invalid config that causes LoadMap to fail + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "unknown protocol", + fields: fields{ + name: "receiver2", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + args: args{ + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "unknown": map[string]interface{}{}, + }, + }, + }, + want: nil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) + got, err := m.Ports(logr.Discard(), tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { + return + } + assert.ElementsMatchf(t, tt.want, got, "Ports(%v)", tt.args.config) + }) + } +} diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go new file mode 100644 index 0000000000..2848b36514 --- /dev/null +++ b/internal/components/receivers/helpers.go @@ -0,0 +1,146 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivers + +import ( + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +// registry holds a record of all known receiver parsers. +var registry = make(map[string]components.ComponentPortParser) + +// Register adds a new parser builder to the list of known builders. +func Register(name string, p components.ComponentPortParser) { + registry[name] = p +} + +// IsRegistered checks whether a parser is registered with the given name. +func IsRegistered(name string) bool { + _, ok := registry[name] + return ok +} + +// BuilderFor returns a parser builder for the given exporter name. +func BuilderFor(name string) components.ComponentPortParser { + if parser, ok := registry[components.ComponentType(name)]; ok { + return parser + } + return components.NewSinglePortParser(components.ComponentType(name), components.UnsetPort) +} + +var ( + componentParsers = []components.ComponentPortParser{ + components.NewMultiPortReceiver("otlp", + components.WithPortMapping( + "grpc", + 4317, + components.WithAppProtocol(&components.GrpcProtocol), + components.WithTargetPort(4317), + ), components.WithPortMapping( + "http", + 4318, + components.WithAppProtocol(&components.HttpProtocol), + components.WithTargetPort(4318), + ), + ), + components.NewMultiPortReceiver("skywalking", + components.WithPortMapping(components.GrpcProtocol, 11800, + components.WithTargetPort(11800), + components.WithAppProtocol(&components.GrpcProtocol), + ), + components.WithPortMapping(components.HttpProtocol, 12800, + components.WithTargetPort(12800), + components.WithAppProtocol(&components.HttpProtocol), + )), + components.NewMultiPortReceiver("jaeger", + components.WithPortMapping(components.GrpcProtocol, 14250, + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol), + ), + components.WithPortMapping("thrift_http", 14268, + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.HttpProtocol), + ), + components.WithPortMapping("thrift_compact", 6831, + components.WithProtocol(corev1.ProtocolUDP), + ), + components.WithPortMapping("thrift_binary", 6832, + components.WithProtocol(corev1.ProtocolUDP), + ), + ), + components.NewMultiPortReceiver("loki", + components.WithPortMapping(components.GrpcProtocol, 9095, + components.WithTargetPort(9095), + components.WithAppProtocol(&components.GrpcProtocol), + ), + components.WithPortMapping(components.HttpProtocol, 3100, + components.WithTargetPort(3100), + components.WithAppProtocol(&components.HttpProtocol), + ), + ), + components.NewSinglePortParser("awsxray", 2000), + components.NewSinglePortParser("carbon", 2003), + components.NewSinglePortParser("collectd", 8081), + components.NewSinglePortParser("fluentforward", 8006), + components.NewSinglePortParser("influxdb", 8086), + components.NewSinglePortParser("opencensus", 55678, components.WithAppProtocol(nil)), + components.NewSinglePortParser("sapm", 7276), + components.NewSinglePortParser("signalfx", 9943), + components.NewSinglePortParser("splunk_hec", 8088), + components.NewSinglePortParser("statsd", 8125, components.WithProtocol(corev1.ProtocolUDP)), + components.NewSinglePortParser("tcplog", components.UnsetPort, components.WithProtocol(corev1.ProtocolTCP)), + components.NewSinglePortParser("udplog", components.UnsetPort, components.WithProtocol(corev1.ProtocolUDP)), + components.NewSinglePortParser("wavefront", 2003), + components.NewSinglePortParser("zipkin", 9411, components.WithAppProtocol(&components.HttpProtocol), components.WithProtocol(corev1.ProtocolTCP)), + NewScraperParser("prometheus"), + NewScraperParser("kubeletstats"), + NewScraperParser("sshcheck"), + NewScraperParser("cloudfoundry"), + NewScraperParser("vcenter"), + NewScraperParser("oracledb"), + NewScraperParser("snmp"), + NewScraperParser("googlecloudpubsub"), + NewScraperParser("chrony"), + NewScraperParser("jmx"), + NewScraperParser("podman_stats"), + NewScraperParser("pulsar"), + NewScraperParser("docker_stats"), + NewScraperParser("aerospike"), + NewScraperParser("zookeeper"), + NewScraperParser("prometheus_simple"), + NewScraperParser("saphana"), + NewScraperParser("riak"), + NewScraperParser("redis"), + NewScraperParser("rabbitmq"), + NewScraperParser("purefb"), + NewScraperParser("postgresql"), + NewScraperParser("nsxt"), + NewScraperParser("nginx"), + NewScraperParser("mysql"), + NewScraperParser("memcached"), + NewScraperParser("httpcheck"), + NewScraperParser("haproxy"), + NewScraperParser("flinkmetrics"), + NewScraperParser("couchdb"), + } +) + +func init() { + for _, parser := range componentParsers { + Register(parser.ParserType(), parser) + } +} diff --git a/internal/components/receivers/multi_endpoint_receiver_test.go b/internal/components/receivers/multi_endpoint_receiver_test.go new file mode 100644 index 0000000000..dde04b763f --- /dev/null +++ b/internal/components/receivers/multi_endpoint_receiver_test.go @@ -0,0 +1,378 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivers_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" +) + +var ( + grpc = "grpc" + http = "http" +) + +func TestMultiEndpointReceiverParsers(t *testing.T) { + type testCase struct { + name string + config interface{} + expectedErr error + expectedSvc []corev1.ServicePort + } + type fields struct { + receiverName string + parserName string + cases []testCase + } + for _, tt := range []fields{ + { + receiverName: "jaeger", + parserName: "__jaeger", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "jaeger-grpc", + Port: 14250, + Protocol: corev1.ProtocolTCP, + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "jaeger-grpc", + Port: 1234, + Protocol: corev1.ProtocolTCP, + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "thrift_http": map[string]interface{}{}, + "thrift_compact": map[string]interface{}{}, + "thrift_binary": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "jaeger-grpc", + Port: 14250, + Protocol: corev1.ProtocolTCP, + AppProtocol: &grpc, + }, + { + Name: "port-14268", + Port: 14268, + Protocol: corev1.ProtocolTCP, + AppProtocol: &http, + }, + { + Name: "port-6831", + Port: 6831, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "port-6832", + Port: 6832, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + }, + }, + { + receiverName: "otlp", + parserName: "__otlp", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + { + Name: "otlp-http", + Port: 4318, + TargetPort: intstr.FromInt32(4318), + AppProtocol: &http, + }, + }, + }, + }, + }, + { + receiverName: "loki", + parserName: "__loki", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "loki-grpc", + Port: 9095, + TargetPort: intstr.FromInt32(9095), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "loki-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(9095), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "loki-grpc", + Port: 9095, + TargetPort: intstr.FromInt32(9095), + AppProtocol: &grpc, + }, + { + Name: "loki-http", + Port: 3100, + TargetPort: intstr.FromInt32(3100), + AppProtocol: &http, + }, + }, + }, + }, + }, + { + receiverName: "skywalking", + parserName: "__skywalking", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "skywalking-grpc", + Port: 11800, + TargetPort: intstr.FromInt32(11800), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "skywalking-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(11800), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "skywalking-grpc", + Port: 11800, + TargetPort: intstr.FromInt32(11800), + AppProtocol: &grpc, + }, + { + Name: "skywalking-http", + Port: 12800, + TargetPort: intstr.FromInt32(12800), + AppProtocol: &http, + }, + }, + }, + }, + }, + } { + t.Run(tt.receiverName, func(t *testing.T) { + t.Run("self registers", func(t *testing.T) { + // verify + assert.True(t, receivers.IsRegistered(tt.receiverName)) + }) + + t.Run("is found by name", func(t *testing.T) { + p := receivers.BuilderFor(tt.receiverName) + assert.Equal(t, tt.parserName, p.ParserName()) + }) + + t.Run("bad config errors", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + _, err := parser.Ports(logger, []interface{}{"junk"}) + + // verify + assert.ErrorContains(t, err, "expected a map, got 'slice'") + }) + t.Run("good config, unknown protocol", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + _, err := parser.Ports(logger, map[string]interface{}{ + "protocols": map[string]interface{}{ + "garbage": map[string]interface{}{}, + }, + }) + + // verify + assert.ErrorContains(t, err, "unknown protocol set: garbage") + }) + for _, kase := range tt.cases { + t.Run(kase.name, func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, kase.config) + if kase.expectedErr != nil { + assert.EqualError(t, err, kase.expectedErr.Error()) + return + } + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(kase.expectedSvc)) + assert.ElementsMatch(t, ports, kase.expectedSvc) + }) + } + + }) + } +} diff --git a/internal/components/receivers/scraper.go b/internal/components/receivers/scraper.go new file mode 100644 index 0000000000..8f01e95c3a --- /dev/null +++ b/internal/components/receivers/scraper.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivers + +import ( + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +var ( + _ components.ComponentPortParser = &ScraperParser{} +) + +type ScraperParser struct { + componentType string +} + +func (s *ScraperParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { + return nil, nil +} + +func (s *ScraperParser) ParserType() string { + return s.componentType +} + +func (s *ScraperParser) ParserName() string { + return fmt.Sprintf("__%s", s.componentType) +} + +func NewScraperParser(name string) *ScraperParser { + return &ScraperParser{ + componentType: components.ComponentType(name), + } +} diff --git a/internal/components/receivers/scraper_test.go b/internal/components/receivers/scraper_test.go new file mode 100644 index 0000000000..3456cbc6ff --- /dev/null +++ b/internal/components/receivers/scraper_test.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivers_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" +) + +func TestScraperParsers(t *testing.T) { + for _, tt := range []struct { + receiverName string + parserName string + defaultPort int + }{ + {"prometheus", "__prometheus", 0}, + {"kubeletstats", "__kubeletstats", 0}, + {"sshcheck", "__sshcheck", 0}, + {"cloudfoundry", "__cloudfoundry", 0}, + {"vcenter", "__vcenter", 0}, + {"oracledb", "__oracledb", 0}, + {"snmp", "__snmp", 0}, + {"googlecloudpubsub", "__googlecloudpubsub", 0}, + {"chrony", "__chrony", 0}, + {"jmx", "__jmx", 0}, + {"podman_stats", "__podman_stats", 0}, + {"pulsar", "__pulsar", 0}, + {"docker_stats", "__docker_stats", 0}, + {"aerospike", "__aerospike", 0}, + {"zookeeper", "__zookeeper", 0}, + {"prometheus_simple", "__prometheus_simple", 0}, + {"saphana", "__saphana", 0}, + {"riak", "__riak", 0}, + {"redis", "__redis", 0}, + {"rabbitmq", "__rabbitmq", 0}, + {"purefb", "__purefb", 0}, + {"postgresql", "__postgresql", 0}, + {"nsxt", "__nsxt", 0}, + {"nginx", "__nginx", 0}, + {"mysql", "__mysql", 0}, + {"memcached", "__memcached", 0}, + {"httpcheck", "__httpcheck", 0}, + {"haproxy", "__haproxy", 0}, + {"flinkmetrics", "__flinkmetrics", 0}, + {"couchdb", "__couchdb", 0}, + } { + t.Run(tt.receiverName, func(t *testing.T) { + t.Run("builds successfully", func(t *testing.T) { + // test + parser := receivers.BuilderFor(tt.receiverName) + + // verify + assert.Equal(t, tt.parserName, parser.ParserName()) + }) + + t.Run("default is nothing", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{}) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) + + t.Run("always returns nothing", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) + }) + } +} diff --git a/internal/components/receivers/single_endpoint_receiver_test.go b/internal/components/receivers/single_endpoint_receiver_test.go new file mode 100644 index 0000000000..f06353ca90 --- /dev/null +++ b/internal/components/receivers/single_endpoint_receiver_test.go @@ -0,0 +1,148 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivers_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +var logger = logf.Log.WithName("unit-tests") + +func TestParseEndpoint(t *testing.T) { + // prepare + // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser + parser := receivers.BuilderFor("myreceiver") + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].Port) +} + +func TestFailedToParseEndpoint(t *testing.T) { + // prepare + // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser + parser := receivers.BuilderFor("myreceiver") + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0", + }) + + // verify + assert.Error(t, err) + assert.Len(t, ports, 0) +} + +func TestDownstreamParsers(t *testing.T) { + for _, tt := range []struct { + desc string + receiverName string + parserName string + defaultPort int + listenAddrParser bool + }{ + {"zipkin", "zipkin", "__zipkin", 9411, false}, + {"opencensus", "opencensus", "__opencensus", 55678, false}, + + // contrib receivers + {"carbon", "carbon", "__carbon", 2003, false}, + {"collectd", "collectd", "__collectd", 8081, false}, + {"sapm", "sapm", "__sapm", 7276, false}, + {"signalfx", "signalfx", "__signalfx", 9943, false}, + {"wavefront", "wavefront", "__wavefront", 2003, false}, + {"fluentforward", "fluentforward", "__fluentforward", 8006, false}, + {"statsd", "statsd", "__statsd", 8125, false}, + {"influxdb", "influxdb", "__influxdb", 8086, false}, + {"splunk_hec", "splunk_hec", "__splunk_hec", 8088, false}, + {"awsxray", "awsxray", "__awsxray", 2000, false}, + {"tcplog", "tcplog", "__tcplog", 0, true}, + {"udplog", "udplog", "__udplog", 0, true}, + } { + t.Run(tt.receiverName, func(t *testing.T) { + t.Run("builds successfully", func(t *testing.T) { + // test + parser := receivers.BuilderFor(tt.receiverName) + + // verify + assert.Equal(t, tt.parserName, parser.ParserName()) + }) + t.Run("bad config errors", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test throwing in pure junk + _, err := parser.Ports(logger, func() {}) + + // verify + assert.ErrorContains(t, err, "expected a map, got 'func'") + }) + + t.Run("assigns the expected port", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{}) + + if tt.defaultPort == 0 { + assert.Len(t, ports, 0) + return + } + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, tt.defaultPort, ports[0].Port) + assert.Equal(t, naming.PortName(tt.receiverName, int32(tt.defaultPort)), ports[0].Name) + }) + + t.Run("allows port to be overridden", func(t *testing.T) { + // prepare + parser := receivers.BuilderFor(tt.receiverName) + + // test + var ports []corev1.ServicePort + var err error + if tt.listenAddrParser { + ports, err = parser.Ports(logger, map[string]interface{}{ + "listen_address": "0.0.0.0:65535", + }) + } else { + ports, err = parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + } + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 65535, ports[0].Port) + assert.Equal(t, naming.PortName(tt.receiverName, int32(tt.defaultPort)), ports[0].Name) + }) + }) + } +} diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go new file mode 100644 index 0000000000..f7de2b7aaa --- /dev/null +++ b/internal/components/single_endpoint.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "fmt" + + "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +var ( + _ ComponentPortParser = &SingleEndpointParser{} +) + +// SingleEndpointConfig represents the minimal struct for a given YAML configuration input containing either +// endpoint or listen_address. +type SingleEndpointConfig struct { + Endpoint string `mapstructure:"endpoint,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty"` +} + +func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) int32 { + num, err := g.GetPortNum() + if err != nil { + logger.V(3).Info("no port set, using default: %d", p) + return p + } + return num +} + +func (g *SingleEndpointConfig) GetPortNum() (int32, error) { + if len(g.Endpoint) > 0 { + return PortFromEndpoint(g.Endpoint) + } else if len(g.ListenAddress) > 0 { + return PortFromEndpoint(g.ListenAddress) + } + return 0, PortNotFoundErr +} + +// SingleEndpointParser is a special parser for a generic receiver that has an endpoint or listen_address in its +// configuration. It doesn't self-register and should be created/used directly. +type SingleEndpointParser struct { + name string + + svcPort *corev1.ServicePort +} + +func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { + singleEndpointConfig := &SingleEndpointConfig{} + if err := mapstructure.Decode(config, singleEndpointConfig); err != nil { + return nil, err + } + if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == UnsetPort { + logger.WithValues("receiver", s.name).Error(err, "couldn't parse the endpoint's port and no default port set") + return []corev1.ServicePort{}, err + } + + port := singleEndpointConfig.GetPortNumOrDefault(logger, s.svcPort.Port) + s.svcPort.Name = naming.PortName(s.name, port) + return []corev1.ServicePort{ConstructServicePort(s.svcPort, port)}, nil +} + +func (s *SingleEndpointParser) ParserType() string { + return ComponentType(s.name) +} + +func (s *SingleEndpointParser) ParserName() string { + return fmt.Sprintf("__%s", s.name) +} + +func NewSinglePortParser(name string, port int32, opts ...PortBuilderOption) *SingleEndpointParser { + servicePort := &corev1.ServicePort{ + Name: naming.PortName(name, port), + Port: port, + } + for _, opt := range opts { + opt(servicePort) + } + return &SingleEndpointParser{name: name, svcPort: servicePort} +} diff --git a/internal/components/single_endpoint_test.go b/internal/components/single_endpoint_test.go new file mode 100644 index 0000000000..b0efdb1c90 --- /dev/null +++ b/internal/components/single_endpoint_test.go @@ -0,0 +1,294 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +func TestSingleEndpointConfig_GetPortNumOrDefault(t *testing.T) { + type fields struct { + Endpoint string + ListenAddress string + } + type args struct { + p int32 + } + tests := []struct { + name string + fields fields + args args + want int32 + }{ + { + name: "Test with valid endpoint", + fields: fields{ + Endpoint: "example.com:8080", + ListenAddress: "", + }, + args: args{ + p: 9000, + }, + want: 8080, + }, + { + name: "Test with valid listen address", + fields: fields{ + Endpoint: "", + ListenAddress: "0.0.0.0:9090", + }, + args: args{ + p: 9000, + }, + want: 9090, + }, + { + name: "Test with invalid configuration (no endpoint or listen address)", + fields: fields{ + Endpoint: "", + ListenAddress: "", + }, + args: args{ + p: 9000, + }, + want: 9000, // Should return default port + }, + { + name: "Test with invalid endpoint format", + fields: fields{ + Endpoint: "invalid_endpoint", + ListenAddress: "", + }, + args: args{ + p: 9000, + }, + want: 9000, // Should return default port + }, + { + name: "Test with invalid listen address format", + fields: fields{ + Endpoint: "", + ListenAddress: "invalid_listen_address", + }, + args: args{ + p: 9000, + }, + want: 9000, // Should return default port + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := &components.SingleEndpointConfig{ + Endpoint: tt.fields.Endpoint, + ListenAddress: tt.fields.ListenAddress, + } + assert.Equalf(t, tt.want, g.GetPortNumOrDefault(logr.Discard(), tt.args.p), "GetPortNumOrDefault(%v)", tt.args.p) + }) + } +} + +func TestSingleEndpointParser_ParserName(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "__receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2", + opts: []components.PortBuilderOption{ + components.WithTargetPort(8080), + }, + }, + want: "__receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + assert.Equalf(t, tt.want, s.ParserName(), "ParserName()") + }) + } +} + +func TestSingleEndpointParser_ParserType(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2/test", + opts: []components.PortBuilderOption{ + components.WithTargetPort(80), + }, + }, + want: "receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + assert.Equalf(t, tt.want, s.ParserType(), "ParserType()") + }) + } +} + +func TestSingleEndpointParser_Ports(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + type args struct { + config interface{} + } + tests := []struct { + name string + fields fields + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + }{ + { + name: "ValidConfigWithPort", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: map[string]interface{}{ + "port": 8080, + }, + }, + want: []corev1.ServicePort{ + {Name: "testparser", Port: 8080}, + }, + wantErr: assert.NoError, + }, + { + name: "ValidConfigWithDefaultPort", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: []corev1.ServicePort{ + {Name: "testparser", Port: 8080}, + }, + wantErr: assert.NoError, + }, + { + name: "ConfigWithFixins", + fields: fields{ + name: "testparser", + port: 8080, + opts: []components.PortBuilderOption{ + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol), + }, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: []corev1.ServicePort{ + { + Name: "testparser", + Port: 8080, + TargetPort: intstr.FromInt32(4317), + Protocol: corev1.ProtocolTCP, + AppProtocol: &components.GrpcProtocol, + }, + }, + wantErr: assert.NoError, + }, + { + name: "InvalidConfigMissingPort", + fields: fields{ + name: "testparser", + port: 0, + }, + args: args{ + config: map[string]interface{}{ + "endpoint": "garbageeeee", + }, + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "ErrorParsingConfig", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: "invalid config", + }, + want: nil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + got, err := s.Ports(logr.Discard(), tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { + return + } + assert.ElementsMatchf(t, tt.want, got, "Ports(%v)", tt.args.config) + }) + } +}