Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[es] Add index rollover mode that can choose day and hour #2965

Merged
merged 13 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 65 additions & 31 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,37 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayout string `mapstructure:"index_date_layout"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
// Date layouts used for the jaeger-span, jaeger-service and jaeger-dependencies
// index names respectively. They are excluded from mapstructure decoding; the
// flag-based setup shown in initFromViper derives them from the index date
// separator and the rollover frequency.
IndexDateLayoutSpans string `mapstructure:"-"`
IndexDateLayoutServices string `mapstructure:"-"`
IndexDateLayoutDependencies string `mapstructure:"-"`
// Rollover period for the jaeger-span and jaeger-service indices. The duration
// getters on this type treat "hour" as hourly and every other value as daily.
IndexRolloverFrequencySpans string `mapstructure:"-"`
IndexRolloverFrequencyServices string `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
}

// TagsAsFields holds configuration for tag schema.
Expand All @@ -96,7 +100,11 @@ type ClientBuilder interface {
GetMaxSpanAge() time.Duration
GetMaxDocCount() int
GetIndexPrefix() string
GetIndexDateLayout() string
GetIndexDateLayoutSpans() string
GetIndexDateLayoutServices() string
GetIndexDateLayoutDependencies() string
GetIndexRolloverFrequencySpansDuration() time.Duration
GetIndexRolloverFrequencyServicesDuration() time.Duration
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
Expand Down Expand Up @@ -281,9 +289,35 @@ func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetIndexDateLayout returns index date layout
func (c *Configuration) GetIndexDateLayout() string {
return c.IndexDateLayout
// GetIndexDateLayoutSpans reports the date layout configured for
// jaeger-span index names.
func (c *Configuration) GetIndexDateLayoutSpans() string {
	layout := c.IndexDateLayoutSpans
	return layout
}

// GetIndexDateLayoutServices reports the date layout configured for
// jaeger-service index names.
func (c *Configuration) GetIndexDateLayoutServices() string {
	layout := c.IndexDateLayoutServices
	return layout
}

// GetIndexDateLayoutDependencies reports the date layout configured for
// jaeger-dependencies index names.
func (c *Configuration) GetIndexDateLayoutDependencies() string {
	layout := c.IndexDateLayoutDependencies
	return layout
}

// GetIndexRolloverFrequencySpansDuration converts the jaeger-span rollover
// frequency into a duration. The result is negative: minus one hour when the
// frequency is exactly "hour", minus twenty-four hours for any other value.
// NOTE(review): the negative sign presumably lets callers step back in time
// with time.Time.Add — confirm against the span reader.
func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration {
	switch c.IndexRolloverFrequencySpans {
	case "hour":
		return -time.Hour
	default:
		// Anything that is not "hour" is treated as daily rollover.
		return -24 * time.Hour
	}
}

// GetIndexRolloverFrequencyServicesDuration converts the jaeger-service
// rollover frequency into a duration. The result is negative: minus one hour
// when the frequency is exactly "hour", minus twenty-four hours for any other
// value.
// NOTE(review): the negative sign presumably lets callers step back in time
// with time.Time.Add — confirm against the span reader.
func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration {
	switch c.IndexRolloverFrequencyServices {
	case "hour":
		return -time.Hour
	default:
		// Anything that is not "hour" is treated as daily rollover.
		return -24 * time.Hour
	}
}

// GetTagsFilePath returns a path to file containing tag keys
Expand Down
48 changes: 26 additions & 22 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
// CreateDependencyReader implements storage.Factory
// It builds a dependency store from the primary client and logger, configured
// with the primary config's index prefix, index date layout and max doc count.
// The returned error is always nil.
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(),
f.primaryConfig.GetIndexDateLayout(), f.primaryConfig.GetMaxDocCount())
f.primaryConfig.GetIndexDateLayoutDependencies(), f.primaryConfig.GetMaxDocCount())
return reader, nil
}

Expand Down Expand Up @@ -141,17 +141,20 @@ func createSpanReader(
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.GetMaxDocCount(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.GetMaxDocCount(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(),
ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
}), nil
}

Expand Down Expand Up @@ -186,16 +189,17 @@ func createSpanWriter(
return nil, err
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
})
if cfg.IsCreateIndexTemplates() {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.GetIndexPrefix())
Expand Down
95 changes: 61 additions & 34 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package es

import (
"flag"
"fmt"
"strings"
"time"

Expand All @@ -29,42 +28,46 @@ import (
)

const (
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
// flag suffixes selecting how often the jaeger-span and jaeger-service indices
// roll over; the flag help text lists "hour" and "day" as the valid options
suffixIndexRolloverFrequencySpans = ".index-rollover-frequency-spans"
suffixIndexRolloverFrequencyServices = ".index-rollover-frequency-services"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"

// default rollover period for the span and service index flags; initFromViper
// also passes it to initDateLayout for the dependencies index
defaultIndexRolloverFrequency = "day"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -205,7 +208,17 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
flagSet.String(
nsConfig.namespace+suffixIndexDateSeparator,
defaultIndexDateSeparator,
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".")
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20\".")
flagSet.String(
nsConfig.namespace+suffixIndexRolloverFrequencySpans,
defaultIndexRolloverFrequency,
"Rotates jaeger-span indices over the given period. For example \"day\" creates \"jaeger-span-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+
"This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover")
flagSet.String(
nsConfig.namespace+suffixIndexRolloverFrequencyServices,
defaultIndexRolloverFrequency,
"Rotates jaeger-service indices over the given period. For example \"day\" creates \"jaeger-service-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+
"This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover")
flagSet.Bool(
nsConfig.namespace+suffixTagsAsFieldsAll,
nsConfig.Tags.AllAsFields,
Expand Down Expand Up @@ -295,7 +308,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator))
cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude)
cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile)
Expand All @@ -317,6 +329,16 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
if len(remoteReadClusters) > 0 {
cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",")
}

cfg.IndexRolloverFrequencySpans = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans))
cfg.IndexRolloverFrequencyServices = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices))

separator := v.GetString(cfg.namespace + suffixIndexDateSeparator)
cfg.IndexDateLayoutSpans = initDateLayout(cfg.IndexRolloverFrequencySpans, separator)
cfg.IndexDateLayoutServices = initDateLayout(cfg.IndexRolloverFrequencyServices, separator)

// Dependencies calculation should be daily, and this index size is very small
cfg.IndexDateLayoutDependencies = initDateLayout(defaultIndexRolloverFrequency, separator)
}

// GetPrimary returns primary configuration.
Expand All @@ -343,6 +365,11 @@ func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
}

func initDateLayout(separator string) string {
return fmt.Sprintf("2006%s01%s02", separator, separator)
// initDateLayout builds the Go time layout used as the date part of an index
// name. The year, month and day components are joined with sep; when
// rolloverFreq is exactly "hour" the 24-hour clock component is appended as
// well. Any other rolloverFreq value yields the daily layout.
func initDateLayout(rolloverFreq, sep string) string {
	daily := "2006" + sep + "01" + sep + "02"
	if rolloverFreq != "hour" {
		return daily
	}
	// Hourly rollover needs one index per hour, so add the hour component.
	return daily + sep + "15"
}
Loading