From c8195f9af57b32a806d116190d888587948afa87 Mon Sep 17 00:00:00 2001 From: defool Date: Sun, 19 Apr 2020 20:54:55 +0800 Subject: [PATCH] Add CLI option for collector to refresh sampling strategies Signed-off-by: defool --- .../app/sampling/strategystore/interface.go | 6 +++ .../sampling/strategystore/static/factory.go | 37 +++++++++++++- .../strategystore/static/factory_test.go | 46 ++++++++++++++++++ .../sampling/strategystore/static/options.go | 8 +++- .../strategystore/static/strategy_store.go | 48 +++++++++++++------ .../static/strategy_store_test.go | 3 +- 6 files changed, 130 insertions(+), 18 deletions(-) diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index e1926ded1228..53cfd0a35be3 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -23,3 +23,9 @@ type StrategyStore interface { // GetSamplingStrategy retrieves the sampling strategy for the specified service. GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) } + +// StrategyUpdater updates sampling strategies. +type StrategyUpdater interface { + //UpdateSamplingStrategy replaces the sampling strategy content. + UpdateSamplingStrategy(bytes []byte) error +} diff --git a/plugin/sampling/strategystore/static/factory.go b/plugin/sampling/strategystore/static/factory.go index ddc769e94afe..f78390c9541e 100644 --- a/plugin/sampling/strategystore/static/factory.go +++ b/plugin/sampling/strategystore/static/factory.go @@ -15,7 +15,11 @@ package static import ( + "context" "flag" + "io/ioutil" + "path/filepath" + "time" "github.com/spf13/viper" "github.com/uber/jaeger-lib/metrics" @@ -56,5 +60,36 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateStrategyStore implements strategystore.Factory func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, error) { - return NewStrategyStore(*f.options, f.logger) + ss, err := NewStrategyStore(*f.options, f.logger) + if err != nil { + return ss, err + } + if s, ok := ss.(strategystore.StrategyUpdater); f.options.ReloadInterval > 0 && ok { + go f.autoUpdateStrategy(context.Background(), s) + } + return ss, nil +} + +func (f *Factory) autoUpdateStrategy(ctx context.Context, s strategystore.StrategyUpdater) { + lastString := "" + interval, filePath := f.options.ReloadInterval, f.options.StrategiesFile + ticker := time.NewTicker(interval) + for { + select { + case <-ticker.C: + if currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)); err == nil { + currStr := string(currBytes) + if lastString == currStr { + continue + } + err := s.UpdateSamplingStrategy(currBytes) + if err != nil { + f.logger.Error("UpdateSamplingStrategy failed", zap.Error(err)) + } + lastString = currStr + } + case <-ctx.Done(): + return + } + } } diff --git a/plugin/sampling/strategystore/static/factory_test.go b/plugin/sampling/strategystore/static/factory_test.go index 205c35651ba1..91f96de855c5 100644 --- a/plugin/sampling/strategystore/static/factory_test.go +++ b/plugin/sampling/strategystore/static/factory_test.go @@ -15,15 +15,21 @@ package static import ( + "io/ioutil" + "os" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" ss "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/plugin" + "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) var _ ss.Factory = new(Factory) @@ -39,3 +45,43 @@ func TestFactory(t *testing.T) { _, err := f.CreateStrategyStore() assert.NoError(t, err) } + +func TestAutoReload(t *testing.T) { + // copy from fixtures/strategies.json + srcFile, dstFile := "fixtures/strategies.json", "fixtures/strategies_for_reload.json" + srcBytes, err := ioutil.ReadFile(srcFile) + require.NoError(t, err) + err = ioutil.WriteFile(dstFile, srcBytes, 0644) + require.NoError(t, err) + + f := NewFactory() + v, command := config.Viperize(f.AddFlags) + _ = command.ParseFlags([]string{"--sampling.strategies-file=" + dstFile, "--sampling.strategies-reload-interval=50ms"}) + f.InitFromViper(v) + + // Test reading strategies from a file + //ctx, canf := context.WithCancel(context.Background()) + //defer canf() + + store, err := f.CreateStrategyStore() + require.NoError(t, err) + + s, err := store.GetSamplingStrategy("foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // update file + newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1) + err = ioutil.WriteFile(dstFile, []byte(newStr), 0644) + require.NoError(t, err) + + // wait for reload + time.Sleep(time.Millisecond * 50 * 2) + + // verity reloading + s, err = store.GetSamplingStrategy("foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) + + os.Remove(dstFile) +} diff --git a/plugin/sampling/strategystore/static/options.go b/plugin/sampling/strategystore/static/options.go index 11983a763fe7..37339b8178aa 100644 --- a/plugin/sampling/strategystore/static/options.go +++ b/plugin/sampling/strategystore/static/options.go @@ -16,27 +16,33 @@ package static import ( "flag" + "time" "github.com/spf13/viper" ) const ( - samplingStrategiesFile = "sampling.strategies-file" + samplingStrategiesFile = "sampling.strategies-file" + samplingStrategiesReloadInterval = "sampling.strategies-reload-interval" ) // Options holds configuration for the static sampling strategy store. type Options struct { // StrategiesFile is the path for the sampling strategies file in JSON format StrategiesFile string + // ReloadInterval is the time interval to check and reload sampling strategies file + ReloadInterval time.Duration } // AddFlags adds flags for Options func AddFlags(flagSet *flag.FlagSet) { flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file") + flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no checks (default 0s)") } // InitFromViper initializes Options with properties from viper func (opts *Options) InitFromViper(v *viper.Viper) *Options { opts.StrategiesFile = v.GetString(samplingStrategiesFile) + opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval) return opts } diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index 78d2acae1b59..9489dd09ddd3 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "sync/atomic" "go.uber.org/zap" @@ -30,16 +31,16 @@ import ( type strategyStore struct { logger *zap.Logger - defaultStrategy *sampling.SamplingStrategyResponse - serviceStrategies map[string]*sampling.SamplingStrategyResponse + defaultStrategy atomic.Value + serviceStrategies atomic.Value } // NewStrategyStore creates a strategy store that holds static sampling strategies. func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) { h := &strategyStore{ - logger: logger, - serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse), + logger: logger, } + h.serviceStrategies.Store(make(map[string]*sampling.SamplingStrategyResponse)) strategies, err := loadStrategies(options.StrategiesFile) if err != nil { return nil, err @@ -50,10 +51,25 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er // GetSamplingStrategy implements StrategyStore#GetSamplingStrategy. func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { - if strategy, ok := h.serviceStrategies[serviceName]; ok { + serviceStrategies, ok := h.serviceStrategies.Load().(map[string]*sampling.SamplingStrategyResponse) + if !ok { + return nil, fmt.Errorf("wrong type of serviceStrategies") + } + if strategy, ok := serviceStrategies[serviceName]; ok { return strategy, nil } - return h.defaultStrategy, nil + return h.defaultStrategy.Load().(*sampling.SamplingStrategyResponse), nil +} + +// UpdateSamplingStrategy implements StrategyStore#UpdateSamplingStrategy. +func (h *strategyStore) UpdateSamplingStrategy(bytes []byte) error { + var strategies strategies + if err := json.Unmarshal(bytes, &strategies); err != nil { + return fmt.Errorf("failed to unmarshal strategies: %w", err) + } + h.parseStrategies(&strategies) + h.logger.Info("Updated strategy:" + string(bytes)) + return nil } // TODO good candidate for a global util function @@ -73,40 +89,44 @@ func loadStrategies(strategiesFile string) (*strategies, error) { } func (h *strategyStore) parseStrategies(strategies *strategies) { - h.defaultStrategy = defaultStrategyResponse() + defaultStrategy := defaultStrategyResponse() + h.defaultStrategy.Store(defaultStrategy) if strategies == nil { h.logger.Info("No sampling strategies provided, using defaults") return } if strategies.DefaultStrategy != nil { - h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) + defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) } merge := true - if h.defaultStrategy.OperationSampling == nil || - h.defaultStrategy.OperationSampling.PerOperationStrategies == nil { + if defaultStrategy.OperationSampling == nil || + defaultStrategy.OperationSampling.PerOperationStrategies == nil { merge = false } + serviceStrategies := make(map[string]*sampling.SamplingStrategyResponse) for _, s := range strategies.ServiceStrategies { - h.serviceStrategies[s.Service] = h.parseServiceStrategies(s) + serviceStrategies[s.Service] = h.parseServiceStrategies(s) // Merge with the default operation strategies, because only merging with // the default strategy has no effect on service strategies (the default strategy // is not merged with and only used as a fallback). - opS := h.serviceStrategies[s.Service].OperationSampling + opS := serviceStrategies[s.Service].OperationSampling if opS == nil { // Service has no per-operation strategies, so just reference the default settings. - h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling + serviceStrategies[s.Service].OperationSampling = defaultStrategy.OperationSampling continue } if merge { opS.PerOperationStrategies = mergePerOperationSamplingStrategies( opS.PerOperationStrategies, - h.defaultStrategy.OperationSampling.PerOperationStrategies) + defaultStrategy.OperationSampling.PerOperationStrategies) } } + h.defaultStrategy.Store(defaultStrategy) + h.serviceStrategies.Store(serviceStrategies) } // mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b. diff --git a/plugin/sampling/strategystore/static/strategy_store_test.go b/plugin/sampling/strategystore/static/strategy_store_test.go index 7a93365a98b3..531a77479733 100644 --- a/plugin/sampling/strategystore/static/strategy_store_test.go +++ b/plugin/sampling/strategystore/static/strategy_store_test.go @@ -15,7 +15,6 @@ package static import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -79,7 +78,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) { os := s.OperationSampling assert.EqualValues(t, os.DefaultSamplingProbability, 0.8) require.Len(t, os.PerOperationStrategies, 4) - fmt.Println(os) + //fmt.Println(os) assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation) assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate) assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)