diff --git a/processor/resourceprocessor/README.md b/processor/resourceprocessor/README.md index 053b4006de2..5d16479c81f 100644 --- a/processor/resourceprocessor/README.md +++ b/processor/resourceprocessor/README.md @@ -6,9 +6,7 @@ The resource processor can be used to override a resource. Please refer to [config.go](./config.go) for the config spec. The following configuration options are required: -- `type`: Resource type to be applied. If specified, this value overrides the -original resource type. Otherwise, the original resource type is kept. -- `labels`: Map of key/value pairs that should be added to the resource. +- `attributes`: Map of key/value pairs that should be added to the resource. Examples: @@ -16,7 +14,7 @@ Examples: processors: resource: type: "host" - labels: { + attributes: { "cloud.zone": "zone-1", "k8s.cluster.name": "k8s-cluster", "host.name": "k8s-node", diff --git a/processor/resourceprocessor/config.go b/processor/resourceprocessor/config.go index 51b797a0c35..2570095c22e 100644 --- a/processor/resourceprocessor/config.go +++ b/processor/resourceprocessor/config.go @@ -21,9 +21,18 @@ import ( // Config defines configuration for Resource processor. type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` - // ResourceType overrides the original resource type. + Attributes AttributesActions `mapstructure:"attributes"` + + // ResourceType field is deprecated. Set "opencensus.type" key in "attributes.upsert" map instead. ResourceType string `mapstructure:"type"` - // Labels specify static labels to be added to resource. - // In case of a conflict the label will be overridden. + + // Labels field is deprecated. Use "attributes.upsert" instead. Labels map[string]string `mapstructure:"labels"` } + +// AttributesActions defines actions that can be applied to resource attributes. +type AttributesActions struct { + // Upsert attribute action defines static key/values pairs to be added to resource attributes. + // In case of a conflict the attribute will be overridden. + Upsert map[string]string `mapstructure:"upsert"` +} diff --git a/processor/resourceprocessor/config_test.go b/processor/resourceprocessor/config_test.go index 4a7c33e42e7..1dac125c93e 100644 --- a/processor/resourceprocessor/config_test.go +++ b/processor/resourceprocessor/config_test.go @@ -44,11 +44,12 @@ func TestLoadConfig(t *testing.T) { TypeVal: "resource", NameVal: "resource/2", }, - ResourceType: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", + Attributes: AttributesActions{ + Upsert: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + "host.name": "k8s-node", + }, }, }) } diff --git a/processor/resourceprocessor/factory.go b/processor/resourceprocessor/factory.go index f4275b692de..308e330716e 100644 --- a/processor/resourceprocessor/factory.go +++ b/processor/resourceprocessor/factory.go @@ -15,11 +15,13 @@ package resourceprocessor import ( - "go.uber.org/zap" + "context" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/translator/conventions" + "go.uber.org/zap" ) const ( @@ -32,30 +34,63 @@ type Factory struct { } // Type gets the type of the Option config created by this factory. -func (Factory) Type() configmodels.Type { +func (*Factory) Type() configmodels.Type { return typeStr } // CreateDefaultConfig creates the default configuration for processor. -func (Factory) CreateDefaultConfig() configmodels.Processor { +func (*Factory) CreateDefaultConfig() configmodels.Processor { return &Config{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: typeStr, NameVal: typeStr, }, - ResourceType: "", - Labels: map[string]string{}, + Attributes: AttributesActions{ + Upsert: map[string]string{}, + }, } } // CreateTraceProcessor creates a trace processor based on this config. -func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) { +func (*Factory) CreateTraceProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + nextConsumer consumer.TraceConsumer, + cfg configmodels.Processor, +) (component.TraceProcessor, error) { oCfg := cfg.(*Config) + handleDeprecatedFields(oCfg, params.Logger) return newResourceTraceProcessor(nextConsumer, oCfg), nil } // CreateMetricsProcessor creates a metrics processor based on this config. -func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) { +func (*Factory) CreateMetricsProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + nextConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (component.MetricsProcessor, error) { oCfg := cfg.(*Config) + handleDeprecatedFields(oCfg, params.Logger) return newResourceMetricProcessor(nextConsumer, oCfg), nil } + +// handleDeprecatedFields converts deprecated ResourceType and Labels fields into Attributes.Upsert +func handleDeprecatedFields(cfg *Config, logger *zap.Logger) { + if cfg.ResourceType != "" { + logger.Warn("[DEPRECATED] \"type\" field is deprecated and will be removed in future release. " + + "Please set the value to \"opencensus.type\" key in \"attributes.upsert\" map instead.") + cfg.Attributes.Upsert[conventions.OCAttributeResourceType] = cfg.ResourceType + } + if len(cfg.Labels) > 0 { + logger.Warn("[DEPRECATED] \"labels\" field is deprecated and will be removed in future release. " + + "Please use \"attributes.upsert\" field instead.") + for k, v := range cfg.Labels { + if _, ok := cfg.Attributes.Upsert[k]; ok { + logger.Warn("Label \"" + k + "\" ignored due to a conflict with a key in attributes.upsert") + continue + } + cfg.Attributes.Upsert[k] = v + } + } +} diff --git a/processor/resourceprocessor/factory_test.go b/processor/resourceprocessor/factory_test.go index d038b861093..092723b2fb0 100644 --- a/processor/resourceprocessor/factory_test.go +++ b/processor/resourceprocessor/factory_test.go @@ -15,12 +15,16 @@ package resourceprocessor import ( + "context" "testing" "github.com/stretchr/testify/assert" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/translator/conventions" ) func TestCreateDefaultConfig(t *testing.T) { @@ -34,11 +38,54 @@ func TestCreateProcessor(t *testing.T) { var factory Factory cfg := factory.CreateDefaultConfig() - tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg) + tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.NoError(t, err) assert.NotNil(t, tp) - mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) + mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.NoError(t, err) assert.NotNil(t, mp) } + +func TestDeprecatedConfig(t *testing.T) { + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + ResourceType: "host", + Labels: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + "ignored": "new-value", + }, + Attributes: AttributesActions{ + Upsert: map[string]string{ + "ignored": "old-value", + }, + }, + } + + handleDeprecatedFields(cfg, zap.NewNop()) + + assert.EqualValues(t, &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + ResourceType: "host", + Labels: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + "ignored": "new-value", + }, + Attributes: AttributesActions{ + Upsert: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + conventions.OCAttributeResourceType: "host", + "ignored": "old-value", + }, + }, + }, cfg) +} diff --git a/processor/resourceprocessor/resource_processor.go b/processor/resourceprocessor/resource_processor.go index 8faecb9d744..dea74771cfc 100644 --- a/processor/resourceprocessor/resource_processor.go +++ b/processor/resourceprocessor/resource_processor.go @@ -17,36 +17,41 @@ package resourceprocessor import ( "context" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" ) type resourceTraceProcessor struct { - resource *resourcepb.Resource - capabilities component.ProcessorCapabilities - next consumer.TraceConsumerOld + attributesActions AttributesActions + capabilities component.ProcessorCapabilities + next consumer.TraceConsumer } -func newResourceTraceProcessor(next consumer.TraceConsumerOld, cfg *Config) *resourceTraceProcessor { - resource := createResource(cfg) +func newResourceTraceProcessor(next consumer.TraceConsumer, cfg *Config) *resourceTraceProcessor { return &resourceTraceProcessor{ - next: next, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)}, - resource: resource, + attributesActions: cfg.Attributes, + capabilities: component.ProcessorCapabilities{MutatesConsumedData: mutatesData(cfg.Attributes)}, + next: next, } } // ConsumeTraceData implements the TraceProcessor interface -func (rtp *resourceTraceProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - return rtp.next.ConsumeTraceData(ctx, consumerdata.TraceData{ - Node: td.Node, - Resource: mergeResource(td.Resource, rtp.resource), - Spans: td.Spans, - SourceFormat: td.SourceFormat, - }) +func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + if !mutatesData(rtp.attributesActions) { + return rtp.next.ConsumeTraces(ctx, td) + } + + rss := td.ResourceSpans() + for i := 0; i < rss.Len(); i++ { + resource := rss.At(i).Resource() + if resource.IsNil() { + resource.InitEmpty() + } + applyAttributesActions(resource, rtp.attributesActions) + } + return rtp.next.ConsumeTraces(ctx, td) } // GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor. @@ -65,17 +70,16 @@ func (*resourceTraceProcessor) Shutdown(context.Context) error { } type resourceMetricProcessor struct { - resource *resourcepb.Resource - capabilities component.ProcessorCapabilities - next consumer.MetricsConsumerOld + attributesActions AttributesActions + capabilities component.ProcessorCapabilities + next consumer.MetricsConsumer } -func newResourceMetricProcessor(next consumer.MetricsConsumerOld, cfg *Config) *resourceMetricProcessor { - resource := createResource(cfg) +func newResourceMetricProcessor(next consumer.MetricsConsumer, cfg *Config) *resourceMetricProcessor { return &resourceMetricProcessor{ - resource: resource, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)}, - next: next, + attributesActions: cfg.Attributes, + capabilities: component.ProcessorCapabilities{MutatesConsumedData: mutatesData(cfg.Attributes)}, + next: next, } } @@ -95,52 +99,33 @@ func (*resourceMetricProcessor) Shutdown(context.Context) error { } // ConsumeMetricsData implements the MetricsProcessor interface -func (rmp *resourceMetricProcessor) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - return rmp.next.ConsumeMetricsData(ctx, consumerdata.MetricsData{ - Node: md.Node, - Resource: mergeResource(md.Resource, rmp.resource), - Metrics: md.Metrics, - }) -} - -func createResource(cfg *Config) *resourcepb.Resource { - rpb := &resourcepb.Resource{ - Type: cfg.ResourceType, - Labels: map[string]string{}, +func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + if !mutatesData(rmp.attributesActions) { + return rmp.next.ConsumeMetrics(ctx, md) } - for k, v := range cfg.Labels { - rpb.Labels[k] = v - } - return rpb -} -func mergeResource(to, from *resourcepb.Resource) *resourcepb.Resource { - if isEmptyResource(from) { - return to - } - if to == nil { - if from.Type == "" { - // Since resource without type would be invalid, we keep resource as nil - return nil + imd := pdatautil.MetricsToInternalMetrics(md) + rms := imd.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + resource := rms.At(i).Resource() + if resource.IsNil() { + resource.InitEmpty() } - to = &resourcepb.Resource{Labels: map[string]string{}} - } - if from.Type != "" { - // Only change resource type if it was configured - to.Type = from.Type + applyAttributesActions(resource, rmp.attributesActions) } - if from.Labels != nil { - if to.Labels == nil { - to.Labels = make(map[string]string, len(from.Labels)) - } + return rmp.next.ConsumeMetrics(ctx, md) +} - for k, v := range from.Labels { - to.Labels[k] = v - } - } - return to +func mutatesData(actions AttributesActions) bool { + return len(actions.Upsert) > 0 +} + +func applyAttributesActions(resource pdata.Resource, attributesActions AttributesActions) { + upsertAttributes(resource, attributesActions.Upsert) } -func isEmptyResource(resource *resourcepb.Resource) bool { - return resource.Type == "" && len(resource.Labels) == 0 +func upsertAttributes(resource pdata.Resource, attrs map[string]string) { + for k, v := range attrs { + resource.Attributes().UpsertString(k, v) + } } diff --git a/processor/resourceprocessor/resource_processor_test.go b/processor/resourceprocessor/resource_processor_test.go index 0dbf8ffb8c6..b7cdffd905e 100644 --- a/processor/resourceprocessor/resource_processor_test.go +++ b/processor/resourceprocessor/resource_processor_test.go @@ -18,269 +18,198 @@ import ( "context" "testing" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/data/testdata" ) var ( - cfg = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - emptyCfg = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "", - Labels: map[string]string{}, - } - - cfgWithEmptyResourceType = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - resource = &resourcepb.Resource{ - Type: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, + processorSettings = configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", } - resource2 = &resourcepb.Resource{ - Type: "ht", - Labels: map[string]string{ - "zone": "zone-2", - "cluster": "cluster-2", - "host": "node-2", + cfg = &Config{ + ProcessorSettings: processorSettings, + Attributes: AttributesActions{ + Upsert: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + "host.name": "k8s-node", + }, }, } - mergedResource = &resourcepb.Resource{ - Type: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - "zone": "zone-2", - "cluster": "cluster-2", - "host": "node-2", + emptyCfg = &Config{ + ProcessorSettings: processorSettings, + Attributes: AttributesActions{ + Upsert: map[string]string{}, }, } ) -func TestResourceProcessor(t *testing.T) { +func TestResourceProcessorAttributesUpsert(t *testing.T) { tests := []struct { name string config *Config mutatesConsumedData bool - sourceResource *resourcepb.Resource - wantResource *resourcepb.Resource + sourceAttributes map[string]string + wantAttributes map[string]string }{ { - name: "Config with empty resource type doesn't mutate resource type", - config: cfgWithEmptyResourceType, + name: "config_with_no_attributes_doesnt_affect_nil_resource", + config: emptyCfg, + mutatesConsumedData: false, + sourceAttributes: nil, + wantAttributes: nil, + }, + { + name: "config_with_attributes_applied_on_nil_resource", + config: cfg, mutatesConsumedData: true, - sourceResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "original-label": "original-value", - "cloud.zone": "will-be-overridden", - }, - }, - wantResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "original-label": "original-value", - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - }, + sourceAttributes: nil, + wantAttributes: cfg.Attributes.Upsert, }, { - name: "Config with empty resource type keeps nil resource", - config: cfgWithEmptyResourceType, + name: "config_with_no_attributes_doesnt_affect_empty_resource", + config: emptyCfg, + mutatesConsumedData: false, + sourceAttributes: map[string]string{}, + wantAttributes: map[string]string{}, + }, + { + name: "config_with_attributes_applied_on_empty_resource", + config: cfg, mutatesConsumedData: true, - sourceResource: nil, - wantResource: nil, + sourceAttributes: map[string]string{}, + wantAttributes: cfg.Attributes.Upsert, }, { - name: "Consumed resource with nil labels", - config: cfgWithEmptyResourceType, + name: "config_attributes_added_to_existing_resource_attributes", + config: &Config{ + ProcessorSettings: processorSettings, + Attributes: AttributesActions{ + Upsert: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + }, + }, + }, mutatesConsumedData: true, - sourceResource: &resourcepb.Resource{ - Type: "original-type", + sourceAttributes: map[string]string{ + "host.name": "k8s-node", }, - wantResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", + wantAttributes: cfg.Attributes.Upsert, + }, + { + name: "config_attributes_override_existing_resource_attributes", + config: &Config{ + ProcessorSettings: processorSettings, + Attributes: AttributesActions{ + Upsert: map[string]string{ + "cloud.zone": "zone-1", + "k8s.cluster.name": "k8s-cluster", + }, }, }, + mutatesConsumedData: true, + sourceAttributes: map[string]string{ + "k8s.cluster.name": "wrong-cluster", + "host.name": "k8s-node", + }, + wantAttributes: cfg.Attributes.Upsert, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Test trace consuner + // Test trace consumer ttn := &testTraceConsumer{} rtp := newResourceTraceProcessor(ttn, tt.config) assert.Equal(t, tt.mutatesConsumedData, rtp.GetCapabilities().MutatesConsumedData) - err := rtp.ConsumeTraceData(context.Background(), consumerdata.TraceData{ - Resource: tt.sourceResource, - }) + sourceTraceData := generateTraceData(tt.sourceAttributes) + wantTraceData := generateTraceData(tt.wantAttributes) + err := rtp.ConsumeTraces(context.Background(), sourceTraceData) require.NoError(t, err) - assert.Equal(t, tt.wantResource, ttn.td.Resource) + assert.EqualValues(t, wantTraceData, ttn.td) // Test metrics consumer tmn := &testMetricsConsumer{} rmp := newResourceMetricProcessor(tmn, tt.config) assert.Equal(t, tt.mutatesConsumedData, rmp.GetCapabilities().MutatesConsumedData) - err = rmp.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{ - Resource: tt.sourceResource, - }) + sourceMetricData := generateMetricData(tt.sourceAttributes) + wantMetricData := generateMetricData(tt.wantAttributes) + err = rmp.ConsumeMetrics(context.Background(), sourceMetricData) require.NoError(t, err) - assert.Equal(t, tt.wantResource, tmn.md.Resource) + assert.EqualValues(t, wantMetricData, tmn.md) }) } } -func TestTraceResourceProcessor(t *testing.T) { - want := consumerdata.TraceData{ - Resource: resource, +func generateTraceData(attributes map[string]string) pdata.Traces { + td := testdata.GenerateTraceDataOneSpanNoResource() + if attributes == nil { + return td } - test := consumerdata.TraceData{} - - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, cfg) - assert.True(t, rtp.GetCapabilities().MutatesConsumedData) - - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestTraceResourceProcessorEmpty(t *testing.T) { - want := consumerdata.TraceData{ - Resource: resource2, - } - test := consumerdata.TraceData{ - Resource: resource2, - } - - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, emptyCfg) - assert.False(t, rtp.GetCapabilities().MutatesConsumedData) - - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestTraceResourceProcessorNonEmptyIncomingResource(t *testing.T) { - want := consumerdata.TraceData{ - Resource: mergedResource, - } - test := consumerdata.TraceData{ - Resource: resource2, - } - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, cfg) - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestMetricResourceProcessor(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: resource, - } - test := consumerdata.MetricsData{} - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, cfg) - assert.True(t, rmp.GetCapabilities().MutatesConsumedData) - - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMetricResourceProcessorEmpty(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: resource2, + resource := td.ResourceSpans().At(0).Resource() + resource.InitEmpty() + for k, v := range attributes { + resource.Attributes().InsertString(k, v) } - test := consumerdata.MetricsData{ - Resource: resource2, - } - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, emptyCfg) - assert.False(t, rmp.GetCapabilities().MutatesConsumedData) - - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) + resource.Attributes().Sort() + return td } -func TestMetricResourceProcessorNonEmptyIncomingResource(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: mergedResource, +func generateMetricData(attributes map[string]string) pdata.Metrics { + md := testdata.GenerateMetricDataOneMetricNoResource() + if attributes == nil { + return pdatautil.MetricsFromInternalMetrics(md) } - test := consumerdata.MetricsData{ - Resource: resource2, + resource := md.ResourceMetrics().At(0).Resource() + resource.InitEmpty() + for k, v := range attributes { + resource.Attributes().InsertString(k, v) } - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, cfg) - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMergeResourceWithNilLabels(t *testing.T) { - resourceNilLabels := &resourcepb.Resource{Type: "host"} - assert.Nil(t, resourceNilLabels.Labels) - assert.Equal(t, mergeResource(nil, resourceNilLabels), &resourcepb.Resource{Type: "host", Labels: map[string]string{}}) + resource.Attributes().Sort() + return pdatautil.MetricsFromInternalMetrics(md) } type testTraceConsumer struct { - td consumerdata.TraceData + td pdata.Traces } -func (ttn *testTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { +func (ttn *testTraceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + // sort attributes to be able to compare traces + for i := 0; i < td.ResourceSpans().Len(); i++ { + sortResourceAttributes(td.ResourceSpans().At(i).Resource()) + } ttn.td = td return nil } type testMetricsConsumer struct { - md consumerdata.MetricsData + md pdata.Metrics } -func (tmn *testMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { +func (tmn *testMetricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + // sort attributes to be able to compare traces + imd := pdatautil.MetricsToInternalMetrics(md) + for i := 0; i < imd.ResourceMetrics().Len(); i++ { + sortResourceAttributes(imd.ResourceMetrics().At(i).Resource()) + } tmn.md = md return nil } + +func sortResourceAttributes(resource pdata.Resource) { + if resource.IsNil() { + return + } + resource.Attributes().Sort() +} diff --git a/processor/resourceprocessor/testdata/config.yaml b/processor/resourceprocessor/testdata/config.yaml index b770ca2be15..b8bbd3312f7 100644 --- a/processor/resourceprocessor/testdata/config.yaml +++ b/processor/resourceprocessor/testdata/config.yaml @@ -4,16 +4,15 @@ receivers: processors: # The following specifies an empty resource - it will have no effect on trace or metrics data. resource: - # The following specifies a non-trivial resource. Type "host" is used for Kubernetes node resources - # that expect the labels "cloud.zone", "k8s.cluster.name", "host.name" to be defined (although this - # is not enforced by the configuration logic). + # The following specifies a non-trivial resource. It sets resource attributes "cloud.zone", + # "k8s.cluster.name" and "host.name" for every metric or trace being processed. + # If any of the attributes already have some values, they will be overridden. resource/2: - type: "host" - labels: { - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - } + attributes: + upsert: + cloud.zone: "zone-1" + k8s.cluster.name: "k8s-cluster" + host.name: "k8s-node" exporters: exampleexporter: @@ -24,3 +23,7 @@ service: receivers: [examplereceiver] processors: [resource/2] exporters: [exampleexporter] + traces: + receivers: [examplereceiver] + processors: [resource/2] + exporters: [exampleexporter] diff --git a/testbed/tests/resource_processor_test.go b/testbed/tests/resource_processor_test.go index ad1958c8ac6..a00e556d5c9 100644 --- a/testbed/tests/resource_processor_test.go +++ b/testbed/tests/resource_processor_test.go @@ -67,38 +67,6 @@ const ( } ` - mockedConsumedResourceWithoutTypeJSON = ` - { - "resource": { - "attributes": [ - { - "key": "label-key", - "value": { "stringValue": "label-value" } - } - ] - }, - "instrumentation_library_metrics": [ - { - "metrics": [ - { - "metric_descriptor": { - "name": "metric-name", - "description": "metric-description", - "unit": "metric-unit", - "type": 1 - }, - "int64_data_points": [ - { - "value": 0 - } - ] - } - ] - } - ] - } -` - mockedConsumedResourceNilJSON = ` { "instrumentation_library_metrics": [ @@ -160,11 +128,11 @@ func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { tests := []resourceProcessorTestCase{ { - name: "Override consumed resource labels and type", + name: "Override consumed resource attributes", resourceProcessorConfig: ` resource: - type: vm - labels: { + attributes: { + "opencensus.resourcetype": "vm", "additional-label-key": "additional-label-value", } `, @@ -189,53 +157,18 @@ func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { }), }, { - name: "Return nil if consumed resource is nil and type is empty", + name: "Return nil if consumed resource is nil and there are no attributes in config", resourceProcessorConfig: ` resource: - labels: { - "additional-label-key": "additional-label-value", - } `, mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceNilJSON), isNilResource: true, }, { - name: "Return nil if consumed resource and resource in config is nil", - resourceProcessorConfig: ` - resource: -`, - mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceNilJSON), - isNilResource: true, - }, - { - name: "Return resource without type", - resourceProcessorConfig: ` - resource: - labels: { - "additional-label-key": "additional-label-value", - } -`, - mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithoutTypeJSON), - expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: "label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "label-value"}}, - }, - { - Key: "additional-label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "additional-label-value"}}, - }, - }, - }, - }), - }, - { - name: "Consumed resource with nil labels", + name: "Consumed empty resource", resourceProcessorConfig: ` resource: - labels: { + attributes: { "additional-label-key": "additional-label-value", } `,