Skip to content

Commit

Permalink
Make client receive value from client channel
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <[email protected]>
  • Loading branch information
haanhvu committed Apr 15, 2023
1 parent 42099ca commit 546c472
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 127 deletions.
201 changes: 98 additions & 103 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type Configuration struct {
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
SendGetBodyAs string `mapstructure:"send_get_body_as"`
optionsChan chan []elastic.ClientOptionFunc
passwordFileWatcher *fswatcher.FSWatcher
}

// TagsAsFields holds configuration for tag schema.
Expand Down Expand Up @@ -129,113 +131,104 @@ func (c *Configuration) ChannelNewClient(clientChan chan<- es.Client, logger *za
return errors.New("no servers specified")
}

optionsChan := make(chan []elastic.ClientOptionFunc, 1)

err := c.channelConfigOptions(optionsChan, logger)
err := c.channelConfigOptions(logger)
if err != nil {
close(clientChan)
return err
}

go func() {
for {
select {
case options, ok := <-optionsChan:
if !ok {
return
}

rawClient, err := elastic.NewClient(options...)
if err != nil {
close(optionsChan)
close(clientChan)
logger.Error("Error creating ES client", zap.Error(err))
}
for options := range c.optionsChan {
rawClient, err := elastic.NewClient(options...)
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error creating ES client", zap.Error(err))
}

sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index")
m := sync.Map{}

service, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
start, ok := m.Load(id)
if !ok {
return
}
m.Delete(id)

// log individual errors, note that err might be false and these errors still present
if response != nil && response.Errors {
for _, it := range response.Items {
for key, val := range it {
if val.Error != nil {
logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key),
zap.Reflect("response", val))
}
sm := storageMetrics.NewWriteMetrics(metricsFactory, "bulk_index")
m := sync.Map{}

service, err := rawClient.BulkProcessor().
Before(func(id int64, requests []elastic.BulkableRequest) {
m.Store(id, time.Now())
}).
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
start, ok := m.Load(id)
if !ok {
return
}
m.Delete(id)

// log individual errors, note that err might be false and these errors still present
if response != nil && response.Errors {
for _, it := range response.Items {
for key, val := range it {
if val.Error != nil {
logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key),
zap.Reflect("response", val))
}
}
}
}

sm.Emit(err, time.Since(start.(time.Time)))
if err != nil {
var failed int
if response == nil {
failed = 0
} else {
failed = len(response.Failed())
}
total := len(requests)
logger.Error("Elasticsearch could not process bulk request",
zap.Int("request_count", total),
zap.Int("failed_count", failed),
zap.Error(err),
zap.Any("response", response))
sm.Emit(err, time.Since(start.(time.Time)))
if err != nil {
var failed int
if response == nil {
failed = 0
} else {
failed = len(response.Failed())
}
}).
BulkSize(c.BulkSize).
Workers(c.BulkWorkers).
BulkActions(c.BulkActions).
FlushInterval(c.BulkFlushInterval).
Do(context.Background())
total := len(requests)
logger.Error("Elasticsearch could not process bulk request",
zap.Int("request_count", total),
zap.Int("failed_count", failed),
zap.Error(err),
zap.Any("response", response))
}
}).
BulkSize(c.BulkSize).
Workers(c.BulkWorkers).
BulkActions(c.BulkActions).
FlushInterval(c.BulkFlushInterval).
Do(context.Background())
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error setting up concurrent processor of bulk requests", zap.Error(err))
}

if c.Version == 0 {
// Determine ElasticSearch Version
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background())
if err != nil {
close(optionsChan)
close(c.optionsChan)
close(clientChan)
logger.Error("Error setting up concurrent processor of bulk requests", zap.Error(err))
logger.Error("Error checking node exists", zap.Error(err))
}

if c.Version == 0 {
// Determine ElasticSearch Version
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background())
if err != nil {
close(optionsChan)
close(clientChan)
logger.Error("Error checking node exists", zap.Error(err))
}
esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0]))
if err != nil {
close(optionsChan)
close(clientChan)
logger.Error("Error parsing ES version", zap.Error(err))
esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0]))
if err != nil {
close(c.optionsChan)
close(clientChan)
logger.Error("Error parsing ES version", zap.Error(err))
}
// OpenSearch is based on ES 7.x
if strings.Contains(pingResult.TagLine, "OpenSearch") {
if pingResult.Version.Number[0] == '1' {
logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings")
esVersion = 7
}
// OpenSearch is based on ES 7.x
if strings.Contains(pingResult.TagLine, "OpenSearch") {
if pingResult.Version.Number[0] == '1' {
logger.Info("OpenSearch 1.x detected, using ES 7.x index mappings")
esVersion = 7
}
if pingResult.Version.Number[0] == '2' {
logger.Info("OpenSearch 2.x detected, using ES 7.x index mappings")
esVersion = 7
}
if pingResult.Version.Number[0] == '2' {
logger.Info("OpenSearch 2.x detected, using ES 7.x index mappings")
esVersion = 7
}
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))
c.Version = uint(esVersion)
}

clientChan <- eswrapper.WrapESClient(rawClient, service, c.Version)
logger.Info("Elasticsearch detected", zap.Int("version", esVersion))
c.Version = uint(esVersion)
}

clientChan <- eswrapper.WrapESClient(rawClient, service, c.Version)
}
}()

Expand Down Expand Up @@ -453,12 +446,13 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) {

// channelConfigOptions wraps the configs to feed to the ElasticSearch client init.
// We need a channel because the password from PasswordFilePath can change.
func (c *Configuration) channelConfigOptions(optionsChan chan<- []elastic.ClientOptionFunc, logger *zap.Logger) error {
func (c *Configuration) channelConfigOptions(logger *zap.Logger) error {
if c.Password != "" && c.PasswordFilePath != "" {
close(optionsChan)
return fmt.Errorf("Both Password and PasswordFilePath are set. Only one can be set.")
return fmt.Errorf("both Password and PasswordFilePath are set")
}

c.optionsChan = make(chan []elastic.ClientOptionFunc, 1)

options := []elastic.ClientOptionFunc{
elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
// Disable health check when token from context is allowed, this is because at this time
Expand All @@ -474,45 +468,46 @@ func (c *Configuration) channelConfigOptions(optionsChan chan<- []elastic.Client
}
options = append(options, elastic.SetHttpClient(httpClient))

if c.Password != "" {
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))
}

if c.SendGetBodyAs != "" {
options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs))
}

options, err := addLoggerOptions(options, c.LogLevel)
if err != nil {
close(optionsChan)
close(c.optionsChan)
return err
}

transport, err := GetHTTPRoundTripper(c, logger)
if err != nil {
close(optionsChan)
close(c.optionsChan)
return err
}
httpClient.Transport = transport

if c.Password != "" {
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))
c.optionsChan <- options
close(c.optionsChan)
}

if c.PasswordFilePath != "" {
passwordFromFile, err := loadFileContent(c.PasswordFilePath)
if err != nil {
close(optionsChan)
close(c.optionsChan)
return err
}
options = append(options, elastic.SetBasicAuth(c.Username, passwordFromFile))
c.optionsChan <- options

onPasswordChange := func() { c.onPasswordChange(optionsChan, options) }
_, err = fswatcher.New([]string{c.PasswordFilePath}, onPasswordChange, logger)
onPasswordChange := func() { c.onPasswordChange(c.optionsChan, options) }
c.passwordFileWatcher, err = fswatcher.New([]string{c.PasswordFilePath}, onPasswordChange, logger)
if err != nil {
close(optionsChan)
close(c.optionsChan)
return err
}
}

optionsChan <- options

return nil
}

Expand Down
45 changes: 24 additions & 21 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger

primaryConfig config.ClientBuilder
primaryClient es.Client
archiveConfig config.ClientBuilder
archiveClient es.Client
primaryConfig config.ClientBuilder
primaryClientChan chan es.Client
primaryClient es.Client
archiveConfig config.ClientBuilder
archiveClientChan chan es.Client
archiveClient es.Client
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -89,44 +91,45 @@ func (f *Factory) InitFromOptions(o Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

err := f.assignClientFromChannel(f.primaryConfig, f.primaryClient, metricsFactory, logger)
f.primaryClientChan = make(chan es.Client, 1)
err := f.primaryConfig.ChannelNewClient(f.primaryClientChan, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
close(f.primaryClientChan)
return fmt.Errorf("failed to channel primary Elasticsearch client: %w", err)
}
go func() {
f.primaryClient = <-f.primaryClientChan
}()

if f.archiveConfig.IsStorageEnabled() {
err = f.assignClientFromChannel(f.archiveConfig, f.archiveClient, metricsFactory, logger)
f.archiveClientChan = make(chan es.Client, 1)
err := f.archiveConfig.ChannelNewClient(f.archiveClientChan, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
close(f.archiveClientChan)
return fmt.Errorf("failed to channel archive Elasticsearch client: %w", err)
}
go func() {
f.archiveClient = <-f.archiveClientChan
}()
}

return nil
}

func (f *Factory) assignClientFromChannel(config config.ClientBuilder, esClient es.Client, metricsFactory metrics.Factory, logger *zap.Logger) error {
clientChan := make(chan es.Client, 1)

/*func (f *Factory) assignClientFromChannel(config config.ClientBuilder, clientChan chan es.Client, esClient es.Client, metricsFactory metrics.Factory, logger *zap.Logger) error {
err := config.ChannelNewClient(clientChan, logger, metricsFactory)
if err != nil {
return err
}
go func() {
for {
select {
case client, ok := <-clientChan:
if !ok {
return
}

esClient = client
}
for client := range clientChan {
esClient = <-clientChan
}
}()
return nil
}
}*/

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
Expand Down
7 changes: 4 additions & 3 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func (m *mockClientBuilder) ChannelNewClient(clientChan chan<- es.Client, logger
c.On("CreateTemplate", mock.Anything).Return(tService)
c.On("GetVersion").Return(uint(6))
clientChan <- c
return nil
}
return m.err
}
Expand All @@ -64,11 +63,11 @@ func TestElasticsearchFactory(t *testing.T) {
// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
// so we override it with a mock.
f.primaryConfig = &mockClientBuilder{err: errors.New("made-up error")}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error")
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to channel primary Elasticsearch client: made-up error")

f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2"), Configuration: escfg.Configuration{Enabled: true}}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2")
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to channel archive Elasticsearch client: made-up error2")

f.archiveConfig = &mockClientBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand Down Expand Up @@ -97,6 +96,8 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
f.primaryConfig = mockConf
f.archiveConfig = mockConf
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.NotNil(t, <-f.primaryClientChan)
assert.NotNil(t, f.primaryClient)
r, err := f.CreateSpanWriter()
require.Error(t, err)
assert.Nil(t, r)
Expand Down

0 comments on commit 546c472

Please sign in to comment.