Skip to content

Commit

Permalink
[receivercreator] add support for logs and traces (#19641)
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme authored May 17, 2023
1 parent 721ca5e commit 1b921c8
Show file tree
Hide file tree
Showing 19 changed files with 1,128 additions and 428 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-log-and-traces-support-to-receiver-creator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receivercreator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add logs and traces support to receivercreator

# One or more tracking issues related to the change
issues: [19205, 19206]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
13 changes: 9 additions & 4 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: metrics |
| Stability | [alpha]: logs, traces |
| | [beta]: metrics |
| Distributions | [contrib], [splunk] |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[splunk]: https://github.com/signalfx/splunk-otel-collector
Expand All @@ -19,6 +21,10 @@ endpoints that you may be interested in. The configured rules will be
evaluated for each endpoint discovered. If the rule evaluates to true then
the receiver for that rule will be started against the matched endpoint.

If you use the receiver creator in multiple pipelines of differing telemetry types,
but a given dynamically instantiated receiver doesn't support one of the pipeline's type,
it will effectively lead to a logged no-op that won't cause a collector service failure.

## Configuration

**watch_observers**
Expand Down Expand Up @@ -80,7 +86,7 @@ resource_attributes:
<attribute>: <attribute value>
```
This setting controls what resource attributes are set on metrics emitted from the created receiver. These attributes can be set from [values in the endpoint](#rule-expressions) that was matched by the `rule`. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in `config`.
This setting controls what resource attributes are set on telemetry emitted from the created receiver. These attributes can be set from [values in the endpoint](#rule-expressions) that was matched by the `rule`. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in `config`.

Note that the backticks below are not typos--they indicate the value is set dynamically.

Expand Down Expand Up @@ -134,7 +140,7 @@ Similar to the per-endpoint type `resource_attributes` described above but for i

## Rule Expressions

Each rule must start with `type == ("pod"|"port"|"hostport"|"container") &&` such that the rule matches
Each rule must start with `type == ("pod"|"port"|"hostport"|"container"|"k8s.node") &&` such that the rule matches
only one endpoint type. Depending on the type of endpoint the rule is
targeting it will have different variables available.

Expand Down Expand Up @@ -297,4 +303,3 @@ service:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
56 changes: 56 additions & 0 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, ""),
expected: createDefaultConfig(),
},
{
id: component.NewIDWithName("receiver_creator", ""),
expected: createDefaultConfig(),
},
{
id: component.NewIDWithName(metadata.Type, "1"),
expected: &Config{
Expand Down Expand Up @@ -176,7 +180,9 @@ type nopWithEndpointFactory struct {

type nopWithEndpointReceiver struct {
mockComponent
consumer.Logs
consumer.Metrics
consumer.Traces
rcvr.CreateSettings
cfg component.Config
}
Expand All @@ -192,6 +198,18 @@ type mockComponent struct {
component.ShutdownFunc
}

func (*nopWithEndpointFactory) CreateLogsReceiver(
_ context.Context,
rcs rcvr.CreateSettings,
cfg component.Config,
nextConsumer consumer.Logs) (rcvr.Logs, error) {
return &nopWithEndpointReceiver{
Logs: nextConsumer,
CreateSettings: rcs,
cfg: cfg,
}, nil
}

func (*nopWithEndpointFactory) CreateMetricsReceiver(
_ context.Context,
rcs rcvr.CreateSettings,
Expand All @@ -204,6 +222,18 @@ func (*nopWithEndpointFactory) CreateMetricsReceiver(
}, nil
}

func (*nopWithEndpointFactory) CreateTracesReceiver(
_ context.Context,
rcs rcvr.CreateSettings,
cfg component.Config,
nextConsumer consumer.Traces) (rcvr.Traces, error) {
return &nopWithEndpointReceiver{
Traces: nextConsumer,
CreateSettings: rcs,
cfg: cfg,
}, nil
}

type nopWithoutEndpointConfig struct {
NotEndpoint string `mapstructure:"not_endpoint"`
IntField int `mapstructure:"int_field"`
Expand All @@ -215,7 +245,9 @@ type nopWithoutEndpointFactory struct {

type nopWithoutEndpointReceiver struct {
mockComponent
consumer.Logs
consumer.Metrics
consumer.Traces
rcvr.CreateSettings
cfg component.Config
}
Expand All @@ -226,6 +258,18 @@ func (*nopWithoutEndpointFactory) CreateDefaultConfig() component.Config {
}
}

func (*nopWithoutEndpointFactory) CreateLogsReceiver(
_ context.Context,
rcs rcvr.CreateSettings,
cfg component.Config,
nextConsumer consumer.Logs) (rcvr.Logs, error) {
return &nopWithoutEndpointReceiver{
Logs: nextConsumer,
CreateSettings: rcs,
cfg: cfg,
}, nil
}

func (*nopWithoutEndpointFactory) CreateMetricsReceiver(
_ context.Context,
rcs rcvr.CreateSettings,
Expand All @@ -237,3 +281,15 @@ func (*nopWithoutEndpointFactory) CreateMetricsReceiver(
cfg: cfg,
}, nil
}

func (*nopWithoutEndpointFactory) CreateTracesReceiver(
_ context.Context,
rcs rcvr.CreateSettings,
cfg component.Config,
nextConsumer consumer.Traces) (rcvr.Traces, error) {
return &nopWithoutEndpointReceiver{
Traces: nextConsumer,
CreateSettings: rcs,
cfg: cfg,
}, nil
}
134 changes: 134 additions & 0 deletions receiver/receivercreator/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

var _ consumer.Logs = (*enhancingConsumer)(nil)
var _ consumer.Metrics = (*enhancingConsumer)(nil)
var _ consumer.Traces = (*enhancingConsumer)(nil)

// enhancingConsumer adds additional resource attributes from the given endpoint environment before passing the
// telemetry to its next consumers. The added attributes vary based on the type of the endpoint.
type enhancingConsumer struct {
logs consumer.Logs
metrics consumer.Metrics
traces consumer.Traces
attrs map[string]string
}

func newEnhancingConsumer(
resources resourceAttributes,
receiverAttributes map[string]string,
env observer.EndpointEnv,
endpoint observer.Endpoint,
nextLogs consumer.Logs,
nextMetrics consumer.Metrics,
nextTraces consumer.Traces,
) (*enhancingConsumer, error) {
attrs := map[string]string{}

for _, resource := range []map[string]string{resources[endpoint.Details.Type()], receiverAttributes} {
// Precompute values that will be inserted for each resource object passed through.
for attr, expr := range resource {
// If the attribute value is empty this signals to delete existing
if expr == "" {
delete(attrs, attr)
continue
}

res, err := evalBackticksInConfigValue(expr, env)
if err != nil {
return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err)
}

val := fmt.Sprint(res)
if val != "" {
attrs[attr] = val
}
}
}

ec := &enhancingConsumer{attrs: attrs}
if nextLogs != nil {
ec.logs = nextLogs
}
if nextMetrics != nil {
ec.metrics = nextMetrics
}
if nextTraces != nil {
ec.traces = nextTraces
}
return ec, nil
}

func (*enhancingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

func (ec *enhancingConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if ec.logs == nil {
return fmt.Errorf("no log consumer available")
}
rl := ld.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
ec.putAttrs(rl.At(i).Resource().Attributes())
}

return ec.logs.ConsumeLogs(ctx, ld)
}

func (ec *enhancingConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if ec.metrics == nil {
return fmt.Errorf("no metric consumer available")
}
rm := md.ResourceMetrics()
for i := 0; i < rm.Len(); i++ {
ec.putAttrs(rm.At(i).Resource().Attributes())
}

return ec.metrics.ConsumeMetrics(ctx, md)
}

func (ec *enhancingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if ec.traces == nil {
return fmt.Errorf("no trace consumer available")
}
rs := td.ResourceSpans()
for i := 0; i < rs.Len(); i++ {
ec.putAttrs(rs.At(i).Resource().Attributes())
}

return ec.traces.ConsumeTraces(ctx, td)
}

func (ec *enhancingConsumer) putAttrs(attrs pcommon.Map) {
for attr, val := range ec.attrs {
if _, found := attrs.Get(attr); !found {
attrs.PutStr(attr, val)
}
}
}
Loading

0 comments on commit 1b921c8

Please sign in to comment.