[exporter/elasticsearch] Implement receiver-based routing under *_dynamic_index config (open-telemetry#35417)

**Description:** Implement receiver-based routing under *_dynamic_index config.

For example, the scope name
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper
sets data_stream.dataset to hostmetricsreceiver (or hostmetricsreceiver.otel in
the OTel output mode).
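
The scope-name example can be reproduced in isolation with the following minimal sketch. It reuses the regular expression this commit adds to `data_stream_router.go`; the helper name `datasetFromScopeName` and its signature are hypothetical and exist only for illustration, they are not part of the exporter.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the one added in data_stream_router.go.
var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`)

// datasetFromScopeName is a hypothetical helper (not part of the exporter).
// It returns the dataset derived from the scope name and whether the scope
// name looked like a receiver package path.
func datasetFromScopeName(scopeName string, otel bool) (string, bool) {
	m := receiverRegex.FindStringSubmatch(scopeName)
	if m == nil {
		return "", false // not a receiver scope name
	}
	dataset := m[1] // capture group #1, e.g. "hostmetricsreceiver"
	if otel {
		dataset += ".otel" // OTel output mode appends ".otel"
	}
	return dataset, true
}

func main() {
	scope := "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
	fmt.Println(datasetFromScopeName(scope, false)) // hostmetricsreceiver true
	fmt.Println(datasetFromScopeName(scope, true))  // hostmetricsreceiver.otel true
}
```

A scope name that does not contain a `/receiver/<name>receiver` segment yields no match, so the dataset keeps whatever value earlier routing steps chose.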

**Link to tracking Issue:** Fixes open-telemetry#34246

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
carsonip authored and jriguera committed Oct 4, 2024
1 parent fa8f232 commit ae6d95c
Showing 6 changed files with 106 additions and 81 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_receiver-based-routing.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement receiver-based routing under *_dynamic_index config

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34246]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/README.md
@@ -121,21 +121,21 @@ This can be customised through the following settings:

- `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.

- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.

- `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`.
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`.

- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
- `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
84 changes: 31 additions & 53 deletions exporter/elasticsearchexporter/data_stream_router.go
@@ -5,30 +5,34 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry

import (
"fmt"
"regexp"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`)

func routeWithDefaults(defaultDSType string) func(
pcommon.Map,
pcommon.Map,
pcommon.Map,
string,
bool,
string,
) string {
return func(
recordAttr pcommon.Map,
scopeAttr pcommon.Map,
resourceAttr pcommon.Map,
fIndex string,
otel bool,
scopeName string,
) string {
// Order:
// 1. read data_stream.* from attributes
// 2. read elasticsearch.index.* from attributes
// 3. use default hardcoded data_stream.*
// 3. receiver-based routing
// 4. use default hardcoded data_stream.*
dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr)
dataStreamMode := datasetExists || namespaceExists
@@ -40,8 +44,17 @@ func routeWithDefaults(defaultDSType string) func(
}
}

// Receiver-based routing
// For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode)
// for the scope name
// github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper
if submatch := receiverRegex.FindStringSubmatch(scopeName); len(submatch) > 0 {
receiverName := submatch[1]
dataset = receiverName
}

// The naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
// This is in order to match the soon to be built-in logs-*.otel-* index template.
// This is in order to match the built-in logs-*.otel-* index template.
if otel {
dataset += ".otel"
}
@@ -53,55 +66,20 @@ func routeWithDefaults(defaultDSType string) func(
}
}

// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes and prefix/suffix attributes.
// This function may mutate record attributes.
func routeLogRecord(
record plog.LogRecord,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeLogs)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
var (
// routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes.
// This function may mutate record attributes.
routeLogRecord = routeWithDefaults(defaultDataStreamTypeLogs)

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
// This function may mutate record attributes.
func routeDataPoint(
dataPoint dataPoint,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeMetrics)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
// routeDataPoint returns the name of the index to send the data point to according to data stream routing related attributes.
// This function may mutate record attributes.
routeDataPoint = routeWithDefaults(defaultDataStreamTypeMetrics)

// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
// This function may mutate record attributes.
func routeSpan(
span ptrace.Span,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeTraces)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
// routeSpan returns the name of the index to send the span to according to data stream routing related attributes.
// This function may mutate record attributes.
routeSpan = routeWithDefaults(defaultDataStreamTypeTraces)

// routeSpanEvent returns the name of the index to send the span event to according to data stream routing attributes.
// This function may mutate record attributes.
func routeSpanEvent(
spanEvent ptrace.SpanEvent,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
// span events are sent to logs-*, not traces-*
route := routeWithDefaults(defaultDataStreamTypeLogs)
return route(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
// routeSpanEvent returns the name of the index to send the span event to according to data stream routing related attributes.
// This function may mutate record attributes.
routeSpanEvent = routeWithDefaults(defaultDataStreamTypeLogs)
)
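
The routing order listed in the comments of `routeWithDefaults` (and documented in the README) can be sketched as a standalone function. This is a simplified illustration under stated assumptions, not the exporter's implementation: the three attribute maps are flattened into one plain `map[string]string`, the function name `route` is hypothetical, and the defaults `generic` and `default` are taken from the fallback index name `logs-generic-default` given in the README.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the one added in data_stream_router.go.
var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`)

// route is a hypothetical, simplified version of the documented order:
//  1. data_stream.* attributes
//  2. elasticsearch.index.* attributes
//  3. receiver-based routing on the scope name
//  4. hardcoded defaults
//
// Assumption: attrs stands in for the merged record, scope and resource
// attributes; the real code resolves precedence across three maps.
func route(dsType string, attrs map[string]string, fIndex string, otel bool, scopeName string) string {
	dataset, datasetExists := attrs["data_stream.dataset"]
	namespace, namespaceExists := attrs["data_stream.namespace"]

	if !datasetExists && !namespaceExists {
		prefix, prefixExists := attrs["elasticsearch.index.prefix"]
		suffix, suffixExists := attrs["elasticsearch.index.suffix"]
		if prefixExists || suffixExists {
			return prefix + fIndex + suffix
		}
		if m := receiverRegex.FindStringSubmatch(scopeName); m != nil {
			dataset, datasetExists = m[1], true
		}
	}

	if !datasetExists {
		dataset = "generic"
	}
	if !namespaceExists {
		namespace = "default"
	}
	if otel {
		dataset += ".otel"
	}
	return fmt.Sprintf("%s-%s-%s", dsType, dataset, namespace)
}

func main() {
	scope := "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
	fmt.Println(route("metrics", map[string]string{}, "", true, scope))
	// metrics-hostmetricsreceiver.otel-default
	fmt.Println(route("logs", map[string]string{"data_stream.dataset": "nginx"}, "", false, scope))
	// logs-nginx-default
	fmt.Println(route("traces", map[string]string{}, "", false, "some_other_scope_name"))
	// traces-generic-default
}
```

The sketch follows the order documented in the README, in which explicit `data_stream.*` attributes take precedence over the receiver name. Whether the exporter code behaves identically when both are present depends on lines not shown in this diff.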
60 changes: 40 additions & 20 deletions exporter/elasticsearchexporter/data_stream_router_test.go
@@ -8,70 +8,90 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/pcommon"
)

type routeTestInfo struct {
name string
otel bool
want string
type routeTestCase struct {
name string
otel bool
scopeName string
want string
}

func createRouteTests(dsType string) []routeTestInfo {
renderWantRoute := func(dsType string, otel bool) string {
func createRouteTests(dsType string) []routeTestCase {
renderWantRoute := func(dsType, dsDataset string, otel bool) string {
if otel {
return fmt.Sprintf("%s-%s.otel-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)
return fmt.Sprintf("%s-%s.otel-%s", dsType, dsDataset, defaultDataStreamNamespace)
}
return fmt.Sprintf("%s-%s-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)
return fmt.Sprintf("%s-%s-%s", dsType, dsDataset, defaultDataStreamNamespace)
}

return []routeTestInfo{
return []routeTestCase{
{
name: "default",
otel: false,
want: renderWantRoute(dsType, false),
want: renderWantRoute(dsType, defaultDataStreamDataset, false),
},
{
name: "otel",
otel: true,
want: renderWantRoute(dsType, true),
want: renderWantRoute(dsType, defaultDataStreamDataset, true),
},
{
name: "default with receiver scope name",
otel: false,
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
want: renderWantRoute(dsType, "hostmetricsreceiver", false),
},
{
name: "otel with receiver scope name",
otel: true,
scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper",
want: renderWantRoute(dsType, "hostmetricsreceiver", true),
},
{
name: "default with non-receiver scope name",
otel: false,
scopeName: "some_other_scope_name",
want: renderWantRoute(dsType, defaultDataStreamDataset, false),
},
{
name: "otel with non-receiver scope name",
otel: true,
scopeName: "some_other_scope_name",
want: renderWantRoute(dsType, defaultDataStreamDataset, true),
},
}
}

func TestRouteLogRecord(t *testing.T) {

tests := createRouteTests(defaultDataStreamTypeLogs)

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeLogRecord(plog.NewLogRecord(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
ds := routeLogRecord(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName)
assert.Equal(t, tc.want, ds)
})
}
}

func TestRouteDataPoint(t *testing.T) {

tests := createRouteTests(defaultDataStreamTypeMetrics)

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeDataPoint(numberDataPoint{pmetric.NewNumberDataPoint()}, plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
ds := routeDataPoint(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName)
assert.Equal(t, tc.want, ds)
})
}
}

func TestRouteSpan(t *testing.T) {

tests := createRouteTests(defaultDataStreamTypeTraces)

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeSpan(ptrace.NewSpan(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel)
ds := routeSpan(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName)
assert.Equal(t, tc.want, ds)
})
}
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/exporter.go
@@ -156,7 +156,7 @@ func (e *elasticsearchExporter) pushLogRecord(
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeLogRecord(record, scope, resource, fIndex, e.otel)
fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
}

if e.logstashFormat.Enabled {
@@ -313,7 +313,7 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
) (string, error) {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex, e.otel)
fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
}

if e.logstashFormat.Enabled {
@@ -387,7 +387,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpan(span, scope, resource, fIndex, e.otel)
fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
}

if e.logstashFormat.Enabled {
@@ -417,7 +417,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpanEvent(spanEvent, scope, resource, fIndex, e.otel)
fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
}

if e.logstashFormat.Enabled {
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/model_test.go
@@ -1055,7 +1055,7 @@ func TestEncodeLogOtelMode(t *testing.T) {
record, scope, resource := createTestOTelLogRecord(t, tc.rec)

// This sets the data_stream values default or derived from the record/scope/resources
routeLogRecord(record, scope, resource, "", true)
routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name())

b, err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL)
require.NoError(t, err)