From b07d45f1a27764ab4b23c22e7c670944ac226c90 Mon Sep 17 00:00:00 2001 From: Alexandre Fonseca Date: Thu, 25 Oct 2018 12:59:05 +0200 Subject: [PATCH] Cleanup code related to APM events. --- cmd/trace-agent/agent.go | 84 +++++++------- cmd/trace-agent/agent_test.go | 10 +- cmd/trace-agent/concentrator.go | 4 +- cmd/trace-agent/concentrator_test.go | 8 +- cmd/trace-agent/sampler.go | 6 +- cmd/trace-agent/transaction_sampler.go | 108 ------------------ event/extractor.go | 29 +++++ event/internal/extractor/fixed_rate.go | 52 +++++++++ .../internal/extractor/fixed_rate_test.go | 19 +-- event/internal/extractor/legacy.go | 50 ++++++++ event/internal/extractor/noop.go | 15 +++ info/info.go | 2 +- info/testdata/okay.info | 2 +- info/testdata/okay.json | 2 +- info/writer.go | 2 +- model/event.go | 7 ++ model/processed_trace.go | 21 ++++ model/span.go | 18 +++ sampler/prioritysampler.go | 4 +- sampler/prioritysampler_test.go | 21 ++-- writer/trace_writer.go | 63 +++++----- writer/trace_writer_test.go | 51 +++++---- 22 files changed, 336 insertions(+), 242 deletions(-) delete mode 100644 cmd/trace-agent/transaction_sampler.go create mode 100644 event/extractor.go create mode 100644 event/internal/extractor/fixed_rate.go rename cmd/trace-agent/transaction_sampler_test.go => event/internal/extractor/fixed_rate_test.go (84%) create mode 100644 event/internal/extractor/legacy.go create mode 100644 event/internal/extractor/noop.go create mode 100644 model/event.go create mode 100644 model/processed_trace.go diff --git a/cmd/trace-agent/agent.go b/cmd/trace-agent/agent.go index d2c5c6324..7af3ce4c8 100644 --- a/cmd/trace-agent/agent.go +++ b/cmd/trace-agent/agent.go @@ -9,6 +9,7 @@ import ( "github.com/DataDog/datadog-trace-agent/api" "github.com/DataDog/datadog-trace-agent/config" + "github.com/DataDog/datadog-trace-agent/event" "github.com/DataDog/datadog-trace-agent/filters" "github.com/DataDog/datadog-trace-agent/info" "github.com/DataDog/datadog-trace-agent/model" @@ -22,29 
+23,6 @@ import ( const processStatsInterval = time.Minute -type processedTrace struct { - Trace model.Trace - WeightedTrace model.WeightedTrace - Root *model.Span - Env string - Sublayers map[*model.Span][]model.SublayerValue -} - -func (pt *processedTrace) weight() float64 { - if pt.Root == nil { - return 1.0 - } - return pt.Root.Weight() -} - -func (pt *processedTrace) getSamplingPriority() (int, bool) { - if pt.Root == nil { - return 0, false - } - p, ok := pt.Root.Metrics[sampler.SamplingPriorityKey] - return int(p), ok -} - // Agent struct holds all the sub-routines structs and make the data flow between them type Agent struct { Receiver *api.HTTPReceiver @@ -54,7 +32,7 @@ type Agent struct { ScoreSampler *Sampler ErrorsScoreSampler *Sampler PrioritySampler *Sampler - TransactionSampler TransactionSampler + EventExtractor event.Extractor TraceWriter *writer.TraceWriter ServiceWriter *writer.ServiceWriter StatsWriter *writer.StatsWriter @@ -65,7 +43,7 @@ type Agent struct { // tags based on their type. 
obfuscator *obfuscate.Obfuscator - sampledTraceChan chan *writer.SampledTrace + tracePkgChan chan *writer.TracePackage // config conf *config.AgentConfig @@ -82,7 +60,7 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent { // inter-component channels rawTraceChan := make(chan model.Trace, 5000) // about 1000 traces/sec for 5 sec, TODO: move to *model.Trace - sampledTraceChan := make(chan *writer.SampledTrace) + tracePkgChan := make(chan *writer.TracePackage) statsChan := make(chan []model.StatsBucket) serviceChan := make(chan model.ServicesMetadata, 50) filteredServiceChan := make(chan model.ServicesMetadata, 50) @@ -99,10 +77,10 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent { ss := NewScoreSampler(conf) ess := NewErrorsSampler(conf) ps := NewPrioritySampler(conf, dynConf) - ts := NewTransactionSampler(conf) + ee := eventExtractorFromConf(conf) se := NewTraceServiceExtractor(serviceChan) sm := NewServiceMapper(serviceChan, filteredServiceChan) - tw := writer.NewTraceWriter(conf, sampledTraceChan) + tw := writer.NewTraceWriter(conf, tracePkgChan) sw := writer.NewStatsWriter(conf, statsChan) svcW := writer.NewServiceWriter(conf, filteredServiceChan) @@ -114,14 +92,14 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent { ScoreSampler: ss, ErrorsScoreSampler: ess, PrioritySampler: ps, - TransactionSampler: ts, + EventExtractor: ee, TraceWriter: tw, StatsWriter: sw, ServiceWriter: svcW, ServiceExtractor: se, ServiceMapper: sm, obfuscator: obf, - sampledTraceChan: sampledTraceChan, + tracePkgChan: tracePkgChan, conf: conf, dynConf: dynConf, ctx: ctx, @@ -191,7 +169,7 @@ func (a *Agent) Process(t model.Trace) { ts := a.Receiver.Stats.GetTagStats(info.Tags{}) // Extract priority early, as later goroutines might manipulate the Metrics map in parallel which isn't safe. 
- priority, hasPriority := root.Metrics[sampler.SamplingPriorityKey] + priority, hasPriority := root.GetSamplingPriority() // Depending on the sampling priority, count that trace differently. stat := &ts.TracesPriorityNone @@ -241,7 +219,7 @@ func (a *Agent) Process(t model.Trace) { model.SetSublayersOnSpan(subtrace.Root, subtraceSublayers) } - pt := processedTrace{ + pt := model.ProcessedTrace{ Trace: t, WeightedTrace: model.NewWeightedTrace(t, root), Root: root, @@ -258,38 +236,44 @@ func (a *Agent) Process(t model.Trace) { a.ServiceExtractor.Process(pt.WeightedTrace) }() - go func() { + go func(trace model.ProcessedTrace) { defer watchdog.LogOnPanic() // Everything is sent to concentrator for stats, regardless of sampling. a.Concentrator.Add(pt) - }() + }(pt) // Don't go through sampling for < 0 priority traces if priority < 0 { return } // Run both full trace sampling and transaction extraction in another goroutine. - go func() { + go func(trace model.ProcessedTrace) { defer watchdog.LogOnPanic() + tracePkg := writer.TracePackage{} + sampled, rate := a.sample(pt) - if !sampled { - return + + if sampled { + pt.Sampled = sampled + sampler.AddSampleRate(pt.Root, rate) + tracePkg.Trace = &pt.Trace } - sampler.AddSampleRate(pt.Root, rate) - a.sampledTraceChan <- &writer.SampledTrace{ - Trace: &pt.Trace, - Transactions: a.TransactionSampler.Extract(pt), + // NOTE: Events can be extracted from non-sampled traces. 
+ tracePkg.Events = a.EventExtractor.Extract(pt) + + if !tracePkg.Empty() { + a.tracePkgChan <- &tracePkg } - }() + }(pt) } -func (a *Agent) sample(pt processedTrace) (sampled bool, rate float64) { +func (a *Agent) sample(pt model.ProcessedTrace) (sampled bool, rate float64) { var sampledPriority, sampledScore bool var ratePriority, rateScore float64 - if _, ok := pt.Root.Metrics[sampler.SamplingPriorityKey]; ok { + if _, ok := pt.GetSamplingPriority(); ok { sampledPriority, ratePriority = a.PrioritySampler.Add(pt) } @@ -343,3 +327,15 @@ func traceContainsError(trace model.Trace) bool { } return false } + +func eventExtractorFromConf(conf *config.AgentConfig) event.Extractor { + if len(conf.AnalyzedSpansByService) > 0 { + return event.NewFixedRateExtractor(conf.AnalyzedSpansByService) + } + if len(conf.AnalyzedRateByServiceLegacy) > 0 { + return event.NewLegacyExtractor(conf.AnalyzedRateByServiceLegacy) + } + + // TODO: Replace disabled extractor with TaggedExtractor + return event.NewNoopExtractor() +} diff --git a/cmd/trace-agent/agent_test.go b/cmd/trace-agent/agent_test.go index 1e313b013..b191aac2e 100644 --- a/cmd/trace-agent/agent_test.go +++ b/cmd/trace-agent/agent_test.go @@ -192,8 +192,8 @@ func TestProcess(t *testing.T) { defer cancel() now := time.Now() - disabled := float64(-99) - for _, key := range []float64{ + disabled := int(-99) + for _, key := range []int{ disabled, -1, -1, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2, } { span := &model.Span{ @@ -204,7 +204,7 @@ func TestProcess(t *testing.T) { Metrics: map[string]float64{}, } if key != disabled { - span.Metrics[sampler.SamplingPriorityKey] = key + span.SetSamplingPriority(key) } agent.Process(model.Trace{span}) } @@ -339,9 +339,9 @@ func TestSampling(t *testing.T) { if tt.hasErrors { root.Error = 1 } - pt := processedTrace{Trace: model.Trace{root}, Root: root} + pt := model.ProcessedTrace{Trace: model.Trace{root}, Root: root} if tt.hasPriority { - pt.Root.Metrics[sampler.SamplingPriorityKey] = 1 + 
pt.Root.SetSamplingPriority(1) } sampled, rate := a.sample(pt) diff --git a/cmd/trace-agent/concentrator.go b/cmd/trace-agent/concentrator.go index 409b99487..5c1b38686 100644 --- a/cmd/trace-agent/concentrator.go +++ b/cmd/trace-agent/concentrator.go @@ -103,11 +103,11 @@ func (c *Concentrator) Stop() { } // Add appends to the proper stats bucket this trace's statistics -func (c *Concentrator) Add(t processedTrace) { +func (c *Concentrator) Add(t model.ProcessedTrace) { c.addNow(t, model.Now()) } -func (c *Concentrator) addNow(t processedTrace, now int64) { +func (c *Concentrator) addNow(t model.ProcessedTrace, now int64) { c.mu.Lock() for _, s := range t.WeightedTrace { diff --git a/cmd/trace-agent/concentrator_test.go b/cmd/trace-agent/concentrator_test.go index f57e9266b..4b3180610 100644 --- a/cmd/trace-agent/concentrator_test.go +++ b/cmd/trace-agent/concentrator_test.go @@ -62,7 +62,7 @@ func TestConcentratorOldestTs(t *testing.T) { trace.ComputeTopLevel() wt := model.NewWeightedTrace(trace, trace.GetRoot()) - testTrace := processedTrace{ + testTrace := model.ProcessedTrace{ Env: "none", Trace: trace, WeightedTrace: wt, @@ -173,7 +173,7 @@ func TestConcentratorStatsTotals(t *testing.T) { trace.ComputeTopLevel() wt := model.NewWeightedTrace(trace, trace.GetRoot()) - testTrace := processedTrace{ + testTrace := model.ProcessedTrace{ Env: "none", Trace: trace, WeightedTrace: wt, @@ -279,7 +279,7 @@ func TestConcentratorStatsCounts(t *testing.T) { trace.ComputeTopLevel() wt := model.NewWeightedTrace(trace, trace.GetRoot()) - testTrace := processedTrace{ + testTrace := model.ProcessedTrace{ Env: "none", Trace: trace, WeightedTrace: wt, @@ -358,7 +358,7 @@ func TestConcentratorSublayersStatsCounts(t *testing.T) { sublayers[subtrace.Root] = subtraceSublayers } - testTrace := processedTrace{ + testTrace := model.ProcessedTrace{ Env: "none", Trace: trace, WeightedTrace: wt, diff --git a/cmd/trace-agent/sampler.go b/cmd/trace-agent/sampler.go index 48fabfede..522c057b4 
100644 --- a/cmd/trace-agent/sampler.go +++ b/cmd/trace-agent/sampler.go @@ -6,11 +6,13 @@ import ( "sync/atomic" "time" + log "github.com/cihub/seelog" + "github.com/DataDog/datadog-trace-agent/config" "github.com/DataDog/datadog-trace-agent/info" + "github.com/DataDog/datadog-trace-agent/model" "github.com/DataDog/datadog-trace-agent/sampler" "github.com/DataDog/datadog-trace-agent/watchdog" - log "github.com/cihub/seelog" ) // Sampler chooses wich spans to write to the API @@ -62,7 +64,7 @@ func (s *Sampler) Run() { } // Add samples a trace and returns true if trace was sampled (should be kept), false otherwise -func (s *Sampler) Add(t processedTrace) (sampled bool, rate float64) { +func (s *Sampler) Add(t model.ProcessedTrace) (sampled bool, rate float64) { atomic.AddUint64(&s.totalTraceCount, 1) sampled, rate = s.engine.Sample(t.Trace, t.Root, t.Env) if sampled { diff --git a/cmd/trace-agent/transaction_sampler.go b/cmd/trace-agent/transaction_sampler.go deleted file mode 100644 index 204ace254..000000000 --- a/cmd/trace-agent/transaction_sampler.go +++ /dev/null @@ -1,108 +0,0 @@ -package main - -import ( - "github.com/DataDog/datadog-trace-agent/config" - "github.com/DataDog/datadog-trace-agent/model" - "github.com/DataDog/datadog-trace-agent/sampler" -) - -// TransactionSampler filters and samples interesting spans in a trace based on implementation specific criteria. -type TransactionSampler interface { - // Extract extracts matching spans from the given trace and returns them. 
- Extract(t processedTrace) []*model.Span -} - -// NewTransactionSampler creates a new empty transaction sampler -func NewTransactionSampler(conf *config.AgentConfig) TransactionSampler { - if len(conf.AnalyzedSpansByService) > 0 { - return newTransactionSampler(conf.AnalyzedSpansByService) - } - if len(conf.AnalyzedRateByServiceLegacy) > 0 { - return newLegacyTransactionSampler(conf.AnalyzedRateByServiceLegacy) - } - return &disabledTransactionSampler{} -} - -type disabledTransactionSampler struct{} - -func (s *disabledTransactionSampler) Extract(t processedTrace) []*model.Span { - return nil -} - -type transactionSampler struct { - analyzedSpansByService map[string]map[string]float64 -} - -func newTransactionSampler(analyzedSpansByService map[string]map[string]float64) *transactionSampler { - return &transactionSampler{ - analyzedSpansByService: analyzedSpansByService, - } -} - -// Extract extracts analyzed spans and returns them as a slice -func (s *transactionSampler) Extract(t processedTrace) []*model.Span { - var transactions []*model.Span - - // Get the trace priority - priority, hasPriority := t.getSamplingPriority() - // inspect the WeightedTrace so that we can identify top-level spans - for _, span := range t.WeightedTrace { - if s.shouldAnalyze(span, hasPriority, priority) { - transactions = append(transactions, span.Span) - } - } - - return transactions -} - -func (s *transactionSampler) shouldAnalyze(span *model.WeightedSpan, hasPriority bool, priority int) bool { - if operations, ok := s.analyzedSpansByService[span.Service]; ok { - if analyzeRate, ok := operations[span.Name]; ok { - // If the trace has been manually sampled, we keep all matching spans - highPriority := hasPriority && priority >= 2 - if highPriority || sampler.SampleByRate(span.TraceID, analyzeRate) { - return true - } - } - } - return false -} - -type legacyTransactionSampler struct { - analyzedRateByService map[string]float64 -} - -func 
newLegacyTransactionSampler(analyzedRateByService map[string]float64) *legacyTransactionSampler { - return &legacyTransactionSampler{ - analyzedRateByService: analyzedRateByService, - } -} - -// Extract extracts analyzed spans and returns them as a slice -func (s *legacyTransactionSampler) Extract(t processedTrace) []*model.Span { - var transactions []*model.Span - - // inspect the WeightedTrace so that we can identify top-level spans - for _, span := range t.WeightedTrace { - if s.shouldAnalyze(span) { - transactions = append(transactions, span.Span) - } - } - - return transactions -} - -// shouldAnalyze tells if a span should be considered as analyzed -// Only top-level spans are eligible to be analyzed -func (s *legacyTransactionSampler) shouldAnalyze(span *model.WeightedSpan) bool { - if !span.TopLevel { - return false - } - - if analyzeRate, ok := s.analyzedRateByService[span.Service]; ok { - if sampler.SampleByRate(span.TraceID, analyzeRate) { - return true - } - } - return false -} diff --git a/event/extractor.go b/event/extractor.go new file mode 100644 index 000000000..59e8632d2 --- /dev/null +++ b/event/extractor.go @@ -0,0 +1,29 @@ +package event + +import ( + "github.com/DataDog/datadog-trace-agent/event/internal/extractor" + "github.com/DataDog/datadog-trace-agent/model" +) + +// Extractor extracts APM event spans from a trace. +type Extractor interface { + // Extract extracts APM event spans from the given weighted trace information and returns them. + Extract(t model.ProcessedTrace) []*model.APMEvent +} + +// NewNoopExtractor returns a new APM event extractor that does not extract any events. +func NewNoopExtractor() Extractor { + return extractor.NewNoop() +} + +// NewLegacyExtractor returns an APM event extractor that extracts APM events from a trace following the specified +// extraction rates for any spans matching a specific service. 
+func NewLegacyExtractor(rateByService map[string]float64) Extractor { + return extractor.NewLegacy(rateByService) +} + +// NewFixedRateExtractor returns an APM event extractor that extracts APM events from a trace following the provided +// extraction rates for any spans matching a (service name, operation name) pair. +func NewFixedRateExtractor(rateByServiceAndName map[string]map[string]float64) Extractor { + return extractor.NewFixedRate(rateByServiceAndName) +} diff --git a/event/internal/extractor/fixed_rate.go b/event/internal/extractor/fixed_rate.go new file mode 100644 index 000000000..88d6bf1bc --- /dev/null +++ b/event/internal/extractor/fixed_rate.go @@ -0,0 +1,52 @@ +package extractor + +import ( + "github.com/DataDog/datadog-trace-agent/model" + "github.com/DataDog/datadog-trace-agent/sampler" +) + +// FixedRate is an event extractor that extracts APM events from traces based on +// `(service name, operation name) => sampling rate` mappings. +type FixedRate struct { + rateByServiceAndName map[string]map[string]float64 +} + +// NewFixedRate returns an APM event extractor that extracts APM events from a trace following the provided +// extraction rates for any spans matching a (service name, operation name) pair. 
+func NewFixedRate(rateByServiceAndName map[string]map[string]float64) *FixedRate { + return &FixedRate{ + rateByServiceAndName: rateByServiceAndName, + } +} + +// Extract extracts APM events from the trace and returns them as a slice. +func (s *FixedRate) Extract(t model.ProcessedTrace) []*model.APMEvent { + var events []*model.APMEvent + + // Get the trace priority + priority, hasPriority := t.GetSamplingPriority() + + for _, span := range t.WeightedTrace { + if s.shouldAnalyze(span, hasPriority, priority) { + events = append(events, &model.APMEvent{ + Span: span.Span, + TraceSampled: t.Sampled, + }) + } + } + + return events +} + +func (s *FixedRate) shouldAnalyze(span *model.WeightedSpan, hasPriority bool, priority int) bool { + if operations, ok := s.rateByServiceAndName[span.Service]; ok { + if analyzeRate, ok := operations[span.Name]; ok { + // If the trace has been manually sampled, we keep all matching spans + highPriority := hasPriority && priority >= 2 + if highPriority || sampler.SampleByRate(span.TraceID, analyzeRate) { + return true + } + } + } + return false +} diff --git a/cmd/trace-agent/transaction_sampler_test.go b/event/internal/extractor/fixed_rate_test.go similarity index 84% rename from cmd/trace-agent/transaction_sampler_test.go rename to event/internal/extractor/fixed_rate_test.go index b4a1b8677..4e1d56276 100644 --- a/cmd/trace-agent/transaction_sampler_test.go +++ b/event/internal/extractor/fixed_rate_test.go @@ -1,24 +1,24 @@ -package main +package extractor import ( "fmt" + "math/rand" "testing" "github.com/DataDog/datadog-trace-agent/model" - "github.com/DataDog/datadog-trace-agent/sampler" "github.com/stretchr/testify/assert" ) -func createTrace(serviceName string, operationName string, topLevel bool, hasPriority bool, priority int) processedTrace { +func createTrace(serviceName string, operationName string, topLevel bool, hasPriority bool, priority int) model.ProcessedTrace { ws := model.WeightedSpan{TopLevel: topLevel, Span:
&model.Span{Service: serviceName, Name: operationName}} if hasPriority { - ws.SetMetric(sampler.SamplingPriorityKey, float64(priority)) + ws.SetSamplingPriority(priority) } wt := model.WeightedTrace{&ws} - return processedTrace{WeightedTrace: wt, Root: ws.Span} + return model.ProcessedTrace{WeightedTrace: wt, Root: ws.Span} } -func TestTransactionSampler(t *testing.T) { +func TestAnalyzedExtractor(t *testing.T) { assert := assert.New(t) config := make(map[string]map[string]float64) @@ -30,7 +30,7 @@ func TestTransactionSampler(t *testing.T) { tests := []struct { name string - trace processedTrace + trace model.ProcessedTrace expectedSampling bool }{ {"Top-level service and span name match", createTrace("myService", "myOperation", true, false, 0), true}, @@ -48,8 +48,9 @@ func TestTransactionSampler(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ts := newTransactionSampler(config) - analyzedSpans := ts.Extract(test.trace) + ae := NewFixedRate(config) + test.trace.Sampled = rand.Int() > 0 + analyzedSpans := ae.Extract(test.trace) if test.expectedSampling { assert.Len(analyzedSpans, 1, fmt.Sprintf("Trace %v should have been sampled", test.trace)) diff --git a/event/internal/extractor/legacy.go b/event/internal/extractor/legacy.go new file mode 100644 index 000000000..478a0ec83 --- /dev/null +++ b/event/internal/extractor/legacy.go @@ -0,0 +1,50 @@ +package extractor + +import ( + "github.com/DataDog/datadog-trace-agent/model" + "github.com/DataDog/datadog-trace-agent/sampler" +) + +// Legacy is an event extractor that extracts APM events from traces based on `serviceName => sampling +// ratio` mappings. +type Legacy struct { + rateByService map[string]float64 +} + +// NewLegacy returns an APM event extractor that extracts APM events from a trace following the specified +// extraction rates for any spans matching a specific service. 
+func NewLegacy(rateByService map[string]float64) *Legacy { + return &Legacy{ + rateByService: rateByService, + } +} + +// Extract extracts APM events from the trace and returns them as a slice. +func (s *Legacy) Extract(t model.ProcessedTrace) []*model.APMEvent { + var events []*model.APMEvent + + for _, span := range t.WeightedTrace { + if s.shouldExtractEvent(span) { + events = append(events, &model.APMEvent{ + Span: span.Span, + TraceSampled: t.Sampled, + }) + } + } + + return events +} + +func (s *Legacy) shouldExtractEvent(span *model.WeightedSpan) bool { + if !span.TopLevel { + return false + } + + if analyzeRate, ok := s.rateByService[span.Service]; ok { + if sampler.SampleByRate(span.TraceID, analyzeRate) { + return true + } + } + + return false +} diff --git a/event/internal/extractor/noop.go b/event/internal/extractor/noop.go new file mode 100644 index 000000000..79dc767ce --- /dev/null +++ b/event/internal/extractor/noop.go @@ -0,0 +1,15 @@ +package extractor + +import "github.com/DataDog/datadog-trace-agent/model" + +// Noop is a no-op APM event extractor used when APM event extraction is disabled. +type Noop struct{} + +// NewNoop returns a new APM event extractor that does not extract any events.
+func NewNoop() *Noop { + return &Noop{} +} + +func (s *Noop) Extract(t model.ProcessedTrace) []*model.APMEvent { + return nil +} diff --git a/info/info.go b/info/info.go index 47a652ff5..4d4cfc625 100644 --- a/info/info.go +++ b/info/info.go @@ -87,7 +87,7 @@ const ( --- Writer stats (1 min) --- - Traces: {{.Status.TraceWriter.Payloads}} payloads, {{.Status.TraceWriter.Traces}} traces, {{if gt .Status.TraceWriter.Transactions 0}}{{.Status.TraceWriter.Transactions}} transactions, {{end}}{{.Status.TraceWriter.Bytes}} bytes + Traces: {{.Status.TraceWriter.Payloads}} payloads, {{.Status.TraceWriter.Traces}} traces, {{if gt .Status.TraceWriter.Events 0}}{{.Status.TraceWriter.Events}} events, {{end}}{{.Status.TraceWriter.Bytes}} bytes {{if gt .Status.TraceWriter.Errors 0}}WARNING: Traces API errors (1 min): {{.Status.TraceWriter.Errors}}{{end}} Stats: {{.Status.StatsWriter.Payloads}} payloads, {{.Status.StatsWriter.StatsBuckets}} stats buckets, {{.Status.StatsWriter.Bytes}} bytes {{if gt .Status.StatsWriter.Errors 0}}WARNING: Stats API errors (1 min): {{.Status.StatsWriter.Errors}}{{end}} diff --git a/info/testdata/okay.info b/info/testdata/okay.info index d7153e562..71e3d1bb4 100644 --- a/info/testdata/okay.info +++ b/info/testdata/okay.info @@ -23,6 +23,6 @@ Trace Agent (v 0.99.0) --- Writer stats (1 min) --- - Traces: 4 payloads, 26 traces, 123 transactions, 3245 bytes + Traces: 4 payloads, 26 traces, 123 events, 3245 bytes Stats: 6 payloads, 12 stats buckets, 8329 bytes Services: 1 payloads, 2 services, 1234 bytes diff --git a/info/testdata/okay.json b/info/testdata/okay.json index abdf44504..1d0f8801c 100644 --- a/info/testdata/okay.json +++ b/info/testdata/okay.json @@ -1,7 +1,7 @@ { "cmdline": ["./trace-agent"], "config": {"Enabled":true,"Hostname":"localhost.localdomain","DefaultEnv":"none","Endpoints":[{"Host": "https://trace1.agent.datadoghq.com"}, {"Host": 
"https://trace2.agent.datadoghq.com"}],"APIPayloadBufferMaxSize":16777216,"BucketInterval":10000000000,"ExtraAggregators":[],"ExtraSampleRate":1,"MaxTPS":10,"ReceiverHost":"localhost","ReceiverPort":8126,"ConnectionLimit":2000,"ReceiverTimeout":0,"StatsdHost":"127.0.0.1","StatsdPort":8125,"LogLevel":"INFO","LogFilePath":"/var/log/datadog/trace-agent.log"}, - "trace_writer": {"Payloads":4,"Bytes":3245,"Traces":26,"Transactions":123,"Errors":0}, + "trace_writer": {"Payloads":4,"Bytes":3245,"Traces":26,"Events":123,"Errors":0}, "stats_writer": {"Payloads":6,"Bytes":8329,"StatsBuckets":12,"Errors":0}, "service_writer": {"Payloads":1,"Bytes":1234,"Services":2,"Errors":0}, "memstats": {"Alloc":773552,"TotalAlloc":773552,"Sys":3346432,"Lookups":6,"Mallocs":7231,"Frees":561,"HeapAlloc":773552,"HeapSys":1572864,"HeapIdle":49152,"HeapInuse":1523712,"HeapReleased":0,"HeapObjects":6670,"StackInuse":524288,"StackSys":524288,"MSpanInuse":24480,"MSpanSys":32768,"MCacheInuse":4800,"MCacheSys":16384,"BuckHashSys":2675,"GCSys":131072,"OtherSys":1066381,"NextGC":4194304,"LastGC":0,"PauseTotalNs":0,"PauseNs":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"PauseEnd":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"NumGC":0,"GCCPUFraction":0,"EnableGC":true,"DebugGC":false,"BySize":[{"Size":0,"Mallocs":0,"Frees":0},{"Size":8,"Mallocs":126,"Frees":0},{"Size":16,"Mallocs":825,"Frees":0},{"Size":32,"Mallocs":4208,"Frees":0},{"Size":48,"Mallocs":345,"Frees":0},{"Size":64,"Mallocs":262,"Frees":0},{"Size":80,"Mallocs":93,"Frees":0},{"Size":96,"Mallocs":70,"Frees":0},{"Size":112,"Mallocs":97,"Frees":0},{"Size":128,"Mallocs":24,"Frees":0},{"Size":144,"Mallocs":25,"Frees":0},{"Size":160,"Mallocs":57,"Frees":0},{"Size":176,"Mallocs":128,"Frees":0},{"Size":192,"Mallocs":13,"Frees":0},{"Size":208,"Mallocs":77,"Frees":0},{"Size":224,"Mallocs":3,"Frees":0},{"Size":240,"Mallocs":2,"Frees":0},{"Size":256,"Mallocs":17,"Frees":0},{"Size":288,"Mallocs":64,"Frees":0},{"Size":320,"Mallocs":12,"Frees":0},{"Size":352,"Mallocs":20,"Frees":0},{"Size":384,"Mallocs":1,"Frees":0},{"Size":416,"Mallocs":59,"Frees":0},{"Size":448,"Mallocs":0,"Frees":0},{"Size":480,"Mallocs":3,"Frees":0},{"Size":512,"Mallocs":2,"Frees":0},{"Size":576,"Mallocs":17,"Frees":0},{"Size":640,"Mallocs":6,"Frees":0},{"Size":704,"Mallocs":10,"Frees":0},{"Size":768,"Mallocs":0,"Frees":0},{"Size":896,"Mallocs":11,"Frees":0},{"Size":1024,"Mallocs":11,"Frees":0},{"Size":1152,"Mallocs":12,"Frees":0},{"Size":1280,"Mallocs":2,"Frees":0},{"Size":1408,"Mallocs":2,"Frees":0},{"Size":1536,"Mallocs":0,"Frees":0},{"Size":1664,"Mallocs":10,"Frees":0},{"Size":2048,"Mallocs":17,"Frees":0},{"Size":2304,"Mallocs":7,"Frees":0},{"Size":2560,"Mallocs":1,"Frees":0},{"Size":2816,"Mallocs":1,"Frees":0},{"Size":3072,"Mallocs":1,"Frees":0},{"Size":3328,"Mallocs":7,"Frees":0},{"Size":4096,"Mallocs":4,"Frees":0},{"Size":4608,"Mallocs":1,"Frees":0},{"Size":5376,"Mallocs":6,"Frees":0},{"Size":6144,"Mallocs":4,"Frees":0},{"Size":6400,"Mallocs":0,"Frees":0},{"Size":6656,"Mallocs":1,"Frees":0},{"Size":6912,"Mallocs":0,"Fre
es":0},{"Size":8192,"Mallocs":0,"Frees":0},{"Size":8448,"Mallocs":0,"Frees":0},{"Size":8704,"Mallocs":1,"Frees":0},{"Size":9472,"Mallocs":0,"Frees":0},{"Size":10496,"Mallocs":0,"Frees":0},{"Size":12288,"Mallocs":1,"Frees":0},{"Size":13568,"Mallocs":0,"Frees":0},{"Size":14080,"Mallocs":0,"Frees":0},{"Size":16384,"Mallocs":0,"Frees":0},{"Size":16640,"Mallocs":0,"Frees":0},{"Size":17664,"Mallocs":1,"Frees":0}]}, diff --git a/info/writer.go b/info/writer.go index dd8ecdcff..636d21c37 100644 --- a/info/writer.go +++ b/info/writer.go @@ -4,7 +4,7 @@ package info type TraceWriterInfo struct { Payloads int64 Traces int64 - Transactions int64 + Events int64 Spans int64 Errors int64 Retries int64 diff --git a/model/event.go b/model/event.go new file mode 100644 index 000000000..615b48959 --- /dev/null +++ b/model/event.go @@ -0,0 +1,7 @@ +package model + +// APMEvent is an event extracted from received traces and sent to Datadog's Trace Search functionality. +type APMEvent struct { + Span *Span + TraceSampled bool +} diff --git a/model/processed_trace.go b/model/processed_trace.go new file mode 100644 index 000000000..ebdb178c8 --- /dev/null +++ b/model/processed_trace.go @@ -0,0 +1,21 @@ +package model + +type ProcessedTrace struct { + Trace Trace + WeightedTrace WeightedTrace + Root *Span + Env string + Sublayers map[*Span][]SublayerValue + Sampled bool +} + +func (pt *ProcessedTrace) Weight() float64 { + if pt.Root == nil { + return 1.0 + } + return pt.Root.Weight() +} + +func (pt *ProcessedTrace) GetSamplingPriority() (int, bool) { + return pt.Root.GetSamplingPriority() +} diff --git a/model/span.go b/model/span.go index e8f37a9b4..606dc38f1 100644 --- a/model/span.go +++ b/model/span.go @@ -9,6 +9,9 @@ const ( SpanSampleRateMetricKey = "_sample_rate" // Fake type of span to indicate it is time to flush flushMarkerType = "_FLUSH_MARKER" + + // SamplingPriorityKey is the key of the sampling priority value in the metrics map of the root span + SamplingPriorityKey = 
"_sampling_priority_v1" ) // RandomID generates a random uint64 that we use for IDs @@ -52,3 +55,18 @@ func (s *Span) SetMetric(key string, val float64) { } s.Metrics[key] = val } + +// GetSamplingPriority returns the value of the sampling priority metric set on this span and a boolean indicating if +// such a metric was actually found or not. +func (s *Span) GetSamplingPriority() (int, bool) { + if s == nil { + return 0, false + } + p, ok := s.Metrics[SamplingPriorityKey] + return int(p), ok +} + +// SetSamplingPriority sets the sampling priority value on this span, overwriting any previously set value. +func (s *Span) SetSamplingPriority(priority int) { + s.SetMetric(SamplingPriorityKey, float64(priority)) +} diff --git a/sampler/prioritysampler.go b/sampler/prioritysampler.go index 47805d2ba..0e8f7ff64 100644 --- a/sampler/prioritysampler.go +++ b/sampler/prioritysampler.go @@ -22,8 +22,6 @@ import ( ) const ( - // SamplingPriorityKey is the key of the sampling priority value in the metrics map of the root span - SamplingPriorityKey = "_sampling_priority_v1" // SamplingPriorityRateKey is the metrics key holding the sampling rate at which this trace // was sampled. SamplingPriorityRateKey = "_sampling_priority_rate_v1" @@ -95,7 +93,7 @@ func (s *PriorityEngine) Sample(trace model.Trace, root *model.Span, env string) return false, 0 } - samplingPriority := root.Metrics[SamplingPriorityKey] + samplingPriority, _ := root.GetSamplingPriority() // Regardless of rates, sampling here is based on the metadata set // by the client library. 
Which, is turn, is based on agent hints, diff --git a/sampler/prioritysampler_test.go b/sampler/prioritysampler_test.go index 2242b79e2..b34108de3 100644 --- a/sampler/prioritysampler_test.go +++ b/sampler/prioritysampler_test.go @@ -5,9 +5,10 @@ import ( "math/rand" "testing" + log "github.com/cihub/seelog" + "github.com/DataDog/datadog-trace-agent/config" "github.com/DataDog/datadog-trace-agent/model" - log "github.com/cihub/seelog" "github.com/stretchr/testify/assert" ) @@ -35,7 +36,7 @@ func getTestTraceWithService(t *testing.T, service string, s *PriorityEngine) (m &model.Span{TraceID: tID, SpanID: 2, ParentID: 1, Start: 100, Duration: 200000, Service: service, Type: "sql"}, } r := rand.Float64() - priority := 0.0 + priority := 0 rates := s.getRateByService() key := byServiceKey(trace[0].Service, defaultEnv) var rate float64 @@ -47,7 +48,7 @@ func getTestTraceWithService(t *testing.T, service string, s *PriorityEngine) (m if r <= rate { priority = 1 } - trace[0].Metrics = map[string]float64{SamplingPriorityKey: priority} + trace[0].SetSamplingPriority(priority) return trace, trace[0] } @@ -66,7 +67,7 @@ func TestPrioritySample(t *testing.T) { s = getTestPriorityEngine() trace, root = getTestTraceWithService(t, "my-service", s) - root.Metrics[SamplingPriorityKey] = -1 + root.SetSamplingPriority(-1) sampled, rate := s.Sample(trace, root, env) assert.False(sampled, "trace with negative priority is dropped") assert.Equal(0.0, rate, "dropping all traces") @@ -76,7 +77,7 @@ func TestPrioritySample(t *testing.T) { s = getTestPriorityEngine() trace, root = getTestTraceWithService(t, "my-service", s) - root.Metrics[SamplingPriorityKey] = 0 + root.SetSamplingPriority(0) sampled, _ = s.Sample(trace, root, env) assert.False(sampled, "trace with priority 0 is dropped") assert.True(0.0 < s.Sampler.Backend.GetTotalScore(), "sampling a priority 0 trace should increase total score") @@ -85,7 +86,7 @@ func TestPrioritySample(t *testing.T) { s = getTestPriorityEngine() trace, 
root = getTestTraceWithService(t, "my-service", s) - root.Metrics[SamplingPriorityKey] = 1 + root.SetSamplingPriority(1) sampled, _ = s.Sample(trace, root, env) assert.True(sampled, "trace with priority 1 is kept") assert.True(0.0 < s.Sampler.Backend.GetTotalScore(), "sampling a priority 0 trace should increase total score") @@ -94,7 +95,7 @@ func TestPrioritySample(t *testing.T) { s = getTestPriorityEngine() trace, root = getTestTraceWithService(t, "my-service", s) - root.Metrics[SamplingPriorityKey] = 2 + root.SetSamplingPriority(2) sampled, rate = s.Sample(trace, root, env) assert.True(sampled, "trace with priority 2 is kept") assert.Equal(1.0, rate, "sampling all traces") @@ -104,14 +105,14 @@ func TestPrioritySample(t *testing.T) { s = getTestPriorityEngine() trace, root = getTestTraceWithService(t, "my-service", s) - root.Metrics[SamplingPriorityKey] = 999 + root.SetSamplingPriority(999) sampled, rate = s.Sample(trace, root, env) assert.True(sampled, "trace with high priority is kept") assert.Equal(1.0, rate, "sampling all traces") assert.Equal(0.0, s.Sampler.Backend.GetTotalScore(), "sampling a high priority trace should *NOT* increase total score") assert.Equal(0.0, s.Sampler.Backend.GetSampledScore(), "sampling a high priority trace should *NOT* increase sampled score") - delete(root.Metrics, SamplingPriorityKey) + delete(root.Metrics, model.SamplingPriorityKey) sampled, _ = s.Sample(trace, root, env) assert.False(sampled, "this should not happen but a trace without priority sampling set should be dropped") } @@ -125,7 +126,7 @@ func TestPrioritySampleTracerWeight(t *testing.T) { clientRate := 0.33 for i := 0; i < 10; i++ { trace, root := getTestTraceWithService(t, "my-service", s) - root.Metrics[SamplingPriorityKey] = float64(i % 2) + root.SetSamplingPriority(i % 2) root.Metrics[SamplingPriorityRateKey] = clientRate _, rate := s.Sample(trace, root, env) assert.Equal(clientRate, rate) diff --git a/writer/trace_writer.go b/writer/trace_writer.go index 
aee51a480..833aa2125 100644 --- a/writer/trace_writer.go +++ b/writer/trace_writer.go @@ -19,15 +19,18 @@ import ( const pathTraces = "/api/v0.2/traces" -// SampledTrace represents the result of a trace sample operation. -type SampledTrace struct { - Trace *model.Trace - Transactions []*model.Span +// TracePackage represents the result of a trace sampling operation. +// +// If a trace was sampled, then Trace will be set to that trace. Otherwise, it will be nil. +// If events were extracted from a trace, then Events will be populated from these events. Otherwise, it will be empty. +type TracePackage struct { + Trace *model.Trace + Events []*model.APMEvent } -// Empty returns true if this SampledTrace has no data. -func (s *SampledTrace) Empty() bool { - return s.Trace == nil && len(s.Transactions) == 0 +// Empty returns true if this TracePackage has no data. +func (s *TracePackage) Empty() bool { + return s.Trace == nil && len(s.Events) == 0 } // TraceWriter ingests sampled traces and flushes them to the API. @@ -36,10 +39,10 @@ type TraceWriter struct { hostName string env string conf writerconfig.TraceWriterConfig - in <-chan *SampledTrace + in <-chan *TracePackage traces []*model.APITrace - transactions []*model.Span + events []*model.Span spansInBuffer int sender payloadSender @@ -47,7 +50,7 @@ type TraceWriter struct { } // NewTraceWriter returns a new writer for traces. 
-func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWriter { +func NewTraceWriter(conf *config.AgentConfig, in <-chan *TracePackage) *TraceWriter { cfg := conf.TraceWriterConfig endpoints := newEndpoints(conf, pathTraces) sender := newMultiSender(endpoints, cfg.SenderConfig) @@ -58,8 +61,8 @@ func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWri hostName: conf.Hostname, env: conf.DefaultEnv, - traces: []*model.APITrace{}, - transactions: []*model.Span{}, + traces: []*model.APITrace{}, + events: []*model.Span{}, in: in, @@ -142,14 +145,14 @@ func (w *TraceWriter) Stop() { w.sender.Stop() } -func (w *TraceWriter) handleSampledTrace(sampledTrace *SampledTrace) { +func (w *TraceWriter) handleSampledTrace(sampledTrace *TracePackage) { if sampledTrace == nil || sampledTrace.Empty() { log.Debug("Ignoring empty sampled trace") return } trace := sampledTrace.Trace - transactions := sampledTrace.Transactions + events := sampledTrace.Events var n int @@ -157,8 +160,8 @@ func (w *TraceWriter) handleSampledTrace(sampledTrace *SampledTrace) { n += len(*trace) } - if transactions != nil { - n += len(transactions) + if events != nil { + n += len(events) } if w.spansInBuffer > 0 && w.spansInBuffer+n > w.conf.MaxSpansPerPayload { @@ -167,7 +170,7 @@ func (w *TraceWriter) handleSampledTrace(sampledTrace *SampledTrace) { } w.appendTrace(sampledTrace.Trace) - w.appendTransactions(sampledTrace.Transactions) + w.appendEvents(sampledTrace.Events) if n > w.conf.MaxSpansPerPayload { // If what we just added already goes over the limit, report this but lets carry on and flush @@ -187,13 +190,13 @@ func (w *TraceWriter) appendTrace(trace *model.Trace) { w.spansInBuffer += len(*trace) } -func (w *TraceWriter) appendTransactions(transactions []*model.Span) { - for _, transaction := range transactions { - log.Tracef("Handling new transaction: %v", transaction) - w.transactions = append(w.transactions, transaction) +func (w *TraceWriter) 
appendEvents(events []*model.APMEvent) { + for _, event := range events { + log.Tracef("Handling new APM event: %v", event) + w.events = append(w.events, event.Span) } - w.spansInBuffer += len(transactions) + w.spansInBuffer += len(events) } func (w *TraceWriter) flushDueToMaxSpansPerPayload() { @@ -203,22 +206,22 @@ func (w *TraceWriter) flushDueToMaxSpansPerPayload() { func (w *TraceWriter) flush() { numTraces := len(w.traces) - numTransactions := len(w.transactions) + numEvents := len(w.events) // If no traces, we can't construct anything - if numTraces == 0 && numTransactions == 0 { + if numTraces == 0 && numEvents == 0 { return } atomic.AddInt64(&w.stats.Traces, int64(numTraces)) - atomic.AddInt64(&w.stats.Transactions, int64(numTransactions)) + atomic.AddInt64(&w.stats.Events, int64(numEvents)) atomic.AddInt64(&w.stats.Spans, int64(w.spansInBuffer)) tracePayload := model.TracePayload{ HostName: w.hostName, Env: w.env, Traces: w.traces, - Transactions: w.transactions, + Transactions: w.events, } serialized, err := proto.Marshal(&tracePayload) @@ -257,7 +260,7 @@ func (w *TraceWriter) flush() { payload := newPayload(serialized, headers) - log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions)) + log.Debugf("flushing traces=%v events=%v", len(w.traces), len(w.events)) w.sender.Send(payload) w.resetBuffer() } @@ -265,7 +268,7 @@ func (w *TraceWriter) flush() { func (w *TraceWriter) resetBuffer() { // Reset traces w.traces = w.traces[:0] - w.transactions = w.transactions[:0] + w.events = w.events[:0] w.spansInBuffer = 0 } @@ -276,7 +279,7 @@ func (w *TraceWriter) updateInfo() { // Load counters and reset them for the next flush twInfo.Payloads = atomic.SwapInt64(&w.stats.Payloads, 0) twInfo.Traces = atomic.SwapInt64(&w.stats.Traces, 0) - twInfo.Transactions = atomic.SwapInt64(&w.stats.Transactions, 0) + twInfo.Events = atomic.SwapInt64(&w.stats.Events, 0) twInfo.Spans = atomic.SwapInt64(&w.stats.Spans, 0) twInfo.Bytes = 
atomic.SwapInt64(&w.stats.Bytes, 0) twInfo.Retries = atomic.SwapInt64(&w.stats.Retries, 0) @@ -285,7 +288,7 @@ func (w *TraceWriter) updateInfo() { statsd.Client.Count("datadog.trace_agent.trace_writer.payloads", int64(twInfo.Payloads), nil, 1) statsd.Client.Count("datadog.trace_agent.trace_writer.traces", int64(twInfo.Traces), nil, 1) - statsd.Client.Count("datadog.trace_agent.trace_writer.transactions", int64(twInfo.Transactions), nil, 1) + statsd.Client.Count("datadog.trace_agent.trace_writer.events", int64(twInfo.Events), nil, 1) statsd.Client.Count("datadog.trace_agent.trace_writer.spans", int64(twInfo.Spans), nil, 1) statsd.Client.Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1) statsd.Client.Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1) diff --git a/writer/trace_writer_test.go b/writer/trace_writer_test.go index d4888cf58..8e4ecd5db 100644 --- a/writer/trace_writer_test.go +++ b/writer/trace_writer_test.go @@ -33,7 +33,7 @@ func TestTraceWriter(t *testing.T) { traceWriter.Start() // Send a few sampled traces through the writer - sampledTraces := []*SampledTrace{ + sampledTraces := []*TracePackage{ // These 2 should be grouped together in a single payload randomSampledTrace(1, 1), randomSampledTrace(1, 1), @@ -91,7 +91,7 @@ func TestTraceWriter(t *testing.T) { "Content-Encoding": "gzip", } assert.Len(receivedPayloads, 1, "We expected 1 payload") - assertPayloads(assert, traceWriter, expectedHeaders, []*SampledTrace{testSampledTrace}, + assertPayloads(assert, traceWriter, expectedHeaders, []*TracePackage{testSampledTrace}, testEndpoint.SuccessPayloads()) // Wrap up @@ -122,7 +122,7 @@ func TestTraceWriter(t *testing.T) { ) // Send a bunch of sampled traces that should go together in a single payload - payload1SampledTraces := []*SampledTrace{ + payload1SampledTraces := []*TracePackage{ randomSampledTrace(2, 0), randomSampledTrace(2, 0), randomSampledTrace(2, 0), @@ -137,7 +137,7 @@ func 
TestTraceWriter(t *testing.T) { } // Send a single trace that goes over the span limit - payload2SampledTraces := []*SampledTrace{ + payload2SampledTraces := []*TracePackage{ randomSampledTrace(20, 0), } expectedNumPayloads++ @@ -155,7 +155,7 @@ func TestTraceWriter(t *testing.T) { // Send a third payload with other 3 traces with an errored out endpoint testEndpoint.SetError(fmt.Errorf("non retriable error")) - payload3SampledTraces := []*SampledTrace{ + payload3SampledTraces := []*TracePackage{ randomSampledTrace(2, 0), randomSampledTrace(2, 0), randomSampledTrace(2, 0), @@ -178,7 +178,7 @@ func TestTraceWriter(t *testing.T) { err: fmt.Errorf("non retriable error"), endpoint: testEndpoint, }) - payload4SampledTraces := []*SampledTrace{ + payload4SampledTraces := []*TracePackage{ randomSampledTrace(2, 0), randomSampledTrace(2, 0), randomSampledTrace(2, 0), @@ -242,7 +242,7 @@ func TestTraceWriter(t *testing.T) { }) } -func calculateTracePayloadSize(sampledTraces []*SampledTrace) int64 { +func calculateTracePayloadSize(sampledTraces []*TracePackage) int64 { apiTraces := make([]*model.APITrace, len(sampledTraces)) for i, trace := range sampledTraces { @@ -275,18 +275,21 @@ func calculateTracePayloadSize(sampledTraces []*SampledTrace) int64 { } func assertPayloads(assert *assert.Assertions, traceWriter *TraceWriter, expectedHeaders map[string]string, - sampledTraces []*SampledTrace, payloads []*payload) { + sampledTraces []*TracePackage, payloads []*payload) { var expectedTraces []*model.Trace - var expectedTransactions []*model.Span + var expectedEvents []*model.APMEvent for _, sampledTrace := range sampledTraces { expectedTraces = append(expectedTraces, sampledTrace.Trace) - expectedTransactions = append(expectedTransactions, sampledTrace.Transactions...) 
+ + for _, event := range sampledTrace.Events { + expectedEvents = append(expectedEvents, event) + } } var expectedTraceIdx int - var expectedTransactionIdx int + var expectedEventIdx int for _, payload := range payloads { assert.Equal(expectedHeaders, payload.headers, "Payload headers should match expectation") @@ -320,12 +323,12 @@ func assertPayloads(assert *assert.Assertions, traceWriter *TraceWriter, expecte for _, seenTransaction := range tracePayload.Transactions { numSpans++ - if !assert.True(proto.Equal(expectedTransactions[expectedTransactionIdx], seenTransaction), + if !assert.True(proto.Equal(expectedEvents[expectedEventIdx].Span, seenTransaction), "Unmarshalled transaction should match expectation at index %d", expectedTraceIdx) { return } - expectedTransactionIdx++ + expectedEventIdx++ } // If there's more than 1 trace or transaction in this payload, don't let it go over the limit. Otherwise, @@ -336,8 +339,8 @@ func assertPayloads(assert *assert.Assertions, traceWriter *TraceWriter, expecte } } -func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testutil.TestStatsClient) { - payloadChannel := make(chan *SampledTrace) +func testTraceWriter() (*TraceWriter, chan *TracePackage, *testEndpoint, *testutil.TestStatsClient) { + payloadChannel := make(chan *TracePackage) conf := &config.AgentConfig{ Hostname: testHostName, DefaultEnv: testEnv, @@ -352,15 +355,21 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut return traceWriter, payloadChannel, testEndpoint, testStatsClient } -func randomSampledTrace(numSpans, numTransactions int) *SampledTrace { - if numSpans < numTransactions { - panic("can't have more transactions than spans in a RandomSampledTrace") +func randomSampledTrace(numSpans, numEvents int) *TracePackage { + if numSpans < numEvents { + panic("can't have more events than spans in a RandomSampledTrace") } trace := testutil.GetTestTrace(1, numSpans, true)[0] - return &SampledTrace{ - Trace: 
&trace, - Transactions: trace[:numTransactions], + events := make([]*model.APMEvent, 0, numEvents) + + for _, span := range trace[:numEvents] { + events = append(events, &model.APMEvent{Span: span}) + } + + return &TracePackage{ + Trace: &trace, + Events: events, } }