Skip to content

Commit

Permalink
Migrate Resource Processor to internal data model (open-telemetry#1315)
Browse files Browse the repository at this point in the history
This commit migrates resource processor to internal data model. Existing processor configuration is relevant only to OpenCensus format, so the configuration schema has to be changed. Taking this opportunity, this commit adds existing logic for attributes manipulation from attributes processor to resource processor. 
New config uses "attributes" field which represents actions that can be made on resource attributes. Supported actions: INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT.
In order to migrate existing resource processor config:
1. Move key/values from "labels" field to "attributes" with action="upsert".
2. Add value from "type" field to "attributes" with key="opencensus.resourcetype"  and action="upsert".
  • Loading branch information
dmitryax authored and wyTrivail committed Jul 13, 2020
1 parent b1b95f6 commit 4794b76
Show file tree
Hide file tree
Showing 11 changed files with 375 additions and 412 deletions.
2 changes: 1 addition & 1 deletion internal/processor/attraction/attraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// Settings
type Settings struct {
// Actions specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE}.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// This is a required field.
Actions []ActionKeyValue `mapstructure:"actions"`
}
Expand Down
2 changes: 1 addition & 1 deletion processor/attributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Config struct {
filterspan.MatchConfig `mapstructure:",squash"`

// Specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE}.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// This is a required field.
attraction.Settings `mapstructure:",squash"`
}
23 changes: 12 additions & 11 deletions processor/resourceprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@

Supported pipeline types: metrics, traces

The resource processor can be used to override a resource.
The resource processor can be used to apply changes on resource attributes.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options are required:
- `type`: Resource type to be applied. If specified, this value overrides the
original resource type. Otherwise, the original resource type is kept.
- `labels`: Map of key/value pairs that should be added to the resource.
`attributes` represents actions that can be applied on resource attributes.
See processor/attributesprocessor/README.md for more details on supported attributes actions.

Examples:

```yaml
processors:
resource:
type: "host"
labels: {
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
}
attributes:
- key: cloud.zone
value: "zone-1"
action: upsert
- key: k8s.cluster.name
from_attribute: k8s-cluster
action: insert
- key: redundant-attribute
action: delete
```
Refer to [config.yaml](./testdata/config.yaml) for detailed
Expand Down
12 changes: 9 additions & 3 deletions processor/resourceprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ package resourceprocessor

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/attraction"
)

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// ResourceType overrides the original resource type.

// AttributesActions specifies the list of actions to be applied on resource attributes.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
AttributesActions []attraction.ActionKeyValue `mapstructure:"attributes"`

// ResourceType field is deprecated. Set "opencensus.type" key in "attributes.upsert" map instead.
ResourceType string `mapstructure:"type"`
// Labels specify static labels to be added to resource.
// In case of a conflict the label will be overridden.

// Labels field is deprecated. Use "attributes.upsert" instead.
Labels map[string]string `mapstructure:"labels"`
}
26 changes: 14 additions & 12 deletions processor/resourceprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,35 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/attraction"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factories.Processors[typeStr] = &Factory{}

cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.NoError(t, err)
assert.NotNil(t, cfg)

p1 := cfg.Processors["resource"]
assert.Equal(t, p1, factory.CreateDefaultConfig())

p2 := cfg.Processors["resource/2"]
assert.Equal(t, p2, &Config{
assert.Equal(t, cfg.Processors["resource"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource/2",
NameVal: "resource",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT},
{Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT},
{Key: "redundant-attribute", Action: attraction.DELETE},
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
})

assert.Equal(t, cfg.Processors["resource/invalid"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource/invalid",
},
})
}
78 changes: 68 additions & 10 deletions processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package resourceprocessor

import (
"context"
"fmt"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/processor/attraction"
"go.opentelemetry.io/collector/translator/conventions"
)

const (
Expand All @@ -32,30 +37,83 @@ type Factory struct {
}

// Type gets the type of the Option config created by this factory.
func (Factory) Type() configmodels.Type {
func (*Factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (Factory) CreateDefaultConfig() configmodels.Processor {
// Note: This isn't a valid configuration because the processor would do no work.
func (*Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
ResourceType: "",
Labels: map[string]string{},
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) {
oCfg := cfg.(*Config)
return newResourceTraceProcessor(nextConsumer, oCfg), nil
func (*Factory) CreateTraceProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (component.TraceProcessor, error) {
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
if err != nil {
return nil, err
}
return newResourceTraceProcessor(nextConsumer, attrProc), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) {
oCfg := cfg.(*Config)
return newResourceMetricProcessor(nextConsumer, oCfg), nil
func (*Factory) CreateMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (component.MetricsProcessor, error) {
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
if err != nil {
return nil, err
}
return newResourceMetricProcessor(nextConsumer, attrProc), nil
}

func createAttrProcessor(cfg *Config, logger *zap.Logger) (*attraction.AttrProc, error) {
handleDeprecatedFields(cfg, logger)
if len(cfg.AttributesActions) == 0 {
return nil, fmt.Errorf("error creating \"%q\" processor due to missing required field \"attributes\"", cfg.Name())
}
attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: cfg.AttributesActions})
if err != nil {
return nil, fmt.Errorf("error creating \"%q\" processor: %w", cfg.Name(), err)
}
return attrProc, nil
}

// handleDeprecatedFields converts deprecated ResourceType and Labels fields into Attributes.Upsert
func handleDeprecatedFields(cfg *Config, logger *zap.Logger) {

// Upsert value from deprecated ResourceType config to resource attributes with "opencensus.type" key
if cfg.ResourceType != "" {
logger.Warn("[DEPRECATED] \"type\" field is deprecated and will be removed in future release. " +
"Please set the value to \"attributes\" with key=opencensus.type and action=upsert.")
upsertResourceType := attraction.ActionKeyValue{
Action: attraction.UPSERT,
Key: conventions.OCAttributeResourceType,
Value: cfg.ResourceType,
}
cfg.AttributesActions = append(cfg.AttributesActions, upsertResourceType)
}

// Upsert values from deprecated Labels config to resource attributes
if len(cfg.Labels) > 0 {
logger.Warn("[DEPRECATED] \"labels\" field is deprecated and will be removed in future release. " +
"Please use \"attributes\" field instead.")
for k, v := range cfg.Labels {
action := attraction.ActionKeyValue{Action: attraction.UPSERT, Key: k, Value: v}
cfg.AttributesActions = append(cfg.AttributesActions, action)
}
}
}
78 changes: 75 additions & 3 deletions processor/resourceprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package resourceprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/attraction"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -32,13 +36,81 @@ func TestCreateDefaultConfig(t *testing.T) {

func TestCreateProcessor(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT},
},
}

tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
}

func TestInvalidEmptyActions(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()

_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)

_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)
}

func TestInvalidAttributeActions(t *testing.T) {
var factory Factory
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "k", Value: "v", Action: "invalid-action"},
},
}

_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)

_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)
}

func TestDeprecatedConfig(t *testing.T) {
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
},
}

handleDeprecatedFields(cfg, zap.NewNop())

assert.EqualValues(t, &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "opencensus.resourcetype", Value: "host", Action: attraction.UPSERT},
{Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT},
},
}, cfg)
}
Loading

0 comments on commit 4794b76

Please sign in to comment.