Skip to content

Commit

Permalink
Fix Elasticsearch version in ES OTEL writer (#2409)
Browse files Browse the repository at this point in the history
* Fix Elasticsearch version in ES OTEL writer

Signed-off-by: Pavol Loffay <[email protected]>

* fmt

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Aug 24, 2020
1 parent 09fde54 commit e7f5227
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func new(ctx context.Context, config *Config, params component.ExporterCreatePar
return nil, err
}
if config.Primary.IsCreateIndexTemplates() {
spanMapping, serviceMapping := es.GetSpanServiceMappings(esCfg.GetNumShards(), esCfg.GetNumReplicas(), esCfg.GetVersion())
spanMapping, serviceMapping := es.GetSpanServiceMappings(esCfg.GetNumShards(), esCfg.GetNumReplicas(), uint(w.esClientVersion()))
if err = w.CreateTemplates(ctx, spanMapping, serviceMapping); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,7 @@ type bulkItem struct {
// isService indicates that this bulk operation is for service index
isService bool
}

func (w *esSpanWriter) esClientVersion() int {
return w.client.MajorVersion()
}
11 changes: 7 additions & 4 deletions cmd/opentelemetry/app/internal/esclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type ElasticsearchClient interface {
Search(ctx context.Context, query SearchBody, size int, indices ...string) (*SearchResponse, error)
// MultiSearch searches data via /_msearch
MultiSearch(ctx context.Context, queries []SearchBody) (*MultiSearchResponse, error)

// Major version returns major ES version
MajorVersion() int
}

// BulkResponse is a response returned by Elasticsearch Bulk API
Expand Down Expand Up @@ -172,20 +175,20 @@ func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (El
if err != nil {
return nil, err
}
if params.GetVersion() == 0 {
esVersion := int(params.GetVersion())
if esVersion == 0 {
esPing := elasticsearchPing{
username: params.Username,
password: params.Password,
roundTripper: roundTripper,
}
esVersion, err := esPing.getVersion(params.Servers[0])
esVersion, err = esPing.getVersion(params.Servers[0])
if err != nil {
return nil, err
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))
params.Version = uint(esVersion)
}
return newElasticsearchClient(int(params.Version), clientConfig{
return newElasticsearchClient(esVersion, clientConfig{
DiscoverNotesOnStartup: params.Sniffer,
Addresses: params.Servers,
Username: params.Username,
Expand Down
4 changes: 4 additions & 0 deletions cmd/opentelemetry/app/internal/esclient/es6client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,7 @@ func (es *elasticsearch6Client) MultiSearch(ctx context.Context, queries []Searc
}
return r, nil
}

func (es *elasticsearch6Client) MajorVersion() int {
return 6
}
6 changes: 6 additions & 0 deletions cmd/opentelemetry/app/internal/esclient/es6client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ func TestES6MultiSearch(t *testing.T) {
return newElasticsearch6Client(clientConfig{}, tripper)
})
}

func TestES6Version(t *testing.T) {
c, err := newElasticsearch6Client(clientConfig{}, nil)
require.NoError(t, err)
assert.Equal(t, 6, c.MajorVersion())
}
4 changes: 4 additions & 0 deletions cmd/opentelemetry/app/internal/esclient/es7client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (es *elasticsearch7Client) MultiSearch(ctx context.Context, queries []Searc
return convertMultiSearchResponse(r), nil
}

func (es *elasticsearch7Client) MajorVersion() int {
return 7
}

func convertMultiSearchResponse(response *es7multiSearchResponse) *MultiSearchResponse {
mResponse := &MultiSearchResponse{}
for _, r := range response.Responses {
Expand Down
6 changes: 6 additions & 0 deletions cmd/opentelemetry/app/internal/esclient/es7client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ func TestES7MultiSearch(t *testing.T) {
return newElasticsearch7Client(clientConfig{}, tripper)
})
}

func TestES7Version(t *testing.T) {
c, err := newElasticsearch7Client(clientConfig{}, nil)
require.NoError(t, err)
assert.Equal(t, 7, c.MajorVersion())
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,7 @@ func (m *mockClient) Search(ctx context.Context, query esclient.SearchBody, size
func (m mockClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) {
panic("implement me")
}

func (m *mockClient) MajorVersion() int {
panic("implement me")
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,7 @@ func (m *mockClient) Search(ctx context.Context, query esclient.SearchBody, size
func (m *mockClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) {
return m.multiSearchResponse, nil
}

func (m *mockClient) MajorVersion() int {
panic("implement me")
}

0 comments on commit e7f5227

Please sign in to comment.