diff --git a/cmd/trace-agent/concentrator.go b/cmd/trace-agent/concentrator.go index 0ee302d1d..41c133f2a 100644 --- a/cmd/trace-agent/concentrator.go +++ b/cmd/trace-agent/concentrator.go @@ -50,7 +50,7 @@ func NewConcentrator(aggregators []string, bsize int64, out chan []agent.StatsBu buckets: make(map[int64]*agent.StatsRawBucket), // At start, only allow stats for the current time bucket. Ensure we don't // override buckets which could have been sent before an Agent restart. - oldestTs: alignTs(agent.Now(), bsize), + oldestTs: alignTs(time.Now().UnixNano(), bsize), // TODO: Move to configuration. bufferLen: defaultBufferLen, @@ -103,7 +103,7 @@ func (c *Concentrator) Stop() { // Add appends to the proper stats bucket this trace's statistics func (c *Concentrator) Add(t agent.ProcessedTrace) { - c.addNow(t, agent.Now()) + c.addNow(t, time.Now().UnixNano()) } func (c *Concentrator) addNow(t agent.ProcessedTrace, now int64) { @@ -136,7 +136,7 @@ func (c *Concentrator) addNow(t agent.ProcessedTrace, now int64) { // Flush deletes and returns complete statistic buckets func (c *Concentrator) Flush() []agent.StatsBucket { - return c.flushNow(agent.Now()) + return c.flushNow(time.Now().UnixNano()) } func (c *Concentrator) flushNow(now int64) []agent.StatsBucket { diff --git a/cmd/trace-agent/concentrator_test.go b/cmd/trace-agent/concentrator_test.go index 26d8f51a1..8b825563e 100644 --- a/cmd/trace-agent/concentrator_test.go +++ b/cmd/trace-agent/concentrator_test.go @@ -25,7 +25,7 @@ func getTsInBucket(alignedNow int64, bsize int64, offset int64) int64 { // testSpan avoids typo and inconsistency in test spans (typical pitfall: duration, start time, // and end time are aligned, and end time is the one that needs to be aligned func testSpan(spanID uint64, parentID uint64, duration, offset int64, service, resource string, err int32) *agent.Span { - now := agent.Now() + now := time.Now().UnixNano() alignedNow := now - now%testBucketInterval return &agent.Span{ @@ -47,7 +47,7 @@ func TestConcentratorOldestTs(t *testing.T) { assert := assert.New(t) statsChan := make(chan []agent.StatsBucket) - now := agent.Now() + now := time.Now().UnixNano() // Build that simply have spans spread over time windows. trace := agent.Trace{ @@ -153,7 +153,7 @@ func TestConcentratorStatsTotals(t *testing.T) { statsChan := make(chan []agent.StatsBucket) c := NewConcentrator([]string{}, testBucketInterval, statsChan) - now := agent.Now() + now := time.Now().UnixNano() alignedNow := alignTs(now, c.bsize) // update oldestTs as it running for quite some time, to avoid the fact that at startup @@ -212,7 +212,7 @@ func TestConcentratorStatsCounts(t *testing.T) { statsChan := make(chan []agent.StatsBucket) c := NewConcentrator([]string{}, testBucketInterval, statsChan) - now := agent.Now() + now := time.Now().UnixNano() alignedNow := alignTs(now, c.bsize) // update oldestTs as it running for quite some time, to avoid the fact that at startup @@ -336,7 +336,7 @@ func TestConcentratorSublayersStatsCounts(t *testing.T) { statsChan := make(chan []agent.StatsBucket) c := NewConcentrator([]string{}, testBucketInterval, statsChan) - now := agent.Now() + now := time.Now().UnixNano() alignedNow := now - now%c.bsize trace := agent.Trace{ diff --git a/internal/agent/compat.go b/internal/agent/compat.go deleted file mode 100644 index 9f419d193..000000000 --- a/internal/agent/compat.go +++ /dev/null @@ -1,18 +0,0 @@ -package agent - -// TracesFromSpans transforms a slice of spans into a slice of traces -// grouping them by trace IDs -// FIXME[1.x] this can be removed as we get pre-assembled traces from -// clients -func TracesFromSpans(spans []Span) Traces { - traces := Traces{} - byID := make(map[uint64][]*Span) - for _, s := range spans { - byID[s.TraceID] = append(byID[s.TraceID], &s) - } - for _, t := range byID { - traces = append(traces, t) - } - - return traces -} diff --git a/internal/agent/payload.go b/internal/agent/payload.go deleted file mode 100644 index 49f9fffff..000000000 --- a/internal/agent/payload.go +++ /dev/null @@ -1,108 +0,0 @@ -package agent - -import ( - "bytes" - "compress/gzip" - "encoding/json" - "errors" - "fmt" - "net/http" - "sync" -) - -// AgentPayload is the main payload to carry data that has been -// pre-processed to the Datadog mothership. -// This is a legacy payload format, used in API v0.1. -type AgentPayload struct { - HostName string `json:"hostname"` // the host name that will be resolved by the API - Env string `json:"env"` // the default environment this agent uses - Traces []Trace `json:"traces"` // the traces we sampled - Stats []StatsBucket `json:"stats"` // the statistics we pre-computed - - // private - mu sync.RWMutex - extras map[string]string -} - -// IsEmpty tells if a payload contains data. If not, it's useless -// to flush it. -func (p *AgentPayload) IsEmpty() bool { - return len(p.Stats) == 0 && len(p.Traces) == 0 -} - -// Extras returns this payloads extra metadata fields -func (p *AgentPayload) Extras() map[string]string { - p.mu.RLock() - defer p.mu.RUnlock() - return p.extras -} - -// SetExtra sets the given metadata field on a payload -func (p *AgentPayload) SetExtra(key, val string) { - p.mu.Lock() - defer p.mu.Unlock() - - if p.extras == nil { - p.extras = make(map[string]string) - } - - p.extras[key] = val -} - -// AgentPayloadVersion is the version the agent agrees to with -// the API so that they can encode/decode the data accordingly -type AgentPayloadVersion string - -const ( - // AgentPayloadV01 is a simple json'd/gzip'd dump of the payload - AgentPayloadV01 AgentPayloadVersion = "v0.1" -) - -var ( - // GlobalAgentPayloadVersion is a default that will be used - // in all the AgentPayload method. Override for special cases. - GlobalAgentPayloadVersion = AgentPayloadV01 -) - -// EncodeAgentPayload will return a slice of bytes representing the -// payload (according to GlobalAgentPayloadVersion) -func EncodeAgentPayload(p *AgentPayload) ([]byte, error) { - var b bytes.Buffer - var err error - - switch GlobalAgentPayloadVersion { - case AgentPayloadV01: - gz, err := gzip.NewWriterLevel(&b, gzip.BestSpeed) - if err != nil { - return nil, err - } - err = json.NewEncoder(gz).Encode(p) - gz.Close() - default: - err = errors.New("unknown payload version") - } - - return b.Bytes(), err -} - -// AgentPayloadAPIPath returns the path (after the first slash) to which -// the payload should be sent to be understood by the API given the -// configured payload version. -func AgentPayloadAPIPath() string { - return fmt.Sprintf("/api/%s/collector", GlobalAgentPayloadVersion) -} - -// SetAgentPayloadHeaders takes a Header struct and adds the appropriate -// header keys for the API to be able to decode the data. -func SetAgentPayloadHeaders(h http.Header, extras map[string]string) { - switch GlobalAgentPayloadVersion { - case AgentPayloadV01: - h.Set("Content-Type", "application/json") - h.Set("Content-Encoding", "gzip") - - for key, value := range extras { - h.Set(key, value) - } - default: - } -} diff --git a/internal/agent/services.go b/internal/agent/services.go index c9de41339..d977375e8 100644 --- a/internal/agent/services.go +++ b/internal/agent/services.go @@ -2,7 +2,6 @@ package agent import ( "encoding/json" - "net/http" ) //go:generate msgp -marshal=false @@ -30,13 +29,3 @@ func (s1 ServicesMetadata) Merge(s2 ServicesMetadata) { func EncodeServicesPayload(sm ServicesMetadata) ([]byte, error) { return json.Marshal(sm) } - -// SetServicesPayloadHeaders takes a Header struct and adds the appropriate -// header keys for the API to be able to decode the services metadata. -func SetServicesPayloadHeaders(h http.Header) { - switch GlobalAgentPayloadVersion { - case AgentPayloadV01: - h.Set("Content-Type", "application/json") - default: - } -} diff --git a/internal/agent/time.go b/internal/agent/time.go deleted file mode 100644 index 4f93b0baa..000000000 --- a/internal/agent/time.go +++ /dev/null @@ -1,10 +0,0 @@ -package agent - -import ( - "time" -) - -// Now returns a timestamp in our nanoseconds default format -func Now() int64 { - return time.Now().UnixNano() -} diff --git a/internal/api/api.go b/internal/api/api.go index fe38d24d2..89b87d044 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -160,7 +160,7 @@ func (r *HTTPReceiver) Stop() error { func (r *HTTPReceiver) httpHandle(fn http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - req.Body = agent.NewLimitedReader(req.Body, r.maxRequestBodyLength) + req.Body = NewLimitedReader(req.Body, r.maxRequestBodyLength) defer req.Body.Close() fn(w, req) @@ -223,7 +223,7 @@ func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http. // We get the address of the struct holding the stats associated to the tags ts := r.Stats.GetTagStats(tags) - bytesRead := req.Body.(*agent.LimitedReader).Count + bytesRead := req.Body.(*LimitedReader).Count if bytesRead > 0 { atomic.AddInt64(&ts.TracesBytes, int64(bytesRead)) } @@ -291,7 +291,7 @@ func (r *HTTPReceiver) handleServices(v Version, w http.ResponseWriter, req *htt atomic.AddInt64(&ts.ServicesReceived, int64(len(servicesMeta))) - bytesRead := req.Body.(*agent.LimitedReader).Count + bytesRead := req.Body.(*LimitedReader).Count if bytesRead > 0 { atomic.AddInt64(&ts.ServicesBytes, int64(bytesRead)) } @@ -376,7 +376,7 @@ func getTraces(v Version, w http.ResponseWriter, req *http.Request) (agent.Trace HTTPDecodingError(err, []string{tagTraceHandler, fmt.Sprintf("v:%s", v)}, w) return nil, false } - traces = agent.TracesFromSpans(spans) + traces = tracesFromSpans(spans) case v02: fallthrough case v03: @@ -411,3 +411,16 @@ func decodeReceiverPayload(r io.Reader, dest msgp.Decodable, v Version, contentT panic(fmt.Sprintf("unhandled content type %q", contentType)) } } + +func tracesFromSpans(spans []agent.Span) agent.Traces { + traces := agent.Traces{} + byID := make(map[uint64][]*agent.Span) + for _, s := range spans { + byID[s.TraceID] = append(byID[s.TraceID], &s) + } + for _, t := range byID { + traces = append(traces, t) + } + + return traces +} diff --git a/internal/agent/limited_reader.go b/internal/api/limited_reader.go similarity index 98% rename from internal/agent/limited_reader.go rename to internal/api/limited_reader.go index feb3c155c..c647ac323 100644 --- a/internal/agent/limited_reader.go +++ b/internal/api/limited_reader.go @@ -1,4 +1,4 @@ -package agent +package api import ( "errors" diff --git a/internal/agent/limited_reader_test.go b/internal/api/limited_reader_test.go similarity index 99% rename from internal/agent/limited_reader_test.go rename to internal/api/limited_reader_test.go index d347469a3..c3a2d8299 100644 --- a/internal/agent/limited_reader_test.go +++ b/internal/api/limited_reader_test.go @@ -1,4 +1,4 @@ -package agent +package api import ( "bytes" diff --git a/internal/api/responses.go b/internal/api/responses.go index 50a045364..829242235 100644 --- a/internal/api/responses.go +++ b/internal/api/responses.go @@ -6,7 +6,6 @@ import ( "io" "net/http" - "github.com/DataDog/datadog-trace-agent/internal/agent" "github.com/DataDog/datadog-trace-agent/internal/sampler" "github.com/DataDog/datadog-trace-agent/internal/statsd" ) @@ -35,7 +34,7 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) { errtag := "decoding-error" msg := err.Error() - if err == agent.ErrLimitedReaderLimitReached { + if err == ErrLimitedReaderLimitReached { status = http.StatusRequestEntityTooLarge errtag := "payload-too-large" msg = errtag