Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
Cleanup code related to APM events.
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexJF committed Oct 29, 2018
1 parent 456dc89 commit ff459b9
Show file tree
Hide file tree
Showing 22 changed files with 318 additions and 242 deletions.
84 changes: 40 additions & 44 deletions cmd/trace-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/DataDog/datadog-trace-agent/api"
"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/event"
"github.com/DataDog/datadog-trace-agent/filters"
"github.com/DataDog/datadog-trace-agent/info"
"github.com/DataDog/datadog-trace-agent/model"
Expand All @@ -22,29 +23,6 @@ import (

const processStatsInterval = time.Minute

type processedTrace struct {
Trace model.Trace
WeightedTrace model.WeightedTrace
Root *model.Span
Env string
Sublayers map[*model.Span][]model.SublayerValue
}

func (pt *processedTrace) weight() float64 {
if pt.Root == nil {
return 1.0
}
return pt.Root.Weight()
}

func (pt *processedTrace) getSamplingPriority() (int, bool) {
if pt.Root == nil {
return 0, false
}
p, ok := pt.Root.Metrics[sampler.SamplingPriorityKey]
return int(p), ok
}

// Agent struct holds all the sub-routines structs and make the data flow between them
type Agent struct {
Receiver *api.HTTPReceiver
Expand All @@ -54,7 +32,7 @@ type Agent struct {
ScoreSampler *Sampler
ErrorsScoreSampler *Sampler
PrioritySampler *Sampler
TransactionSampler TransactionSampler
EventExtractor event.Extractor
TraceWriter *writer.TraceWriter
ServiceWriter *writer.ServiceWriter
StatsWriter *writer.StatsWriter
Expand All @@ -65,7 +43,7 @@ type Agent struct {
// tags based on their type.
obfuscator *obfuscate.Obfuscator

sampledTraceChan chan *writer.SampledTrace
tracePkgChan chan *writer.TracePackage

// config
conf *config.AgentConfig
Expand All @@ -82,7 +60,7 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {

// inter-component channels
rawTraceChan := make(chan model.Trace, 5000) // about 1000 traces/sec for 5 sec, TODO: move to *model.Trace
sampledTraceChan := make(chan *writer.SampledTrace)
tracePkgChan := make(chan *writer.TracePackage)
statsChan := make(chan []model.StatsBucket)
serviceChan := make(chan model.ServicesMetadata, 50)
filteredServiceChan := make(chan model.ServicesMetadata, 50)
Expand All @@ -99,10 +77,10 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
ss := NewScoreSampler(conf)
ess := NewErrorsSampler(conf)
ps := NewPrioritySampler(conf, dynConf)
ts := NewTransactionSampler(conf)
ee := eventExtractorFromConf(conf)
se := NewTraceServiceExtractor(serviceChan)
sm := NewServiceMapper(serviceChan, filteredServiceChan)
tw := writer.NewTraceWriter(conf, sampledTraceChan)
tw := writer.NewTraceWriter(conf, tracePkgChan)
sw := writer.NewStatsWriter(conf, statsChan)
svcW := writer.NewServiceWriter(conf, filteredServiceChan)

Expand All @@ -114,14 +92,14 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
ScoreSampler: ss,
ErrorsScoreSampler: ess,
PrioritySampler: ps,
TransactionSampler: ts,
EventExtractor: ee,
TraceWriter: tw,
StatsWriter: sw,
ServiceWriter: svcW,
ServiceExtractor: se,
ServiceMapper: sm,
obfuscator: obf,
sampledTraceChan: sampledTraceChan,
tracePkgChan: tracePkgChan,
conf: conf,
dynConf: dynConf,
ctx: ctx,
Expand Down Expand Up @@ -191,7 +169,7 @@ func (a *Agent) Process(t model.Trace) {
ts := a.Receiver.Stats.GetTagStats(info.Tags{})

// Extract priority early, as later goroutines might manipulate the Metrics map in parallel which isn't safe.
priority, hasPriority := root.Metrics[sampler.SamplingPriorityKey]
priority, hasPriority := root.GetSamplingPriority()

// Depending on the sampling priority, count that trace differently.
stat := &ts.TracesPriorityNone
Expand Down Expand Up @@ -241,7 +219,7 @@ func (a *Agent) Process(t model.Trace) {
model.SetSublayersOnSpan(subtrace.Root, subtraceSublayers)
}

pt := processedTrace{
pt := model.ProcessedTrace{
Trace: t,
WeightedTrace: model.NewWeightedTrace(t, root),
Root: root,
Expand All @@ -258,38 +236,44 @@ func (a *Agent) Process(t model.Trace) {
a.ServiceExtractor.Process(pt.WeightedTrace)
}()

go func() {
go func(pt model.ProcessedTrace) {
defer watchdog.LogOnPanic()
// Everything is sent to concentrator for stats, regardless of sampling.
a.Concentrator.Add(pt)
}()
}(pt)

// Don't go through sampling for < 0 priority traces
if priority < 0 {
return
}
// Run both full trace sampling and transaction extraction in another goroutine.
go func() {
go func(pt model.ProcessedTrace) {
defer watchdog.LogOnPanic()

tracePkg := writer.TracePackage{}

sampled, rate := a.sample(pt)
if !sampled {
return

if sampled {
pt.Sampled = sampled
sampler.AddSampleRate(pt.Root, rate)
tracePkg.Trace = &pt.Trace
}
sampler.AddSampleRate(pt.Root, rate)

a.sampledTraceChan <- &writer.SampledTrace{
Trace: &pt.Trace,
Transactions: a.TransactionSampler.Extract(pt),
// NOTE: Events can be extracted from non-sampled traces.
tracePkg.Events = a.EventExtractor.Extract(pt)

if !tracePkg.Empty() {
a.tracePkgChan <- &tracePkg
}
}()
}(pt)
}

func (a *Agent) sample(pt processedTrace) (sampled bool, rate float64) {
func (a *Agent) sample(pt model.ProcessedTrace) (sampled bool, rate float64) {
var sampledPriority, sampledScore bool
var ratePriority, rateScore float64

if _, ok := pt.Root.Metrics[sampler.SamplingPriorityKey]; ok {
if _, ok := pt.GetSamplingPriority(); ok {
sampledPriority, ratePriority = a.PrioritySampler.Add(pt)
}

Expand Down Expand Up @@ -343,3 +327,15 @@ func traceContainsError(trace model.Trace) bool {
}
return false
}

func eventExtractorFromConf(conf *config.AgentConfig) event.Extractor {
if len(conf.AnalyzedSpansByService) > 0 {
return event.NewFixedRateExtractor(conf.AnalyzedSpansByService)
}
if len(conf.AnalyzedRateByServiceLegacy) > 0 {
return event.NewLegacyExtractor(conf.AnalyzedRateByServiceLegacy)
}

// TODO: Replace disabled extractor with TaggedExtractor
return event.NewNoopExtractor()
}
10 changes: 5 additions & 5 deletions cmd/trace-agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func TestProcess(t *testing.T) {
defer cancel()

now := time.Now()
disabled := float64(-99)
for _, key := range []float64{
disabled := int(-99)
for _, key := range []int{
disabled, -1, -1, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2,
} {
span := &model.Span{
Expand All @@ -204,7 +204,7 @@ func TestProcess(t *testing.T) {
Metrics: map[string]float64{},
}
if key != disabled {
span.Metrics[sampler.SamplingPriorityKey] = key
span.SetSamplingPriority(key)
}
agent.Process(model.Trace{span})
}
Expand Down Expand Up @@ -339,9 +339,9 @@ func TestSampling(t *testing.T) {
if tt.hasErrors {
root.Error = 1
}
pt := processedTrace{Trace: model.Trace{root}, Root: root}
pt := model.ProcessedTrace{Trace: model.Trace{root}, Root: root}
if tt.hasPriority {
pt.Root.Metrics[sampler.SamplingPriorityKey] = 1
pt.Root.SetSamplingPriority(1)
}

sampled, rate := a.sample(pt)
Expand Down
4 changes: 2 additions & 2 deletions cmd/trace-agent/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func (c *Concentrator) Stop() {
}

// Add appends to the proper stats bucket this trace's statistics
func (c *Concentrator) Add(t processedTrace) {
func (c *Concentrator) Add(t model.ProcessedTrace) {
c.addNow(t, model.Now())
}

func (c *Concentrator) addNow(t processedTrace, now int64) {
func (c *Concentrator) addNow(t model.ProcessedTrace, now int64) {
c.mu.Lock()

for _, s := range t.WeightedTrace {
Expand Down
8 changes: 4 additions & 4 deletions cmd/trace-agent/concentrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestConcentratorOldestTs(t *testing.T) {
trace.ComputeTopLevel()
wt := model.NewWeightedTrace(trace, trace.GetRoot())

testTrace := processedTrace{
testTrace := model.ProcessedTrace{
Env: "none",
Trace: trace,
WeightedTrace: wt,
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestConcentratorStatsTotals(t *testing.T) {
trace.ComputeTopLevel()
wt := model.NewWeightedTrace(trace, trace.GetRoot())

testTrace := processedTrace{
testTrace := model.ProcessedTrace{
Env: "none",
Trace: trace,
WeightedTrace: wt,
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestConcentratorStatsCounts(t *testing.T) {
trace.ComputeTopLevel()
wt := model.NewWeightedTrace(trace, trace.GetRoot())

testTrace := processedTrace{
testTrace := model.ProcessedTrace{
Env: "none",
Trace: trace,
WeightedTrace: wt,
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestConcentratorSublayersStatsCounts(t *testing.T) {
sublayers[subtrace.Root] = subtraceSublayers
}

testTrace := processedTrace{
testTrace := model.ProcessedTrace{
Env: "none",
Trace: trace,
WeightedTrace: wt,
Expand Down
6 changes: 4 additions & 2 deletions cmd/trace-agent/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"sync/atomic"
"time"

log "github.com/cihub/seelog"

"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/info"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/sampler"
"github.com/DataDog/datadog-trace-agent/watchdog"
log "github.com/cihub/seelog"
)

// Sampler chooses wich spans to write to the API
Expand Down Expand Up @@ -62,7 +64,7 @@ func (s *Sampler) Run() {
}

// Add samples a trace and returns true if trace was sampled (should be kept), false otherwise
func (s *Sampler) Add(t processedTrace) (sampled bool, rate float64) {
func (s *Sampler) Add(t model.ProcessedTrace) (sampled bool, rate float64) {
atomic.AddUint64(&s.totalTraceCount, 1)
sampled, rate = s.engine.Sample(t.Trace, t.Root, t.Env)
if sampled {
Expand Down
Loading

0 comments on commit ff459b9

Please sign in to comment.