Skip to content

Commit

Permalink
Add CLI option for collector to refresh sampling strategies
Browse files Browse the repository at this point in the history
Signed-off-by: defool <[email protected]>
  • Loading branch information
defool committed Apr 19, 2020
1 parent d75eb14 commit c8195f9
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 18 deletions.
6 changes: 6 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ type StrategyStore interface {
// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error)
}

// StrategyUpdater updates sampling strategies.
type StrategyUpdater interface {
//UpdateSamplingStrategy replaces the sampling strategy content.
UpdateSamplingStrategy(bytes []byte) error
}
37 changes: 36 additions & 1 deletion plugin/sampling/strategystore/static/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package static

import (
"context"
"flag"
"io/ioutil"
"path/filepath"
"time"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
Expand Down Expand Up @@ -56,5 +60,36 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, error) {
return NewStrategyStore(*f.options, f.logger)
ss, err := NewStrategyStore(*f.options, f.logger)
if err != nil {
return ss, err
}
if s, ok := ss.(strategystore.StrategyUpdater); f.options.ReloadInterval > 0 && ok {
go f.autoUpdateStrategy(context.Background(), s)
}
return ss, nil
}

func (f *Factory) autoUpdateStrategy(ctx context.Context, s strategystore.StrategyUpdater) {
lastString := ""
interval, filePath := f.options.ReloadInterval, f.options.StrategiesFile
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
if currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)); err == nil {
currStr := string(currBytes)
if lastString == currStr {
continue
}
err := s.UpdateSamplingStrategy(currBytes)
if err != nil {
f.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
}
lastString = currStr
}
case <-ctx.Done():
return
}
}
}
46 changes: 46 additions & 0 deletions plugin/sampling/strategystore/static/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@
package static

import (
"io/ioutil"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

ss "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

var _ ss.Factory = new(Factory)
Expand All @@ -39,3 +45,43 @@ func TestFactory(t *testing.T) {
_, err := f.CreateStrategyStore()
assert.NoError(t, err)
}

func TestAutoReload(t *testing.T) {
// copy from fixtures/strategies.json
srcFile, dstFile := "fixtures/strategies.json", "fixtures/strategies_for_reload.json"
srcBytes, err := ioutil.ReadFile(srcFile)
require.NoError(t, err)
err = ioutil.WriteFile(dstFile, srcBytes, 0644)
require.NoError(t, err)

f := NewFactory()
v, command := config.Viperize(f.AddFlags)
_ = command.ParseFlags([]string{"--sampling.strategies-file=" + dstFile, "--sampling.strategies-reload-interval=50ms"})
f.InitFromViper(v)

// Test reading strategies from a file
//ctx, canf := context.WithCancel(context.Background())
//defer canf()

store, err := f.CreateStrategyStore()
require.NoError(t, err)

s, err := store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// update file
newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1)
err = ioutil.WriteFile(dstFile, []byte(newStr), 0644)
require.NoError(t, err)

// wait for reload
time.Sleep(time.Millisecond * 50 * 2)

// verity reloading
s, err = store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)

os.Remove(dstFile)
}
8 changes: 7 additions & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,33 @@ package static

import (
"flag"
"time"

"github.com/spf13/viper"
)

const (
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
)

// Options holds configuration for the static sampling strategy store.
type Options struct {
// StrategiesFile is the path for the sampling strategies file in JSON format
StrategiesFile string
// ReloadInterval is the time interval to check and reload sampling strategies file
ReloadInterval time.Duration
}

// AddFlags adds flags for Options
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no checks (default 0s)")
}

// InitFromViper initializes Options with properties from viper
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
return opts
}
48 changes: 34 additions & 14 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sync/atomic"

"go.uber.org/zap"

Expand All @@ -30,16 +31,16 @@ import (
type strategyStore struct {
logger *zap.Logger

defaultStrategy *sampling.SamplingStrategyResponse
serviceStrategies map[string]*sampling.SamplingStrategyResponse
defaultStrategy atomic.Value
serviceStrategies atomic.Value
}

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
h := &strategyStore{
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
logger: logger,
}
h.serviceStrategies.Store(make(map[string]*sampling.SamplingStrategyResponse))
strategies, err := loadStrategies(options.StrategiesFile)
if err != nil {
return nil, err
Expand All @@ -50,10 +51,25 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er

// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
if strategy, ok := h.serviceStrategies[serviceName]; ok {
serviceStrategies, ok := h.serviceStrategies.Load().(map[string]*sampling.SamplingStrategyResponse)
if !ok {
return nil, fmt.Errorf("wrong type of serviceStrategies")
}
if strategy, ok := serviceStrategies[serviceName]; ok {
return strategy, nil
}
return h.defaultStrategy, nil
return h.defaultStrategy.Load().(*sampling.SamplingStrategyResponse), nil
}

// UpdateSamplingStrategy implements StrategyStore#UpdateSamplingStrategy.
func (h *strategyStore) UpdateSamplingStrategy(bytes []byte) error {
var strategies strategies
if err := json.Unmarshal(bytes, &strategies); err != nil {
return fmt.Errorf("failed to unmarshal strategies: %w", err)
}
h.parseStrategies(&strategies)
h.logger.Info("Updated strategy:" + string(bytes))
return nil
}

// TODO good candidate for a global util function
Expand All @@ -73,40 +89,44 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
h.defaultStrategy = defaultStrategyResponse()
defaultStrategy := defaultStrategyResponse()
h.defaultStrategy.Store(defaultStrategy)
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
return
}
if strategies.DefaultStrategy != nil {
h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
}

merge := true
if h.defaultStrategy.OperationSampling == nil ||
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
if defaultStrategy.OperationSampling == nil ||
defaultStrategy.OperationSampling.PerOperationStrategies == nil {
merge = false
}

serviceStrategies := make(map[string]*sampling.SamplingStrategyResponse)
for _, s := range strategies.ServiceStrategies {
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Merge with the default operation strategies, because only merging with
// the default strategy has no effect on service strategies (the default strategy
// is not merged with and only used as a fallback).
opS := h.serviceStrategies[s.Service].OperationSampling
opS := serviceStrategies[s.Service].OperationSampling
if opS == nil {
// Service has no per-operation strategies, so just reference the default settings.
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
serviceStrategies[s.Service].OperationSampling = defaultStrategy.OperationSampling
continue
}

if merge {
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
opS.PerOperationStrategies,
h.defaultStrategy.OperationSampling.PerOperationStrategies)
defaultStrategy.OperationSampling.PerOperationStrategies)
}
}
h.defaultStrategy.Store(defaultStrategy)
h.serviceStrategies.Store(serviceStrategies)
}

// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.
Expand Down
3 changes: 1 addition & 2 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package static

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -79,7 +78,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
os := s.OperationSampling
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
require.Len(t, os.PerOperationStrategies, 4)
fmt.Println(os)
//fmt.Println(os)
assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation)
assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)
Expand Down

0 comments on commit c8195f9

Please sign in to comment.