-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
cumulativetodeltaprocessor: Reopening #4444 Update cumulative to delta #5772
Changes from 17 commits
2791c68
bbe3cbc
d4d3c49
bcbc722
86840f1
8617d90
6464c55
5e59ad4
ebefb02
4ca2171
36c7d2a
88fa329
007837d
5f928c2
db462d0
f5cc398
ee86cfd
e63e77c
c032dd8
789f07f
58a72c1
c84a17a
466187c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
package cumulativetodeltaprocessor | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/config" | ||
) | ||
|
@@ -24,15 +24,16 @@ import ( | |
type Config struct { | ||
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct | ||
|
||
// List of cumulative sum metrics to convert to delta | ||
// List of cumulative metrics to convert to delta. Default: converts all cumulative metrics to delta. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does it mean "default" here? You mean |
||
Metrics []string `mapstructure:"metrics"` | ||
|
||
// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. | ||
MaxStaleness time.Duration `mapstructure:"max_stale"` | ||
} | ||
|
||
// Validate checks whether the input configuration has all of the required fields for the processor. | ||
// An error is returned if there are any invalid inputs. | ||
func (config *Config) Validate() error { | ||
if len(config.Metrics) == 0 { | ||
return fmt.Errorf("metric names are missing") | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we keep this for the moment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that seems fair. I will re-add. |
||
var _ config.Processor = (*Config)(nil) | ||
|
||
// Validate checks if the processor configuration is valid | ||
func (cfg *Config) Validate() error { | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,7 +46,7 @@ func createDefaultConfig() config.Processor { | |
} | ||
|
||
func createMetricsProcessor( | ||
ctx context.Context, | ||
_ context.Context, | ||
params component.ProcessorCreateSettings, | ||
cfg config.Processor, | ||
nextConsumer consumer.Metrics, | ||
|
@@ -56,7 +56,6 @@ func createMetricsProcessor( | |
return nil, fmt.Errorf("configuration parsing error") | ||
} | ||
|
||
processorConfig.Validate() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call. |
||
metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, params.Logger) | ||
|
||
return processorhelper.NewMetricsProcessor( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,32 +17,35 @@ package cumulativetodeltaprocessor | |
import ( | ||
"context" | ||
"math" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/model/pdata" | ||
"go.uber.org/zap" | ||
|
||
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/tracking" | ||
) | ||
|
||
type cumulativeToDeltaProcessor struct { | ||
metrics map[string]bool | ||
metrics map[string]struct{} | ||
logger *zap.Logger | ||
deltaCalculator awsmetrics.MetricCalculator | ||
deltaCalculator tracking.MetricTracker | ||
cancelFunc context.CancelFunc | ||
} | ||
|
||
func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { | ||
inputMetricSet := make(map[string]bool, len(config.Metrics)) | ||
for _, name := range config.Metrics { | ||
inputMetricSet[name] = true | ||
} | ||
|
||
return &cumulativeToDeltaProcessor{ | ||
metrics: inputMetricSet, | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
p := &cumulativeToDeltaProcessor{ | ||
logger: logger, | ||
deltaCalculator: newDeltaCalculator(), | ||
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness), | ||
cancelFunc: cancel, | ||
} | ||
if len(config.Metrics) > 0 { | ||
p.metrics = make(map[string]struct{}, len(config.Metrics)) | ||
for _, m := range config.Metrics { | ||
p.metrics[m] = struct{}{} | ||
} | ||
} | ||
return p | ||
} | ||
|
||
// Start is invoked during service startup. | ||
|
@@ -53,64 +56,95 @@ func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) e | |
// processMetrics implements the ProcessMetricsFunc type. | ||
func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { | ||
resourceMetricsSlice := md.ResourceMetrics() | ||
for i := 0; i < resourceMetricsSlice.Len(); i++ { | ||
rm := resourceMetricsSlice.At(i) | ||
resourceMetricsSlice.RemoveIf(func(rm pdata.ResourceMetrics) bool { | ||
ilms := rm.InstrumentationLibraryMetrics() | ||
for j := 0; j < ilms.Len(); j++ { | ||
ilm := ilms.At(j) | ||
metricSlice := ilm.Metrics() | ||
for k := 0; k < metricSlice.Len(); k++ { | ||
metric := metricSlice.At(k) | ||
if ctdp.metrics[metric.Name()] { | ||
if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.MetricAggregationTemporalityCumulative { | ||
dataPoints := metric.Sum().DataPoints() | ||
|
||
for l := 0; l < dataPoints.Len(); l++ { | ||
fromDataPoint := dataPoints.At(l) | ||
labelMap := make(map[string]string) | ||
|
||
fromDataPoint.Attributes().Range(func(k string, v pdata.AttributeValue) bool { | ||
labelMap[k] = v.AsString() | ||
return true | ||
}) | ||
datapointValue := fromDataPoint.DoubleVal() | ||
if math.IsNaN(datapointValue) { | ||
continue | ||
} | ||
result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, datapointValue, fromDataPoint.Timestamp().AsTime()) | ||
ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool { | ||
ms := ilm.Metrics() | ||
ms.RemoveIf(func(m pdata.Metric) bool { | ||
if ctdp.metrics != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably better to never have "nil" but empty map? This is also a logic that we should add later that if the list is empty gets to pass all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the map cannot be nil or empty anymore, I just removed this check. |
||
if _, ok := ctdp.metrics[m.Name()]; !ok { | ||
return false | ||
} | ||
} | ||
baseIdentity := tracking.MetricIdentity{ | ||
Resource: rm.Resource(), | ||
InstrumentationLibrary: ilm.InstrumentationLibrary(), | ||
MetricDataType: m.DataType(), | ||
MetricName: m.Name(), | ||
MetricUnit: m.Unit(), | ||
} | ||
switch m.DataType() { | ||
case pdata.MetricDataTypeSum: | ||
ms := m.Sum() | ||
if ms.AggregationTemporality() != pdata.MetricAggregationTemporalityCumulative { | ||
return false | ||
} | ||
|
||
fromDataPoint.SetDoubleVal(result.(delta).value) | ||
fromDataPoint.SetStartTimestamp(pdata.NewTimestampFromTime(result.(delta).prevTimestamp)) | ||
} | ||
metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta) | ||
// Ignore any metrics that aren't monotonic | ||
if !ms.IsMonotonic() { | ||
return false | ||
} | ||
|
||
baseIdentity.MetricIsMonotonic = ms.IsMonotonic() | ||
ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) | ||
ms.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta) | ||
return ms.DataPoints().Len() == 0 | ||
default: | ||
return false | ||
} | ||
} | ||
} | ||
} | ||
}) | ||
return ilm.Metrics().Len() == 0 | ||
}) | ||
return rm.InstrumentationLibraryMetrics().Len() == 0 | ||
}) | ||
return md, nil | ||
} | ||
|
||
// Shutdown is invoked during service shutdown. | ||
func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error { | ||
ctdp.cancelFunc() | ||
return nil | ||
} | ||
|
||
func newDeltaCalculator() awsmetrics.MetricCalculator { | ||
return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) { | ||
result := delta{value: val.(float64), prevTimestamp: timestamp} | ||
|
||
if prev != nil { | ||
deltaValue := val.(float64) - prev.RawValue.(float64) | ||
result.value = deltaValue | ||
result.prevTimestamp = prev.Timestamp | ||
return result, true | ||
} | ||
return result, false | ||
}) | ||
} | ||
func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) { | ||
switch dps := in.(type) { | ||
case pdata.NumberDataPointSlice: | ||
dps.RemoveIf(func(dp pdata.NumberDataPoint) bool { | ||
id := baseIdentity | ||
id.StartTimestamp = dp.StartTimestamp() | ||
id.Attributes = dp.Attributes() | ||
id.MetricValueType = dp.Type() | ||
point := tracking.ValuePoint{ | ||
ObservedTimestamp: dp.Timestamp(), | ||
} | ||
if id.IsFloatVal() { | ||
// Do not attempt to transform NaN values | ||
if math.IsNaN(dp.DoubleVal()) { | ||
return false | ||
} | ||
point.FloatValue = dp.DoubleVal() | ||
} else { | ||
point.IntValue = dp.IntVal() | ||
} | ||
trackingPoint := tracking.MetricPoint{ | ||
Identity: id, | ||
Value: point, | ||
} | ||
delta, valid := ctdp.deltaCalculator.Convert(trackingPoint) | ||
|
||
type delta struct { | ||
value float64 | ||
prevTimestamp time.Time | ||
// When converting non-monotonic cumulative counters, | ||
// the first data point is omitted since the initial | ||
// reference is not assumed to be zero | ||
if !valid { | ||
return true | ||
} | ||
dp.SetStartTimestamp(delta.StartTimestamp) | ||
if id.IsFloatVal() { | ||
dp.SetDoubleVal(delta.FloatValue) | ||
} else { | ||
dp.SetIntVal(delta.IntValue) | ||
} | ||
return false | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do understand the NR need, but this "default" should be what users expect the most, and in my opinion this behavior is not what they would expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I see your point. I have reverted to the previous behavior. Now only metrics that are explicitly listed in the config are converted from cumulative -> delta.
I will add this as another issue to follow up on. I like @dmitryax's suggestion of changing this to support a regex #4444 (comment). I believe this would serve the need to support converting all metrics by specifying `*` or something similar.