-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[es] Add remote read clusters option for cross-cluster querying #2874
Changes from all commits
7f9b50e
8d51bc5
51ee793
5ecffd3
95c3bf3
80a5403
15a0ee6
aab1458
29365cb
6557e5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ func TestOptions(t *testing.T) { | |
assert.Empty(t, primary.Username) | ||
assert.Empty(t, primary.Password) | ||
assert.NotEmpty(t, primary.Servers) | ||
assert.Empty(t, primary.RemoteReadClusters) | ||
assert.Equal(t, int64(5), primary.NumShards) | ||
assert.Equal(t, int64(1), primary.NumReplicas) | ||
assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) | ||
|
@@ -58,6 +59,7 @@ func TestOptionsWithFlags(t *testing.T) { | |
"--es.num-replicas=10", | ||
"--es.index-date-separator=", | ||
// a couple overrides | ||
"--es.remote-read-clusters=cluster_one,cluster_two", | ||
"--es.aux.server-urls=3.3.3.3, 4.4.4.4", | ||
"--es.aux.max-span-age=24h", | ||
"--es.aux.num-replicas=10", | ||
|
@@ -77,6 +79,7 @@ func TestOptionsWithFlags(t *testing.T) { | |
assert.Equal(t, "hello", primary.Username) | ||
assert.Equal(t, "/foo/bar", primary.TokenFilePath) | ||
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) | ||
assert.Equal(t, []string{"cluster_one", "cluster_two"}, primary.RemoteReadClusters) | ||
assert.Equal(t, 48*time.Hour, primary.MaxSpanAge) | ||
assert.True(t, primary.Sniffer) | ||
assert.True(t, primary.SnifferTLSEnabled) | ||
|
@@ -103,6 +106,19 @@ func TestOptionsWithFlags(t *testing.T) { | |
assert.True(t, primary.UseILM) | ||
} | ||
|
||
func TestEmptyRemoteReadClusters(t *testing.T) { | ||
opts := NewOptions("es", "es.aux") | ||
v, command := config.Viperize(opts.AddFlags) | ||
err := command.ParseFlags([]string{ | ||
"--es.remote-read-clusters=", | ||
}) | ||
require.NoError(t, err) | ||
opts.InitFromViper(v) | ||
|
||
primary := opts.GetPrimary() | ||
assert.Equal(t, []string{}, primary.RemoteReadClusters) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the following would be more expressive/readable: |
||
} | ||
|
||
func TestMaxSpanAgeSetErrorInArchiveMode(t *testing.T) { | ||
opts := NewOptions("es", archiveNamespace) | ||
_, command := config.Viperize(opts.AddFlags) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,6 +116,7 @@ type SpanReaderParams struct { | |
TagDotReplacement string | ||
Archive bool | ||
UseReadWriteAliases bool | ||
RemoteReadClusters []string | ||
} | ||
|
||
// NewSpanReader returns a new SpanReader with a metrics. | ||
|
@@ -129,7 +130,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { | |
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), | ||
indexDateLayout: p.IndexDateLayout, | ||
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), | ||
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), | ||
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), | ||
sourceFn: getSourceFn(p.Archive, p.MaxDocCount), | ||
maxDocCount: p.MaxDocCount, | ||
} | ||
|
@@ -139,24 +140,44 @@ type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime t | |
|
||
type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource | ||
|
||
func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { | ||
func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { | ||
if archive { | ||
var archiveSuffix string | ||
if useReadWriteAliases { | ||
archiveSuffix = archiveReadIndexSuffix | ||
} else { | ||
archiveSuffix = archiveIndexSuffix | ||
} | ||
return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { | ||
return []string{archiveIndex(indexName, archiveSuffix)} | ||
} | ||
return addRemoteReadClusters(func(indexPrefix, indexDateLayout string, startTime time.Time, endTime time.Time) []string { | ||
return []string{archiveIndex(indexPrefix, archiveSuffix)} | ||
}, remoteReadClusters) | ||
} | ||
if useReadWriteAliases { | ||
return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { | ||
return []string{indices + "read"} | ||
return addRemoteReadClusters(func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { | ||
return []string{indexPrefix + "read"} | ||
}, remoteReadClusters) | ||
} | ||
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters) | ||
} | ||
|
||
// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices. | ||
// Elasticsearch cross cluster api example GET /twitter,cluster_one:twitter,cluster_two:twitter/_search. | ||
func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn { | ||
return func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { | ||
jaegerIndices := fn(indexPrefix, indexDateLayout, startTime, endTime) | ||
if len(remoteReadClusters) == 0 { | ||
return jaegerIndices | ||
} | ||
|
||
for _, jaegerIndex := range jaegerIndices { | ||
for _, remoteCluster := range remoteReadClusters { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Related to the earlier comment, if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be avoided now with the changes to |
||
remoteIndex := remoteCluster + ":" + jaegerIndex | ||
jaegerIndices = append(jaegerIndices, remoteIndex) | ||
} | ||
} | ||
|
||
return jaegerIndices | ||
} | ||
return timeRangeIndices | ||
} | ||
|
||
func getSourceFn(archive bool, maxDocCount int) sourceFn { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this line ever be hit? I believe
c.RemoteReadClusters
defaults to[]string{""}
because strings.Split() should result in a non-empty slice overwriting the initial empty slice.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @albertteoh - I didn't realize this nuance with
strings.Split()
but after looking at the docs, this makes sense. I didn't see a great (easy) way around this since the cli takes in strings, so I updatedoptions.go
to check the length and only split when necessary (otherwise default to[]string{}
). I added 2 tests cases around options as well to cover this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the test cases to cover the empty string. I think there's also the following case:
--es.remote-read-clusters=cluster_one,,cluster_two
, where the second item is empty but I think we can consider this a user error.