Skip to content

Commit

Permalink
Yuri's approach for swapping client
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <[email protected]>
  • Loading branch information
haanhvu committed Apr 18, 2023
1 parent d388310 commit d3b7485
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 235 deletions.
241 changes: 102 additions & 139 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/metrics"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand Down Expand Up @@ -78,8 +77,6 @@ type Configuration struct {
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
SendGetBodyAs string `mapstructure:"send_get_body_as"`
optionsChan chan []elastic.ClientOptionFunc
passwordFileWatcher *fswatcher.FSWatcher
}

// TagsAsFields holds configuration for tag schema.
Expand All @@ -98,7 +95,8 @@ type TagsAsFields struct {

// ClientBuilder creates new es.Client
type ClientBuilder interface {
ChannelNewClient(clientChan chan<- es.Client, logger *zap.Logger, metricsFactory metrics.Factory) error
NewClient(logger *zap.Logger, options []elastic.ClientOptionFunc, metricsFactory metrics.Factory) (es.Client, error)
GetConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error)
GetRemoteReadClusters() []string
GetNumShards() int64
GetNumReplicas() int64
Expand All @@ -115,6 +113,8 @@ type ClientBuilder interface {
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
GetTokenFilePath() string
GetPasswordFilePath() string
GetUsername() string
IsStorageEnabled() bool
IsCreateIndexTemplates() bool
GetVersion() uint
Expand All @@ -124,115 +124,94 @@ type ClientBuilder interface {
GetSendGetBodyAs() string
}

// ChannelNewClient channel new ElasticSearch client.
// We need a channel because the password (if set from file) of client can change.
func (c *Configuration) ChannelNewClient(clientChan chan<- es.Client, logger *zap.Logger, metricsFactory metrics.Factory) error {
// NewClient creates a new ElasticSearch client
func (c *Configuration) NewClient(logger *zap.Logger, options []elastic.ClientOptionFunc, metricsFactory metrics.Factory) (es.Client, error) {
if len(c.Servers) < 1 {
return errors.New("no servers specified")
return nil, errors.New("no servers specified")
}

err := c.channelConfigOptions(logger)
rawClient, err := elastic.NewClient(options...)
if err != nil {
close(clientChan)
return err
return nil, err
}

go func() {
for options := range c.optionsChan {
rawClient, err := elastic.NewClient(options...)
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error creating ES client", zap.Error(err))
}

sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index")
m := sync.Map{}

service, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
start, ok := m.Load(id)
if !ok {
return
}
m.Delete(id)

// log individual errors, note that err might be false and these errors still present
if response != nil && response.Errors {
for _, it := range response.Items {
for key, val := range it {
if val.Error != nil {
logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key),
zap.Reflect("response", val))
}
}
}
}
sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index")
m := sync.Map{}

sm.Emit(err, time.Since(start.(time.Time)))
if err != nil {
var failed int
if response == nil {
failed = 0
} else {
failed = len(response.Failed())
service, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
start, ok := m.Load(id)
if !ok {
return
}
m.Delete(id)

// log individual errors, note that err might be false and these errors still present
if response != nil && response.Errors {
for _, it := range response.Items {
for key, val := range it {
if val.Error != nil {
logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key),
zap.Reflect("response", val))
}
total := len(requests)
logger.Error("Elasticsearch could not process bulk request",
zap.Int("request_count", total),
zap.Int("failed_count", failed),
zap.Error(err),
zap.Any("response", response))
}
}).
BulkSize(c.BulkSize).
Workers(c.BulkWorkers).
BulkActions(c.BulkActions).
FlushInterval(c.BulkFlushInterval).
Do(context.Background())
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error setting up concurrent processor of bulk requests", zap.Error(err))
}
}

if c.Version == 0 {
// Determine ElasticSearch Version
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background())
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error checking node exists", zap.Error(err))
}
esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0]))
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error parsing ES version", zap.Error(err))
}
// OpenSearch is based on ES 7.x
if strings.Contains(pingResult.TagLine, "OpenSearch") {
if pingResult.Version.Number[0] == '1' {
logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings")
esVersion = 7
}
if pingResult.Version.Number[0] == '2' {
logger.Info("OpenSearch 2.x detected, using ES 7.x index mappings")
esVersion = 7
}
sm.Emit(err, time.Since(start.(time.Time)))
if err != nil {
var failed int
if response == nil {
failed = 0
} else {
failed = len(response.Failed())
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))
c.Version = uint(esVersion)
total := len(requests)
logger.Error("Elasticsearch could not process bulk request",
zap.Int("request_count", total),
zap.Int("failed_count", failed),
zap.Error(err),
zap.Any("response", response))
}
}).
BulkSize(c.BulkSize).
Workers(c.BulkWorkers).
BulkActions(c.BulkActions).
FlushInterval(c.BulkFlushInterval).
Do(context.Background())
if err != nil {
return nil, err
}

clientChan <- eswrapper.WrapESClient(rawClient, service, c.Version)
if c.Version == 0 {
// Determine ElasticSearch Version
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background())
if err != nil {
return nil, err
}
esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0]))
if err != nil {
return nil, err
}
}()
// OpenSearch is based on ES 7.x
if strings.Contains(pingResult.TagLine, "OpenSearch") {
if pingResult.Version.Number[0] == '1' {
logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings")
esVersion = 7
}
if pingResult.Version.Number[0] == '2' {
logger.Info("OpenSearch 2.x detected, using ES 7.x index mappings")
esVersion = 7
}
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))
c.Version = uint(esVersion)
}

return nil
return eswrapper.WrapESClient(rawClient, service, c.Version), nil
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
Expand Down Expand Up @@ -403,6 +382,16 @@ func (c *Configuration) GetTokenFilePath() string {
return c.TokenFilePath
}

// GetPasswordFilePath returns file path containing the password
func (c *Configuration) GetPasswordFilePath() string {
return c.PasswordFilePath
}

// GetUsername returns the username of Elasticsearch client
func (c *Configuration) GetUsername() string {
return c.Username
}

// IsStorageEnabled determines whether storage is enabled
func (c *Configuration) IsStorageEnabled() bool {
return c.Enabled
Expand Down Expand Up @@ -444,15 +433,12 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) {
return tags, nil
}

// channelConfigOptions wraps the configs to feed to the ElasticSearch client init.
// We need a channel because the password from PasswordFilePath can change.
func (c *Configuration) channelConfigOptions(logger *zap.Logger) error {
// GetConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) {
if c.Password != "" && c.PasswordFilePath != "" {
return fmt.Errorf("both Password and PasswordFilePath are set")
return nil, fmt.Errorf("both Password and PasswordFilePath are set")
}

c.optionsChan = make(chan []elastic.ClientOptionFunc, 1)

options := []elastic.ClientOptionFunc{
elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
// Disable health check when token from context is allowed, this is because at this time
Expand All @@ -468,55 +454,32 @@ func (c *Configuration) channelConfigOptions(logger *zap.Logger) error {
}
options = append(options, elastic.SetHttpClient(httpClient))

if c.SendGetBodyAs != "" {
options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs))
}

options, err := addLoggerOptions(options, c.LogLevel)
if err != nil {
close(c.optionsChan)
return err
}

transport, err := GetHTTPRoundTripper(c, logger)
if err != nil {
close(c.optionsChan)
return err
}
httpClient.Transport = transport

if c.Password != "" {
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))
c.optionsChan <- options
close(c.optionsChan)
}

if c.PasswordFilePath != "" {
passwordFromFile, err := loadFileContent(c.PasswordFilePath)
if err != nil {
close(c.optionsChan)
return err
return nil, fmt.Errorf("failed to load password from file: %w", err)
}
options = append(options, elastic.SetBasicAuth(c.Username, passwordFromFile))
c.optionsChan <- options
}

onPasswordChange := func() { c.onPasswordChange(c.optionsChan, options) }
c.passwordFileWatcher, err = fswatcher.New([]string{c.PasswordFilePath}, onPasswordChange, logger)
if err != nil {
close(c.optionsChan)
return err
}
if c.SendGetBodyAs != "" {
options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs))
}

return nil
}
options, err := addLoggerOptions(options, c.LogLevel)
if err != nil {
return options, err
}

func (c *Configuration) onPasswordChange(optionsChan chan<- []elastic.ClientOptionFunc, options []elastic.ClientOptionFunc) {
passwordFromFile, _ := loadFileContent(c.PasswordFilePath)
if passwordFromFile != "" {
options = append(options, elastic.SetBasicAuth(c.Username, passwordFromFile))
optionsChan <- options
transport, err := GetHTTPRoundTripper(c, logger)
if err != nil {
return nil, err
}
httpClient.Transport = transport
return options, nil
}

func addLoggerOptions(options []elastic.ClientOptionFunc, logLevel string) ([]elastic.ClientOptionFunc, error) {
Expand Down
10 changes: 5 additions & 5 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
client func() es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
Expand All @@ -48,7 +48,7 @@ type DependencyStore struct {

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
Expand Down Expand Up @@ -84,15 +84,15 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// CreateTemplates creates index templates.
func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error {
_, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
return err
}
return nil
}

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
s.client.Index().Index(indexName).Type(dependencyType).
s.client().Index().Index(indexName).Type(dependencyType).
BodyJson(&dbmodel.TimeDependencies{
Timestamp: ts,
Dependencies: dbmodel.FromDomainDependencies(dependencies),
Expand All @@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
searchResult, err := s.client().Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
Expand All @@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
Expand Down
Loading

0 comments on commit d3b7485

Please sign in to comment.