diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 0d625e67965..38f3fc42f3f 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -45,6 +45,7 @@ import ( // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { Servers []string `mapstructure:"server_urls"` + RemoteReadClusters []string `mapstructure:"remote_read_clusters"` Username string `mapstructure:"username"` Password string `mapstructure:"password" json:"-"` TokenFilePath string `mapstructure:"token_file"` @@ -89,6 +90,7 @@ type TagsAsFields struct { // ClientBuilder creates new es.Client type ClientBuilder interface { NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) + GetRemoteReadClusters() []string GetNumShards() int64 GetNumReplicas() int64 GetMaxSpanAge() time.Duration @@ -193,6 +195,9 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { + if len(c.RemoteReadClusters) == 0 { + c.RemoteReadClusters = source.RemoteReadClusters + } if c.Username == "" { c.Username = source.Username } @@ -246,6 +251,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } +// GetRemoteReadClusters returns list of remote read clusters +func (c *Configuration) GetRemoteReadClusters() []string { + return c.RemoteReadClusters +} + // GetNumShards returns number of shards from Configuration func (c *Configuration) GetNumShards() int64 { return c.NumShards diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 4f4bf496d7c..3877840750c 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -151,6 +151,7 @@ func createSpanReader( TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, + RemoteReadClusters: cfg.GetRemoteReadClusters(), }), nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 68ca71c6cb3..da2702f3cd7 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -35,6 +35,7 @@ const ( suffixSnifferTLSEnabled = ".sniffer-tls-enabled" suffixTokenPath = ".token-file" suffixServerURLs = ".server-urls" + suffixRemoteReadClusters = ".remote-read-clusters" suffixMaxSpanAge = ".max-span-age" suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" @@ -59,8 +60,9 @@ const ( suffixLogLevel = ".log-level" // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window - defaultMaxDocCount = 10_000 - defaultServerURL = "http://127.0.0.1:9200" + defaultMaxDocCount = 10_000 + defaultServerURL = "http://127.0.0.1:9200" + defaultRemoteReadClusters = "" // default separator for Elasticsearch index date layout. defaultIndexDateSeparator = "-" ) @@ -102,6 +104,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { CreateIndexTemplates: true, Version: 0, Servers: []string{defaultServerURL}, + RemoteReadClusters: []string{}, MaxDocCount: defaultMaxDocCount, LogLevel: "error", } @@ -162,6 +165,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixServerURLs, defaultServerURL, "The comma-separated list of Elasticsearch servers, must be full url i.e. http://localhost:9200") + flagSet.String( + nsConfig.namespace+suffixRemoteReadClusters, + defaultRemoteReadClusters, + "Comma-separated list of Elasticsearch remote cluster names for cross-cluster querying."+ + "See Elasticsearch remote clusters and cross-cluster query api.") flagSet.Duration( nsConfig.namespace+suffixTimeout, nsConfig.Timeout, @@ -304,6 +312,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { // TODO: Need to figure out a better way for do this. cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey) cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v) + + remoteReadClusters := stripWhiteSpace(v.GetString(cfg.namespace + suffixRemoteReadClusters)) + if len(remoteReadClusters) > 0 { + cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",") + } } // GetPrimary returns primary configuration. diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index cd0101243d8..d9bee8bd2b6 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -31,6 +31,7 @@ func TestOptions(t *testing.T) { assert.Empty(t, primary.Username) assert.Empty(t, primary.Password) assert.NotEmpty(t, primary.Servers) + assert.Empty(t, primary.RemoteReadClusters) assert.Equal(t, int64(5), primary.NumShards) assert.Equal(t, int64(1), primary.NumReplicas) assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) @@ -58,6 +59,7 @@ func TestOptionsWithFlags(t *testing.T) { "--es.num-replicas=10", "--es.index-date-separator=", // a couple overrides + "--es.remote-read-clusters=cluster_one,cluster_two", "--es.aux.server-urls=3.3.3.3, 4.4.4.4", "--es.aux.max-span-age=24h", "--es.aux.num-replicas=10", @@ -77,6 +79,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "hello", primary.Username) assert.Equal(t, "/foo/bar", primary.TokenFilePath) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) + assert.Equal(t, []string{"cluster_one", "cluster_two"}, primary.RemoteReadClusters) assert.Equal(t, 48*time.Hour, primary.MaxSpanAge) assert.True(t, primary.Sniffer) assert.True(t, primary.SnifferTLSEnabled) @@ -103,6 +106,19 @@ func TestOptionsWithFlags(t *testing.T) { assert.True(t, primary.UseILM) } +func TestEmptyRemoteReadClusters(t *testing.T) { + opts := NewOptions("es", "es.aux") + v, command := config.Viperize(opts.AddFlags) + err := command.ParseFlags([]string{ + "--es.remote-read-clusters=", + }) + require.NoError(t, err) + opts.InitFromViper(v) + + primary := opts.GetPrimary() + assert.Equal(t, []string{}, primary.RemoteReadClusters) +} + func TestMaxSpanAgeSetErrorInArchiveMode(t *testing.T) { opts := NewOptions("es", archiveNamespace) _, command := config.Viperize(opts.AddFlags) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index bbf46789b71..e1bee4e6647 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -116,6 +116,7 @@ type SpanReaderParams struct { TagDotReplacement string Archive bool UseReadWriteAliases bool + RemoteReadClusters []string } // NewSpanReader returns a new SpanReader with a metrics. @@ -129,7 +130,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), indexDateLayout: p.IndexDateLayout, spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), - timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), + timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, } @@ -139,7 +140,7 @@ type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime t type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource -func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { +func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { if archive { var archiveSuffix string if useReadWriteAliases { @@ -147,16 +148,36 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { } else { archiveSuffix = archiveIndexSuffix } - return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { - return []string{archiveIndex(indexName, archiveSuffix)} - } + return addRemoteReadClusters(func(indexPrefix, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + return []string{archiveIndex(indexPrefix, archiveSuffix)} + }, remoteReadClusters) } if useReadWriteAliases { - return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { - return []string{indices + "read"} + return addRemoteReadClusters(func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + return []string{indexPrefix + "read"} + }, remoteReadClusters) + } + return addRemoteReadClusters(timeRangeIndices, remoteReadClusters) +} + +// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices. +// Elasticsearch cross cluster api example GET /twitter,cluster_one:twitter,cluster_two:twitter/_search. +func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn { + return func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + jaegerIndices := fn(indexPrefix, indexDateLayout, startTime, endTime) + if len(remoteReadClusters) == 0 { + return jaegerIndices } + + for _, jaegerIndex := range jaegerIndices { + for _, remoteCluster := range remoteReadClusters { + remoteIndex := remoteCluster + ":" + jaegerIndex + jaegerIndices = append(jaegerIndices, remoteIndex) + } + } + + return jaegerIndices } - return timeRangeIndices } func getSourceFn(archive bool, maxDocCount int) sourceFn { diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index de841663016..4668a00ff2d 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -149,35 +149,59 @@ func TestSpanReaderIndices(t *testing.T) { date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) dateFormat := date.UTC().Format("2006-01-02") testCases := []struct { - index string - params SpanReaderParams + indices []string + params SpanReaderParams }{ {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false}, - index: spanIndex + dateFormat}, + indices: []string{spanIndex + dateFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true}, - index: spanIndex + "read"}, + indices: []string{spanIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: false}, - index: "foo:" + indexPrefixSeparator + spanIndex + dateFormat}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true}, - index: "foo:-" + spanIndex + "read"}, + indices: []string{"foo:-" + spanIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true}, - index: spanIndex + archiveIndexSuffix}, + indices: []string{spanIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true}, - index: "foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true}, - index: "foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, + indices: []string{ + spanIndex + dateFormat, + "cluster_one:" + spanIndex + dateFormat, + "cluster_two:" + spanIndex + dateFormat}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, + indices: []string{ + spanIndex + archiveIndexSuffix, + "cluster_one:" + spanIndex + archiveIndexSuffix, + "cluster_two:" + spanIndex + archiveIndexSuffix}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, + indices: []string{ + spanIndex + "read", + "cluster_one:" + spanIndex + "read", + "cluster_two:" + spanIndex + "read"}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, + indices: []string{ + spanIndex + archiveReadIndexSuffix, + "cluster_one:" + spanIndex + archiveReadIndexSuffix, + "cluster_two:" + spanIndex + archiveReadIndexSuffix}}, } for _, testCase := range testCases { r := NewSpanReader(testCase.params) actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date) - assert.Equal(t, []string{testCase.index}, actual) + assert.Equal(t, testCase.indices, actual) } }