From 2791c68339cd5cced2347f1842264520462ee0a8 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 16:37:36 -0400 Subject: [PATCH 01/21] Add metric tracking library to replace awsmetrics dependency. --- .../tracking/identity.go | 86 ++++++ .../tracking/identity_test.go | 181 +++++++++++++ .../tracking/metric.go | 20 ++ .../tracking/tracker.go | 191 +++++++++++++ .../tracking/tracker_test.go | 256 ++++++++++++++++++ .../tracking/value.go | 25 ++ 6 files changed, 759 insertions(+) create mode 100644 processor/cumulativetodeltaprocessor/tracking/identity.go create mode 100644 processor/cumulativetodeltaprocessor/tracking/identity_test.go create mode 100644 processor/cumulativetodeltaprocessor/tracking/metric.go create mode 100644 processor/cumulativetodeltaprocessor/tracking/tracker.go create mode 100644 processor/cumulativetodeltaprocessor/tracking/tracker_test.go create mode 100644 processor/cumulativetodeltaprocessor/tracking/value.go diff --git a/processor/cumulativetodeltaprocessor/tracking/identity.go b/processor/cumulativetodeltaprocessor/tracking/identity.go new file mode 100644 index 000000000000..4d790475498e --- /dev/null +++ b/processor/cumulativetodeltaprocessor/tracking/identity.go @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracking + +import ( + "bytes" + "strconv" + + "go.opentelemetry.io/collector/model/pdata" + tracetranslator "go.opentelemetry.io/collector/translator/trace" +) + +type MetricIdentity struct { + Resource pdata.Resource + InstrumentationLibrary pdata.InstrumentationLibrary + MetricDataType pdata.MetricDataType + MetricIsMonotonic bool + MetricName string + MetricUnit string + StartTimestamp pdata.Timestamp + LabelsMap pdata.StringMap + MetricValueType pdata.MetricValueType +} + +const A = int32('A') +const SEP = byte(0x1E) +const SEPSTR = string(SEP) + +func (mi *MetricIdentity) Write(b *bytes.Buffer) { + b.WriteRune(A + int32(mi.MetricDataType)) + b.WriteByte(SEP) + b.WriteRune(A + int32(mi.MetricValueType)) + mi.Resource.Attributes().Sort().Range(func(k string, v pdata.AttributeValue) bool { + b.WriteByte(SEP) + b.WriteString(k) + b.WriteByte(':') + b.WriteString(tracetranslator.AttributeValueToString(v)) + return true + }) + + b.WriteByte(SEP) + b.WriteString(mi.InstrumentationLibrary.Name()) + b.WriteByte(SEP) + b.WriteString(mi.InstrumentationLibrary.Version()) + b.WriteByte(SEP) + if mi.MetricIsMonotonic { + b.WriteByte('Y') + } else { + b.WriteByte('N') + } + + b.WriteByte(SEP) + b.WriteString(mi.MetricName) + b.WriteByte(SEP) + b.WriteString(mi.MetricUnit) + + mi.LabelsMap.Sort().Range(func(k, v string) bool { + b.WriteByte(SEP) + b.WriteString(k) + b.WriteByte(':') + b.WriteString(v) + return true + }) + b.WriteByte(SEP) + b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36)) +} + +func (mi *MetricIdentity) IsFloatVal() bool { + return mi.MetricValueType == pdata.MetricValueTypeDouble +} + +func (mi *MetricIdentity) IsSupportedMetricType() bool { + return mi.MetricDataType == pdata.MetricDataTypeSum +} diff --git a/processor/cumulativetodeltaprocessor/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/tracking/identity_test.go new file mode 100644 index 000000000000..7a40f55562bc --- /dev/null +++ b/processor/cumulativetodeltaprocessor/tracking/identity_test.go @@ -0,0 +1,181 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracking + +import ( + "bytes" + "strings" + "testing" + + "go.opentelemetry.io/collector/model/pdata" +) + +func TestMetricIdentity_Write(t *testing.T) { + resource := pdata.NewResource() + resource.Attributes().InitFromMap(map[string]pdata.AttributeValue{ + "resource": pdata.NewAttributeValueBool(true), + }) + + il := pdata.NewInstrumentationLibrary() + il.SetName("ilm_name") + il.SetVersion("ilm_version") + + labels := pdata.NewStringMap() + labels.InitFromMap(map[string]string{ + "label": "value", + }) + type fields struct { + Resource pdata.Resource + InstrumentationLibrary pdata.InstrumentationLibrary + MetricDataType pdata.MetricDataType + MetricIsMonotonic bool + MetricName string + MetricUnit string + StartTimestamp pdata.Timestamp + LabelsMap pdata.StringMap + MetricValueType pdata.MetricValueType + } + tests := []struct { + name string + fields fields + want []string + }{ + { + name: "all present", + fields: fields{ + Resource: resource, + InstrumentationLibrary: il, + LabelsMap: labels, + MetricName: "m_name", + MetricUnit: "m_unit", + }, + want: []string{"A" + SEPSTR + "A", "resource:true", "ilm_name", "ilm_version", "label:value", "N", "0", "m_name", "m_unit"}, + }, + { + name: "value and data type", + fields: fields{ + Resource: resource, + InstrumentationLibrary: il, + LabelsMap: labels, + MetricDataType: pdata.MetricDataTypeSum, + MetricValueType: pdata.MetricValueTypeInt, + MetricIsMonotonic: true, + }, + want: []string{"C" + SEPSTR + "B", "Y"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mi := &MetricIdentity{ + Resource: tt.fields.Resource, + InstrumentationLibrary: tt.fields.InstrumentationLibrary, + MetricDataType: tt.fields.MetricDataType, + MetricIsMonotonic: tt.fields.MetricIsMonotonic, + MetricName: tt.fields.MetricName, + MetricUnit: tt.fields.MetricUnit, + StartTimestamp: tt.fields.StartTimestamp, + LabelsMap: tt.fields.LabelsMap, + MetricValueType: tt.fields.MetricValueType, + } + b := &bytes.Buffer{} + mi.Write(b) + got := b.String() + for _, want := range tt.want { + if !strings.Contains(got, SEPSTR+want+SEPSTR) && !strings.HasSuffix(got, SEPSTR+want) && !strings.HasPrefix(got, want+SEPSTR) { + t.Errorf("MetricIdentity.Write() = %v, want %v", got, want) + } + } + }) + } +} + +func TestMetricIdentity_IsFloatVal(t *testing.T) { + type fields struct { + MetricValueType pdata.MetricValueType + } + tests := []struct { + name string + fields fields + want bool + }{ + { + name: "float", + fields: fields{ + MetricValueType: pdata.MetricValueTypeDouble, + }, + want: true, + }, + { + name: "int", + fields: fields{ + MetricValueType: pdata.MetricValueTypeInt, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mi := &MetricIdentity{ + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + LabelsMap: pdata.NewStringMap(), + MetricDataType: pdata.MetricDataTypeSum, + MetricValueType: tt.fields.MetricValueType, + } + if got := mi.IsFloatVal(); got != tt.want { + t.Errorf("MetricIdentity.IsFloatVal() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestMetricIdentity_IsSupportedMetricType(t *testing.T) { + type fields struct { + MetricDataType pdata.MetricDataType + } + tests := []struct { + name string + fields fields + want bool + }{ + { + name: "sum", + fields: fields{ + MetricDataType: pdata.MetricDataTypeSum, + }, + want: true, + }, + { + name: "histogram", + fields: fields{ + MetricDataType: pdata.MetricDataTypeHistogram, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mi := &MetricIdentity{ + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + LabelsMap: pdata.NewStringMap(), + MetricDataType: tt.fields.MetricDataType, + } + if got := mi.IsSupportedMetricType(); got != tt.want { + t.Errorf("MetricIdentity.IsSupportedMetricType() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/processor/cumulativetodeltaprocessor/tracking/metric.go b/processor/cumulativetodeltaprocessor/tracking/metric.go new file mode 100644 index 000000000000..20524bec5e01 --- /dev/null +++ b/processor/cumulativetodeltaprocessor/tracking/metric.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracking + +type MetricPoint struct { + Identity MetricIdentity + Value ValuePoint +} diff --git a/processor/cumulativetodeltaprocessor/tracking/tracker.go b/processor/cumulativetodeltaprocessor/tracking/tracker.go new file mode 100644 index 000000000000..c3d72fb0deaa --- /dev/null +++ b/processor/cumulativetodeltaprocessor/tracking/tracker.go @@ -0,0 +1,191 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracking + +import ( + "bytes" + "context" + "math" + "sync" + "time" + + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +// Allocate a minimum of 64 bytes to the builder initially +const initialBytes = 64 + +var identityBufferPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, initialBytes)) + }, +} + +type State struct { + Identity MetricIdentity + PrevPoint ValuePoint + mu sync.Mutex +} + +func (s *State) Lock() { + s.mu.Lock() +} + +func (s *State) Unlock() { + s.mu.Unlock() +} + +type DeltaValue struct { + StartTimestamp pdata.Timestamp + FloatValue float64 + IntValue int64 +} + +type MetricTracker interface { + Convert(MetricPoint) (DeltaValue, bool) +} + +func NewMetricTracker(ctx context.Context, logger *zap.Logger, maxStale time.Duration) MetricTracker { + t := &metricTracker{logger: logger, maxStale: maxStale} + if maxStale > 0 { + go t.sweeper(ctx, t.removeStale) + } + return t +} + +type metricTracker struct { + logger *zap.Logger + maxStale time.Duration + states sync.Map +} + +func (t *metricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) { + metricID := in.Identity + metricPoint := in.Value + if !metricID.IsSupportedMetricType() { + return + } + + // NaN is used to signal "stale" metrics. + // These are ignored for now. + // https://github.com/open-telemetry/opentelemetry-collector/pull/3423 + if metricID.IsFloatVal() && math.IsNaN(metricPoint.FloatValue) { + return + } + + b := identityBufferPool.Get().(*bytes.Buffer) + b.Reset() + metricID.Write(b) + hashableID := b.String() + identityBufferPool.Put(b) + + var s interface{} + var ok bool + if s, ok = t.states.Load(hashableID); !ok { + s, ok = t.states.LoadOrStore(hashableID, &State{ + Identity: metricID, + PrevPoint: metricPoint, + }) + } + + if !ok { + if metricID.MetricIsMonotonic { + out = DeltaValue{ + StartTimestamp: metricPoint.ObservedTimestamp, + FloatValue: metricPoint.FloatValue, + IntValue: metricPoint.IntValue, + } + valid = true + } + return + } + valid = true + + state := s.(*State) + state.Lock() + defer state.Unlock() + + out.StartTimestamp = state.PrevPoint.ObservedTimestamp + + if metricID.IsFloatVal() { + value := metricPoint.FloatValue + prevValue := state.PrevPoint.FloatValue + delta := value - prevValue + + // Detect reset on a monotonic counter + if metricID.MetricIsMonotonic && value < prevValue { + delta = value + } + + out.FloatValue = delta + } else { + value := metricPoint.IntValue + prevValue := state.PrevPoint.IntValue + delta := value - prevValue + + // Detect reset on a monotonic counter + if metricID.MetricIsMonotonic && value < prevValue { + delta = value + } + + out.IntValue = delta + } + + state.PrevPoint = metricPoint + return +} + +func (t *metricTracker) removeStale(staleBefore pdata.Timestamp) { + t.states.Range(func(key, value interface{}) bool { + s := value.(*State) + + // There is a known race condition here. + // Because the state may be in the process of updating at the + // same time as the stale removal, there is a chance that we + // will remove a "stale" state that is in the process of + // updating. This can only happen when datapoints arrive around + // the expiration time. + // + // In this case, the possible outcomes are: + // * Updating goroutine wins, point will not be stale + // * Stale removal wins, updating goroutine will still see + // the removed state but the state after the update will + // not be persisted. The next update will load an entirely + // new state. + s.Lock() + lastObserved := s.PrevPoint.ObservedTimestamp + s.Unlock() + if lastObserved < staleBefore { + t.logger.Debug("removing stale state key", zap.String("key", key.(string))) + t.states.Delete(key) + } + return true + }) +} + +func (t *metricTracker) sweeper(ctx context.Context, remove func(pdata.Timestamp)) { + ticker := time.NewTicker(t.maxStale) + for { + select { + case currentTime := <-ticker.C: + staleBefore := pdata.TimestampFromTime(currentTime.Add(-t.maxStale)) + remove(staleBefore) + case <-ctx.Done(): + ticker.Stop() + return + } + } +} diff --git a/processor/cumulativetodeltaprocessor/tracking/tracker_test.go b/processor/cumulativetodeltaprocessor/tracking/tracker_test.go new file mode 100644 index 000000000000..be8638317c4e --- /dev/null +++ b/processor/cumulativetodeltaprocessor/tracking/tracker_test.go @@ -0,0 +1,256 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracking + +import ( + "context" + "reflect" + "testing" + "time" + + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +func TestMetricTracker_Convert(t *testing.T) { + miSum := MetricIdentity{ + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + MetricDataType: pdata.MetricDataTypeSum, + MetricIsMonotonic: true, + MetricName: "", + MetricUnit: "", + LabelsMap: pdata.NewStringMap(), + } + miIntSum := miSum + miIntSum.MetricValueType = pdata.MetricValueTypeInt + miSum.MetricValueType = pdata.MetricValueTypeDouble + + m := NewMetricTracker(context.Background(), zap.NewNop(), 0) + + tests := []struct { + name string + value ValuePoint + wantOut DeltaValue + }{ + { + name: "Initial Value recorded", + value: ValuePoint{ + ObservedTimestamp: 10, + FloatValue: 100.0, + IntValue: 100, + }, + wantOut: DeltaValue{ + StartTimestamp: 10, + FloatValue: 100.0, + IntValue: 100, + }, + }, + { + name: "Higher Value Recorded", + value: ValuePoint{ + ObservedTimestamp: 50, + FloatValue: 225.0, + IntValue: 225, + }, + wantOut: DeltaValue{ + StartTimestamp: 10, + FloatValue: 125.0, + IntValue: 125, + }, + }, + { + name: "Lower Value Recorded - No Previous Offset", + value: ValuePoint{ + ObservedTimestamp: 100, + FloatValue: 75.0, + IntValue: 75, + }, + wantOut: DeltaValue{ + StartTimestamp: 50, + FloatValue: 75.0, + IntValue: 75, + }, + }, + { + name: "Record delta above first recorded value", + value: ValuePoint{ + ObservedTimestamp: 150, + FloatValue: 300.0, + IntValue: 300, + }, + wantOut: DeltaValue{ + StartTimestamp: 100, + FloatValue: 225.0, + IntValue: 225, + }, + }, + { + name: "Lower Value Recorded - Previous Offset Recorded", + value: ValuePoint{ + ObservedTimestamp: 200, + FloatValue: 25.0, + IntValue: 25, + }, + wantOut: DeltaValue{ + StartTimestamp: 150, + FloatValue: 25.0, + IntValue: 25, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + floatPoint := MetricPoint{ + Identity: miSum, + Value: tt.value, + } + intPoint := MetricPoint{ + Identity: miIntSum, + Value: tt.value, + } + + if gotOut, valid := m.Convert(floatPoint); !valid || !reflect.DeepEqual(gotOut.StartTimestamp, tt.wantOut.StartTimestamp) || !reflect.DeepEqual(gotOut.FloatValue, tt.wantOut.FloatValue) { + t.Errorf("MetricTracker.Convert(MetricDataTypeSum) = %v, want %v", gotOut, tt.wantOut) + } + + if gotOut, valid := m.Convert(intPoint); !valid || !reflect.DeepEqual(gotOut.StartTimestamp, tt.wantOut.StartTimestamp) || !reflect.DeepEqual(gotOut.IntValue, tt.wantOut.IntValue) { + t.Errorf("MetricTracker.Convert(MetricDataTypeIntSum) = %v, want %v", gotOut, tt.wantOut) + } + }) + } + + t.Run("Invalid metric identity", func(t *testing.T) { + invalidID := miIntSum + invalidID.MetricDataType = pdata.MetricDataTypeGauge + _, valid := m.Convert(MetricPoint{ + Identity: invalidID, + Value: ValuePoint{ + ObservedTimestamp: 0, + FloatValue: 100.0, + IntValue: 100, + }, + }) + if valid { + t.Error("Expected invalid for non cumulative metric") + } + }) +} + +func Test_metricTracker_removeStale(t *testing.T) { + currentTime := pdata.Timestamp(100) + freshPoint := ValuePoint{ + ObservedTimestamp: currentTime, + } + stalePoint := ValuePoint{ + ObservedTimestamp: currentTime - 1, + } + + type fields struct { + MaxStale time.Duration + States map[string]*State + } + tests := []struct { + name string + fields fields + wantOut map[string]*State + }{ + { + name: "Removes stale entry, leaves fresh entry", + fields: fields{ + MaxStale: 0, // This logic isn't tested here + States: map[string]*State{ + "stale": { + PrevPoint: stalePoint, + }, + "fresh": { + PrevPoint: freshPoint, + }, + }, + }, + wantOut: map[string]*State{ + "fresh": { + PrevPoint: freshPoint, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := &metricTracker{ + logger: zap.NewNop(), + maxStale: tt.fields.MaxStale, + } + for k, v := range tt.fields.States { + tr.states.Store(k, v) + } + tr.removeStale(currentTime) + + gotOut := make(map[string]*State) + tr.states.Range(func(key, value interface{}) bool { + gotOut[key.(string)] = value.(*State) + return true + }) + + if !reflect.DeepEqual(gotOut, tt.wantOut) { + t.Errorf("MetricTracker.removeStale() = %v, want %v", gotOut, tt.wantOut) + } + }) + } +} + +func Test_metricTracker_sweeper(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + sweepEvent := make(chan pdata.Timestamp) + closed := false + + onSweep := func(staleBefore pdata.Timestamp) { + sweepEvent <- staleBefore + } + + tr := &metricTracker{ + logger: zap.NewNop(), + maxStale: 1 * time.Millisecond, + } + + start := time.Now() + go func() { + tr.sweeper(ctx, onSweep) + closed = true + close(sweepEvent) + }() + + for i := 1; i <= 2; i++ { + staleBefore := <-sweepEvent + tickTime := time.Since(start) + tr.maxStale*time.Duration(i) + if closed { + t.Fatalf("Sweeper returned prematurely.") + } + + if tickTime < tr.maxStale { + t.Errorf("Sweeper tick time is too fast. (%v, want %v)", tickTime, tr.maxStale) + } + + staleTime := staleBefore.AsTime() + if time.Since(staleTime) < tr.maxStale { + t.Errorf("Sweeper called with invalid staleBefore value = %v", staleTime) + } + } + cancel() + <-sweepEvent + if !closed { + t.Errorf("Sweeper did not terminate.") + } +} diff --git a/processor/cumulativetodeltaprocessor/tracking/value.go b/processor/cumulativetodeltaprocessor/tracking/value.go new file mode 100644 index 000000000000..f113c9492fbc --- /dev/null +++ b/processor/cumulativetodeltaprocessor/tracking/value.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracking + +import ( + "go.opentelemetry.io/collector/model/pdata" +) + +type ValuePoint struct { + ObservedTimestamp pdata.Timestamp + FloatValue float64 + IntValue int64 +} From bbe3cbcc8f08868efa5cdefb4d321f695fa662b6 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 16:38:28 -0400 Subject: [PATCH 02/21] Update configuration of cumulativetodelta processor. The configuration has been updated so that: * An empty list of metrics defaults to converting all cumulative metrics to delta. * The state TTL (MaxStale) is now considered configurable * There is a configuration option to enable/disable conversion of non-monotonic cumulative metrics. Non-monotonic cumulative metrics are normally considered gauges and aggregation temporality should not be converted. The default is that only monotonic values are converted. --- .../cumulativetodeltaprocessor/config.go | 20 +++-- .../cumulativetodeltaprocessor/config_test.go | 75 ++++++------------- .../cumulativetodeltaprocessor/factory.go | 1 + .../factory_test.go | 3 +- .../{config_full.yaml => config.yaml} | 3 + .../testdata/config_missing_name.yaml | 20 ----- 6 files changed, 42 insertions(+), 80 deletions(-) rename processor/cumulativetodeltaprocessor/testdata/{config_full.yaml => config.yaml} (82%) delete mode 100644 processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 0acd0c8bf30c..e56429acdcf0 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -15,7 +15,7 @@ package cumulativetodeltaprocessor import ( - "fmt" + "time" "go.opentelemetry.io/collector/config" ) @@ -24,15 +24,19 @@ import ( type Config struct { config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - // List of cumulative sum metrics to convert to delta + // List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta. Metrics []string `mapstructure:"metrics"` + + // The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. + MaxStale time.Duration `mapstructure:"max_stale"` + + // Set to false in order to convert non monotonic metrics + MonotonicOnly bool `mapstructure:"monotonic_only"` } -// Validate checks whether the input configuration has all of the required fields for the processor. -// An error is returned if there are any invalid inputs. -func (config *Config) Validate() error { - if len(config.Metrics) == 0 { - return fmt.Errorf("metric names are missing") - } +var _ config.Processor = (*Config)(nil) + +// Validate checks if the processor configuration is valid +func (cfg *Config) Validate() error { return nil } diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index b9673b4d7d65..66a43adbeeb2 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -15,9 +15,9 @@ package cumulativetodeltaprocessor import ( - "fmt" "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,72 +26,45 @@ import ( "go.opentelemetry.io/collector/config/configtest" ) +const configFile = "config.yaml" + func TestLoadingFullConfig(t *testing.T) { + + factories, err := componenttest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Processors[typeStr] = factory + cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", configFile), factories) + assert.NoError(t, err) + require.NotNil(t, cfg) + tests := []struct { - configFile string - expCfg *Config + expCfg *Config }{ { - configFile: "config_full.yaml", expCfg: &Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + ProcessorSettings: config.NewProcessorSettings(config.NewIDWithName(typeStr, "alt")), Metrics: []string{ "metric1", "metric2", }, + MaxStale: 10 * time.Second, + MonotonicOnly: false, }, }, - } - - for _, test := range tests { - t.Run(test.expCfg.ID().String(), func(t *testing.T) { - factories, err := componenttest.NopFactories() - assert.NoError(t, err) - - factory := NewFactory() - factories.Processors[typeStr] = factory - config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configFile), factories) - assert.NoError(t, err) - require.NotNil(t, config) - - cfg := config.Processors[test.expCfg.ID()] - assert.Equal(t, test.expCfg, cfg) - }) - } -} - -func TestValidateConfig(t *testing.T) { - tests := []struct { - configName string - succeed bool - errorMessage string - }{ { - configName: "config_full.yaml", - succeed: true, - }, - { - configName: "config_missing_name.yaml", - succeed: false, - errorMessage: "metric names are missing", + expCfg: &Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + MonotonicOnly: true, + }, }, } for _, test := range tests { - factories, err := componenttest.NopFactories() - assert.NoError(t, err) - - factory := NewFactory() - factories.Processors[typeStr] = factory - t.Run(test.configName, func(t *testing.T) { - config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configName), factories) - if test.succeed { - assert.NotNil(t, config) - assert.NoError(t, err) - } else { - assert.EqualError(t, err, fmt.Sprintf("processor %q has invalid configuration: %s", typeStr, test.errorMessage)) - } + t.Run(test.expCfg.ID().String(), func(t *testing.T) { + cfg := cfg.Processors[test.expCfg.ID()] + assert.Equal(t, test.expCfg, cfg) }) - } } diff --git a/processor/cumulativetodeltaprocessor/factory.go b/processor/cumulativetodeltaprocessor/factory.go index d28f6bb61901..2749c96c1ea2 100644 --- a/processor/cumulativetodeltaprocessor/factory.go +++ b/processor/cumulativetodeltaprocessor/factory.go @@ -42,6 +42,7 @@ func NewFactory() component.ProcessorFactory { func createDefaultConfig() config.Processor { return &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + MonotonicOnly: true, } } diff --git a/processor/cumulativetodeltaprocessor/factory_test.go b/processor/cumulativetodeltaprocessor/factory_test.go index d679d4d2b475..0793b859c4bb 100644 --- a/processor/cumulativetodeltaprocessor/factory_test.go +++ b/processor/cumulativetodeltaprocessor/factory_test.go @@ -38,6 +38,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, cfg, &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + MonotonicOnly: true, }) assert.NoError(t, configtest.CheckConfigStruct(cfg)) } @@ -49,7 +50,7 @@ func TestCreateProcessors(t *testing.T) { errorMessage string }{ { - configName: "config_full.yaml", + configName: "config.yaml", succeed: true, }, } diff --git a/processor/cumulativetodeltaprocessor/testdata/config_full.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml similarity index 82% rename from processor/cumulativetodeltaprocessor/testdata/config_full.yaml rename to processor/cumulativetodeltaprocessor/testdata/config.yaml index a544070782ab..5b07746c7c3f 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config_full.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -3,9 +3,12 @@ receivers: processors: cumulativetodelta: + cumulativetodelta/alt: metrics: - metric1 - metric2 + max_stale: 10s + monotonic_only: false exporters: nop: diff --git a/processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml b/processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml deleted file mode 100644 index 5043e802889e..000000000000 --- a/processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml +++ /dev/null @@ -1,20 +0,0 @@ -receivers: - nop: - -processors: - cumulativetodelta: - metrics: - -exporters: - nop: - -service: - pipelines: - traces: - receivers: [nop] - processors: [cumulativetodelta] - exporters: [nop] - metrics: - receivers: [nop] - processors: [cumulativetodelta] - exporters: [nop] From d4d3c492b04c54ff4d191156b4360b54df1d7082 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 16:54:05 -0400 Subject: [PATCH 03/21] Update cumulativetodelta processor logic. * Uses the internal tracker library instead of AWS metrics library. This enables separation of timeseries by resource and instrumentation library. * Metric data points which are invalid (the first in a series of non-monontic cumulative values) are now removed from the dataset. --- .../cumulativetodeltaprocessor/processor.go | 150 +++++++++++------- 1 file changed, 89 insertions(+), 61 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 05aa0ea4ed81..3b6783e4bafc 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -16,33 +16,37 @@ package cumulativetodeltaprocessor import ( "context" - "math" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" - awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/tracking" ) type cumulativeToDeltaProcessor struct { - metrics map[string]bool + metrics map[string]struct{} logger *zap.Logger - deltaCalculator awsmetrics.MetricCalculator + deltaCalculator tracking.MetricTracker + monotonicOnly bool + cancelFunc context.CancelFunc } func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { - inputMetricSet := make(map[string]bool, len(config.Metrics)) - for _, name := range config.Metrics { - inputMetricSet[name] = true - } - - return &cumulativeToDeltaProcessor{ - metrics: inputMetricSet, + ctx, cancel := context.WithCancel(context.Background()) + p := &cumulativeToDeltaProcessor{ logger: logger, - deltaCalculator: newDeltaCalculator(), + deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStale), + monotonicOnly: config.MonotonicOnly, + cancelFunc: cancel, + } + if len(config.Metrics) > 0 { + p.metrics = make(map[string]struct{}, len(config.Metrics)) + for _, m := range config.Metrics { + p.metrics[m] = struct{}{} + } } + return p } // Start is invoked during service startup. @@ -53,64 +57,88 @@ func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) e // processMetrics implements the ProcessMetricsFunc type. func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { resourceMetricsSlice := md.ResourceMetrics() - for i := 0; i < resourceMetricsSlice.Len(); i++ { - rm := resourceMetricsSlice.At(i) + resourceMetricsSlice.RemoveIf(func(rm pdata.ResourceMetrics) bool { ilms := rm.InstrumentationLibraryMetrics() - for j := 0; j < ilms.Len(); j++ { - ilm := ilms.At(j) - metricSlice := ilm.Metrics() - for k := 0; k < metricSlice.Len(); k++ { - metric := metricSlice.At(k) - if ctdp.metrics[metric.Name()] { - if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative { - dataPoints := metric.Sum().DataPoints() - - for l := 0; l < dataPoints.Len(); l++ { - fromDataPoint := dataPoints.At(l) - labelMap := make(map[string]string) - - fromDataPoint.Attributes().Range(func(k string, v pdata.AttributeValue) bool { - labelMap[k] = v.AsString() - return true - }) - datapointValue := fromDataPoint.DoubleVal() - if math.IsNaN(datapointValue) { - continue - } - result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, datapointValue, fromDataPoint.Timestamp().AsTime()) - - fromDataPoint.SetDoubleVal(result.(delta).value) - fromDataPoint.SetStartTimestamp(pdata.NewTimestampFromTime(result.(delta).prevTimestamp)) - } - metric.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool { + ms := ilm.Metrics() + ms.RemoveIf(func(m pdata.Metric) bool { + if ctdp.metrics != nil { + if _, ok := ctdp.metrics[m.Name()]; !ok { + return false } } - } - } - } + baseIdentity := tracking.MetricIdentity{ + Resource: rm.Resource(), + InstrumentationLibrary: ilm.InstrumentationLibrary(), + MetricDataType: m.DataType(), + MetricName: m.Name(), + MetricUnit: m.Unit(), + } + switch m.DataType() { + case pdata.MetricDataTypeSum: + ms := m.Sum() + if ms.AggregationTemporality() != pdata.AggregationTemporalityCumulative { + return false + } + if ctdp.monotonicOnly && !ms.IsMonotonic() { + return false + } + baseIdentity.MetricIsMonotonic = ms.IsMonotonic() + ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) + ms.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + return ms.DataPoints().Len() == 0 + default: + return false + } + }) + return ilm.Metrics().Len() == 0 + }) + return rm.InstrumentationLibraryMetrics().Len() == 0 + }) return md, nil } // Shutdown is invoked during service shutdown. func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error { + ctdp.cancelFunc() return nil } -func newDeltaCalculator() awsmetrics.MetricCalculator { - return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) { - result := delta{value: val.(float64), prevTimestamp: timestamp} - - if prev != nil { - deltaValue := val.(float64) - prev.RawValue.(float64) - result.value = deltaValue - result.prevTimestamp = prev.Timestamp - return result, true - } - return result, false - }) -} +func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) { + switch dps := in.(type) { + case pdata.NumberDataPointSlice: + dps.RemoveIf(func(dp pdata.NumberDataPoint) bool { + id := baseIdentity + id.StartTimestamp = dp.StartTimestamp() + id.LabelsMap = dp.LabelsMap() + id.MetricValueType = dp.Type() + point := tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + } + if id.IsFloatVal() { + point.FloatValue = dp.DoubleVal() + } else { + point.IntValue = dp.IntVal() + } + trackingPoint := tracking.MetricPoint{ + Identity: id, + Value: point, + } + delta, valid := ctdp.deltaCalculator.Convert(trackingPoint) -type delta struct { - value float64 - prevTimestamp time.Time + // When converting non-monotonic cumulative counters, + // the first data point is omitted since the initial + // reference is not assumed to be zero + if !valid { + return true + } + dp.SetStartTimestamp(delta.StartTimestamp) + if id.IsFloatVal() { + dp.SetDoubleVal(delta.FloatValue) + } else { + dp.SetIntVal(delta.IntValue) + } + return false + }) + } } From bcbc722480748c32fd718845008da930a3c75b24 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 17:01:02 -0400 Subject: [PATCH 04/21] Update processor default test case. By default, the cumulative to delta processor now converts all metrics. Previously, the processor did not convert any metrics unless listed in the configuration. --- processor/cumulativetodeltaprocessor/processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index a3ab5b7c4664..b1899f835fa7 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -54,7 +54,7 @@ var ( outMetrics: generateTestMetrics(testMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, - isCumulative: []bool{true, true}, + isCumulative: []bool{false, false}, }), }, { From 86840f118a345445eef95474710f151d342d2971 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 17:04:54 -0400 Subject: [PATCH 05/21] Remove extra validate function call. --- processor/cumulativetodeltaprocessor/factory.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/factory.go b/processor/cumulativetodeltaprocessor/factory.go index 2749c96c1ea2..66141c9dc7f1 100644 --- a/processor/cumulativetodeltaprocessor/factory.go +++ b/processor/cumulativetodeltaprocessor/factory.go @@ -47,7 +47,7 @@ func createDefaultConfig() config.Processor { } func createMetricsProcessor( - ctx context.Context, + _ context.Context, params component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics, @@ -57,7 +57,6 @@ func createMetricsProcessor( return nil, fmt.Errorf("configuration parsing error") } - processorConfig.Validate() metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, params.Logger) return processorhelper.NewMetricsProcessor( From 8617d906dc3bdf8916ba61c203464022dededa50 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 17:05:09 -0400 Subject: [PATCH 06/21] Add processor benchmark. --- .../processor_test.go | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index b1899f835fa7..ac8ad6c65dda 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -22,10 +22,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" ) type testMetric struct { @@ -192,3 +194,46 @@ func generateTestMetrics(tm testMetric) pdata.Metrics { return md } + +func BenchmarkConsumeMetrics(b *testing.B) { + c := consumertest.NewNop() + params := component.ProcessorCreateSettings{ + Logger: zap.NewNop(), + TracerProvider: nil, + BuildInfo: component.BuildInfo{}, + } + cfg := createDefaultConfig().(*Config) + cfg.Metrics = []string{""} + p, err := createMetricsProcessor(context.Background(), params, cfg, c) + if err != nil { + b.Fatal(err) + } + + metrics := pdata.NewMetrics() + rms := metrics.ResourceMetrics().AppendEmpty() + r := rms.Resource() + r.Attributes().Insert("resource", pdata.NewAttributeValueBool(true)) + ilms := rms.InstrumentationLibraryMetrics().AppendEmpty() + ilms.InstrumentationLibrary().SetName("test") + ilms.InstrumentationLibrary().SetVersion("0.1") + m := ilms.Metrics().AppendEmpty() + m.SetDataType(pdata.MetricDataTypeSum) + m.Sum().SetIsMonotonic(true) + dp := m.Sum().DataPoints().AppendEmpty() + dp.LabelsMap().Insert("tag", "value") + + reset := func() { + m.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dp.SetDoubleVal(100.0) + } + + // Load initial value + reset() + p.ConsumeMetrics(context.Background(), metrics) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + reset() + p.ConsumeMetrics(context.Background(), metrics) + } +} From 6464c55a18b91efab70c25825de14e3c0096c046 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 6 Aug 2021 17:05:25 -0400 Subject: [PATCH 07/21] Remove aws metrics library. --- processor/cumulativetodeltaprocessor/go.mod | 3 --- 1 file changed, 3 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/go.mod b/processor/cumulativetodeltaprocessor/go.mod index aae0d070728c..385311a75cf7 100644 --- a/processor/cumulativetodeltaprocessor/go.mod +++ b/processor/cumulativetodeltaprocessor/go.mod @@ -3,7 +3,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumul go 1.17 require ( - github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.36.0 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.36.1-0.20210923171211-10f543a9a43f go.opentelemetry.io/collector/model v0.36.1-0.20210923171211-10f543a9a43f @@ -40,5 +39,3 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) - -replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics => ./../../internal/aws/metrics From 5e59ad4bb23bd2f721dd133a21acac9d54a88437 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Mon, 16 Aug 2021 11:38:01 -0400 Subject: [PATCH 08/21] Update readme. --- processor/cumulativetodeltaprocessor/README.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index a2c35dc06475..94518a0e100e 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -9,7 +9,15 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts cumula ## Configuration -Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta. +The default configuration is to convert all monotonic sum metrics from aggregation temporality cumulative to aggregation temporality delta. + +The following settings can be optionally configured: + +- `metrics`: The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta. Defaults to converting all metric names. +- `max_stale`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 +- `monotonic_only`: Specify whether only monotonic metrics are converted from cumulative to delta. Default: `true`. Set to `false` to convert metrics regardless of monotonic setting. + +#### Example ```yaml processors: @@ -17,10 +25,11 @@ processors: cumulativetodelta: # list the cumulative sum metrics to convert to delta + # (optional - defaults to converting all monotonic cumulative sum metrics) metrics: - - . . - -``` \ No newline at end of file +``` From ebefb02fd0ea98310ee690141a24c037bf11caef Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Wed, 18 Aug 2021 11:02:51 -0400 Subject: [PATCH 09/21] Convert to using attributes rather than labels map. --- .../cumulativetodeltaprocessor/processor.go | 2 +- .../tracking/identity.go | 6 +++--- .../tracking/identity_test.go | 18 +++++++++--------- .../tracking/tracker_test.go | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 3b6783e4bafc..203a50dcca94 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -110,7 +110,7 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId dps.RemoveIf(func(dp pdata.NumberDataPoint) bool { id := baseIdentity id.StartTimestamp = dp.StartTimestamp() - id.LabelsMap = dp.LabelsMap() + id.Attributes = dp.Attributes() id.MetricValueType = dp.Type() point := tracking.ValuePoint{ ObservedTimestamp: dp.Timestamp(), diff --git a/processor/cumulativetodeltaprocessor/tracking/identity.go b/processor/cumulativetodeltaprocessor/tracking/identity.go index 4d790475498e..7cb6da416fe1 100644 --- a/processor/cumulativetodeltaprocessor/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/tracking/identity.go @@ -30,7 +30,7 @@ type MetricIdentity struct { MetricName string MetricUnit string StartTimestamp pdata.Timestamp - LabelsMap pdata.StringMap + Attributes pdata.AttributeMap MetricValueType pdata.MetricValueType } @@ -66,11 +66,11 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) { b.WriteByte(SEP) b.WriteString(mi.MetricUnit) - mi.LabelsMap.Sort().Range(func(k, v string) bool { + mi.Attributes.Sort().Range(func(k string, v pdata.AttributeValue) bool { b.WriteByte(SEP) b.WriteString(k) b.WriteByte(':') - b.WriteString(v) + b.WriteString(tracetranslator.AttributeValueToString(v)) return true }) b.WriteByte(SEP) diff --git a/processor/cumulativetodeltaprocessor/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/tracking/identity_test.go index 7a40f55562bc..5b59408760cd 100644 --- a/processor/cumulativetodeltaprocessor/tracking/identity_test.go +++ b/processor/cumulativetodeltaprocessor/tracking/identity_test.go @@ -32,9 +32,9 @@ func TestMetricIdentity_Write(t *testing.T) { il.SetName("ilm_name") il.SetVersion("ilm_version") - labels := pdata.NewStringMap() - labels.InitFromMap(map[string]string{ - "label": "value", + attributes := pdata.NewAttributeMap() + attributes.InitFromMap(map[string]pdata.AttributeValue{ + "label": pdata.NewAttributeValueString("value"), }) type fields struct { Resource pdata.Resource @@ -44,7 +44,7 @@ func TestMetricIdentity_Write(t *testing.T) { MetricName string MetricUnit string StartTimestamp pdata.Timestamp - LabelsMap pdata.StringMap + Attributes pdata.AttributeMap MetricValueType pdata.MetricValueType } tests := []struct { @@ -57,7 +57,7 @@ func TestMetricIdentity_Write(t *testing.T) { fields: fields{ Resource: resource, InstrumentationLibrary: il, - LabelsMap: labels, + Attributes: attributes, MetricName: "m_name", MetricUnit: "m_unit", }, @@ -68,7 +68,7 @@ func TestMetricIdentity_Write(t *testing.T) { fields: fields{ Resource: resource, InstrumentationLibrary: il, - LabelsMap: labels, + Attributes: attributes, MetricDataType: pdata.MetricDataTypeSum, MetricValueType: pdata.MetricValueTypeInt, MetricIsMonotonic: true, @@ -86,7 +86,7 @@ func TestMetricIdentity_Write(t *testing.T) { MetricName: tt.fields.MetricName, MetricUnit: tt.fields.MetricUnit, StartTimestamp: tt.fields.StartTimestamp, - LabelsMap: tt.fields.LabelsMap, + Attributes: tt.fields.Attributes, MetricValueType: tt.fields.MetricValueType, } b := &bytes.Buffer{} @@ -130,7 +130,7 @@ func TestMetricIdentity_IsFloatVal(t *testing.T) { mi := &MetricIdentity{ Resource: pdata.NewResource(), InstrumentationLibrary: pdata.NewInstrumentationLibrary(), - LabelsMap: pdata.NewStringMap(), + Attributes: pdata.NewAttributeMap(), MetricDataType: pdata.MetricDataTypeSum, MetricValueType: tt.fields.MetricValueType, } @@ -170,7 +170,7 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) { mi := &MetricIdentity{ Resource: pdata.NewResource(), InstrumentationLibrary: pdata.NewInstrumentationLibrary(), - LabelsMap: pdata.NewStringMap(), + Attributes: pdata.NewAttributeMap(), MetricDataType: tt.fields.MetricDataType, } if got := mi.IsSupportedMetricType(); got != tt.want { diff --git a/processor/cumulativetodeltaprocessor/tracking/tracker_test.go b/processor/cumulativetodeltaprocessor/tracking/tracker_test.go index be8638317c4e..d187879f7e32 100644 --- a/processor/cumulativetodeltaprocessor/tracking/tracker_test.go +++ b/processor/cumulativetodeltaprocessor/tracking/tracker_test.go @@ -32,7 +32,7 @@ func TestMetricTracker_Convert(t *testing.T) { MetricIsMonotonic: true, MetricName: "", MetricUnit: "", - LabelsMap: pdata.NewStringMap(), + Attributes: pdata.NewAttributeMap(), } miIntSum := miSum miIntSum.MetricValueType = pdata.MetricValueTypeInt From 4ca2171a17d791aca655fb8b933c2fe333702458 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 17 Sep 2021 14:52:43 -0400 Subject: [PATCH 10/21] Fix usage of deprecated api. --- processor/cumulativetodeltaprocessor/processor_test.go | 9 +++++---- .../cumulativetodeltaprocessor/tracking/identity.go | 5 ++--- processor/cumulativetodeltaprocessor/tracking/tracker.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index ac8ad6c65dda..a4fda37300e0 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -198,9 +198,10 @@ func generateTestMetrics(tm testMetric) pdata.Metrics { func BenchmarkConsumeMetrics(b *testing.B) { c := consumertest.NewNop() params := component.ProcessorCreateSettings{ - Logger: zap.NewNop(), - TracerProvider: nil, - BuildInfo: component.BuildInfo{}, + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + BuildInfo: component.BuildInfo{}, } cfg := createDefaultConfig().(*Config) cfg.Metrics = []string{""} @@ -220,7 +221,7 @@ func BenchmarkConsumeMetrics(b *testing.B) { m.SetDataType(pdata.MetricDataTypeSum) m.Sum().SetIsMonotonic(true) dp := m.Sum().DataPoints().AppendEmpty() - dp.LabelsMap().Insert("tag", "value") + dp.Attributes().Insert("tag", pdata.NewAttributeValueString("value")) reset := func() { m.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) diff --git a/processor/cumulativetodeltaprocessor/tracking/identity.go b/processor/cumulativetodeltaprocessor/tracking/identity.go index 7cb6da416fe1..46530d7991ef 100644 --- a/processor/cumulativetodeltaprocessor/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/tracking/identity.go @@ -19,7 +19,6 @@ import ( "strconv" "go.opentelemetry.io/collector/model/pdata" - tracetranslator "go.opentelemetry.io/collector/translator/trace" ) type MetricIdentity struct { @@ -46,7 +45,7 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) { b.WriteByte(SEP) b.WriteString(k) b.WriteByte(':') - b.WriteString(tracetranslator.AttributeValueToString(v)) + b.WriteString(v.AsString()) return true }) @@ -70,7 +69,7 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) { b.WriteByte(SEP) b.WriteString(k) b.WriteByte(':') - b.WriteString(tracetranslator.AttributeValueToString(v)) + b.WriteString(v.AsString()) return true }) b.WriteByte(SEP) diff --git a/processor/cumulativetodeltaprocessor/tracking/tracker.go b/processor/cumulativetodeltaprocessor/tracking/tracker.go index c3d72fb0deaa..fecb8d229317 100644 --- a/processor/cumulativetodeltaprocessor/tracking/tracker.go +++ b/processor/cumulativetodeltaprocessor/tracking/tracker.go @@ -181,7 +181,7 @@ func (t *metricTracker) sweeper(ctx context.Context, remove func(pdata.Timestamp for { select { case currentTime := <-ticker.C: - staleBefore := pdata.TimestampFromTime(currentTime.Add(-t.maxStale)) + staleBefore := pdata.NewTimestampFromTime(currentTime.Add(-t.maxStale)) remove(staleBefore) case <-ctx.Done(): ticker.Stop() From 36c7d2a1e605ebeba2cd13ebde3a868faa869585 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 17 Sep 2021 15:03:31 -0400 Subject: [PATCH 11/21] Retain NaN values. --- processor/cumulativetodeltaprocessor/processor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 203a50dcca94..9600b10808d8 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -16,6 +16,7 @@ package cumulativetodeltaprocessor import ( "context" + "math" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" @@ -116,6 +117,10 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId ObservedTimestamp: dp.Timestamp(), } if id.IsFloatVal() { + // Do not attempt to transform NaN values + if math.IsNaN(dp.DoubleVal()) { + return false + } point.FloatValue = dp.DoubleVal() } else { point.IntValue = dp.IntVal() From 88fa32956528b5a70572ae9ca443c4fd2fa8bd7c Mon Sep 17 00:00:00 2001 From: Allan Feldman <6374032+a-feld@users.noreply.github.com> Date: Fri, 24 Sep 2021 10:13:37 -0400 Subject: [PATCH 12/21] Exported field comments should start with field name. Co-authored-by: Joshua MacDonald --- processor/cumulativetodeltaprocessor/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index e56429acdcf0..8da68cb3d4c3 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -27,7 +27,7 @@ type Config struct { // List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta. Metrics []string `mapstructure:"metrics"` - // The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. + // MaxStale is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. MaxStale time.Duration `mapstructure:"max_stale"` // Set to false in order to convert non monotonic metrics From 007837de22a6ccc632810c71f5b6dc87864c9582 Mon Sep 17 00:00:00 2001 From: Allan Feldman Date: Fri, 24 Sep 2021 12:20:57 -0400 Subject: [PATCH 13/21] Remove monotonic configuration option. --- processor/cumulativetodeltaprocessor/config.go | 3 --- processor/cumulativetodeltaprocessor/config_test.go | 4 +--- processor/cumulativetodeltaprocessor/factory.go | 1 - processor/cumulativetodeltaprocessor/factory_test.go | 1 - processor/cumulativetodeltaprocessor/processor.go | 7 ++++--- processor/cumulativetodeltaprocessor/testdata/config.yaml | 1 - 6 files changed, 5 insertions(+), 12 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 8da68cb3d4c3..b28480189f97 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -29,9 +29,6 @@ type Config struct { // MaxStale is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. MaxStale time.Duration `mapstructure:"max_stale"` - - // Set to false in order to convert non monotonic metrics - MonotonicOnly bool `mapstructure:"monotonic_only"` } var _ config.Processor = (*Config)(nil) diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index 66a43adbeeb2..7e5ed9a92959 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -49,14 +49,12 @@ func TestLoadingFullConfig(t *testing.T) { "metric1", "metric2", }, - MaxStale: 10 * time.Second, - MonotonicOnly: false, + MaxStale: 10 * time.Second, }, }, { expCfg: &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), - MonotonicOnly: true, }, }, } diff --git a/processor/cumulativetodeltaprocessor/factory.go b/processor/cumulativetodeltaprocessor/factory.go index 66141c9dc7f1..4eeedd96e16b 100644 --- a/processor/cumulativetodeltaprocessor/factory.go +++ b/processor/cumulativetodeltaprocessor/factory.go @@ -42,7 +42,6 @@ func NewFactory() component.ProcessorFactory { func createDefaultConfig() config.Processor { return &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), - MonotonicOnly: true, } } diff --git a/processor/cumulativetodeltaprocessor/factory_test.go b/processor/cumulativetodeltaprocessor/factory_test.go index 0793b859c4bb..e3a69c42f085 100644 --- a/processor/cumulativetodeltaprocessor/factory_test.go +++ b/processor/cumulativetodeltaprocessor/factory_test.go @@ -38,7 +38,6 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, cfg, &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), - MonotonicOnly: true, }) assert.NoError(t, configtest.CheckConfigStruct(cfg)) } diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 9600b10808d8..2ffe800f655d 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -29,7 +29,6 @@ type cumulativeToDeltaProcessor struct { metrics map[string]struct{} logger *zap.Logger deltaCalculator tracking.MetricTracker - monotonicOnly bool cancelFunc context.CancelFunc } @@ -38,7 +37,6 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati p := &cumulativeToDeltaProcessor{ logger: logger, deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStale), - monotonicOnly: config.MonotonicOnly, cancelFunc: cancel, } if len(config.Metrics) > 0 { @@ -81,9 +79,12 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pda if ms.AggregationTemporality() != pdata.AggregationTemporalityCumulative { return false } - if ctdp.monotonicOnly && !ms.IsMonotonic() { + + // Ignore any metrics that aren't monotonic + if !ms.IsMonotonic() { return false } + baseIdentity.MetricIsMonotonic = ms.IsMonotonic() ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) ms.SetAggregationTemporality(pdata.AggregationTemporalityDelta) diff --git a/processor/cumulativetodeltaprocessor/testdata/config.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml index 5b07746c7c3f..6cc7645807e6 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -8,7 +8,6 @@ processors: - metric1 - metric2 max_stale: 10s - monotonic_only: false exporters: nop: From db462d07b6465ec1e702eb5103b84678cb621774 Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 10:27:19 -0700 Subject: [PATCH 14/21] Fixes after merge --- processor/cumulativetodeltaprocessor/config_test.go | 4 ++-- processor/cumulativetodeltaprocessor/processor.go | 4 ++-- processor/cumulativetodeltaprocessor/processor_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index 7e5ed9a92959..dd5b8ccc6955 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -44,7 +44,7 @@ func TestLoadingFullConfig(t *testing.T) { }{ { expCfg: &Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewIDWithName(typeStr, "alt")), + ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "alt")), Metrics: []string{ "metric1", "metric2", @@ -54,7 +54,7 @@ func TestLoadingFullConfig(t *testing.T) { }, { expCfg: &Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), }, }, } diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 2ffe800f655d..df20c8bff571 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -76,7 +76,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pda switch m.DataType() { case pdata.MetricDataTypeSum: ms := m.Sum() - if ms.AggregationTemporality() != pdata.AggregationTemporalityCumulative { + if ms.AggregationTemporality() != pdata.MetricAggregationTemporalityCumulative { return false } @@ -87,7 +87,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pda baseIdentity.MetricIsMonotonic = ms.IsMonotonic() ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) - ms.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + ms.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta) return ms.DataPoints().Len() == 0 default: return false diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index f804a3b7aae7..280d1c6d0817 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -224,7 +224,7 @@ func BenchmarkConsumeMetrics(b *testing.B) { dp.Attributes().Insert("tag", pdata.NewAttributeValueString("value")) reset := func() { - m.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + m.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) dp.SetDoubleVal(100.0) } From f5cc3984c621934ea5d6d4027f893cca6df7d9e8 Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 10:45:19 -0700 Subject: [PATCH 15/21] Rename MaxStale to MaxStaleness --- .../cumulativetodeltaprocessor/config.go | 4 ++-- .../cumulativetodeltaprocessor/config_test.go | 2 +- .../cumulativetodeltaprocessor/processor.go | 2 +- .../tracking/tracker.go | 16 +++++++------- .../tracking/tracker_test.go | 22 +++++++++---------- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index b28480189f97..7a8e0f539a59 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -27,8 +27,8 @@ type Config struct { // List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta. Metrics []string `mapstructure:"metrics"` - // MaxStale is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. - MaxStale time.Duration `mapstructure:"max_stale"` + // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. + MaxStaleness time.Duration `mapstructure:"max_stale"` } var _ config.Processor = (*Config)(nil) diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index dd5b8ccc6955..aeff65dcd96a 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -49,7 +49,7 @@ func TestLoadingFullConfig(t *testing.T) { "metric1", "metric2", }, - MaxStale: 10 * time.Second, + MaxStaleness: 10 * time.Second, }, }, { diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index df20c8bff571..d37e6fae6281 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -36,7 +36,7 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati ctx, cancel := context.WithCancel(context.Background()) p := &cumulativeToDeltaProcessor{ logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStale), + deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness), cancelFunc: cancel, } if len(config.Metrics) > 0 { diff --git a/processor/cumulativetodeltaprocessor/tracking/tracker.go b/processor/cumulativetodeltaprocessor/tracking/tracker.go index fecb8d229317..2f6b7d51eb68 100644 --- a/processor/cumulativetodeltaprocessor/tracking/tracker.go +++ b/processor/cumulativetodeltaprocessor/tracking/tracker.go @@ -58,18 +58,18 @@ type MetricTracker interface { Convert(MetricPoint) (DeltaValue, bool) } -func NewMetricTracker(ctx context.Context, logger *zap.Logger, maxStale time.Duration) MetricTracker { - t := &metricTracker{logger: logger, maxStale: maxStale} - if maxStale > 0 { +func NewMetricTracker(ctx context.Context, logger *zap.Logger, maxStaleness time.Duration) MetricTracker { + t := &metricTracker{logger: logger, maxStaleness: maxStaleness} + if maxStaleness > 0 { go t.sweeper(ctx, t.removeStale) } return t } type metricTracker struct { - logger *zap.Logger - maxStale time.Duration - states sync.Map + logger *zap.Logger + maxStaleness time.Duration + states sync.Map } func (t *metricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) { @@ -177,11 +177,11 @@ func (t *metricTracker) removeStale(staleBefore pdata.Timestamp) { } func (t *metricTracker) sweeper(ctx context.Context, remove func(pdata.Timestamp)) { - ticker := time.NewTicker(t.maxStale) + ticker := time.NewTicker(t.maxStaleness) for { select { case currentTime := <-ticker.C: - staleBefore := pdata.NewTimestampFromTime(currentTime.Add(-t.maxStale)) + staleBefore := pdata.NewTimestampFromTime(currentTime.Add(-t.maxStaleness)) remove(staleBefore) case <-ctx.Done(): ticker.Stop() diff --git a/processor/cumulativetodeltaprocessor/tracking/tracker_test.go b/processor/cumulativetodeltaprocessor/tracking/tracker_test.go index d187879f7e32..8e8f1f066ff6 100644 --- a/processor/cumulativetodeltaprocessor/tracking/tracker_test.go +++ b/processor/cumulativetodeltaprocessor/tracking/tracker_test.go @@ -159,8 +159,8 @@ func Test_metricTracker_removeStale(t *testing.T) { } type fields struct { - MaxStale time.Duration - States map[string]*State + MaxStaleness time.Duration + States map[string]*State } tests := []struct { name string @@ -170,7 +170,7 @@ func Test_metricTracker_removeStale(t *testing.T) { { name: "Removes stale entry, leaves fresh entry", fields: fields{ - MaxStale: 0, // This logic isn't tested here + MaxStaleness: 0, // This logic isn't tested here States: map[string]*State{ "stale": { PrevPoint: stalePoint, @@ -190,8 +190,8 @@ func Test_metricTracker_removeStale(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tr := &metricTracker{ - logger: zap.NewNop(), - maxStale: tt.fields.MaxStale, + logger: zap.NewNop(), + maxStaleness: tt.fields.MaxStaleness, } for k, v := range tt.fields.States { tr.states.Store(k, v) @@ -221,8 +221,8 @@ func Test_metricTracker_sweeper(t *testing.T) { } tr := &metricTracker{ - logger: zap.NewNop(), - maxStale: 1 * time.Millisecond, + logger: zap.NewNop(), + maxStaleness: 1 * time.Millisecond, } start := time.Now() @@ -234,17 +234,17 @@ func Test_metricTracker_sweeper(t *testing.T) { for i := 1; i <= 2; i++ { staleBefore := <-sweepEvent - tickTime := time.Since(start) + tr.maxStale*time.Duration(i) + tickTime := time.Since(start) + tr.maxStaleness*time.Duration(i) if closed { t.Fatalf("Sweeper returned prematurely.") } - if tickTime < tr.maxStale { - t.Errorf("Sweeper tick time is too fast. (%v, want %v)", tickTime, tr.maxStale) + if tickTime < tr.maxStaleness { + t.Errorf("Sweeper tick time is too fast. (%v, want %v)", tickTime, tr.maxStaleness) } staleTime := staleBefore.AsTime() - if time.Since(staleTime) < tr.maxStale { + if time.Since(staleTime) < tr.maxStaleness { t.Errorf("Sweeper called with invalid staleBefore value = %v", staleTime) } } From ee86cfdccd99b7a34cd30233d5206caeb582344b Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 11:06:17 -0700 Subject: [PATCH 16/21] Update README to reflect removal of monotonic_only setting --- processor/cumulativetodeltaprocessor/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 94518a0e100e..302eeb804bb8 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -15,7 +15,6 @@ The following settings can be optionally configured: - `metrics`: The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta. Defaults to converting all metric names. - `max_stale`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 -- `monotonic_only`: Specify whether only monotonic metrics are converted from cumulative to delta. Default: `true`. Set to `false` to convert metrics regardless of monotonic setting. #### Example From e63e77c0a0cbf96abc94e614901ca23321d75da3 Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 16:35:26 -0700 Subject: [PATCH 17/21] Change processor to only convert metrics explicitly specified in the config --- processor/cumulativetodeltaprocessor/README.md | 2 +- processor/cumulativetodeltaprocessor/config.go | 2 +- processor/cumulativetodeltaprocessor/processor.go | 2 ++ processor/cumulativetodeltaprocessor/processor_test.go | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 302eeb804bb8..c8fa523d8b30 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -13,7 +13,7 @@ The default configuration is to convert all monotonic sum metrics from aggregati The following settings can be optionally configured: -- `metrics`: The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta. Defaults to converting all metric names. +- `metrics`: The processor uses metric names to identify a set of cumulative metrics and converts them to delta. - `max_stale`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 #### Example diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 7a8e0f539a59..1c80d6f7ee52 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -24,7 +24,7 @@ import ( type Config struct { config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - // List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta. + // List of cumulative metrics to convert to delta. Metrics []string `mapstructure:"metrics"` // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index d37e6fae6281..5133fd5a9f8f 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -65,6 +65,8 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pda if _, ok := ctdp.metrics[m.Name()]; !ok { return false } + } else { + return false } baseIdentity := tracking.MetricIdentity{ Resource: rm.Resource(), diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index 280d1c6d0817..ebe51cf69e33 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -56,7 +56,7 @@ var ( outMetrics: generateTestMetrics(testMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, - isCumulative: []bool{false, false}, + isCumulative: []bool{true, true}, }), }, { From 789f07f1feaea94e8bdf48ed920e168de6cd0c5e Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:22:24 -0700 Subject: [PATCH 18/21] Reintroduce test for invalid config with no metric names --- .../cumulativetodeltaprocessor/config.go | 9 +++- .../cumulativetodeltaprocessor/config_test.go | 43 ++++++++++++++++--- .../testdata/config.yaml | 1 - .../testdata/config_missing_name.yaml | 20 +++++++++ 4 files changed, 64 insertions(+), 9 deletions(-) create mode 100644 processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index 1c80d6f7ee52..db85d2dc596c 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -15,6 +15,7 @@ package cumulativetodeltaprocessor import ( + "fmt" "time" "go.opentelemetry.io/collector/config" @@ -33,7 +34,11 @@ type Config struct { var _ config.Processor = (*Config)(nil) -// Validate checks if the processor configuration is valid -func (cfg *Config) Validate() error { +// Validate checks whether the input configuration has all of the required fields for the processor. +// An error is returned if there are any invalid inputs. +func (config *Config) Validate() error { + if len(config.Metrics) == 0 { + return fmt.Errorf("metric names are missing") + } return nil } diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index aeff65dcd96a..c537ea3e6026 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -15,6 +15,7 @@ package cumulativetodeltaprocessor import ( + "fmt" "path" "testing" "time" @@ -44,7 +45,7 @@ func TestLoadingFullConfig(t *testing.T) { }{ { expCfg: &Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "alt")), + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), Metrics: []string{ "metric1", "metric2", @@ -52,11 +53,6 @@ func TestLoadingFullConfig(t *testing.T) { MaxStaleness: 10 * time.Second, }, }, - { - expCfg: &Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), - }, - }, } for _, test := range tests { @@ -66,3 +62,38 @@ func TestLoadingFullConfig(t *testing.T) { }) } } + +func TestValidateConfig(t *testing.T) { + tests := []struct { + configName string + succeed bool + errorMessage string + }{ + { + configName: "config.yaml", + succeed: true, + }, + { + configName: "config_missing_name.yaml", + succeed: false, + errorMessage: "metric names are missing", + }, + } + + for _, test := range tests { + factories, err := componenttest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Processors[typeStr] = factory + t.Run(test.configName, func(t *testing.T) { + config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configName), factories) + if test.succeed { + assert.NotNil(t, config) + assert.NoError(t, err) + } else { + assert.EqualError(t, err, fmt.Sprintf("processor %q has invalid configuration: %s", typeStr, test.errorMessage)) + } + }) + } +} diff --git a/processor/cumulativetodeltaprocessor/testdata/config.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml index 6cc7645807e6..54fc85d6f37c 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -3,7 +3,6 @@ receivers: processors: cumulativetodelta: - cumulativetodelta/alt: metrics: - metric1 - metric2 diff --git a/processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml b/processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml new file mode 100644 index 000000000000..5043e802889e --- /dev/null +++ b/processor/cumulativetodeltaprocessor/testdata/config_missing_name.yaml @@ -0,0 +1,20 @@ +receivers: + nop: + +processors: + cumulativetodelta: + metrics: + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [cumulativetodelta] + exporters: [nop] + metrics: + receivers: [nop] + processors: [cumulativetodelta] + exporters: [nop] From 58a72c17d2bad9e130e218fa47d45949279665c9 Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:23:41 -0700 Subject: [PATCH 19/21] Rename max_stale to max_staleness --- processor/cumulativetodeltaprocessor/config.go | 2 +- processor/cumulativetodeltaprocessor/testdata/config.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index db85d2dc596c..a002939fa329 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -29,7 +29,7 @@ type Config struct { Metrics []string `mapstructure:"metrics"` // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. - MaxStaleness time.Duration `mapstructure:"max_stale"` + MaxStaleness time.Duration `mapstructure:"max_staleness"` } var _ config.Processor = (*Config)(nil) diff --git a/processor/cumulativetodeltaprocessor/testdata/config.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml index 54fc85d6f37c..7b6565afd54c 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -6,7 +6,7 @@ processors: metrics: - metric1 - metric2 - max_stale: 10s + max_staleness: 10s exporters: nop: From c84a17a04033407d4b461d5b716373bd52e9a223 Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:26:12 -0700 Subject: [PATCH 20/21] Fix README --- processor/cumulativetodeltaprocessor/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index c8fa523d8b30..bc8caa5b1321 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -9,7 +9,7 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts cumula ## Configuration -The default configuration is to convert all monotonic sum metrics from aggregation temporality cumulative to aggregation temporality delta. +Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative metrics and converts them from cumulative to delta. The following settings can be optionally configured: @@ -24,7 +24,6 @@ processors: cumulativetodelta: # list the cumulative sum metrics to convert to delta - # (optional - defaults to converting all monotonic cumulative sum metrics) metrics: - - From 466187c42bf387e1d9134d4aff5a564fd2fd2b80 Mon Sep 17 00:00:00 2001 From: Alan West <3676547+alanwest@users.noreply.github.com> Date: Fri, 15 Oct 2021 17:38:33 -0700 Subject: [PATCH 21/21] List of metric names can no longer be nil or empty --- processor/cumulativetodeltaprocessor/processor.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 5133fd5a9f8f..7fae9d4efdfd 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -61,11 +61,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pda ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool { ms := ilm.Metrics() ms.RemoveIf(func(m pdata.Metric) bool { - if ctdp.metrics != nil { - if _, ok := ctdp.metrics[m.Name()]; !ok { - return false - } - } else { + if _, ok := ctdp.metrics[m.Name()]; !ok { return false } baseIdentity := tracking.MetricIdentity{