Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
agent: remove unused code and move some files out (#545)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr authored Dec 7, 2018
1 parent 60eeae8 commit 8ff04d7
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 163 deletions.
6 changes: 3 additions & 3 deletions cmd/trace-agent/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewConcentrator(aggregators []string, bsize int64, out chan []agent.StatsBu
buckets: make(map[int64]*agent.StatsRawBucket),
// At start, only allow stats for the current time bucket. Ensure we don't
// override buckets which could have been sent before an Agent restart.
oldestTs: alignTs(agent.Now(), bsize),
oldestTs: alignTs(time.Now().UnixNano(), bsize),
// TODO: Move to configuration.
bufferLen: defaultBufferLen,

Expand Down Expand Up @@ -103,7 +103,7 @@ func (c *Concentrator) Stop() {

// Add appends to the proper stats bucket this trace's statistics
func (c *Concentrator) Add(t agent.ProcessedTrace) {
	// Stamp the trace with the current wall-clock time in nanoseconds.
	now := time.Now().UnixNano()
	c.addNow(t, now)
}

func (c *Concentrator) addNow(t agent.ProcessedTrace, now int64) {
Expand Down Expand Up @@ -136,7 +136,7 @@ func (c *Concentrator) addNow(t agent.ProcessedTrace, now int64) {

// Flush deletes and returns complete statistic buckets
func (c *Concentrator) Flush() []agent.StatsBucket {
	// Flush everything that is complete as of the current wall-clock time.
	now := time.Now().UnixNano()
	return c.flushNow(now)
}

func (c *Concentrator) flushNow(now int64) []agent.StatsBucket {
Expand Down
10 changes: 5 additions & 5 deletions cmd/trace-agent/concentrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func getTsInBucket(alignedNow int64, bsize int64, offset int64) int64 {
// testSpan avoids typos and inconsistencies in test spans (typical pitfall: duration, start time,
// and end time must be aligned, and end time is the one that needs to be aligned
func testSpan(spanID uint64, parentID uint64, duration, offset int64, service, resource string, err int32) *agent.Span {
now := agent.Now()
now := time.Now().UnixNano()
alignedNow := now - now%testBucketInterval

return &agent.Span{
Expand All @@ -47,7 +47,7 @@ func TestConcentratorOldestTs(t *testing.T) {
assert := assert.New(t)
statsChan := make(chan []agent.StatsBucket)

now := agent.Now()
now := time.Now().UnixNano()

// Build a trace that simply has spans spread over time windows.
trace := agent.Trace{
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestConcentratorStatsTotals(t *testing.T) {
statsChan := make(chan []agent.StatsBucket)
c := NewConcentrator([]string{}, testBucketInterval, statsChan)

now := agent.Now()
now := time.Now().UnixNano()
alignedNow := alignTs(now, c.bsize)

// update oldestTs as if it had been running for quite some time, to avoid the fact that at startup
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestConcentratorStatsCounts(t *testing.T) {
statsChan := make(chan []agent.StatsBucket)
c := NewConcentrator([]string{}, testBucketInterval, statsChan)

now := agent.Now()
now := time.Now().UnixNano()
alignedNow := alignTs(now, c.bsize)

// update oldestTs as if it had been running for quite some time, to avoid the fact that at startup
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestConcentratorSublayersStatsCounts(t *testing.T) {
statsChan := make(chan []agent.StatsBucket)
c := NewConcentrator([]string{}, testBucketInterval, statsChan)

now := agent.Now()
now := time.Now().UnixNano()
alignedNow := now - now%c.bsize

trace := agent.Trace{
Expand Down
18 changes: 0 additions & 18 deletions internal/agent/compat.go

This file was deleted.

108 changes: 0 additions & 108 deletions internal/agent/payload.go

This file was deleted.

11 changes: 0 additions & 11 deletions internal/agent/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package agent

import (
"encoding/json"
"net/http"
)

//go:generate msgp -marshal=false
Expand Down Expand Up @@ -30,13 +29,3 @@ func (s1 ServicesMetadata) Merge(s2 ServicesMetadata) {
// EncodeServicesPayload serializes the given services metadata to JSON,
// returning the encoded bytes or a marshaling error.
func EncodeServicesPayload(sm ServicesMetadata) ([]byte, error) {
	payload, err := json.Marshal(sm)
	if err != nil {
		return nil, err
	}
	return payload, nil
}

// SetServicesPayloadHeaders takes a Header struct and adds the appropriate
// header keys for the API to be able to decode the services metadata.
func SetServicesPayloadHeaders(h http.Header) {
	// Only the v0.1 payload format carries a JSON body; other versions
	// need no extra headers.
	if GlobalAgentPayloadVersion == AgentPayloadV01 {
		h.Set("Content-Type", "application/json")
	}
}
10 changes: 0 additions & 10 deletions internal/agent/time.go

This file was deleted.

21 changes: 17 additions & 4 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (r *HTTPReceiver) Stop() error {

func (r *HTTPReceiver) httpHandle(fn http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
req.Body = agent.NewLimitedReader(req.Body, r.maxRequestBodyLength)
req.Body = NewLimitedReader(req.Body, r.maxRequestBodyLength)
defer req.Body.Close()

fn(w, req)
Expand Down Expand Up @@ -223,7 +223,7 @@ func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http.
// We get the address of the struct holding the stats associated to the tags
ts := r.Stats.GetTagStats(tags)

bytesRead := req.Body.(*agent.LimitedReader).Count
bytesRead := req.Body.(*LimitedReader).Count
if bytesRead > 0 {
atomic.AddInt64(&ts.TracesBytes, int64(bytesRead))
}
Expand Down Expand Up @@ -291,7 +291,7 @@ func (r *HTTPReceiver) handleServices(v Version, w http.ResponseWriter, req *htt

atomic.AddInt64(&ts.ServicesReceived, int64(len(servicesMeta)))

bytesRead := req.Body.(*agent.LimitedReader).Count
bytesRead := req.Body.(*LimitedReader).Count
if bytesRead > 0 {
atomic.AddInt64(&ts.ServicesBytes, int64(bytesRead))
}
Expand Down Expand Up @@ -376,7 +376,7 @@ func getTraces(v Version, w http.ResponseWriter, req *http.Request) (agent.Trace
HTTPDecodingError(err, []string{tagTraceHandler, fmt.Sprintf("v:%s", v)}, w)
return nil, false
}
traces = agent.TracesFromSpans(spans)
traces = tracesFromSpans(spans)
case v02:
fallthrough
case v03:
Expand Down Expand Up @@ -411,3 +411,16 @@ func decodeReceiverPayload(r io.Reader, dest msgp.Decodable, v Version, contentT
panic(fmt.Sprintf("unhandled content type %q", contentType))
}
}

// tracesFromSpans groups a flat list of spans into traces, bucketing the
// spans by their TraceID. The order of the returned traces is unspecified
// (it follows Go's randomized map iteration order).
func tracesFromSpans(spans []agent.Span) agent.Traces {
	traces := agent.Traces{}
	byID := make(map[uint64][]*agent.Span)
	for i := range spans {
		// Take the address of the slice element, NOT of the range
		// variable: `for _, s := range spans { ... &s ... }` would make
		// every stored pointer alias the single loop variable, so all
		// entries would end up pointing at the last span.
		s := &spans[i]
		byID[s.TraceID] = append(byID[s.TraceID], s)
	}
	for _, t := range byID {
		traces = append(traces, t)
	}

	return traces
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package agent
package api

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package agent
package api

import (
"bytes"
Expand Down
3 changes: 1 addition & 2 deletions internal/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"net/http"

"github.com/DataDog/datadog-trace-agent/internal/agent"
"github.com/DataDog/datadog-trace-agent/internal/sampler"
"github.com/DataDog/datadog-trace-agent/internal/statsd"
)
Expand Down Expand Up @@ -35,7 +34,7 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) {
errtag := "decoding-error"
msg := err.Error()

if err == agent.ErrLimitedReaderLimitReached {
if err == ErrLimitedReaderLimitReached {
status = http.StatusRequestEntityTooLarge
errtag := "payload-too-large"
msg = errtag
Expand Down

0 comments on commit 8ff04d7

Please sign in to comment.