Skip to content

Commit

Permalink
Support periodic reload of sampling strategies file
Browse files Browse the repository at this point in the history
Signed-off-by: defool <[email protected]>
  • Loading branch information
defool committed Apr 20, 2020
1 parent b99114e commit a53bb83
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 17 deletions.
8 changes: 7 additions & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,33 @@ package static

import (
"flag"
"time"

"github.com/spf13/viper"
)

const (
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
)

// Options holds configuration for the static sampling strategy store.
type Options struct {
// StrategiesFile is the path for the sampling strategies file in JSON format
StrategiesFile string
// ReloadInterval is the time interval to check and reload sampling strategies file
ReloadInterval time.Duration
}

// AddFlags adds flags for Options
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no checks (default 0s)")
}

// InitFromViper initializes Options with properties from viper
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
return opts
}
91 changes: 77 additions & 14 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ package static

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"sync/atomic"
"time"

"go.uber.org/zap"

Expand All @@ -30,31 +34,86 @@ import (
type strategyStore struct {
logger *zap.Logger

defaultStrategy *sampling.SamplingStrategyResponse
serviceStrategies map[string]*sampling.SamplingStrategyResponse
defaultStrategy atomic.Value
serviceStrategies atomic.Value

ctx context.Context
cancelFunc context.CancelFunc
}

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
h := &strategyStore{
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
}
h.serviceStrategies.Store(make(map[string]*sampling.SamplingStrategyResponse))
strategies, err := loadStrategies(options.StrategiesFile)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategy(options.ReloadInterval, options.StrategiesFile)
}
return h, nil
}

// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
if strategy, ok := h.serviceStrategies[serviceName]; ok {
serviceStrategies, ok := h.serviceStrategies.Load().(map[string]*sampling.SamplingStrategyResponse)
if !ok {
return nil, fmt.Errorf("wrong type of serviceStrategies")
}
if strategy, ok := serviceStrategies[serviceName]; ok {
return strategy, nil
}
h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName))
return h.defaultStrategy, nil
return h.defaultStrategy.Load().(*sampling.SamplingStrategyResponse), nil
}

// StopUpdateStrategy stops updating the strategy
func (h *strategyStore) StopUpdateStrategy() {
h.cancelFunc()
}

func (h *strategyStore) autoUpdateStrategy(interval time.Duration, filePath string) {
lastString := ""
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)); err == nil {
currStr := string(currBytes)
if lastString == currStr {
continue
}
err := h.updateSamplingStrategy(currBytes)
if err != nil {
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
}
lastString = currStr
} else {
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
}
case <-h.ctx.Done():
return
}
}
}

func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
var strategies strategies
if err := json.Unmarshal(bytes, &strategies); err != nil {
return fmt.Errorf("failed to unmarshal strategies: %w", err)
}
h.parseStrategies(&strategies)
h.logger.Info("Updated strategy:" + string(bytes))
return nil
}

// TODO good candidate for a global util function
Expand All @@ -74,40 +133,44 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
h.defaultStrategy = defaultStrategyResponse()
defaultStrategy := defaultStrategyResponse()
h.defaultStrategy.Store(defaultStrategy)
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
return
}
if strategies.DefaultStrategy != nil {
h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
}

merge := true
if h.defaultStrategy.OperationSampling == nil ||
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
if defaultStrategy.OperationSampling == nil ||
defaultStrategy.OperationSampling.PerOperationStrategies == nil {
merge = false
}

serviceStrategies := make(map[string]*sampling.SamplingStrategyResponse)
for _, s := range strategies.ServiceStrategies {
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Merge with the default operation strategies, because only merging with
// the default strategy has no effect on service strategies (the default strategy
// is not merged with and only used as a fallback).
opS := h.serviceStrategies[s.Service].OperationSampling
opS := serviceStrategies[s.Service].OperationSampling
if opS == nil {
// Service has no per-operation strategies, so just reference the default settings.
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
serviceStrategies[s.Service].OperationSampling = defaultStrategy.OperationSampling
continue
}

if merge {
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
opS.PerOperationStrategies,
h.defaultStrategy.OperationSampling.PerOperationStrategies)
defaultStrategy.OperationSampling.PerOperationStrategies)
}
}
h.defaultStrategy.Store(defaultStrategy)
h.serviceStrategies.Store(serviceStrategies)
}

// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.
Expand Down
55 changes: 53 additions & 2 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package static

import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -79,7 +82,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
os := s.OperationSampling
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
require.Len(t, os.PerOperationStrategies, 4)
fmt.Println(os)

assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation)
assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)
Expand Down Expand Up @@ -243,3 +246,51 @@ func TestDeepCopy(t *testing.T) {
assert.False(t, copy == s)
assert.EqualValues(t, copy, s)
}

func TestAutoUpdateStrategy(t *testing.T) {
// copy from fixtures/strategies.json
tempFile, _ := ioutil.TempFile("", "for_go_test_*.json")
tempFile.Close()

srcFile, dstFile := "fixtures/strategies.json", tempFile.Name()
srcBytes, err := ioutil.ReadFile(srcFile)
require.NoError(t, err)
err = ioutil.WriteFile(dstFile, srcBytes, 0644)
require.NoError(t, err)

interval := time.Millisecond * 10
store, err := NewStrategyStore(Options{
StrategiesFile: dstFile,
ReloadInterval: interval,
}, zap.NewNop())
require.NoError(t, err)
defer store.(*strategyStore).StopUpdateStrategy()

s, err := store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// update file
newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1)
err = ioutil.WriteFile(dstFile, []byte(newStr), 0644)
require.NoError(t, err)

// wait for reload
time.Sleep(interval * 2)

// verity reloading
s, err = store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)

// check bad file content
_ = ioutil.WriteFile(dstFile, []byte("bad value"), 0644)
time.Sleep(interval * 2)

// check file not exist
os.Remove(dstFile)

// wait for delete and update failed
time.Sleep(interval * 2)

}

0 comments on commit a53bb83

Please sign in to comment.