From 0822753e252e3c83cee2f67cf278526ea33c0185 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 5 Sep 2024 13:26:17 +0100 Subject: [PATCH] [exporter/elasticsearch] Add exponential histogram support (#34818) **Description:** Convert exponential histograms to T-digest histograms supported by Elasticsearch **Link to tracking Issue:** Fixes #34813 **Testing:** Unit tests and exporter test **Documentation:** --- ...csearchexporter_exponential-histogram.yaml | 27 ++++ exporter/elasticsearchexporter/README.md | 3 +- exporter/elasticsearchexporter/exporter.go | 11 +- .../elasticsearchexporter/exporter_test.go | 40 ++++++ .../internal/exphistogram/exphistogram.go | 66 +++++++++ .../exphistogram/exphistogram_test.go | 135 ++++++++++++++++++ exporter/elasticsearchexporter/model.go | 20 +++ 7 files changed, 299 insertions(+), 3 deletions(-) create mode 100644 .chloggen/elasticsearchexporter_exponential-histogram.yaml create mode 100644 exporter/elasticsearchexporter/internal/exphistogram/exphistogram.go create mode 100644 exporter/elasticsearchexporter/internal/exphistogram/exphistogram_test.go diff --git a/.chloggen/elasticsearchexporter_exponential-histogram.yaml b/.chloggen/elasticsearchexporter_exponential-histogram.yaml new file mode 100644 index 000000000000..31bb58e2c7fc --- /dev/null +++ b/.chloggen/elasticsearchexporter_exponential-histogram.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add exponential histogram support + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [34813] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 4b3af781306a..5212e0790a7f 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -234,10 +234,9 @@ The metric types supported are: - Gauge - Sum - Histogram +- Exponential histogram - Summary -Exponential Histograms are ignored. 
- [confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings [configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings [configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 339c7c637623..ee7d697e8d5c 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -222,7 +222,6 @@ func (e *elasticsearchExporter) pushMetricsData( return nil } - // TODO: support exponential histogram switch metric.Type() { case pmetric.MetricTypeSum: dps := metric.Sum().DataPoints() @@ -252,6 +251,16 @@ func (e *elasticsearchExporter) pushMetricsData( continue } } + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + val := exponentialHistogramToValue(dp) + if err := upsertDataPoint(dp, val); err != nil { + errs = append(errs, err) + continue + } + } case pmetric.MetricTypeHistogram: dps := metric.Histogram().DataPoints() for l := 0; l < dps.Len(); l++ { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 2f98fd460879..060b972430c2 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -718,6 +718,46 @@ func TestExporterMetrics(t *testing.T) { assertItemsEqual(t, expected, rec.Items(), false) }) + t.Run("publish exponential histogram", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg 
*Config) { + cfg.Mapping.Mode = "ecs" + }) + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + scopeA := resourceMetrics.ScopeMetrics().AppendEmpty() + metricSlice := scopeA.Metrics() + fooMetric := metricSlice.AppendEmpty() + fooMetric.SetName("metric.foo") + fooDps := fooMetric.SetEmptyExponentialHistogram().DataPoints() + fooDp := fooDps.AppendEmpty() + fooDp.SetZeroCount(2) + fooDp.Positive().SetOffset(1) + fooDp.Positive().BucketCounts().FromRaw([]uint64{0, 1, 1, 0}) + + fooDp.Negative().SetOffset(1) + fooDp.Negative().BucketCounts().FromRaw([]uint64{1, 0, 0, 1}) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[1,1,2,1,1],"values":[-24.0,-3.0,0.0,6.0,12.0]}}}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) + t.Run("publish only valid data points", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { diff --git a/exporter/elasticsearchexporter/internal/exphistogram/exphistogram.go b/exporter/elasticsearchexporter/internal/exphistogram/exphistogram.go new file mode 100644 index 000000000000..255328f38f1d --- /dev/null +++ b/exporter/elasticsearchexporter/internal/exphistogram/exphistogram.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package exphistogram contains utility functions for exponential histogram conversions. 
+package exphistogram // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram" + +import ( + "math" + + "go.opentelemetry.io/collector/pdata/pmetric" +) + +// LowerBoundary calculates the lower boundary given index and scale. +// Adopted from https://opentelemetry.io/docs/specs/otel/metrics/data-model/#producer-expectations +func LowerBoundary(index, scale int) float64 { + if scale <= 0 { + return LowerBoundaryNegativeScale(index, scale) + } + // Use this form in case the equation above computes +Inf + // as the lower boundary of a valid bucket. + inverseFactor := math.Ldexp(math.Ln2, -scale) + return 2.0 * math.Exp(float64(index-(1<<scale))*inverseFactor) +} + +// LowerBoundaryNegativeScale calculates the lower boundary for scale <= 0. +// Adopted from https://opentelemetry.io/docs/specs/otel/metrics/data-model/#producer-expectations +func LowerBoundaryNegativeScale(index, scale int) float64 { + return math.Ldexp(1, index<<-scale) +} + +// ToTDigest converts an OTel exponential histogram data point to T-Digest counts and values. +func ToTDigest(dp pmetric.ExponentialHistogramDataPoint) (counts []int64, values []float64) { + scale := int(dp.Scale()) + + offset := int(dp.Negative().Offset()) + bucketCounts := dp.Negative().BucketCounts() + for i := bucketCounts.Len() - 1; i >= 0; i-- { + count := bucketCounts.At(i) + if count == 0 { + continue + } + lb := -LowerBoundary(offset+i+1, scale) + ub := -LowerBoundary(offset+i, scale) + counts = append(counts, int64(count)) + values = append(values, lb+(ub-lb)/2) + } + + if zeroCount := dp.ZeroCount(); zeroCount != 0 { + counts = append(counts, int64(zeroCount)) + values = append(values, 0) + } + + offset = int(dp.Positive().Offset()) + bucketCounts = dp.Positive().BucketCounts() + for i := 0; i < bucketCounts.Len(); i++ { + count := bucketCounts.At(i) + if count == 0 { + continue + } + lb := LowerBoundary(offset+i, scale) + ub := LowerBoundary(offset+i+1, scale) + counts = append(counts, int64(count)) + values = append(values, lb+(ub-lb)/2) + } + return +} diff --git a/exporter/elasticsearchexporter/internal/exphistogram/exphistogram_test.go b/exporter/elasticsearchexporter/internal/exphistogram/exphistogram_test.go new file mode 100644 index 000000000000..654b765eab1a --- /dev/null +++ b/exporter/elasticsearchexporter/internal/exphistogram/exphistogram_test.go @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exphistogram + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pmetric" +) + 
+func TestToTDigest(t *testing.T) { + for _, tc := range []struct { + name string + scale int32 + zeroCount uint64 + positiveOffset int32 + positiveBuckets []uint64 + negativeOffset int32 + negativeBuckets []uint64 + + expectedCounts []int64 + expectedValues []float64 + }{ + { + name: "empty", + scale: 0, + expectedCounts: nil, + expectedValues: nil, + }, + { + name: "empty, scale=1", + scale: 1, + expectedCounts: nil, + expectedValues: nil, + }, + { + name: "empty, scale=-1", + scale: -1, + expectedCounts: nil, + expectedValues: nil, + }, + { + name: "zeros", + scale: 0, + zeroCount: 1, + expectedCounts: []int64{1}, + expectedValues: []float64{0}, + }, + { + name: "scale=0", + scale: 0, + zeroCount: 1, + positiveBuckets: []uint64{1, 1}, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1, 1}, + expectedValues: []float64{-3, -1.5, 0, 1.5, 3}, + }, + { + name: "scale=0, no zeros", + scale: 0, + zeroCount: 0, + positiveBuckets: []uint64{1, 1}, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1}, + expectedValues: []float64{-3, -1.5, 1.5, 3}, + }, + { + name: "scale=0, offset=1", + scale: 0, + zeroCount: 1, + positiveOffset: 1, + positiveBuckets: []uint64{1, 1}, + negativeOffset: 1, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1, 1}, + expectedValues: []float64{-6, -3, 0, 3, 6}, + }, + { + name: "scale=0, offset=-1", + scale: 0, + zeroCount: 1, + positiveOffset: -1, + positiveBuckets: []uint64{1, 1}, + negativeOffset: -1, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1, 1}, + expectedValues: []float64{-1.5, -0.75, 0, 0.75, 1.5}, + }, + { + name: "scale=0, different offsets", + scale: 0, + zeroCount: 1, + positiveOffset: -1, + positiveBuckets: []uint64{1, 1}, + negativeOffset: 1, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1, 1}, + expectedValues: []float64{-6, -3, 0, 0.75, 1.5}, + }, + { + name: "scale=-1", + scale: -1, + zeroCount: 1, + positiveBuckets: 
[]uint64{1, 1}, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1, 1}, + expectedValues: []float64{-10, -2.5, 0, 2.5, 10}, + }, + { + name: "scale=1", + scale: 1, + zeroCount: 1, + positiveBuckets: []uint64{1, 1}, + negativeBuckets: []uint64{1, 1}, + expectedCounts: []int64{1, 1, 1, 1, 1}, + expectedValues: []float64{-1.7071067811865475, -1.2071067811865475, 0, 1.2071067811865475, 1.7071067811865475}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + dp := pmetric.NewExponentialHistogramDataPoint() + dp.SetScale(tc.scale) + dp.SetZeroCount(tc.zeroCount) + dp.Positive().SetOffset(tc.positiveOffset) + dp.Positive().BucketCounts().FromRaw(tc.positiveBuckets) + dp.Negative().SetOffset(tc.negativeOffset) + dp.Negative().BucketCounts().FromRaw(tc.negativeBuckets) + + counts, values := ToTDigest(dp) + assert.Equal(t, tc.expectedCounts, counts) + assert.Equal(t, tc.expectedValues, values) + }) + } +} diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index bdf030bfc282..07e51c30fe75 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" ) @@ -353,6 +354,25 @@ func summaryToValue(dp pmetric.SummaryDataPoint) pcommon.Value { return vm } +func exponentialHistogramToValue(dp pmetric.ExponentialHistogramDataPoint) pcommon.Value { + counts, values := exphistogram.ToTDigest(dp) + + vm := pcommon.NewValueMap() + m := vm.Map() + vmCounts := m.PutEmptySlice("counts") + vmCounts.EnsureCapacity(len(counts)) + for _, c := range counts { + 
vmCounts.AppendEmpty().SetInt(c) + } + vmValues := m.PutEmptySlice("values") + vmValues.EnsureCapacity(len(values)) + for _, v := range values { + vmValues.AppendEmpty().SetDouble(v) + } + + return vm +} + func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) { // Histogram conversion function is from // https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277