Skip to content

Commit

Permalink
Migrate Resource Processor to internal data model
Browse files Browse the repository at this point in the history
This commit migrates resource processor to internal data model. Existing configuration is relevant only to OpenCensus resource, so this commit also changes the configuration schema to align with OTLP resource naming and to allow future evolvement of the processor. 
New config field uses "attributes" field representing actions that can be made on resource attributes. For now we have only one action "upsert" representing existing behavior.  
Old fields "type" and "labels" are still supported for backward compatibility, but marked as deprecated. 
In order to migrate to migrate existing resource processor config:
1. Rename "labels" field to "attributes" 
2. Add value from "type" field to "attributes" under "opencensus.resourcetype" key.
  • Loading branch information
dmitryax committed Jul 10, 2020
1 parent e7ab219 commit a2bcbc2
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 362 deletions.
6 changes: 2 additions & 4 deletions processor/resourceprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ The resource processor can be used to override a resource.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options are required:
- `type`: Resource type to be applied. If specified, this value overrides the
original resource type. Otherwise, the original resource type is kept.
- `labels`: Map of key/value pairs that should be added to the resource.
- `attributes`: Map of key/value pairs that should be added to the resource.

Examples:

```yaml
processors:
resource:
type: "host"
labels: {
attributes: {
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
Expand Down
15 changes: 12 additions & 3 deletions processor/resourceprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,18 @@ import (
// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// ResourceType overrides the original resource type.
Attributes AttributesActions `mapstructure:"attributes"`

// ResourceType field is deprecated. Set "opencensus.type" key in "attributes.upsert" map instead.
ResourceType string `mapstructure:"type"`
// Labels specify static labels to be added to resource.
// In case of a conflict the label will be overridden.

// Labels field is deprecated. Use "attributes.upsert" instead.
Labels map[string]string `mapstructure:"labels"`
}

// AttributesActions defines actions that can be applied to resource attributes.
type AttributesActions struct {
// Upsert attribute action defines static key/values pairs to be added to resource attributes.
// In case of a conflict the attribute will be overridden.
Upsert map[string]string `mapstructure:"upsert"`
}
11 changes: 6 additions & 5 deletions processor/resourceprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ func TestLoadConfig(t *testing.T) {
TypeVal: "resource",
NameVal: "resource/2",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
Attributes: AttributesActions{
Upsert: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
},
},
})
}
49 changes: 42 additions & 7 deletions processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package resourceprocessor

import (
"go.uber.org/zap"
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"
)

const (
Expand All @@ -32,30 +34,63 @@ type Factory struct {
}

// Type gets the type of the Option config created by this factory.
func (Factory) Type() configmodels.Type {
func (*Factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (Factory) CreateDefaultConfig() configmodels.Processor {
func (*Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
ResourceType: "",
Labels: map[string]string{},
Attributes: AttributesActions{
Upsert: map[string]string{},
},
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) {
func (*Factory) CreateTraceProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (component.TraceProcessor, error) {
oCfg := cfg.(*Config)
handleDeprecatedFields(oCfg, params.Logger)
return newResourceTraceProcessor(nextConsumer, oCfg), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) {
func (*Factory) CreateMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)
handleDeprecatedFields(oCfg, params.Logger)
return newResourceMetricProcessor(nextConsumer, oCfg), nil
}

// handleDeprecatedFields converts deprecated ResourceType and Labels fields into Attributes.Upsert
func handleDeprecatedFields(cfg *Config, logger *zap.Logger) {
if cfg.ResourceType != "" {
logger.Warn("[DEPRECATED] \"type\" field is deprecated and will be removed in future release. " +
"Please set the value to \"opencensus.type\" key in \"attributes.upsert\" map instead.")
cfg.Attributes.Upsert[conventions.OCAttributeResourceType] = cfg.ResourceType
}
if len(cfg.Labels) > 0 {
logger.Warn("[DEPRECATED] \"labels\" field is deprecated and will be removed in future release. " +
"Please use \"attributes.upsert\" field instead.")
for k, v := range cfg.Labels {
if _, ok := cfg.Attributes.Upsert[k]; ok {
logger.Warn("Label \"" + k + "\" ignored due to a conflict with a key in attributes.upsert")
continue
}
cfg.Attributes.Upsert[k] = v
}
}
}
51 changes: 49 additions & 2 deletions processor/resourceprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package resourceprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/translator/conventions"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -34,11 +38,54 @@ func TestCreateProcessor(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
}

func TestDeprecatedConfig(t *testing.T) {
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"ignored": "new-value",
},
Attributes: AttributesActions{
Upsert: map[string]string{
"ignored": "old-value",
},
},
}

handleDeprecatedFields(cfg, zap.NewNop())

assert.EqualValues(t, &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
ResourceType: "host",
Labels: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"ignored": "new-value",
},
Attributes: AttributesActions{
Upsert: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
conventions.OCAttributeResourceType: "host",
"ignored": "old-value",
},
},
}, cfg)
}
119 changes: 52 additions & 67 deletions processor/resourceprocessor/resource_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,41 @@ package resourceprocessor
import (
"context"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
)

type resourceTraceProcessor struct {
resource *resourcepb.Resource
capabilities component.ProcessorCapabilities
next consumer.TraceConsumerOld
attributesActions AttributesActions
capabilities component.ProcessorCapabilities
next consumer.TraceConsumer
}

func newResourceTraceProcessor(next consumer.TraceConsumerOld, cfg *Config) *resourceTraceProcessor {
resource := createResource(cfg)
func newResourceTraceProcessor(next consumer.TraceConsumer, cfg *Config) *resourceTraceProcessor {
return &resourceTraceProcessor{
next: next,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)},
resource: resource,
attributesActions: cfg.Attributes,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: mutatesData(cfg.Attributes)},
next: next,
}
}

// ConsumeTraceData implements the TraceProcessor interface
func (rtp *resourceTraceProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
return rtp.next.ConsumeTraceData(ctx, consumerdata.TraceData{
Node: td.Node,
Resource: mergeResource(td.Resource, rtp.resource),
Spans: td.Spans,
SourceFormat: td.SourceFormat,
})
func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
if !mutatesData(rtp.attributesActions) {
return rtp.next.ConsumeTraces(ctx, td)
}

rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
resource := rss.At(i).Resource()
if resource.IsNil() {
resource.InitEmpty()
}
applyAttributesActions(resource, rtp.attributesActions)
}
return rtp.next.ConsumeTraces(ctx, td)
}

// GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor.
Expand All @@ -65,17 +70,16 @@ func (*resourceTraceProcessor) Shutdown(context.Context) error {
}

type resourceMetricProcessor struct {
resource *resourcepb.Resource
capabilities component.ProcessorCapabilities
next consumer.MetricsConsumerOld
attributesActions AttributesActions
capabilities component.ProcessorCapabilities
next consumer.MetricsConsumer
}

func newResourceMetricProcessor(next consumer.MetricsConsumerOld, cfg *Config) *resourceMetricProcessor {
resource := createResource(cfg)
func newResourceMetricProcessor(next consumer.MetricsConsumer, cfg *Config) *resourceMetricProcessor {
return &resourceMetricProcessor{
resource: resource,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)},
next: next,
attributesActions: cfg.Attributes,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: mutatesData(cfg.Attributes)},
next: next,
}
}

Expand All @@ -95,52 +99,33 @@ func (*resourceMetricProcessor) Shutdown(context.Context) error {
}

// ConsumeMetricsData implements the MetricsProcessor interface
func (rmp *resourceMetricProcessor) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
return rmp.next.ConsumeMetricsData(ctx, consumerdata.MetricsData{
Node: md.Node,
Resource: mergeResource(md.Resource, rmp.resource),
Metrics: md.Metrics,
})
}

func createResource(cfg *Config) *resourcepb.Resource {
rpb := &resourcepb.Resource{
Type: cfg.ResourceType,
Labels: map[string]string{},
func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
if !mutatesData(rmp.attributesActions) {
return rmp.next.ConsumeMetrics(ctx, md)
}
for k, v := range cfg.Labels {
rpb.Labels[k] = v
}
return rpb
}

func mergeResource(to, from *resourcepb.Resource) *resourcepb.Resource {
if isEmptyResource(from) {
return to
}
if to == nil {
if from.Type == "" {
// Since resource without type would be invalid, we keep resource as nil
return nil
imd := pdatautil.MetricsToInternalMetrics(md)
rms := imd.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
resource := rms.At(i).Resource()
if resource.IsNil() {
resource.InitEmpty()
}
to = &resourcepb.Resource{Labels: map[string]string{}}
}
if from.Type != "" {
// Only change resource type if it was configured
to.Type = from.Type
applyAttributesActions(resource, rmp.attributesActions)
}
if from.Labels != nil {
if to.Labels == nil {
to.Labels = make(map[string]string, len(from.Labels))
}
return rmp.next.ConsumeMetrics(ctx, md)
}

for k, v := range from.Labels {
to.Labels[k] = v
}
}
return to
func mutatesData(actions AttributesActions) bool {
return len(actions.Upsert) > 0
}

func applyAttributesActions(resource pdata.Resource, attributesActions AttributesActions) {
upsertAttributes(resource, attributesActions.Upsert)
}

func isEmptyResource(resource *resourcepb.Resource) bool {
return resource.Type == "" && len(resource.Labels) == 0
func upsertAttributes(resource pdata.Resource, attrs map[string]string) {
for k, v := range attrs {
resource.Attributes().UpsertString(k, v)
}
}
Loading

0 comments on commit a2bcbc2

Please sign in to comment.