diff --git a/.chloggen/elasticsearchexporter_receiver-based-routing.yaml b/.chloggen/elasticsearchexporter_receiver-based-routing.yaml new file mode 100644 index 000000000000..85101276c2b1 --- /dev/null +++ b/.chloggen/elasticsearchexporter_receiver-based-routing.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement receiver-based routing under *_dynamic_index config + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34246] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 083aa2826182..bec554f2bac1 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -121,7 +121,7 @@ This can be customised through the following settings: - `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`. ⚠️ Note that metrics support is currently in development. @@ -129,13 +129,13 @@ This can be customised through the following settings: - `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. ⚠️ Note that metrics support is currently in development. - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`. - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format. - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index 851bb92d9756..df9b17c6cc6e 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -5,18 +5,20 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry import ( "fmt" + "regexp" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/ptrace" ) +var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`) + func routeWithDefaults(defaultDSType string) func( pcommon.Map, pcommon.Map, pcommon.Map, string, bool, + string, ) string { return func( recordAttr pcommon.Map, @@ -24,11 +26,13 @@ func routeWithDefaults(defaultDSType string) func( resourceAttr pcommon.Map, fIndex string, otel bool, + scopeName string, ) string { // Order: // 1. read data_stream.* from attributes // 2. read elasticsearch.index.* from attributes - // 3. use default hardcoded data_stream.* + // 3. receiver-based routing + // 4. use default hardcoded data_stream.* dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) dataStreamMode := datasetExists || namespaceExists @@ -40,8 +44,17 @@ func routeWithDefaults(defaultDSType string) func( } } + // Receiver-based routing + // For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode) + // for the scope name + // github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper + if submatch := receiverRegex.FindStringSubmatch(scopeName); len(submatch) > 0 { + receiverName := submatch[1] + dataset = receiverName + } + // The naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]". - // This is in order to match the soon to be built-in logs-*.otel-* index template. + // This is in order to match the built-in logs-*.otel-* index template. if otel { dataset += ".otel" } @@ -53,55 +66,20 @@ func routeWithDefaults(defaultDSType string) func( } } -// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes and prefix/suffix attributes. -// This function may mutate record attributes. -func routeLogRecord( - record plog.LogRecord, - scope pcommon.InstrumentationScope, - resource pcommon.Resource, - fIndex string, - otel bool, -) string { - route := routeWithDefaults(defaultDataStreamTypeLogs) - return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) -} +var ( + // routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes. + // This function may mutate record attributes. + routeLogRecord = routeWithDefaults(defaultDataStreamTypeLogs) -// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes. -// This function may mutate record attributes. -func routeDataPoint( - dataPoint dataPoint, - scope pcommon.InstrumentationScope, - resource pcommon.Resource, - fIndex string, - otel bool, -) string { - route := routeWithDefaults(defaultDataStreamTypeMetrics) - return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) -} + // routeDataPoint returns the name of the index to send the data point to according to data stream routing related attributes. + // This function may mutate record attributes. + routeDataPoint = routeWithDefaults(defaultDataStreamTypeMetrics) -// routeSpan returns the name of the index to send the span to according to data stream routing attributes. -// This function may mutate record attributes. -func routeSpan( - span ptrace.Span, - scope pcommon.InstrumentationScope, - resource pcommon.Resource, - fIndex string, - otel bool, -) string { - route := routeWithDefaults(defaultDataStreamTypeTraces) - return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) -} + // routeSpan returns the name of the index to send the span to according to data stream routing related attributes. + // This function may mutate record attributes. + routeSpan = routeWithDefaults(defaultDataStreamTypeTraces) -// routeSpanEvent returns the name of the index to send the span event to according to data stream routing attributes. -// This function may mutate record attributes. -func routeSpanEvent( - spanEvent ptrace.SpanEvent, - scope pcommon.InstrumentationScope, - resource pcommon.Resource, - fIndex string, - otel bool, -) string { - // span events are sent to logs-*, not traces-* - route := routeWithDefaults(defaultDataStreamTypeLogs) - return route(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) -} + // routeSpanEvent returns the name of the index to send the span event to according to data stream routing related attributes. + // This function may mutate record attributes. + routeSpanEvent = routeWithDefaults(defaultDataStreamTypeLogs) +) diff --git a/exporter/elasticsearchexporter/data_stream_router_test.go b/exporter/elasticsearchexporter/data_stream_router_test.go index 0d64a6b2184a..81450da4d7a1 100644 --- a/exporter/elasticsearchexporter/data_stream_router_test.go +++ b/exporter/elasticsearchexporter/data_stream_router_test.go @@ -8,70 +8,90 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/pcommon" ) -type routeTestInfo struct { - name string - otel bool - want string +type routeTestCase struct { + name string + otel bool + scopeName string + want string } -func createRouteTests(dsType string) []routeTestInfo { - renderWantRoute := func(dsType string, otel bool) string { +func createRouteTests(dsType string) []routeTestCase { + renderWantRoute := func(dsType, dsDataset string, otel bool) string { if otel { - return fmt.Sprintf("%s-%s.otel-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace) + return fmt.Sprintf("%s-%s.otel-%s", dsType, dsDataset, defaultDataStreamNamespace) } - return fmt.Sprintf("%s-%s-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace) + return fmt.Sprintf("%s-%s-%s", dsType, dsDataset, defaultDataStreamNamespace) } - return []routeTestInfo{ + return []routeTestCase{ { name: "default", otel: false, - want: renderWantRoute(dsType, false), + want: renderWantRoute(dsType, defaultDataStreamDataset, false), }, { name: "otel", otel: true, - want: renderWantRoute(dsType, true), + want: renderWantRoute(dsType, defaultDataStreamDataset, true), + }, + { + name: "default with receiver scope name", + otel: false, + scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper", + want: renderWantRoute(dsType, "hostmetricsreceiver", false), + }, + { + name: "otel with receiver scope name", + otel: true, + scopeName: "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper", + want: renderWantRoute(dsType, "hostmetricsreceiver", true), + }, + { + name: "default with non-receiver scope name", + otel: false, + scopeName: "some_other_scope_name", + want: renderWantRoute(dsType, defaultDataStreamDataset, false), + }, + { + name: "otel with non-receiver scope name", + otel: true, + scopeName: "some_other_scope_name", + want: renderWantRoute(dsType, defaultDataStreamDataset, true), }, } } func TestRouteLogRecord(t *testing.T) { - tests := createRouteTests(defaultDataStreamTypeLogs) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ds := routeLogRecord(plog.NewLogRecord(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel) + ds := routeLogRecord(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName) assert.Equal(t, tc.want, ds) }) } } func TestRouteDataPoint(t *testing.T) { - tests := createRouteTests(defaultDataStreamTypeMetrics) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ds := routeDataPoint(numberDataPoint{pmetric.NewNumberDataPoint()}, plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel) + ds := routeDataPoint(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName) assert.Equal(t, tc.want, ds) }) } } func TestRouteSpan(t *testing.T) { - tests := createRouteTests(defaultDataStreamTypeTraces) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ds := routeSpan(ptrace.NewSpan(), plog.NewScopeLogs().Scope(), plog.NewResourceLogs().Resource(), "", tc.otel) + ds := routeSpan(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName) assert.Equal(t, tc.want, ds) }) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 3c4725cac266..7b493801df8e 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -156,7 +156,7 @@ func (e *elasticsearchExporter) pushLogRecord( ) error { fIndex := e.index if e.dynamicIndex { - fIndex = routeLogRecord(record, scope, resource, fIndex, e.otel) + fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) } if e.logstashFormat.Enabled { @@ -305,7 +305,7 @@ func (e *elasticsearchExporter) getMetricDataPointIndex( ) (string, error) { fIndex := e.index if e.dynamicIndex { - fIndex = routeDataPoint(dataPoint, scope, resource, fIndex, e.otel) + fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) } if e.logstashFormat.Enabled { @@ -379,7 +379,7 @@ func (e *elasticsearchExporter) pushTraceRecord( ) error { fIndex := e.index if e.dynamicIndex { - fIndex = routeSpan(span, scope, resource, fIndex, e.otel) + fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, span.Name()) } if e.logstashFormat.Enabled { @@ -409,7 +409,7 @@ func (e *elasticsearchExporter) pushSpanEvent( ) error { fIndex := e.index if e.dynamicIndex { - fIndex = routeSpanEvent(spanEvent, scope, resource, fIndex, e.otel) + fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) } if e.logstashFormat.Enabled { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index e0e719586b61..5ce8a04115e8 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1055,7 +1055,7 @@ func TestEncodeLogOtelMode(t *testing.T) { record, scope, resource := createTestOTelLogRecord(t, tc.rec) // This sets the data_stream values default or derived from the record/scope/resources - routeLogRecord(record, scope, resource, "", true) + routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name()) b, err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL) require.NoError(t, err)