Skip to content

Commit

Permalink
[jaeger-v2] Add support for Elasticsearch (#5152)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- part of #4843

## Description of the changes
- add support for elasticsearch in jaeger-v2

## How was this change tested?
- tested locally

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Harshvir Potpose <[email protected]>
Signed-off-by: Harshvir Potpose <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
akagami-harsh and yurishkuro authored Feb 27, 2024
1 parent 8a5f824 commit 7a04461
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 11 deletions.
38 changes: 38 additions & 0 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: es_main
trace_storage_archive: es_archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
elasticsearch:
es_main:
server_urls: http://localhost:9200
log_level: "error"
index_prefix: "jaeger-main"
use_aliases: true
es_archive:
server_urls: http://localhost:9200
log_level: "error"
index_prefix: "jaeger-archive"
use_aliases: true
receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: es_main
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"fmt"
"reflect"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -107,10 +109,17 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
esStarter := &starter[esCfg.Configuration, *es.Factory]{
ext: s,
storageKind: "elasticsearch",
cfg: s.config.Elasticsearch,
builder: es.NewFactoryWithConfig,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
esStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
43 changes: 43 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package jaegerstorage
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -16,6 +18,7 @@ import (
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand Down Expand Up @@ -151,6 +154,46 @@ func TestBadgerStorageExtensionError(t *testing.T) {
require.ErrorContains(t, err, "/bad/path")
}

func TestESStorageExtension(t *testing.T) {
mockEsServerResponse := []byte(`
{
"Version": {
"Number": "6"
}
}
`)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(mockEsServerResponse)
}))
defer server.Close()
storageExtension := makeStorageExtenion(t, &Config{
Elasticsearch: map[string]esCfg.Configuration{
"foo": {
Servers: []string{server.URL},
LogLevel: "error",
},
},
})
ctx := context.Background()
err := storageExtension.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
require.NoError(t, storageExtension.Shutdown(ctx))
}

func TestESStorageExtensionError(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
Elasticsearch: map[string]esCfg.Configuration{
"foo": {
Servers: []string{"http://badurl"},
LogLevel: "error",
},
},
})
err := ext.Start(context.Background(), componenttest.NewNopHost())
require.ErrorContains(t, err, "failed to initialize elasticsearch storage")
require.ErrorContains(t, err, "badurl")
}

func noopTelemetrySettings() component.TelemetrySettings {
return component.TelemetrySettings{
Logger: zap.L(),
Expand Down
24 changes: 15 additions & 9 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/asaskevich/govalidator"
esV8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"
"go.uber.org/zap"
Expand All @@ -45,7 +46,7 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
Servers []string `mapstructure:"server_urls" valid:"required,url"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
Expand All @@ -54,14 +55,14 @@ type Configuration struct {
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `mapstructure:"num_shards"`
NumReplicas int64 `mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `mapstructure:"priority_dependencies_template"`
Timeout time.Duration `mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
Expand Down Expand Up @@ -467,3 +468,8 @@ func loadTokenFromFile(path string) (string, error) {
}
return strings.TrimRight(string(b), "\r\n"), nil
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
}
33 changes: 33 additions & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,39 @@ func NewFactory() *Factory {
}
}

func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

cfg.MaxDocCount = defaultMaxDocCount
cfg.Enabled = true

archive := make(map[string]*namespaceConfig)
archive[archiveNamespace] = &namespaceConfig{
Configuration: cfg,
namespace: archiveNamespace,
}

f := NewFactory()
f.InitFromOptions(Options{
Primary: namespaceConfig{
Configuration: cfg,
namespace: primaryNamespace,
},
others: archive,
})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
Expand Down
57 changes: 57 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,63 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig)
}

func TestESStorageFactoryWithConfig(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(mockEsServerResponse)
}))
defer server.Close()
cfg := escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "error",
}
factory, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer factory.Close()
}

func TestConfigurationValidation(t *testing.T) {
testCases := []struct {
name string
cfg escfg.Configuration
wantErr bool
}{
{
name: "valid configuration",
cfg: escfg.Configuration{
Servers: []string{"http://localhost:9200"},
},
wantErr: false,
},
{
name: "missing servers",
cfg: escfg.Configuration{},
wantErr: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
if test.wantErr {
require.Error(t, err)
_, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func TestESStorageFactoryWithConfigError(t *testing.T) {
cfg := escfg.Configuration{
Servers: []string{"http://badurl"},
LogLevel: "error",
}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
require.ErrorContains(t, err, "failed to create primary Elasticsearch client")
}

func TestPasswordFromFile(t *testing.T) {
defer testutils.VerifyGoLeaksOnce(t)
t.Run("primary client", func(t *testing.T) {
Expand Down

0 comments on commit 7a04461

Please sign in to comment.