Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource attributes copy processor #133

Merged
merged 7 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/collector/factories.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"github.com/observiq/observiq-collector/pkg/processor/resourceattributetransposerprocessor"
"github.com/observiq/observiq-collector/pkg/receiver/logsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/observiqexporter"
Expand Down Expand Up @@ -51,6 +52,7 @@ var defaultProcessors = []component.ProcessorFactory{
batchprocessor.NewFactory(),
memorylimiterprocessor.NewFactory(),
probabilisticsamplerprocessor.NewFactory(),
resourceattributetransposerprocessor.NewFactory(),
componenttest.NewNopProcessorFactory(),
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/processor/resourceattributetransposerprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Resource Attribute Transposer Processor

This processor copies a resource level attribute to all individual metric data points associated with the resource.
If they key already exists, no action is taken (the data points' attribute _**IS NOT**_ overwritten)

## Configuration

The following options may be configured:
- `operations` (default: []): A list of operations to apply to each resource metric.
- `operations[].from` (default: ""): The attribute to copy off of the resource
- `operations[].to` (default: ""): The destination attribute on each individual metric data point

### Example configuration

```yaml
processors:
resourceattributetransposer:
operations:
- from: "some.resource.level.attr"
to: "some.metricdatapoint.level.attr"
- from: "another.resource.attr"
to: "another.datapoint.attr"
```

## Limitations

Currently, this assumes that the resources attributes is a flat map. This means that you cannot move a single resource attribute if it is under a nested map. You can, however, move a whole nested map.

16 changes: 16 additions & 0 deletions pkg/processor/resourceattributetransposerprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package resourceattributetransposerprocessor

import "go.opentelemetry.io/collector/config"

type CopyResourceConfig struct {
// From is the attribute on the resource to copy from
From string `mapstructure:"from"`
// To is the attribute to copy to on the individual data point
To string `mapstructure:"to"`
}

type Config struct {
config.ProcessorSettings `mapstructure:",squash"`
// Operations is a list of copy operations to perform on each ResourceMetric.
Operations []CopyResourceConfig `mapstructure:"operations"`
}
45 changes: 45 additions & 0 deletions pkg/processor/resourceattributetransposerprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package resourceattributetransposerprocessor

import (
"path"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtest"
)

func TestConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

require.Equal(t, len(cfg.Processors), 2)

// Loaded config should be equal to default config
defaultCfg := factory.CreateDefaultConfig()
r0 := cfg.Processors[config.NewComponentID(typeStr)]
require.Equal(t, r0, defaultCfg)

customComponentID := config.NewComponentIDWithName(typeStr, "customname")
r1 := cfg.Processors[customComponentID].(*Config)
require.Equal(t, &Config{
ProcessorSettings: config.NewProcessorSettings(customComponentID),
Operations: []CopyResourceConfig{
{
From: "some.resource.level.attr",
To: "some.metricdatapoint.level.attr",
},
{
From: "another.resource.attr",
To: "another.datapoint.attr",
},
},
}, r1)
}
41 changes: 41 additions & 0 deletions pkg/processor/resourceattributetransposerprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package resourceattributetransposerprocessor

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
typeStr = "resourceattributetransposer"
)

// NewFactory returns a new factory for the resourceattributetransposer processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithMetrics(createMetricsProcessor),
)
}

// createDefaultConfig returns the default config for the resourceattributetransposer processor.
func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
}
}

// createMetricsProcessor creates the resourceattributetransposer processor.
func createMetricsProcessor(ctx context.Context, params component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics) (component.MetricsProcessor, error) {
processorCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("config was not of correct type for the processor: %+v", cfg)
}

return newResourceAttributeTransposerProcessor(params.Logger, nextConsumer, processorCfg), nil
}
34 changes: 34 additions & 0 deletions pkg/processor/resourceattributetransposerprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package resourceattributetransposerprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/consumer/consumertest"
)

func TestNewFactory(t *testing.T) {
f := NewFactory()
require.NotNil(t, f)
}

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()
require.NotNil(t, cfg)
require.NoError(t, configtest.CheckConfigStruct(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
cfg := createDefaultConfig()
p, err := createMetricsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.NotNil(t, p)
require.NoError(t, err)
}

func TestCreateMetricsExporterNilConfig(t *testing.T) {
_, err := createMetricsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), nil, consumertest.NewNop())
require.Error(t, err)
}
99 changes: 99 additions & 0 deletions pkg/processor/resourceattributetransposerprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package resourceattributetransposerprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
)

type resourceAttributeTransposerProcessor struct {
consumer consumer.Metrics
logger *zap.Logger
config *Config
}

// newResourceAttributeTransposerProcessor returns a new resourceToMetricsAttributesProcessor
func newResourceAttributeTransposerProcessor(logger *zap.Logger, consumer consumer.Metrics, config *Config) *resourceAttributeTransposerProcessor {
return &resourceAttributeTransposerProcessor{
consumer: consumer,
logger: logger,
config: config,
}
}

// Start starts the processor. It's a noop.
func (resourceAttributeTransposerProcessor) Start(ctx context.Context, host component.Host) error {
return nil
}

// Capabilities returns the consumer's capabilities. Indicates that this processor mutates the incoming metrics.
func (resourceAttributeTransposerProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

// ConsumeMetrics processes the incoming pdata.Metrics.
func (p resourceAttributeTransposerProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
resMetrics := md.ResourceMetrics()
for i := 0; i < resMetrics.Len(); i++ {
resMetric := resMetrics.At(i)
resourceAttrs := resMetric.Resource().Attributes()
for _, op := range p.config.Operations {
resourceValue, ok := resourceAttrs.Get(op.From)
if !ok {
continue
}

ilms := resMetric.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
metrics := ilm.Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(j)
setMetricAttr(metric, op.To, resourceValue)
}
}
}
}
return p.consumer.ConsumeMetrics(ctx, md)
}

// Shutdown stops the processor. It's a noop.
func (resourceAttributeTransposerProcessor) Shutdown(ctx context.Context) error {
return nil
}

// setMetricAttr sets the attribute (attrName) to the given value for every datapoint in the metric
func setMetricAttr(metric pdata.Metric, attrName string, value pdata.AttributeValue) {
switch metric.DataType() {
case pdata.MetricDataTypeGauge:
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
dp.Attributes().Insert(attrName, value)
}

case pdata.MetricDataTypeHistogram:
dps := metric.Histogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
dp.Attributes().Insert(attrName, value)
}
case pdata.MetricDataTypeSum:
dps := metric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
dp.Attributes().Insert(attrName, value)
}
case pdata.MetricDataTypeSummary:
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
dp.Attributes().Insert(attrName, value)
}
default:
// skip metric if None or unknown type
}
}
Loading