Skip to content

Commit

Permalink
Make Elasticsearch archive storage optional (#1334)
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Feb 13, 2019
1 parent 7c0ed8c commit e96fe91
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 11 deletions.
7 changes: 7 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Configuration struct {
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}
Expand All @@ -80,6 +81,7 @@ type ClientBuilder interface {
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
GetTokenFilePath() string
IsEnabled() bool
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -233,6 +235,11 @@ func (c *Configuration) GetTokenFilePath() string {
return c.TokenFilePath
}

// IsEnabled determines whether storage is enabled
func (c *Configuration) IsEnabled() bool {
return c.Enabled
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
Expand Down
21 changes: 15 additions & 6 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return errors.Wrap(err, "failed to create primary Elasticsearch client")
}
f.primaryClient = primaryClient
archiveClient, err := f.archiveConfig.NewClient(logger, metricsFactory)
if err != nil {
return errors.Wrap(err, "failed to create archive Elasticsearch client")
if f.archiveConfig.IsEnabled() {
f.archiveClient, err = f.archiveConfig.NewClient(logger, metricsFactory)
if err != nil {
return errors.Wrap(err, "failed to create archive Elasticsearch client")
}
}
f.archiveClient = archiveClient
return nil
}

Expand Down Expand Up @@ -125,12 +126,20 @@ func loadTagsFromFile(filePath string) ([]string, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
}

func createSpanReader(
Expand Down
36 changes: 31 additions & 5 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestElasticsearchFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error")

f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2")}
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2"), Configuration:escfg.Configuration{Enabled:true}}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2")

f.archiveConfig = &mockClientBuilder{}
Expand Down Expand Up @@ -127,12 +127,12 @@ func TestLoadTagsFromFile(t *testing.T) {

func TestFactory_LoadMapping(t *testing.T) {
spanMapping, serviceMapping := GetMappings(10, 0)
tests := []struct{
name string
tests := []struct {
name string
toTest string
}{
{name: "jaeger-span.json", toTest:spanMapping},
{name: "jaeger-service.json", toTest:serviceMapping},
{name: "jaeger-span.json", toTest: spanMapping},
{name: "jaeger-service.json", toTest: serviceMapping},
}
for _, test := range tests {
mapping := loadMapping(test.name)
Expand All @@ -149,3 +149,29 @@ func TestFactory_LoadMapping(t *testing.T) {
assert.Equal(t, expectedMapping, test.toTest)
}
}

func TestArchiveDisabled(t *testing.T) {
f := NewFactory()
f.Options.Get(archiveNamespace).Enabled = false
w, err := f.CreateArchiveSpanWriter()
assert.Nil(t, w)
assert.Nil(t, err)
r, err := f.CreateArchiveSpanReader()
assert.Nil(t, r)
assert.Nil(t, err)
}

func TestArchiveEnabled2(t *testing.T) {
f := NewFactory()
f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{}
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
f.Options.Get(archiveNamespace).Enabled = true
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
}
9 changes: 9 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixEnabled = ".enabled"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
TagDotReplacement: "@",
Enabled: true,
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
Expand Down Expand Up @@ -206,6 +208,12 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
"(experimental) Use read and write aliases for indices. Use this option with Elasticsearch rollover "+
"API. It requires an external component to create aliases before startup and then performing its management. "+
"Note that "+nsConfig.namespace+suffixMaxSpanAge+" is not taken into the account and has to be substituted by external component managing read alias.")
if nsConfig.namespace == archiveNamespace {
flagSet.Bool(
nsConfig.namespace+suffixEnabled,
nsConfig.Enabled,
"Enable extra storage")
}
}

// InitFromViper initializes Options with properties from viper
Expand Down Expand Up @@ -240,6 +248,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}

// GetPrimary returns primary configuration.
Expand Down

0 comments on commit e96fe91

Please sign in to comment.