Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Resource Processor to internal data model #1315

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/processor/attraction/attraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// Settings
type Settings struct {
// Actions specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE}.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// This is a required field.
Actions []ActionKeyValue `mapstructure:"actions"`
}
Expand Down
2 changes: 1 addition & 1 deletion processor/attributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Config struct {
filterspan.MatchConfig `mapstructure:",squash"`

// Specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE}.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// This is a required field.
attraction.Settings `mapstructure:",squash"`
}
23 changes: 12 additions & 11 deletions processor/resourceprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@

Supported pipeline types: metrics, traces

The resource processor can be used to override a resource.
The resource processor can be used to apply changes on resource attributes.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options are required:
- `type`: Resource type to be applied. If specified, this value overrides the
original resource type. Otherwise, the original resource type is kept.
- `labels`: Map of key/value pairs that should be added to the resource.
`attributes` represents actions that can be applied on resource attributes.
See processor/attributesprocessor/README.md for more details on supported attributes actions.

Examples:

```yaml
processors:
resource:
type: "host"
labels: {
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
}
attributes:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline I think we can borrow the attribute processor actions implementation and config for this.
BTW, if need specifically a rename we can add it as an action so that we don't have to use upsert+delete pair all the time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code was extracted and easy to use.

- key: cloud.zone
value: "zone-1"
action: upsert
- key: k8s.cluster.name
from_attribute: k8s-cluster
action: insert
- key: redundant-attribute
action: delete
```
Refer to [config.yaml](./testdata/config.yaml) for detailed
Expand Down
12 changes: 9 additions & 3 deletions processor/resourceprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ package resourceprocessor

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/attraction"
)

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// ResourceType overrides the original resource type.

// AttributesActions specifies the list of actions to be applied on resource attributes.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
AttributesActions []attraction.ActionKeyValue `mapstructure:"attributes"`

// ResourceType field is deprecated. Set "opencensus.type" key in "attributes.upsert" map instead.
ResourceType string `mapstructure:"type"`
// Labels specify static labels to be added to resource.
// In case of a conflict the label will be overridden.

// Labels field is deprecated. Use "attributes.upsert" instead.
Labels map[string]string `mapstructure:"labels"`
}
26 changes: 14 additions & 12 deletions processor/resourceprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,35 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/attraction"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factories.Processors[typeStr] = &Factory{}

cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.NoError(t, err)
assert.NotNil(t, cfg)

p1 := cfg.Processors["resource"]
assert.Equal(t, p1, factory.CreateDefaultConfig())

p2 := cfg.Processors["resource/2"]
assert.Equal(t, p2, &Config{
assert.Equal(t, cfg.Processors["resource"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource/2",
NameVal: "resource",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT},
{Key: "k8s.cluster.name", FromAttribute: "k8s-cluster", Action: attraction.INSERT},
{Key: "redundant-attribute", Action: attraction.DELETE},
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
})

assert.Equal(t, cfg.Processors["resource/invalid"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource/invalid",
},
})
}
78 changes: 68 additions & 10 deletions processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package resourceprocessor

import (
"context"
"fmt"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/processor/attraction"
"go.opentelemetry.io/collector/translator/conventions"
)

const (
Expand All @@ -32,30 +37,83 @@ type Factory struct {
}

// Type gets the type of the Option config created by this factory.
func (Factory) Type() configmodels.Type {
func (*Factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (Factory) CreateDefaultConfig() configmodels.Processor {
// Note: This isn't a valid configuration because the processor would do no work.
func (*Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
ResourceType: "",
Labels: map[string]string{},
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) {
oCfg := cfg.(*Config)
return newResourceTraceProcessor(nextConsumer, oCfg), nil
func (*Factory) CreateTraceProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (component.TraceProcessor, error) {
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
if err != nil {
return nil, err
}
return newResourceTraceProcessor(nextConsumer, attrProc), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) {
oCfg := cfg.(*Config)
return newResourceMetricProcessor(nextConsumer, oCfg), nil
func (*Factory) CreateMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (component.MetricsProcessor, error) {
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
if err != nil {
return nil, err
}
return newResourceMetricProcessor(nextConsumer, attrProc), nil
}

func createAttrProcessor(cfg *Config, logger *zap.Logger) (*attraction.AttrProc, error) {
handleDeprecatedFields(cfg, logger)
if len(cfg.AttributesActions) == 0 {
return nil, fmt.Errorf("error creating \"%q\" processor due to missing required field \"attributes\"", cfg.Name())
}
attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: cfg.AttributesActions})
if err != nil {
return nil, fmt.Errorf("error creating \"%q\" processor: %w", cfg.Name(), err)
}
return attrProc, nil
}

// handleDeprecatedFields converts deprecated ResourceType and Labels fields into Attributes.Upsert
func handleDeprecatedFields(cfg *Config, logger *zap.Logger) {

// Upsert value from deprecated ResourceType config to resource attributes with "opencensus.type" key
if cfg.ResourceType != "" {
logger.Warn("[DEPRECATED] \"type\" field is deprecated and will be removed in future release. " +
"Please set the value to \"attributes\" with key=opencensus.type and action=upsert.")
upsertResourceType := attraction.ActionKeyValue{
Action: attraction.UPSERT,
Key: conventions.OCAttributeResourceType,
Value: cfg.ResourceType,
}
cfg.AttributesActions = append(cfg.AttributesActions, upsertResourceType)
}

// Upsert values from deprecated Labels config to resource attributes
if len(cfg.Labels) > 0 {
logger.Warn("[DEPRECATED] \"labels\" field is deprecated and will be removed in future release. " +
"Please use \"attributes\" field instead.")
for k, v := range cfg.Labels {
action := attraction.ActionKeyValue{Action: attraction.UPSERT, Key: k, Value: v}
cfg.AttributesActions = append(cfg.AttributesActions, action)
}
}
}
78 changes: 75 additions & 3 deletions processor/resourceprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package resourceprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/attraction"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -32,13 +36,81 @@ func TestCreateDefaultConfig(t *testing.T) {

func TestCreateProcessor(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT},
},
}

tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
}

func TestInvalidEmptyActions(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()

_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)

_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)
}

func TestInvalidAttributeActions(t *testing.T) {
var factory Factory
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "k", Value: "v", Action: "invalid-action"},
},
}

_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)

_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Error(t, err)
}

func TestDeprecatedConfig(t *testing.T) {
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
},
}

handleDeprecatedFields(cfg, zap.NewNop())

assert.EqualValues(t, &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
},
AttributesActions: []attraction.ActionKeyValue{
{Key: "opencensus.resourcetype", Value: "host", Action: attraction.UPSERT},
{Key: "cloud.zone", Value: "zone-1", Action: attraction.UPSERT},
},
}, cfg)
}
Loading