Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for Elasticsearch #5152

Merged
merged 34 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0afdfd5
add elasticsearch for jeager-v2
akagami-harsh Jan 29, 2024
c60bf5a
fix
akagami-harsh Jan 29, 2024
67707ed
Merge branch 'main' into elasticsearch
akagami-harsh Jan 29, 2024
a47bb35
add tests
akagami-harsh Jan 29, 2024
5ea1df1
Merge branch 'main' into elasticsearch
akagami-harsh Jan 31, 2024
70edeeb
Merge branch 'main' into elasticsearch
akagami-harsh Feb 5, 2024
430a0ce
use separate index_prefix in archive storage
akagami-harsh Feb 5, 2024
d8d4939
Update cmd/jaeger/internal/extension/jaegerstorage/extension.go
akagami-harsh Feb 6, 2024
da38c6a
Update cmd/jaeger/internal/extension/jaegerstorage/extension.go
akagami-harsh Feb 6, 2024
001e4a0
fix
akagami-harsh Feb 6, 2024
f93b1c6
solve merge conflicts
akagami-harsh Feb 7, 2024
9aa2c26
refactor tests
akagami-harsh Feb 7, 2024
c5c20ce
fix
akagami-harsh Feb 7, 2024
bcdf8a0
Merge branch 'main' into elasticsearch
akagami-harsh Feb 7, 2024
236067c
add annotations
akagami-harsh Feb 16, 2024
72c5c42
fix
akagami-harsh Feb 16, 2024
79130b4
remove left over yaml annotations
akagami-harsh Feb 18, 2024
e373c3d
add docker compose for debug
akagami-harsh Feb 18, 2024
e646a3f
fix
akagami-harsh Feb 18, 2024
e61cf78
revert changes
akagami-harsh Feb 19, 2024
6824ec4
fix
akagami-harsh Feb 19, 2024
999d336
Merge branch 'jaegertracing:main' into elasticsearch
akagami-harsh Feb 19, 2024
66f63b5
rename es_config.yaml to config-elasticsearch.yaml
akagami-harsh Feb 19, 2024
a86760c
set UseReadWriteAliases to true in config file
akagami-harsh Feb 20, 2024
7ad3b0d
remove debug docker compose and dockerfile
akagami-harsh Feb 20, 2024
8dc35cb
add validator to validate config
akagami-harsh Feb 20, 2024
0744562
fix
akagami-harsh Feb 20, 2024
275786c
fix
akagami-harsh Feb 20, 2024
3763aa9
fix
akagami-harsh Feb 25, 2024
495daaf
fix lint
akagami-harsh Feb 25, 2024
0f7d2f1
add test for cfg validation
akagami-harsh Feb 26, 2024
a6cec26
fix lint
akagami-harsh Feb 26, 2024
915e83f
fix
akagami-harsh Feb 26, 2024
278d49a
Merge branch 'main' into elasticsearch
yurishkuro Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: es_main
trace_storage_archive: es_archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
elasticsearch:
es_main:
server_urls: http://localhost:9200
log_level: "error"
index_prefix: "jaeger-main"
use_aliases: true
es_archive:
server_urls: http://localhost:9200
log_level: "error"
index_prefix: "jaeger-archive"
use_aliases: true
receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: es_main
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"fmt"
"reflect"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -107,10 +109,17 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
esStarter := &starter[esCfg.Configuration, *es.Factory]{
ext: s,
storageKind: "elasticsearch",
cfg: s.config.Elasticsearch,
builder: es.NewFactoryWithConfig,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
esStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
43 changes: 43 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package jaegerstorage
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -16,6 +18,7 @@ import (
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand Down Expand Up @@ -151,6 +154,46 @@ func TestBadgerStorageExtensionError(t *testing.T) {
require.ErrorContains(t, err, "/bad/path")
}

func TestESStorageExtension(t *testing.T) {
mockEsServerResponse := []byte(`
{
"Version": {
"Number": "6"
}
}
`)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(mockEsServerResponse)
}))
defer server.Close()
storageExtension := makeStorageExtenion(t, &Config{
Elasticsearch: map[string]esCfg.Configuration{
"foo": {
Servers: []string{server.URL},
LogLevel: "error",
},
},
})
ctx := context.Background()
err := storageExtension.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)
require.NoError(t, storageExtension.Shutdown(ctx))
}

func TestESStorageExtensionError(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
Elasticsearch: map[string]esCfg.Configuration{
"foo": {
Servers: []string{"http://badurl"},
LogLevel: "error",
},
},
})
err := ext.Start(context.Background(), componenttest.NewNopHost())
require.ErrorContains(t, err, "failed to initialize elasticsearch storage")
require.ErrorContains(t, err, "badurl")
}

func noopTelemetrySettings() component.TelemetrySettings {
return component.TelemetrySettings{
Logger: zap.L(),
Expand Down
24 changes: 15 additions & 9 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

"github.com/asaskevich/govalidator"
esV8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"
"go.uber.org/zap"
Expand All @@ -45,7 +46,7 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
Servers []string `mapstructure:"server_urls" valid:"required,url"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
Expand All @@ -54,14 +55,14 @@ type Configuration struct {
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `mapstructure:"num_shards"`
NumReplicas int64 `mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `mapstructure:"priority_dependencies_template"`
Timeout time.Duration `valid:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
Expand Down Expand Up @@ -467,3 +468,8 @@ func loadTokenFromFile(path string) (string, error) {
}
return strings.TrimRight(string(b), "\r\n"), nil
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
}
33 changes: 33 additions & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,39 @@
}
}

func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 93 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L92-L93

Added lines #L92 - L93 were not covered by tests
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

cfg.MaxDocCount = defaultMaxDocCount
cfg.Enabled = true

archive := make(map[string]*namespaceConfig)
archive[archiveNamespace] = &namespaceConfig{
Configuration: cfg,
namespace: archiveNamespace,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, this is logically wrong, but will work for now. The underlying problem is the existence of storage.ArchiveFactory interface, we need to get rid of it long term, because as far as interface goes the archive storage is just like a regular storage, only with different configuration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


f := NewFactory()
f.InitFromOptions(Options{
Primary: namespaceConfig{
Configuration: cfg,
namespace: primaryNamespace,
},
others: archive,
})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
Expand Down
24 changes: 24 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,30 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig)
}

func TestESStorageFactoryWithConfig(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(mockEsServerResponse)
}))
defer server.Close()
cfg := escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "error",
}
factory, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer factory.Close()
}

func TestESStorageFactoryWithConfigError(t *testing.T) {
cfg := escfg.Configuration{
Servers: []string{"http://badurl"},
LogLevel: "error",
}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
require.ErrorContains(t, err, "failed to create primary Elasticsearch client")
}

func TestPasswordFromFile(t *testing.T) {
defer testutils.VerifyGoLeaksOnce(t)
t.Run("primary client", func(t *testing.T) {
Expand Down
Loading