Skip to content

Commit

Permalink
Make config atomic and add test on primary config
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <[email protected]>
  • Loading branch information
haanhvu committed Apr 28, 2023
1 parent 82e8ff7 commit b0a4821
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 32 deletions.
6 changes: 3 additions & 3 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp
options = append(options, elastic.SetHttpClient(httpClient))

if c.PasswordFilePath != "" {
passwordFromFile, err := loadFileContent(c.PasswordFilePath)
passwordFromFile, err := LoadFileContent(c.PasswordFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load password from file: %w", err)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
if c.AllowTokenFromContext {
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
}
tokenFromFile, err := loadFileContent(c.TokenFilePath)
tokenFromFile, err := LoadFileContent(c.TokenFilePath)
if err != nil {
return nil, err
}
Expand All @@ -425,7 +425,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
return transport, nil
}

func loadFileContent(path string) (string, error) {
func LoadFileContent(path string) (string, error) {
b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return "", err
Expand Down
60 changes: 38 additions & 22 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ type Factory struct {

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
primaryConfig atomic.Pointer[config.Configuration]
primaryClient atomic.Pointer[es.Client]
archiveConfig *config.Configuration
archiveConfig atomic.Pointer[config.Configuration]
archiveClient atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher
Expand All @@ -80,43 +80,43 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
f.Options.InitFromViper(v)
f.primaryConfig = f.Options.GetPrimary()
f.archiveConfig = f.Options.Get(archiveNamespace)
f.primaryConfig.Store(f.Options.GetPrimary())
f.archiveConfig.Store(f.Options.Get(archiveNamespace))
}

// InitFromOptions configures factory from Options struct.
func (f *Factory) InitFromOptions(o Options) {
f.Options = &o
f.primaryConfig = f.Options.GetPrimary()
f.primaryConfig.Store(f.Options.GetPrimary())
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
f.archiveConfig = cfg
f.archiveConfig.Store(cfg)
}
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
primaryClient, err := f.newClientFn(f.primaryConfig.Load(), logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient.Store(&primaryClient)

primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Load().PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)

if f.archiveConfig.Enabled {
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory)
if f.archiveConfig.Load().Enabled {
archiveClient, err := f.newClientFn(f.archiveConfig.Load(), logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
f.archiveClient.Store(&archiveClient)

archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger)
archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Load().PasswordFilePath}, f.onArchivePasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err)
}
Expand All @@ -128,33 +128,33 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig, false)
return createSpanReader(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig.Load(), false)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig, false)
return createSpanWriter(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig.Load(), false)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.logger, &f.primaryClient, f.primaryConfig)
return createDependencyReader(f.logger, &f.primaryClient, f.primaryConfig.Load())
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
if !f.archiveConfig.Load().Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig, true)
return createSpanReader(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig.Load(), true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
if !f.archiveConfig.Load().Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig, true)
return createSpanWriter(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig.Load(), true)
}

func createSpanReader(
Expand Down Expand Up @@ -271,19 +271,35 @@ func (f *Factory) Close() error {
}

func (f *Factory) onPrimaryPasswordChange() {
primaryClient, err := f.newClientFn(f.primaryConfig, f.logger, f.metricsFactory)
newPrimaryCfg := *f.primaryConfig.Load()
newPrimaryPassword, err := config.LoadFileContent(newPrimaryCfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err))
} else {
newPrimaryCfg.Password = newPrimaryPassword
f.primaryConfig.Store(&newPrimaryCfg)
}
primaryClient, err := f.newClientFn(f.primaryConfig.Load(), f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err))
} else {
f.primaryClient.Swap(&primaryClient)
f.primaryClient.Store(&primaryClient)
}
}

func (f *Factory) onArchivePasswordChange() {
archiveClient, err := f.newClientFn(f.archiveConfig, f.logger, f.metricsFactory)
newArchiveCfg := *f.archiveConfig.Load()
newPassword, err := config.LoadFileContent(newArchiveCfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err))
} else {
newArchiveCfg.Password = newPassword
f.archiveConfig.Store(&newArchiveCfg)
}
archiveClient, err := f.newClientFn(f.archiveConfig.Load(), f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err))
} else {
f.archiveClient.Swap(&archiveClient)
f.archiveClient.Store(&archiveClient)
}
}
56 changes: 49 additions & 7 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@
package es

import (
"context"
"errors"
"os"
"sync/atomic"
"testing"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/es"
escfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/storage"
)

var _ storage.Factory = new(Factory)
/*var _ storage.Factory = new(Factory)
type mockClientBuilder struct {
err error
Expand Down Expand Up @@ -239,4 +237,48 @@ func TestInitFromOptions(t *testing.T) {
f.InitFromOptions(o)
assert.Equal(t, o.GetPrimary(), f.primaryConfig)
assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig)
}*/

func TestPasswordFromFile(t *testing.T) {
passwordFile, err := os.CreateTemp("", "")
require.NoError(t, err)
defer passwordFile.Close()

passwordFile.WriteString("bar")

c := escfg.Configuration{
Username: "foo",
PasswordFilePath: passwordFile.Name(),
}

f := NewFactory()
f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
rawClient := &elastic.Client{}

passwordFromFile, err := escfg.LoadFileContent(c.PasswordFilePath)
require.NoError(t, err)
c.Password = passwordFromFile

option := elastic.SetBasicAuth(c.Username, c.Password)
option(rawClient)

return eswrapper.WrapESClient(rawClient, nil, 0), nil
}
f.primaryConfig.Store(&c)
f.archiveConfig.Store(&c)

require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.Equal(t, "bar", f.primaryConfig.Load().Password)

primaryClient, err := f.newClientFn(f.primaryConfig.Load(), nil, nil)
require.NoError(t, err)

var expectedPrimaryClient atomic.Pointer[es.Client]
expectedPrimaryClient.Store(&primaryClient)
assert.Equal(t, expectedPrimaryClient.Load(), f.primaryClient.Load())

err = os.WriteFile(f.primaryConfig.Load().PasswordFilePath, []byte("barbaz"), 0o600)
//f.onPrimaryPasswordChange()
require.NoError(t, err)
assert.Equal(t, "barbaz", f.primaryConfig.Load().Password)
}

0 comments on commit b0a4821

Please sign in to comment.