diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index fcba246885902..0ab92eb038054 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -63,8 +63,8 @@ core,github.com/DataDog/sketches-go/ddsketch,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/encoding,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/mapping,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/pb/sketchpb,Apache-2.0,"Datadog, Inc." -core,github.com/DataDog/sketches-go/ddsketch/store,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/stat,Apache-2.0,"Datadog, Inc." +core,github.com/DataDog/sketches-go/ddsketch/store,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/viper,MIT,"Datadog, Inc." core,github.com/DataDog/watermarkpodautoscaler/api/v1alpha1,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/zstd,BSD-3-Clause,"Datadog, Inc." diff --git a/pkg/otlp/model/translator/config.go b/pkg/otlp/model/translator/config.go new file mode 100644 index 0000000000000..0cb09daa162b9 --- /dev/null +++ b/pkg/otlp/model/translator/config.go @@ -0,0 +1,142 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import "fmt" + +type translatorConfig struct { + // metrics export behavior + HistMode HistogramMode + SendCountSum bool + Quantiles bool + SendMonotonic bool + ResourceAttributesAsTags bool + + // cache configuration + sweepInterval int64 + deltaTTL int64 + + // hostname provider configuration + fallbackHostnameProvider HostnameProvider +} + +// Option is a translator creation option. +type Option func(*translatorConfig) error + +// WithDeltaTTL sets the delta TTL for cumulative metrics datapoints. +// By default, 3600 seconds are used. +func WithDeltaTTL(deltaTTL int64) Option { + return func(t *translatorConfig) error { + if deltaTTL <= 0 { + return fmt.Errorf("time to live must be positive: %d", deltaTTL) + } + t.deltaTTL = deltaTTL + t.sweepInterval = 1 + if t.deltaTTL > 1 { + t.sweepInterval = t.deltaTTL / 2 + } + return nil + } +} + +// WithFallbackHostnameProvider sets the fallback hostname provider. +// By default, an empty hostname is used as a fallback. +func WithFallbackHostnameProvider(provider HostnameProvider) Option { + return func(t *translatorConfig) error { + t.fallbackHostnameProvider = provider + return nil + } +} + +// WithQuantiles enables quantiles exporting for summary metrics. +func WithQuantiles() Option { + return func(t *translatorConfig) error { + t.Quantiles = true + return nil + } +} + +// WithResourceAttributesAsTags sets resource attributes as tags. +func WithResourceAttributesAsTags() Option { + return func(t *translatorConfig) error { + t.ResourceAttributesAsTags = true + return nil + } +} + +// HistogramMode is an export mode for OTLP Histogram metrics. +type HistogramMode string + +const ( + // HistogramModeNoBuckets disables bucket export. 
+	HistogramModeNoBuckets HistogramMode = "nobuckets"
+	// HistogramModeCounters exports buckets as Datadog counts.
+	HistogramModeCounters HistogramMode = "counters"
+	// HistogramModeDistributions exports buckets as Datadog distributions.
+	HistogramModeDistributions HistogramMode = "distributions"
+)
+
+// WithHistogramMode sets the histogram mode.
+// The default mode is HistogramModeNoBuckets.
+func WithHistogramMode(mode HistogramMode) Option {
+	return func(t *translatorConfig) error {
+		switch mode {
+		case HistogramModeNoBuckets, HistogramModeCounters, HistogramModeDistributions:
+			t.HistMode = mode
+		default:
+			return fmt.Errorf("unknown histogram mode: %q", mode)
+		}
+		return nil
+	}
+}
+
+// WithCountSumMetrics exports .count and .sum histogram metrics.
+func WithCountSumMetrics() Option {
+	return func(t *translatorConfig) error {
+		t.SendCountSum = true
+		return nil
+	}
+}
+
+// NumberMode is an export mode for OTLP Number metrics.
+type NumberMode string
+
+const (
+	// NumberModeCumulativeToDelta calculates the delta for
+	// cumulative monotonic metrics on the client side and reports
+	// them as Datadog counts.
+	NumberModeCumulativeToDelta NumberMode = "cumulative_to_delta"
+
+	// NumberModeRawValue reports the raw value for cumulative monotonic
+	// metrics as a Datadog gauge.
+	NumberModeRawValue NumberMode = "raw_value"
+)
+
+// WithNumberMode sets the number mode.
+// The default mode is NumberModeCumulativeToDelta.
+func WithNumberMode(mode NumberMode) Option {
+	return func(t *translatorConfig) error {
+		switch mode {
+		case NumberModeCumulativeToDelta:
+			t.SendMonotonic = true
+		case NumberModeRawValue:
+			t.SendMonotonic = false
+		default:
+			return fmt.Errorf("unknown number mode: %q", mode)
+		}
+		return nil
+	}
+}
diff --git a/pkg/otlp/model/translator/consumer.go b/pkg/otlp/model/translator/consumer.go
new file mode 100644
index 0000000000000..87653209eb28a
--- /dev/null
+++ b/pkg/otlp/model/translator/consumer.go
@@ -0,0 +1,71 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package translator
+
+import (
+	"context"
+
+	"github.com/DataDog/datadog-agent/pkg/quantile"
+)
+
+// MetricDataType is a timeseries-style metric type.
+type MetricDataType int
+
+const (
+	// Gauge is the Datadog Gauge metric type.
+	Gauge MetricDataType = iota
+	// Count is the Datadog Count metric type.
+	Count
+)
+
+// TimeSeriesConsumer is a timeseries consumer.
+type TimeSeriesConsumer interface {
+	// ConsumeTimeSeries consumes a timeseries-style metric.
+	ConsumeTimeSeries(
+		ctx context.Context,
+		name string,
+		typ MetricDataType,
+		timestamp uint64,
+		value float64,
+		tags []string,
+		host string,
+	)
+}
+
+// SketchConsumer is a pkg/quantile sketch consumer.
+type SketchConsumer interface {
+	// ConsumeSketch consumes a pkg/quantile-style sketch.
+	ConsumeSketch(
+		ctx context.Context,
+		name string,
+		timestamp uint64,
+		sketch *quantile.Sketch,
+		tags []string,
+		host string,
+	)
+}
+
+// Consumer is a metrics consumer.
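+// A Consumer implements both TimeSeriesConsumer and SketchConsumer,
+// since MapMetrics may need to emit gauges and counts as well as
+// sketches (for HistogramModeDistributions).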
+type Consumer interface { + TimeSeriesConsumer + SketchConsumer +} + +// HostConsumer is a hostname consumer. +// It is an optional interface that can be implemented by a Consumer. +type HostConsumer interface { + // ConsumeHost consumes a hostname. + ConsumeHost(host string) +} diff --git a/pkg/otlp/model/translator/hostname_provider.go b/pkg/otlp/model/translator/hostname_provider.go new file mode 100644 index 0000000000000..829d0654add26 --- /dev/null +++ b/pkg/otlp/model/translator/hostname_provider.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import "context" + +// HostnameProvider gets a hostname +type HostnameProvider interface { + // Hostname gets the hostname from the machine. + Hostname(ctx context.Context) (string, error) +} + +var _ HostnameProvider = (*noHostProvider)(nil) + +type noHostProvider struct{} + +func (*noHostProvider) Hostname(context.Context) (string, error) { + return "", nil +} diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go new file mode 100644 index 0000000000000..2e35c20f08fc1 --- /dev/null +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -0,0 +1,452 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "context" + "fmt" + "math" + "strconv" + + "github.com/DataDog/datadog-agent/pkg/quantile" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + + "github.com/DataDog/datadog-agent/pkg/otlp/model/attributes" +) + +const metricName string = "metric name" + +// Translator is a metrics translator. +type Translator struct { + prevPts *ttlCache + logger *zap.Logger + cfg translatorConfig +} + +// New creates a new translator with given options. 
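+//
+// A minimal usage sketch (illustrative only; the option values are
+// examples, not recommendations):
+//
+//	tr, err := New(logger,
+//		WithDeltaTTL(3600),
+//		WithHistogramMode(HistogramModeCounters),
+//		WithQuantiles(),
+//	)
+//	if err != nil {
+//		return err
+//	}
+//	err = tr.MapMetrics(ctx, metrics, consumer)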
+func New(logger *zap.Logger, options ...Option) (*Translator, error) { + cfg := translatorConfig{ + HistMode: HistogramModeNoBuckets, + SendCountSum: true, + Quantiles: false, + SendMonotonic: true, + ResourceAttributesAsTags: false, + sweepInterval: 1800, + deltaTTL: 3600, + fallbackHostnameProvider: &noHostProvider{}, + } + + for _, opt := range options { + err := opt(&cfg) + if err != nil { + return nil, err + } + } + + cache := newTTLCache(cfg.sweepInterval, cfg.deltaTTL) + return &Translator{cache, logger, cfg}, nil +} + +// getTags maps an attributeMap into a slice of Datadog tags +func getTags(labels pdata.AttributeMap) []string { + tags := make([]string, 0, labels.Len()) + labels.Range(func(key string, value pdata.AttributeValue) bool { + v := value.AsString() + if v == "" { + // Tags can't end with ":" so we replace empty values with "n/a" + v = "n/a" + } + tags = append(tags, fmt.Sprintf("%s:%s", key, v)) + return true + }) + return tags +} + +// isCumulativeMonotonic checks if a metric is a cumulative monotonic metric +func isCumulativeMonotonic(md pdata.Metric) bool { + switch md.DataType() { + case pdata.MetricDataTypeSum: + return md.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative && + md.Sum().IsMonotonic() + } + return false +} + +// isSkippable checks if a value can be skipped (because it is not supported by the backend). +// It logs that the value is unsupported for debugging since this sometimes means there is a bug. +func (t *Translator) isSkippable(name string, v float64) bool { + skippable := math.IsInf(v, 0) || math.IsNaN(v) + if skippable { + t.logger.Debug("Unsupported metric value", zap.String(metricName, name), zap.Float64("value", v)) + } + return skippable +} + +// mapNumberMetrics maps double datapoints into Datadog metrics +func (t *Translator) mapNumberMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + dt MetricDataType, + slice pdata.NumberDataPointSlice, + attrTags []string, + host string, +) { + + for i := 0; i < slice.Len(); i++ { + p := slice.At(i) + tags := getTags(p.Attributes()) + tags = append(tags, attrTags...) + var val float64 + switch p.Type() { + case pdata.MetricValueTypeDouble: + val = p.DoubleVal() + case pdata.MetricValueTypeInt: + val = float64(p.IntVal()) + } + + if t.isSkippable(name, val) { + continue + } + + consumer.ConsumeTimeSeries(ctx, name, dt, uint64(p.Timestamp()), val, tags, host) + } +} + +// mapNumberMonotonicMetrics maps monotonic datapoints into Datadog metrics +func (t *Translator) mapNumberMonotonicMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + slice pdata.NumberDataPointSlice, + attrTags []string, + host string, +) { + for i := 0; i < slice.Len(); i++ { + p := slice.At(i) + ts := uint64(p.Timestamp()) + tags := getTags(p.Attributes()) + tags = append(tags, attrTags...) 
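+		// The diff below comes from the prevPts TTL cache: the first point for
+		// a given (name, tags) pair, out-of-order points, and decreases
+		// (counter resets) only seed the cache and emit nothing, as exercised
+		// by the TestMap*Monotonic* tests.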
+ + var val float64 + switch p.Type() { + case pdata.MetricValueTypeDouble: + val = p.DoubleVal() + case pdata.MetricValueTypeInt: + val = float64(p.IntVal()) + } + + if t.isSkippable(name, val) { + continue + } + + if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, val); ok { + consumer.ConsumeTimeSeries(ctx, name, Count, ts, dx, tags, host) + } + } +} + +func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBound float64) { + // See https://github.com/open-telemetry/opentelemetry-proto/blob/v0.10.0/opentelemetry/proto/metrics/v1/metrics.proto#L427-L439 + lowerBound = math.Inf(-1) + upperBound = math.Inf(1) + if idx > 0 { + lowerBound = p.ExplicitBounds()[idx-1] + } + if idx < len(p.ExplicitBounds()) { + upperBound = p.ExplicitBounds()[idx] + } + return +} + +func (t *Translator) getSketchBuckets( + ctx context.Context, + consumer SketchConsumer, + name string, + ts uint64, + p pdata.HistogramDataPoint, + delta bool, + tags []string, + host string, +) { + as := &quantile.Agent{} + for j := range p.BucketCounts() { + lowerBound, upperBound := getBounds(p, j) + // InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound + // https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111 + if math.IsInf(upperBound, 1) { + upperBound = lowerBound + } else if math.IsInf(lowerBound, -1) { + lowerBound = upperBound + } + + count := p.BucketCounts()[j] + if delta { + as.InsertInterpolate(lowerBound, upperBound, uint(count)) + } else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok { + as.InsertInterpolate(lowerBound, upperBound, uint(dx)) + } + + } + + consumer.ConsumeSketch(ctx, name, ts, as.Finish(), tags, host) +} + +func (t *Translator) getLegacyBuckets( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + p pdata.HistogramDataPoint, + delta bool, + tags []string, + host string, +) { + // We have a single metric, 'bucket', which is tagged with the bucket bounds. See: + // https://github.com/DataDog/integrations-core/blob/7.30.1/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/transformers/histogram.py + fullName := fmt.Sprintf("%s.bucket", name) + for idx, val := range p.BucketCounts() { + lowerBound, upperBound := getBounds(p, idx) + bucketTags := []string{ + fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)), + fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)), + } + bucketTags = append(bucketTags, tags...) + + count := float64(val) + ts := uint64(p.Timestamp()) + if delta { + consumer.ConsumeTimeSeries(ctx, fullName, Count, ts, count, bucketTags, host) + } else if dx, ok := t.prevPts.putAndGetDiff(fullName, bucketTags, ts, count); ok { + consumer.ConsumeTimeSeries(ctx, fullName, Count, ts, dx, bucketTags, host) + } + } +} + +// mapHistogramMetrics maps double histogram metrics slices to Datadog metrics +// +// A Histogram metric has: +// - The count of values in the population +// - The sum of values in the population +// - A number of buckets, each of them having +// - the bounds that define the bucket +// - the count of the number of items in that bucket +// - a sample value from each bucket +// +// We follow a similar approach to our OpenMetrics check: +// we report sum and count by default; buckets count can also +// be reported (opt-in) tagged by lower bound. 
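+//
+// For example (values from TestMapDeltaHistogramMetrics), a delta histogram
+// "doubleHist.test" with bounds [0] and bucket counts [2, 18] yields, in
+// HistogramModeCounters:
+//
+//	doubleHist.test.count  = 20 (count/sum export enabled)
+//	doubleHist.test.sum    = <sum>
+//	doubleHist.test.bucket = 2  [lower_bound:-inf upper_bound:0]
+//	doubleHist.test.bucket = 18 [lower_bound:0 upper_bound:inf]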
+func (t *Translator) mapHistogramMetrics(
+	ctx context.Context,
+	consumer Consumer,
+	name string,
+	slice pdata.HistogramDataPointSlice,
+	delta bool,
+	attrTags []string,
+	host string,
+) {
+	for i := 0; i < slice.Len(); i++ {
+		p := slice.At(i)
+		ts := uint64(p.Timestamp())
+		tags := getTags(p.Attributes())
+		tags = append(tags, attrTags...)
+
+		if t.cfg.SendCountSum {
+			count := float64(p.Count())
+			countName := fmt.Sprintf("%s.count", name)
+			if delta {
+				consumer.ConsumeTimeSeries(ctx, countName, Count, ts, count, tags, host)
+			} else if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, count); ok {
+				consumer.ConsumeTimeSeries(ctx, countName, Count, ts, dx, tags, host)
+			}
+		}
+
+		if t.cfg.SendCountSum {
+			sum := p.Sum()
+			sumName := fmt.Sprintf("%s.sum", name)
+			if !t.isSkippable(sumName, p.Sum()) {
+				if delta {
+					consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, sum, tags, host)
+				} else if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, sum); ok {
+					consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, dx, tags, host)
+				}
+			}
+		}
+
+		switch t.cfg.HistMode {
+		case HistogramModeCounters:
+			t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host)
+		case HistogramModeDistributions:
+			t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host)
+		}
+	}
+}
+
+// formatFloat formats a float number as close as possible to what
+// we do on the Datadog Agent Python OpenMetrics check, which, in turn, tries to
+// follow https://github.com/OpenObservability/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#considerations-canonical-numbers
+func formatFloat(f float64) string {
+	if math.IsInf(f, 1) {
+		return "inf"
+	} else if math.IsInf(f, -1) {
+		return "-inf"
+	} else if math.IsNaN(f) {
+		return "nan"
+	} else if f == 0 {
+		return "0"
+	}
+
+	// Add .0 to whole numbers
+	s := strconv.FormatFloat(f, 'g', -1, 64)
+	if f == math.Floor(f) {
+		s = s + ".0"
+	}
+	return s
+}
+
+// getQuantileTag returns the quantile tag for summary types.
+func getQuantileTag(quantile float64) string {
+	return fmt.Sprintf("quantile:%s", formatFloat(quantile))
+}
+
+// mapSummaryMetrics maps summary datapoints into Datadog metrics
+func (t *Translator) mapSummaryMetrics(
+	ctx context.Context,
+	consumer TimeSeriesConsumer,
+	name string,
+	slice pdata.SummaryDataPointSlice,
+	attrTags []string,
+	host string,
+) {
+
+	for i := 0; i < slice.Len(); i++ {
+		p := slice.At(i)
+		ts := uint64(p.Timestamp())
+		tags := getTags(p.Attributes())
+		tags = append(tags, attrTags...)
+
+		// count and sum are increasing; we treat them as cumulative monotonic sums.
+		{
+			countName := fmt.Sprintf("%s.count", name)
+			if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, float64(p.Count())); ok && !t.isSkippable(countName, dx) {
+				consumer.ConsumeTimeSeries(ctx, countName, Count, ts, dx, tags, host)
+			}
+		}
+
+		{
+			sumName := fmt.Sprintf("%s.sum", name)
+			if !t.isSkippable(sumName, p.Sum()) {
+				if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, p.Sum()); ok {
+					consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, dx, tags, host)
+				}
+			}
+		}
+
+		if t.cfg.Quantiles {
+			fullName := fmt.Sprintf("%s.quantile", name)
+			quantiles := p.QuantileValues()
+			for i := 0; i < quantiles.Len(); i++ {
+				q := quantiles.At(i)
+
+				if t.isSkippable(fullName, q.Value()) {
+					continue
+				}
+
+				quantileTags := []string{getQuantileTag(q.Quantile())}
+				quantileTags = append(quantileTags, tags...)
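+				// Summary quantiles are point-in-time values, so they are
+				// exported as gauges tagged with quantile:<q> (for example
+				// quantile:0.999, see getQuantileTag).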
+ consumer.ConsumeTimeSeries(ctx, fullName, Gauge, ts, q.Value(), quantileTags, host) + } + } + } +} + +// MapMetrics maps OTLP metrics into the DataDog format +func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics, consumer Consumer) error { + rms := md.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + + var attributeTags []string + + // Only fetch attribute tags if they're not already converted into labels. + // Otherwise some tags would be present twice in a metric's tag list. + if !t.cfg.ResourceAttributesAsTags { + attributeTags = attributes.TagsFromAttributes(rm.Resource().Attributes()) + } + + host, ok := attributes.HostnameFromAttributes(rm.Resource().Attributes()) + if !ok { + var err error + host, err = t.cfg.fallbackHostnameProvider.Hostname(context.Background()) + if err != nil { + return fmt.Errorf("failed to get fallback host: %w", err) + } + } + + // Track hosts if the consumer is a HostConsumer. + if c, ok := consumer.(HostConsumer); ok { + c.ConsumeHost(host) + } + + ilms := rm.InstrumentationLibraryMetrics() + for j := 0; j < ilms.Len(); j++ { + ilm := ilms.At(j) + metricsArray := ilm.Metrics() + for k := 0; k < metricsArray.Len(); k++ { + md := metricsArray.At(k) + switch md.DataType() { + case pdata.MetricDataTypeGauge: + t.mapNumberMetrics(ctx, consumer, md.Name(), Gauge, md.Gauge().DataPoints(), attributeTags, host) + case pdata.MetricDataTypeSum: + switch md.Sum().AggregationTemporality() { + case pdata.AggregationTemporalityCumulative: + if t.cfg.SendMonotonic && isCumulativeMonotonic(md) { + t.mapNumberMonotonicMetrics(ctx, consumer, md.Name(), md.Sum().DataPoints(), attributeTags, host) + } else { + t.mapNumberMetrics(ctx, consumer, md.Name(), Gauge, md.Sum().DataPoints(), attributeTags, host) + } + case pdata.AggregationTemporalityDelta: + t.mapNumberMetrics(ctx, consumer, md.Name(), Count, md.Sum().DataPoints(), attributeTags, host) + default: // pdata.AggregationTemporalityUnspecified or any other not supported type + t.logger.Debug("Unknown or unsupported aggregation temporality", + zap.String(metricName, md.Name()), + zap.Any("aggregation temporality", md.Sum().AggregationTemporality()), + ) + continue + } + case pdata.MetricDataTypeHistogram: + switch md.Histogram().AggregationTemporality() { + case pdata.AggregationTemporalityCumulative, pdata.AggregationTemporalityDelta: + delta := md.Histogram().AggregationTemporality() == pdata.AggregationTemporalityDelta + t.mapHistogramMetrics(ctx, consumer, md.Name(), md.Histogram().DataPoints(), delta, attributeTags, host) + default: // pdata.AggregationTemporalityUnspecified or any other not supported type + t.logger.Debug("Unknown or unsupported aggregation temporality", + zap.String("metric name", md.Name()), + zap.Any("aggregation temporality", md.Histogram().AggregationTemporality()), + ) + continue + } + case pdata.MetricDataTypeSummary: + t.mapSummaryMetrics(ctx, consumer, md.Name(), md.Summary().DataPoints(), attributeTags, host) + default: // pdata.MetricDataTypeNone or any other not supported type + t.logger.Debug("Unknown or unsupported metric type", zap.String(metricName, md.Name()), zap.Any("data type", md.DataType())) + continue + } + } + } + } + + return nil +} diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go new file mode 100644 index 0000000000000..8b59d8d3aa9af --- /dev/null +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -0,0 +1,1072 @@ +// Copyright The OpenTelemetry Authors 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package translator
+
+import (
+	"context"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/DataDog/datadog-agent/pkg/quantile"
+	gocache "github.com/patrickmn/go-cache"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/model/pdata"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"go.uber.org/zap/zaptest/observer"
+
+	"github.com/DataDog/datadog-agent/pkg/otlp/model/attributes"
+)
+
+func TestGetTags(t *testing.T) {
+	attributes := pdata.NewAttributeMapFromMap(map[string]pdata.AttributeValue{
+		"key1": pdata.NewAttributeValueString("val1"),
+		"key2": pdata.NewAttributeValueString("val2"),
+		"key3": pdata.NewAttributeValueString(""),
+	})
+
+	assert.ElementsMatch(t,
+		getTags(attributes),
+		[...]string{"key1:val1", "key2:val2", "key3:n/a"},
+	)
+}
+
+func TestIsCumulativeMonotonic(t *testing.T) {
+	// Some of these examples are from the hostmetrics receiver
+	// and reflect the semantic meaning of the metrics there.
+	//
+	// If the receiver changes, the new examples should be added here too.
+
+	{ // Sum: Cumulative but not monotonic
+		metric := pdata.NewMetric()
+		metric.SetName("system.filesystem.usage")
+		metric.SetDescription("Filesystem bytes used.")
+		metric.SetUnit("bytes")
+		metric.SetDataType(pdata.MetricDataTypeSum)
+		sum := metric.Sum()
+		sum.SetIsMonotonic(false)
+		sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
+
+		assert.False(t, isCumulativeMonotonic(metric))
+	}
+
+	{ // Sum: Cumulative and monotonic
+		metric := pdata.NewMetric()
+		metric.SetName("system.network.packets")
+		metric.SetDescription("The number of packets transferred.")
+		metric.SetUnit("1")
+		metric.SetDataType(pdata.MetricDataTypeSum)
+		sum := metric.Sum()
+		sum.SetIsMonotonic(true)
+		sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
+
+		assert.True(t, isCumulativeMonotonic(metric))
+	}
+
+	{ // Sum: Cumulative and monotonic, minimal example
+		metric := pdata.NewMetric()
+		metric.SetName("metric.example")
+		metric.SetDataType(pdata.MetricDataTypeSum)
+		sum := metric.Sum()
+		sum.SetIsMonotonic(true)
+		sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
+
+		assert.True(t, isCumulativeMonotonic(metric))
+	}
+
+	{ // Gauge: not a sum, so not cumulative monotonic
+		metric := pdata.NewMetric()
+		metric.SetName("system.cpu.load_average.1m")
+		metric.SetDescription("Average CPU Load over 1 minute.")
+		metric.SetUnit("1")
+		metric.SetDataType(pdata.MetricDataTypeGauge)
+
+		assert.False(t, isCumulativeMonotonic(metric))
+	}
+}
+
+type testProvider string
+
+func (t testProvider) Hostname(context.Context) (string, error) {
+	return string(t), nil
+}
+
+func newTranslator(t *testing.T, logger *zap.Logger) *Translator {
+	tr, err := New(
+		logger,
+		WithFallbackHostnameProvider(testProvider("fallbackHostname")),
+		WithCountSumMetrics(),
+		WithHistogramMode(HistogramModeNoBuckets),
+		WithNumberMode(NumberModeCumulativeToDelta),
+	)
+
+	require.NoError(t, err)
+	return tr
+}
+
+type metric struct
{ + name string + typ MetricDataType + timestamp uint64 + value float64 + tags []string + host string +} + +var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil) + +type mockTimeSeriesConsumer struct { + metrics []metric +} + +func (m *mockTimeSeriesConsumer) ConsumeTimeSeries( + _ context.Context, + name string, + typ MetricDataType, + ts uint64, + val float64, + tags []string, + host string, +) { + m.metrics = append(m.metrics, + metric{ + name: name, + typ: typ, + timestamp: ts, + value: val, + tags: tags, + host: host, + }, + ) +} + +func newGauge(name string, ts uint64, val float64, tags []string) metric { + return metric{name: name, typ: Gauge, timestamp: ts, value: val, tags: tags} +} + +func newCount(name string, ts uint64, val float64, tags []string) metric { + return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags} +} + +func TestMapIntMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := pdata.NewNumberDataPointSlice() + point := slice.AppendEmpty() + point.SetIntVal(17) + point.SetTimestamp(ts) + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.test", Gauge, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{newGauge("int64.test", uint64(ts), 17, []string{})}, + ) + + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.delta.test", Count, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{newCount("int64.delta.test", uint64(ts), 17, []string{})}, + ) + + // With attribute tags + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.test", Gauge, slice, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{newGauge("int64.test", uint64(ts), 17, []string{"attribute_tag:attribute_value"})}, + ) +} + +func TestMapDoubleMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := pdata.NewNumberDataPointSlice() + point := slice.AppendEmpty() + point.SetDoubleVal(math.Pi) + point.SetTimestamp(ts) + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.test", Gauge, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{newGauge("float64.test", uint64(ts), math.Pi, []string{})}, + ) + + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.delta.test", Count, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{newCount("float64.delta.test", uint64(ts), math.Pi, []string{})}, + ) + + // With attribute tags + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.test", Gauge, slice, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{newGauge("float64.test", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"})}, + ) +} + +func seconds(i int) pdata.Timestamp { + return pdata.NewTimestampFromTime(time.Unix(int64(i), 0)) +} + +func TestMapIntMonotonicMetrics(t *testing.T) { + // Create list of values + deltas := []int64{1, 2, 200, 3, 7, 0} + cumulative := make([]int64, len(deltas)+1) + cumulative[0] = 0 + for i := 1; i < len(cumulative); i++ { + cumulative[i] = cumulative[i-1] + deltas[i-1] + } + + //Map to OpenTelemetry format + slice := pdata.NewNumberDataPointSlice() + 
slice.EnsureCapacity(len(cumulative)) + for i, val := range cumulative { + point := slice.AppendEmpty() + point.SetIntVal(val) + point.SetTimestamp(seconds(i)) + } + + // Map to Datadog format + metricName := "metric.example" + expected := make([]metric, len(deltas)) + for i, val := range deltas { + expected[i] = newCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) + } + + ctx := context.Background() + consumer := &mockTimeSeriesConsumer{} + tr := newTranslator(t, zap.NewNop()) + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + + assert.ElementsMatch(t, expected, consumer.metrics) +} + +func TestMapIntMonotonicDifferentDimensions(t *testing.T) { + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + + // No tags + point := slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + + point = slice.AppendEmpty() + point.SetIntVal(20) + point.SetTimestamp(seconds(1)) + + // One tag: valA + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valA") + + point = slice.AppendEmpty() + point.SetIntVal(30) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valA") + + // same tag: valB + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valB") + + point = slice.AppendEmpty() + point.SetIntVal(40) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valB") + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 20, []string{}), + newCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + newCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + }, + ) +} + +func TestMapIntMonotonicWithReboot(t *testing.T) { + values := []int64{0, 30, 0, 20} + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(i)) + point.SetIntVal(val) + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 30, []string{}), + newCount(metricName, uint64(seconds(3)), 20, []string{}), + }, + ) +} + +func TestMapIntMonotonicOutOfOrder(t *testing.T) { + stamps := []int{1, 0, 2, 3} + values := []int64{0, 1, 2, 3} + + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(stamps[i])) + point.SetIntVal(val) + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 2, []string{}), + newCount(metricName, uint64(seconds(3)), 1, []string{}), + }, + ) +} + +func TestMapDoubleMonotonicMetrics(t *testing.T) { + deltas := []float64{1, 2, 200, 3, 7, 0} + cumulative := make([]float64, len(deltas)+1) + 
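+	// Prefix-sum the deltas to build the cumulative series the translator consumes.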
cumulative[0] = 0 + for i := 1; i < len(cumulative); i++ { + cumulative[i] = cumulative[i-1] + deltas[i-1] + } + + //Map to OpenTelemetry format + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(cumulative)) + for i, val := range cumulative { + point := slice.AppendEmpty() + point.SetDoubleVal(val) + point.SetTimestamp(seconds(i)) + } + + // Map to Datadog format + metricName := "metric.example" + expected := make([]metric, len(deltas)) + for i, val := range deltas { + expected[i] = newCount(metricName, uint64(seconds(i+1)), val, []string{}) + } + + ctx := context.Background() + consumer := &mockTimeSeriesConsumer{} + tr := newTranslator(t, zap.NewNop()) + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + + assert.ElementsMatch(t, expected, consumer.metrics) +} + +func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + + // No tags + point := slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + + point = slice.AppendEmpty() + point.SetDoubleVal(20) + point.SetTimestamp(seconds(1)) + + // One tag: valA + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valA") + + point = slice.AppendEmpty() + point.SetDoubleVal(30) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valA") + + // one tag: valB + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valB") + + point = slice.AppendEmpty() + point.SetDoubleVal(40) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valB") + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 20, []string{}), + newCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + newCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + }, + ) +} + +func TestMapDoubleMonotonicWithReboot(t *testing.T) { + values := []float64{0, 30, 0, 20} + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(2 * i)) + point.SetDoubleVal(val) + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 30, []string{}), + newCount(metricName, uint64(seconds(6)), 20, []string{}), + }, + ) +} + +func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { + stamps := []int{1, 0, 2, 3} + values := []float64{0, 1, 2, 3} + + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(stamps[i])) + point.SetDoubleVal(val) + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 2, []string{}), + 
newCount(metricName, uint64(seconds(3)), 1, []string{}), + }, + ) +} + +type mockFullConsumer struct { + mockTimeSeriesConsumer + anySketch bool +} + +func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) { + c.anySketch = true +} + +func TestMapDeltaHistogramMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := pdata.NewHistogramDataPointSlice() + point := slice.AppendEmpty() + point.SetCount(20) + point.SetSum(math.Pi) + point.SetBucketCounts([]uint64{2, 18}) + point.SetExplicitBounds([]float64{0}) + point.SetTimestamp(ts) + + noBuckets := []metric{ + newCount("doubleHist.test.count", uint64(ts), 20, []string{}), + newCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{}), + } + + buckets := []metric{ + newCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + delta := true + + tr.cfg.HistMode = HistogramModeNoBuckets + consumer := &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, noBuckets, consumer.metrics) + assert.False(t, consumer.anySketch) + + tr.cfg.HistMode = HistogramModeCounters + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics) + assert.False(t, consumer.anySketch) + + // With attribute tags + noBucketsAttributeTags := []metric{ + newCount("doubleHist.test.count", uint64(ts), 20, []string{"attribute_tag:attribute_value"}), + newCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"}), + } + + bucketsAttributeTags := []metric{ + newCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0", "attribute_tag:attribute_value"}), + newCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf", "attribute_tag:attribute_value"}), + } + + tr.cfg.HistMode = HistogramModeNoBuckets + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) + assert.False(t, consumer.anySketch) + + tr.cfg.HistMode = HistogramModeCounters + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics) + assert.False(t, consumer.anySketch) +} + +func TestMapCumulativeHistogramMetrics(t *testing.T) { + slice := pdata.NewHistogramDataPointSlice() + point := slice.AppendEmpty() + point.SetCount(20) + point.SetSum(math.Pi) + point.SetBucketCounts([]uint64{2, 18}) + point.SetExplicitBounds([]float64{0}) + point.SetTimestamp(seconds(0)) + + point = slice.AppendEmpty() + point.SetCount(20 + 30) + point.SetSum(math.Pi + 20) + point.SetBucketCounts([]uint64{2 + 11, 18 + 2}) + point.SetExplicitBounds([]float64{0}) + point.SetTimestamp(seconds(2)) + + expected := []metric{ + newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), + newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + 
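+		// cumulative bucket counts went (2, 18) -> (13, 20), so the reported deltas are 11 and 2: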
newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + } + + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + delta := false + + tr.cfg.HistMode = HistogramModeCounters + consumer := &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.False(t, consumer.anySketch) + assert.ElementsMatch(t, + consumer.metrics, + expected, + ) +} + +func TestLegacyBucketsTags(t *testing.T) { + // Test that passing the same tags slice doesn't reuse the slice. + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) + + tags := make([]string, 0, 10) + + pointOne := pdata.NewHistogramDataPoint() + pointOne.SetBucketCounts([]uint64{2, 18}) + pointOne.SetExplicitBounds([]float64{0}) + pointOne.SetTimestamp(seconds(0)) + consumer := &mockTimeSeriesConsumer{} + tr.getLegacyBuckets(ctx, consumer, "test.histogram.one", pointOne, true, tags, "") + seriesOne := consumer.metrics + + pointTwo := pdata.NewHistogramDataPoint() + pointTwo.SetBucketCounts([]uint64{2, 18}) + pointTwo.SetExplicitBounds([]float64{1}) + pointTwo.SetTimestamp(seconds(0)) + consumer = &mockTimeSeriesConsumer{} + tr.getLegacyBuckets(ctx, consumer, "test.histogram.two", pointTwo, true, tags, "") + seriesTwo := consumer.metrics + + assert.ElementsMatch(t, seriesOne[0].tags, []string{"lower_bound:-inf", "upper_bound:0"}) + assert.ElementsMatch(t, seriesTwo[0].tags, []string{"lower_bound:-inf", "upper_bound:1.0"}) +} + +func TestFormatFloat(t *testing.T) { + tests := []struct { + f float64 + s string + }{ + {f: 0, s: "0"}, + {f: 0.001, s: "0.001"}, + {f: 0.9, s: "0.9"}, + {f: 0.95, s: "0.95"}, + {f: 0.99, s: "0.99"}, + {f: 0.999, s: "0.999"}, + {f: 1, s: "1.0"}, + {f: 2, s: "2.0"}, + {f: math.Inf(1), s: "inf"}, + {f: math.Inf(-1), s: "-inf"}, + {f: math.NaN(), s: "nan"}, + {f: 1e-10, s: "1e-10"}, + } + + for _, test := range tests { + assert.Equal(t, test.s, formatFloat(test.f)) + } +} + +func exampleSummaryDataPointSlice(ts pdata.Timestamp, sum float64, count uint64) pdata.SummaryDataPointSlice { + slice := pdata.NewSummaryDataPointSlice() + point := slice.AppendEmpty() + point.SetCount(count) + point.SetSum(sum) + qSlice := point.QuantileValues() + + qMin := qSlice.AppendEmpty() + qMin.SetQuantile(0.0) + qMin.SetValue(0) + + qMedian := qSlice.AppendEmpty() + qMedian.SetQuantile(0.5) + qMedian.SetValue(100) + + q999 := qSlice.AppendEmpty() + q999.SetQuantile(0.999) + q999.SetValue(500) + + qMax := qSlice.AppendEmpty() + qMax.SetQuantile(1) + qMax.SetValue(600) + point.SetTimestamp(ts) + return slice +} + +func TestMapSummaryMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := exampleSummaryDataPointSlice(ts, 10_001, 101) + + newTranslator := func(tags []string, quantiles bool) *Translator { + c := newTestCache() + c.cache.Set(c.metricDimensionsToMapKey("summary.example.count", tags), numberCounter{0, 1}, gocache.NoExpiration) + c.cache.Set(c.metricDimensionsToMapKey("summary.example.sum", tags), numberCounter{0, 1}, gocache.NoExpiration) + options := []Option{WithFallbackHostnameProvider(testProvider("fallbackHostname"))} + if quantiles { + options = append(options, WithQuantiles()) + } + tr, err := New(zap.NewNop(), options...) 
+ require.NoError(t, err) + tr.prevPts = c + return tr + } + + noQuantiles := []metric{ + newCount("summary.example.count", uint64(ts), 100, []string{}), + newCount("summary.example.sum", uint64(ts), 10_000, []string{}), + } + quantiles := []metric{ + newGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0"}), + newGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5"}), + newGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999"}), + newGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0"}), + } + ctx := context.Background() + tr := newTranslator([]string{}, false) + consumer := &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + noQuantiles, + ) + tr = newTranslator([]string{}, true) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + append(noQuantiles, quantiles...), + ) + + noQuantilesAttr := []metric{ + newCount("summary.example.count", uint64(ts), 100, []string{"attribute_tag:attribute_value"}), + newCount("summary.example.sum", uint64(ts), 10_000, []string{"attribute_tag:attribute_value"}), + } + + quantilesAttr := []metric{ + newGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0", "attribute_tag:attribute_value"}), + newGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5", "attribute_tag:attribute_value"}), + newGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999", "attribute_tag:attribute_value"}), + newGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0", "attribute_tag:attribute_value"}), + } + tr = newTranslator([]string{"attribute_tag:attribute_value"}, false) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, + consumer.metrics, + noQuantilesAttr, + ) + tr = newTranslator([]string{"attribute_tag:attribute_value"}, true) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, + consumer.metrics, + append(noQuantilesAttr, quantilesAttr...), + ) +} + +const ( + testHostname = "res-hostname" +) + +func createTestMetrics() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + + attrs := rm.Resource().Attributes() + attrs.InsertString(attributes.AttributeDatadogHostname, testHostname) + ilms := rm.InstrumentationLibraryMetrics() + + metricsArray := ilms.AppendEmpty().Metrics() + metricsArray.AppendEmpty() // first one is TypeNone to test that it's ignored + + // IntGauge + met := metricsArray.AppendEmpty() + met.SetName("int.gauge") + met.SetDataType(pdata.MetricDataTypeGauge) + dpsInt := met.Gauge().DataPoints() + dpInt := dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(1) + + // DoubleGauge + met = metricsArray.AppendEmpty() + met.SetName("double.gauge") + met.SetDataType(pdata.MetricDataTypeGauge) + dpsDouble := met.Gauge().DataPoints() + dpDouble := dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.Pi) + + // aggregation unspecified sum + met = metricsArray.AppendEmpty() + met.SetName("unspecified.sum") + met.SetDataType(pdata.MetricDataTypeSum) + 
met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityUnspecified) + + // Int Sum (delta) + met = metricsArray.AppendEmpty() + met.SetName("int.delta.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsInt = met.Sum().DataPoints() + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(2) + + // Double Sum (delta) + met = metricsArray.AppendEmpty() + met.SetName("double.delta.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.E) + + // Int Sum (delta monotonic) + met = metricsArray.AppendEmpty() + met.SetName("int.delta.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsInt = met.Sum().DataPoints() + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(2) + + // Double Sum (delta monotonic) + met = metricsArray.AppendEmpty() + met.SetName("double.delta.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.E) + + // aggregation unspecified histogram + met = metricsArray.AppendEmpty() + met.SetName("unspecified.histogram") + met.SetDataType(pdata.MetricDataTypeHistogram) + met.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityUnspecified) + + // Histogram (delta) + met = metricsArray.AppendEmpty() + met.SetName("double.histogram") + met.SetDataType(pdata.MetricDataTypeHistogram) + met.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDoubleHist := met.Histogram().DataPoints() + dpDoubleHist := dpsDoubleHist.AppendEmpty() + dpDoubleHist.SetCount(20) + dpDoubleHist.SetSum(math.Phi) + dpDoubleHist.SetBucketCounts([]uint64{2, 18}) + dpDoubleHist.SetExplicitBounds([]float64{0}) + dpDoubleHist.SetTimestamp(seconds(0)) + + // Int Sum (cumulative) + met = metricsArray.AppendEmpty() + met.SetName("int.cumulative.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dpsInt = met.Sum().DataPoints() + dpsInt.EnsureCapacity(2) + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(4) + + // Double Sum (cumulative) + met = metricsArray.AppendEmpty() + met.SetName("double.cumulative.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dpsDouble = met.Sum().DataPoints() + dpsDouble.EnsureCapacity(2) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(4) + + // Int Sum (cumulative monotonic) + met = metricsArray.AppendEmpty() + met.SetName("int.cumulative.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + met.Sum().SetIsMonotonic(true) + dpsInt = met.Sum().DataPoints() + dpsInt.EnsureCapacity(2) + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(4) + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(2)) + dpInt.SetIntVal(7) + + // Double Sum (cumulative monotonic) + met = 
metricsArray.AppendEmpty() + met.SetName("double.cumulative.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + met.Sum().SetIsMonotonic(true) + dpsDouble = met.Sum().DataPoints() + dpsDouble.EnsureCapacity(2) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(4) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(2)) + dpDouble.SetDoubleVal(4 + math.Pi) + + // Summary + met = metricsArray.AppendEmpty() + met.SetName("summary") + met.SetDataType(pdata.MetricDataTypeSummary) + slice := exampleSummaryDataPointSlice(seconds(0), 1, 1) + slice.CopyTo(met.Summary().DataPoints()) + + met = metricsArray.AppendEmpty() + met.SetName("summary") + met.SetDataType(pdata.MetricDataTypeSummary) + slice = exampleSummaryDataPointSlice(seconds(2), 10_001, 101) + slice.CopyTo(met.Summary().DataPoints()) + return md +} + +func testGauge(name string, val float64) metric { + m := newGauge(name, 0, val, []string{}) + m.host = testHostname + return m +} + +func testCount(name string, val float64, seconds uint64) metric { + m := newCount(name, seconds*1e9, val, []string{}) + m.host = testHostname + return m +} + +func TestMapMetrics(t *testing.T) { + md := createTestMetrics() + + core, observed := observer.New(zapcore.DebugLevel) + testLogger := zap.New(core) + ctx := context.Background() + consumer := &mockFullConsumer{} + tr := newTranslator(t, testLogger) + err := tr.MapMetrics(ctx, md, consumer) + require.NoError(t, err) + assert.False(t, consumer.anySketch) + + assert.ElementsMatch(t, consumer.metrics, []metric{ + testGauge("int.gauge", 1), + testGauge("double.gauge", math.Pi), + testCount("int.delta.sum", 2, 0), + testCount("double.delta.sum", math.E, 0), + testCount("int.delta.monotonic.sum", 2, 0), + testCount("double.delta.monotonic.sum", math.E, 0), + testCount("double.histogram.sum", math.Phi, 0), + testCount("double.histogram.count", 20, 0), + testCount("summary.sum", 10_000, 2), + testCount("summary.count", 100, 2), + testGauge("int.cumulative.sum", 4), + testGauge("double.cumulative.sum", 4), + testCount("int.cumulative.monotonic.sum", 3, 2), + testCount("double.cumulative.monotonic.sum", math.Pi, 2), + }) + + // One metric type was unknown or unsupported + assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1) + // Two metric aggregation temporality was unknown or unsupported + assert.Equal(t, observed.FilterMessage("Unknown or unsupported aggregation temporality").Len(), 2) +} + +func createNaNMetrics() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + + attrs := rm.Resource().Attributes() + attrs.InsertString(attributes.AttributeDatadogHostname, testHostname) + ilms := rm.InstrumentationLibraryMetrics() + + metricsArray := ilms.AppendEmpty().Metrics() + + // DoubleGauge + met := metricsArray.AppendEmpty() + met.SetName("nan.gauge") + met.SetDataType(pdata.MetricDataTypeGauge) + dpsDouble := met.Gauge().DataPoints() + dpDouble := dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.NaN()) + + // Double Sum (delta) + met = metricsArray.AppendEmpty() + met.SetName("nan.delta.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + 
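+	// The NaN value set below must be dropped by isSkippable and only
+	// logged at debug level, never forwarded to the consumer.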
dpDouble.SetDoubleVal(math.NaN())
+
+	// Double Sum (delta monotonic)
+	met = metricsArray.AppendEmpty()
+	met.SetName("nan.delta.monotonic.sum")
+	met.SetDataType(pdata.MetricDataTypeSum)
+	met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
+	dpsDouble = met.Sum().DataPoints()
+	dpDouble = dpsDouble.AppendEmpty()
+	dpDouble.SetTimestamp(seconds(0))
+	dpDouble.SetDoubleVal(math.NaN())
+
+	// Histogram
+	met = metricsArray.AppendEmpty()
+	met.SetName("nan.histogram")
+	met.SetDataType(pdata.MetricDataTypeHistogram)
+	met.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
+	dpsDoubleHist := met.Histogram().DataPoints()
+	dpDoubleHist := dpsDoubleHist.AppendEmpty()
+	dpDoubleHist.SetCount(20)
+	dpDoubleHist.SetSum(math.NaN())
+	dpDoubleHist.SetBucketCounts([]uint64{2, 18})
+	dpDoubleHist.SetTimestamp(seconds(0))
+
+	// Double Sum (cumulative)
+	met = metricsArray.AppendEmpty()
+	met.SetName("nan.cumulative.sum")
+	met.SetDataType(pdata.MetricDataTypeSum)
+	met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
+	dpsDouble = met.Sum().DataPoints()
+	dpsDouble.EnsureCapacity(2)
+	dpDouble = dpsDouble.AppendEmpty()
+	dpDouble.SetTimestamp(seconds(0))
+	dpDouble.SetDoubleVal(math.NaN())
+
+	// Double Sum (cumulative monotonic)
+	met = metricsArray.AppendEmpty()
+	met.SetName("nan.cumulative.monotonic.sum")
+	met.SetDataType(pdata.MetricDataTypeSum)
+	met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
+	met.Sum().SetIsMonotonic(true)
+	dpsDouble = met.Sum().DataPoints()
+	dpsDouble.EnsureCapacity(2)
+	dpDouble = dpsDouble.AppendEmpty()
+	dpDouble.SetTimestamp(seconds(0))
+	dpDouble.SetDoubleVal(math.NaN())
+
+	// Summary
+	met = metricsArray.AppendEmpty()
+	met.SetName("nan.summary")
+	met.SetDataType(pdata.MetricDataTypeSummary)
+	slice := exampleSummaryDataPointSlice(seconds(0), math.NaN(), 1)
+	slice.CopyTo(met.Summary().DataPoints())
+
+	met = metricsArray.AppendEmpty()
+	met.SetName("nan.summary")
+	met.SetDataType(pdata.MetricDataTypeSummary)
+	slice = exampleSummaryDataPointSlice(seconds(2), 10_001, 101)
+	slice.CopyTo(met.Summary().DataPoints())
+	return md
+}
+
+func TestNaNMetrics(t *testing.T) {
+	md := createNaNMetrics()
+
+	core, observed := observer.New(zapcore.DebugLevel)
+	testLogger := zap.New(core)
+	ctx := context.Background()
+	tr := newTranslator(t, testLogger)
+	consumer := &mockFullConsumer{}
+	err := tr.MapMetrics(ctx, md, consumer)
+	assert.False(t, consumer.anySketch)
+	require.NoError(t, err)
+
+	assert.ElementsMatch(t, consumer.metrics, []metric{
+		testCount("nan.histogram.count", 20, 0),
+		testCount("nan.summary.count", 100, 2),
+	})
+
+	// Seven metric values were NaN and were skipped
+	assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7)
+}
diff --git a/pkg/otlp/model/translator/sketches_test.go b/pkg/otlp/model/translator/sketches_test.go
new file mode 100644
index 0000000000000..05c1eb7365f85
--- /dev/null
+++ b/pkg/otlp/model/translator/sketches_test.go
@@ -0,0 +1,213 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
diff --git a/pkg/otlp/model/translator/sketches_test.go b/pkg/otlp/model/translator/sketches_test.go
new file mode 100644
index 0000000000000..05c1eb7365f85
--- /dev/null
+++ b/pkg/otlp/model/translator/sketches_test.go
@@ -0,0 +1,213 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package translator
+
+import (
+    "context"
+    "fmt"
+    "math"
+    "testing"
+
+    "github.com/DataDog/datadog-agent/pkg/quantile"
+    "github.com/stretchr/testify/assert"
+    "go.opentelemetry.io/collector/model/pdata"
+    "go.uber.org/zap"
+)
+
+var _ SketchConsumer = (*sketchConsumer)(nil)
+
+type sketchConsumer struct {
+    sk *quantile.Sketch
+}
+
+// ConsumeSketch implements the translator.SketchConsumer interface.
+func (c *sketchConsumer) ConsumeSketch(
+    _ context.Context,
+    _ string,
+    _ uint64,
+    sketch *quantile.Sketch,
+    _ []string,
+    _ string,
+) {
+    c.sk = sketch
+}
+
+func TestHistogramSketches(t *testing.T) {
+    N := 1_000
+    M := 50_000.0
+
+    // Given a cumulative distribution function for a distribution
+    // with support [0, N], generate an OTLP Histogram data point with N+2 buckets,
+    // (-inf, 0], (0, 1], ..., (N-1, N], (N, inf),
+    // which contains N*M uniform samples of the distribution.
+    fromCDF := func(cdf func(x float64) float64) pdata.HistogramDataPoint {
+        p := pdata.NewHistogramDataPoint()
+        bounds := make([]float64, N+1)
+        buckets := make([]uint64, N+2)
+        buckets[0] = 0
+        count := uint64(0)
+        for i := 0; i < N; i++ {
+            bounds[i] = float64(i)
+            // the bucket with bounds (i, i+1) has the
+            // cdf delta between the bounds as a value.
+            buckets[i+1] = uint64((cdf(float64(i+1)) - cdf(float64(i))) * M)
+            count += buckets[i+1]
+        }
+        bounds[N] = float64(N)
+        buckets[N+1] = 0
+        p.SetExplicitBounds(bounds)
+        p.SetBucketCounts(buckets)
+        p.SetCount(count)
+        return p
+    }
+
+    tests := []struct {
+        // distribution name
+        name string
+        // the cumulative distribution function (within [0,N])
+        cdf func(x float64) float64
+        // error tolerance for testing cdf(quantile(q)) ≈ q
+        epsilon float64
+    }{
+        {
+            // https://en.wikipedia.org/wiki/Continuous_uniform_distribution
+            name:    "Uniform distribution (a=0,b=N)",
+            cdf:     func(x float64) float64 { return x / float64(N) },
+            epsilon: 0.01,
+        },
+        {
+            // https://en.wikipedia.org/wiki/U-quadratic_distribution
+            name: "U-quadratic distribution (a=0,b=N)",
+            cdf: func(x float64) float64 {
+                a := 0.0
+                b := float64(N)
+                alpha := 12.0 / math.Pow(b-a, 3)
+                beta := (b + a) / 2.0
+                return alpha / 3 * (math.Pow(x-beta, 3) + math.Pow(beta-a, 3))
+            },
+            epsilon: 0.025,
+        },
+    }
+
+    defaultEps := 1.0 / 128.0
+    tol := 1e-8
+    cfg := quantile.Default()
+    ctx := context.Background()
+    tr := newTranslator(t, zap.NewNop())
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            p := fromCDF(test.cdf)
+            consumer := &sketchConsumer{}
+            tr.getSketchBuckets(ctx, consumer, "test", 0, p, true, []string{}, "")
+            sk := consumer.sk
+
+            // Check the minimum is 0.0
+            assert.Equal(t, 0.0, sk.Quantile(cfg, 0))
+            // Check the quantiles are approximately correct
+            for i := 1; i <= 99; i++ {
+                q := (float64(i)) / 100.0
+                assert.InEpsilon(t,
+                    // test that the CDF is the (approximate) inverse of the quantile function
+                    test.cdf(sk.Quantile(cfg, q)),
+                    q,
+                    test.epsilon,
+                    fmt.Sprintf("error too high for p%d", i),
+                )
+            }
+
+            cumulSum := uint64(0)
+            for i := 0; i < len(p.BucketCounts())-3; i++ {
+                {
+                    q := float64(cumulSum) / float64(p.Count()) * (1 - tol)
+                    quantileValue := sk.Quantile(cfg, q)
+                    // quantileValue, if computed from the explicit buckets, would have to be <= bounds[i].
+                    // Because of remapping, it is <= bounds[i+1].
+                    // Because of DDSketch accuracy guarantees, it is <= bounds[i+1] * (1 + defaultEps)
+                    maxExpectedQuantileValue := p.ExplicitBounds()[i+1] * (1 + defaultEps)
+                    assert.LessOrEqual(t, quantileValue, maxExpectedQuantileValue)
+                }
+
+                cumulSum += p.BucketCounts()[i+1]
+
+                {
+                    q := float64(cumulSum) / float64(p.Count()) * (1 + tol)
+                    quantileValue := sk.Quantile(cfg, q)
+                    // quantileValue, if computed from the explicit buckets, would have to be >= bounds[i+1].
+                    // Because of remapping, it is >= bounds[i].
+                    // Because of DDSketch accuracy guarantees, it is >= bounds[i] * (1 - defaultEps)
+                    minExpectedQuantileValue := p.ExplicitBounds()[i] * (1 - defaultEps)
+                    assert.GreaterOrEqual(t, quantileValue, minExpectedQuantileValue)
+                }
+            }
+        })
+    }
+}
+
+func TestInfiniteBounds(t *testing.T) {
+
+    tests := []struct {
+        name    string
+        getHist func() pdata.HistogramDataPoint
+    }{
+        {
+            name: "(-inf, inf): 100",
+            getHist: func() pdata.HistogramDataPoint {
+                p := pdata.NewHistogramDataPoint()
+                p.SetExplicitBounds([]float64{})
+                p.SetBucketCounts([]uint64{100})
+                p.SetCount(100)
+                p.SetSum(0)
+                return p
+            },
+        },
+        {
+            name: "(-inf, 0]: 100, (0, +inf): 100",
+            getHist: func() pdata.HistogramDataPoint {
+                p := pdata.NewHistogramDataPoint()
+                p.SetExplicitBounds([]float64{0})
+                p.SetBucketCounts([]uint64{100, 100})
+                p.SetCount(200)
+                p.SetSum(0)
+                return p
+            },
+        },
+        {
+            name: "(-inf, -1]: 100, (-1, 1]: 10, (1, +inf): 100",
+            getHist: func() pdata.HistogramDataPoint {
+                p := pdata.NewHistogramDataPoint()
+                p.SetExplicitBounds([]float64{-1, 1})
+                p.SetBucketCounts([]uint64{100, 10, 100})
+                p.SetCount(210)
+                p.SetSum(0)
+                return p
+            },
+        },
+    }
+
+    ctx := context.Background()
+    tr := newTranslator(t, zap.NewNop())
+    for _, testInstance := range tests {
+        t.Run(testInstance.name, func(t *testing.T) {
+            p := testInstance.getHist()
+            consumer := &sketchConsumer{}
+            tr.getSketchBuckets(ctx, consumer, "test", 0, p, true, []string{}, "")
+            sk := consumer.sk
+            assert.InDelta(t, sk.Basic.Sum, p.Sum(), 1)
+            assert.Equal(t, uint64(sk.Basic.Cnt), p.Count())
+        })
+    }
+
+}
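A quick sanity check on the fromCDF construction above, using the uniform case: each interior bucket (i, i+1] receives (cdf(i+1) - cdf(i)) * M = M/N = 50 samples, so the total count is close to N * 50 = 50,000. "Close to" because the float64-to-uint64 truncation can shave a sample off some buckets, which is why the test sets p.SetCount(count) from the accumulated bucket sum rather than assuming N*M. A self-contained sketch of that arithmetic, independent of pdata:

    // Standalone illustration of the bucket-filling arithmetic in fromCDF above.
    package main

    import "fmt"

    func main() {
        const N = 1_000
        const M = 50_000.0
        cdf := func(x float64) float64 { return x / N } // uniform CDF on [0, N]

        count := uint64(0)
        for i := 0; i < N; i++ {
            // mass assigned to bucket (i, i+1]
            count += uint64((cdf(float64(i+1)) - cdf(float64(i))) * M)
        }
        fmt.Println(count) // ~50_000; truncation may drop a few samples
    }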
diff --git a/pkg/otlp/model/translator/ttlcache.go b/pkg/otlp/model/translator/ttlcache.go
new file mode 100644
index 0000000000000..1d164aa6cbf1c
--- /dev/null
+++ b/pkg/otlp/model/translator/ttlcache.go
@@ -0,0 +1,89 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package translator
+
+import (
+    "sort"
+    "strings"
+    "time"
+
+    gocache "github.com/patrickmn/go-cache"
+)
+
+const (
+    metricKeySeparator = string(byte(0))
+)
+
+type ttlCache struct {
+    cache *gocache.Cache
+}
+
+// numberCounter keeps the value of a monotonic number
+// counter at a given point in time.
+type numberCounter struct {
+    ts    uint64
+    value float64
+}
+
+func newTTLCache(sweepInterval int64, deltaTTL int64) *ttlCache {
+    cache := gocache.New(time.Duration(deltaTTL)*time.Second, time.Duration(sweepInterval)*time.Second)
+    return &ttlCache{cache}
+}
+
+// concatDimensionValue uses a logic similar to what is done in the span processor to build metric keys:
+// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b2327211df976e0a57ef0425493448988772a16b/processor/spanmetricsprocessor/processor.go#L353-L387
+// TODO: make this a public util function?
+func concatDimensionValue(metricKeyBuilder *strings.Builder, value string) {
+    metricKeyBuilder.WriteString(value)
+    metricKeyBuilder.WriteString(metricKeySeparator)
+}
+
+// metricDimensionsToMapKey maps a metric name and tags to a string key to use as an identifier.
+// The order of the tags does not matter.
+func (*ttlCache) metricDimensionsToMapKey(name string, tags []string) string {
+    var metricKeyBuilder strings.Builder
+
+    dimensions := make([]string, len(tags))
+    copy(dimensions, tags)
+
+    dimensions = append(dimensions, name)
+    sort.Strings(dimensions)
+
+    for _, dim := range dimensions {
+        concatDimensionValue(&metricKeyBuilder, dim)
+    }
+    return metricKeyBuilder.String()
+}
+
+// putAndGetDiff submits a new value for a given metric and returns the difference with the
+// last submitted value (ordered by timestamp). The diff value is only valid if `ok` is true.
+func (t *ttlCache) putAndGetDiff(name string, tags []string, ts uint64, val float64) (dx float64, ok bool) {
+    key := t.metricDimensionsToMapKey(name, tags)
+    if c, found := t.cache.Get(key); found {
+        cnt := c.(numberCounter)
+        if cnt.ts > ts {
+            // We were given a point older than the one in memory, so we drop it
+            // and keep the existing point, which is the most recent one.
+            return 0, false
+        }
+        // If dx < 0, we assume there was a counter reset: we store the new point
+        // but don't export a diff, since it is the first point after the reset.
+        dx = val - cnt.value
+        ok = dx >= 0
+    }
+
+    t.cache.Set(key, numberCounter{ts, val}, gocache.DefaultExpiration)
+    return
+}
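To make the delta semantics of putAndGetDiff concrete, here is a usage sketch; the metric name and tags are invented for illustration, and only the newTTLCache and putAndGetDiff signatures shown above are assumed:

    // Illustrative only; mirrors putAndGetDiff as implemented above.
    cache := newTTLCache(1800, 3600) // sweep every 30 min, expire entries after 1 h

    _, ok := cache.putAndGetDiff("requests", []string{"env:prod"}, 1, 10)
    // ok == false: first point for this key, no previous value to diff against

    dx, ok := cache.putAndGetDiff("requests", []string{"env:prod"}, 2, 25)
    // dx == 15, ok == true: the counter grew by 15 between ts=1 and ts=2

    _, ok = cache.putAndGetDiff("requests", []string{"env:prod"}, 3, 4)
    // ok == false: the value dropped, which is treated as a counter reset;
    // (3, 4) is stored as the new baseline for the next diff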
diff --git a/pkg/otlp/model/translator/ttlcache_test.go b/pkg/otlp/model/translator/ttlcache_test.go
new file mode 100644
index 0000000000000..c9124fb88d535
--- /dev/null
+++ b/pkg/otlp/model/translator/ttlcache_test.go
@@ -0,0 +1,75 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package translator
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+)
+
+func newTestCache() *ttlCache {
+    cache := newTTLCache(1800, 3600)
+    return cache
+}
+
+func TestPutAndGetDiff(t *testing.T) {
+    prevPts := newTestCache()
+    _, ok := prevPts.putAndGetDiff("test", []string{}, 1, 5)
+    // no diff since it is the first point
+    assert.False(t, ok)
+    _, ok = prevPts.putAndGetDiff("test", []string{}, 0, 0)
+    // no diff since ts is lower than the stored point
+    assert.False(t, ok)
+    _, ok = prevPts.putAndGetDiff("test", []string{}, 2, 2)
+    // no diff since the value is lower than the stored value (counter reset)
+    assert.False(t, ok)
+    dx, ok := prevPts.putAndGetDiff("test", []string{}, 3, 4)
+    // diff with the most recent point (2, 2)
+    assert.True(t, ok)
+    assert.Equal(t, 2.0, dx)
+}
+
+func TestMetricDimensionsToMapKey(t *testing.T) {
+    metricName := "metric.name"
+    c := newTestCache()
+    noTags := c.metricDimensionsToMapKey(metricName, []string{})
+    someTags := c.metricDimensionsToMapKey(metricName, []string{"key1:val1", "key2:val2"})
+    sameTags := c.metricDimensionsToMapKey(metricName, []string{"key2:val2", "key1:val1"})
+    diffTags := c.metricDimensionsToMapKey(metricName, []string{"key3:val3"})
+
+    assert.NotEqual(t, noTags, someTags)
+    assert.NotEqual(t, someTags, diffTags)
+    assert.Equal(t, someTags, sameTags)
+}
+
+func TestMetricDimensionsToMapKeyNoTagsChange(t *testing.T) {
+    // The original metricDimensionsToMapKey had an issue where:
+    // - if the capacity of the tags array passed to it was higher than its length,
+    // - and the metric name sorted earlier (alphabetically) than one of the tags,
+    // then the original tag array would be modified (without a reallocation, since there
+    // is enough capacity), and would contain a tag labeled as the metric name, while the
+    // final tag (in alphabetical order) would get left out.
+    // This test checks that this no longer happens.
+
+    metricName := "a.metric.name"
+    c := newTestCache()
+
+    originalTags := make([]string, 2, 3)
+    originalTags[0] = "key1:val1"
+    originalTags[1] = "key2:val2"
+    c.metricDimensionsToMapKey(metricName, originalTags)
+    assert.Equal(t, []string{"key1:val1", "key2:val2"}, originalTags)
+}
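The aliasing pitfall that TestMetricDimensionsToMapKeyNoTagsChange guards against is a general Go slice gotcha rather than anything specific to this cache. A minimal standalone demonstration, with invented tag values:

    package main

    import (
        "fmt"
        "sort"
    )

    func main() {
        // cap > len: append may write into the caller's backing array in place.
        tags := make([]string, 2, 3)
        tags[0] = "key1:val1"
        tags[1] = "key2:val2"

        // The buggy pattern: append to and sort the caller's slice directly.
        dims := append(tags, "a.metric.name") // reuses tags' backing array
        sort.Strings(dims)                    // reorders the shared array

        fmt.Println(tags) // [a.metric.name key1:val1]: the caller's tags were corrupted
    }

metricDimensionsToMapKey avoids this by copying the tags into a fresh slice before appending the metric name and sorting.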