Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ES config and factory #4396

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 1 addition & 130 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,8 @@ type TagsAsFields struct {
Include string `mapstructure:"include"`
}

// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetRemoteReadClusters() []string
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetMaxDocCount() int
GetIndexPrefix() string
GetIndexDateLayoutSpans() string
GetIndexDateLayoutServices() string
GetIndexDateLayoutDependencies() string
GetIndexRolloverFrequencySpansDuration() time.Duration
GetIndexRolloverFrequencyServicesDuration() time.Duration
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
GetTokenFilePath() string
IsStorageEnabled() bool
IsCreateIndexTemplates() bool
GetVersion() uint
TagKeysAsFields() ([]string, error)
GetUseILM() bool
GetLogLevel() string
GetSendGetBodyAs() string
}

// NewClient creates a new ElasticSearch client
func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
if len(c.Servers) < 1 {
return nil, errors.New("no servers specified")
}
Expand Down Expand Up @@ -275,51 +247,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// GetRemoteReadClusters returns list of remote read clusters
func (c *Configuration) GetRemoteReadClusters() []string {
return c.RemoteReadClusters
}

// GetNumShards returns number of shards from Configuration
func (c *Configuration) GetNumShards() int64 {
return c.NumShards
}

// GetNumReplicas returns number of replicas from Configuration
func (c *Configuration) GetNumReplicas() int64 {
return c.NumReplicas
}

// GetMaxSpanAge returns max span age from Configuration
func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetMaxDocCount returns the maximum number of documents that a query should return
func (c *Configuration) GetMaxDocCount() int {
return c.MaxDocCount
}

// GetIndexPrefix returns index prefix
func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetIndexDateLayoutSpans returns jaeger-span index date layout
func (c *Configuration) GetIndexDateLayoutSpans() string {
return c.IndexDateLayoutSpans
}

// GetIndexDateLayoutServices returns jaeger-service index date layout
func (c *Configuration) GetIndexDateLayoutServices() string {
return c.IndexDateLayoutServices
}

// GetIndexDateLayoutDependencies returns jaeger-dependencies index date layout
func (c *Configuration) GetIndexDateLayoutDependencies() string {
return c.IndexDateLayoutDependencies
}

// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration {
if c.IndexRolloverFrequencySpans == "hour" {
Expand All @@ -336,62 +263,6 @@ func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duratio
return -24 * time.Hour
}

// GetTagsFilePath returns a path to file containing tag keys
func (c *Configuration) GetTagsFilePath() string {
return c.Tags.File
}

// GetAllTagsAsFields returns true if all tags should be stored as object fields
func (c *Configuration) GetAllTagsAsFields() bool {
return c.Tags.AllAsFields
}

// GetVersion returns Elasticsearch version
func (c *Configuration) GetVersion() uint {
return c.Version
}

// GetTagDotReplacement returns character is used to replace dots in tag keys, when
// the tag is stored as object field.
func (c *Configuration) GetTagDotReplacement() string {
return c.Tags.DotReplacement
}

// GetUseReadWriteAliases indicates whether read alias should be used
func (c *Configuration) GetUseReadWriteAliases() bool {
return c.UseReadWriteAliases
}

// GetUseILM indicates whether ILM should be used
func (c *Configuration) GetUseILM() bool {
return c.UseILM
}

// GetLogLevel returns the log-level the ES client should log at.
func (c *Configuration) GetLogLevel() string {
return c.LogLevel
}

// GetSendGetBodyAs returns the SendGetBodyAs the ES client should use.
func (c *Configuration) GetSendGetBodyAs() string {
return c.SendGetBodyAs
}

// GetTokenFilePath returns file path containing the bearer token
func (c *Configuration) GetTokenFilePath() string {
return c.TokenFilePath
}

// IsStorageEnabled determines whether storage is enabled
func (c *Configuration) IsStorageEnabled() bool {
return c.Enabled
}

// IsCreateIndexTemplates determines whether index templates should be created or not
func (c *Configuration) IsCreateIndexTemplates() bool {
return c.CreateIndexTemplates
}

// TagKeysAsFields returns tags from the file and command line merged
func (c *Configuration) TagKeysAsFields() ([]string, error) {
var tags []string
Expand Down
79 changes: 41 additions & 38 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,19 @@ type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger

primaryConfig config.ClientBuilder
newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
primaryClient es.Client
archiveConfig config.ClientBuilder
archiveConfig *config.Configuration
archiveClient es.Client
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
Options: NewOptions(primaryNamespace, archiveNamespace),
Options: NewOptions(primaryNamespace, archiveNamespace),
newClientFn: config.NewClient,
}
}

Expand Down Expand Up @@ -89,13 +92,13 @@ func (f *Factory) InitFromOptions(o Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.primaryConfig.NewClient(logger, metricsFactory)
primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient = primaryClient
if f.archiveConfig.IsStorageEnabled() {
f.archiveClient, err = f.archiveConfig.NewClient(logger, metricsFactory)
if f.archiveConfig.Enabled {
f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
Expand All @@ -120,15 +123,15 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.IsStorageEnabled() {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.IsStorageEnabled() {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
Expand All @@ -138,40 +141,40 @@ func createSpanReader(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
cfg *config.Configuration,
archive bool,
) (spanstore.Reader, error) {
if cfg.GetUseILM() && !cfg.GetUseReadWriteAliases() {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.GetMaxDocCount(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
MaxDocCount: cfg.MaxDocCount,
MaxSpanAge: cfg.MaxSpanAge,
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(),
ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
RemoteReadClusters: cfg.RemoteReadClusters,
}), nil
}

func createSpanWriter(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
cfg *config.Configuration,
archive bool,
) (spanstore.Writer, error) {
var tags []string
var err error
if cfg.GetUseILM() && !cfg.GetUseReadWriteAliases() {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
if tags, err = cfg.TagKeysAsFields(); err != nil {
Expand All @@ -181,11 +184,11 @@ func createSpanWriter(

mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: cfg.GetNumShards(),
Replicas: cfg.GetNumReplicas(),
EsVersion: cfg.GetVersion(),
IndexPrefix: cfg.GetIndexPrefix(),
UseILM: cfg.GetUseILM(),
Shards: cfg.NumShards,
Replicas: cfg.NumReplicas,
EsVersion: cfg.Version,
IndexPrefix: cfg.IndexPrefix,
UseILM: cfg.UseILM,
}

spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
Expand All @@ -196,19 +199,19 @@ func createSpanWriter(
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
AllTagsAsFields: cfg.Tags.AllAsFields,
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
UseReadWriteAliases: cfg.UseReadWriteAliases,
})

// Creating a template here would conflict with the one created for ILM resulting to no index rollover
if cfg.IsCreateIndexTemplates() && !cfg.GetUseILM() {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.GetIndexPrefix())
if cfg.CreateIndexTemplates && !cfg.UseILM {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix)
if err != nil {
return nil, err
}
Expand All @@ -219,15 +222,15 @@ func createSpanWriter(
func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
cfg *config.Configuration,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Logger: logger,
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayoutDependencies(),
MaxDocCount: cfg.GetMaxDocCount(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
MaxDocCount: cfg.MaxDocCount,
UseReadWriteAliases: cfg.UseReadWriteAliases,
})
return reader, nil
}
Expand Down
Loading