diff --git a/plugin/sampling/strategystore/static/options.go b/plugin/sampling/strategystore/static/options.go index 11983a763fe7..37339b8178aa 100644 --- a/plugin/sampling/strategystore/static/options.go +++ b/plugin/sampling/strategystore/static/options.go @@ -16,27 +16,33 @@ package static import ( "flag" + "time" "github.com/spf13/viper" ) const ( - samplingStrategiesFile = "sampling.strategies-file" + samplingStrategiesFile = "sampling.strategies-file" + samplingStrategiesReloadInterval = "sampling.strategies-reload-interval" ) // Options holds configuration for the static sampling strategy store. type Options struct { // StrategiesFile is the path for the sampling strategies file in JSON format StrategiesFile string + // ReloadInterval is the time interval to check and reload sampling strategies file + ReloadInterval time.Duration } // AddFlags adds flags for Options func AddFlags(flagSet *flag.FlagSet) { flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file") + flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no checks (default 0s)") } // InitFromViper initializes Options with properties from viper func (opts *Options) InitFromViper(v *viper.Viper) *Options { opts.StrategiesFile = v.GetString(samplingStrategiesFile) + opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval) return opts } diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index f6769e05a715..90526eb768fb 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -16,10 +16,14 @@ package static import ( "bytes" + "context" "encoding/gob" "encoding/json" "fmt" "io/ioutil" + "path/filepath" + "sync/atomic" + "time" "go.uber.org/zap" @@ -30,31 +34,86 @@ import ( type strategyStore struct { logger *zap.Logger - defaultStrategy *sampling.SamplingStrategyResponse - serviceStrategies map[string]*sampling.SamplingStrategyResponse + defaultStrategy atomic.Value + serviceStrategies atomic.Value + + ctx context.Context + cancelFunc context.CancelFunc } // NewStrategyStore creates a strategy store that holds static sampling strategies. func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) h := &strategyStore{ - logger: logger, - serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse), + logger: logger, + ctx: ctx, + cancelFunc: cancelFunc, } + h.serviceStrategies.Store(make(map[string]*sampling.SamplingStrategyResponse)) strategies, err := loadStrategies(options.StrategiesFile) if err != nil { return nil, err } h.parseStrategies(strategies) + + if options.ReloadInterval > 0 { + go h.autoUpdateStrategy(options.ReloadInterval, options.StrategiesFile) + } return h, nil } // GetSamplingStrategy implements StrategyStore#GetSamplingStrategy. func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { - if strategy, ok := h.serviceStrategies[serviceName]; ok { + serviceStrategies, ok := h.serviceStrategies.Load().(map[string]*sampling.SamplingStrategyResponse) + if !ok { + return nil, fmt.Errorf("wrong type of serviceStrategies") + } + if strategy, ok := serviceStrategies[serviceName]; ok { return strategy, nil } h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName)) - return h.defaultStrategy, nil + return h.defaultStrategy.Load().(*sampling.SamplingStrategyResponse), nil +} + +// StopUpdateStrategy stops updating the strategy +func (h *strategyStore) StopUpdateStrategy() { + h.cancelFunc() +} + +func (h *strategyStore) autoUpdateStrategy(interval time.Duration, filePath string) { + lastString := "" + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)); err == nil { + currStr := string(currBytes) + if lastString == currStr { + continue + } + err := h.updateSamplingStrategy(currBytes) + if err != nil { + h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err)) + } + lastString = currStr + } else { + h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err)) + } + case <-h.ctx.Done(): + return + } + } +} + +func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { + var strategies strategies + if err := json.Unmarshal(bytes, &strategies); err != nil { + return fmt.Errorf("failed to unmarshal strategies: %w", err) + } + h.parseStrategies(&strategies) + h.logger.Info("Updated strategy:" + string(bytes)) + return nil } // TODO good candidate for a global util function @@ -74,40 +133,44 @@ func loadStrategies(strategiesFile string) (*strategies, error) { } func (h *strategyStore) parseStrategies(strategies *strategies) { - h.defaultStrategy = defaultStrategyResponse() + defaultStrategy := defaultStrategyResponse() + h.defaultStrategy.Store(defaultStrategy) if strategies == nil { h.logger.Info("No sampling strategies provided, using defaults") return } if strategies.DefaultStrategy != nil { - h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) + defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) } merge := true - if h.defaultStrategy.OperationSampling == nil || - h.defaultStrategy.OperationSampling.PerOperationStrategies == nil { + if defaultStrategy.OperationSampling == nil || + defaultStrategy.OperationSampling.PerOperationStrategies == nil { merge = false } + serviceStrategies := make(map[string]*sampling.SamplingStrategyResponse) for _, s := range strategies.ServiceStrategies { - h.serviceStrategies[s.Service] = h.parseServiceStrategies(s) + serviceStrategies[s.Service] = h.parseServiceStrategies(s) // Merge with the default operation strategies, because only merging with // the default strategy has no effect on service strategies (the default strategy // is not merged with and only used as a fallback). - opS := h.serviceStrategies[s.Service].OperationSampling + opS := serviceStrategies[s.Service].OperationSampling if opS == nil { // Service has no per-operation strategies, so just reference the default settings. - h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling + serviceStrategies[s.Service].OperationSampling = defaultStrategy.OperationSampling continue } if merge { opS.PerOperationStrategies = mergePerOperationSamplingStrategies( opS.PerOperationStrategies, - h.defaultStrategy.OperationSampling.PerOperationStrategies) + defaultStrategy.OperationSampling.PerOperationStrategies) } } + h.defaultStrategy.Store(defaultStrategy) + h.serviceStrategies.Store(serviceStrategies) } // mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b. diff --git a/plugin/sampling/strategystore/static/strategy_store_test.go b/plugin/sampling/strategystore/static/strategy_store_test.go index 7a93365a98b3..ddad6dc7a2a7 100644 --- a/plugin/sampling/strategystore/static/strategy_store_test.go +++ b/plugin/sampling/strategystore/static/strategy_store_test.go @@ -15,8 +15,11 @@ package static import ( - "fmt" + "io/ioutil" + "os" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -79,7 +82,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) { os := s.OperationSampling assert.EqualValues(t, os.DefaultSamplingProbability, 0.8) require.Len(t, os.PerOperationStrategies, 4) - fmt.Println(os) + assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation) assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate) assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation) @@ -243,3 +246,51 @@ func TestDeepCopy(t *testing.T) { assert.False(t, copy == s) assert.EqualValues(t, copy, s) } + +func TestAutoUpdateStrategy(t *testing.T) { + // copy from fixtures/strategies.json + tempFile, _ := ioutil.TempFile("", "for_go_test_*.json") + tempFile.Close() + + srcFile, dstFile := "fixtures/strategies.json", tempFile.Name() + srcBytes, err := ioutil.ReadFile(srcFile) + require.NoError(t, err) + err = ioutil.WriteFile(dstFile, srcBytes, 0644) + require.NoError(t, err) + + interval := time.Millisecond * 10 + store, err := NewStrategyStore(Options{ + StrategiesFile: dstFile, + ReloadInterval: interval, + }, zap.NewNop()) + require.NoError(t, err) + defer store.(*strategyStore).StopUpdateStrategy() + + s, err := store.GetSamplingStrategy("foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // update file + newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1) + err = ioutil.WriteFile(dstFile, []byte(newStr), 0644) + require.NoError(t, err) + + // wait for reload + time.Sleep(interval * 2) + + // verity reloading + s, err = store.GetSamplingStrategy("foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) + + // check bad file content + _ = ioutil.WriteFile(dstFile, []byte("bad value"), 0644) + time.Sleep(interval * 2) + + // check file not exist + os.Remove(dstFile) + + // wait for delete and update failed + time.Sleep(interval * 2) + +}