From 4c07a3b5dc8084dbec12a67ee00733d608b68c5d Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 20 May 2024 15:08:31 -0400 Subject: [PATCH 1/7] Simplified parsers for 95% of use cases --- internal/components/component.go | 164 +++++++++ internal/components/component_test.go | 66 ++++ internal/components/multi_endpoint.go | 148 ++++++++ internal/components/multi_endpoint_test.go | 353 ++++++++++++++++++++ internal/components/scraper.go | 80 +++++ internal/components/single_endpoint.go | 111 ++++++ internal/components/single_endpoint_test.go | 122 +++++++ 7 files changed, 1044 insertions(+) create mode 100644 internal/components/component.go create mode 100644 internal/components/component_test.go create mode 100644 internal/components/multi_endpoint.go create mode 100644 internal/components/multi_endpoint_test.go create mode 100644 internal/components/scraper.go create mode 100644 internal/components/single_endpoint.go create mode 100644 internal/components/single_endpoint_test.go diff --git a/internal/components/component.go b/internal/components/component.go new file mode 100644 index 0000000000..861e4d1d3f --- /dev/null +++ b/internal/components/component.go @@ -0,0 +1,164 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "encoding/json" + "errors" + "regexp" + "strconv" + "strings" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const ( + unsetPort = 0 +) + +var ( + portNotFoundErr = errors.New("port should not be empty") + grpc = "grpc" + http = "http" +) + +type PortRetriever interface { + GetPortNum() (int32, error) + GetPortNumOrDefault(logr.Logger, int32) int32 +} + +type PortBuilderOption func(portBuilder *corev1.ServicePort) + +func WithTargetPort(targetPort int32) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.TargetPort = intstr.FromInt32(targetPort) + } +} +func WithNodePort(nodePort int32) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.NodePort = nodePort + } +} + +func WithAppProtocol(proto *string) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.AppProtocol = proto + } +} + +func WithProtocol(proto corev1.Protocol) PortBuilderOption { + return func(servicePort *corev1.ServicePort) { + servicePort.Protocol = proto + } +} + +func ComponentType(name string) string { + // components have a name like: + // - mycomponent/custom + // - mycomponent + // we extract the "mycomponent" part and see if we have a parser for the component + if strings.Contains(name, "/") { + return name[:strings.Index(name, "/")] + } + + return name +} + +func PortFromEndpoint(endpoint string) (int32, error) { + var err error + var port int64 + + r := regexp.MustCompile(":[0-9]+") + + if r.MatchString(endpoint) { + port, err = strconv.ParseInt(strings.Replace(r.FindString(endpoint), ":", "", -1), 10, 32) + + if err != nil { + return 0, err + } + } + + if port == 0 { + return 0, portNotFoundErr + } + + return int32(port), err +} + +type ComponentPortParser interface { + // Ports returns the service ports parsed based on the exporter's configuration + Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) + + // ParserType returns the name of this parser + ParserType() string + + // ParserName is an internal name for the parser + ParserName() string +} + +// registry holds a record of all known receiver parsers. +var registry = make(map[string]ComponentPortParser) + +// Register adds a new parser builder to the list of known builders. +func Register(name string, p ComponentPortParser) { + registry[name] = p +} + +// IsRegistered checks whether a parser is registered with the given name. +func IsRegistered(name string) bool { + _, ok := registry[name] + return ok +} + +// BuilderFor returns a parser builder for the given exporter name. +func BuilderFor(name string) ComponentPortParser { + if parser, ok := registry[ComponentType(name)]; ok { + return parser + } + return NewSinglePortParser(ComponentType(name), unsetPort) +} + +func LoadMap[T any](m interface{}, in T) error { + // Convert map to JSON bytes + yamlData, err := json.Marshal(m) + if err != nil { + return err + } + // Unmarshal YAML into the provided struct + if err := json.Unmarshal(yamlData, in); err != nil { + return err + } + return nil +} + +func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.ServicePort { + return corev1.ServicePort{ + Name: current.Name, + Port: port, + TargetPort: current.TargetPort, + NodePort: current.NodePort, + AppProtocol: current.AppProtocol, + Protocol: current.Protocol, + } +} + +func init() { + parsers := append(scraperReceivers, append(singleEndpointConfigs, multiPortReceivers...)...) + for _, parser := range parsers { + Register(parser.ParserType(), parser) + } +} diff --git a/internal/components/component_test.go b/internal/components/component_test.go new file mode 100644 index 0000000000..3ff4356b71 --- /dev/null +++ b/internal/components/component_test.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +func TestComponentType(t *testing.T) { + for _, tt := range []struct { + desc string + name string + expected string + }{ + {"regular case", "myreceiver", "myreceiver"}, + {"named instance", "myreceiver/custom", "myreceiver"}, + } { + t.Run(tt.desc, func(t *testing.T) { + // test and verify + assert.Equal(t, tt.expected, components.ComponentType(tt.name)) + }) + } +} + +func TestReceiverParsePortFromEndpoint(t *testing.T) { + for _, tt := range []struct { + desc string + endpoint string + expected int + errorExpected bool + }{ + {"regular case", "http://localhost:1234", 1234, false}, + {"absolute with path", "http://localhost:1234/server-status?auto", 1234, false}, + {"no protocol", "0.0.0.0:1234", 1234, false}, + {"just port", ":1234", 1234, false}, + {"no port at all", "http://localhost", 0, true}, + } { + t.Run(tt.desc, func(t *testing.T) { + // test + val, err := components.PortFromEndpoint(tt.endpoint) + if tt.errorExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + assert.EqualValues(t, tt.expected, val, "wrong port from endpoint %s: %d", tt.endpoint, val) + }) + } +} diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go new file mode 100644 index 0000000000..588f79046f --- /dev/null +++ b/internal/components/multi_endpoint.go @@ -0,0 +1,148 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "errors" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +var ( + _ ComponentPortParser = &MultiPortReceiver{} + multiPortReceivers = []ComponentPortParser{ + NewMultiPortReceiver("otlp", + WithPortMapping( + "grpc", + 4317, + WithAppProtocol(&grpc), + WithTargetPort(4317), + ), WithPortMapping( + "http", + 4318, + WithAppProtocol(&http), + WithTargetPort(4318), + ), + ), + NewMultiPortReceiver("skywalking", + WithPortMapping(grpc, 11800, + WithTargetPort(11800), + WithAppProtocol(&grpc), + ), + WithPortMapping(http, 12800, + WithTargetPort(12800), + WithAppProtocol(&http), + )), + NewMultiPortReceiver("jaeger", + WithPortMapping(grpc, 14250, + WithProtocol(corev1.ProtocolTCP), + WithAppProtocol(&grpc), + ), + WithPortMapping("thrift_http", 14268, + WithProtocol(corev1.ProtocolTCP), + WithAppProtocol(&http), + ), + WithPortMapping("thrift_compact", 6831, + WithProtocol(corev1.ProtocolUDP), + ), + WithPortMapping("thrift_binary", 6832, + WithProtocol(corev1.ProtocolUDP), + ), + ), + NewMultiPortReceiver("loki", + WithPortMapping(grpc, 9095, + WithTargetPort(9095), + WithAppProtocol(&grpc), + ), + WithPortMapping(http, 3100, + WithTargetPort(3100), + WithAppProtocol(&http), + ), + ), + } +) + +// MultiProtocolEndpointConfig represents the minimal struct for a given YAML configuration input containing a map to +// a struct with either endpoint or listen_address. +type MultiProtocolEndpointConfig struct { + Protocols map[string]*SingleEndpointConfig `json:"protocols"` +} + +// MultiPortOption allows the setting of options for +type MultiPortOption func(parser *MultiPortReceiver) + +// MultiPortReceiver is a special parser for components with endpoints for each protocol. +type MultiPortReceiver struct { + name string + + portMappings map[string]*corev1.ServicePort +} + +func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { + multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} + if err := LoadMap[*MultiProtocolEndpointConfig](config, multiProtoEndpointCfg); err != nil { + return nil, err + } + var ports []corev1.ServicePort + for protocol, ec := range multiProtoEndpointCfg.Protocols { + if defaultSvc, ok := m.portMappings[protocol]; ok { + port := defaultSvc.Port + if ec != nil { + port = ec.GetPortNumOrDefault(logger, port) + defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", m.name, protocol), port) + } + ports = append(ports, ConstructServicePort(defaultSvc, port)) + } else { + return nil, errors.New(fmt.Sprintf("unknown protocol set: %s", protocol)) + } + } + return ports, nil +} + +func (m *MultiPortReceiver) ParserType() string { + return ComponentType(m.name) +} + +func (m *MultiPortReceiver) ParserName() string { + return fmt.Sprintf("__%s", m.name) +} + +func NewMultiPortReceiver(name string, opts ...MultiPortOption) *MultiPortReceiver { + multiReceiver := &MultiPortReceiver{ + name: name, + portMappings: map[string]*corev1.ServicePort{}, + } + for _, opt := range opts { + opt(multiReceiver) + } + return multiReceiver +} + +func WithPortMapping(name string, port int32, opts ...PortBuilderOption) MultiPortOption { + return func(parser *MultiPortReceiver) { + servicePort := &corev1.ServicePort{ + Name: naming.PortName(fmt.Sprintf("%s-%s", parser.name, name), port), + Port: port, + } + for _, opt := range opts { + opt(servicePort) + } + parser.portMappings[name] = servicePort + } +} diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go new file mode 100644 index 0000000000..f01ccaa473 --- /dev/null +++ b/internal/components/multi_endpoint_test.go @@ -0,0 +1,353 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +var ( + grpc = "grpc" + http = "http" +) + +func TestMultiEndpointParsers(t *testing.T) { + type testCase struct { + name string + config interface{} + expectedErr error + expectedSvc []corev1.ServicePort + } + type fields struct { + receiverName string + parserName string + cases []testCase + } + for _, tt := range []fields{ + { + receiverName: "jaeger", + parserName: "__jaeger", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "jaeger-grpc", + Port: 14250, + Protocol: corev1.ProtocolTCP, + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "jaeger-grpc", + Port: 1234, + Protocol: corev1.ProtocolTCP, + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "thrift_http": map[string]interface{}{}, + "thrift_compact": map[string]interface{}{}, + "thrift_binary": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "jaeger-grpc", + Port: 14250, + Protocol: corev1.ProtocolTCP, + AppProtocol: &grpc, + }, + { + Name: "port-14268", + Port: 14268, + Protocol: corev1.ProtocolTCP, + AppProtocol: &http, + }, + { + Name: "port-6831", + Port: 6831, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "port-6832", + Port: 6832, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + }, + }, + { + receiverName: "otlp", + parserName: "__otlp", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + { + Name: "otlp-http", + Port: 4318, + TargetPort: intstr.FromInt32(4318), + AppProtocol: &http, + }, + }, + }, + }, + }, + { + receiverName: "loki", + parserName: "__loki", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "loki-grpc", + Port: 9095, + TargetPort: intstr.FromInt32(9095), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "loki-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(9095), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "loki-grpc", + Port: 9095, + TargetPort: intstr.FromInt32(9095), + AppProtocol: &grpc, + }, + { + Name: "loki-http", + Port: 3100, + TargetPort: intstr.FromInt32(3100), + AppProtocol: &http, + }, + }, + }, + }, + }, + { + receiverName: "skywalking", + parserName: "__skywalking", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "skywalking-grpc", + Port: 11800, + TargetPort: intstr.FromInt32(11800), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "skywalking-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(11800), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "skywalking-grpc", + Port: 11800, + TargetPort: intstr.FromInt32(11800), + AppProtocol: &grpc, + }, + { + Name: "skywalking-http", + Port: 12800, + TargetPort: intstr.FromInt32(12800), + AppProtocol: &http, + }, + }, + }, + }, + }, + } { + t.Run(tt.receiverName, func(t *testing.T) { + t.Run("self registers", func(t *testing.T) { + // verify + assert.True(t, components.IsRegistered(tt.receiverName)) + }) + + t.Run("is found by name", func(t *testing.T) { + p := components.BuilderFor(tt.receiverName) + assert.Equal(t, tt.parserName, p.ParserName()) + }) + for _, kase := range tt.cases { + t.Run(kase.name, func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, kase.config) + if kase.expectedErr != nil { + assert.EqualError(t, err, kase.expectedErr.Error()) + return + } + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(kase.expectedSvc)) + assert.ElementsMatch(t, ports, kase.expectedSvc) + }) + } + + }) + } +} diff --git a/internal/components/scraper.go b/internal/components/scraper.go new file mode 100644 index 0000000000..4e3bb808ae --- /dev/null +++ b/internal/components/scraper.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" +) + +var ( + _ ComponentPortParser = &ScraperParser{} + scraperReceivers = []ComponentPortParser{ + NewScraperParser("prometheus"), + NewScraperParser("kubeletstats"), + NewScraperParser("sshcheck"), + NewScraperParser("cloudfoundry"), + NewScraperParser("vcenter"), + NewScraperParser("oracledb"), + NewScraperParser("snmp"), + NewScraperParser("googlecloudpubsub"), + NewScraperParser("chrony"), + NewScraperParser("jmx"), + NewScraperParser("podman_stats"), + NewScraperParser("pulsar"), + NewScraperParser("docker_stats"), + NewScraperParser("aerospike"), + NewScraperParser("zookeeper"), + NewScraperParser("prometheus_simple"), + NewScraperParser("saphana"), + NewScraperParser("riak"), + NewScraperParser("redis"), + NewScraperParser("rabbitmq"), + NewScraperParser("purefb"), + NewScraperParser("postgresql"), + NewScraperParser("nsxt"), + NewScraperParser("nginx"), + NewScraperParser("mysql"), + NewScraperParser("memcached"), + NewScraperParser("httpcheck"), + NewScraperParser("haproxy"), + NewScraperParser("flinkmetrics"), + NewScraperParser("couchdb"), + } +) + +type ScraperParser struct { + componentType string +} + +func (s *ScraperParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { + return nil, nil +} + +func (s *ScraperParser) ParserType() string { + return s.componentType +} + +func (s *ScraperParser) ParserName() string { + return fmt.Sprintf("__%s", s.componentType) +} + +func NewScraperParser(name string) *ScraperParser { + return &ScraperParser{ + componentType: ComponentType(name), + } +} diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go new file mode 100644 index 0000000000..98a8652117 --- /dev/null +++ b/internal/components/single_endpoint.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +var ( + _ ComponentPortParser = &SingleEndpointParser{} + singleEndpointConfigs = []ComponentPortParser{ + NewSinglePortParser("awsxray", 2000), + NewSinglePortParser("carbon", 2003), + NewSinglePortParser("collectd", 8081), + NewSinglePortParser("fluentforward", 8006), + NewSinglePortParser("influxdb", 8086), + NewSinglePortParser("opencensus", 55678, WithAppProtocol(nil)), + NewSinglePortParser("sapm", 7276), + NewSinglePortParser("signalfx", 9943), + NewSinglePortParser("splunk_hec", 8088), + NewSinglePortParser("statsd", 8125, WithProtocol(corev1.ProtocolUDP)), + NewSinglePortParser("tcplog", unsetPort, WithProtocol(corev1.ProtocolTCP)), + NewSinglePortParser("udplog", unsetPort, WithProtocol(corev1.ProtocolUDP)), + NewSinglePortParser("wavefront", 2003), + NewSinglePortParser("zipkin", 9411, WithAppProtocol(&http), WithProtocol(corev1.ProtocolTCP)), + } +) + +// SingleEndpointConfig represents the minimal struct for a given YAML configuration input containing either +// endpoint or listen_address. +type SingleEndpointConfig struct { + Endpoint string `json:"endpoint,omitempty"` + ListenAddress string `json:"listen_address,omitempty"` +} + +func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) int32 { + num, err := g.GetPortNum() + if err != nil { + logger.V(3).Info("no port set, using default: %d", p) + return p + } + return num +} + +func (g *SingleEndpointConfig) GetPortNum() (int32, error) { + if len(g.Endpoint) > 0 { + return PortFromEndpoint(g.Endpoint) + } else if len(g.ListenAddress) > 0 { + return PortFromEndpoint(g.ListenAddress) + } + return 0, portNotFoundErr +} + +// SingleEndpointParser is a special parser for a generic receiver that has an endpoint or listen_address in its +// configuration. It doesn't self-register and should be created/used directly. +type SingleEndpointParser struct { + name string + + svcPort *corev1.ServicePort +} + +func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { + singleEndpointConfig := &SingleEndpointConfig{} + if err := LoadMap[*SingleEndpointConfig](config, singleEndpointConfig); err != nil { + return nil, err + } + if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == unsetPort { + logger.WithValues("receiver", s.name).Error(err, "couldn't parse the endpoint's port and no default port set") + return []corev1.ServicePort{}, err + } + + port := singleEndpointConfig.GetPortNumOrDefault(logger, s.svcPort.Port) + s.svcPort.Name = naming.PortName(s.name, port) + return []corev1.ServicePort{ConstructServicePort(s.svcPort, port)}, nil +} + +func (s *SingleEndpointParser) ParserType() string { + return ComponentType(s.name) +} + +func (s *SingleEndpointParser) ParserName() string { + return fmt.Sprintf("__%s", s.name) +} + +func NewSinglePortParser(name string, port int32, opts ...PortBuilderOption) *SingleEndpointParser { + servicePort := &corev1.ServicePort{ + Name: naming.PortName(name, port), + Port: port, + } + for _, opt := range opts { + opt(servicePort) + } + return &SingleEndpointParser{name: name, svcPort: servicePort} +} diff --git a/internal/components/single_endpoint_test.go b/internal/components/single_endpoint_test.go new file mode 100644 index 0000000000..231f6f3481 --- /dev/null +++ b/internal/components/single_endpoint_test.go @@ -0,0 +1,122 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +var logger = logf.Log.WithName("unit-tests") + +func TestParseEndpoint(t *testing.T) { + // prepare + // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser + parser := components.BuilderFor("myreceiver") + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].Port) +} + +func TestFailedToParseEndpoint(t *testing.T) { + // prepare + // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser + parser := components.BuilderFor("myreceiver") + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0", + }) + + // verify + assert.Error(t, err) + assert.Len(t, ports, 0) +} + +func TestDownstreamParsers(t *testing.T) { + for _, tt := range []struct { + desc string + receiverName string + parserName string + defaultPort int + }{ + {"zipkin", "zipkin", "__zipkin", 9411}, + {"opencensus", "opencensus", "__opencensus", 55678}, + + // contrib receivers + {"carbon", "carbon", "__carbon", 2003}, + {"collectd", "collectd", "__collectd", 8081}, + {"sapm", "sapm", "__sapm", 7276}, + {"signalfx", "signalfx", "__signalfx", 9943}, + {"wavefront", "wavefront", "__wavefront", 2003}, + {"fluentforward", "fluentforward", "__fluentforward", 8006}, + {"statsd", "statsd", "__statsd", 8125}, + {"influxdb", "influxdb", "__influxdb", 8086}, + {"splunk_hec", "splunk_hec", "__splunk_hec", 8088}, + {"awsxray", "awsxray", "__awsxray", 2000}, + } { + t.Run(tt.receiverName, func(t *testing.T) { + t.Run("builds successfully", func(t *testing.T) { + // test + parser := components.BuilderFor(tt.receiverName) + + // verify + assert.Equal(t, tt.parserName, parser.ParserName()) + }) + + t.Run("assigns the expected port", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{}) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, tt.defaultPort, ports[0].Port) + assert.Equal(t, naming.PortName(tt.receiverName, int32(tt.defaultPort)), ports[0].Name) + }) + + t.Run("allows port to be overridden", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 65535, ports[0].Port) + assert.Equal(t, naming.PortName(tt.receiverName, int32(tt.defaultPort)), ports[0].Name) + }) + }) + } +} From 36aeffdab8ef20e35c50928472e6ad1e5d1bd69b Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 20 May 2024 15:32:21 -0400 Subject: [PATCH 2/7] tons o tests --- internal/components/component.go | 5 -- internal/components/component_test.go | 1 + internal/components/multi_endpoint.go | 5 +- internal/components/multi_endpoint_test.go | 25 ++++++ internal/components/scraper_test.go | 98 +++++++++++++++++++++ internal/components/single_endpoint_test.go | 64 ++++++++++---- 6 files changed, 171 insertions(+), 27 deletions(-) create mode 100644 internal/components/scraper_test.go diff --git a/internal/components/component.go b/internal/components/component.go index 861e4d1d3f..41294f1572 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -48,11 +48,6 @@ func WithTargetPort(targetPort int32) PortBuilderOption { servicePort.TargetPort = intstr.FromInt32(targetPort) } } -func WithNodePort(nodePort int32) PortBuilderOption { - return func(servicePort *corev1.ServicePort) { - servicePort.NodePort = nodePort - } -} func WithAppProtocol(proto *string) PortBuilderOption { return func(servicePort *corev1.ServicePort) { diff --git a/internal/components/component_test.go b/internal/components/component_test.go index 3ff4356b71..4671e98087 100644 --- a/internal/components/component_test.go +++ b/internal/components/component_test.go @@ -50,6 +50,7 @@ func TestReceiverParsePortFromEndpoint(t *testing.T) { {"no protocol", "0.0.0.0:1234", 1234, false}, {"just port", ":1234", 1234, false}, {"no port at all", "http://localhost", 0, true}, + {"overflow", "0.0.0.0:2147483648", 0, true}, } { t.Run(tt.desc, func(t *testing.T) { // test diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index 588f79046f..0cf6c05624 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -15,7 +15,6 @@ package components import ( - "errors" "fmt" "github.com/go-logr/logr" @@ -84,7 +83,7 @@ type MultiProtocolEndpointConfig struct { Protocols map[string]*SingleEndpointConfig `json:"protocols"` } -// MultiPortOption allows the setting of options for +// MultiPortOption allows the setting of options for a MultiPortReceiver. type MultiPortOption func(parser *MultiPortReceiver) // MultiPortReceiver is a special parser for components with endpoints for each protocol. @@ -109,7 +108,7 @@ func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]cor } ports = append(ports, ConstructServicePort(defaultSvc, port)) } else { - return nil, errors.New(fmt.Sprintf("unknown protocol set: %s", protocol)) + return nil, fmt.Errorf("unknown protocol set: %s", protocol) } } return ports, nil diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go index f01ccaa473..550f130c64 100644 --- a/internal/components/multi_endpoint_test.go +++ b/internal/components/multi_endpoint_test.go @@ -329,6 +329,31 @@ func TestMultiEndpointParsers(t *testing.T) { p := components.BuilderFor(tt.receiverName) assert.Equal(t, tt.parserName, p.ParserName()) }) + + t.Run("bad config errors", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + _, err := parser.Ports(logger, []interface{}{"junk"}) + + // verify + assert.ErrorContains(t, err, "cannot unmarshal array") + }) + t.Run("good config, unknown protocol", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + _, err := parser.Ports(logger, map[string]interface{}{ + "protocols": map[string]interface{}{ + "garbage": map[string]interface{}{}, + }, + }) + + // verify + assert.ErrorContains(t, err, "unknown protocol set: garbage") + }) for _, kase := range tt.cases { t.Run(kase.name, func(t *testing.T) { // prepare diff --git a/internal/components/scraper_test.go b/internal/components/scraper_test.go new file mode 100644 index 0000000000..eb77cee7dd --- /dev/null +++ b/internal/components/scraper_test.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +func TestScraperParsers(t *testing.T) { + for _, tt := range []struct { + receiverName string + parserName string + defaultPort int + }{ + {"prometheus", "__prometheus", 0}, + {"kubeletstats", "__kubeletstats", 0}, + {"sshcheck", "__sshcheck", 0}, + {"cloudfoundry", "__cloudfoundry", 0}, + {"vcenter", "__vcenter", 0}, + {"oracledb", "__oracledb", 0}, + {"snmp", "__snmp", 0}, + {"googlecloudpubsub", "__googlecloudpubsub", 0}, + {"chrony", "__chrony", 0}, + {"jmx", "__jmx", 0}, + {"podman_stats", "__podman_stats", 0}, + {"pulsar", "__pulsar", 0}, + {"docker_stats", "__docker_stats", 0}, + {"aerospike", "__aerospike", 0}, + {"zookeeper", "__zookeeper", 0}, + {"prometheus_simple", "__prometheus_simple", 0}, + {"saphana", "__saphana", 0}, + {"riak", "__riak", 0}, + {"redis", "__redis", 0}, + {"rabbitmq", "__rabbitmq", 0}, + {"purefb", "__purefb", 0}, + {"postgresql", "__postgresql", 0}, + {"nsxt", "__nsxt", 0}, + {"nginx", "__nginx", 0}, + {"mysql", "__mysql", 0}, + {"memcached", "__memcached", 0}, + {"httpcheck", "__httpcheck", 0}, + {"haproxy", "__haproxy", 0}, + {"flinkmetrics", "__flinkmetrics", 0}, + {"couchdb", "__couchdb", 0}, + } { + t.Run(tt.receiverName, func(t *testing.T) { + t.Run("builds successfully", func(t *testing.T) { + // test + parser := components.BuilderFor(tt.receiverName) + + // verify + assert.Equal(t, tt.parserName, parser.ParserName()) + }) + + t.Run("default is nothing", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{}) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) + + t.Run("always returns nothing", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test + ports, err := parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) + }) + } +} diff --git a/internal/components/single_endpoint_test.go b/internal/components/single_endpoint_test.go index 231f6f3481..5c325f26e8 100644 --- a/internal/components/single_endpoint_test.go +++ b/internal/components/single_endpoint_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" logf "sigs.k8s.io/controller-runtime/pkg/log" "github.com/open-telemetry/opentelemetry-operator/internal/components" @@ -59,25 +60,28 @@ func TestFailedToParseEndpoint(t *testing.T) { func TestDownstreamParsers(t *testing.T) { for _, tt := range []struct { - desc string - receiverName string - parserName string - defaultPort int + desc string + receiverName string + parserName string + defaultPort int + listenAddrParser bool }{ - {"zipkin", "zipkin", "__zipkin", 9411}, - {"opencensus", "opencensus", "__opencensus", 55678}, + {"zipkin", "zipkin", "__zipkin", 9411, false}, + {"opencensus", "opencensus", "__opencensus", 55678, false}, // contrib receivers - {"carbon", "carbon", "__carbon", 2003}, - {"collectd", "collectd", "__collectd", 8081}, - {"sapm", "sapm", "__sapm", 7276}, - {"signalfx", "signalfx", "__signalfx", 9943}, - {"wavefront", "wavefront", "__wavefront", 2003}, - {"fluentforward", "fluentforward", "__fluentforward", 8006}, - {"statsd", "statsd", "__statsd", 8125}, - {"influxdb", "influxdb", "__influxdb", 8086}, - {"splunk_hec", "splunk_hec", "__splunk_hec", 8088}, - {"awsxray", "awsxray", "__awsxray", 2000}, + {"carbon", "carbon", "__carbon", 2003, false}, + {"collectd", "collectd", "__collectd", 8081, false}, + {"sapm", "sapm", "__sapm", 7276, false}, + {"signalfx", "signalfx", "__signalfx", 9943, false}, + {"wavefront", "wavefront", "__wavefront", 2003, false}, + {"fluentforward", "fluentforward", "__fluentforward", 8006, false}, + {"statsd", "statsd", "__statsd", 8125, false}, + {"influxdb", "influxdb", "__influxdb", 8086, false}, + {"splunk_hec", "splunk_hec", "__splunk_hec", 8088, false}, + {"awsxray", "awsxray", "__awsxray", 2000, false}, + {"tcplog", "tcplog", "__tcplog", 0, true}, + {"udplog", "udplog", "__udplog", 0, true}, } { t.Run(tt.receiverName, func(t *testing.T) { t.Run("builds successfully", func(t *testing.T) { @@ -87,6 +91,16 @@ func TestDownstreamParsers(t *testing.T) { // verify assert.Equal(t, tt.parserName, parser.ParserName()) }) + t.Run("bad config errors", func(t *testing.T) { + // prepare + parser := components.BuilderFor(tt.receiverName) + + // test throwing in pure junk + _, err := parser.Ports(logger, func() {}) + + // verify + assert.ErrorContains(t, err, "unsupported type") + }) t.Run("assigns the expected port", func(t *testing.T) { // prepare @@ -95,6 +109,10 @@ func TestDownstreamParsers(t *testing.T) { // test ports, err := parser.Ports(logger, map[string]interface{}{}) + if tt.defaultPort == 0 { + assert.Len(t, ports, 0) + return + } // verify assert.NoError(t, err) assert.Len(t, ports, 1) @@ -107,9 +125,17 @@ func TestDownstreamParsers(t *testing.T) { parser := components.BuilderFor(tt.receiverName) // test - ports, err := parser.Ports(logger, map[string]interface{}{ - "endpoint": "0.0.0.0:65535", - }) + var ports []corev1.ServicePort + var err error + if tt.listenAddrParser { + ports, err = parser.Ports(logger, map[string]interface{}{ + "listen_address": "0.0.0.0:65535", + }) + } else { + ports, err = parser.Ports(logger, map[string]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + } // verify assert.NoError(t, err) From 0041f08482fd9773b0921e4afbbaee365cde08f9 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 20 May 2024 16:29:56 -0400 Subject: [PATCH 3/7] known receivers --- internal/components/component.go | 35 +------------ internal/components/receivers/helpers.go | 46 +++++++++++++++++ .../{ => receivers}/multi_endpoint.go | 51 ++++++++++--------- .../{ => receivers}/multi_endpoint_test.go | 14 ++--- .../components/{ => receivers}/scraper.go | 10 ++-- .../{ => receivers}/scraper_test.go | 10 ++-- .../{ => receivers}/single_endpoint.go | 34 +++++++------ .../{ => receivers}/single_endpoint_test.go | 16 +++--- 8 files changed, 119 insertions(+), 97 deletions(-) create mode 100644 internal/components/receivers/helpers.go rename internal/components/{ => receivers}/multi_endpoint.go (71%) rename internal/components/{ => receivers}/multi_endpoint_test.go (96%) rename internal/components/{ => receivers}/scraper.go (87%) rename internal/components/{ => receivers}/scraper_test.go (92%) rename internal/components/{ => receivers}/single_endpoint.go (68%) rename internal/components/{ => receivers}/single_endpoint_test.go (92%) diff --git a/internal/components/component.go b/internal/components/component.go index 41294f1572..bb77a85f47 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -31,9 +31,7 @@ const ( ) var ( - portNotFoundErr = errors.New("port should not be empty") - grpc = "grpc" - http = "http" + PortNotFoundErr = errors.New("port should not be empty") ) type PortRetriever interface { @@ -88,7 +86,7 @@ func PortFromEndpoint(endpoint string) (int32, error) { } if port == 0 { - return 0, portNotFoundErr + return 0, PortNotFoundErr } return int32(port), err @@ -105,28 +103,6 @@ type ComponentPortParser interface { ParserName() string } -// registry holds a record of all known receiver parsers. -var registry = make(map[string]ComponentPortParser) - -// Register adds a new parser builder to the list of known builders. -func Register(name string, p ComponentPortParser) { - registry[name] = p -} - -// IsRegistered checks whether a parser is registered with the given name. -func IsRegistered(name string) bool { - _, ok := registry[name] - return ok -} - -// BuilderFor returns a parser builder for the given exporter name. -func BuilderFor(name string) ComponentPortParser { - if parser, ok := registry[ComponentType(name)]; ok { - return parser - } - return NewSinglePortParser(ComponentType(name), unsetPort) -} - func LoadMap[T any](m interface{}, in T) error { // Convert map to JSON bytes yamlData, err := json.Marshal(m) @@ -150,10 +126,3 @@ func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.Servic Protocol: current.Protocol, } } - -func init() { - parsers := append(scraperReceivers, append(singleEndpointConfigs, multiPortReceivers...)...) - for _, parser := range parsers { - Register(parser.ParserType(), parser) - } -} diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go new file mode 100644 index 0000000000..2c065ff118 --- /dev/null +++ b/internal/components/receivers/helpers.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivers + +import "github.com/open-telemetry/opentelemetry-operator/internal/components" + +// registry holds a record of all known receiver parsers. +var registry = make(map[string]components.ComponentPortParser) + +// Register adds a new parser builder to the list of known builders. +func Register(name string, p components.ComponentPortParser) { + registry[name] = p +} + +// IsRegistered checks whether a parser is registered with the given name. +func IsRegistered(name string) bool { + _, ok := registry[name] + return ok +} + +// BuilderFor returns a parser builder for the given exporter name. +func BuilderFor(name string) components.ComponentPortParser { + if parser, ok := registry[components.ComponentType(name)]; ok { + return parser + } + return NewSinglePortParser(components.ComponentType(name), unsetPort) +} + +func init() { + parsers := append(scraperReceivers, append(singleEndpointConfigs, multiPortReceivers...)...) + for _, parser := range parsers { + Register(parser.ParserType(), parser) + } +} diff --git a/internal/components/multi_endpoint.go b/internal/components/receivers/multi_endpoint.go similarity index 71% rename from internal/components/multi_endpoint.go rename to internal/components/receivers/multi_endpoint.go index 0cf6c05624..21355be4cd 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/receivers/multi_endpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package components +package receivers import ( "fmt" @@ -20,58 +20,59 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "github.com/open-telemetry/opentelemetry-operator/internal/components" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) var ( - _ ComponentPortParser = &MultiPortReceiver{} - multiPortReceivers = []ComponentPortParser{ + _ components.ComponentPortParser = &MultiPortReceiver{} + multiPortReceivers = []components.ComponentPortParser{ NewMultiPortReceiver("otlp", WithPortMapping( "grpc", 4317, - WithAppProtocol(&grpc), - WithTargetPort(4317), + components.WithAppProtocol(&grpc), + components.WithTargetPort(4317), ), WithPortMapping( "http", 4318, - WithAppProtocol(&http), - WithTargetPort(4318), + components.WithAppProtocol(&http), + components.WithTargetPort(4318), ), ), NewMultiPortReceiver("skywalking", WithPortMapping(grpc, 11800, - WithTargetPort(11800), - WithAppProtocol(&grpc), + components.WithTargetPort(11800), + components.WithAppProtocol(&grpc), ), WithPortMapping(http, 12800, - WithTargetPort(12800), - WithAppProtocol(&http), + components.WithTargetPort(12800), + components.WithAppProtocol(&http), )), NewMultiPortReceiver("jaeger", WithPortMapping(grpc, 14250, - WithProtocol(corev1.ProtocolTCP), - WithAppProtocol(&grpc), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&grpc), ), WithPortMapping("thrift_http", 14268, - WithProtocol(corev1.ProtocolTCP), - WithAppProtocol(&http), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&http), ), WithPortMapping("thrift_compact", 6831, - WithProtocol(corev1.ProtocolUDP), + components.WithProtocol(corev1.ProtocolUDP), ), WithPortMapping("thrift_binary", 6832, - WithProtocol(corev1.ProtocolUDP), + components.WithProtocol(corev1.ProtocolUDP), ), ), NewMultiPortReceiver("loki", WithPortMapping(grpc, 9095, - WithTargetPort(9095), - WithAppProtocol(&grpc), + components.WithTargetPort(9095), + components.WithAppProtocol(&grpc), ), WithPortMapping(http, 3100, - WithTargetPort(3100), - WithAppProtocol(&http), + components.WithTargetPort(3100), + components.WithAppProtocol(&http), ), ), } @@ -95,7 +96,7 @@ type MultiPortReceiver struct { func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} - if err := LoadMap[*MultiProtocolEndpointConfig](config, multiProtoEndpointCfg); err != nil { + if err := components.LoadMap[*MultiProtocolEndpointConfig](config, multiProtoEndpointCfg); err != nil { return nil, err } var ports []corev1.ServicePort @@ -106,7 +107,7 @@ func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]cor port = ec.GetPortNumOrDefault(logger, port) defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", m.name, protocol), port) } - ports = append(ports, ConstructServicePort(defaultSvc, port)) + ports = append(ports, components.ConstructServicePort(defaultSvc, port)) } else { return nil, fmt.Errorf("unknown protocol set: %s", protocol) } @@ -115,7 +116,7 @@ func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]cor } func (m *MultiPortReceiver) ParserType() string { - return ComponentType(m.name) + return components.ComponentType(m.name) } func (m *MultiPortReceiver) ParserName() string { @@ -133,7 +134,7 @@ func NewMultiPortReceiver(name string, opts ...MultiPortOption) *MultiPortReceiv return multiReceiver } -func WithPortMapping(name string, port int32, opts ...PortBuilderOption) MultiPortOption { +func WithPortMapping(name string, port int32, opts ...components.PortBuilderOption) MultiPortOption { return func(parser *MultiPortReceiver) { servicePort := &corev1.ServicePort{ Name: naming.PortName(fmt.Sprintf("%s-%s", parser.name, name), port), diff --git a/internal/components/multi_endpoint_test.go b/internal/components/receivers/multi_endpoint_test.go similarity index 96% rename from internal/components/multi_endpoint_test.go rename to internal/components/receivers/multi_endpoint_test.go index 550f130c64..a7db5aab3b 100644 --- a/internal/components/multi_endpoint_test.go +++ b/internal/components/receivers/multi_endpoint_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package components_test +package receivers_test import ( "testing" @@ -21,7 +21,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" - "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" ) var ( @@ -322,17 +322,17 @@ func TestMultiEndpointParsers(t *testing.T) { t.Run(tt.receiverName, func(t *testing.T) { t.Run("self registers", func(t *testing.T) { // verify - assert.True(t, components.IsRegistered(tt.receiverName)) + assert.True(t, receivers.IsRegistered(tt.receiverName)) }) t.Run("is found by name", func(t *testing.T) { - p := components.BuilderFor(tt.receiverName) + p := receivers.BuilderFor(tt.receiverName) assert.Equal(t, tt.parserName, p.ParserName()) }) t.Run("bad config errors", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test _, err := parser.Ports(logger, []interface{}{"junk"}) @@ -342,7 +342,7 @@ func TestMultiEndpointParsers(t *testing.T) { }) t.Run("good config, unknown protocol", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test _, err := parser.Ports(logger, map[string]interface{}{ @@ -357,7 +357,7 @@ func TestMultiEndpointParsers(t *testing.T) { for _, kase := range tt.cases { t.Run(kase.name, func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test ports, err := parser.Ports(logger, kase.config) diff --git a/internal/components/scraper.go b/internal/components/receivers/scraper.go similarity index 87% rename from internal/components/scraper.go rename to internal/components/receivers/scraper.go index 4e3bb808ae..533dea6a0d 100644 --- a/internal/components/scraper.go +++ b/internal/components/receivers/scraper.go @@ -12,18 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package components +package receivers import ( "fmt" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" ) var ( - _ ComponentPortParser = &ScraperParser{} - scraperReceivers = []ComponentPortParser{ + _ components.ComponentPortParser = &ScraperParser{} + scraperReceivers = []components.ComponentPortParser{ NewScraperParser("prometheus"), NewScraperParser("kubeletstats"), NewScraperParser("sshcheck"), @@ -75,6 +77,6 @@ func (s *ScraperParser) ParserName() string { func NewScraperParser(name string) *ScraperParser { return &ScraperParser{ - componentType: ComponentType(name), + componentType: components.ComponentType(name), } } diff --git a/internal/components/scraper_test.go b/internal/components/receivers/scraper_test.go similarity index 92% rename from internal/components/scraper_test.go rename to internal/components/receivers/scraper_test.go index eb77cee7dd..3456cbc6ff 100644 --- a/internal/components/scraper_test.go +++ b/internal/components/receivers/scraper_test.go @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package components_test +package receivers_test import ( "testing" "github.com/stretchr/testify/assert" - "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" ) func TestScraperParsers(t *testing.T) { @@ -62,7 +62,7 @@ func TestScraperParsers(t *testing.T) { t.Run(tt.receiverName, func(t *testing.T) { t.Run("builds successfully", func(t *testing.T) { // test - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // verify assert.Equal(t, tt.parserName, parser.ParserName()) @@ -70,7 +70,7 @@ func TestScraperParsers(t *testing.T) { t.Run("default is nothing", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test ports, err := parser.Ports(logger, map[string]interface{}{}) @@ -82,7 +82,7 @@ func TestScraperParsers(t *testing.T) { t.Run("always returns nothing", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test ports, err := parser.Ports(logger, map[string]interface{}{ diff --git a/internal/components/single_endpoint.go b/internal/components/receivers/single_endpoint.go similarity index 68% rename from internal/components/single_endpoint.go rename to internal/components/receivers/single_endpoint.go index 98a8652117..59c9439016 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/receivers/single_endpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package components +package receivers import ( "fmt" @@ -20,26 +20,30 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "github.com/open-telemetry/opentelemetry-operator/internal/components" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) var ( - _ ComponentPortParser = &SingleEndpointParser{} - singleEndpointConfigs = []ComponentPortParser{ + _ components.ComponentPortParser = &SingleEndpointParser{} + grpc = "grpc" + http = "http" + unsetPort int32 = 0 + singleEndpointConfigs = []components.ComponentPortParser{ NewSinglePortParser("awsxray", 2000), NewSinglePortParser("carbon", 2003), NewSinglePortParser("collectd", 8081), NewSinglePortParser("fluentforward", 8006), NewSinglePortParser("influxdb", 8086), - NewSinglePortParser("opencensus", 55678, WithAppProtocol(nil)), + NewSinglePortParser("opencensus", 55678, components.WithAppProtocol(nil)), NewSinglePortParser("sapm", 7276), NewSinglePortParser("signalfx", 9943), NewSinglePortParser("splunk_hec", 8088), - NewSinglePortParser("statsd", 8125, WithProtocol(corev1.ProtocolUDP)), - NewSinglePortParser("tcplog", unsetPort, WithProtocol(corev1.ProtocolTCP)), - NewSinglePortParser("udplog", unsetPort, WithProtocol(corev1.ProtocolUDP)), + NewSinglePortParser("statsd", 8125, components.WithProtocol(corev1.ProtocolUDP)), + NewSinglePortParser("tcplog", unsetPort, components.WithProtocol(corev1.ProtocolTCP)), + NewSinglePortParser("udplog", unsetPort, components.WithProtocol(corev1.ProtocolUDP)), NewSinglePortParser("wavefront", 2003), - NewSinglePortParser("zipkin", 9411, WithAppProtocol(&http), WithProtocol(corev1.ProtocolTCP)), + NewSinglePortParser("zipkin", 9411, components.WithAppProtocol(&http), components.WithProtocol(corev1.ProtocolTCP)), } ) @@ -61,11 +65,11 @@ func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) func (g *SingleEndpointConfig) GetPortNum() (int32, error) { if len(g.Endpoint) > 0 { - return PortFromEndpoint(g.Endpoint) + return components.PortFromEndpoint(g.Endpoint) } else if len(g.ListenAddress) > 0 { - return PortFromEndpoint(g.ListenAddress) + return components.PortFromEndpoint(g.ListenAddress) } - return 0, portNotFoundErr + return 0, components.PortNotFoundErr } // SingleEndpointParser is a special parser for a generic receiver that has an endpoint or listen_address in its @@ -78,7 +82,7 @@ type SingleEndpointParser struct { func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { singleEndpointConfig := &SingleEndpointConfig{} - if err := LoadMap[*SingleEndpointConfig](config, singleEndpointConfig); err != nil { + if err := components.LoadMap[*SingleEndpointConfig](config, singleEndpointConfig); err != nil { return nil, err } if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == unsetPort { @@ -88,18 +92,18 @@ func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([] port := singleEndpointConfig.GetPortNumOrDefault(logger, s.svcPort.Port) s.svcPort.Name = naming.PortName(s.name, port) - return []corev1.ServicePort{ConstructServicePort(s.svcPort, port)}, nil + return []corev1.ServicePort{components.ConstructServicePort(s.svcPort, port)}, nil } func (s *SingleEndpointParser) ParserType() string { - return ComponentType(s.name) + return components.ComponentType(s.name) } func (s *SingleEndpointParser) ParserName() string { return fmt.Sprintf("__%s", s.name) } -func NewSinglePortParser(name string, port int32, opts ...PortBuilderOption) *SingleEndpointParser { +func NewSinglePortParser(name string, port int32, opts ...components.PortBuilderOption) *SingleEndpointParser { servicePort := &corev1.ServicePort{ Name: naming.PortName(name, port), Port: port, diff --git a/internal/components/single_endpoint_test.go b/internal/components/receivers/single_endpoint_test.go similarity index 92% rename from internal/components/single_endpoint_test.go rename to internal/components/receivers/single_endpoint_test.go index 5c325f26e8..837a4b05e7 100644 --- a/internal/components/single_endpoint_test.go +++ b/internal/components/receivers/single_endpoint_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package components_test +package receivers_test import ( "testing" @@ -21,7 +21,7 @@ import ( corev1 "k8s.io/api/core/v1" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) @@ -30,7 +30,7 @@ var logger = logf.Log.WithName("unit-tests") func TestParseEndpoint(t *testing.T) { // prepare // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser - parser := components.BuilderFor("myreceiver") + parser := receivers.BuilderFor("myreceiver") // test ports, err := parser.Ports(logger, map[string]interface{}{ @@ -46,7 +46,7 @@ func TestParseEndpoint(t *testing.T) { func TestFailedToParseEndpoint(t *testing.T) { // prepare // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser - parser := components.BuilderFor("myreceiver") + parser := receivers.BuilderFor("myreceiver") // test ports, err := parser.Ports(logger, map[string]interface{}{ @@ -86,14 +86,14 @@ func TestDownstreamParsers(t *testing.T) { t.Run(tt.receiverName, func(t *testing.T) { t.Run("builds successfully", func(t *testing.T) { // test - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // verify assert.Equal(t, tt.parserName, parser.ParserName()) }) t.Run("bad config errors", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test throwing in pure junk _, err := parser.Ports(logger, func() {}) @@ -104,7 +104,7 @@ func TestDownstreamParsers(t *testing.T) { t.Run("assigns the expected port", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test ports, err := parser.Ports(logger, map[string]interface{}{}) @@ -122,7 +122,7 @@ func TestDownstreamParsers(t *testing.T) { t.Run("allows port to be overridden", func(t *testing.T) { // prepare - parser := components.BuilderFor(tt.receiverName) + parser := receivers.BuilderFor(tt.receiverName) // test var ports []corev1.ServicePort From 11ed8c2ca5166831d9512129a451004d82e2d97c Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 20 May 2024 16:33:55 -0400 Subject: [PATCH 4/7] lint --- internal/components/component.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/components/component.go b/internal/components/component.go index bb77a85f47..004681c8d1 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -26,10 +26,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) -const ( - unsetPort = 0 -) - var ( PortNotFoundErr = errors.New("port should not be empty") ) From 805cf7e7f0789e487a88b879ed518a6a3563fa5f Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 21 May 2024 11:41:24 -0400 Subject: [PATCH 5/7] redone with many tests --- internal/components/component.go | 5 +- .../{receivers => }/multi_endpoint.go | 65 +--- internal/components/multi_endpoint_test.go | 329 ++++++++++++++++++ internal/components/receivers/helpers.go | 108 +++++- ...est.go => multi_endpoint_receiver_test.go} | 2 +- internal/components/receivers/scraper.go | 34 +- ...st.go => single_endpoint_receiver_test.go} | 0 .../{receivers => }/single_endpoint.go | 40 +-- internal/components/single_endpoint_test.go | 294 ++++++++++++++++ 9 files changed, 749 insertions(+), 128 deletions(-) rename internal/components/{receivers => }/multi_endpoint.go (58%) create mode 100644 internal/components/multi_endpoint_test.go rename internal/components/receivers/{multi_endpoint_test.go => multi_endpoint_receiver_test.go} (99%) rename internal/components/receivers/{single_endpoint_test.go => single_endpoint_receiver_test.go} (100%) rename internal/components/{receivers => }/single_endpoint.go (58%) create mode 100644 internal/components/single_endpoint_test.go diff --git a/internal/components/component.go b/internal/components/component.go index 004681c8d1..aa41522b24 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -27,7 +27,10 @@ import ( ) var ( - PortNotFoundErr = errors.New("port should not be empty") + GrpcProtocol = "grpc" + HttpProtocol = "http" + UnsetPort int32 = 0 + PortNotFoundErr = errors.New("port should not be empty") ) type PortRetriever interface { diff --git a/internal/components/receivers/multi_endpoint.go b/internal/components/multi_endpoint.go similarity index 58% rename from internal/components/receivers/multi_endpoint.go rename to internal/components/multi_endpoint.go index 21355be4cd..02c5cc5c42 100644 --- a/internal/components/receivers/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package receivers +package components import ( "fmt" @@ -20,63 +20,10 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" - "github.com/open-telemetry/opentelemetry-operator/internal/components" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) -var ( - _ components.ComponentPortParser = &MultiPortReceiver{} - multiPortReceivers = []components.ComponentPortParser{ - NewMultiPortReceiver("otlp", - WithPortMapping( - "grpc", - 4317, - components.WithAppProtocol(&grpc), - components.WithTargetPort(4317), - ), WithPortMapping( - "http", - 4318, - components.WithAppProtocol(&http), - components.WithTargetPort(4318), - ), - ), - NewMultiPortReceiver("skywalking", - WithPortMapping(grpc, 11800, - components.WithTargetPort(11800), - components.WithAppProtocol(&grpc), - ), - WithPortMapping(http, 12800, - components.WithTargetPort(12800), - components.WithAppProtocol(&http), - )), - NewMultiPortReceiver("jaeger", - WithPortMapping(grpc, 14250, - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&grpc), - ), - WithPortMapping("thrift_http", 14268, - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&http), - ), - WithPortMapping("thrift_compact", 6831, - components.WithProtocol(corev1.ProtocolUDP), - ), - WithPortMapping("thrift_binary", 6832, - components.WithProtocol(corev1.ProtocolUDP), - ), - ), - NewMultiPortReceiver("loki", - WithPortMapping(grpc, 9095, - components.WithTargetPort(9095), - components.WithAppProtocol(&grpc), - ), - WithPortMapping(http, 3100, - components.WithTargetPort(3100), - components.WithAppProtocol(&http), - ), - ), - } -) +var _ ComponentPortParser = &MultiPortReceiver{} // MultiProtocolEndpointConfig represents the minimal struct for a given YAML configuration input containing a map to // a struct with either endpoint or listen_address. @@ -96,7 +43,7 @@ type MultiPortReceiver struct { func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} - if err := components.LoadMap[*MultiProtocolEndpointConfig](config, multiProtoEndpointCfg); err != nil { + if err := LoadMap[*MultiProtocolEndpointConfig](config, multiProtoEndpointCfg); err != nil { return nil, err } var ports []corev1.ServicePort @@ -107,7 +54,7 @@ func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]cor port = ec.GetPortNumOrDefault(logger, port) defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", m.name, protocol), port) } - ports = append(ports, components.ConstructServicePort(defaultSvc, port)) + ports = append(ports, ConstructServicePort(defaultSvc, port)) } else { return nil, fmt.Errorf("unknown protocol set: %s", protocol) } @@ -116,7 +63,7 @@ func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]cor } func (m *MultiPortReceiver) ParserType() string { - return components.ComponentType(m.name) + return ComponentType(m.name) } func (m *MultiPortReceiver) ParserName() string { @@ -134,7 +81,7 @@ func NewMultiPortReceiver(name string, opts ...MultiPortOption) *MultiPortReceiv return multiReceiver } -func WithPortMapping(name string, port int32, opts ...components.PortBuilderOption) MultiPortOption { +func WithPortMapping(name string, port int32, opts ...PortBuilderOption) MultiPortOption { return func(parser *MultiPortReceiver) { servicePort := &corev1.ServicePort{ Name: naming.PortName(fmt.Sprintf("%s-%s", parser.name, name), port), diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go new file mode 100644 index 0000000000..8009b8e9f3 --- /dev/null +++ b/internal/components/multi_endpoint_test.go @@ -0,0 +1,329 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +var ( + httpConfig = map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{}, + }, + } + httpAndGrpcConfig = map[string]interface{}{ + "protocols": map[string]interface{}{ + "http": map[string]interface{}{}, + "grpc": map[string]interface{}{}, + }, + } +) + +func TestMultiPortReceiver_ParserName(t *testing.T) { + type fields struct { + name string + opts []components.MultiPortOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "__receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + want: "__receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) + assert.Equalf(t, tt.want, m.ParserName(), "ParserName()") + }) + } +} + +func TestMultiPortReceiver_ParserType(t *testing.T) { + type fields struct { + name string + opts []components.MultiPortOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2/test", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + want: "receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) + assert.Equalf(t, tt.want, m.ParserType(), "ParserType()") + }) + } +} + +func TestMultiPortReceiver_Ports(t *testing.T) { + type fields struct { + name string + opts []components.MultiPortOption + } + type args struct { + config interface{} + } + tests := []struct { + name string + fields fields + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + args: args{ + config: nil, + }, + want: nil, + wantErr: assert.NoError, + }, + { + name: "single port mapping without builder options", + fields: fields{ + name: "receiver2", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver2-http", + Port: 80, + }, + }, + wantErr: assert.NoError, + }, + { + name: "port mapping with target port", + fields: fields{ + name: "receiver3", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80, components.WithTargetPort(8080)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver3-http", + Port: 80, + TargetPort: intstr.FromInt32(8080), + }, + }, + wantErr: assert.NoError, + }, + { + name: "port mapping with app protocol", + fields: fields{ + name: "receiver4", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80, components.WithAppProtocol(&components.HttpProtocol)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver4-http", + Port: 80, + AppProtocol: &components.HttpProtocol, + }, + }, + wantErr: assert.NoError, + }, + { + name: "port mapping with protocol", + fields: fields{ + name: "receiver5", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80, components.WithProtocol(corev1.ProtocolTCP)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver5-http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + wantErr: assert.NoError, + }, + { + name: "multiple port mappings", + fields: fields{ + name: "receiver6", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + components.WithPortMapping("grpc", 4317, + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol)), + }, + }, + args: args{ + config: httpAndGrpcConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver6-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + Protocol: corev1.ProtocolTCP, + AppProtocol: &components.GrpcProtocol, + }, + { + Name: "receiver6-http", + Port: 80, + }, + }, + wantErr: assert.NoError, + }, + { + name: "multiple port mappings only one enabled", + fields: fields{ + name: "receiver6", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + components.WithPortMapping("grpc", 4317, + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol)), + }, + }, + args: args{ + config: httpConfig, + }, + want: []corev1.ServicePort{ + { + Name: "receiver6-http", + Port: 80, + }, + }, + wantErr: assert.NoError, + }, + { + name: "error unmarshalling configuration", + fields: fields{ + name: "receiver1", + opts: nil, + }, + args: args{ + config: "invalid config", // Simulate an invalid config that causes LoadMap to fail + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "error marshaling configuration", + fields: fields{ + name: "receiver1", + opts: nil, + }, + args: args{ + config: func() {}, // Simulate an invalid config that causes LoadMap to fail + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "unknown protocol", + fields: fields{ + name: "receiver2", + opts: []components.MultiPortOption{ + components.WithPortMapping("http", 80), + }, + }, + args: args{ + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "unknown": map[string]interface{}{}, + }, + }, + }, + want: nil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) + got, err := m.Ports(logr.Discard(), tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { + return + } + assert.ElementsMatchf(t, tt.want, got, "Ports(%v)", tt.args.config) + }) + } +} diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go index 2c065ff118..2848b36514 100644 --- a/internal/components/receivers/helpers.go +++ b/internal/components/receivers/helpers.go @@ -14,7 +14,11 @@ package receivers -import "github.com/open-telemetry/opentelemetry-operator/internal/components" +import ( + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) // registry holds a record of all known receiver parsers. var registry = make(map[string]components.ComponentPortParser) @@ -35,12 +39,108 @@ func BuilderFor(name string) components.ComponentPortParser { if parser, ok := registry[components.ComponentType(name)]; ok { return parser } - return NewSinglePortParser(components.ComponentType(name), unsetPort) + return components.NewSinglePortParser(components.ComponentType(name), components.UnsetPort) } +var ( + componentParsers = []components.ComponentPortParser{ + components.NewMultiPortReceiver("otlp", + components.WithPortMapping( + "grpc", + 4317, + components.WithAppProtocol(&components.GrpcProtocol), + components.WithTargetPort(4317), + ), components.WithPortMapping( + "http", + 4318, + components.WithAppProtocol(&components.HttpProtocol), + components.WithTargetPort(4318), + ), + ), + components.NewMultiPortReceiver("skywalking", + components.WithPortMapping(components.GrpcProtocol, 11800, + components.WithTargetPort(11800), + components.WithAppProtocol(&components.GrpcProtocol), + ), + components.WithPortMapping(components.HttpProtocol, 12800, + components.WithTargetPort(12800), + components.WithAppProtocol(&components.HttpProtocol), + )), + components.NewMultiPortReceiver("jaeger", + components.WithPortMapping(components.GrpcProtocol, 14250, + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol), + ), + components.WithPortMapping("thrift_http", 14268, + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.HttpProtocol), + ), + components.WithPortMapping("thrift_compact", 6831, + components.WithProtocol(corev1.ProtocolUDP), + ), + components.WithPortMapping("thrift_binary", 6832, + components.WithProtocol(corev1.ProtocolUDP), + ), + ), + components.NewMultiPortReceiver("loki", + components.WithPortMapping(components.GrpcProtocol, 9095, + components.WithTargetPort(9095), + components.WithAppProtocol(&components.GrpcProtocol), + ), + components.WithPortMapping(components.HttpProtocol, 3100, + components.WithTargetPort(3100), + components.WithAppProtocol(&components.HttpProtocol), + ), + ), + components.NewSinglePortParser("awsxray", 2000), + components.NewSinglePortParser("carbon", 2003), + components.NewSinglePortParser("collectd", 8081), + components.NewSinglePortParser("fluentforward", 8006), + components.NewSinglePortParser("influxdb", 8086), + components.NewSinglePortParser("opencensus", 55678, components.WithAppProtocol(nil)), + components.NewSinglePortParser("sapm", 7276), + components.NewSinglePortParser("signalfx", 9943), + components.NewSinglePortParser("splunk_hec", 8088), + components.NewSinglePortParser("statsd", 8125, components.WithProtocol(corev1.ProtocolUDP)), + components.NewSinglePortParser("tcplog", components.UnsetPort, components.WithProtocol(corev1.ProtocolTCP)), + components.NewSinglePortParser("udplog", components.UnsetPort, components.WithProtocol(corev1.ProtocolUDP)), + components.NewSinglePortParser("wavefront", 2003), + components.NewSinglePortParser("zipkin", 9411, components.WithAppProtocol(&components.HttpProtocol), components.WithProtocol(corev1.ProtocolTCP)), + NewScraperParser("prometheus"), + NewScraperParser("kubeletstats"), + NewScraperParser("sshcheck"), + NewScraperParser("cloudfoundry"), + NewScraperParser("vcenter"), + NewScraperParser("oracledb"), + NewScraperParser("snmp"), + NewScraperParser("googlecloudpubsub"), + NewScraperParser("chrony"), + NewScraperParser("jmx"), + NewScraperParser("podman_stats"), + NewScraperParser("pulsar"), + NewScraperParser("docker_stats"), + NewScraperParser("aerospike"), + NewScraperParser("zookeeper"), + NewScraperParser("prometheus_simple"), + NewScraperParser("saphana"), + NewScraperParser("riak"), + NewScraperParser("redis"), + NewScraperParser("rabbitmq"), + NewScraperParser("purefb"), + NewScraperParser("postgresql"), + NewScraperParser("nsxt"), + NewScraperParser("nginx"), + NewScraperParser("mysql"), + NewScraperParser("memcached"), + NewScraperParser("httpcheck"), + NewScraperParser("haproxy"), + NewScraperParser("flinkmetrics"), + NewScraperParser("couchdb"), + } +) + func init() { - parsers := append(scraperReceivers, append(singleEndpointConfigs, multiPortReceivers...)...) - for _, parser := range parsers { + for _, parser := range componentParsers { Register(parser.ParserType(), parser) } } diff --git a/internal/components/receivers/multi_endpoint_test.go b/internal/components/receivers/multi_endpoint_receiver_test.go similarity index 99% rename from internal/components/receivers/multi_endpoint_test.go rename to internal/components/receivers/multi_endpoint_receiver_test.go index a7db5aab3b..8126c8dcad 100644 --- a/internal/components/receivers/multi_endpoint_test.go +++ b/internal/components/receivers/multi_endpoint_receiver_test.go @@ -29,7 +29,7 @@ var ( http = "http" ) -func TestMultiEndpointParsers(t *testing.T) { +func TestMultiEndpointReceiverParsers(t *testing.T) { type testCase struct { name string config interface{} diff --git a/internal/components/receivers/scraper.go b/internal/components/receivers/scraper.go index 533dea6a0d..8f01e95c3a 100644 --- a/internal/components/receivers/scraper.go +++ b/internal/components/receivers/scraper.go @@ -24,39 +24,7 @@ import ( ) var ( - _ components.ComponentPortParser = &ScraperParser{} - scraperReceivers = []components.ComponentPortParser{ - NewScraperParser("prometheus"), - NewScraperParser("kubeletstats"), - NewScraperParser("sshcheck"), - NewScraperParser("cloudfoundry"), - NewScraperParser("vcenter"), - NewScraperParser("oracledb"), - NewScraperParser("snmp"), - NewScraperParser("googlecloudpubsub"), - NewScraperParser("chrony"), - NewScraperParser("jmx"), - NewScraperParser("podman_stats"), - NewScraperParser("pulsar"), - NewScraperParser("docker_stats"), - NewScraperParser("aerospike"), - NewScraperParser("zookeeper"), - NewScraperParser("prometheus_simple"), - NewScraperParser("saphana"), - NewScraperParser("riak"), - NewScraperParser("redis"), - NewScraperParser("rabbitmq"), - NewScraperParser("purefb"), - NewScraperParser("postgresql"), - NewScraperParser("nsxt"), - NewScraperParser("nginx"), - NewScraperParser("mysql"), - NewScraperParser("memcached"), - NewScraperParser("httpcheck"), - NewScraperParser("haproxy"), - NewScraperParser("flinkmetrics"), - NewScraperParser("couchdb"), - } + _ components.ComponentPortParser = &ScraperParser{} ) type ScraperParser struct { diff --git a/internal/components/receivers/single_endpoint_test.go b/internal/components/receivers/single_endpoint_receiver_test.go similarity index 100% rename from internal/components/receivers/single_endpoint_test.go rename to internal/components/receivers/single_endpoint_receiver_test.go diff --git a/internal/components/receivers/single_endpoint.go b/internal/components/single_endpoint.go similarity index 58% rename from internal/components/receivers/single_endpoint.go rename to internal/components/single_endpoint.go index 59c9439016..5bf4460c6e 100644 --- a/internal/components/receivers/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package receivers +package components import ( "fmt" @@ -20,31 +20,11 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" - "github.com/open-telemetry/opentelemetry-operator/internal/components" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) var ( - _ components.ComponentPortParser = &SingleEndpointParser{} - grpc = "grpc" - http = "http" - unsetPort int32 = 0 - singleEndpointConfigs = []components.ComponentPortParser{ - NewSinglePortParser("awsxray", 2000), - NewSinglePortParser("carbon", 2003), - NewSinglePortParser("collectd", 8081), - NewSinglePortParser("fluentforward", 8006), - NewSinglePortParser("influxdb", 8086), - NewSinglePortParser("opencensus", 55678, components.WithAppProtocol(nil)), - NewSinglePortParser("sapm", 7276), - NewSinglePortParser("signalfx", 9943), - NewSinglePortParser("splunk_hec", 8088), - NewSinglePortParser("statsd", 8125, components.WithProtocol(corev1.ProtocolUDP)), - NewSinglePortParser("tcplog", unsetPort, components.WithProtocol(corev1.ProtocolTCP)), - NewSinglePortParser("udplog", unsetPort, components.WithProtocol(corev1.ProtocolUDP)), - NewSinglePortParser("wavefront", 2003), - NewSinglePortParser("zipkin", 9411, components.WithAppProtocol(&http), components.WithProtocol(corev1.ProtocolTCP)), - } + _ ComponentPortParser = &SingleEndpointParser{} ) // SingleEndpointConfig represents the minimal struct for a given YAML configuration input containing either @@ -65,11 +45,11 @@ func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) func (g *SingleEndpointConfig) GetPortNum() (int32, error) { if len(g.Endpoint) > 0 { - return components.PortFromEndpoint(g.Endpoint) + return PortFromEndpoint(g.Endpoint) } else if len(g.ListenAddress) > 0 { - return components.PortFromEndpoint(g.ListenAddress) + return PortFromEndpoint(g.ListenAddress) } - return 0, components.PortNotFoundErr + return 0, PortNotFoundErr } // SingleEndpointParser is a special parser for a generic receiver that has an endpoint or listen_address in its @@ -82,28 +62,28 @@ type SingleEndpointParser struct { func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { singleEndpointConfig := &SingleEndpointConfig{} - if err := components.LoadMap[*SingleEndpointConfig](config, singleEndpointConfig); err != nil { + if err := LoadMap[*SingleEndpointConfig](config, singleEndpointConfig); err != nil { return nil, err } - if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == unsetPort { + if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == UnsetPort { logger.WithValues("receiver", s.name).Error(err, "couldn't parse the endpoint's port and no default port set") return []corev1.ServicePort{}, err } port := singleEndpointConfig.GetPortNumOrDefault(logger, s.svcPort.Port) s.svcPort.Name = naming.PortName(s.name, port) - return []corev1.ServicePort{components.ConstructServicePort(s.svcPort, port)}, nil + return []corev1.ServicePort{ConstructServicePort(s.svcPort, port)}, nil } func (s *SingleEndpointParser) ParserType() string { - return components.ComponentType(s.name) + return ComponentType(s.name) } func (s *SingleEndpointParser) ParserName() string { return fmt.Sprintf("__%s", s.name) } -func NewSinglePortParser(name string, port int32, opts ...components.PortBuilderOption) *SingleEndpointParser { +func NewSinglePortParser(name string, port int32, opts ...PortBuilderOption) *SingleEndpointParser { servicePort := &corev1.ServicePort{ Name: naming.PortName(name, port), Port: port, diff --git a/internal/components/single_endpoint_test.go b/internal/components/single_endpoint_test.go new file mode 100644 index 0000000000..b0efdb1c90 --- /dev/null +++ b/internal/components/single_endpoint_test.go @@ -0,0 +1,294 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +func TestSingleEndpointConfig_GetPortNumOrDefault(t *testing.T) { + type fields struct { + Endpoint string + ListenAddress string + } + type args struct { + p int32 + } + tests := []struct { + name string + fields fields + args args + want int32 + }{ + { + name: "Test with valid endpoint", + fields: fields{ + Endpoint: "example.com:8080", + ListenAddress: "", + }, + args: args{ + p: 9000, + }, + want: 8080, + }, + { + name: "Test with valid listen address", + fields: fields{ + Endpoint: "", + ListenAddress: "0.0.0.0:9090", + }, + args: args{ + p: 9000, + }, + want: 9090, + }, + { + name: "Test with invalid configuration (no endpoint or listen address)", + fields: fields{ + Endpoint: "", + ListenAddress: "", + }, + args: args{ + p: 9000, + }, + want: 9000, // Should return default port + }, + { + name: "Test with invalid endpoint format", + fields: fields{ + Endpoint: "invalid_endpoint", + ListenAddress: "", + }, + args: args{ + p: 9000, + }, + want: 9000, // Should return default port + }, + { + name: "Test with invalid listen address format", + fields: fields{ + Endpoint: "", + ListenAddress: "invalid_listen_address", + }, + args: args{ + p: 9000, + }, + want: 9000, // Should return default port + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := &components.SingleEndpointConfig{ + Endpoint: tt.fields.Endpoint, + ListenAddress: tt.fields.ListenAddress, + } + assert.Equalf(t, tt.want, g.GetPortNumOrDefault(logr.Discard(), tt.args.p), "GetPortNumOrDefault(%v)", tt.args.p) + }) + } +} + +func TestSingleEndpointParser_ParserName(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "__receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2", + opts: []components.PortBuilderOption{ + components.WithTargetPort(8080), + }, + }, + want: "__receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + assert.Equalf(t, tt.want, s.ParserName(), "ParserName()") + }) + } +} + +func TestSingleEndpointParser_ParserType(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "no options", + fields: fields{ + name: "receiver1", + opts: nil, + }, + want: "receiver1", + }, + { + name: "with port mapping without builder options", + fields: fields{ + name: "receiver2/test", + opts: []components.PortBuilderOption{ + components.WithTargetPort(80), + }, + }, + want: "receiver2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + assert.Equalf(t, tt.want, s.ParserType(), "ParserType()") + }) + } +} + +func TestSingleEndpointParser_Ports(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + type args struct { + config interface{} + } + tests := []struct { + name string + fields fields + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + }{ + { + name: "ValidConfigWithPort", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: map[string]interface{}{ + "port": 8080, + }, + }, + want: []corev1.ServicePort{ + {Name: "testparser", Port: 8080}, + }, + wantErr: assert.NoError, + }, + { + name: "ValidConfigWithDefaultPort", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: []corev1.ServicePort{ + {Name: "testparser", Port: 8080}, + }, + wantErr: assert.NoError, + }, + { + name: "ConfigWithFixins", + fields: fields{ + name: "testparser", + port: 8080, + opts: []components.PortBuilderOption{ + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol), + }, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: []corev1.ServicePort{ + { + Name: "testparser", + Port: 8080, + TargetPort: intstr.FromInt32(4317), + Protocol: corev1.ProtocolTCP, + AppProtocol: &components.GrpcProtocol, + }, + }, + wantErr: assert.NoError, + }, + { + name: "InvalidConfigMissingPort", + fields: fields{ + name: "testparser", + port: 0, + }, + args: args{ + config: map[string]interface{}{ + "endpoint": "garbageeeee", + }, + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "ErrorParsingConfig", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: "invalid config", + }, + want: nil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + got, err := s.Ports(logr.Discard(), tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { + return + } + assert.ElementsMatchf(t, tt.want, got, "Ports(%v)", tt.args.config) + }) + } +} From c3e4e61620cf1fad655c63a179749f25ecb03fdd Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 30 May 2024 11:04:49 -0400 Subject: [PATCH 6/7] no need for json, mapstructure only --- internal/components/component.go | 30 ++++++------------- internal/components/multi_endpoint.go | 5 ++-- .../receivers/multi_endpoint_receiver_test.go | 2 +- .../single_endpoint_receiver_test.go | 2 +- internal/components/single_endpoint.go | 7 +++-- 5 files changed, 18 insertions(+), 28 deletions(-) diff --git a/internal/components/component.go b/internal/components/component.go index aa41522b24..cb1838c9e1 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -15,7 +15,6 @@ package components import ( - "encoding/json" "errors" "regexp" "strconv" @@ -38,7 +37,7 @@ type PortRetriever interface { GetPortNumOrDefault(logr.Logger, int32) int32 } -type PortBuilderOption func(portBuilder *corev1.ServicePort) +type PortBuilderOption func(*corev1.ServicePort) func WithTargetPort(targetPort int32) PortBuilderOption { return func(servicePort *corev1.ServicePort) { @@ -58,15 +57,15 @@ func WithProtocol(proto corev1.Protocol) PortBuilderOption { } } +// ComponentType returns the type for a given component name. +// components have a name like: +// - mycomponent/custom +// - mycomponent +// we extract the "mycomponent" part and see if we have a parser for the component func ComponentType(name string) string { - // components have a name like: - // - mycomponent/custom - // - mycomponent - // we extract the "mycomponent" part and see if we have a parser for the component if strings.Contains(name, "/") { return name[:strings.Index(name, "/")] } - return name } @@ -77,7 +76,9 @@ func PortFromEndpoint(endpoint string) (int32, error) { r := regexp.MustCompile(":[0-9]+") if r.MatchString(endpoint) { - port, err = strconv.ParseInt(strings.Replace(r.FindString(endpoint), ":", "", -1), 10, 32) + portStr := r.FindString(endpoint) + cleanedPortStr := strings.Replace(portStr, ":", "", -1) + port, err = strconv.ParseInt(cleanedPortStr, 10, 32) if err != nil { return 0, err @@ -102,19 +103,6 @@ type ComponentPortParser interface { ParserName() string } -func LoadMap[T any](m interface{}, in T) error { - // Convert map to JSON bytes - yamlData, err := json.Marshal(m) - if err != nil { - return err - } - // Unmarshal YAML into the provided struct - if err := json.Unmarshal(yamlData, in); err != nil { - return err - } - return nil -} - func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.ServicePort { return corev1.ServicePort{ Name: current.Name, diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index 02c5cc5c42..304d92d521 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-operator/internal/naming" @@ -28,7 +29,7 @@ var _ ComponentPortParser = &MultiPortReceiver{} // MultiProtocolEndpointConfig represents the minimal struct for a given YAML configuration input containing a map to // a struct with either endpoint or listen_address. type MultiProtocolEndpointConfig struct { - Protocols map[string]*SingleEndpointConfig `json:"protocols"` + Protocols map[string]*SingleEndpointConfig `mapstructure:"protocols"` } // MultiPortOption allows the setting of options for a MultiPortReceiver. @@ -43,7 +44,7 @@ type MultiPortReceiver struct { func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} - if err := LoadMap[*MultiProtocolEndpointConfig](config, multiProtoEndpointCfg); err != nil { + if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { return nil, err } var ports []corev1.ServicePort diff --git a/internal/components/receivers/multi_endpoint_receiver_test.go b/internal/components/receivers/multi_endpoint_receiver_test.go index 8126c8dcad..dde04b763f 100644 --- a/internal/components/receivers/multi_endpoint_receiver_test.go +++ b/internal/components/receivers/multi_endpoint_receiver_test.go @@ -338,7 +338,7 @@ func TestMultiEndpointReceiverParsers(t *testing.T) { _, err := parser.Ports(logger, []interface{}{"junk"}) // verify - assert.ErrorContains(t, err, "cannot unmarshal array") + assert.ErrorContains(t, err, "expected a map, got 'slice'") }) t.Run("good config, unknown protocol", func(t *testing.T) { // prepare diff --git a/internal/components/receivers/single_endpoint_receiver_test.go b/internal/components/receivers/single_endpoint_receiver_test.go index 837a4b05e7..f06353ca90 100644 --- a/internal/components/receivers/single_endpoint_receiver_test.go +++ b/internal/components/receivers/single_endpoint_receiver_test.go @@ -99,7 +99,7 @@ func TestDownstreamParsers(t *testing.T) { _, err := parser.Ports(logger, func() {}) // verify - assert.ErrorContains(t, err, "unsupported type") + assert.ErrorContains(t, err, "expected a map, got 'func'") }) t.Run("assigns the expected port", func(t *testing.T) { diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index 5bf4460c6e..f7de2b7aaa 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-operator/internal/naming" @@ -30,8 +31,8 @@ var ( // SingleEndpointConfig represents the minimal struct for a given YAML configuration input containing either // endpoint or listen_address. type SingleEndpointConfig struct { - Endpoint string `json:"endpoint,omitempty"` - ListenAddress string `json:"listen_address,omitempty"` + Endpoint string `mapstructure:"endpoint,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty"` } func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) int32 { @@ -62,7 +63,7 @@ type SingleEndpointParser struct { func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { singleEndpointConfig := &SingleEndpointConfig{} - if err := LoadMap[*SingleEndpointConfig](config, singleEndpointConfig); err != nil { + if err := mapstructure.Decode(config, singleEndpointConfig); err != nil { return nil, err } if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == UnsetPort { From 5bd7d3fe89c9a0190d4ed1c86b946620f6047f7d Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Fri, 31 May 2024 10:49:31 -0400 Subject: [PATCH 7/7] lint prob --- internal/components/component.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/components/component.go b/internal/components/component.go index cb1838c9e1..e704c39e25 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -61,7 +61,7 @@ func WithProtocol(proto corev1.Protocol) PortBuilderOption { // components have a name like: // - mycomponent/custom // - mycomponent -// we extract the "mycomponent" part and see if we have a parser for the component +// we extract the "mycomponent" part and see if we have a parser for the component. func ComponentType(name string) string { if strings.Contains(name, "/") { return name[:strings.Index(name, "/")]