Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reloading ES client's password from file #4342

Merged
merged 15 commits into from
Sep 9, 2023
17 changes: 15 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Configuration struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
PasswordFilePath string `mapstructure:"password_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
Expand Down Expand Up @@ -296,6 +297,10 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) {

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) {
if c.Password != "" && c.PasswordFilePath != "" {
return nil, fmt.Errorf("both Password and PasswordFilePath are set")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move just before L319, to keep related logic together


options := []elastic.ClientOptionFunc{
elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
// Disable health check when token from context is allowed, this is because at this time
Expand All @@ -310,6 +315,14 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp
Timeout: c.Timeout,
}
options = append(options, elastic.SetHttpClient(httpClient))

if c.PasswordFilePath != "" {
passwordFromFile, err := LoadFileContent(c.PasswordFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load password from file: %w", err)
}
c.Password = passwordFromFile
}
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))

if c.SendGetBodyAs != "" {
Expand Down Expand Up @@ -396,7 +409,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
if c.AllowTokenFromContext {
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
}
tokenFromFile, err := loadToken(c.TokenFilePath)
tokenFromFile, err := LoadFileContent(c.TokenFilePath)
if err != nil {
return nil, err
}
Expand All @@ -412,7 +425,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
return transport, nil
}

func loadToken(path string) (string, error) {
func LoadFileContent(path string) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO this should probably remain being called LoadToken, since it's not a plain file read

b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return "", err
Expand Down
10 changes: 5 additions & 5 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
client func() es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
Expand All @@ -48,7 +48,7 @@ type DependencyStore struct {

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
Expand Down Expand Up @@ -84,15 +84,15 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// CreateTemplates creates index templates.
func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error {
_, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
return err
}
return nil
}

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
s.client.Index().Index(indexName).Type(dependencyType).
s.client().Index().Index(indexName).Type(dependencyType).
BodyJson(&dbmodel.TimeDependencies{
Timestamp: ts,
Dependencies: dbmodel.FromDomainDependencies(dependencies),
Expand All @@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
searchResult, err := s.client().Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
Expand All @@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
Expand Down
113 changes: 86 additions & 27 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
package es

import (
"errors"
"flag"
"fmt"
"io"
"sync/atomic"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
Expand Down Expand Up @@ -53,10 +56,12 @@ type Factory struct {

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
primaryClient es.Client
archiveConfig *config.Configuration
archiveClient es.Client
primaryConfig atomic.Pointer[config.Configuration]
primaryClient atomic.Pointer[es.Client]
archiveConfig atomic.Pointer[config.Configuration]
archiveClient atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher
}

// NewFactory creates a new Factory.
Expand All @@ -75,80 +80,95 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
f.Options.InitFromViper(v)
f.primaryConfig = f.Options.GetPrimary()
f.archiveConfig = f.Options.Get(archiveNamespace)
f.primaryConfig.Store(f.Options.GetPrimary())
f.archiveConfig.Store(f.Options.Get(archiveNamespace))
}

// InitFromOptions configures factory from Options struct.
func (f *Factory) InitFromOptions(o Options) {
f.Options = &o
f.primaryConfig = f.Options.GetPrimary()
f.primaryConfig.Store(f.Options.GetPrimary())
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
f.archiveConfig = cfg
f.archiveConfig.Store(cfg)
}
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
primaryClient, err := f.newClientFn(f.primaryConfig.Load(), logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient = primaryClient
if f.archiveConfig.Enabled {
f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory)
f.primaryClient.Store(&primaryClient)

primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Load().PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creating a watcher should be conditional on PasswordFilePath != ""

if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)

if f.archiveConfig.Load().Enabled {
archiveClient, err := f.newClientFn(f.archiveConfig.Load(), logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
f.archiveClient.Store(&archiveClient)

archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Load().PasswordFilePath}, f.onArchivePasswordChange, f.logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also conditional

if err != nil {
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err)
}
f.watchers = append(f.watchers, archiveWatcher)
}

return nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
return createSpanReader(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig.Load(), false)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
return createSpanWriter(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig.Load(), false)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
return createDependencyReader(f.logger, &f.primaryClient, f.primaryConfig.Load())
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
if !f.archiveConfig.Load().Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
return createSpanReader(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig.Load(), true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
if !f.archiveConfig.Load().Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
return createSpanWriter(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig.Load(), true)
}

func createSpanReader(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
client *atomic.Pointer[es.Client],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding a function to the factory and passing it as argument to createSpanReader() and similar functions. This will reduce the coupling: createXyz functions would not need to know about atomic pointers.

func (f *Factory) getPrimaryClient() es.Client { 
    return f.primaryClient.Load()
}

cfg *config.Configuration,
archive bool,
) (spanstore.Reader, error) {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Client: func() es.Client { return *client.Load() },
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.MaxDocCount,
Expand All @@ -168,7 +188,7 @@ func createSpanReader(
func createSpanWriter(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
client *atomic.Pointer[es.Client],
cfg *config.Configuration,
archive bool,
) (spanstore.Writer, error) {
Expand Down Expand Up @@ -196,7 +216,7 @@ func createSpanWriter(
return nil, err
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Client: func() es.Client { return *client.Load() },
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.IndexPrefix,
Expand All @@ -221,11 +241,11 @@ func createSpanWriter(

func createDependencyReader(
logger *zap.Logger,
client es.Client,
client *atomic.Pointer[es.Client],
cfg *config.Configuration,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Client: func() es.Client { return *client.Load() },
Logger: logger,
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
Expand All @@ -239,8 +259,47 @@ var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
for _, w := range f.watchers {
errs = append(errs, w.Close())
}
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
cfg.TLS.Close()
errs = append(errs, cfg.TLS.Close())
}
errs = append(errs, f.Options.GetPrimary().TLS.Close())
return errors.Join(errs...)
}

func (f *Factory) onPrimaryPasswordChange() {
newPrimaryCfg := *f.primaryConfig.Load()
newPrimaryPassword, err := config.LoadFileContent(newPrimaryCfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err))
} else {
newPrimaryCfg.Password = newPrimaryPassword
f.primaryConfig.Store(&newPrimaryCfg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no need to overwrite the primary config:

newConfig := primaryConfig // copy by value
newConfig.Password = newPrimaryPassword
f.newClientFn(newConfig, ...)

}
primaryClient, err := f.newClientFn(f.primaryConfig.Load(), f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err))
} else {
f.primaryClient.Store(&primaryClient)
}
}

func (f *Factory) onArchivePasswordChange() {
newArchiveCfg := *f.archiveConfig.Load()
newPassword, err := config.LoadFileContent(newArchiveCfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err))
} else {
newArchiveCfg.Password = newPassword
f.archiveConfig.Store(&newArchiveCfg)
}
archiveClient, err := f.newClientFn(f.archiveConfig.Load(), f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err))
} else {
f.archiveClient.Store(&archiveClient)
}
return f.Options.GetPrimary().TLS.Close()
}
Loading