From 4a0b824c63141bd7ecc3075811108a2a8403a594 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Nov 2021 16:14:32 -0500 Subject: [PATCH 1/7] Add resourcetometricsattrsprocessor --- .../resourcetometricsattrsprocessor/README.md | 28 + .../resourcetometricsattrsprocessor/config.go | 16 + .../config_test.go | 45 ++ .../factory.go | 41 ++ .../factory_test.go | 34 ++ .../processor.go | 99 ++++ .../processor_test.go | 527 ++++++++++++++++++ .../testdata/config.yaml | 21 + 8 files changed, 811 insertions(+) create mode 100644 pkg/processor/resourcetometricsattrsprocessor/README.md create mode 100644 pkg/processor/resourcetometricsattrsprocessor/config.go create mode 100644 pkg/processor/resourcetometricsattrsprocessor/config_test.go create mode 100644 pkg/processor/resourcetometricsattrsprocessor/factory.go create mode 100644 pkg/processor/resourcetometricsattrsprocessor/factory_test.go create mode 100644 pkg/processor/resourcetometricsattrsprocessor/processor.go create mode 100644 pkg/processor/resourcetometricsattrsprocessor/processor_test.go create mode 100644 pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml diff --git a/pkg/processor/resourcetometricsattrsprocessor/README.md b/pkg/processor/resourcetometricsattrsprocessor/README.md new file mode 100644 index 000000000..8c8065537 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/README.md @@ -0,0 +1,28 @@ +# Resource to Metrics Attributes Processor + +This processor copies a resource level attribute to all individual metric data points associated with the resource. +If they key already exists, no action is taken (the data points' attribute _**IS NOT**_ overwritten) + +## Configuration + +The following options may be configured: +- `operations` (default: []): A list of operations to apply to each resource metric. + - `operations[].from` (default: ""): The attribute to copy off of the resource + - `operations[].to` (default: ""): The destination attribute on each individual metric data point + +### Example configuration + +```yaml +processors: + resourcetometricsattrs: + operations: + - from: "some.resource.level.attr" + to: "some.metricdatapoint.level.attr" + - from: "another.resource.attr" + to: "another.datapoint.attr" +``` + +## Limitations + +Currently, this assumes that the resources attributes is a flat map. This means that you cannot move a single resource attribute if it is under a nested map. You can, however, move a whole nested map. + diff --git a/pkg/processor/resourcetometricsattrsprocessor/config.go b/pkg/processor/resourcetometricsattrsprocessor/config.go new file mode 100644 index 000000000..409491a52 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/config.go @@ -0,0 +1,16 @@ +package resourcetometricsattrsprocessor + +import "go.opentelemetry.io/collector/config" + +type CopyResourceConfig struct { + // From is the attribute on the resource to copy from + From string `mapstructure:"from"` + // To is the attribute to copy to on the individual data point + To string `mapstructure:"to"` +} + +type Config struct { + config.ProcessorSettings `mapstructure:",squash"` + // Operations is a list of copy operations to perform on each ResourceMetric. + Operations []CopyResourceConfig `mapstructure:"operations"` +} diff --git a/pkg/processor/resourcetometricsattrsprocessor/config_test.go b/pkg/processor/resourcetometricsattrsprocessor/config_test.go new file mode 100644 index 000000000..672a0eb20 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/config_test.go @@ -0,0 +1,45 @@ +package resourcetometricsattrsprocessor + +import ( + "path" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configtest" +) + +func TestConfig(t *testing.T) { + factories, err := componenttest.NopFactories() + require.NoError(t, err) + + factory := NewFactory() + factories.Processors[typeStr] = factory + cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + require.Equal(t, len(cfg.Processors), 2) + + // Loaded config should be equal to default config + defaultCfg := factory.CreateDefaultConfig() + r0 := cfg.Processors[config.NewComponentID(typeStr)] + require.Equal(t, r0, defaultCfg) + + customComponentID := config.NewComponentIDWithName(typeStr, "customname") + r1 := cfg.Processors[customComponentID].(*Config) + require.Equal(t, &Config{ + ProcessorSettings: config.NewProcessorSettings(customComponentID), + Operations: []CopyResourceConfig{ + { + From: "some.resource.level.attr", + To: "some.metricdatapoint.level.attr", + }, + { + From: "another.resource.attr", + To: "another.datapoint.attr", + }, + }, + }, r1) +} diff --git a/pkg/processor/resourcetometricsattrsprocessor/factory.go b/pkg/processor/resourcetometricsattrsprocessor/factory.go new file mode 100644 index 000000000..ad8922ef4 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/factory.go @@ -0,0 +1,41 @@ +package resourcetometricsattrsprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +const ( + typeStr = "resourcetometricsattrs" +) + +// NewFactory returns a new factory for the resourcetometricsattrs processor. +func NewFactory() component.ProcessorFactory { + return processorhelper.NewFactory( + typeStr, + createDefaultConfig, + processorhelper.WithMetrics(createMetricsProcessor), + ) +} + +// createDefaultConfig returns the default config for the resourcetometricsattrs processor. +func createDefaultConfig() config.Processor { + return &Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), + } +} + +// createMetricsProcessor creates the resourcetometricsattrs processor. +func createMetricsProcessor(ctx context.Context, params component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics) (component.MetricsProcessor, error) { + processorCfg, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("config was not of correct type for the processor: %+v", cfg) + } + + return newResourceToMetricsAttributesProcessor(params.Logger, nextConsumer, processorCfg), nil +} diff --git a/pkg/processor/resourcetometricsattrsprocessor/factory_test.go b/pkg/processor/resourcetometricsattrsprocessor/factory_test.go new file mode 100644 index 000000000..6f2b86dd3 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/factory_test.go @@ -0,0 +1,34 @@ +package resourcetometricsattrsprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtest" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +func TestNewFactory(t *testing.T) { + f := NewFactory() + require.NotNil(t, f) +} + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig() + require.NotNil(t, cfg) + require.NoError(t, configtest.CheckConfigStruct(cfg)) +} + +func TestCreateMetricsExporter(t *testing.T) { + cfg := createDefaultConfig() + p, err := createMetricsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop()) + require.NotNil(t, p) + require.NoError(t, err) +} + +func TestCreateMetricsExporterNilConfig(t *testing.T) { + _, err := createMetricsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), nil, consumertest.NewNop()) + require.Error(t, err) +} diff --git a/pkg/processor/resourcetometricsattrsprocessor/processor.go b/pkg/processor/resourcetometricsattrsprocessor/processor.go new file mode 100644 index 000000000..73e33a498 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/processor.go @@ -0,0 +1,99 @@ +package resourcetometricsattrsprocessor + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +type resourceToMetricsAttributesProcessor struct { + consumer consumer.Metrics + logger *zap.Logger + config *Config +} + +// newResourceToMetricsAttributesProcessor returns a new resourceToMetricsAttributesProcessor +func newResourceToMetricsAttributesProcessor(logger *zap.Logger, consumer consumer.Metrics, config *Config) *resourceToMetricsAttributesProcessor { + return &resourceToMetricsAttributesProcessor{ + consumer: consumer, + logger: logger, + config: config, + } +} + +// Start starts the processor. It's a noop. +func (resourceToMetricsAttributesProcessor) Start(ctx context.Context, host component.Host) error { + return nil +} + +// Capabilities returns the consumer's capabilities. Indicates that this processor mutates the incoming metrics. +func (resourceToMetricsAttributesProcessor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +// ConsumeMetrics processes the incoming pdata.Metrics. +func (p resourceToMetricsAttributesProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + resMetrics := md.ResourceMetrics() + for i := 0; i < resMetrics.Len(); i++ { + resMetric := resMetrics.At(i) + resourceAttrs := resMetric.Resource().Attributes() + for _, op := range p.config.Operations { + resourceValue, ok := resourceAttrs.Get(op.From) + if !ok { + continue + } + + ilms := resMetric.InstrumentationLibraryMetrics() + for j := 0; j < ilms.Len(); j++ { + ilm := ilms.At(j) + metrics := ilm.Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(j) + setMetricAttr(metric, op.To, resourceValue) + } + } + } + } + return p.consumer.ConsumeMetrics(ctx, md) +} + +// Shutdown stops the processor. It's a noop. +func (resourceToMetricsAttributesProcessor) Shutdown(ctx context.Context) error { + return nil +} + +// setMetricAttr sets the attribute (attrName) to the given value for every datapoint in the metric +func setMetricAttr(metric pdata.Metric, attrName string, value pdata.AttributeValue) { + switch metric.DataType() { + case pdata.MetricDataTypeGauge: + dps := metric.Gauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + dp.Attributes().Insert(attrName, value) + } + + case pdata.MetricDataTypeHistogram: + dps := metric.Histogram().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + dp.Attributes().Insert(attrName, value) + } + case pdata.MetricDataTypeSum: + dps := metric.Sum().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + dp.Attributes().Insert(attrName, value) + } + case pdata.MetricDataTypeSummary: + dps := metric.Summary().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + dp.Attributes().Insert(attrName, value) + } + default: + // skip metric if None or unknown type + } +} diff --git a/pkg/processor/resourcetometricsattrsprocessor/processor_test.go b/pkg/processor/resourcetometricsattrsprocessor/processor_test.go new file mode 100644 index 000000000..10ef2a1de --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/processor_test.go @@ -0,0 +1,527 @@ +package resourcetometricsattrsprocessor + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +func TestProcessorStart(t *testing.T) { + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumertest.NewNop(), + createDefaultConfig().(*Config), + ) + + err := p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) +} + +func TestProcessorShutdown(t *testing.T) { + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumertest.NewNop(), + createDefaultConfig().(*Config), + ) + + err := p.Shutdown(context.Background()) + require.NoError(t, err) +} + +func TestProcessorCapabilities(t *testing.T) { + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumertest.NewNop(), + createDefaultConfig().(*Config), + ) + cap := p.Capabilities() + require.True(t, cap.MutatesData) +} + +// TestConsumeMetricsNoop test that the default config is essentially a noop +func TestConsumeMetricsNoop(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value")) + attrs.Insert("resourceattrib2", pdata.NewAttributeValueBool(false)) + attrs.Insert("resourceattrib3", pdata.NewAttributeValueBytes([]byte("some bytes"))) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + createDefaultConfig().(*Config), + ) + + p.ConsumeMetrics(ctx, metrics) + + require.Empty(t, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsMoveExistingAttribs(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value")) + attrs.Insert("resourceattrib2", pdata.NewAttributeValueBool(false)) + attrs.Insert("resourceattrib3", pdata.NewAttributeValueBytes([]byte("some bytes"))) + attrs.Insert("resourceattrib4", pdata.NewAttributeValueDouble(2.0)) + attrs.Insert("resourceattrib5", pdata.NewAttributeValueInt(100)) + attrs.Insert("resourceattrib6", pdata.NewAttributeValueEmpty()) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "resourceattrib1", + }, + { + From: "resourceattrib2", + To: "resourceattrib2", + }, + { + From: "resourceattrib3", + To: "resourceattrib3", + }, + { + From: "resourceattrib4", + To: "resourceattrib4", + }, + { + From: "resourceattrib5", + To: "resourceattrib5", + }, + { + From: "resourceattrib6", + To: "resourceattrib6", + }, + { + From: "resourceattrib7", + To: "resourceattrib7", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeGauge) + dp := metric.Gauge().DataPoints() + dp.AppendEmpty().SetDoubleVal(3.0) + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "resourceattrib1": "value", + "resourceattrib2": false, + "resourceattrib3": []byte("some bytes"), + "resourceattrib4": float64(2.0), + "resourceattrib5": int64(100), + "resourceattrib6": nil, + }, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsMixedExistence(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + attrs.Insert("resourceattrib2", pdata.NewAttributeValueString("value2")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "resourceattrib1out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeGauge) + dp := metric.Gauge().DataPoints() + dp.AppendEmpty().SetDoubleVal(3.0) + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "resourceattrib1out": "value1", + }, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsSum(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "resourceattrib1out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeSum) + dp := metric.Sum().DataPoints() + dp.AppendEmpty().SetDoubleVal(3.0) + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "resourceattrib1out": "value1", + }, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsHistogram(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "resourceattrib1out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeHistogram) + dp := metric.Histogram().DataPoints() + dp.AppendEmpty() + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "resourceattrib1out": "value1", + }, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsSummary(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "resourceattrib1out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeSummary) + dp := metric.Summary().DataPoints() + dp.AppendEmpty() + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "resourceattrib1out": "value1", + }, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsNone(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "resourceattrib1out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeNone) + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}(nil), getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsDoesNotOverwrite(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + attrs.Insert("resourceattrib2", pdata.NewAttributeValueString("value2")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "out", + }, + { + From: "resourceattrib2", + To: "out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeGauge) + dp := metric.Gauge().DataPoints() + dp.AppendEmpty().SetDoubleVal(3.0) + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "out": "value1", + }, getMetricAttrsFromMetrics(metricsOut)) +} + +func TestConsumeMetricsDoesNotOverwrite2(t *testing.T) { + ctx := context.Background() + metrics := createMetrics() + + attrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + attrs.Insert("resourceattrib1", pdata.NewAttributeValueString("value1")) + attrs.Insert("resourceattrib2", pdata.NewAttributeValueString("value2")) + + var metricsOut pdata.Metrics + + consumer := &mockConsumer{} + consumer.On("ConsumeMetrics", ctx, metrics).Run(func(args mock.Arguments) { + metricsOut = args[1].(pdata.Metrics) + }).Return(nil) + + cfg := createDefaultConfig().(*Config) + cfg.Operations = []CopyResourceConfig{ + { + From: "resourceattrib1", + To: "out", + }, + { + From: "resourceattrib2", + To: "out", + }, + } + + p := newResourceToMetricsAttributesProcessor( + zap.NewNop(), + consumer, + cfg, + ) + + metric := getMetric(metrics) + metric.SetDataType(pdata.MetricDataTypeGauge) + dps := metric.Gauge().DataPoints() + dp := dps.AppendEmpty() + dp.SetDoubleVal(3.0) + dp.Attributes().InsertString("out", "originalvalue") + + p.ConsumeMetrics(ctx, metrics) + + require.Equal(t, map[string]interface{}{ + "out": "originalvalue", + }, getMetricAttrsFromMetrics(metricsOut)) +} + +type mockConsumer struct { + mock.Mock +} + +func (m mockConsumer) Start(ctx context.Context, host component.Host) error { + args := m.Called(ctx, host) + return args.Error(0) +} + +func (m mockConsumer) Capabilities() consumer.Capabilities { + args := m.Called() + return args.Get(0).(consumer.Capabilities) +} + +func (m mockConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + args := m.Called(ctx, md) + return args.Error(0) +} + +func (m mockConsumer) Shutdown(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func assertAttributeMapIsEqual(t *testing.T, m1, m2 pdata.AttributeMap) { + if m1.Len() != m2.Len() { + require.Fail(t, "Len of m1 (%d) does not equal Len of m2 (%d)", m1.Len(), m2.Len()) + } + + m1.Range(func(k string, v pdata.AttributeValue) bool { + v2, ok := m2.Get(k) + if !ok { + require.Fail(t, "m2 does not contain key %s, but m1 does", k) + return false + } + + err := assertAttributeValueIsEqual(v, v2) + if err != nil { + require.Fail(t, "Attribute value for %s was not the same between m1 and m2: %v", k, err) + return false + } + + return true + }) +} + +func assertAttributeValueIsEqual(a1, a2 pdata.AttributeValue) error { + if a1.Type() != a2.Type() { + return fmt.Errorf("type of a1 (%d) is not equal to type of a2 (%d)", a1.Type(), a2.Type()) + } + if !a1.Equal(a2) { + return fmt.Errorf("a1 (%s) was not equal to a2 (%s)", a1.AsString(), a2.AsString()) + } + return nil +} + +func getMetric(m pdata.Metrics) pdata.Metric { + return m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0) +} + +func createMetrics() pdata.Metrics { + metrics := pdata.NewMetrics() + metrics.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty() + return metrics +} + +func getResourceAttrsFromMetrics(m pdata.Metrics) map[string]interface{} { + return m.ResourceMetrics().At(0).Resource().Attributes().AsRaw() +} + +func getMetricAttrsFromMetrics(m pdata.Metrics) map[string]interface{} { + return getMetricAttrsFromMetric(getMetric(m)) +} + +func getMetricAttrsFromMetric(m pdata.Metric) map[string]interface{} { + switch m.DataType() { + case pdata.MetricDataTypeGauge: + return m.Gauge().DataPoints().At(0).Attributes().AsRaw() + case pdata.MetricDataTypeSum: + return m.Sum().DataPoints().At(0).Attributes().AsRaw() + case pdata.MetricDataTypeHistogram: + return m.Histogram().DataPoints().At(0).Attributes().AsRaw() + case pdata.MetricDataTypeSummary: + return m.Summary().DataPoints().At(0).Attributes().AsRaw() + } + return nil +} diff --git a/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml b/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml new file mode 100644 index 000000000..9cdc69dc4 --- /dev/null +++ b/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml @@ -0,0 +1,21 @@ +receivers: + nop: + +processors: + resourcetometricsattrs: + resourcetometricsattrs/customname: + operations: + - from: "some.resource.level.attr" + to: "some.metricdatapoint.level.attr" + - from: "another.resource.attr" + to: "another.datapoint.attr" + +exporters: + nop: + +service: + pipelines: + logs: + receivers: [nop] + processors: [resourcetometricsattrs] + exporters: [nop] \ No newline at end of file From 684d726ed2dab119ce7a28591379bb0b0bbba7a4 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Nov 2021 16:25:13 -0500 Subject: [PATCH 2/7] Add processor to factories --- pkg/collector/factories.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/collector/factories.go b/pkg/collector/factories.go index 31b339c26..23003b22d 100644 --- a/pkg/collector/factories.go +++ b/pkg/collector/factories.go @@ -1,6 +1,7 @@ package collector import ( + "github.com/observiq/observiq-collector/pkg/processor/resourcetometricsattrsprocessor" "github.com/observiq/observiq-collector/pkg/receiver/logsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/observiqexporter" @@ -51,6 +52,7 @@ var defaultProcessors = []component.ProcessorFactory{ batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), probabilisticsamplerprocessor.NewFactory(), + resourcetometricsattrsprocessor.NewFactory(), componenttest.NewNopProcessorFactory(), } From 1131bca6aed423377919b73ec38b3cae9f2f2e40 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Nov 2021 16:55:31 -0500 Subject: [PATCH 3/7] Fix lint errors --- .../processor_test.go | 72 ++++++------------- 1 file changed, 22 insertions(+), 50 deletions(-) diff --git a/pkg/processor/resourcetometricsattrsprocessor/processor_test.go b/pkg/processor/resourcetometricsattrsprocessor/processor_test.go index 10ef2a1de..35582a418 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/processor_test.go +++ b/pkg/processor/resourcetometricsattrsprocessor/processor_test.go @@ -2,7 +2,6 @@ package resourcetometricsattrsprocessor import ( "context" - "fmt" "testing" "github.com/stretchr/testify/mock" @@ -70,7 +69,8 @@ func TestConsumeMetricsNoop(t *testing.T) { createDefaultConfig().(*Config), ) - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Empty(t, getMetricAttrsFromMetrics(metricsOut)) } @@ -137,7 +137,8 @@ func TestConsumeMetricsMoveExistingAttribs(t *testing.T) { dp := metric.Gauge().DataPoints() dp.AppendEmpty().SetDoubleVal(3.0) - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "resourceattrib1": "value", @@ -183,7 +184,8 @@ func TestConsumeMetricsMixedExistence(t *testing.T) { dp := metric.Gauge().DataPoints() dp.AppendEmpty().SetDoubleVal(3.0) - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "resourceattrib1out": "value1", @@ -223,7 +225,8 @@ func TestConsumeMetricsSum(t *testing.T) { dp := metric.Sum().DataPoints() dp.AppendEmpty().SetDoubleVal(3.0) - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "resourceattrib1out": "value1", @@ -263,7 +266,8 @@ func TestConsumeMetricsHistogram(t *testing.T) { dp := metric.Histogram().DataPoints() dp.AppendEmpty() - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "resourceattrib1out": "value1", @@ -303,7 +307,8 @@ func TestConsumeMetricsSummary(t *testing.T) { dp := metric.Summary().DataPoints() dp.AppendEmpty() - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "resourceattrib1out": "value1", @@ -341,7 +346,8 @@ func TestConsumeMetricsNone(t *testing.T) { metric := getMetric(metrics) metric.SetDataType(pdata.MetricDataTypeNone) - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}(nil), getMetricAttrsFromMetrics(metricsOut)) } @@ -384,7 +390,8 @@ func TestConsumeMetricsDoesNotOverwrite(t *testing.T) { dp := metric.Gauge().DataPoints() dp.AppendEmpty().SetDoubleVal(3.0) - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "out": "value1", @@ -431,7 +438,8 @@ func TestConsumeMetricsDoesNotOverwrite2(t *testing.T) { dp.SetDoubleVal(3.0) dp.Attributes().InsertString("out", "originalvalue") - p.ConsumeMetrics(ctx, metrics) + err := p.ConsumeMetrics(ctx, metrics) + require.NoError(t, err) require.Equal(t, map[string]interface{}{ "out": "originalvalue", @@ -442,58 +450,26 @@ type mockConsumer struct { mock.Mock } -func (m mockConsumer) Start(ctx context.Context, host component.Host) error { +func (m *mockConsumer) Start(ctx context.Context, host component.Host) error { args := m.Called(ctx, host) return args.Error(0) } -func (m mockConsumer) Capabilities() consumer.Capabilities { +func (m *mockConsumer) Capabilities() consumer.Capabilities { args := m.Called() return args.Get(0).(consumer.Capabilities) } -func (m mockConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { +func (m *mockConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { args := m.Called(ctx, md) return args.Error(0) } -func (m mockConsumer) Shutdown(ctx context.Context) error { +func (m *mockConsumer) Shutdown(ctx context.Context) error { args := m.Called(ctx) return args.Error(0) } -func assertAttributeMapIsEqual(t *testing.T, m1, m2 pdata.AttributeMap) { - if m1.Len() != m2.Len() { - require.Fail(t, "Len of m1 (%d) does not equal Len of m2 (%d)", m1.Len(), m2.Len()) - } - - m1.Range(func(k string, v pdata.AttributeValue) bool { - v2, ok := m2.Get(k) - if !ok { - require.Fail(t, "m2 does not contain key %s, but m1 does", k) - return false - } - - err := assertAttributeValueIsEqual(v, v2) - if err != nil { - require.Fail(t, "Attribute value for %s was not the same between m1 and m2: %v", k, err) - return false - } - - return true - }) -} - -func assertAttributeValueIsEqual(a1, a2 pdata.AttributeValue) error { - if a1.Type() != a2.Type() { - return fmt.Errorf("type of a1 (%d) is not equal to type of a2 (%d)", a1.Type(), a2.Type()) - } - if !a1.Equal(a2) { - return fmt.Errorf("a1 (%s) was not equal to a2 (%s)", a1.AsString(), a2.AsString()) - } - return nil -} - func getMetric(m pdata.Metrics) pdata.Metric { return m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0) } @@ -504,10 +480,6 @@ func createMetrics() pdata.Metrics { return metrics } -func getResourceAttrsFromMetrics(m pdata.Metrics) map[string]interface{} { - return m.ResourceMetrics().At(0).Resource().Attributes().AsRaw() -} - func getMetricAttrsFromMetrics(m pdata.Metrics) map[string]interface{} { return getMetricAttrsFromMetric(getMetric(m)) } From be3860e87073e9e439f308a800813358387918f2 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Nov 2021 15:36:14 -0500 Subject: [PATCH 4/7] testdata config should be a metrics pipeline, not a logs one --- .../resourcetometricsattrsprocessor/testdata/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml b/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml index 9cdc69dc4..cf9a18b68 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml +++ b/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml @@ -15,7 +15,7 @@ exporters: service: pipelines: - logs: + metrics: receivers: [nop] processors: [resourcetometricsattrs] exporters: [nop] \ No newline at end of file From 818316cdcbb44918f144399cdd64ee435d9dd62c Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Nov 2021 16:00:42 -0500 Subject: [PATCH 5/7] Rename to resourceattributetransposer --- pkg/collector/factories.go | 4 +-- .../README.md | 2 +- .../config.go | 2 +- .../config_test.go | 2 +- .../factory.go | 12 ++++----- .../factory_test.go | 2 +- .../processor.go | 18 ++++++------- .../processor_test.go | 26 +++++++++---------- .../testdata/config.yaml | 6 ++--- 9 files changed, 37 insertions(+), 37 deletions(-) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/README.md (97%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/config.go (92%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/config_test.go (96%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/factory.go (74%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/factory_test.go (95%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/processor.go (79%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/processor_test.go (95%) rename pkg/processor/{resourcetometricsattrsprocessor => resourceattributetransposer}/testdata/config.yaml (71%) diff --git a/pkg/collector/factories.go b/pkg/collector/factories.go index 23003b22d..e01af00b3 100644 --- a/pkg/collector/factories.go +++ b/pkg/collector/factories.go @@ -1,7 +1,7 @@ package collector import ( - "github.com/observiq/observiq-collector/pkg/processor/resourcetometricsattrsprocessor" + "github.com/observiq/observiq-collector/pkg/processor/resourceattributetransposer" "github.com/observiq/observiq-collector/pkg/receiver/logsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/observiqexporter" @@ -52,7 +52,7 @@ var defaultProcessors = []component.ProcessorFactory{ batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), probabilisticsamplerprocessor.NewFactory(), - resourcetometricsattrsprocessor.NewFactory(), + resourceattributetransposer.NewFactory(), componenttest.NewNopProcessorFactory(), } diff --git a/pkg/processor/resourcetometricsattrsprocessor/README.md b/pkg/processor/resourceattributetransposer/README.md similarity index 97% rename from pkg/processor/resourcetometricsattrsprocessor/README.md rename to pkg/processor/resourceattributetransposer/README.md index 8c8065537..c64064488 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/README.md +++ b/pkg/processor/resourceattributetransposer/README.md @@ -14,7 +14,7 @@ The following options may be configured: ```yaml processors: - resourcetometricsattrs: + resourceattributetransposer: operations: - from: "some.resource.level.attr" to: "some.metricdatapoint.level.attr" diff --git a/pkg/processor/resourcetometricsattrsprocessor/config.go b/pkg/processor/resourceattributetransposer/config.go similarity index 92% rename from pkg/processor/resourcetometricsattrsprocessor/config.go rename to pkg/processor/resourceattributetransposer/config.go index 409491a52..b6d407315 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/config.go +++ b/pkg/processor/resourceattributetransposer/config.go @@ -1,4 +1,4 @@ -package resourcetometricsattrsprocessor +package resourceattributetransposer import "go.opentelemetry.io/collector/config" diff --git a/pkg/processor/resourcetometricsattrsprocessor/config_test.go b/pkg/processor/resourceattributetransposer/config_test.go similarity index 96% rename from pkg/processor/resourcetometricsattrsprocessor/config_test.go rename to pkg/processor/resourceattributetransposer/config_test.go index 672a0eb20..df48fc268 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/config_test.go +++ b/pkg/processor/resourceattributetransposer/config_test.go @@ -1,4 +1,4 @@ -package resourcetometricsattrsprocessor +package resourceattributetransposer import ( "path" diff --git a/pkg/processor/resourcetometricsattrsprocessor/factory.go b/pkg/processor/resourceattributetransposer/factory.go similarity index 74% rename from pkg/processor/resourcetometricsattrsprocessor/factory.go rename to pkg/processor/resourceattributetransposer/factory.go index ad8922ef4..5b37f3b86 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/factory.go +++ b/pkg/processor/resourceattributetransposer/factory.go @@ -1,4 +1,4 @@ -package resourcetometricsattrsprocessor +package resourceattributetransposer import ( "context" @@ -11,10 +11,10 @@ import ( ) const ( - typeStr = "resourcetometricsattrs" + typeStr = "resourceattributetransposer" ) -// NewFactory returns a new factory for the resourcetometricsattrs processor. +// NewFactory returns a new factory for the resourceattributetransposer processor. func NewFactory() component.ProcessorFactory { return processorhelper.NewFactory( typeStr, @@ -23,19 +23,19 @@ func NewFactory() component.ProcessorFactory { ) } -// createDefaultConfig returns the default config for the resourcetometricsattrs processor. +// createDefaultConfig returns the default config for the resourceattributetransposer processor. func createDefaultConfig() config.Processor { return &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), } } -// createMetricsProcessor creates the resourcetometricsattrs processor. +// createMetricsProcessor creates the resourceattributetransposer processor. func createMetricsProcessor(ctx context.Context, params component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics) (component.MetricsProcessor, error) { processorCfg, ok := cfg.(*Config) if !ok { return nil, fmt.Errorf("config was not of correct type for the processor: %+v", cfg) } - return newResourceToMetricsAttributesProcessor(params.Logger, nextConsumer, processorCfg), nil + return newResourceAttributeTransposerProcessor(params.Logger, nextConsumer, processorCfg), nil } diff --git a/pkg/processor/resourcetometricsattrsprocessor/factory_test.go b/pkg/processor/resourceattributetransposer/factory_test.go similarity index 95% rename from pkg/processor/resourcetometricsattrsprocessor/factory_test.go rename to pkg/processor/resourceattributetransposer/factory_test.go index 6f2b86dd3..cbd1a7308 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/factory_test.go +++ b/pkg/processor/resourceattributetransposer/factory_test.go @@ -1,4 +1,4 @@ -package resourcetometricsattrsprocessor +package resourceattributetransposer import ( "context" diff --git a/pkg/processor/resourcetometricsattrsprocessor/processor.go b/pkg/processor/resourceattributetransposer/processor.go similarity index 79% rename from pkg/processor/resourcetometricsattrsprocessor/processor.go rename to pkg/processor/resourceattributetransposer/processor.go index 73e33a498..a9a6645ba 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/processor.go +++ b/pkg/processor/resourceattributetransposer/processor.go @@ -1,4 +1,4 @@ -package resourcetometricsattrsprocessor +package resourceattributetransposer import ( "context" @@ -9,15 +9,15 @@ import ( "go.uber.org/zap" ) -type resourceToMetricsAttributesProcessor struct { +type resourceAttributeTransposerProcessor struct { consumer consumer.Metrics logger *zap.Logger config *Config } -// newResourceToMetricsAttributesProcessor returns a new resourceToMetricsAttributesProcessor -func newResourceToMetricsAttributesProcessor(logger *zap.Logger, consumer consumer.Metrics, config *Config) *resourceToMetricsAttributesProcessor { - return &resourceToMetricsAttributesProcessor{ +// newResourceAttributeTransposerProcessor returns a new resourceToMetricsAttributesProcessor +func newResourceAttributeTransposerProcessor(logger *zap.Logger, consumer consumer.Metrics, config *Config) *resourceAttributeTransposerProcessor { + return &resourceAttributeTransposerProcessor{ consumer: consumer, logger: logger, config: config, @@ -25,17 +25,17 @@ func newResourceToMetricsAttributesProcessor(logger *zap.Logger, consumer consum } // Start starts the processor. It's a noop. -func (resourceToMetricsAttributesProcessor) Start(ctx context.Context, host component.Host) error { +func (resourceAttributeTransposerProcessor) Start(ctx context.Context, host component.Host) error { return nil } // Capabilities returns the consumer's capabilities. Indicates that this processor mutates the incoming metrics. -func (resourceToMetricsAttributesProcessor) Capabilities() consumer.Capabilities { +func (resourceAttributeTransposerProcessor) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } // ConsumeMetrics processes the incoming pdata.Metrics. -func (p resourceToMetricsAttributesProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { +func (p resourceAttributeTransposerProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { resMetrics := md.ResourceMetrics() for i := 0; i < resMetrics.Len(); i++ { resMetric := resMetrics.At(i) @@ -61,7 +61,7 @@ func (p resourceToMetricsAttributesProcessor) ConsumeMetrics(ctx context.Context } // Shutdown stops the processor. It's a noop. -func (resourceToMetricsAttributesProcessor) Shutdown(ctx context.Context) error { +func (resourceAttributeTransposerProcessor) Shutdown(ctx context.Context) error { return nil } diff --git a/pkg/processor/resourcetometricsattrsprocessor/processor_test.go b/pkg/processor/resourceattributetransposer/processor_test.go similarity index 95% rename from pkg/processor/resourcetometricsattrsprocessor/processor_test.go rename to pkg/processor/resourceattributetransposer/processor_test.go index 35582a418..da38315d0 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/processor_test.go +++ b/pkg/processor/resourceattributetransposer/processor_test.go @@ -1,4 +1,4 @@ -package resourcetometricsattrsprocessor +package resourceattributetransposer import ( "context" @@ -15,7 +15,7 @@ import ( ) func TestProcessorStart(t *testing.T) { - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumertest.NewNop(), createDefaultConfig().(*Config), @@ -26,7 +26,7 @@ func TestProcessorStart(t *testing.T) { } func TestProcessorShutdown(t *testing.T) { - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumertest.NewNop(), createDefaultConfig().(*Config), @@ -37,7 +37,7 @@ func TestProcessorShutdown(t *testing.T) { } func TestProcessorCapabilities(t *testing.T) { - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumertest.NewNop(), createDefaultConfig().(*Config), @@ -63,7 +63,7 @@ func TestConsumeMetricsNoop(t *testing.T) { metricsOut = args[1].(pdata.Metrics) }).Return(nil) - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, createDefaultConfig().(*Config), @@ -126,7 +126,7 @@ func TestConsumeMetricsMoveExistingAttribs(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -173,7 +173,7 @@ func TestConsumeMetricsMixedExistence(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -214,7 +214,7 @@ func TestConsumeMetricsSum(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -255,7 +255,7 @@ func TestConsumeMetricsHistogram(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -296,7 +296,7 @@ func TestConsumeMetricsSummary(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -337,7 +337,7 @@ func TestConsumeMetricsNone(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -379,7 +379,7 @@ func TestConsumeMetricsDoesNotOverwrite(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, @@ -425,7 +425,7 @@ func TestConsumeMetricsDoesNotOverwrite2(t *testing.T) { }, } - p := newResourceToMetricsAttributesProcessor( + p := newResourceAttributeTransposerProcessor( zap.NewNop(), consumer, cfg, diff --git a/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml b/pkg/processor/resourceattributetransposer/testdata/config.yaml similarity index 71% rename from pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml rename to pkg/processor/resourceattributetransposer/testdata/config.yaml index cf9a18b68..3126a257e 100644 --- a/pkg/processor/resourcetometricsattrsprocessor/testdata/config.yaml +++ b/pkg/processor/resourceattributetransposer/testdata/config.yaml @@ -2,8 +2,8 @@ receivers: nop: processors: - resourcetometricsattrs: - resourcetometricsattrs/customname: + resourceattributetransposer: + resourceattributetransposer/customname: operations: - from: "some.resource.level.attr" to: "some.metricdatapoint.level.attr" @@ -17,5 +17,5 @@ service: pipelines: metrics: receivers: [nop] - processors: [resourcetometricsattrs] + processors: [resourceattributetransposer] exporters: [nop] \ No newline at end of file From a096eccef1190ded405047049e5810f055868bc4 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Nov 2021 16:07:14 -0500 Subject: [PATCH 6/7] Suffix package with processor, follow conventions I realized how it was redundant to have it under a processor package, then have it suffixed with processor, then I realized not doing that was breaking convention. --- pkg/collector/factories.go | 4 ++-- .../README.md | 0 .../config.go | 2 +- .../config_test.go | 2 +- .../factory.go | 2 +- .../factory_test.go | 2 +- .../processor.go | 2 +- .../processor_test.go | 2 +- .../testdata/config.yaml | 0 9 files changed, 8 insertions(+), 8 deletions(-) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/README.md (100%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/config.go (91%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/config_test.go (96%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/factory.go (96%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/factory_test.go (95%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/processor.go (98%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/processor_test.go (99%) rename pkg/processor/{resourceattributetransposer => resourceattributetransposerprocessor}/testdata/config.yaml (100%) diff --git a/pkg/collector/factories.go b/pkg/collector/factories.go index e01af00b3..806416bd2 100644 --- a/pkg/collector/factories.go +++ b/pkg/collector/factories.go @@ -1,7 +1,7 @@ package collector import ( - "github.com/observiq/observiq-collector/pkg/processor/resourceattributetransposer" + "github.com/observiq/observiq-collector/pkg/processor/resourceattributetransposerprocessor" "github.com/observiq/observiq-collector/pkg/receiver/logsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/observiqexporter" @@ -52,7 +52,7 @@ var defaultProcessors = []component.ProcessorFactory{ batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), probabilisticsamplerprocessor.NewFactory(), - resourceattributetransposer.NewFactory(), + resourceattributetransposerprocessor.NewFactory(), componenttest.NewNopProcessorFactory(), } diff --git a/pkg/processor/resourceattributetransposer/README.md b/pkg/processor/resourceattributetransposerprocessor/README.md similarity index 100% rename from pkg/processor/resourceattributetransposer/README.md rename to pkg/processor/resourceattributetransposerprocessor/README.md diff --git a/pkg/processor/resourceattributetransposer/config.go b/pkg/processor/resourceattributetransposerprocessor/config.go similarity index 91% rename from pkg/processor/resourceattributetransposer/config.go rename to pkg/processor/resourceattributetransposerprocessor/config.go index b6d407315..7a0fbc6c9 100644 --- a/pkg/processor/resourceattributetransposer/config.go +++ b/pkg/processor/resourceattributetransposerprocessor/config.go @@ -1,4 +1,4 @@ -package resourceattributetransposer +package resourceattributetransposerprocessor import "go.opentelemetry.io/collector/config" diff --git a/pkg/processor/resourceattributetransposer/config_test.go b/pkg/processor/resourceattributetransposerprocessor/config_test.go similarity index 96% rename from pkg/processor/resourceattributetransposer/config_test.go rename to pkg/processor/resourceattributetransposerprocessor/config_test.go index df48fc268..e8d7ea438 100644 --- a/pkg/processor/resourceattributetransposer/config_test.go +++ b/pkg/processor/resourceattributetransposerprocessor/config_test.go @@ -1,4 +1,4 @@ -package resourceattributetransposer +package resourceattributetransposerprocessor import ( "path" diff --git a/pkg/processor/resourceattributetransposer/factory.go b/pkg/processor/resourceattributetransposerprocessor/factory.go similarity index 96% rename from pkg/processor/resourceattributetransposer/factory.go rename to pkg/processor/resourceattributetransposerprocessor/factory.go index 5b37f3b86..863991363 100644 --- a/pkg/processor/resourceattributetransposer/factory.go +++ b/pkg/processor/resourceattributetransposerprocessor/factory.go @@ -1,4 +1,4 @@ -package resourceattributetransposer +package resourceattributetransposerprocessor import ( "context" diff --git a/pkg/processor/resourceattributetransposer/factory_test.go b/pkg/processor/resourceattributetransposerprocessor/factory_test.go similarity index 95% rename from pkg/processor/resourceattributetransposer/factory_test.go rename to pkg/processor/resourceattributetransposerprocessor/factory_test.go index cbd1a7308..507be8613 100644 --- a/pkg/processor/resourceattributetransposer/factory_test.go +++ b/pkg/processor/resourceattributetransposerprocessor/factory_test.go @@ -1,4 +1,4 @@ -package resourceattributetransposer +package resourceattributetransposerprocessor import ( "context" diff --git a/pkg/processor/resourceattributetransposer/processor.go b/pkg/processor/resourceattributetransposerprocessor/processor.go similarity index 98% rename from pkg/processor/resourceattributetransposer/processor.go rename to pkg/processor/resourceattributetransposerprocessor/processor.go index a9a6645ba..27f92c6ca 100644 --- a/pkg/processor/resourceattributetransposer/processor.go +++ b/pkg/processor/resourceattributetransposerprocessor/processor.go @@ -1,4 +1,4 @@ -package resourceattributetransposer +package resourceattributetransposerprocessor import ( "context" diff --git a/pkg/processor/resourceattributetransposer/processor_test.go b/pkg/processor/resourceattributetransposerprocessor/processor_test.go similarity index 99% rename from pkg/processor/resourceattributetransposer/processor_test.go rename to pkg/processor/resourceattributetransposerprocessor/processor_test.go index da38315d0..90b34247b 100644 --- a/pkg/processor/resourceattributetransposer/processor_test.go +++ b/pkg/processor/resourceattributetransposerprocessor/processor_test.go @@ -1,4 +1,4 @@ -package resourceattributetransposer +package resourceattributetransposerprocessor import ( "context" diff --git a/pkg/processor/resourceattributetransposer/testdata/config.yaml b/pkg/processor/resourceattributetransposerprocessor/testdata/config.yaml similarity index 100% rename from pkg/processor/resourceattributetransposer/testdata/config.yaml rename to pkg/processor/resourceattributetransposerprocessor/testdata/config.yaml From f7439d3acb0465c75233c6ac2a9b59f6632cf85a Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 18 Nov 2021 16:09:22 -0500 Subject: [PATCH 7/7] Change header in README --- pkg/processor/resourceattributetransposerprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/processor/resourceattributetransposerprocessor/README.md b/pkg/processor/resourceattributetransposerprocessor/README.md index c64064488..bafbb0275 100644 --- a/pkg/processor/resourceattributetransposerprocessor/README.md +++ b/pkg/processor/resourceattributetransposerprocessor/README.md @@ -1,4 +1,4 @@ -# Resource to Metrics Attributes Processor +# Resource Attribute Transposer Processor This processor copies a resource level attribute to all individual metric data points associated with the resource. If they key already exists, no action is taken (the data points' attribute _**IS NOT**_ overwritten)