Skip to content

Commit

Permalink
[exporter/elasticsearch] Add exponential histogram support (open-tele…
Browse files Browse the repository at this point in the history
…metry#34818)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Convert exponential histograms to T-digest histograms supported by
Elasticsearch

**Link to tracking Issue:** <Issue number if applicable>

Fixes open-telemetry#34813

**Testing:** <Describe what testing was performed and which tests were
added.>

Unit tests and exporter test

**Documentation:** <Describe the documentation added.>
  • Loading branch information
carsonip authored and f7o committed Sep 12, 2024
1 parent 19a2dae commit 0822753
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_exponential-histogram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add exponential histogram support

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34813]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 1 addition & 2 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,9 @@ The metric types supported are:
- Gauge
- Sum
- Histogram
- Exponential histogram
- Summary

Exponential Histograms are ignored.

[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
Expand Down
11 changes: 10 additions & 1 deletion exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ func (e *elasticsearchExporter) pushMetricsData(
return nil
}

// TODO: support exponential histogram
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
Expand Down Expand Up @@ -252,6 +251,16 @@ func (e *elasticsearchExporter) pushMetricsData(
continue
}
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val := exponentialHistogramToValue(dp)
if err := upsertDataPoint(dp, val); err != nil {
errs = append(errs, err)
continue
}
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
Expand Down
40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,46 @@ func TestExporterMetrics(t *testing.T) {
assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("publish exponential histogram", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "ecs"
})

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
metricSlice := scopeA.Metrics()
fooMetric := metricSlice.AppendEmpty()
fooMetric.SetName("metric.foo")
fooDps := fooMetric.SetEmptyExponentialHistogram().DataPoints()
fooDp := fooDps.AppendEmpty()
fooDp.SetZeroCount(2)
fooDp.Positive().SetOffset(1)
fooDp.Positive().BucketCounts().FromRaw([]uint64{0, 1, 1, 0})

fooDp.Negative().SetOffset(1)
fooDp.Negative().BucketCounts().FromRaw([]uint64{1, 0, 0, 1})

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(1)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic-default"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"default","type":"metrics"},"metric":{"foo":{"counts":[1,1,2,1,1],"values":[-24.0,-3.0,0.0,6.0,12.0]}}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("publish only valid data points", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package exphistogram contains utility functions for exponential histogram conversions.
package exphistogram // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram"

import (
"math"

"go.opentelemetry.io/collector/pdata/pmetric"
)

// LowerBoundary calculates the lower boundary given index and scale.
// Adopted from https://opentelemetry.io/docs/specs/otel/metrics/data-model/#producer-expectations
func LowerBoundary(index, scale int) float64 {
if scale <= 0 {
return LowerBoundaryNegativeScale(index, scale)
}
// Use this form in case the equation above computes +Inf
// as the lower boundary of a valid bucket.
inverseFactor := math.Ldexp(math.Ln2, -scale)
return 2.0 * math.Exp(float64(index-(1<<scale))*inverseFactor)
}

// LowerBoundaryNegativeScale calculates the lower boundary for scale <= 0.
// Adopted from https://opentelemetry.io/docs/specs/otel/metrics/data-model/#producer-expectations
func LowerBoundaryNegativeScale(index, scale int) float64 {
return math.Ldexp(1, index<<-scale)
}

// ToTDigest converts an OTLP exponential histogram data point to T-Digest counts and mean centroid values.
func ToTDigest(dp pmetric.ExponentialHistogramDataPoint) (counts []int64, values []float64) {
scale := int(dp.Scale())

offset := int(dp.Negative().Offset())
bucketCounts := dp.Negative().BucketCounts()
for i := bucketCounts.Len() - 1; i >= 0; i-- {
count := bucketCounts.At(i)
if count == 0 {
continue
}
lb := -LowerBoundary(offset+i+1, scale)
ub := -LowerBoundary(offset+i, scale)
counts = append(counts, int64(count))
values = append(values, lb+(ub-lb)/2)
}

if zeroCount := dp.ZeroCount(); zeroCount != 0 {
counts = append(counts, int64(zeroCount))
values = append(values, 0)
}

offset = int(dp.Positive().Offset())
bucketCounts = dp.Positive().BucketCounts()
for i := 0; i < bucketCounts.Len(); i++ {
count := bucketCounts.At(i)
if count == 0 {
continue
}
lb := LowerBoundary(offset+i, scale)
ub := LowerBoundary(offset+i+1, scale)
counts = append(counts, int64(count))
values = append(values, lb+(ub-lb)/2)
}
return
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exphistogram

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestToTDigest(t *testing.T) {
for _, tc := range []struct {
name string
scale int32
zeroCount uint64
positiveOffset int32
positiveBuckets []uint64
negativeOffset int32
negativeBuckets []uint64

expectedCounts []int64
expectedValues []float64
}{
{
name: "empty",
scale: 0,
expectedCounts: nil,
expectedValues: nil,
},
{
name: "empty, scale=1",
scale: 1,
expectedCounts: nil,
expectedValues: nil,
},
{
name: "empty, scale=-1",
scale: -1,
expectedCounts: nil,
expectedValues: nil,
},
{
name: "zeros",
scale: 0,
zeroCount: 1,
expectedCounts: []int64{1},
expectedValues: []float64{0},
},
{
name: "scale=0",
scale: 0,
zeroCount: 1,
positiveBuckets: []uint64{1, 1},
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1, 1},
expectedValues: []float64{-3, -1.5, 0, 1.5, 3},
},
{
name: "scale=0, no zeros",
scale: 0,
zeroCount: 0,
positiveBuckets: []uint64{1, 1},
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1},
expectedValues: []float64{-3, -1.5, 1.5, 3},
},
{
name: "scale=0, offset=1",
scale: 0,
zeroCount: 1,
positiveOffset: 1,
positiveBuckets: []uint64{1, 1},
negativeOffset: 1,
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1, 1},
expectedValues: []float64{-6, -3, 0, 3, 6},
},
{
name: "scale=0, offset=-1",
scale: 0,
zeroCount: 1,
positiveOffset: -1,
positiveBuckets: []uint64{1, 1},
negativeOffset: -1,
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1, 1},
expectedValues: []float64{-1.5, -0.75, 0, 0.75, 1.5},
},
{
name: "scale=0, different offsets",
scale: 0,
zeroCount: 1,
positiveOffset: -1,
positiveBuckets: []uint64{1, 1},
negativeOffset: 1,
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1, 1},
expectedValues: []float64{-6, -3, 0, 0.75, 1.5},
},
{
name: "scale=-1",
scale: -1,
zeroCount: 1,
positiveBuckets: []uint64{1, 1},
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1, 1},
expectedValues: []float64{-10, -2.5, 0, 2.5, 10},
},
{
name: "scale=1",
scale: 1,
zeroCount: 1,
positiveBuckets: []uint64{1, 1},
negativeBuckets: []uint64{1, 1},
expectedCounts: []int64{1, 1, 1, 1, 1},
expectedValues: []float64{-1.7071067811865475, -1.2071067811865475, 0, 1.2071067811865475, 1.7071067811865475},
},
} {
t.Run(tc.name, func(t *testing.T) {
dp := pmetric.NewExponentialHistogramDataPoint()
dp.SetScale(tc.scale)
dp.SetZeroCount(tc.zeroCount)
dp.Positive().SetOffset(tc.positiveOffset)
dp.Positive().BucketCounts().FromRaw(tc.positiveBuckets)
dp.Negative().SetOffset(tc.negativeOffset)
dp.Negative().BucketCounts().FromRaw(tc.negativeBuckets)

counts, values := ToTDigest(dp)
assert.Equal(t, tc.expectedCounts, counts)
assert.Equal(t, tc.expectedValues, values)
})
}
}
20 changes: 20 additions & 0 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)
Expand Down Expand Up @@ -353,6 +354,25 @@ func summaryToValue(dp pmetric.SummaryDataPoint) pcommon.Value {
return vm
}

func exponentialHistogramToValue(dp pmetric.ExponentialHistogramDataPoint) pcommon.Value {
counts, values := exphistogram.ToTDigest(dp)

vm := pcommon.NewValueMap()
m := vm.Map()
vmCounts := m.PutEmptySlice("counts")
vmCounts.EnsureCapacity(len(counts))
for _, c := range counts {
vmCounts.AppendEmpty().SetInt(c)
}
vmValues := m.PutEmptySlice("values")
vmValues.EnsureCapacity(len(values))
for _, v := range values {
vmValues.AppendEmpty().SetDouble(v)
}

return vm
}

func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
// Histogram conversion function is from
// https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277
Expand Down

0 comments on commit 0822753

Please sign in to comment.