Skip to content

Commit

Permalink
Merge branch 'main' into renovate/github.com-clickhouse-clickhouse-go…
Browse files Browse the repository at this point in the history
…-v2-2.x
  • Loading branch information
songy23 authored Jun 25, 2024
2 parents bb39915 + 8521e0b commit 1994ffd
Show file tree
Hide file tree
Showing 26 changed files with 877 additions and 39 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-metrics-to-elasticsearch-exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add initial support for metrics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33513]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5 v5.5.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect
Expand Down Expand Up @@ -821,7 +821,7 @@ require (
k8s.io/api v0.29.3 // indirect
k8s.io/apimachinery v0.29.3 // indirect
k8s.io/client-go v0.29.3 // indirect
k8s.io/klog/v2 v2.130.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/kubelet v0.29.3 // indirect
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/telemetrygen/internal/e2etest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetryge
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.103.0
Expand Down
21 changes: 20 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces, logs |
| Stability | [development]: metrics |
| | [beta]: traces, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@JaredTan95](https://www.github.com/JaredTan95), [@ycombinator](https://www.github.com/ycombinator), [@carsonip](https://www.github.com/carsonip) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->
Expand Down Expand Up @@ -91,6 +93,13 @@ This can be customised through the following settings:
takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
- `enabled`(default=false): Enable/Disable dynamic index for log records
- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.
- `metrics_dynamic_index` (optional):
takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `metrics_index`.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=false): Enable/Disable dynamic index for metrics
- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
- `traces_dynamic_index` (optional):
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
Expand Down Expand Up @@ -170,6 +179,16 @@ Settings related to node discovery are:

Node discovery can be disabled by setting `discover.interval` to 0.

## Exporting metrics

Metrics support is currently in development.
The only metric types supported are:

- Gauge
- Sum

Other metric types (Histogram, Exponential Histogram, Summary) are ignored.

[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
Expand Down
10 changes: 10 additions & 0 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type attrGetter interface {
}

// retrieve attribute out of resource, scope, and record (span or log, if not found in resource)
// Deprecated: Use getFromAttributesNew instead.
func getFromAttributes(name string, resource, scope, record attrGetter) string {
var str string
val, exist := resource.Attributes().Get(name)
Expand All @@ -37,3 +38,12 @@ func getFromAttributes(name string, resource, scope, record attrGetter) string {
}
return str
}

func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string {
for _, attributeMap := range attributeMaps {
if value, exists := attributeMap.Get(name); exists {
return value.AsString()
}
}
return defaultValue
}
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type Config struct {
LogsIndex string `mapstructure:"logs_index"`
// fall back to pure LogsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
LogsDynamicIndex DynamicIndexSetting `mapstructure:"logs_dynamic_index"`

// This setting is required when the exporter is used in a metrics pipeline.
MetricsIndex string `mapstructure:"metrics_index"`
// fall back to pure MetricsIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource attributes
MetricsDynamicIndex DynamicIndexSetting `mapstructure:"metrics_dynamic_index"`

// This setting is required when traces pipelines used.
TracesIndex string `mapstructure:"traces_index"`
// fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
Expand Down
75 changes: 65 additions & 10 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
MetricsIndex: "metrics-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -109,11 +110,65 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
MetricsIndex: "metrics-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
IdleConnTimeout: &defaultIdleConnTimeout,
Headers: map[string]configopaque.String{
"myheader": "test",
},
},
Authentication: AuthenticationSettings{
User: "elastic",
Password: "search",
APIKey: "AvFsEiPs==",
},
Discovery: DiscoverySettings{
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "metric"),
configFile: "config.yaml",
expected: &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
MetricsIndex: "my_metric_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down
63 changes: 61 additions & 2 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -132,11 +133,69 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom

document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
return fmt.Errorf("failed to encode log event: %w", err)
}
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
}

func (e *elasticsearchExporter) pushMetricsData(
ctx context.Context,
metrics pmetric.Metrics,
) error {
var errs []error

resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
resourceMetric := resourceMetrics.At(i)
resource := resourceMetric.Resource()
scopeMetrics := resourceMetric.ScopeMetrics()
for j := 0; j < scopeMetrics.Len(); j++ {
scope := scopeMetrics.At(j).Scope()
metricSlice := scopeMetrics.At(j).Metrics()

if err := e.pushMetricSlice(ctx, resource, metricSlice, scope); err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}

errs = append(errs, err)
}

}
}

return errors.Join(errs...)
}

func (e *elasticsearchExporter) pushMetricSlice(
ctx context.Context,
resource pcommon.Resource,
slice pmetric.MetricSlice,
scope pcommon.InstrumentationScope,
) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromAttributesNew(indexPrefix, "", resource.Attributes())
suffix := getFromAttributesNew(indexSuffix, "", resource.Attributes())

fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

documents, err := e.model.encodeMetrics(resource, slice, scope)
if err != nil {
return fmt.Errorf("failed to encode a metric event: %w", err)
}

for _, document := range documents {
err := pushDocuments(ctx, fIndex, document, e.bulkIndexer)
if err != nil {
return err
}
}

return nil
}

func (e *elasticsearchExporter) pushTraceData(
ctx context.Context,
td ptrace.Traces,
Expand Down Expand Up @@ -185,7 +244,7 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc

document, err := e.model.encodeSpan(resource, span, scope)
if err != nil {
return fmt.Errorf("Failed to encode trace record: %w", err)
return fmt.Errorf("failed to encode trace record: %w", err)
}
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
}
Loading

0 comments on commit 1994ffd

Please sign in to comment.