Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Elasticsearch 8.x #4829

Merged
merged 18 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci-elasticsearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jobs:
- major: 7.x
image: 7.14.0
distribution: elasticsearch
- major: 8.x
image: 8.8.2
distribution: elasticsearch
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }}
steps:
- name: Harden Runner
Expand Down
20 changes: 11 additions & 9 deletions cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ type Action struct {

func (c Action) getMapping(version uint, templateName string) (string, error) {
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
TemplateBuilder: es.TextTemplateBuilder{},
PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate),
PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate),
PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make sure PR description reflects all the changes, and explain why they are needed (especially the priority)

Shards: int64(c.Config.Shards),
Replicas: int64(c.Config.Replicas),
IndexPrefix: c.Config.IndexPrefix,
UseILM: c.Config.UseILM,
ILMPolicyName: c.Config.ILMPolicyName,
EsVersion: version,
}
return mappingBuilder.GetMapping(templateName)
}
Expand All @@ -57,7 +60,7 @@ func (c Action) Do() error {
return err
}
if c.Config.UseILM {
if version == ilmVersionSupport {
if version >= ilmVersionSupport {
policyExist, err := c.ILMClient.Exists(c.Config.ILMPolicyName)
if err != nil {
return err
Expand Down Expand Up @@ -109,7 +112,6 @@ func (c Action) init(version uint, indexopt app.IndexOption) error {
if err != nil {
return err
}

err = c.IndicesClient.CreateTemplate(mapping, indexopt.TemplateName())
if err != nil {
return err
Expand Down
20 changes: 16 additions & 4 deletions cmd/es-rollover/app/init/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,37 @@ import (
)

const (
shards = "shards"
replicas = "replicas"
shards = "shards"
replicas = "replicas"
prioritySpanTemplate = "priority-span-template"
priorityServiceTemplate = "priority-service-template"
priorityDependenciesTemplate = "priority-dependencies-template"
)

// Config holds configuration for index cleaner binary.
type Config struct {
app.Config
Shards int
Replicas int
Shards int
Replicas int
PrioritySpanTemplate int
PriorityServiceTemplate int
PriorityDependenciesTemplate int
}

// AddFlags adds flags for TLS to the FlagSet.
func (c *Config) AddFlags(flags *flag.FlagSet) {
flags.Int(shards, 5, "Number of shards")
flags.Int(replicas, 1, "Number of replicas")
flags.Int(prioritySpanTemplate, 0, "Priority of jaeger-span index template (ESv8 only)")
flags.Int(priorityServiceTemplate, 0, "Priority of jaeger-service index template (ESv8 only)")
flags.Int(priorityDependenciesTemplate, 0, "Priority of jaeger-dependecies index template (ESv8 only)")
}

// InitFromViper initializes config from viper.Viper.
func (c *Config) InitFromViper(v *viper.Viper) {
c.Shards = v.GetInt(shards)
c.Replicas = v.GetInt(replicas)
c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate)
c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate)
c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate)
}
6 changes: 6 additions & 0 deletions cmd/es-rollover/app/init/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ func TestBindFlags(t *testing.T) {
err := command.ParseFlags([]string{
"--shards=8",
"--replicas=16",
"--priority-span-template=300",
"--priority-service-template=301",
"--priority-dependencies-template=302",
})
require.NoError(t, err)

c.InitFromViper(v)
assert.Equal(t, 8, c.Shards)
assert.Equal(t, 16, c.Replicas)
assert.Equal(t, 300, c.PrioritySpanTemplate)
assert.Equal(t, 301, c.PriorityServiceTemplate)
assert.Equal(t, 302, c.PriorityDependenciesTemplate)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/bsm/sarama-cluster v2.1.13+incompatible
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/elastic/go-elasticsearch/v8 v8.11.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-kit/kit v0.13.0
github.com/go-logr/zapr v1.3.0
Expand Down Expand Up @@ -109,6 +110,7 @@ require (
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-kit/log v0.2.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v8 v8.10.0 h1:ALg3DMxSrx07YmeMNcfPf7cFh1Ep2+Qa19EOXTbwr2k=
github.com/elastic/go-elasticsearch/v8 v8.10.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM=
github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
3 changes: 2 additions & 1 deletion pkg/es/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func newResponseError(err error, code int, body []byte) ResponseError {
}
}

// Client is a generic client to make requests to ES
// Client executes requests against Elasticsearch using direct HTTP calls,
// without using the official Go client for ES.
type Client struct {
// Http client.
Client *http.Client
Expand Down
2 changes: 1 addition & 1 deletion pkg/es/client/cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *ClusterClient) Version() (uint, error) {
TagLine string `json:"tagline"`
}
body, err := c.request(elasticRequest{
endpoint: "/",
endpoint: "",
method: http.MethodGet,
})
if err != nil {
Expand Down
20 changes: 18 additions & 2 deletions pkg/es/client/cluster_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ const opensearch2 = `
`

const elasticsearch7 = `

{
"name" : "elasticsearch-0",
"cluster_name" : "clustername",
Expand All @@ -124,8 +123,17 @@ const elasticsearch7 = `
}
`

const elasticsearch6 = `
const elasticsearch8 = `
{
"name" : "elasticsearch-0",
"version" : {
"number" : "8.0.0"
},
"tagline" : "You Know, for Search"
}
`

const elasticsearch6 = `
{
"name" : "elasticsearch-0",
"cluster_name" : "clustername",
Expand Down Expand Up @@ -165,6 +173,12 @@ func TestVersion(t *testing.T) {
response: elasticsearch7,
expectedResult: 7,
},
{
name: "success with elasticsearch 8",
responseCode: http.StatusOK,
response: elasticsearch8,
expectedResult: 8,
},
{
name: "success with opensearch 1",
responseCode: http.StatusOK,
Expand Down Expand Up @@ -223,7 +237,9 @@ func TestVersion(t *testing.T) {
if test.errContains != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), test.errContains)
return
}
require.NoError(t, err)
assert.Equal(t, test.expectedResult, result)
})
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/es/client/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,23 @@
return err
}

func (i IndicesClient) version() (uint, error) {
cl := ClusterClient{Client: i.Client}
return cl.Version()
}

// CreateTemplate an ES index template
func (i IndicesClient) CreateTemplate(template, name string) error {
endpointFmt := "_template/%s"
if v, err := i.version(); err == nil {
if v >= 8 {
endpointFmt = "_index_template/%s"
}
} else {
return err
}

Check warning on line 248 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L246-L248

Added lines #L246 - L248 were not covered by tests
_, err := i.request(elasticRequest{
endpoint: fmt.Sprintf("_template/%s", name),
endpoint: fmt.Sprintf(endpointFmt, name),
method: http.MethodPut,
body: []byte(template),
})
Expand Down
15 changes: 14 additions & 1 deletion pkg/es/client/index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,24 @@ func TestClientCreateTemplate(t *testing.T) {
templateContent := "template content"
tests := []struct {
name string
versionResp string
responseCode int
response string
errContains string
}{
{
name: "success",
name: "success/v7",
versionResp: elasticsearch7,
responseCode: http.StatusOK,
},
{
name: "success/v8",
versionResp: elasticsearch8,
responseCode: http.StatusOK,
},
{
name: "client error",
versionResp: elasticsearch7,
responseCode: http.StatusBadRequest,
response: esErrResponse,
errContains: "failed to create template: jaeger-template",
Expand All @@ -502,6 +510,11 @@ func TestClientCreateTemplate(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
if req.URL.String() == "/" { // ES version check
res.WriteHeader(http.StatusOK)
res.Write([]byte(test.versionResp))
return
}
assert.True(t, strings.HasSuffix(req.URL.String(), "_template/jaeger-template"))
assert.Equal(t, http.MethodPut, req.Method)
assert.Equal(t, "Basic foobar", req.Header.Get("Authorization"))
Expand Down
39 changes: 37 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"
"time"

esV8 "github.com/elastic/go-elasticsearch/v8"
"github.com/olivere/elastic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -57,6 +58,9 @@ type Configuration struct {
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"`
PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"`
PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
Expand Down Expand Up @@ -111,7 +115,7 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index")
m := sync.Map{}

service, err := rawClient.BulkProcessor().
bulkProc, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
Expand Down Expand Up @@ -184,7 +188,29 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
c.Version = uint(esVersion)
}

return eswrapper.WrapESClient(rawClient, service, c.Version), nil
var rawClientV8 *esV8.Client
if c.Version >= 8 {
rawClientV8, err = newElasticsearchV8(c, logger)
if err != nil {
return nil, fmt.Errorf("error creating v8 client: %v", err)
}
}

return eswrapper.WrapESClient(rawClient, bulkProc, c.Version, rawClientV8), nil
}

func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) {
var options esV8.Config
options.Addresses = c.Servers
options.Username = c.Username
options.Password = c.Password
options.DiscoverNodesOnStart = c.Sniffer
transport, err := GetHTTPRoundTripper(c, logger)
if err != nil {
return nil, err
}
options.Transport = transport
return esV8.NewClient(options)
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
Expand All @@ -210,6 +236,15 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.NumReplicas == 0 {
c.NumReplicas = source.NumReplicas
}
if c.PrioritySpanTemplate == 0 {
c.PrioritySpanTemplate = source.PrioritySpanTemplate
}
if c.PriorityServiceTemplate == 0 {
c.PriorityServiceTemplate = source.PriorityServiceTemplate
}
if c.PrioritySpanTemplate == 0 {
c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate
}
if c.BulkSize == 0 {
c.BulkSize = source.BulkSize
}
Expand Down
Loading
Loading