
Commit

[chore][exporter/elasticsearch] Refactor data point value handling (#35298)

**Description:** Refactor data point value handling for readability. No behavior change.

carsonip authored and jriguera committed Oct 4, 2024
1 parent 7a1681b commit b59c7cd
Showing 4 changed files with 102 additions and 81 deletions.
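In outline, the commit replaces the free-standing conversion helpers (numberToValue, histogramToValue, exponentialHistogramToValue, summaryToValue) that the exporter called before upserting with small wrapper types that satisfy an extended dataPoint interface exposing Value() and DynamicTemplate(). A minimal, self-contained sketch of that pattern follows; the interface and the summaryDataPoint wrapper mirror the model.go diff below, while the main function is purely illustrative:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// dataPoint mirrors the interface as extended by this commit: every wrapper
// knows how to render its own value and to name its ES dynamic template.
type dataPoint interface {
	Timestamp() pcommon.Timestamp
	StartTimestamp() pcommon.Timestamp
	Attributes() pcommon.Map
	Value() (pcommon.Value, error)
	DynamicTemplate(pmetric.Metric) string
}

// summaryDataPoint mirrors the wrapper added in model.go; embedding the pdata
// type provides Timestamp/StartTimestamp/Attributes for free.
type summaryDataPoint struct {
	pmetric.SummaryDataPoint
}

func (dp summaryDataPoint) Value() (pcommon.Value, error) {
	vm := pcommon.NewValueMap()
	m := vm.Map()
	m.PutDouble("sum", dp.Sum())
	m.PutInt("value_count", int64(dp.Count()))
	return vm, nil
}

func (dp summaryDataPoint) DynamicTemplate(_ pmetric.Metric) string {
	return "summary_metrics"
}

func main() {
	// Illustrative only: build a summary data point and exercise the interface.
	sdp := pmetric.NewSummaryDataPoint()
	sdp.SetSum(12.5)
	sdp.SetCount(4)

	var dp dataPoint = summaryDataPoint{sdp}
	v, _ := dp.Value()
	fmt.Println(dp.DynamicTemplate(pmetric.NewMetric()), v.AsRaw())
}
```

Moving conversion behind the interface lets the exporter treat every metric type through one code path while keeping the type-specific logic next to the wrapper that owns it.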
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/data_stream_router_test.go
@@ -59,7 +59,7 @@ func TestRouteDataPoint(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeDataPoint(pmetric.NewNumberDataPoint(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
ds := routeDataPoint(numberDataPoint{pmetric.NewNumberDataPoint()}, plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
assert.Equal(t, tc.want, ds)
})
}
31 changes: 7 additions & 24 deletions exporter/elasticsearchexporter/exporter.go
@@ -202,7 +202,7 @@ func (e *elasticsearchExporter) pushMetricsData(
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)

upsertDataPoint := func(dp dataPoint, dpValue pcommon.Value) error {
upsertDataPoint := func(dp dataPoint) error {
fIndex, err := e.getMetricDataPointIndex(resource, scope, dp)
if err != nil {
return err
@@ -212,7 +212,7 @@ func (e *elasticsearchExporter) pushMetricsData(
}

if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource,
resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp, dpValue); err != nil {
resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp); err != nil {
return err
}
return nil
@@ -223,12 +223,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val, err := numberToValue(dp)
if err != nil {
errs = append(errs, err)
continue
}
if err := upsertDataPoint(dp, val); err != nil {
if err := upsertDataPoint(numberDataPoint{dp}); err != nil {
errs = append(errs, err)
continue
}
@@ -237,12 +232,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val, err := numberToValue(dp)
if err != nil {
errs = append(errs, err)
continue
}
if err := upsertDataPoint(dp, val); err != nil {
if err := upsertDataPoint(numberDataPoint{dp}); err != nil {
errs = append(errs, err)
continue
}
@@ -251,8 +241,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val := exponentialHistogramToValue(dp)
if err := upsertDataPoint(dp, val); err != nil {
if err := upsertDataPoint(exponentialHistogramDataPoint{dp}); err != nil {
errs = append(errs, err)
continue
}
@@ -261,12 +250,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val, err := histogramToValue(dp)
if err != nil {
errs = append(errs, err)
continue
}
if err := upsertDataPoint(dp, val); err != nil {
if err := upsertDataPoint(histogramDataPoint{dp}); err != nil {
errs = append(errs, err)
continue
}
@@ -275,8 +259,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val := summaryToValue(dp)
if err := upsertDataPoint(dp, val); err != nil {
if err := upsertDataPoint(summaryDataPoint{dp}); err != nil {
errs = append(errs, err)
continue
}
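On the caller side, every metric type now wraps its data points and hands them to the same one-argument closure; conversion failures surface from Value() inside the model instead of at each call site. A sketch of the resulting dispatch shape (not a complete function; upsertDataPoint, errs, and the wrapper types are the ones shown in this diff and the model.go diff below):

```go
// Sketch only: the per-metric dispatch in pushMetricsData after this change,
// with index routing and the surrounding resource/scope loops elided.
switch metric.Type() {
case pmetric.MetricTypeSum:
	dps := metric.Sum().DataPoints()
	for l := 0; l < dps.Len(); l++ {
		if err := upsertDataPoint(numberDataPoint{dps.At(l)}); err != nil {
			errs = append(errs, err)
		}
	}
case pmetric.MetricTypeGauge:
	dps := metric.Gauge().DataPoints()
	for l := 0; l < dps.Len(); l++ {
		if err := upsertDataPoint(numberDataPoint{dps.At(l)}); err != nil {
			errs = append(errs, err)
		}
	}
case pmetric.MetricTypeHistogram:
	dps := metric.Histogram().DataPoints()
	for l := 0; l < dps.Len(); l++ {
		if err := upsertDataPoint(histogramDataPoint{dps.At(l)}); err != nil {
			errs = append(errs, err)
		}
	}
	// ExponentialHistogram and Summary follow the same pattern with their wrappers.
}
```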
141 changes: 90 additions & 51 deletions exporter/elasticsearchexporter/model.go
@@ -68,7 +68,7 @@ type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint) error
encodeDocument(objmodel.Document) ([]byte, error)
}

@@ -87,6 +87,8 @@ type dataPoint interface {
Timestamp() pcommon.Timestamp
StartTimestamp() pcommon.Timestamp
Attributes() pcommon.Map
Value() (pcommon.Value, error)
DynamicTemplate(pmetric.Metric) string
}

const (
@@ -242,19 +244,24 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error)
}

// upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index
func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint) error {
switch m.mode {
case MappingOTel:
return m.upsertMetricDataPointValueOTelMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp, value)
return m.upsertMetricDataPointValueOTelMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp)
case MappingECS:
return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp, value)
return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp)
default:
// Defaults to ECS for backward compatibility
return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp, value)
return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp)
}
}

func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ string, _ pcommon.InstrumentationScope, _ string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ string, _ pcommon.InstrumentationScope, _ string, metric pmetric.Metric, dp dataPoint) error {
value, err := dp.Value()
if err != nil {
return err
}

hash := metricECSHash(dp.Timestamp(), dp.Attributes())
var (
document objmodel.Document
@@ -272,7 +279,11 @@ func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]obj
return nil
}

func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint) error {
value, err := dp.Value()
if err != nil {
return err
}
// documents is per-resource. Therefore, there is no need to hash resource attributes
hash := metricOTelHash(dp, scope.Attributes(), metric.Unit())
var (
@@ -302,61 +313,39 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob
// TODO: support quantiles
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34561

document.AddDynamicTemplate("metrics."+metric.Name(), metricDpToDynamicTemplate(metric, dp))
// DynamicTemplate returns the name of dynamic template that applies to the metric and data point,
// so that the field is indexed into Elasticsearch with the correct mapping. The name should correspond to a
// dynamic template that is defined in ES mapping, e.g.
// https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json
document.AddDynamicTemplate("metrics."+metric.Name(), dp.DynamicTemplate(metric))
documents[hash] = document
return nil
}

// metricDpToDynamicTemplate returns the name of dynamic template that applies to the metric and data point,
// so that the field is indexed into Elasticsearch with the correct mapping. The name should correspond to a
// dynamic template that is defined in ES mapping, e.g.
// https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json
func metricDpToDynamicTemplate(metric pmetric.Metric, dp dataPoint) string {
switch metric.Type() {
case pmetric.MetricTypeSum:
switch dp.(pmetric.NumberDataPoint).ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
if metric.Sum().IsMonotonic() {
return "counter_double"
}
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
if metric.Sum().IsMonotonic() {
return "counter_long"
}
return "gauge_long"
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
case pmetric.MetricTypeGauge:
switch dp.(pmetric.NumberDataPoint).ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
return "gauge_long"
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
case pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram:
return "histogram"
case pmetric.MetricTypeSummary:
return "summary_metrics"
}
return ""
type summaryDataPoint struct {
pmetric.SummaryDataPoint
}

func summaryToValue(dp pmetric.SummaryDataPoint) pcommon.Value {
func (dp summaryDataPoint) Value() (pcommon.Value, error) {
// TODO: Add support for quantiles
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34561
vm := pcommon.NewValueMap()
m := vm.Map()
m.PutDouble("sum", dp.Sum())
m.PutInt("value_count", int64(dp.Count()))
return vm
return vm, nil
}

func exponentialHistogramToValue(dp pmetric.ExponentialHistogramDataPoint) pcommon.Value {
counts, values := exphistogram.ToTDigest(dp)
func (dp summaryDataPoint) DynamicTemplate(_ pmetric.Metric) string {
return "summary_metrics"
}

type exponentialHistogramDataPoint struct {
pmetric.ExponentialHistogramDataPoint
}

func (dp exponentialHistogramDataPoint) Value() (pcommon.Value, error) {
counts, values := exphistogram.ToTDigest(dp.ExponentialHistogramDataPoint)

vm := pcommon.NewValueMap()
m := vm.Map()
@@ -371,7 +360,23 @@ func exponentialHistogramToValue(dp pmetric.ExponentialHistogramDataPoint) pcomm
vmValues.AppendEmpty().SetDouble(v)
}

return vm
return vm, nil
}

func (dp exponentialHistogramDataPoint) DynamicTemplate(_ pmetric.Metric) string {
return "histogram"
}

type histogramDataPoint struct {
pmetric.HistogramDataPoint
}

func (dp histogramDataPoint) Value() (pcommon.Value, error) {
return histogramToValue(dp.HistogramDataPoint)
}

func (dp histogramDataPoint) DynamicTemplate(_ pmetric.Metric) string {
return "histogram"
}

func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
@@ -422,9 +427,11 @@ func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
return vm, nil
}

var errInvalidNumberDataPoint = errors.New("invalid number data point")
type numberDataPoint struct {
pmetric.NumberDataPoint
}

func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) {
func (dp numberDataPoint) Value() (pcommon.Value, error) {
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
value := dp.DoubleValue()
@@ -438,6 +445,38 @@ func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) {
return pcommon.Value{}, errInvalidNumberDataPoint
}

func (dp numberDataPoint) DynamicTemplate(metric pmetric.Metric) string {
switch metric.Type() {
case pmetric.MetricTypeSum:
switch dp.NumberDataPoint.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
if metric.Sum().IsMonotonic() {
return "counter_double"
}
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
if metric.Sum().IsMonotonic() {
return "counter_long"
}
return "gauge_long"
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
case pmetric.MetricTypeGauge:
switch dp.NumberDataPoint.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
return "gauge_long"
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
}
return ""
}

var errInvalidNumberDataPoint = errors.New("invalid number data point")

func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, stringifyArrayValues bool) {
resourceMapVal := pcommon.NewValueMap()
resourceMap := resourceMapVal.Map()
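For reference, the dynamic-template selection that moved onto the wrappers can be exercised directly. A test-style sketch, assuming it sits in the same package as the wrappers above (the Example name and metric values are illustrative):

```go
func ExampleNumberDataPoint_dynamicTemplate() {
	metric := pmetric.NewMetric()
	metric.SetName("http.requests")
	sum := metric.SetEmptySum()
	sum.SetIsMonotonic(true)

	dp := sum.DataPoints().AppendEmpty()
	dp.SetIntValue(3)

	// A monotonic int Sum maps to "counter_long"; a double Gauge would map to
	// "gauge_double", histograms to "histogram", summaries to "summary_metrics".
	fmt.Println(numberDataPoint{dp}.DynamicTemplate(metric))
	// Output: counter_long
}
```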
9 changes: 4 additions & 5 deletions exporter/elasticsearchexporter/model_test.go
@@ -101,16 +101,15 @@ func TestEncodeMetric(t *testing.T) {

var docsBytes [][]byte
for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ {
val, err := numberToValue(metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i))
require.NoError(t, err)
err = model.upsertMetricDataPointValue(docs,
err := model.upsertMetricDataPointValue(
docs,
metrics.ResourceMetrics().At(0).Resource(),
"",
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(),
"",
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0),
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i),
val)
numberDataPoint{metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)},
)
require.NoError(t, err)
}

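The same entry point accepts the other wrappers unchanged; a hypothetical variant of the call above for a summary metric (not part of this commit) would only swap the accessor chain and the wrapper type:

```go
// Hypothetical: encoding a summary data point through the same entry point,
// inside a test that already has metrics, docs, model, i, and t in scope.
dp := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Summary().DataPoints().At(i)
err := model.upsertMetricDataPointValue(
	docs,
	metrics.ResourceMetrics().At(0).Resource(),
	"",
	metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(),
	"",
	metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0),
	summaryDataPoint{dp},
)
require.NoError(t, err)
```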
