Skip to content

Commit

Permalink
Add Elasticsearch OTEL exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Mar 27, 2020
1 parent 5e06df7 commit ac540da
Show file tree
Hide file tree
Showing 18 changed files with 898 additions and 81 deletions.
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand All @@ -37,9 +39,15 @@ func Components(v *viper.Viper) config.Factories {
opts.InitFromViper(v)
return opts
}}
esExp := elasticsearch.Factory{OptionsFactory: func() *storageEs.Options {
opts := elasticsearch.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaults.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
factories.Exporters[esExp.Type()] = esExp
return factories
}
7 changes: 6 additions & 1 deletion cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (
"github.com/magiconair/properties/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags)
v, _ := jConfig.Viperize(kafka.DefaultOptions().AddFlags, cassandra.DefaultOptions().AddFlags, elasticsearch.DefaultOptions().AddFlags)
factories := Components(v)
assert.Equal(t, "jaeger_kafka", factories.Exporters[kafka.TypeStr].Type())
assert.Equal(t, "jaeger_cassandra", factories.Exporters[cassandra.TypeStr].Type())
assert.Equal(t, "jaeger_elasticsearch", factories.Exporters[elasticsearch.TypeStr].Type())

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
Expand All @@ -37,4 +39,7 @@ func TestComponents(t *testing.T) {
cassandraFactory := factories.Exporters[cassandra.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
esFactory := factories.Exporters[elasticsearch.TypeStr]
ec := esFactory.CreateDefaultConfig().(*elasticsearch.Config)
assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers)
}
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/elasticsearch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearch

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/es"
)

// Config holds configuration of Jaeger Elasticsearch exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
es.Options `mapstructure:",squash"`
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearch

import (
"path"
"testing"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/es"
)

func TestDefaultConfig(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := &Factory{OptionsFactory: func() *es.Options {
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, []string{"http://127.0.0.1:9200"}, defaultCfg.GetPrimary().Servers)
assert.Equal(t, int64(5), defaultCfg.GetPrimary().NumShards)
assert.Equal(t, int64(1), defaultCfg.GetPrimary().NumReplicas)
assert.Equal(t, "@", defaultCfg.GetPrimary().Tags.DotReplacement)
assert.Equal(t, false, defaultCfg.GetPrimary().TLS.Enabled)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{OptionsFactory: func() *es.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, []string{"bar"}, opts.GetPrimary().Servers)
require.Equal(t, "staging", opts.GetPrimary().GetIndexPrefix())
assert.Equal(t, int64(100), opts.GetPrimary().NumShards)
return opts
}}

factories.Exporters[TypeStr] = factory
colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, colConfig)

cfg := colConfig.Exporters[TypeStr].(*Config)
esCfg := cfg.GetPrimary()
assert.Equal(t, TypeStr, cfg.Name())
assert.Equal(t, []string{"someUrl"}, esCfg.Servers)
assert.Equal(t, true, esCfg.CreateIndexTemplates)
assert.Equal(t, "staging", esCfg.IndexPrefix)
assert.Equal(t, int64(100), esCfg.NumShards)
assert.Equal(t, "user", esCfg.Username)
assert.Equal(t, "pass", esCfg.Password)
assert.Equal(t, "/var/run/k8s", esCfg.TokenFilePath)
assert.Equal(t, true, esCfg.UseReadWriteAliases)
assert.Equal(t, true, esCfg.Sniffer)
assert.Equal(t, true, esCfg.Tags.AllAsFields)
assert.Equal(t, "/etc/jaeger", esCfg.Tags.File)
assert.Equal(t, "O", esCfg.Tags.DotReplacement)
}
16 changes: 16 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/elasticsearch/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package elasticsearch implements Jaeger Elasticsearch storage as OpenTelemetry exporter.
package elasticsearch
35 changes: 35 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/elasticsearch/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearch

import (
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/es"
)

// New creates Elasticsearch exporter/storage.
func New(config *Config, log *zap.Logger) (exporter.TraceExporter, error) {
factory := es.NewFactory()
factory.InitFromOptions(config.Options)
err := factory.Initialize(metrics.NullFactory, log)
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(&config.ExporterSettings, factory)
}
78 changes: 78 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/elasticsearch/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearch

import (
"fmt"

"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/storage/es"
)

const (
// TypeStr defines type of the Elasticsearch exporter.
TypeStr = "jaeger_elasticsearch"
)

// OptionsFactory returns initialized es.OptionsFactory structure.
type OptionsFactory func() *es.Options

// DefaultOptions creates Elasticsearch options supported by this exporter.
func DefaultOptions() *es.Options {
return es.NewOptions("es")
}

// Factory is the factory for Jaeger Elasticsearch exporter.
type Factory struct {
OptionsFactory OptionsFactory
}

// Type gets the type of exporter.
func (Factory) Type() string {
return TypeStr
}

// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL exporter.BaseFactory interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

// CreateTraceExporter creates Jaeger Elasticsearch trace exporter.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
esCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return New(esCfg, log)
}

// CreateMetricsExporter is not implemented.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (exporter.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package elasticsearch

import (
"testing"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/es"
)

func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := &Factory{OptionsFactory: func() *es.Options {
return opts
}}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), factory.CreateDefaultConfig())
require.Nil(t, exporter)
assert.EqualError(t, err, "failed to create primary Elasticsearch client: health check timeout: Head http://127.0.0.1:9200: dial tcp 127.0.0.1:9200: connect: connection refused: no Elasticsearch node available")
}

func TestCreateTraceExporter_nilConfig(t *testing.T) {
factory := &Factory{}
exporter, err := factory.CreateTraceExporter(zap.NewNop(), nil)
require.Nil(t, exporter)
assert.EqualError(t, err, "could not cast configuration to jaeger_elasticsearch")
}

func TestCreateMetricsExporter(t *testing.T) {
f := Factory{OptionsFactory: DefaultOptions}
mReceiver, err := f.CreateMetricsExporter(zap.NewNop(), f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestType(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
assert.Equal(t, TypeStr, factory.Type())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
receivers:
examplereceiver:

processors:
exampleprocessor:

exporters:
jaeger_elasticsearch:
server_urls: "someUrl"
username: user
password: pass
token_file: /var/run/k8s
tags_as_fields:
all: true
config_file: /etc/jaeger
dot_replacement: "O"
use_aliases: true
sniffer: true

service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor]
exporters: [jaeger_elasticsearch]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
es:
num-shards: 100
Loading

0 comments on commit ac540da

Please sign in to comment.