
[exporter/elasticsearch] Use confighttp.ClientConfig #33367

Merged
30 changes: 30 additions & 0 deletions .chloggen/elasticsearchexporter-confighttp.yaml
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for confighttp options.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33367]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Add support for confighttp and related configuration settings, such as "auth".
This change also means that the Elasticsearch URL may be specified as "endpoint",
like the otlphttp exporter.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
167 changes: 100 additions & 67 deletions exporter/elasticsearchexporter/README.md
@@ -16,13 +16,74 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](

## Configuration options

- `endpoints`: List of Elasticsearch URLs. If `endpoints` and `cloudid` are missing, the
ELASTICSEARCH_URL environment variable will be used.
- `cloudid` (optional):
[ID](https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html) of the
Elastic Cloud Cluster to publish events to. The `cloudid` can be used instead
of `endpoints`.
- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently.
Exactly one of the following settings is required:

- `endpoint` (no default): The target Elasticsearch URL to which data will be sent
(e.g. `https://elasticsearch:9200`)
- `endpoints` (no default): A list of Elasticsearch URLs to which data will be sent
- `cloudid` (no default): The [Elastic Cloud ID](https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html)
of the Elastic Cloud Cluster to which data will be sent (e.g. `foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=`)

If none of the above attributes are specified, one or more Elasticsearch URLs
may be specified as a comma-separated list via the `ELASTICSEARCH_URL`
environment variable.

Elasticsearch credentials may be configured via [Authentication configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration) (`configauth`) settings.
As a shortcut, the following settings are also supported:

- `user` (optional): Username used for HTTP Basic Authentication.
- `password` (optional): Password used for HTTP Basic Authentication.
- `api_key` (optional): Authorization [API Key](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html) in "encoded" format.

Example:

```yaml
exporters:
elasticsearch:
endpoint: https://elastic.example.com:9200
auth:
authenticator: basicauth

extensions:
basicauth:
username: elastic
password: changeme

······

service:
extensions: [basicauth]
pipelines:
logs:
receivers: [otlp]
processors: [batch]
exporters: [elasticsearch/log]
traces:
receivers: [otlp]
exporters: [elasticsearch/trace]
processors: [batch]
```
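For comparison, the same credentials can be supplied via the shortcut settings instead of a `configauth` extension. A minimal sketch (the username, password, and endpoint below are placeholders):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    user: elastic
    password: changeme
```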

## Advanced configuration

### HTTP settings

The Elasticsearch exporter supports common [HTTP Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp#http-configuration-settings), except for `compression` (all requests are gzip-compressed).
As a consequence of supporting `confighttp`, the Elasticsearch exporter also supports common [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings).

The Elasticsearch exporter sets `timeout` (HTTP request timeout) to 90s by default.
All other defaults are as defined by `confighttp`.
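For illustration, a sketch overriding a few of these `confighttp`/`configtls` settings (all values and paths below are placeholders, not recommendations):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    timeout: 2m          # overrides the exporter's 90s default
    headers:
      X-Custom-Header: example
    tls:
      ca_file: /path/to/ca.pem
```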

### Queuing

The Elasticsearch exporter supports the common [`sending_queue` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md); however, the sending queue is currently disabled by default.
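For example, the queue could be enabled as follows (a sketch using the standard `exporterhelper` settings; the values are placeholders):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 1000
```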

### Elasticsearch document routing

Telemetry data will be written to signal-specific data streams by default:
logs will be written to `logs-generic-default`, and traces will be written to
`traces-generic-default`. This can be customised through the following settings:

- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The
[index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html)
or [data stream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html)
Expand All @@ -49,16 +110,12 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](
The last string appended is the date on which the data is generated.
- `prefix_separator` (default=`-`): Separator between `logstash_prefix` and the date.
- `date_format` (default=`%Y.%m.%d`): Time format (based on strftime) used to generate the second part of the index name.
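As a sketch, routing logs and traces to custom indices with a Logstash-style date suffix might look like this (the index names and endpoint are placeholders):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    logs_index: my-logs
    traces_index: my-traces
    logstash_format:
      enabled: true
      prefix_separator: "-"
      date_format: "%Y.%m.%d"
```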
- `pipeline` (optional): Optional [Ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) ID used for processing documents published by the exporter.
- `flush`: Event bulk indexer buffer flush settings
- `bytes` (default=5000000): Write buffer flush size limit.
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
- `max_requests` (default=3): Number of HTTP request retries.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future.

### Elasticsearch document mapping

The Elasticsearch exporter supports several document schemas and preprocessing
behaviours, which may be configured through the following settings:

- `mapping`: Events are encoded to JSON. The `mapping` allows users to
configure additional mapping rules.
- `mode` (default=none): The field naming mode. Valid modes are:
Expand All @@ -77,65 +134,41 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](
will reject documents that have duplicate fields.
- `dedot` (default=true): When enabled, attributes containing `.` will be split into
proper JSON objects.
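A minimal mapping configuration might look like the following sketch (`ecs` is shown as an example mode; the endpoint is a placeholder):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    mapping:
      mode: ecs
      dedup: true
      dedot: true
```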
- `sending_queue`
- `enabled` (default = false)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 1000): Maximum number of batches kept in queue; ignored if `enabled` is `false`;
### HTTP settings

- `read_buffer_size` (default=0): Read buffer size of HTTP client.
- `write_buffer_size` (default=0): Write buffer size of HTTP client.
- `timeout` (default=90s): HTTP request time limit.
- `headers` (optional): Headers to be sent with each HTTP request.
### Elasticsearch ingest pipeline

### Security and Authentication settings
Documents may be optionally passed through an Elasticsearch
[Ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) prior to indexing.
This can be configured through the following settings:

- `user` (optional): Username used for HTTP Basic Authentication.
- `password` (optional): Password used for HTTP Basic Authentication.
- `api_key` (optional): Authorization [API Key](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html) in "encoded" format.
- `pipeline` (optional): Optional [Ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) ID used for processing documents published by the exporter.
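For example, assuming an ingest pipeline named `my-ingest-pipeline` already exists in Elasticsearch (the name is a placeholder):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    pipeline: my-ingest-pipeline
```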

### TLS settings
- `ca_file` (optional): Root Certificate Authority (CA) certificate, for
verifying the server's identity, if TLS is enabled.
- `cert_file` (optional): Client TLS certificate.
- `key_file` (optional): Client TLS key.
- `insecure` (optional): In gRPC when set to true, this is used to disable the client transport security. In HTTP, this disables verifying the server's certificate chain and host name.
- `insecure_skip_verify` (optional): Will enable TLS but not verify the certificate.
### Elasticsearch bulk indexing

### Node Discovery
The Elasticsearch exporter uses the Elasticsearch
[Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) for indexing documents.
The behaviour of this bulk indexing can be configured with the following settings:

The Elasticsearch Exporter will check Elasticsearch regularly for available
nodes and updates the list of hosts if discovery is enabled. Newly discovered
nodes will automatically be used for load balancing.
- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently.
- `flush`: Event bulk indexer buffer flush settings
- `bytes` (default=5000000): Write buffer flush size limit.
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
- `max_requests` (default=3): Number of HTTP request retries.
- `initial_interval` (default=100ms): Initial waiting time if an HTTP request fails.
- `max_interval` (default=1m): Maximum waiting time if an HTTP request fails.
- `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future.
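Putting these together, a sketch of tuned bulk-indexing behaviour (the values below are illustrative, not recommendations):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    num_workers: 4
    flush:
      bytes: 10485760
      interval: 10s
    retry:
      enabled: true
      max_requests: 5
      retry_on_status: [429]
```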

### Elasticsearch node discovery

The Elasticsearch Exporter will regularly check Elasticsearch for available nodes.
Newly discovered nodes will automatically be used for load balancing.
Settings related to node discovery are:

- `discover`:
- `on_start` (optional): If enabled the exporter queries Elasticsearch
for all known nodes in the cluster on startup.
- `interval` (optional): Interval to update the list of Elasticsearch nodes.

## Example

```yaml
exporters:
elasticsearch/trace:
endpoints: [https://elastic.example.com:9200]
traces_index: trace_index
elasticsearch/log:
endpoints: [http://localhost:9200]
logs_index: my_log_index
sending_queue:
enabled: true
num_consumers: 20
queue_size: 1000
······
service:
pipelines:
logs:
receivers: [otlp]
processors: [batch]
exporters: [elasticsearch/log]
traces:
receivers: [otlp]
exporters: [elasticsearch/trace]
processors: [batch]
```
Node discovery can be disabled by setting `discover.interval` to 0.
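For example, periodic node discovery could be configured with a sketch like this (the 5m interval and endpoint are placeholders):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    discover:
      on_start: true
      interval: 5m
```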
112 changes: 58 additions & 54 deletions exporter/elasticsearchexporter/config.go
@@ -12,8 +12,8 @@ import (
"strings"
"time"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

@@ -58,12 +58,13 @@ type Config struct {
// https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
Pipeline string `mapstructure:"pipeline"`

ClientConfig `mapstructure:",squash"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
confighttp.ClientConfig `mapstructure:",squash"`
Authentication AuthenticationSettings `mapstructure:",squash"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
}

type LogstashFormatSettings struct {
@@ -76,25 +77,6 @@ type DynamicIndexSetting struct {
Enabled bool `mapstructure:"enabled"`
}

type ClientConfig struct {
Authentication AuthenticationSettings `mapstructure:",squash"`

// ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize.
ReadBufferSize int `mapstructure:"read_buffer_size"`

// WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize.
WriteBufferSize int `mapstructure:"write_buffer_size"`

// Timeout configures the HTTP request timeout.
Timeout time.Duration `mapstructure:"timeout"`

// Headers allows users to configure optional HTTP headers that
// will be send with each HTTP request.
Headers map[string]string `mapstructure:"headers,omitempty"`

configtls.ClientConfig `mapstructure:"tls,omitempty"`
}

// AuthenticationSettings defines user authentication related settings.
type AuthenticationSettings struct {
// User is used to configure HTTP Basic Authentication.
@@ -184,9 +166,8 @@ const (
)

var (
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified")
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries")
errConfigCloudIDMutuallyExclusive = errors.New("only one of endpoints or cloudid may be specified")
errConfigEndpointRequired = errors.New("exactly one of [endpoint, endpoints, cloudid] must be specified")
errConfigEmptyEndpoint = errors.New("endpoint must not be empty")
)

func (m MappingMode) String() string {
@@ -223,32 +204,11 @@ const defaultElasticsearchEnvName = "ELASTICSEARCH_URL"

// Validate validates the elasticsearch server configuration.
func (cfg *Config) Validate() error {
if len(cfg.Endpoints) == 0 && cfg.CloudID == "" {
v := os.Getenv(defaultElasticsearchEnvName)
if v == "" {
return errConfigNoEndpoint
}
for _, endpoint := range strings.Split(v, ",") {
endpoint = strings.TrimSpace(endpoint)
if err := validateEndpoint(endpoint); err != nil {
return fmt.Errorf("invalid endpoint %q: %w", endpoint, err)
}
}
}

if cfg.CloudID != "" {
if len(cfg.Endpoints) > 0 {
return errConfigCloudIDMutuallyExclusive
}
if _, err := parseCloudID(cfg.CloudID); err != nil {
return err
}
endpoints, err := cfg.endpoints()
if err != nil {
return err
}

for _, endpoint := range cfg.Endpoints {
if endpoint == "" {
return errConfigEmptyEndpoint
}
for _, endpoint := range endpoints {
if err := validateEndpoint(endpoint); err != nil {
return fmt.Errorf("invalid endpoint %q: %w", endpoint, err)
}
@@ -258,10 +218,54 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode)
}

if cfg.Compression != "" {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
}
return nil
}

func (cfg *Config) endpoints() ([]string, error) {
// Exactly one of endpoint, endpoints, or cloudid must be configured.
// If none are set, then $ELASTICSEARCH_URL may be specified instead.
var endpoints []string
var numEndpointConfigs int
if cfg.Endpoint != "" {
numEndpointConfigs++
endpoints = []string{cfg.Endpoint}
}
if len(cfg.Endpoints) > 0 {
numEndpointConfigs++
endpoints = cfg.Endpoints
}
if cfg.CloudID != "" {
numEndpointConfigs++
u, err := parseCloudID(cfg.CloudID)
if err != nil {
return nil, err
}
endpoints = []string{u.String()}
}
if numEndpointConfigs == 0 {
if v := os.Getenv(defaultElasticsearchEnvName); v != "" {
numEndpointConfigs++
endpoints = strings.Split(v, ",")
for i, endpoint := range endpoints {
endpoints[i] = strings.TrimSpace(endpoint)
}
}
}
if numEndpointConfigs != 1 {
return nil, errConfigEndpointRequired
}
return endpoints, nil
}

func validateEndpoint(endpoint string) error {
if endpoint == "" {
return errConfigEmptyEndpoint
}

u, err := url.Parse(endpoint)
if err != nil {
return err