From 1b921c828a1f40c609ac7e7fe6618572acb50c35 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 16 May 2023 21:53:26 -0700 Subject: [PATCH] [receivercreator] add support for logs and traces (#19641) --- ...nd-traces-support-to-receiver-creator.yaml | 16 + receiver/receivercreator/README.md | 13 +- receiver/receivercreator/config_test.go | 56 +++ receiver/receivercreator/consumer.go | 134 +++++++ receiver/receivercreator/consumer_test.go | 351 ++++++++++++++++++ receiver/receivercreator/factory.go | 72 +++- receiver/receivercreator/factory_test.go | 49 ++- receiver/receivercreator/go.mod | 3 + .../internal/metadata/generated_status.go | 2 + receiver/receivercreator/metadata.yaml | 2 +- receiver/receivercreator/observerhandler.go | 40 +- .../receivercreator/observerhandler_test.go | 267 ++++++++++++- receiver/receivercreator/receiver.go | 60 ++- receiver/receivercreator/receiver_test.go | 7 +- receiver/receivercreator/resourceenhancer.go | 91 ----- .../receivercreator/resourceenhancer_test.go | 257 ------------- receiver/receivercreator/rules_test.go | 2 +- receiver/receivercreator/runner.go | 130 ++++++- receiver/receivercreator/runner_test.go | 4 +- 19 files changed, 1128 insertions(+), 428 deletions(-) create mode 100644 .chloggen/add-log-and-traces-support-to-receiver-creator.yaml create mode 100644 receiver/receivercreator/consumer.go create mode 100644 receiver/receivercreator/consumer_test.go delete mode 100644 receiver/receivercreator/resourceenhancer.go delete mode 100644 receiver/receivercreator/resourceenhancer_test.go diff --git a/.chloggen/add-log-and-traces-support-to-receiver-creator.yaml b/.chloggen/add-log-and-traces-support-to-receiver-creator.yaml new file mode 100644 index 000000000000..11019ff3aee5 --- /dev/null +++ b/.chloggen/add-log-and-traces-support-to-receiver-creator.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add logs and traces support to receivercreator + +# One or more tracking issues related to the change +issues: [19205, 19206] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/receivercreator/README.md b/receiver/receivercreator/README.md index 467cbf790b4c..aaa8a399584b 100644 --- a/receiver/receivercreator/README.md +++ b/receiver/receivercreator/README.md @@ -3,9 +3,11 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: metrics | +| Stability | [alpha]: logs, traces | +| | [beta]: metrics | | Distributions | [contrib], [splunk] | +[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib [splunk]: https://github.com/signalfx/splunk-otel-collector @@ -19,6 +21,10 @@ endpoints that you may be interested in. The configured rules will be evaluated for each endpoint discovered. If the rule evaluates to true then the receiver for that rule will be started against the matched endpoint. +If you use the receiver creator in multiple pipelines of differing telemetry types, +but a given dynamically instantiated receiver doesn't support one of the pipeline's type, +it will effectively lead to a logged no-op that won't cause a collector service failure. + ## Configuration **watch_observers** @@ -80,7 +86,7 @@ resource_attributes: : ``` -This setting controls what resource attributes are set on metrics emitted from the created receiver. These attributes can be set from [values in the endpoint](#rule-expressions) that was matched by the `rule`. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in `config`. +This setting controls what resource attributes are set on telemetry emitted from the created receiver. These attributes can be set from [values in the endpoint](#rule-expressions) that was matched by the `rule`. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in `config`. Note that the backticks below are not typos--they indicate the value is set dynamically. @@ -134,7 +140,7 @@ Similar to the per-endpoint type `resource_attributes` described above but for i ## Rule Expressions -Each rule must start with `type == ("pod"|"port"|"hostport"|"container") &&` such that the rule matches +Each rule must start with `type == ("pod"|"port"|"hostport"|"container"|"k8s.node") &&` such that the rule matches only one endpoint type. Depending on the type of endpoint the rule is targeting it will have different variables available. @@ -297,4 +303,3 @@ service: The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). - diff --git a/receiver/receivercreator/config_test.go b/receiver/receivercreator/config_test.go index 6d758a254570..d99016450de6 100644 --- a/receiver/receivercreator/config_test.go +++ b/receiver/receivercreator/config_test.go @@ -80,6 +80,10 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), expected: createDefaultConfig(), }, + { + id: component.NewIDWithName("receiver_creator", ""), + expected: createDefaultConfig(), + }, { id: component.NewIDWithName(metadata.Type, "1"), expected: &Config{ @@ -176,7 +180,9 @@ type nopWithEndpointFactory struct { type nopWithEndpointReceiver struct { mockComponent + consumer.Logs consumer.Metrics + consumer.Traces rcvr.CreateSettings cfg component.Config } @@ -192,6 +198,18 @@ type mockComponent struct { component.ShutdownFunc } +func (*nopWithEndpointFactory) CreateLogsReceiver( + _ context.Context, + rcs rcvr.CreateSettings, + cfg component.Config, + nextConsumer consumer.Logs) (rcvr.Logs, error) { + return &nopWithEndpointReceiver{ + Logs: nextConsumer, + CreateSettings: rcs, + cfg: cfg, + }, nil +} + func (*nopWithEndpointFactory) CreateMetricsReceiver( _ context.Context, rcs rcvr.CreateSettings, @@ -204,6 +222,18 @@ func (*nopWithEndpointFactory) CreateMetricsReceiver( }, nil } +func (*nopWithEndpointFactory) CreateTracesReceiver( + _ context.Context, + rcs rcvr.CreateSettings, + cfg component.Config, + nextConsumer consumer.Traces) (rcvr.Traces, error) { + return &nopWithEndpointReceiver{ + Traces: nextConsumer, + CreateSettings: rcs, + cfg: cfg, + }, nil +} + type nopWithoutEndpointConfig struct { NotEndpoint string `mapstructure:"not_endpoint"` IntField int `mapstructure:"int_field"` @@ -215,7 +245,9 @@ type nopWithoutEndpointFactory struct { type nopWithoutEndpointReceiver struct { mockComponent + consumer.Logs consumer.Metrics + consumer.Traces rcvr.CreateSettings cfg component.Config } @@ -226,6 +258,18 @@ func (*nopWithoutEndpointFactory) CreateDefaultConfig() component.Config { } } +func (*nopWithoutEndpointFactory) CreateLogsReceiver( + _ context.Context, + rcs rcvr.CreateSettings, + cfg component.Config, + nextConsumer consumer.Logs) (rcvr.Logs, error) { + return &nopWithoutEndpointReceiver{ + Logs: nextConsumer, + CreateSettings: rcs, + cfg: cfg, + }, nil +} + func (*nopWithoutEndpointFactory) CreateMetricsReceiver( _ context.Context, rcs rcvr.CreateSettings, @@ -237,3 +281,15 @@ func (*nopWithoutEndpointFactory) CreateMetricsReceiver( cfg: cfg, }, nil } + +func (*nopWithoutEndpointFactory) CreateTracesReceiver( + _ context.Context, + rcs rcvr.CreateSettings, + cfg component.Config, + nextConsumer consumer.Traces) (rcvr.Traces, error) { + return &nopWithoutEndpointReceiver{ + Traces: nextConsumer, + CreateSettings: rcs, + cfg: cfg, + }, nil +} diff --git a/receiver/receivercreator/consumer.go b/receiver/receivercreator/consumer.go new file mode 100644 index 000000000000..7461676e0e16 --- /dev/null +++ b/receiver/receivercreator/consumer.go @@ -0,0 +1,134 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +var _ consumer.Logs = (*enhancingConsumer)(nil) +var _ consumer.Metrics = (*enhancingConsumer)(nil) +var _ consumer.Traces = (*enhancingConsumer)(nil) + +// enhancingConsumer adds additional resource attributes from the given endpoint environment before passing the +// telemetry to its next consumers. The added attributes vary based on the type of the endpoint. +type enhancingConsumer struct { + logs consumer.Logs + metrics consumer.Metrics + traces consumer.Traces + attrs map[string]string +} + +func newEnhancingConsumer( + resources resourceAttributes, + receiverAttributes map[string]string, + env observer.EndpointEnv, + endpoint observer.Endpoint, + nextLogs consumer.Logs, + nextMetrics consumer.Metrics, + nextTraces consumer.Traces, +) (*enhancingConsumer, error) { + attrs := map[string]string{} + + for _, resource := range []map[string]string{resources[endpoint.Details.Type()], receiverAttributes} { + // Precompute values that will be inserted for each resource object passed through. + for attr, expr := range resource { + // If the attribute value is empty this signals to delete existing + if expr == "" { + delete(attrs, attr) + continue + } + + res, err := evalBackticksInConfigValue(expr, env) + if err != nil { + return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err) + } + + val := fmt.Sprint(res) + if val != "" { + attrs[attr] = val + } + } + } + + ec := &enhancingConsumer{attrs: attrs} + if nextLogs != nil { + ec.logs = nextLogs + } + if nextMetrics != nil { + ec.metrics = nextMetrics + } + if nextTraces != nil { + ec.traces = nextTraces + } + return ec, nil +} + +func (*enhancingConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func (ec *enhancingConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + if ec.logs == nil { + return fmt.Errorf("no log consumer available") + } + rl := ld.ResourceLogs() + for i := 0; i < rl.Len(); i++ { + ec.putAttrs(rl.At(i).Resource().Attributes()) + } + + return ec.logs.ConsumeLogs(ctx, ld) +} + +func (ec *enhancingConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if ec.metrics == nil { + return fmt.Errorf("no metric consumer available") + } + rm := md.ResourceMetrics() + for i := 0; i < rm.Len(); i++ { + ec.putAttrs(rm.At(i).Resource().Attributes()) + } + + return ec.metrics.ConsumeMetrics(ctx, md) +} + +func (ec *enhancingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if ec.traces == nil { + return fmt.Errorf("no trace consumer available") + } + rs := td.ResourceSpans() + for i := 0; i < rs.Len(); i++ { + ec.putAttrs(rs.At(i).Resource().Attributes()) + } + + return ec.traces.ConsumeTraces(ctx, td) +} + +func (ec *enhancingConsumer) putAttrs(attrs pcommon.Map) { + for attr, val := range ec.attrs { + if _, found := attrs.Get(attr); !found { + attrs.PutStr(attr, val) + } + } +} diff --git a/receiver/receivercreator/consumer_test.go b/receiver/receivercreator/consumer_test.go new file mode 100644 index 000000000000..98423de18f6b --- /dev/null +++ b/receiver/receivercreator/consumer_test.go @@ -0,0 +1,351 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" +) + +func TestNewEnhancingConsumer(t *testing.T) { + podEnv, err := podEndpoint.Env() + require.NoError(t, err) + portEnv, err := portEndpoint.Env() + require.NoError(t, err) + cntrEnv, err := containerEndpoint.Env() + require.NoError(t, err) + + cfg := createDefaultConfig().(*Config) + type args struct { + resources resourceAttributes + resourceAttributes map[string]string + env observer.EndpointEnv + endpoint observer.Endpoint + nextLogs consumer.Logs + nextMetrics consumer.Metrics + nextTraces consumer.Traces + } + tests := []struct { + name string + args args + want *enhancingConsumer + expectedError string + }{ + { + name: "pod endpoint", + args: args{ + resources: cfg.ResourceAttributes, + env: podEnv, + endpoint: podEndpoint, + nextLogs: &consumertest.LogsSink{}, + nextMetrics: &consumertest.MetricsSink{}, + nextTraces: &consumertest.TracesSink{}, + }, + want: &enhancingConsumer{ + logs: &consumertest.LogsSink{}, + metrics: &consumertest.MetricsSink{}, + traces: &consumertest.TracesSink{}, + attrs: map[string]string{ + "k8s.pod.uid": "uid-1", + "k8s.pod.name": "pod-1", + "k8s.namespace.name": "default", + }, + }, + }, + { + name: "port endpoint", + args: args{ + resources: cfg.ResourceAttributes, + env: portEnv, + endpoint: portEndpoint, + nextLogs: nil, + nextMetrics: &consumertest.MetricsSink{}, + nextTraces: &consumertest.TracesSink{}, + }, + want: &enhancingConsumer{ + logs: nil, + metrics: &consumertest.MetricsSink{}, + traces: &consumertest.TracesSink{}, + attrs: map[string]string{ + "k8s.pod.uid": "uid-1", + "k8s.pod.name": "pod-1", + "k8s.namespace.name": "default", + }, + }, + }, + { + name: "container endpoint", + args: args{ + resources: cfg.ResourceAttributes, + env: cntrEnv, + endpoint: containerEndpoint, + nextLogs: &consumertest.LogsSink{}, + nextMetrics: nil, + nextTraces: &consumertest.TracesSink{}, + }, + want: &enhancingConsumer{ + logs: &consumertest.LogsSink{}, + metrics: nil, + traces: &consumertest.TracesSink{}, + attrs: map[string]string{ + "container.name": "otel-agent", + "container.image.name": "otelcol", + }, + }, + }, + { + // If the configured attribute value is empty it should not touch that + // attribute. + name: "attribute value empty", + args: args{ + resources: func() resourceAttributes { + res := createDefaultConfig().(*Config).ResourceAttributes + res[observer.PodType]["k8s.pod.name"] = "" + return res + }(), + env: podEnv, + endpoint: podEndpoint, + nextLogs: &consumertest.LogsSink{}, + nextMetrics: &consumertest.MetricsSink{}, + nextTraces: nil, + }, + want: &enhancingConsumer{ + logs: &consumertest.LogsSink{}, + metrics: &consumertest.MetricsSink{}, + traces: nil, + attrs: map[string]string{ + "k8s.pod.uid": "uid-1", + "k8s.namespace.name": "default", + }, + }, + }, + { + name: "both forms of resource attributes", + args: args{ + resources: func() resourceAttributes { + res := map[observer.EndpointType]map[string]string{observer.PodType: {}} + for k, v := range cfg.ResourceAttributes[observer.PodType] { + res[observer.PodType][k] = v + } + res[observer.PodType]["duplicate.resource.attribute"] = "pod.value" + res[observer.PodType]["delete.me"] = "pod.value" + return res + }(), + resourceAttributes: map[string]string{ + "expanded.resource.attribute": "`'labels' in pod ? pod.labels['region'] : labels['region']`", + "duplicate.resource.attribute": "receiver.value", + "delete.me": "", + }, + env: podEnv, + endpoint: podEndpoint, + nextLogs: nil, + nextMetrics: nil, + nextTraces: nil, + }, + want: &enhancingConsumer{ + logs: nil, + metrics: nil, + traces: nil, + attrs: map[string]string{ + "k8s.namespace.name": "default", + "k8s.pod.name": "pod-1", + "k8s.pod.uid": "uid-1", + "duplicate.resource.attribute": "receiver.value", + "expanded.resource.attribute": "west-1", + }, + }, + }, + { + name: "error", + args: args{ + resources: func() resourceAttributes { + res := createDefaultConfig().(*Config).ResourceAttributes + res[observer.PodType]["k8s.pod.name"] = "`unbalanced" + return res + }(), + env: podEnv, + endpoint: podEndpoint, + nextLogs: &consumertest.LogsSink{}, + nextMetrics: &consumertest.MetricsSink{}, + nextTraces: &consumertest.TracesSink{}, + }, + want: nil, + expectedError: `failed processing resource attribute "k8s.pod.name" for endpoint pod-1: expression was unbalanced starting at character 1`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newEnhancingConsumer(tt.args.resources, tt.args.resourceAttributes, tt.args.env, tt.args.endpoint, tt.args.nextLogs, tt.args.nextMetrics, tt.args.nextTraces) + if tt.expectedError != "" { + assert.EqualError(t, err, tt.expectedError) + assert.Nil(t, got) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + } + }) + } +} + +func TestEnhancingConsumerConsumeFunctions(t *testing.T) { + logsFn := func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some.log") + return ld + } + expectedLogs := logsFn() + + metricsFn := func() pmetric.Metrics { + md := pmetric.NewMetrics() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("some.metric") + return md + } + expectedMetrics := metricsFn() + + traceFn := func() ptrace.Traces { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetName("some.span") + return td + } + expectedTraces := traceFn() + + for _, attrs := range []pcommon.Map{ + expectedLogs.ResourceLogs().At(0).Resource().Attributes(), + expectedMetrics.ResourceMetrics().At(0).Resource().Attributes(), + expectedTraces.ResourceSpans().At(0).Resource().Attributes(), + } { + attrs.PutStr("key1", "value1") + attrs.PutStr("key2", "value2") + } + + type consumers struct { + nextLogs *consumertest.LogsSink + nextMetrics *consumertest.MetricsSink + nextTraces *consumertest.TracesSink + } + type args struct { + ld plog.Logs + md pmetric.Metrics + td ptrace.Traces + } + type test struct { + name string + consumers consumers + args args + expectedLogs *plog.Logs + expectedMetrics *pmetric.Metrics + expectedTraces *ptrace.Traces + } + + var tests []test + + for _, _ls := range []*consumertest.LogsSink{{}, nil} { + for _, _ms := range []*consumertest.MetricsSink{{}, nil} { + for _, _ts := range []*consumertest.TracesSink{{}, nil} { + ls, ms, ts := _ls, _ms, _ts + tst := test{ + consumers: consumers{ + nextLogs: ls, + nextMetrics: ms, + nextTraces: ts, + }, + args: args{}, + } + if ls != nil { + tst.name += "logs " + tst.args.ld = logsFn() + tst.expectedLogs = &expectedLogs + } + if ms != nil { + tst.name += "metrics " + tst.args.md = metricsFn() + tst.expectedMetrics = &expectedMetrics + } + if ts != nil { + tst.name += "traces" + tst.args.td = traceFn() + tst.expectedTraces = &expectedTraces + } + tst.name = strings.TrimSpace(tst.name) + tests = append(tests, tst) + } + } + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ec := &enhancingConsumer{ + attrs: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + } + if tt.consumers.nextLogs != nil { + ec.logs = tt.consumers.nextLogs + tt.consumers.nextLogs.Reset() + } + if tt.consumers.nextMetrics != nil { + ec.metrics = tt.consumers.nextMetrics + tt.consumers.nextMetrics.Reset() + } + if tt.consumers.nextTraces != nil { + ec.traces = tt.consumers.nextTraces + tt.consumers.nextTraces.Reset() + } + + if tt.expectedLogs != nil { + require.NoError(t, ec.ConsumeLogs(context.Background(), tt.args.ld)) + logs := tt.consumers.nextLogs.AllLogs() + require.Len(t, logs, 1) + require.NoError(t, plogtest.CompareLogs(*tt.expectedLogs, logs[0])) + } else { + require.EqualError(t, ec.ConsumeLogs(context.Background(), plog.NewLogs()), "no log consumer available") + } + + if tt.expectedMetrics != nil { + require.NoError(t, ec.ConsumeMetrics(context.Background(), tt.args.md)) + metrics := tt.consumers.nextMetrics.AllMetrics() + require.Len(t, metrics, 1) + require.NoError(t, pmetrictest.CompareMetrics(*tt.expectedMetrics, metrics[0])) + } else { + require.EqualError(t, ec.ConsumeMetrics(context.Background(), pmetric.NewMetrics()), "no metric consumer available") + } + + if tt.expectedTraces != nil { + require.NoError(t, ec.ConsumeTraces(context.Background(), tt.args.td)) + traces := tt.consumers.nextTraces.AllTraces() + require.Len(t, traces, 1) + require.NoError(t, ptracetest.CompareTraces(*tt.expectedTraces, traces[0])) + } else { + require.EqualError(t, ec.ConsumeTraces(context.Background(), ptrace.NewTraces()), "no trace consumer available") + } + }) + } +} diff --git a/receiver/receivercreator/factory.go b/receiver/receivercreator/factory.go index 7c2f8dcdfee9..6a7e624f34d2 100644 --- a/receiver/receivercreator/factory.go +++ b/receiver/receivercreator/factory.go @@ -23,17 +23,23 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator/internal/metadata" ) // This file implements factory for receiver_creator. A receiver_creator can create other receivers at runtime. +var receivers = sharedcomponent.NewSharedComponents() + // NewFactory creates a factory for receiver creator. func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) + receiver.WithLogs(createLogsReceiver, metadata.LogsStability), + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + receiver.WithTraces(createTracesReceiver, metadata.TracesStability), + ) } func createDefaultConfig() component.Config { @@ -62,11 +68,71 @@ func createDefaultConfig() component.Config { } } +func createLogsReceiver( + _ context.Context, + params receiver.CreateSettings, + cfg component.Config, + consumer consumer.Logs, +) (receiver.Logs, error) { + var err error + var recv receiver.Logs + rCfg := cfg.(*Config) + r := receivers.GetOrAdd(cfg, func() component.Component { + recv, err = newLogsReceiverCreator(params, rCfg, consumer) + return recv + }) + rcvr := r.Component.(*receiverCreator) + if rcvr.nextLogsConsumer == nil { + rcvr.nextLogsConsumer = consumer + } + if err != nil { + return nil, err + } + return r, nil +} + func createMetricsReceiver( - ctx context.Context, + _ context.Context, params receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics, ) (receiver.Metrics, error) { - return newReceiverCreator(params, cfg.(*Config), consumer) + var err error + var recv receiver.Logs + rCfg := cfg.(*Config) + r := receivers.GetOrAdd(cfg, func() component.Component { + recv, err = newMetricsReceiverCreator(params, rCfg, consumer) + return recv + }) + rcvr := r.Component.(*receiverCreator) + if rcvr.nextMetricsConsumer == nil { + rcvr.nextMetricsConsumer = consumer + } + if err != nil { + return nil, err + } + return r, nil +} + +func createTracesReceiver( + _ context.Context, + params receiver.CreateSettings, + cfg component.Config, + consumer consumer.Traces, +) (receiver.Traces, error) { + var err error + var recv receiver.Logs + rCfg := cfg.(*Config) + r := receivers.GetOrAdd(cfg, func() component.Component { + recv, err = newTracesReceiverCreator(params, rCfg, consumer) + return recv + }) + rcvr := r.Component.(*receiverCreator) + if rcvr.nextTracesConsumer == nil { + rcvr.nextTracesConsumer = consumer + } + if err != nil { + return nil, err + } + return r, nil } diff --git a/receiver/receivercreator/factory_test.go b/receiver/receivercreator/factory_test.go index 67263d2e09b4..59265b975d31 100644 --- a/receiver/receivercreator/factory_test.go +++ b/receiver/receivercreator/factory_test.go @@ -19,9 +19,11 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" ) func TestCreateReceiver(t *testing.T) { @@ -29,21 +31,42 @@ func TestCreateReceiver(t *testing.T) { cfg := createDefaultConfig() params := receivertest.NewNopCreateSettings() - tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, consumertest.NewNop()) + + lConsumer := consumertest.NewNop() + lReceiver, err := factory.CreateLogsReceiver(context.Background(), params, cfg, lConsumer) assert.NoError(t, err, "receiver creation failed") - assert.NotNil(t, tReceiver, "receiver creation failed") + assert.NotNil(t, lReceiver, "receiver creation failed") + + shared, ok := lReceiver.(*sharedcomponent.SharedComponent) + require.True(t, ok) + lrc := shared.Component.(*receiverCreator) + require.Same(t, lConsumer, lrc.nextLogsConsumer) + require.Nil(t, lrc.nextMetricsConsumer) + require.Nil(t, lrc.nextTracesConsumer) - tReceiver, err = factory.CreateMetricsReceiver(context.Background(), params, cfg, consumertest.NewNop()) + mConsumer := consumertest.NewNop() + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, mConsumer) assert.NoError(t, err, "receiver creation failed") - assert.NotNil(t, tReceiver, "receiver creation failed") + assert.NotNil(t, mReceiver, "receiver creation failed") + + shared, ok = mReceiver.(*sharedcomponent.SharedComponent) + require.True(t, ok) + mrc := shared.Component.(*receiverCreator) + require.Same(t, lrc, mrc) + require.Same(t, lConsumer, mrc.nextLogsConsumer) + require.Same(t, mConsumer, mrc.nextMetricsConsumer) + require.Nil(t, lrc.nextTracesConsumer) - mReceiver, err := factory.CreateTracesReceiver(context.Background(), params, cfg, nil) - assert.Error(t, err) - assert.ErrorIs(t, err, component.ErrDataTypeIsNotSupported) - assert.Nil(t, mReceiver) + tConsumer := consumertest.NewNop() + tReceiver, err := factory.CreateTracesReceiver(context.Background(), params, cfg, tConsumer) + assert.NoError(t, err, "receiver creation failed") + assert.NotNil(t, tReceiver, "receiver creation failed") - lReceiver, err := factory.CreateLogsReceiver(context.Background(), params, cfg, nil) - assert.Error(t, err) - assert.ErrorIs(t, err, component.ErrDataTypeIsNotSupported) - assert.Nil(t, lReceiver) + shared, ok = tReceiver.(*sharedcomponent.SharedComponent) + require.True(t, ok) + trc := shared.Component.(*receiverCreator) + require.Same(t, mrc, trc) + require.Same(t, lConsumer, mrc.nextLogsConsumer) + require.Same(t, mConsumer, mrc.nextMetricsConsumer) + require.Same(t, tConsumer, mrc.nextTracesConsumer) } diff --git a/receiver/receivercreator/go.mod b/receiver/receivercreator/go.mod index 35b33b354f44..ae2f6466d45a 100644 --- a/receiver/receivercreator/go.mod +++ b/receiver/receivercreator/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/antonmedv/expr v1.12.5 github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.77.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.77.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.77.0 github.com/spf13/cast v1.5.1 github.com/stretchr/testify v1.8.2 @@ -89,6 +90,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + retract ( v0.76.2 v0.76.1 diff --git a/receiver/receivercreator/internal/metadata/generated_status.go b/receiver/receivercreator/internal/metadata/generated_status.go index 2ca09f3ae505..2d1d94d45487 100644 --- a/receiver/receivercreator/internal/metadata/generated_status.go +++ b/receiver/receivercreator/internal/metadata/generated_status.go @@ -8,5 +8,7 @@ import ( const ( Type = "receiver_creator" + LogsStability = component.StabilityLevelAlpha + TracesStability = component.StabilityLevelAlpha MetricsStability = component.StabilityLevelBeta ) diff --git a/receiver/receivercreator/metadata.yaml b/receiver/receivercreator/metadata.yaml index 9548e0d9fc33..d54819c2fe95 100644 --- a/receiver/receivercreator/metadata.yaml +++ b/receiver/receivercreator/metadata.yaml @@ -4,5 +4,5 @@ status: class: receiver stability: beta: [metrics] + alpha: [logs, traces] distributions: [contrib, splunk] - diff --git a/receiver/receivercreator/observerhandler.go b/receiver/receivercreator/observerhandler.go index 32ad956f5de2..77a19269739e 100644 --- a/receiver/receivercreator/observerhandler.go +++ b/receiver/receivercreator/observerhandler.go @@ -18,6 +18,7 @@ import ( "fmt" "sync" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.uber.org/multierr" @@ -44,8 +45,12 @@ type observerHandler struct { params receiver.CreateSettings // receiversByEndpointID is a map of endpoint IDs to a receiver instance. receiversByEndpointID receiverMap - // nextConsumer is the receiver_creator's own consumer - nextConsumer consumer.Metrics + // nextLogsConsumer is the receiver_creator's own consumer + nextLogsConsumer consumer.Logs + // nextMetricsConsumer is the receiver_creator's own consumer + nextMetricsConsumer consumer.Metrics + // nextTracesConsumer is the receiver_creator's own consumer + nextTracesConsumer consumer.Traces // runner starts and stops receiver instances. runner runner } @@ -82,8 +87,9 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { defer obs.Unlock() for _, e := range added { - env, err := e.Env() - if err != nil { + var env observer.EndpointEnv + var err error + if env, err = e.Env(); err != nil { obs.params.TelemetrySettings.Logger.Error("unable to convert endpoint to environment map", zap.String("endpoint", string(e.ID)), zap.Error(err)) continue } @@ -91,8 +97,8 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { obs.params.TelemetrySettings.Logger.Debug("handling added endpoint", zap.Any("env", env)) for _, template := range obs.config.receiverTemplates { - if matches, err := template.rule.eval(env); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(err)) + if matches, e := template.rule.eval(env); e != nil { + obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(e)) continue } else if !matches { continue @@ -137,35 +143,35 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources // as telemetry is emitted. - resourceEnhancer, err := newResourceEnhancer( + var consumer *enhancingConsumer + if consumer, err = newEnhancingConsumer( obs.config.ResourceAttributes, resAttrs, env, e, - obs.nextConsumer, - ) - - if err != nil { + obs.nextLogsConsumer, + obs.nextMetricsConsumer, + obs.nextTracesConsumer, + ); err != nil { obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) continue } - rcvr, err := obs.runner.start( + var receiver component.Component + if receiver, err = obs.runner.start( receiverConfig{ id: template.id, config: resolvedConfig, endpointID: e.ID, }, discoveredConfig, - resourceEnhancer, - ) - - if err != nil { + consumer, + ); err != nil { obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) continue } - obs.receiversByEndpointID.Put(e.ID, rcvr) + obs.receiversByEndpointID.Put(e.ID, receiver) } } } diff --git a/receiver/receivercreator/observerhandler_test.go b/receiver/receivercreator/observerhandler_test.go index 852bcef1cea9..2c03a0e24261 100644 --- a/receiver/receivercreator/observerhandler_test.go +++ b/receiver/receivercreator/observerhandler_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/otelcol" "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.opentelemetry.io/collector/receiver/receivertest" @@ -28,7 +29,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" ) -func TestOnAdd(t *testing.T) { +func TestOnAddForMetrics(t *testing.T) { for _, test := range []struct { name string receiverTemplateID component.ID @@ -90,7 +91,7 @@ func TestOnAdd(t *testing.T) { }, } - handler, mr := newObserverHandler(t, cfg) + handler, mr := newObserverHandler(t, cfg, nil, consumertest.NewNop(), nil) handler.OnAdd([]observer.Endpoint{ portEndpoint, unsupportedEndpoint, @@ -108,8 +109,218 @@ func TestOnAdd(t *testing.T) { require.NoError(t, mr.lastError) require.NotNil(t, mr.startedComponent) + wr, ok := mr.startedComponent.(*wrappedReceiver) + require.True(t, ok) + + require.Nil(t, wr.logs) + require.Nil(t, wr.traces) + + var actualConfig component.Config + switch v := wr.metrics.(type) { + case *nopWithEndpointReceiver: + require.NotNil(t, v) + actualConfig = v.cfg + case *nopWithoutEndpointReceiver: + require.NotNil(t, v) + actualConfig = v.cfg + default: + t.Fatalf("unexpected startedComponent: %T", v) + } + require.Equal(t, test.expectedReceiverConfig, actualConfig) + }) + } +} + +func TestOnAddForLogs(t *testing.T) { + for _, test := range []struct { + name string + receiverTemplateID component.ID + receiverTemplateConfig userConfigMap + expectedReceiverType component.Component + expectedReceiverConfig component.Config + expectedError string + }{ + { + name: "dynamically set with supported endpoint", + receiverTemplateID: component.NewIDWithName("with.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"int_field": 12345678}, + expectedReceiverType: &nopWithEndpointReceiver{}, + expectedReceiverConfig: &nopWithEndpointConfig{ + IntField: 12345678, + Endpoint: "localhost:1234", + }, + }, + { + name: "inherits supported endpoint", + receiverTemplateID: component.NewIDWithName("with.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"endpoint": "some.endpoint"}, + expectedReceiverType: &nopWithEndpointReceiver{}, + expectedReceiverConfig: &nopWithEndpointConfig{ + IntField: 1234, + Endpoint: "some.endpoint", + }, + }, + { + name: "not dynamically set with unsupported endpoint", + receiverTemplateID: component.NewIDWithName("without.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"int_field": 23456789, "not_endpoint": "not.an.endpoint"}, + expectedReceiverType: &nopWithoutEndpointReceiver{}, + expectedReceiverConfig: &nopWithoutEndpointConfig{ + IntField: 23456789, + NotEndpoint: "not.an.endpoint", + }, + }, + { + name: "inherits unsupported endpoint", + receiverTemplateID: component.NewIDWithName("without.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"endpoint": "unsupported.endpoint"}, + expectedError: "failed to load \"without.endpoint/some.name\" template config: 1 error(s) decoding:\n\n* '' has invalid keys: endpoint", + }, + } { + t.Run(test.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + rcvrCfg := receiverConfig{ + id: test.receiverTemplateID, + config: test.receiverTemplateConfig, + endpointID: portEndpoint.ID, + } + cfg.receiverTemplates = map[string]receiverTemplate{ + rcvrCfg.id.String(): { + receiverConfig: rcvrCfg, + rule: portRule, + Rule: `type == "port"`, + ResourceAttributes: map[string]interface{}{}, + }, + } + + handler, mr := newObserverHandler(t, cfg, consumertest.NewNop(), nil, nil) + handler.OnAdd([]observer.Endpoint{ + portEndpoint, + unsupportedEndpoint, + }) + + if test.expectedError != "" { + assert.Equal(t, 0, handler.receiversByEndpointID.Size()) + require.Error(t, mr.lastError) + require.EqualError(t, mr.lastError, test.expectedError) + require.Nil(t, mr.startedComponent) + return + } + + assert.Equal(t, 1, handler.receiversByEndpointID.Size()) + require.NoError(t, mr.lastError) + require.NotNil(t, mr.startedComponent) + + wr, ok := mr.startedComponent.(*wrappedReceiver) + require.True(t, ok) + + require.Nil(t, wr.metrics) + require.Nil(t, wr.traces) + + var actualConfig component.Config + switch v := wr.logs.(type) { + case *nopWithEndpointReceiver: + require.NotNil(t, v) + actualConfig = v.cfg + case *nopWithoutEndpointReceiver: + require.NotNil(t, v) + actualConfig = v.cfg + default: + t.Fatalf("unexpected startedComponent: %T", v) + } + require.Equal(t, test.expectedReceiverConfig, actualConfig) + }) + } +} + +func TestOnAddForTraces(t *testing.T) { + for _, test := range []struct { + name string + receiverTemplateID component.ID + receiverTemplateConfig userConfigMap + expectedReceiverType component.Component + expectedReceiverConfig component.Config + expectedError string + }{ + { + name: "dynamically set with supported endpoint", + receiverTemplateID: component.NewIDWithName("with.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"int_field": 12345678}, + expectedReceiverType: &nopWithEndpointReceiver{}, + expectedReceiverConfig: &nopWithEndpointConfig{ + IntField: 12345678, + Endpoint: "localhost:1234", + }, + }, + { + name: "inherits supported endpoint", + receiverTemplateID: component.NewIDWithName("with.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"endpoint": "some.endpoint"}, + expectedReceiverType: &nopWithEndpointReceiver{}, + expectedReceiverConfig: &nopWithEndpointConfig{ + IntField: 1234, + Endpoint: "some.endpoint", + }, + }, + { + name: "not dynamically set with unsupported endpoint", + receiverTemplateID: component.NewIDWithName("without.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"int_field": 23456789, "not_endpoint": "not.an.endpoint"}, + expectedReceiverType: &nopWithoutEndpointReceiver{}, + expectedReceiverConfig: &nopWithoutEndpointConfig{ + IntField: 23456789, + NotEndpoint: "not.an.endpoint", + }, + }, + { + name: "inherits unsupported endpoint", + receiverTemplateID: component.NewIDWithName("without.endpoint", "some.name"), + receiverTemplateConfig: userConfigMap{"endpoint": "unsupported.endpoint"}, + expectedError: "failed to load \"without.endpoint/some.name\" template config: 1 error(s) decoding:\n\n* '' has invalid keys: endpoint", + }, + } { + t.Run(test.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + rcvrCfg := receiverConfig{ + id: test.receiverTemplateID, + config: test.receiverTemplateConfig, + endpointID: portEndpoint.ID, + } + cfg.receiverTemplates = map[string]receiverTemplate{ + rcvrCfg.id.String(): { + receiverConfig: rcvrCfg, + rule: portRule, + Rule: `type == "port"`, + ResourceAttributes: map[string]interface{}{}, + }, + } + + handler, mr := newObserverHandler(t, cfg, nil, nil, consumertest.NewNop()) + handler.OnAdd([]observer.Endpoint{ + portEndpoint, + unsupportedEndpoint, + }) + + if test.expectedError != "" { + assert.Equal(t, 0, handler.receiversByEndpointID.Size()) + require.Error(t, mr.lastError) + require.EqualError(t, mr.lastError, test.expectedError) + require.Nil(t, mr.startedComponent) + return + } + + assert.Equal(t, 1, handler.receiversByEndpointID.Size()) + require.NoError(t, mr.lastError) + require.NotNil(t, mr.startedComponent) + + wr, ok := mr.startedComponent.(*wrappedReceiver) + require.True(t, ok) + + require.Nil(t, wr.logs) + require.Nil(t, wr.metrics) + var actualConfig component.Config - switch v := mr.startedComponent.(type) { + switch v := wr.traces.(type) { case *nopWithEndpointReceiver: require.NotNil(t, v) actualConfig = v.cfg @@ -120,11 +331,41 @@ func TestOnAdd(t *testing.T) { t.Fatalf("unexpected startedComponent: %T", v) } require.Equal(t, test.expectedReceiverConfig, actualConfig) + }) } } -func TestOnRemove(t *testing.T) { +func TestOnRemoveForMetrics(t *testing.T) { + cfg := createDefaultConfig().(*Config) + rcvrCfg := receiverConfig{ + id: component.NewIDWithName("with.endpoint", "some.name"), + config: userConfigMap{"endpoint": "some.endpoint"}, + endpointID: portEndpoint.ID, + } + cfg.receiverTemplates = map[string]receiverTemplate{ + rcvrCfg.id.String(): { + receiverConfig: rcvrCfg, + rule: portRule, + Rule: `type == "port"`, + ResourceAttributes: map[string]interface{}{}, + }, + } + handler, r := newObserverHandler(t, cfg, nil, consumertest.NewNop(), nil) + handler.OnAdd([]observer.Endpoint{portEndpoint}) + + rcvr := r.startedComponent + require.NotNil(t, rcvr) + require.NoError(t, r.lastError) + + handler.OnRemove([]observer.Endpoint{portEndpoint}) + + assert.Equal(t, 0, handler.receiversByEndpointID.Size()) + require.Same(t, rcvr, r.shutdownComponent) + require.NoError(t, r.lastError) +} + +func TestOnRemoveForLogs(t *testing.T) { cfg := createDefaultConfig().(*Config) rcvrCfg := receiverConfig{ id: component.NewIDWithName("with.endpoint", "some.name"), @@ -139,7 +380,7 @@ func TestOnRemove(t *testing.T) { ResourceAttributes: map[string]interface{}{}, }, } - handler, r := newObserverHandler(t, cfg) + handler, r := newObserverHandler(t, cfg, consumertest.NewNop(), nil, nil) handler.OnAdd([]observer.Endpoint{portEndpoint}) rcvr := r.startedComponent @@ -168,7 +409,7 @@ func TestOnChange(t *testing.T) { ResourceAttributes: map[string]interface{}{}, }, } - handler, r := newObserverHandler(t, cfg) + handler, r := newObserverHandler(t, cfg, nil, consumertest.NewNop(), nil) handler.OnAdd([]observer.Endpoint{portEndpoint}) origRcvr := r.startedComponent @@ -198,9 +439,9 @@ type mockRunner struct { func (r *mockRunner) start( receiver receiverConfig, discoveredConfig userConfigMap, - nextConsumer consumer.Metrics, + consumer *enhancingConsumer, ) (component.Component, error) { - r.startedComponent, r.lastError = r.receiverRunner.start(receiver, discoveredConfig, nextConsumer) + r.startedComponent, r.lastError = r.receiverRunner.start(receiver, discoveredConfig, consumer) return r.startedComponent, r.lastError } @@ -254,7 +495,12 @@ func newMockRunner(t *testing.T) *mockRunner { } } -func newObserverHandler(t *testing.T, config *Config) (*observerHandler, *mockRunner) { +func newObserverHandler( + t *testing.T, config *Config, + nextLogs consumer.Logs, + nextMetrics consumer.Metrics, + nextTraces consumer.Traces, +) (*observerHandler, *mockRunner) { set := receivertest.NewNopCreateSettings() set.ID = component.NewIDWithName("some.type", "some.name") mr := newMockRunner(t) @@ -263,5 +509,8 @@ func newObserverHandler(t *testing.T, config *Config) (*observerHandler, *mockRu config: config, receiversByEndpointID: receiverMap{}, runner: mr, + nextLogsConsumer: nextLogs, + nextMetricsConsumer: nextMetrics, + nextTracesConsumer: nextTraces, }, mr } diff --git a/receiver/receivercreator/receiver.go b/receiver/receivercreator/receiver.go index b64d0c8e837c..c0327c9cf822 100644 --- a/receiver/receivercreator/receiver.go +++ b/receiver/receivercreator/receiver.go @@ -30,23 +30,53 @@ var _ receiver.Metrics = (*receiverCreator)(nil) // receiverCreator implements consumer.Metrics. type receiverCreator struct { - params receiver.CreateSettings - cfg *Config - nextConsumer consumer.Metrics - observerHandler *observerHandler - observables []observer.Observable + params receiver.CreateSettings + cfg *Config + nextLogsConsumer consumer.Logs + nextMetricsConsumer consumer.Metrics + nextTracesConsumer consumer.Traces + observerHandler *observerHandler + observables []observer.Observable } -// newReceiverCreator creates the receiver_creator with the given parameters. -func newReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { +// newLogsReceiverCreator creates the receiver_creator with the given parameters. +func newLogsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Logs) (receiver.Logs, error) { if nextConsumer == nil { return nil, component.ErrNilNextConsumer } r := &receiverCreator{ - params: params, - cfg: cfg, - nextConsumer: nextConsumer, + params: params, + cfg: cfg, + nextLogsConsumer: nextConsumer, + } + return r, nil +} + +// newMetricsReceiverCreator creates the receiver_creator with the given parameters. +func newMetricsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { + if nextConsumer == nil { + return nil, component.ErrNilNextConsumer + } + + r := &receiverCreator{ + params: params, + cfg: cfg, + nextMetricsConsumer: nextConsumer, + } + return r, nil +} + +// newTracesReceiverCreator creates the receiver_creator with the given parameters. +func newTracesReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Traces) (receiver.Traces, error) { + if nextConsumer == nil { + return nil, component.ErrNilNextConsumer + } + + r := &receiverCreator{ + params: params, + cfg: cfg, + nextTracesConsumer: nextConsumer, } return r, nil } @@ -70,12 +100,10 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error { config: rc.cfg, params: rc.params, receiversByEndpointID: receiverMap{}, - nextConsumer: rc.nextConsumer, - runner: &receiverRunner{ - params: rc.params, - idNamespace: rc.params.ID, - host: &loggingHost{host, rc.params.Logger}, - }, + nextLogsConsumer: rc.nextLogsConsumer, + nextMetricsConsumer: rc.nextMetricsConsumer, + nextTracesConsumer: rc.nextTracesConsumer, + runner: newReceiverRunner(rc.params, &loggingHost{host, rc.params.Logger}), } observers := map[component.ID]observer.Observable{} diff --git a/receiver/receivercreator/receiver_test.go b/receiver/receivercreator/receiver_test.go index bf9d356131f8..d465aeee72d9 100644 --- a/receiver/receivercreator/receiver_test.go +++ b/receiver/receivercreator/receiver_test.go @@ -37,6 +37,7 @@ import ( zapObserver "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator/internal/metadata" ) @@ -93,7 +94,8 @@ func TestMockedEndToEnd(t *testing.T) { rcvr, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, mockConsumer) require.NoError(t, err) - dyn := rcvr.(*receiverCreator) + sc := rcvr.(*sharedcomponent.SharedComponent) + dyn := sc.Component.(*receiverCreator) require.NoError(t, rcvr.Start(context.Background(), host)) var shutdownOnce sync.Once @@ -111,7 +113,8 @@ func TestMockedEndToEnd(t *testing.T) { // Test that we can send metrics. for _, receiver := range dyn.observerHandler.receiversByEndpointID.Values() { - example := receiver.(*nopWithEndpointReceiver) + wr := receiver.(*wrappedReceiver) + example := wr.metrics.(*nopWithEndpointReceiver) md := pmetric.NewMetrics() rm := md.ResourceMetrics().AppendEmpty() rm.Resource().Attributes().PutStr("attr", "1") diff --git a/receiver/receivercreator/resourceenhancer.go b/receiver/receivercreator/resourceenhancer.go deleted file mode 100644 index ced55d111811..000000000000 --- a/receiver/receivercreator/resourceenhancer.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator" - -import ( - "context" - "fmt" - - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pmetric" - - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" -) - -var _ consumer.Metrics = (*resourceEnhancer)(nil) - -// resourceEnhancer adds additional resource attribute entries -// from the given endpoint environment. The added attributes vary based on the type -// of the endpoint. -type resourceEnhancer struct { - nextConsumer consumer.Metrics - attrs map[string]string -} - -func newResourceEnhancer( - resources resourceAttributes, - receiverAttributes map[string]string, - env observer.EndpointEnv, - endpoint observer.Endpoint, - nextConsumer consumer.Metrics, -) (*resourceEnhancer, error) { - attrs := map[string]string{} - - for _, resource := range []map[string]string{resources[endpoint.Details.Type()], receiverAttributes} { - // Precompute values that will be inserted for each resource object passed through. - for attr, expr := range resource { - // If the attribute value is empty this signals to delete existing - if expr == "" { - delete(attrs, attr) - continue - } - - res, err := evalBackticksInConfigValue(expr, env) - if err != nil { - return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err) - } - - val := fmt.Sprint(res) - if val != "" { - attrs[attr] = val - } - } - } - - return &resourceEnhancer{ - nextConsumer: nextConsumer, - attrs: attrs, - }, nil -} - -func (r *resourceEnhancer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: true} -} - -func (r *resourceEnhancer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - rm := md.ResourceMetrics() - for i := 0; i < rm.Len(); i++ { - rms := rm.At(i) - attrs := rms.Resource().Attributes() - - for attr, val := range r.attrs { - if _, found := attrs.Get(attr); !found { - attrs.PutStr(attr, val) - } - } - } - - return r.nextConsumer.ConsumeMetrics(ctx, md) -} diff --git a/receiver/receivercreator/resourceenhancer_test.go b/receiver/receivercreator/resourceenhancer_test.go deleted file mode 100644 index 82d94b2218a9..000000000000 --- a/receiver/receivercreator/resourceenhancer_test.go +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package receivercreator - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/pmetric" - - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" -) - -func Test_newResourceEnhancer(t *testing.T) { - podEnv, err := podEndpoint.Env() - if err != nil { - t.Fatal(err) - } - portEnv, err := portEndpoint.Env() - if err != nil { - t.Fatal(err) - } - - cntrEnv, err := containerEndpoint.Env() - if err != nil { - t.Fatal(err) - } - - cfg := createDefaultConfig().(*Config) - type args struct { - resources resourceAttributes - resourceAttributes map[string]string - env observer.EndpointEnv - endpoint observer.Endpoint - nextConsumer consumer.Metrics - } - tests := []struct { - name string - args args - want *resourceEnhancer - wantErr bool - }{ - { - name: "pod endpoint", - args: args{ - resources: cfg.ResourceAttributes, - env: podEnv, - endpoint: podEndpoint, - nextConsumer: &consumertest.MetricsSink{}, - }, - want: &resourceEnhancer{ - nextConsumer: &consumertest.MetricsSink{}, - attrs: map[string]string{ - "k8s.pod.uid": "uid-1", - "k8s.pod.name": "pod-1", - "k8s.namespace.name": "default", - }, - }, - wantErr: false, - }, - { - name: "port endpoint", - args: args{ - resources: cfg.ResourceAttributes, - env: portEnv, - endpoint: portEndpoint, - nextConsumer: &consumertest.MetricsSink{}, - }, - want: &resourceEnhancer{ - nextConsumer: &consumertest.MetricsSink{}, - attrs: map[string]string{ - "k8s.pod.uid": "uid-1", - "k8s.pod.name": "pod-1", - "k8s.namespace.name": "default", - }, - }, - wantErr: false, - }, - { - name: "container endpoint", - args: args{ - resources: cfg.ResourceAttributes, - env: cntrEnv, - endpoint: containerEndpoint, - nextConsumer: &consumertest.MetricsSink{}, - }, - want: &resourceEnhancer{ - nextConsumer: &consumertest.MetricsSink{}, - attrs: map[string]string{ - "container.name": "otel-agent", - "container.image.name": "otelcol", - }, - }, - wantErr: false, - }, - { - // If the configured attribute value is empty it should not touch that - // attribute. - name: "attribute value empty", - args: args{ - resources: func() resourceAttributes { - res := createDefaultConfig().(*Config).ResourceAttributes - res[observer.PodType]["k8s.pod.name"] = "" - return res - }(), - env: podEnv, - endpoint: podEndpoint, - nextConsumer: nil, - }, - want: &resourceEnhancer{ - nextConsumer: nil, - attrs: map[string]string{ - "k8s.pod.uid": "uid-1", - "k8s.namespace.name": "default", - }, - }, - wantErr: false, - }, - { - name: "both forms of resource attributes", - args: args{ - resources: func() resourceAttributes { - res := map[observer.EndpointType]map[string]string{observer.PodType: {}} - for k, v := range cfg.ResourceAttributes[observer.PodType] { - res[observer.PodType][k] = v - } - res[observer.PodType]["duplicate.resource.attribute"] = "pod.value" - res[observer.PodType]["delete.me"] = "pod.value" - return res - }(), - resourceAttributes: map[string]string{ - "expanded.resource.attribute": "`'labels' in pod ? pod.labels['region'] : labels['region']`", - "duplicate.resource.attribute": "receiver.value", - "delete.me": "", - }, - env: podEnv, - endpoint: podEndpoint, - nextConsumer: nil, - }, - want: &resourceEnhancer{ - nextConsumer: nil, - attrs: map[string]string{ - "k8s.namespace.name": "default", - "k8s.pod.name": "pod-1", - "k8s.pod.uid": "uid-1", - "duplicate.resource.attribute": "receiver.value", - "expanded.resource.attribute": "west-1", - }, - }, - wantErr: false, - }, - { - name: "error", - args: args{ - resources: func() resourceAttributes { - res := createDefaultConfig().(*Config).ResourceAttributes - res[observer.PodType]["k8s.pod.name"] = "`unbalanced" - return res - }(), - env: podEnv, - endpoint: podEndpoint, - nextConsumer: nil, - }, - want: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := newResourceEnhancer(tt.args.resources, tt.args.resourceAttributes, tt.args.env, tt.args.endpoint, tt.args.nextConsumer) - if tt.wantErr { - assert.Error(t, err) - return - } - require.NoError(t, err) - assert.Equal(t, tt.want, got) - }) - } -} - -func Test_resourceEnhancer_ConsumeMetrics(t *testing.T) { - type fields struct { - nextConsumer *consumertest.MetricsSink - attrs map[string]string - } - type args struct { - ctx context.Context - md pmetric.Metrics - } - tests := []struct { - name string - fields fields - args args - wantErr bool - want pmetric.Metrics - }{ - { - name: "insert", - fields: fields{ - nextConsumer: &consumertest.MetricsSink{}, - attrs: map[string]string{ - "key1": "value1", - "key2": "value2", - }, - }, - args: args{ - ctx: context.Background(), - md: func() pmetric.Metrics { - md := pmetric.NewMetrics() - md.ResourceMetrics().AppendEmpty() - return md - }(), - }, - want: func() pmetric.Metrics { - md := pmetric.NewMetrics() - attr := md.ResourceMetrics().AppendEmpty().Resource().Attributes() - attr.PutStr("key1", "value1") - attr.PutStr("key2", "value2") - return md - }(), - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &resourceEnhancer{ - nextConsumer: tt.fields.nextConsumer, - attrs: tt.fields.attrs, - } - if err := r.ConsumeMetrics(tt.args.ctx, tt.args.md); (err != nil) != tt.wantErr { - t.Errorf("ConsumeMetrics() error = %v, wantErr %v", err, tt.wantErr) - } - - metrics := tt.fields.nextConsumer.AllMetrics() - - require.Len(t, metrics, 1) - require.NoError(t, pmetrictest.CompareMetrics(tt.want, metrics[0])) - }) - } -} diff --git a/receiver/receivercreator/rules_test.go b/receiver/receivercreator/rules_test.go index e2fcbbb4aeb5..af134b3874af 100644 --- a/receiver/receivercreator/rules_test.go +++ b/receiver/receivercreator/rules_test.go @@ -74,7 +74,7 @@ func Test_newRule(t *testing.T) { wantErr bool }{ {"empty rule", args{""}, true}, - {"does not start with type", args{"port == 1234"}, true}, + {"does not startMetrics with type", args{"port == 1234"}, true}, {"invalid syntax", args{"port =="}, true}, {"valid port", args{`type == "port" && port_name == "http"`}, false}, {"valid pod", args{`type=="pod" && port_name == "http"`}, false}, diff --git a/receiver/receivercreator/runner.go b/receiver/receivercreator/runner.go index d8acb7c44fbf..e2502e135d1b 100644 --- a/receiver/receivercreator/runner.go +++ b/receiver/receivercreator/runner.go @@ -16,38 +16,54 @@ package receivercreator // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "errors" "fmt" + "sync" "github.com/spf13/cast" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" rcvr "go.opentelemetry.io/collector/receiver" + "go.uber.org/multierr" "go.uber.org/zap" ) // runner starts and stops receiver instances. type runner interface { - // start a receiver instance from its static config and discovered config. - start(receiver receiverConfig, discoveredConfig userConfigMap, nextConsumer consumer.Metrics) (component.Component, error) + // start a metrics receiver instance from its static config and discovered config. + start(receiver receiverConfig, discoveredConfig userConfigMap, consumer *enhancingConsumer) (component.Component, error) // shutdown a receiver. shutdown(rcvr component.Component) error } // receiverRunner handles starting/stopping of a concrete subreceiver instance. type receiverRunner struct { + logger *zap.Logger params rcvr.CreateSettings idNamespace component.ID host component.Host + receivers map[string]*wrappedReceiver + lock *sync.Mutex +} + +func newReceiverRunner(params rcvr.CreateSettings, host component.Host) *receiverRunner { + return &receiverRunner{ + logger: params.Logger, + params: params, + idNamespace: params.ID, + host: &loggingHost{host, params.Logger}, + receivers: map[string]*wrappedReceiver{}, + lock: &sync.Mutex{}, + } } var _ runner = (*receiverRunner)(nil) -// start a receiver instance from its static config and discovered config. func (run *receiverRunner) start( receiver receiverConfig, discoveredConfig userConfigMap, - nextConsumer consumer.Metrics, + consumer *enhancingConsumer, ) (component.Component, error) { factory := run.host.GetFactory(component.KindReceiver, receiver.id.Type()) @@ -65,16 +81,48 @@ func (run *receiverRunner) start( // Sets dynamically created receiver to something like receiver_creator/1/redis{endpoint="localhost:6380"}/. id := component.NewIDWithName(factory.Type(), fmt.Sprintf("%s/%s{endpoint=%q}/%s", receiver.id.Name(), run.idNamespace, targetEndpoint, receiver.endpointID)) - recvr, err := run.createRuntimeReceiver(receiverFactory, id, cfg, nextConsumer) - if err != nil { - return nil, err + wr := &wrappedReceiver{} + var createError error + if consumer.logs != nil { + if wr.logs, err = run.createLogsRuntimeReceiver(receiverFactory, id, cfg, consumer); err != nil { + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + run.logger.Info("instantiated receiver doesn't support logs", zap.String("receiver", receiver.id.String()), zap.Error(err)) + wr.logs = nil + } else { + createError = multierr.Combine(createError, err) + } + } + } + if consumer.metrics != nil { + if wr.metrics, err = run.createMetricsRuntimeReceiver(receiverFactory, id, cfg, consumer); err != nil { + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + run.logger.Info("instantiated receiver doesn't support metrics", zap.String("receiver", receiver.id.String()), zap.Error(err)) + wr.metrics = nil + } else { + createError = multierr.Combine(createError, err) + } + } + } + if consumer.traces != nil { + if wr.traces, err = run.createTracesRuntimeReceiver(receiverFactory, id, cfg, consumer); err != nil { + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + run.logger.Info("instantiated receiver doesn't support traces", zap.String("receiver", receiver.id.String()), zap.Error(err)) + wr.traces = nil + } else { + createError = multierr.Combine(createError, err) + } + } } - if err = recvr.Start(context.Background(), run.host); err != nil { - return nil, err + if createError != nil { + return nil, fmt.Errorf("failed creating endpoint-derived receiver: %w", createError) } - return recvr, nil + if err = wr.Start(context.Background(), run.host); err != nil { + return nil, fmt.Errorf("failed starting endpoint-derived receiver: %w", createError) + } + + return wr, nil } // shutdown the given receiver. @@ -133,8 +181,21 @@ func mergeTemplatedAndDiscoveredConfigs(factory rcvr.Factory, templated, discove return templatedConfig, targetEndpoint, nil } -// createRuntimeReceiver creates a receiver that is discovered at runtime. -func (run *receiverRunner) createRuntimeReceiver( +// createLogsRuntimeReceiver creates a receiver that is discovered at runtime. +func (run *receiverRunner) createLogsRuntimeReceiver( + factory rcvr.Factory, + id component.ID, + cfg component.Config, + nextConsumer consumer.Logs, +) (rcvr.Logs, error) { + runParams := run.params + runParams.Logger = runParams.Logger.With(zap.String("name", id.String())) + runParams.ID = id + return factory.CreateLogsReceiver(context.Background(), runParams, cfg, nextConsumer) +} + +// createMetricsRuntimeReceiver creates a receiver that is discovered at runtime. +func (run *receiverRunner) createMetricsRuntimeReceiver( factory rcvr.Factory, id component.ID, cfg component.Config, @@ -145,3 +206,48 @@ func (run *receiverRunner) createRuntimeReceiver( runParams.ID = id return factory.CreateMetricsReceiver(context.Background(), runParams, cfg, nextConsumer) } + +// createTracesRuntimeReceiver creates a receiver that is discovered at runtime. +func (run *receiverRunner) createTracesRuntimeReceiver( + factory rcvr.Factory, + id component.ID, + cfg component.Config, + nextConsumer consumer.Traces, +) (rcvr.Traces, error) { + runParams := run.params + runParams.Logger = runParams.Logger.With(zap.String("name", id.String())) + runParams.ID = id + return factory.CreateTracesReceiver(context.Background(), runParams, cfg, nextConsumer) +} + +var _ component.Component = (*wrappedReceiver)(nil) + +type wrappedReceiver struct { + logs rcvr.Logs + metrics rcvr.Metrics + traces rcvr.Traces +} + +func (w *wrappedReceiver) Start(ctx context.Context, host component.Host) error { + var err error + for _, r := range []component.Component{w.logs, w.metrics, w.traces} { + if r != nil { + if e := r.Start(ctx, host); e != nil { + err = multierr.Combine(err, e) + } + } + } + return err +} + +func (w *wrappedReceiver) Shutdown(ctx context.Context) error { + var err error + for _, r := range []component.Component{w.logs, w.metrics, w.traces} { + if r != nil { + if e := r.Shutdown(ctx); e != nil { + err = multierr.Combine(err, e) + } + } + } + return err +} diff --git a/receiver/receivercreator/runner_test.go b/receiver/receivercreator/runner_test.go index e897e8ed41d4..c945a97855e6 100644 --- a/receiver/receivercreator/runner_test.go +++ b/receiver/receivercreator/runner_test.go @@ -28,7 +28,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator/internal/metadata" ) -func Test_loadAndCreateRuntimeReceiver(t *testing.T) { +func Test_loadAndCreateMetricsRuntimeReceiver(t *testing.T) { logCore, logs := observer.New(zap.DebugLevel) logger := zap.New(logCore).With(zap.String("name", "receiver_creator")) rcs := receivertest.NewNopCreateSettings() @@ -52,7 +52,7 @@ func Test_loadAndCreateRuntimeReceiver(t *testing.T) { // Test that metric receiver can be created from loaded config and it logs its id for the "name" field. t.Run("test create receiver from loaded config", func(t *testing.T) { - recvr, err := run.createRuntimeReceiver( + recvr, err := run.createMetricsRuntimeReceiver( exampleFactory, component.NewIDWithName("nop", "1/receiver_creator/1{endpoint=\"localhost:12345\"}/endpoint.id"), loadedConfig,