Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

cmd/trace-agent: factor out receiver into "api" package #480

Merged
merged 1 commit into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ci:
# task used by CI
GOOS=windows go build ./cmd/trace-agent # ensure windows builds
go get -u github.com/golang/lint/golint/...
golint -set_exit_status=1 ./cmd/trace-agent ./filters ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil
golint -set_exit_status=1 ./cmd/trace-agent ./filters ./api ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil
go test -v ./...

windows:
Expand Down
70 changes: 35 additions & 35 deletions cmd/trace-agent/receiver.go → api/api.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package api

import (
"context"
Expand Down Expand Up @@ -31,56 +31,56 @@ const (
tagServiceHandler = "handler:services"
)

// APIVersion is a dumb way to version our collector handlers
type APIVersion string
// Version is a dumb way to version our collector handlers
type Version string

const (
// v01 DEPRECATED, FIXME[1.x]
// Traces: JSON, slice of spans
// Services: JSON, map[string]map[string][string]
v01 APIVersion = "v0.1"
v01 Version = "v0.1"
// v02 DEPRECATED, FIXME[1.x]
// Traces: JSON, slice of traces
// Services: JSON, map[string]map[string][string]
v02 APIVersion = "v0.2"
v02 Version = "v0.2"
// v03
// Traces: msgpack/JSON (Content-Type) slice of traces
// Services: msgpack/JSON, map[string]map[string][string]
v03 APIVersion = "v0.3"
v03 Version = "v0.3"
// v04
// Traces: msgpack/JSON (Content-Type) slice of traces + returns service sampling ratios
// Services: msgpack/JSON, map[string]map[string][string]
v04 APIVersion = "v0.4"
v04 Version = "v0.4"
)

// HTTPReceiver is a collector that uses HTTP protocol and just holds
// a chan where the spans received are sent one by one
type HTTPReceiver struct {
traces chan model.Trace
Stats *info.ReceiverStats
PreSampler *sampler.PreSampler
Out chan model.Trace

services chan model.ServicesMetadata
conf *config.AgentConfig
dynConf *config.DynamicConfig
server *http.Server

stats *info.ReceiverStats
preSampler *sampler.PreSampler

maxRequestBodyLength int64
debug bool
}

// NewHTTPReceiver returns a pointer to a new HTTPReceiver
func NewHTTPReceiver(
conf *config.AgentConfig, dynConf *config.DynamicConfig, traces chan model.Trace, services chan model.ServicesMetadata,
conf *config.AgentConfig, dynConf *config.DynamicConfig, out chan model.Trace, services chan model.ServicesMetadata,
) *HTTPReceiver {
// use buffered channels so that handlers are not waiting on downstream processing
return &HTTPReceiver{
conf: conf,
dynConf: dynConf,
stats: info.NewReceiverStats(),
preSampler: sampler.NewPreSampler(conf.PreSampleRate),
Stats: info.NewReceiverStats(),
PreSampler: sampler.NewPreSampler(conf.PreSampleRate),
Out: out,

traces: traces,
conf: conf,
dynConf: dynConf,
services: services,

maxRequestBodyLength: maxRequestBodyLength,
Expand Down Expand Up @@ -110,7 +110,7 @@ func (r *HTTPReceiver) Run() {
osutil.Exitf("%v", err)
}

go r.preSampler.Run()
go r.PreSampler.Run()

go func() {
defer watchdog.LogOnPanic()
Expand All @@ -125,7 +125,7 @@ func (r *HTTPReceiver) Listen(addr, logExtra string) error {
return fmt.Errorf("cannot listen on %s: %v", addr, err)
}

ln, err := NewRateLimitedListener(listener, r.conf.ConnectionLimit)
ln, err := newRateLimitedListener(listener, r.conf.ConnectionLimit)
if err != nil {
return fmt.Errorf("cannot create listener: %v", err)
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func (r *HTTPReceiver) httpHandle(fn http.HandlerFunc) http.HandlerFunc {
}
}

func (r *HTTPReceiver) httpHandleWithVersion(v APIVersion, f func(APIVersion, http.ResponseWriter, *http.Request)) http.HandlerFunc {
func (r *HTTPReceiver) httpHandleWithVersion(v Version, f func(Version, http.ResponseWriter, *http.Request)) http.HandlerFunc {
return r.httpHandle(func(w http.ResponseWriter, req *http.Request) {
contentType := req.Header.Get("Content-Type")
if contentType == "application/msgpack" && (v == v01 || v == v02) {
Expand All @@ -181,7 +181,7 @@ func (r *HTTPReceiver) httpHandleWithVersion(v APIVersion, f func(APIVersion, ht
})
}

func (r *HTTPReceiver) replyTraces(v APIVersion, w http.ResponseWriter) {
func (r *HTTPReceiver) replyTraces(v Version, w http.ResponseWriter) {
switch v {
case v01:
fallthrough
Expand All @@ -197,8 +197,8 @@ func (r *HTTPReceiver) replyTraces(v APIVersion, w http.ResponseWriter) {
}

// handleTraces knows how to handle a bunch of traces
func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *http.Request) {
if !r.preSampler.Sample(req) {
func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http.Request) {
if !r.PreSampler.Sample(req) {
io.Copy(ioutil.Discard, req.Body)
HTTPOK(w)
return
Expand All @@ -221,7 +221,7 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht
}

// We get the address of the struct holding the stats associated to the tags
ts := r.stats.GetTagStats(tags)
ts := r.Stats.GetTagStats(tags)

bytesRead := req.Body.(*model.LimitedReader).Count
if bytesRead > 0 {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht
atomic.AddInt64(&ts.SpansDropped, int64(spans-len(normTrace)))

select {
case r.traces <- normTrace:
case r.Out <- normTrace:
// if our downstream consumer is slow, we drop the trace on the floor
// this is a safety net against us using too much memory
// when clients flood us
Expand All @@ -266,7 +266,7 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht
}

// handleServices handle a request with a list of several services
func (r *HTTPReceiver) handleServices(v APIVersion, w http.ResponseWriter, req *http.Request) {
func (r *HTTPReceiver) handleServices(v Version, w http.ResponseWriter, req *http.Request) {
var servicesMeta model.ServicesMetadata

contentType := req.Header.Get("Content-Type")
Expand All @@ -287,7 +287,7 @@ func (r *HTTPReceiver) handleServices(v APIVersion, w http.ResponseWriter, req *
}

// We get the address of the struct holding the stats associated to the tags
ts := r.stats.GetTagStats(tags)
ts := r.Stats.GetTagStats(tags)

atomic.AddInt64(&ts.ServicesReceived, int64(len(servicesMeta)))

Expand All @@ -308,13 +308,13 @@ func (r *HTTPReceiver) logStats() {
statsd.Client.Gauge("datadog.trace_agent.heartbeat", 1, []string{"version:" + info.Version}, 1)

// We update accStats with the new stats we collected
accStats.Acc(r.stats)
accStats.Acc(r.Stats)

// Publish the stats accumulated during the last flush
r.stats.Publish()
r.Stats.Publish()

// We reset the stats accumulated during the last 10s.
r.stats.Reset()
r.Stats.Reset()

if now.Sub(lastLog) >= time.Minute {
// We expose the stats accumulated to expvar
Expand All @@ -341,20 +341,20 @@ func (r *HTTPReceiver) Languages() string {
langs := make(map[string]bool)
str := []string{}

r.stats.RLock()
for tags := range r.stats.Stats {
r.Stats.RLock()
for tags := range r.Stats.Stats {
if _, ok := langs[tags.Lang]; !ok {
str = append(str, tags.Lang)
langs[tags.Lang] = true
}
}
r.stats.RUnlock()
r.Stats.RUnlock()

sort.Strings(str)
return strings.Join(str, "|")
}

func getTraces(v APIVersion, w http.ResponseWriter, req *http.Request) (model.Traces, bool) {
func getTraces(v Version, w http.ResponseWriter, req *http.Request) (model.Traces, bool) {
var traces model.Traces
contentType := req.Header.Get("Content-Type")

Expand Down Expand Up @@ -395,7 +395,7 @@ func getTraces(v APIVersion, w http.ResponseWriter, req *http.Request) (model.Tr
return traces, true
}

func decodeReceiverPayload(r io.Reader, dest msgp.Decodable, v APIVersion, contentType string) error {
func decodeReceiverPayload(r io.Reader, dest msgp.Decodable, v Version, contentType string) error {
switch contentType {
case "application/msgpack":
return msgp.Decode(r, dest)
Expand Down
Loading