Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init Elasticsearch exporter #2324

Merged
merged 14 commits into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Elasticsearch Exporter

This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www.elastic.co/elasticsearch).

## Configuration options

- `urls`: List of Elasticsearch URLS. If urls and cloudid is missing the
urso marked this conversation as resolved.
Show resolved Hide resolved
ELASTICSEARCH_URL environment variable will be used.
- `cloudid` (optional):
[ID](https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html) of the
Elastic Cloud Cluster to publish events to. The `cloudid` can be used instead
of `urls`.
- `workers` (optional): Number of workers publishing bulk requests concurrently.
- `index`: The
[index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html)
or [datastream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html)
name to publish events to. The default value is `logs-generic-default`.
- `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html)
pipeline ID used for processing documents published by the exporter.
- `flush`: Event bulk buffer flush settings
- `bytes` (default=5242880): Write buffer flush limit.
- `interval` (default=30s): Write buffer time limit.
- `retry`: Event retry settings
- `enabled` (default=true): Enable/Disable event retry on error. Retry
support is enabled by default.
- `max` (default=3): Number of HTTP retry attempts.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `mapping`: Events are encoded to JSON. The `mapping` allows users to
configure additional mapping rules.
- `mode` (default=ecs): The fields naming mode. valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields defined in the
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/opentelemetry-specification/tree/main/semantic_conventions)
to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html).
- `fields` (optional): Configure additional fields mappings.
- `file` (optional): Read additional field mappings from the provided YAML file.
- `dedup` (default=true): Try to find and remove duplicate fields/attributes
from events before publishing to Elasticsearch. Some structured logging
libraries can produce duplicate fields (for example zap). Elasticsearch
will reject documents that have duplicate fields.
- `dedot` (default=true): When enabled attributes with `.` will be split into
proper json objects.

### HTTP settings

- `read_buffer_size` (default=0): Read buffer size.
- `write_buffer_size` (default=0): Write buffer size used when.
- `timeout` (default=90s): HTTP request time limit.
- `headers` (optional): Headers to be send with each HTTP request.

### Security and Authentication settings

- `user` (optional): Username used for HTTP Basic Authentication.
- `password` (optional): Password used for HTTP Basic Authentication.
- `api_key` (optional): Authorization [API Key](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html).
- `ca_file` (optional): Root Certificate Authority (CA) certificate, for
verifying the server's identity, if TLS is enabled.
- `cert_file` (optional): Client TLS certificate.
- `key_file` (optional): Client TLS key.
- `insecure` (optional): Disable verification of the server's identity, if TLS
is enabled.

### Node Discovery

The Elasticsearch Exporter will check Elasticsearch regularily for available
nodes and updates the list of hosts if discovery is enabled. Newly discovered
nodes will automatically be used for load balancing.

- `discover`:
- `on_start` (optional): If enabled the exporter queries Elasticsearch
for all known nodes in the cluster on startup.
- `interval` (optional): Interval to update the list of Elasticsearch nodes.

## Example

```yaml
exporters:
elasticsearch:
urls:
- "https://localhost:9200"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does a deployment typically look like? Is it common to have TLS for one server, but not for another? Do they have different sets of certs, or would a client only need one CA that works for all clients? I'm asking because endpoints are typically only host:port, without the protocol part, using TLS by default.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it common to have TLS for one server, but not for another?

No, when using TLS one would want to have TLS for the complete cluster.

Do they have different sets of certs, or would a client only need one CA that works for all clients?

Ideally one CA cert.

I'm asking because endpoints are typically only host:port, without the protocol part, using TLS by default.

The exporter also accepts localhost:9200, but I'm not sure if TLS will be automatically enabled or disabled by default. I assume the client library will expand this to http://localhost:9200 if tls is not explicitely configured.

I took otlphttpexporter as example, which also uses a full URL in its example:

exporters:
  otlphttp:
    endpoint: https://example.com:55681/v1/traces

same for zipkin.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bogdandrutu ^

Interesting, the one from OTLP should have been named URL then, instead of Endpoint. Looks good to me then!

```
217 changes: 217 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchexporter

import (
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtls"
)

// Config defines configuration for Elastic exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"`

// URLs holds the Elasticsearch URLs the exporter should send events to.
//
// This setting is required if CloudID is not set and if the
// ELASTICSEARCH_URL environment variable is not set.
URLs []string `mapstructure:"urls"`

// CloudID holds the cloud ID to identify the Elastic Cloud cluster to send events to.
// https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html
//
// This setting is required if no URL is configured.
CloudID string `mapstructure:"cloudid"`

// Workers configures the number of workers publishing bulk requests.
Workers int `mapstructure:"workers"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider "num_workers" to be consistent with NumConsumers in our sending_queue helper.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
Done


// Index configures the index, index alias, or data stream name events should be indexed in.
//
// https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html
// https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html
//
// This setting is required.
Index string `mapstructure:"index"`

// Pipeline configures the ingest node pipeline name that should be used to process the
// events.
//
// https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
Pipeline string `mapstructure:"pipeline"`

HTTPClientSettings `mapstructure:",squash"`
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
}

type HTTPClientSettings struct {
Authentication AuthenticationSettings `mapstructure:",squash"`

// ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize.
ReadBufferSize int `mapstructure:"read_buffer_size"`

// WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize.
WriteBufferSize int `mapstructure:"write_buffer_size"`

// Timeout configures the HTTP request timeout.
Timeout time.Duration `mapstructure:"timeout"`

// Headers allows users to configure optional HTTP headers that
// will be send with each HTTP request.
Headers map[string]string `mapstructure:"headers,omitempty"`

configtls.TLSClientSetting `mapstructure:",squash"`
}

// AuthenticationSettings defines user authentication related settings.
type AuthenticationSettings struct {
// User is used to configure HTTP Basic Authentication.
User string `mapstructure:"user"`

// Password is used to configure HTTP Basic Authentication.
Password string `mapstructure:"password"`

// APIKey is used to configure ApiKey based Authentication.
//
// https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html
APIKey string `mapstructure:"api_key"`
}

// DiscoverySettings defines Elasticsearch node discovery related settings.
// The exporter will check Elasticsearch regularily for available nodes
// and updates the list of hosts if discovery is enabled. Newly discovered
// nodes will automatically be used for load balancing.
//
// DiscoverySettings should not be enabled when operating Elasticsearch behind a proxy
// or load balancer.
//
// https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how
type DiscoverySettings struct {
// OnStart, if set, instructs the exporter to look for available Elasticsearch
// nodes the first time the exporter connects to the cluster.
OnStart bool `mapstructure:"on_start"`

// Interval instructs the exporter to renew the list of Elasticsearch URLs
// with the given interval. URLs will not be updated if Interval is <=0.
Interval time.Duration `mapstructure:"interval"`
}

// FlushSettings defines settings for configuring the write buffer flushing
// policy in the Elasticsearch exporter. The exporter sends a bulk request with
// all events already serialized into the send-buffer.
type FlushSettings struct {
// Bytes sets the send buffer flushing limit.
Bytes int `mapstructure:"bytes"`

// Interval configures the max age of a document in the send buffer.
Interval time.Duration `mapstructure:"interval"`
}

// RetrySettings defines settings for the HTTP request retries in the Elasticsearch exporter.
// Failed sends are retried with exponential backoff.
type RetrySettings struct {
// Enabled allows users to disable retry without having to comment out all settings.
Enabled bool `mapstructure:"enabled"`

// Max configures how often an HTTP request is retried before it is assumed to be failed.
Max int `mapstructure:"max"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxRequests? If I were to add this to https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/queued_retry.go#L58 that would probably be a more consistent name.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Updated field name and configuration name to max_requests.


// InitialInterval configures the initial waiting time if a request failed.
InitialInterval time.Duration `mapstructure:"initial_interval"`

// MaxInterval configures the max waiting time if consecutive requests failed.
MaxInterval time.Duration `mapstructure:"max_interval"`
}

type MappingsSettings struct {
// Mode configures the field mappings.
Mode string `mapstructure:"mode"`

// Additional field mappings.
Fields map[string]string `mapstructure:"fields"`

// File to read additional fields mappings from.
File string `mapstructure:"file"`

// Try to find and remove duplicate fields
Dedup bool `mapstructure:"dedup"`

Dedot bool `mapstructure:"dedot"`
}

type MappingMode int

const (
MappingNone MappingMode = iota
MappingECS
)

func (m MappingMode) String() string {
switch m {
case MappingNone:
return ""
case MappingECS:
return "ecs"
default:
return ""
}
}

var mappingModes = func() map[string]MappingMode {
table := map[string]MappingMode{}
for _, m := range []MappingMode{
MappingNone,
MappingECS,
} {
table[strings.ToLower(m.String())] = m
}

// config aliases
table["no"] = MappingNone
table["none"] = MappingNone

return table
}()

// Validate validates the elasticsearch server configuration.
func (cfg *Config) Validate() error {
if len(cfg.URLs) == 0 && cfg.CloudID == "" {
return errors.New("Elasticsearch URL or CloudID must be specified")
}

for _, url := range cfg.URLs {
if url == "" {
return errors.New("Elasticsearch URL must not be empty")
}
}

if cfg.Index == "" {
return errors.New("Elasticsearch Index must be specified")
}

if _, ok := mappingModes[cfg.Mapping.Mode]; !ok {
return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode)
}

return nil
}
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearchexporter

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
)
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)

r0 := cfg.Exporters["elasticsearch"]
assert.Equal(t, r0, factory.CreateDefaultConfig())

r1 := cfg.Exporters["elasticsearch/customname"].(*Config)
assert.Equal(t, r1, &Config{
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "elasticsearch/customname"},
URLs: []string{"https://elastic.example.com:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "myindex",
Pipeline: "mypipeline",
HTTPClientSettings: HTTPClientSettings{
Authentication: AuthenticationSettings{
User: "elastic",
Password: "search",
APIKey: "AvFsEiPs==",
},
Timeout: 2 * time.Minute,
Headers: map[string]string{
"myheader": "test",
},
},
Discovery: DiscoverySettings{
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
},
Retry: RetrySettings{
Enabled: true,
Max: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
},
Mapping: MappingsSettings{
Mode: "ecs",
Dedup: true,
Dedot: true,
},
})
}
Loading