Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

internal/api: submit service default rates in /v0.4 endpoint #546

Merged
merged 7 commits into from
Dec 7, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/trace-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Agent struct {
// NewAgent returns a new Agent object, ready to be started. It takes a context
// which may be cancelled in order to gracefully stop the agent.
func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
dynConf := sampler.NewDynamicConfig()
dynConf := sampler.NewDynamicConfig(conf.DefaultEnv)

// inter-component channels
rawTraceChan := make(chan agent.Trace, 5000) // about 1000 traces/sec for 5 sec, TODO: move to *model.Trace
Expand Down
2 changes: 1 addition & 1 deletion internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var headerFields = map[string]string{
}

func newTestReceiverFromConfig(conf *config.AgentConfig) *HTTPReceiver {
dynConf := sampler.NewDynamicConfig()
dynConf := sampler.NewDynamicConfig("none")

rawTraceChan := make(chan agent.Trace, 5000)
serviceChan := make(chan agent.ServicesMetadata, 50)
Expand Down
44 changes: 26 additions & 18 deletions internal/sampler/catalog.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
package sampler

import (
"github.com/DataDog/datadog-trace-agent/internal/agent"
)
import "sync"

const defaultServiceRateKey = "service:,env:"

type serviceKeyCatalog map[string]Signature

func byServiceKey(service, env string) string {
return "service:" + service + ",env:" + env
// serviceKeyCatalog reverse-maps service signatures to their generated hashes for
// easy look up.
type serviceKeyCatalog struct {
mu sync.Mutex
lookup map[ServiceSignature]Signature
}

func newServiceKeyCatalog() serviceKeyCatalog {
return serviceKeyCatalog(make(map[string]Signature))
// newServiceLookup returns a new serviceKeyCatalog.
func newServiceLookup() *serviceKeyCatalog {
return &serviceKeyCatalog{
lookup: make(map[ServiceSignature]Signature),
}
}

func (cat serviceKeyCatalog) register(root *agent.Span, env string, sig Signature) {
map[string]Signature(cat)[byServiceKey(root.Service, env)] = sig
func (cat *serviceKeyCatalog) register(svcSig ServiceSignature) Signature {
hash := svcSig.Hash()
cat.mu.Lock()
cat.lookup[svcSig] = hash
cat.mu.Unlock()
return hash
}

func (cat serviceKeyCatalog) getRateByService(rates map[Signature]float64, totalScore float64) map[string]float64 {
rbs := make(map[string]float64, len(rates)+1)
for key, sig := range map[string]Signature(cat) {
// ratesByService returns a map of service signatures mapping to the rates identified using
// the signatures.
func (cat serviceKeyCatalog) ratesByService(rates map[Signature]float64, totalScore float64) map[ServiceSignature]float64 {
rbs := make(map[ServiceSignature]float64, len(rates)+1)
defer cat.mu.Unlock()
cat.mu.Lock()
for key, sig := range cat.lookup {
if rate, ok := rates[sig]; ok {
rbs[key] = rate
} else {
// Backend, with its decay mecanism, should automatically remove the entries
// which have such a low value that they don't matter any more.
delete(cat, key)
delete(cat.lookup, key)
}
}
rbs[defaultServiceRateKey] = totalScore
rbs[ServiceSignature{}] = totalScore
return rbs
}
77 changes: 38 additions & 39 deletions internal/sampler/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,81 +6,80 @@ import (
"github.com/stretchr/testify/assert"
)

func TestByServiceKey(t *testing.T) {
func TestServiceSignatureString(t *testing.T) {
assert := assert.New(t)

assert.Equal(defaultServiceRateKey, byServiceKey("", ""))
assert.Equal("service:mcnulty,env:test", byServiceKey("mcnulty", "test"))
assert.Equal(defaultServiceRateKey, ServiceSignature{}.String())
assert.Equal("service:mcnulty,env:test", ServiceSignature{"mcnulty", "test"}.String())
}

func TestNewServiceKeyCatalog(t *testing.T) {
assert := assert.New(t)

cat := newServiceKeyCatalog()
assert.NotNil(cat)
assert.Equal(map[string]Signature{}, map[string]Signature(cat))
func TestNewServiceLookup(t *testing.T) {
cat := newServiceLookup()
assert.NotNil(t, cat.lookup)
}

func TestServiceKeyCatalogRegister(t *testing.T) {
assert := assert.New(t)

cat := newServiceKeyCatalog()
cat := newServiceLookup()
s := getTestPriorityEngine()

_, root1 := getTestTraceWithService(t, "service1", s)
sig1 := computeServiceSignature(root1, defaultEnv)
cat.register(root1, defaultEnv, sig1)
assert.Equal(map[string]Signature{"service:service1,env:none": sig1}, map[string]Signature(cat))
sig1 := cat.register(ServiceSignature{root1.Service, defaultEnv})
assert.Equal(
map[ServiceSignature]Signature{
ServiceSignature{"service1", "none"}: sig1,
},
cat.lookup,
)

_, root2 := getTestTraceWithService(t, "service2", s)
sig2 := computeServiceSignature(root2, defaultEnv)
cat.register(root2, defaultEnv, sig2)
assert.Equal(map[string]Signature{
"service:service1,env:none": sig1,
"service:service2,env:none": sig2,
}, map[string]Signature(cat))
sig2 := cat.register(ServiceSignature{root2.Service, defaultEnv})
assert.Equal(
map[ServiceSignature]Signature{
ServiceSignature{"service1", "none"}: sig1,
ServiceSignature{"service2", "none"}: sig2,
},
cat.lookup,
)
}

func TestServiceKeyCatalogGetRateByService(t *testing.T) {
func TestServiceKeyCatalogRatesByService(t *testing.T) {
assert := assert.New(t)

cat := newServiceKeyCatalog()
cat := newServiceLookup()
s := getTestPriorityEngine()

_, root1 := getTestTraceWithService(t, "service1", s)
sig1 := computeServiceSignature(root1, defaultEnv)
cat.register(root1, defaultEnv, sig1)
sig1 := cat.register(ServiceSignature{root1.Service, defaultEnv})
_, root2 := getTestTraceWithService(t, "service2", s)
sig2 := computeServiceSignature(root2, defaultEnv)
cat.register(root2, defaultEnv, sig2)
sig2 := cat.register(ServiceSignature{root2.Service, defaultEnv})

rates := map[Signature]float64{
sig1: 0.3,
sig2: 0.7,
}
const totalRate = 0.2

var rateByService map[string]float64

rateByService = cat.getRateByService(rates, totalRate)
assert.Equal(map[string]float64{
"service:service1,env:none": 0.3,
"service:service2,env:none": 0.7,
"service:,env:": 0.2,
rateByService := cat.ratesByService(rates, totalRate)
assert.Equal(map[ServiceSignature]float64{
ServiceSignature{"service1", "none"}: 0.3,
ServiceSignature{"service2", "none"}: 0.7,
ServiceSignature{}: 0.2,
}, rateByService)

delete(rates, sig1)

rateByService = cat.getRateByService(rates, totalRate)
assert.Equal(map[string]float64{
"service:service2,env:none": 0.7,
"service:,env:": 0.2,
rateByService = cat.ratesByService(rates, totalRate)
assert.Equal(map[ServiceSignature]float64{
ServiceSignature{"service2", "none"}: 0.7,
ServiceSignature{}: 0.2,
}, rateByService)

delete(rates, sig2)

rateByService = cat.getRateByService(rates, totalRate)
assert.Equal(map[string]float64{
"service:,env:": 0.2,
rateByService = cat.ratesByService(rates, totalRate)
assert.Equal(map[ServiceSignature]float64{
ServiceSignature{}: 0.2,
}, rateByService)
}
35 changes: 20 additions & 15 deletions internal/sampler/dynamic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,33 @@ type DynamicConfig struct {
RateByService RateByService
}

// NewDynamicConfig creates a new dynamic config object.
func NewDynamicConfig() *DynamicConfig {
// Not much logic here now, as RateByService is fine with
// being used unintialized, but external packages should use this.
return &DynamicConfig{}
// NewDynamicConfig creates a new dynamic config object which maps service signatures
// to their corresponding sampling rates. Each service will have a default assigned
// matching the service rate of the specified env.
func NewDynamicConfig(env string) *DynamicConfig {
return &DynamicConfig{RateByService: RateByService{defaultEnv: env}}
}

// RateByService stores the sampling rate per service. It is thread-safe, so
// one can read/write on it concurrently, using getters and setters.
type RateByService struct {
defaultEnv string // env. to use for service defaults

mu sync.RWMutex // guards rates
rates map[string]float64
mutex sync.RWMutex
}

// SetAll the sampling rate for all services. If a service/env is not
// in the map, then the entry is removed.
func (rbs *RateByService) SetAll(rates map[string]float64) {
rbs.mutex.Lock()
defer rbs.mutex.Unlock()
func (rbs *RateByService) SetAll(rates map[ServiceSignature]float64) {
rbs.mu.Lock()
defer rbs.mu.Unlock()

if rbs.rates == nil {
rbs.rates = make(map[string]float64, len(rates))
}
for k := range rbs.rates {
if _, ok := rates[k]; !ok {
delete(rbs.rates, k)
}
delete(rbs.rates, k)
}
for k, v := range rates {
if v < 0 {
Expand All @@ -47,14 +47,19 @@ func (rbs *RateByService) SetAll(rates map[string]float64) {
if v > 1 {
v = 1
}
rbs.rates[k] = v
rbs.rates[k.String()] = v
if k.Env == rbs.defaultEnv {
// if this is the default env, then this is also the
// service's default rate unbound to any env.
rbs.rates[ServiceSignature{Name: k.Name}.String()] = v
}
}
}

// GetAll returns all sampling rates for all services.
func (rbs *RateByService) GetAll() map[string]float64 {
rbs.mutex.RLock()
defer rbs.mutex.RUnlock()
rbs.mu.RLock()
defer rbs.mu.RUnlock()

ret := make(map[string]float64, len(rbs.rates))
for k, v := range rbs.rates {
Expand Down
Loading