From 20c4538f3a77eef80558772fa7cdc54ff95f51ab Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 9 Jul 2020 18:28:16 -0700 Subject: [PATCH] Migrate Resource Processor to internal data model This commit migrates resource processor to internal data model. Existing processor configuration is relevant only to OpenCensus format, so the configuration schema has to be changed. Taking this opportunity, this commit adds existing logic for attributes manipulation from attributes processor to resource processor. New config uses "attributes" field which represents actions that can be made on resource attributes. Supported actions: INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT. In order to migrate existing resource processor config: 1. Move key/values from "labels" field to "attributes" with action="upsert". 2. Add value from "type" field to "attributes" with key="opencensus.resourcetype" and action="upsert". --- internal/processor/attraction/attraction.go | 7 +- processor/attributesprocessor/config.go | 2 +- processor/resourceprocessor/README.md | 23 +- processor/resourceprocessor/config.go | 12 +- processor/resourceprocessor/config_test.go | 10 +- processor/resourceprocessor/factory.go | 66 +++- processor/resourceprocessor/factory_test.go | 57 +++- .../resourceprocessor/resource_processor.go | 106 +++--- .../resource_processor_test.go | 318 +++++++----------- .../resourceprocessor/testdata/config.yaml | 27 +- testbed/tests/resource_processor_test.go | 104 ++---- 11 files changed, 351 insertions(+), 381 deletions(-) diff --git a/internal/processor/attraction/attraction.go b/internal/processor/attraction/attraction.go index b5d4197b256..01d341d938c 100644 --- a/internal/processor/attraction/attraction.go +++ b/internal/processor/attraction/attraction.go @@ -26,7 +26,7 @@ import ( // Settings type Settings struct { // Actions specifies the list of attributes to act on. - // The set of actions are {INSERT, UPDATE, UPSERT, DELETE}. + // The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}. // This is a required field. Actions []ActionKeyValue `mapstructure:"actions"` } @@ -245,6 +245,11 @@ func (ap *AttrProc) Process(attrs pdata.AttributeMap) { } } +// HasActions return false if there are no actions to be applied to attributes +func (ap *AttrProc) HasActions() bool { + return len(ap.actions) > 1 +} + func getSourceAttributeValue(action attributeAction, attrs pdata.AttributeMap) (pdata.AttributeValue, bool) { // Set the key with a value from the configuration. if action.AttributeValue != nil { diff --git a/processor/attributesprocessor/config.go b/processor/attributesprocessor/config.go index 37d38a5b30d..e097df3bade 100644 --- a/processor/attributesprocessor/config.go +++ b/processor/attributesprocessor/config.go @@ -33,7 +33,7 @@ type Config struct { filterspan.MatchConfig `mapstructure:",squash"` // Specifies the list of attributes to act on. - // The set of actions are {INSERT, UPDATE, UPSERT, DELETE}. + // The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}. // This is a required field. attraction.Settings `mapstructure:",squash"` } diff --git a/processor/resourceprocessor/README.md b/processor/resourceprocessor/README.md index 053b4006de2..0afe0acca21 100644 --- a/processor/resourceprocessor/README.md +++ b/processor/resourceprocessor/README.md @@ -2,25 +2,26 @@ Supported pipeline types: metrics, traces -The resource processor can be used to override a resource. +The resource processor can be used to apply changes on resource attributes. Please refer to [config.go](./config.go) for the config spec. -The following configuration options are required: -- `type`: Resource type to be applied. If specified, this value overrides the -original resource type. Otherwise, the original resource type is kept. -- `labels`: Map of key/value pairs that should be added to the resource. +`attributes` represents actions that can be applied on resource attributes. +See processor/attributesprocessor/README.md for more details on supported attributes actions. Examples: ```yaml processors: resource: - type: "host" - labels: { - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - } + attributes: + - key: cloud.zone + value: "zone-1" + action: upsert + - key: k8s.cluster.name + from_attribute: k8s-cluster + action: insert + - key: redundant-attribute + action: delete ``` Refer to [config.yaml](./testdata/config.yaml) for detailed diff --git a/processor/resourceprocessor/config.go b/processor/resourceprocessor/config.go index 51b797a0c35..4c5e020c16c 100644 --- a/processor/resourceprocessor/config.go +++ b/processor/resourceprocessor/config.go @@ -16,14 +16,20 @@ package resourceprocessor import ( "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/processor/attraction" ) // Config defines configuration for Resource processor. type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` - // ResourceType overrides the original resource type. + + // AttributesActions specifies the list of actions to be applied on resource attributes. + // The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}. + AttributesActions []attraction.ActionKeyValue `mapstructure:"attributes"` + + // ResourceType field is deprecated. Set "opencensus.type" key in "attributes.upsert" map instead. ResourceType string `mapstructure:"type"` - // Labels specify static labels to be added to resource. - // In case of a conflict the label will be overridden. + + // Labels field is deprecated. Use "attributes.upsert" instead. Labels map[string]string `mapstructure:"labels"` } diff --git a/processor/resourceprocessor/config_test.go b/processor/resourceprocessor/config_test.go index 4a7c33e42e7..8cf529aaedd 100644 --- a/processor/resourceprocessor/config_test.go +++ b/processor/resourceprocessor/config_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/processor/attraction" ) func TestLoadConfig(t *testing.T) { @@ -44,11 +45,10 @@ func TestLoadConfig(t *testing.T) { TypeVal: "resource", NameVal: "resource/2", }, - ResourceType: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", + AttributesActions: []attraction.ActionKeyValue{ + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + {Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT}, + {Key: "redundant-attribute", Action: attraction.DELETE}, }, }) } diff --git a/processor/resourceprocessor/factory.go b/processor/resourceprocessor/factory.go index f4275b692de..08d81cb6ebc 100644 --- a/processor/resourceprocessor/factory.go +++ b/processor/resourceprocessor/factory.go @@ -15,11 +15,16 @@ package resourceprocessor import ( + "context" + "fmt" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/processor/attraction" + "go.opentelemetry.io/collector/translator/conventions" ) const ( @@ -32,30 +37,75 @@ type Factory struct { } // Type gets the type of the Option config created by this factory. -func (Factory) Type() configmodels.Type { +func (*Factory) Type() configmodels.Type { return typeStr } // CreateDefaultConfig creates the default configuration for processor. -func (Factory) CreateDefaultConfig() configmodels.Processor { +func (*Factory) CreateDefaultConfig() configmodels.Processor { return &Config{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: typeStr, NameVal: typeStr, }, - ResourceType: "", - Labels: map[string]string{}, + AttributesActions: []attraction.ActionKeyValue{}, } } // CreateTraceProcessor creates a trace processor based on this config. -func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) { +func (*Factory) CreateTraceProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + nextConsumer consumer.TraceConsumer, + cfg configmodels.Processor, +) (component.TraceProcessor, error) { oCfg := cfg.(*Config) - return newResourceTraceProcessor(nextConsumer, oCfg), nil + handleDeprecatedFields(oCfg, params.Logger) + attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: oCfg.AttributesActions}) + if err != nil { + return nil, fmt.Errorf("error creating \"attributes\" processor: %w of processor %q", err, cfg.Name()) + } + return newResourceTraceProcessor(nextConsumer, attrProc), nil } // CreateMetricsProcessor creates a metrics processor based on this config. -func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) { +func (*Factory) CreateMetricsProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + nextConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (component.MetricsProcessor, error) { oCfg := cfg.(*Config) - return newResourceMetricProcessor(nextConsumer, oCfg), nil + handleDeprecatedFields(oCfg, params.Logger) + attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: oCfg.AttributesActions}) + if err != nil { + return nil, fmt.Errorf("error creating \"attributes\" processor: %w of processor %q", err, cfg.Name()) + } + return newResourceMetricProcessor(nextConsumer, attrProc), nil +} + +// handleDeprecatedFields converts deprecated ResourceType and Labels fields into Attributes.Upsert +func handleDeprecatedFields(cfg *Config, logger *zap.Logger) { + + // Upsert value from deprecated ResourceType config to resource attributes with "opencensus.type" key + if cfg.ResourceType != "" { + logger.Warn("[DEPRECATED] \"type\" field is deprecated and will be removed in future release. " + + "Please set the value to \"attributes\" with key=opencensus.type and action=upsert.") + upsertResourceType := attraction.ActionKeyValue{ + Action: attraction.UPSERT, + Key: conventions.OCAttributeResourceType, + Value: cfg.ResourceType, + } + cfg.AttributesActions = append(cfg.AttributesActions, upsertResourceType) + } + + // Upsert values from deprecated Labels config to resource attributes + if len(cfg.Labels) > 0 { + logger.Warn("[DEPRECATED] \"labels\" field is deprecated and will be removed in future release. " + + "Please use \"attributes\" field instead.") + for k, v := range cfg.Labels { + action := attraction.ActionKeyValue{Action: attraction.UPSERT, Key: k, Value: v} + cfg.AttributesActions = append(cfg.AttributesActions, action) + } + } } diff --git a/processor/resourceprocessor/factory_test.go b/processor/resourceprocessor/factory_test.go index d038b861093..7cc3c1e6256 100644 --- a/processor/resourceprocessor/factory_test.go +++ b/processor/resourceprocessor/factory_test.go @@ -15,12 +15,16 @@ package resourceprocessor import ( + "context" "testing" "github.com/stretchr/testify/assert" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/internal/processor/attraction" ) func TestCreateDefaultConfig(t *testing.T) { @@ -34,11 +38,60 @@ func TestCreateProcessor(t *testing.T) { var factory Factory cfg := factory.CreateDefaultConfig() - tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg) + tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.NoError(t, err) assert.NotNil(t, tp) - mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) + mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.NoError(t, err) assert.NotNil(t, mp) } + +func TestInvalidAttributeActions(t *testing.T) { + var factory Factory + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "k", Value: "v", Action: "invalid-action"}, + }, + } + + _, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + assert.Error(t, err) + + _, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + assert.Error(t, err) +} + +func TestDeprecatedConfig(t *testing.T) { + cfg := &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + ResourceType: "host", + Labels: map[string]string{ + "cloud.zone": "zone-1", + }, + } + + handleDeprecatedFields(cfg, zap.NewNop()) + + assert.EqualValues(t, &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + ResourceType: "host", + Labels: map[string]string{ + "cloud.zone": "zone-1", + }, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "opencensus.resourcetype", Value: "host", Action: attraction.UPSERT}, + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + }, + }, cfg) +} diff --git a/processor/resourceprocessor/resource_processor.go b/processor/resourceprocessor/resource_processor.go index 8faecb9d744..c9f1e674d87 100644 --- a/processor/resourceprocessor/resource_processor.go +++ b/processor/resourceprocessor/resource_processor.go @@ -17,36 +17,43 @@ package resourceprocessor import ( "context" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/processor/attraction" ) type resourceTraceProcessor struct { - resource *resourcepb.Resource + attrProc *attraction.AttrProc capabilities component.ProcessorCapabilities - next consumer.TraceConsumerOld + next consumer.TraceConsumer } -func newResourceTraceProcessor(next consumer.TraceConsumerOld, cfg *Config) *resourceTraceProcessor { - resource := createResource(cfg) +func newResourceTraceProcessor(next consumer.TraceConsumer, attrProc *attraction.AttrProc) *resourceTraceProcessor { return &resourceTraceProcessor{ + attrProc: attrProc, + capabilities: component.ProcessorCapabilities{MutatesConsumedData: attrProc.HasActions()}, next: next, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)}, - resource: resource, } } // ConsumeTraceData implements the TraceProcessor interface -func (rtp *resourceTraceProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - return rtp.next.ConsumeTraceData(ctx, consumerdata.TraceData{ - Node: td.Node, - Resource: mergeResource(td.Resource, rtp.resource), - Spans: td.Spans, - SourceFormat: td.SourceFormat, - }) +func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + if !rtp.attrProc.HasActions() { + return rtp.next.ConsumeTraces(ctx, td) + } + + rss := td.ResourceSpans() + for i := 0; i < rss.Len(); i++ { + resource := rss.At(i).Resource() + if resource.IsNil() { + resource.InitEmpty() + } + attrs := resource.Attributes() + rtp.attrProc.Process(attrs) + } + return rtp.next.ConsumeTraces(ctx, td) } // GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor. @@ -65,16 +72,15 @@ func (*resourceTraceProcessor) Shutdown(context.Context) error { } type resourceMetricProcessor struct { - resource *resourcepb.Resource + attrProc *attraction.AttrProc capabilities component.ProcessorCapabilities - next consumer.MetricsConsumerOld + next consumer.MetricsConsumer } -func newResourceMetricProcessor(next consumer.MetricsConsumerOld, cfg *Config) *resourceMetricProcessor { - resource := createResource(cfg) +func newResourceMetricProcessor(next consumer.MetricsConsumer, attrProc *attraction.AttrProc) *resourceMetricProcessor { return &resourceMetricProcessor{ - resource: resource, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)}, + attrProc: attrProc, + capabilities: component.ProcessorCapabilities{MutatesConsumedData: attrProc.HasActions()}, next: next, } } @@ -95,52 +101,22 @@ func (*resourceMetricProcessor) Shutdown(context.Context) error { } // ConsumeMetricsData implements the MetricsProcessor interface -func (rmp *resourceMetricProcessor) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - return rmp.next.ConsumeMetricsData(ctx, consumerdata.MetricsData{ - Node: md.Node, - Resource: mergeResource(md.Resource, rmp.resource), - Metrics: md.Metrics, - }) -} - -func createResource(cfg *Config) *resourcepb.Resource { - rpb := &resourcepb.Resource{ - Type: cfg.ResourceType, - Labels: map[string]string{}, - } - for k, v := range cfg.Labels { - rpb.Labels[k] = v +func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + if !rmp.attrProc.HasActions() { + return rmp.next.ConsumeMetrics(ctx, md) } - return rpb -} -func mergeResource(to, from *resourcepb.Resource) *resourcepb.Resource { - if isEmptyResource(from) { - return to - } - if to == nil { - if from.Type == "" { - // Since resource without type would be invalid, we keep resource as nil - return nil - } - to = &resourcepb.Resource{Labels: map[string]string{}} - } - if from.Type != "" { - // Only change resource type if it was configured - to.Type = from.Type - } - if from.Labels != nil { - if to.Labels == nil { - to.Labels = make(map[string]string, len(from.Labels)) + imd := pdatautil.MetricsToInternalMetrics(md) + rms := imd.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + resource := rms.At(i).Resource() + if resource.IsNil() { + resource.InitEmpty() } - - for k, v := range from.Labels { - to.Labels[k] = v + if resource.Attributes().Len() == 0 { + resource.Attributes().InitEmptyWithCapacity(1) } + rmp.attrProc.Process(resource.Attributes()) } - return to -} - -func isEmptyResource(resource *resourcepb.Resource) bool { - return resource.Type == "" && len(resource.Labels) == 0 + return rmp.next.ConsumeMetrics(ctx, md) } diff --git a/processor/resourceprocessor/resource_processor_test.go b/processor/resourceprocessor/resource_processor_test.go index 0dbf8ffb8c6..9e4ff040c84 100644 --- a/processor/resourceprocessor/resource_processor_test.go +++ b/processor/resourceprocessor/resource_processor_test.go @@ -18,269 +18,199 @@ import ( "context" "testing" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/internal/processor/attraction" ) var ( - cfg = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - emptyCfg = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "", - Labels: map[string]string{}, - } - - cfgWithEmptyResourceType = &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "resource", - NameVal: "resource", - }, - ResourceType: "", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, - } - - resource = &resourcepb.Resource{ - Type: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, + processorSettings = configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", } - resource2 = &resourcepb.Resource{ - Type: "ht", - Labels: map[string]string{ - "zone": "zone-2", - "cluster": "cluster-2", - "host": "node-2", + cfg = &Config{ + ProcessorSettings: processorSettings, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT}, + {Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT}, + {Key: "redundant-attribute", Action: attraction.DELETE}, }, } - mergedResource = &resourcepb.Resource{ - Type: "host", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - "zone": "zone-2", - "cluster": "cluster-2", - "host": "node-2", - }, + emptyCfg = &Config{ + ProcessorSettings: processorSettings, + AttributesActions: []attraction.ActionKeyValue{}, } ) -func TestResourceProcessor(t *testing.T) { +func TestResourceProcessorAttributesUpsert(t *testing.T) { tests := []struct { name string config *Config mutatesConsumedData bool - sourceResource *resourcepb.Resource - wantResource *resourcepb.Resource + sourceAttributes map[string]string + wantAttributes map[string]string }{ { - name: "Config with empty resource type doesn't mutate resource type", - config: cfgWithEmptyResourceType, + name: "config_with_no_attributes_doesnt_affect_nil_resource", + config: emptyCfg, + mutatesConsumedData: false, + sourceAttributes: nil, + wantAttributes: nil, + }, + { + name: "config_with_attributes_applied_on_nil_resource", + config: cfg, mutatesConsumedData: true, - sourceResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "original-label": "original-value", - "cloud.zone": "will-be-overridden", - }, - }, - wantResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "original-label": "original-value", - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - }, + sourceAttributes: nil, + wantAttributes: map[string]string{ + "cloud.zone": "zone-1", }, }, { - name: "Config with empty resource type keeps nil resource", - config: cfgWithEmptyResourceType, + name: "config_with_no_attributes_doesnt_affect_empty_resource", + config: emptyCfg, + mutatesConsumedData: false, + sourceAttributes: map[string]string{}, + wantAttributes: map[string]string{}, + }, + { + name: "config_with_attributes_applied_on_empty_resource", + config: cfg, mutatesConsumedData: true, - sourceResource: nil, - wantResource: nil, + sourceAttributes: map[string]string{}, + wantAttributes: map[string]string{ + "cloud.zone": "zone-1", + }, }, { - name: "Consumed resource with nil labels", - config: cfgWithEmptyResourceType, + name: "config_attributes_applied_on_existing_resource_attributes", + config: cfg, mutatesConsumedData: true, - sourceResource: &resourcepb.Resource{ - Type: "original-type", + sourceAttributes: map[string]string{ + "cloud.zone": "to-be-replaced", + "k8s-cluster": "test-cluster", + "redundant-attribute": "to-be-removed", + }, + wantAttributes: map[string]string{ + "cloud.zone": "zone-1", + "k8s-cluster": "test-cluster", + "k8s.cluster.name": "test-cluster", }, - wantResource: &resourcepb.Resource{ - Type: "original-type", - Labels: map[string]string{ - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", + }, + { + name: "config_attributes_replacement", + config: &Config{ + ProcessorSettings: processorSettings, + AttributesActions: []attraction.ActionKeyValue{ + {Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT}, + {Key: "k8s-cluster", Action: attraction.DELETE}, }, }, + mutatesConsumedData: true, + sourceAttributes: map[string]string{ + "k8s-cluster": "test-cluster", + }, + wantAttributes: map[string]string{ + "k8s.cluster.name": "test-cluster", + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Test trace consuner + // Test trace consumer ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, tt.config) + attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: tt.config.AttributesActions}) + require.NoError(t, err) + + rtp := newResourceTraceProcessor(ttn, attrProc) assert.Equal(t, tt.mutatesConsumedData, rtp.GetCapabilities().MutatesConsumedData) - err := rtp.ConsumeTraceData(context.Background(), consumerdata.TraceData{ - Resource: tt.sourceResource, - }) + sourceTraceData := generateTraceData(tt.sourceAttributes) + wantTraceData := generateTraceData(tt.wantAttributes) + err = rtp.ConsumeTraces(context.Background(), sourceTraceData) require.NoError(t, err) - assert.Equal(t, tt.wantResource, ttn.td.Resource) + assert.EqualValues(t, wantTraceData, ttn.td) // Test metrics consumer tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, tt.config) + rmp := newResourceMetricProcessor(tmn, attrProc) assert.Equal(t, tt.mutatesConsumedData, rmp.GetCapabilities().MutatesConsumedData) - err = rmp.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{ - Resource: tt.sourceResource, - }) + sourceMetricData := generateMetricData(tt.sourceAttributes) + wantMetricData := generateMetricData(tt.wantAttributes) + err = rmp.ConsumeMetrics(context.Background(), sourceMetricData) require.NoError(t, err) - assert.Equal(t, tt.wantResource, tmn.md.Resource) + assert.EqualValues(t, wantMetricData, tmn.md) }) } } -func TestTraceResourceProcessor(t *testing.T) { - want := consumerdata.TraceData{ - Resource: resource, - } - test := consumerdata.TraceData{} - - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, cfg) - assert.True(t, rtp.GetCapabilities().MutatesConsumedData) - - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestTraceResourceProcessorEmpty(t *testing.T) { - want := consumerdata.TraceData{ - Resource: resource2, +func generateTraceData(attributes map[string]string) pdata.Traces { + td := testdata.GenerateTraceDataOneSpanNoResource() + if attributes == nil { + return td } - test := consumerdata.TraceData{ - Resource: resource2, + resource := td.ResourceSpans().At(0).Resource() + resource.InitEmpty() + for k, v := range attributes { + resource.Attributes().InsertString(k, v) } - - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, emptyCfg) - assert.False(t, rtp.GetCapabilities().MutatesConsumedData) - - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) + resource.Attributes().Sort() + return td } -func TestTraceResourceProcessorNonEmptyIncomingResource(t *testing.T) { - want := consumerdata.TraceData{ - Resource: mergedResource, +func generateMetricData(attributes map[string]string) pdata.Metrics { + md := testdata.GenerateMetricDataOneMetricNoResource() + if attributes == nil { + return pdatautil.MetricsFromInternalMetrics(md) } - test := consumerdata.TraceData{ - Resource: resource2, + resource := md.ResourceMetrics().At(0).Resource() + resource.InitEmpty() + for k, v := range attributes { + resource.Attributes().InsertString(k, v) } - ttn := &testTraceConsumer{} - rtp := newResourceTraceProcessor(ttn, cfg) - rtp.ConsumeTraceData(context.Background(), test) - assert.Equal(t, ttn.td, want) -} - -func TestMetricResourceProcessor(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: resource, - } - test := consumerdata.MetricsData{} - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, cfg) - assert.True(t, rmp.GetCapabilities().MutatesConsumedData) - - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMetricResourceProcessorEmpty(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: resource2, - } - test := consumerdata.MetricsData{ - Resource: resource2, - } - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, emptyCfg) - assert.False(t, rmp.GetCapabilities().MutatesConsumedData) - - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMetricResourceProcessorNonEmptyIncomingResource(t *testing.T) { - want := consumerdata.MetricsData{ - Resource: mergedResource, - } - test := consumerdata.MetricsData{ - Resource: resource2, - } - - tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, cfg) - rmp.ConsumeMetricsData(context.Background(), test) - assert.Equal(t, tmn.md, want) -} - -func TestMergeResourceWithNilLabels(t *testing.T) { - resourceNilLabels := &resourcepb.Resource{Type: "host"} - assert.Nil(t, resourceNilLabels.Labels) - assert.Equal(t, mergeResource(nil, resourceNilLabels), &resourcepb.Resource{Type: "host", Labels: map[string]string{}}) + resource.Attributes().Sort() + return pdatautil.MetricsFromInternalMetrics(md) } type testTraceConsumer struct { - td consumerdata.TraceData + td pdata.Traces } -func (ttn *testTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { +func (ttn *testTraceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + // sort attributes to be able to compare traces + for i := 0; i < td.ResourceSpans().Len(); i++ { + sortResourceAttributes(td.ResourceSpans().At(i).Resource()) + } ttn.td = td return nil } type testMetricsConsumer struct { - md consumerdata.MetricsData + md pdata.Metrics } -func (tmn *testMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { +func (tmn *testMetricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + // sort attributes to be able to compare traces + imd := pdatautil.MetricsToInternalMetrics(md) + for i := 0; i < imd.ResourceMetrics().Len(); i++ { + sortResourceAttributes(imd.ResourceMetrics().At(i).Resource()) + } tmn.md = md return nil } + +func sortResourceAttributes(resource pdata.Resource) { + if resource.IsNil() { + return + } + resource.Attributes().Sort() +} diff --git a/processor/resourceprocessor/testdata/config.yaml b/processor/resourceprocessor/testdata/config.yaml index b770ca2be15..a0f3c7cc7f1 100644 --- a/processor/resourceprocessor/testdata/config.yaml +++ b/processor/resourceprocessor/testdata/config.yaml @@ -4,16 +4,21 @@ receivers: processors: # The following specifies an empty resource - it will have no effect on trace or metrics data. resource: - # The following specifies a non-trivial resource. Type "host" is used for Kubernetes node resources - # that expect the labels "cloud.zone", "k8s.cluster.name", "host.name" to be defined (although this - # is not enforced by the configuration logic). + # The following specifies a non-trivial resource. It sets resource attributes "cloud.zone", + # "k8s.cluster.name" and "host.name" for every metric or trace being processed. + # If any of the attributes already have some values, they will be overridden. + # There are many more attribute modification actions supported, + # check processor/attributesprocessor/testdata/config.yaml for reference. resource/2: - type: "host" - labels: { - "cloud.zone": "zone-1", - "k8s.cluster.name": "k8s-cluster", - "host.name": "k8s-node", - } + attributes: + - key: cloud.zone + value: zone-1 + action: upsert + - key: k8s.cluster.name + from_attribute: k8s-cluster + action: insert + - key: redundant-attribute + action: delete exporters: exampleexporter: @@ -24,3 +29,7 @@ service: receivers: [examplereceiver] processors: [resource/2] exporters: [exampleexporter] + traces: + receivers: [examplereceiver] + processors: [resource/2] + exporters: [exampleexporter] diff --git a/testbed/tests/resource_processor_test.go b/testbed/tests/resource_processor_test.go index ad1958c8ac6..b26efbbc95f 100644 --- a/testbed/tests/resource_processor_test.go +++ b/testbed/tests/resource_processor_test.go @@ -67,38 +67,6 @@ const ( } ` - mockedConsumedResourceWithoutTypeJSON = ` - { - "resource": { - "attributes": [ - { - "key": "label-key", - "value": { "stringValue": "label-value" } - } - ] - }, - "instrumentation_library_metrics": [ - { - "metrics": [ - { - "metric_descriptor": { - "name": "metric-name", - "description": "metric-description", - "unit": "metric-unit", - "type": 1 - }, - "int64_data_points": [ - { - "value": 0 - } - ] - } - ] - } - ] - } -` - mockedConsumedResourceNilJSON = ` { "instrumentation_library_metrics": [ @@ -160,13 +128,16 @@ func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { tests := []resourceProcessorTestCase{ { - name: "Override consumed resource labels and type", + name: "Override consumed resource attributes", resourceProcessorConfig: ` resource: - type: vm - labels: { - "additional-label-key": "additional-label-value", - } + attributes: + - key: opencensus.resourcetype + value: vm + action: upsert + - key: label-key + value: new-label-value + action: update `, mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithTypeJSON), expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ @@ -178,29 +149,14 @@ func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { }, { Key: "label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "label-value"}}, - }, - { - Key: "additional-label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "additional-label-value"}}, + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "new-label-value"}}, }, }, }, }), }, { - name: "Return nil if consumed resource is nil and type is empty", - resourceProcessorConfig: ` - resource: - labels: { - "additional-label-key": "additional-label-value", - } -`, - mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceNilJSON), - isNilResource: true, - }, - { - name: "Return nil if consumed resource and resource in config is nil", + name: "Return nil if consumed resource is nil and there are no attributes in config", resourceProcessorConfig: ` resource: `, @@ -208,42 +164,26 @@ func getResourceProcessorTestCases(t *testing.T) []resourceProcessorTestCase { isNilResource: true, }, { - name: "Return resource without type", - resourceProcessorConfig: ` - resource: - labels: { - "additional-label-key": "additional-label-value", - } -`, - mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithoutTypeJSON), - expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: "label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "label-value"}}, - }, - { - Key: "additional-label-key", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "additional-label-value"}}, - }, - }, - }, - }), - }, - { - name: "Consumed resource with nil labels", + name: "Consumed empty resource", resourceProcessorConfig: ` resource: - labels: { - "additional-label-key": "additional-label-value", - } + attributes: + - key: opencensus.resourcetype + value: vm + action: insert + - key: additional-label-key + value: additional-label-value + action: insert `, mockedConsumedMetricData: getMetricDataFromJSON(t, mockedConsumedResourceWithoutAttributesJSON), expectedMetricData: getMetricDataFromResourceMetrics(&otlpmetrics.ResourceMetrics{ Resource: &otlpresource.Resource{ Attributes: []*otlpcommon.KeyValue{ + { + Key: "opencensus.resourcetype", + Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "vm"}}, + }, { Key: "additional-label-key", Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "additional-label-value"}},