diff --git a/Makefile b/Makefile index 12af28f24..b7f79b0d1 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ ci: # task used by CI GOOS=windows go build ./cmd/trace-agent # ensure windows builds go get -u github.com/golang/lint/golint/... - golint -set_exit_status=1 ./cmd/trace-agent ./filters ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil + golint -set_exit_status=1 ./cmd/trace-agent ./filters ./api ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil go test -v ./... windows: diff --git a/cmd/trace-agent/receiver.go b/api/api.go similarity index 87% rename from cmd/trace-agent/receiver.go rename to api/api.go index 9c5bb5127..8781d51f7 100644 --- a/cmd/trace-agent/receiver.go +++ b/api/api.go @@ -1,4 +1,4 @@ -package main +package api import ( "context" @@ -31,56 +31,56 @@ const ( tagServiceHandler = "handler:services" ) -// APIVersion is a dumb way to version our collector handlers -type APIVersion string +// Version is a dumb way to version our collector handlers +type Version string const ( // v01 DEPRECATED, FIXME[1.x] // Traces: JSON, slice of spans // Services: JSON, map[string]map[string][string] - v01 APIVersion = "v0.1" + v01 Version = "v0.1" // v02 DEPRECATED, FIXME[1.x] // Traces: JSON, slice of traces // Services: JSON, map[string]map[string][string] - v02 APIVersion = "v0.2" + v02 Version = "v0.2" // v03 // Traces: msgpack/JSON (Content-Type) slice of traces // Services: msgpack/JSON, map[string]map[string][string] - v03 APIVersion = "v0.3" + v03 Version = "v0.3" // v04 // Traces: msgpack/JSON (Content-Type) slice of traces + returns service sampling ratios // Services: msgpack/JSON, map[string]map[string][string] - v04 APIVersion = "v0.4" + v04 Version = "v0.4" ) // HTTPReceiver is a collector that uses HTTP protocol and just holds // a chan where the spans received are sent one by one type HTTPReceiver struct { - traces chan model.Trace + Stats *info.ReceiverStats + PreSampler *sampler.PreSampler + Out chan model.Trace + services chan model.ServicesMetadata conf *config.AgentConfig dynConf *config.DynamicConfig server *http.Server - stats *info.ReceiverStats - preSampler *sampler.PreSampler - maxRequestBodyLength int64 debug bool } // NewHTTPReceiver returns a pointer to a new HTTPReceiver func NewHTTPReceiver( - conf *config.AgentConfig, dynConf *config.DynamicConfig, traces chan model.Trace, services chan model.ServicesMetadata, + conf *config.AgentConfig, dynConf *config.DynamicConfig, out chan model.Trace, services chan model.ServicesMetadata, ) *HTTPReceiver { // use buffered channels so that handlers are not waiting on downstream processing return &HTTPReceiver{ - conf: conf, - dynConf: dynConf, - stats: info.NewReceiverStats(), - preSampler: sampler.NewPreSampler(conf.PreSampleRate), + Stats: info.NewReceiverStats(), + PreSampler: sampler.NewPreSampler(conf.PreSampleRate), + Out: out, - traces: traces, + conf: conf, + dynConf: dynConf, services: services, maxRequestBodyLength: maxRequestBodyLength, @@ -110,7 +110,7 @@ func (r *HTTPReceiver) Run() { osutil.Exitf("%v", err) } - go r.preSampler.Run() + go r.PreSampler.Run() go func() { defer watchdog.LogOnPanic() @@ -125,7 +125,7 @@ func (r *HTTPReceiver) Listen(addr, logExtra string) error { return fmt.Errorf("cannot listen on %s: %v", addr, err) } - ln, err := NewRateLimitedListener(listener, r.conf.ConnectionLimit) + ln, err := newRateLimitedListener(listener, r.conf.ConnectionLimit) if err != nil { return fmt.Errorf("cannot create listener: %v", err) } @@ -167,7 +167,7 @@ func (r *HTTPReceiver) httpHandle(fn http.HandlerFunc) http.HandlerFunc { } } -func (r *HTTPReceiver) httpHandleWithVersion(v APIVersion, f func(APIVersion, http.ResponseWriter, *http.Request)) http.HandlerFunc { +func (r *HTTPReceiver) httpHandleWithVersion(v Version, f func(Version, http.ResponseWriter, *http.Request)) http.HandlerFunc { return r.httpHandle(func(w http.ResponseWriter, req *http.Request) { contentType := req.Header.Get("Content-Type") if contentType == "application/msgpack" && (v == v01 || v == v02) { @@ -181,7 +181,7 @@ func (r *HTTPReceiver) httpHandleWithVersion(v APIVersion, f func(APIVersion, ht }) } -func (r *HTTPReceiver) replyTraces(v APIVersion, w http.ResponseWriter) { +func (r *HTTPReceiver) replyTraces(v Version, w http.ResponseWriter) { switch v { case v01: fallthrough @@ -197,8 +197,8 @@ func (r *HTTPReceiver) replyTraces(v APIVersion, w http.ResponseWriter) { } // handleTraces knows how to handle a bunch of traces -func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *http.Request) { - if !r.preSampler.Sample(req) { +func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http.Request) { + if !r.PreSampler.Sample(req) { io.Copy(ioutil.Discard, req.Body) HTTPOK(w) return @@ -221,7 +221,7 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht } // We get the address of the struct holding the stats associated to the tags - ts := r.stats.GetTagStats(tags) + ts := r.Stats.GetTagStats(tags) bytesRead := req.Body.(*model.LimitedReader).Count if bytesRead > 0 { @@ -251,7 +251,7 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht atomic.AddInt64(&ts.SpansDropped, int64(spans-len(normTrace))) select { - case r.traces <- normTrace: + case r.Out <- normTrace: // if our downstream consumer is slow, we drop the trace on the floor // this is a safety net against us using too much memory // when clients flood us @@ -266,7 +266,7 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht } // handleServices handle a request with a list of several services -func (r *HTTPReceiver) handleServices(v APIVersion, w http.ResponseWriter, req *http.Request) { +func (r *HTTPReceiver) handleServices(v Version, w http.ResponseWriter, req *http.Request) { var servicesMeta model.ServicesMetadata contentType := req.Header.Get("Content-Type") @@ -287,7 +287,7 @@ func (r *HTTPReceiver) handleServices(v APIVersion, w http.ResponseWriter, req * } // We get the address of the struct holding the stats associated to the tags - ts := r.stats.GetTagStats(tags) + ts := r.Stats.GetTagStats(tags) atomic.AddInt64(&ts.ServicesReceived, int64(len(servicesMeta))) @@ -308,13 +308,13 @@ func (r *HTTPReceiver) logStats() { statsd.Client.Gauge("datadog.trace_agent.heartbeat", 1, []string{"version:" + info.Version}, 1) // We update accStats with the new stats we collected - accStats.Acc(r.stats) + accStats.Acc(r.Stats) // Publish the stats accumulated during the last flush - r.stats.Publish() + r.Stats.Publish() // We reset the stats accumulated during the last 10s. - r.stats.Reset() + r.Stats.Reset() if now.Sub(lastLog) >= time.Minute { // We expose the stats accumulated to expvar @@ -341,20 +341,20 @@ func (r *HTTPReceiver) Languages() string { langs := make(map[string]bool) str := []string{} - r.stats.RLock() - for tags := range r.stats.Stats { + r.Stats.RLock() + for tags := range r.Stats.Stats { if _, ok := langs[tags.Lang]; !ok { str = append(str, tags.Lang) langs[tags.Lang] = true } } - r.stats.RUnlock() + r.Stats.RUnlock() sort.Strings(str) return strings.Join(str, "|") } -func getTraces(v APIVersion, w http.ResponseWriter, req *http.Request) (model.Traces, bool) { +func getTraces(v Version, w http.ResponseWriter, req *http.Request) (model.Traces, bool) { var traces model.Traces contentType := req.Header.Get("Content-Type") @@ -395,7 +395,7 @@ func getTraces(v APIVersion, w http.ResponseWriter, req *http.Request) (model.Tr return traces, true } -func decodeReceiverPayload(r io.Reader, dest msgp.Decodable, v APIVersion, contentType string) error { +func decodeReceiverPayload(r io.Reader, dest msgp.Decodable, v Version, contentType string) error { switch contentType { case "application/msgpack": return msgp.Decode(r, dest) diff --git a/cmd/trace-agent/receiver_test.go b/api/api_test.go similarity index 87% rename from cmd/trace-agent/receiver_test.go rename to api/api_test.go index ddf1c398e..65e1dcb7f 100644 --- a/cmd/trace-agent/receiver_test.go +++ b/api/api_test.go @@ -1,4 +1,4 @@ -package main +package api import ( "bytes" @@ -33,7 +33,7 @@ var headerFields = map[string]string{ "tracer_version": "Datadog-Meta-Tracer-Version", } -func NewTestReceiverFromConfig(conf *config.AgentConfig) *HTTPReceiver { +func newTestReceiverFromConfig(conf *config.AgentConfig) *HTTPReceiver { dynConf := config.NewDynamicConfig() rawTraceChan := make(chan model.Trace, 5000) @@ -43,7 +43,7 @@ func NewTestReceiverFromConfig(conf *config.AgentConfig) *HTTPReceiver { return receiver } -func NewTestReceiverConfig() *config.AgentConfig { +func newTestReceiverConfig() *config.AgentConfig { conf := config.New() conf.APIKey = "test" @@ -57,8 +57,8 @@ func TestReceiverRequestBodyLength(t *testing.T) { defaultMux := http.DefaultServeMux http.DefaultServeMux = http.NewServeMux() - conf := NewTestReceiverConfig() - receiver := NewTestReceiverFromConfig(conf) + conf := newTestReceiverConfig() + receiver := newTestReceiverFromConfig(conf) receiver.maxRequestBodyLength = 2 go receiver.Run() @@ -105,16 +105,16 @@ func TestReceiverRequestBodyLength(t *testing.T) { func TestLegacyReceiver(t *testing.T) { // testing traces without content-type in agent endpoints, it should use JSON decoding assert := assert.New(t) - conf := NewTestReceiverConfig() + conf := newTestReceiverConfig() testCases := []struct { name string r *HTTPReceiver - apiVersion APIVersion + apiVersion Version contentType string traces model.Trace }{ - {"v01 with empty content-type", NewTestReceiverFromConfig(conf), v01, "", model.Trace{testutil.GetTestSpan()}}, - {"v01 with application/json", NewTestReceiverFromConfig(conf), v01, "application/json", model.Trace{testutil.GetTestSpan()}}, + {"v01 with empty content-type", newTestReceiverFromConfig(conf), v01, "", model.Trace{testutil.GetTestSpan()}}, + {"v01 with application/json", newTestReceiverFromConfig(conf), v01, "application/json", model.Trace{testutil.GetTestSpan()}}, } for _, tc := range testCases { @@ -138,7 +138,7 @@ func TestLegacyReceiver(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.traces: + case rt := <-tc.r.Out: assert.Len(rt, 1) span := rt[0] assert.Equal(uint64(42), span.TraceID) @@ -161,23 +161,23 @@ func TestLegacyReceiver(t *testing.T) { func TestReceiverJSONDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use JSON decoding assert := assert.New(t) - conf := NewTestReceiverConfig() + conf := newTestReceiverConfig() testCases := []struct { name string r *HTTPReceiver - apiVersion APIVersion + apiVersion Version contentType string traces []model.Trace }{ - {"v02 with empty content-type", NewTestReceiverFromConfig(conf), v02, "", testutil.GetTestTrace(1, 1, false)}, - {"v03 with empty content-type", NewTestReceiverFromConfig(conf), v03, "", testutil.GetTestTrace(1, 1, false)}, - {"v04 with empty content-type", NewTestReceiverFromConfig(conf), v04, "", testutil.GetTestTrace(1, 1, false)}, - {"v02 with application/json", NewTestReceiverFromConfig(conf), v02, "application/json", testutil.GetTestTrace(1, 1, false)}, - {"v03 with application/json", NewTestReceiverFromConfig(conf), v03, "application/json", testutil.GetTestTrace(1, 1, false)}, - {"v04 with application/json", NewTestReceiverFromConfig(conf), v04, "application/json", testutil.GetTestTrace(1, 1, false)}, - {"v02 with text/json", NewTestReceiverFromConfig(conf), v02, "text/json", testutil.GetTestTrace(1, 1, false)}, - {"v03 with text/json", NewTestReceiverFromConfig(conf), v03, "text/json", testutil.GetTestTrace(1, 1, false)}, - {"v04 with text/json", NewTestReceiverFromConfig(conf), v04, "text/json", testutil.GetTestTrace(1, 1, false)}, + {"v02 with empty content-type", newTestReceiverFromConfig(conf), v02, "", testutil.GetTestTrace(1, 1, false)}, + {"v03 with empty content-type", newTestReceiverFromConfig(conf), v03, "", testutil.GetTestTrace(1, 1, false)}, + {"v04 with empty content-type", newTestReceiverFromConfig(conf), v04, "", testutil.GetTestTrace(1, 1, false)}, + {"v02 with application/json", newTestReceiverFromConfig(conf), v02, "application/json", testutil.GetTestTrace(1, 1, false)}, + {"v03 with application/json", newTestReceiverFromConfig(conf), v03, "application/json", testutil.GetTestTrace(1, 1, false)}, + {"v04 with application/json", newTestReceiverFromConfig(conf), v04, "application/json", testutil.GetTestTrace(1, 1, false)}, + {"v02 with text/json", newTestReceiverFromConfig(conf), v02, "text/json", testutil.GetTestTrace(1, 1, false)}, + {"v03 with text/json", newTestReceiverFromConfig(conf), v03, "text/json", testutil.GetTestTrace(1, 1, false)}, + {"v04 with text/json", newTestReceiverFromConfig(conf), v04, "text/json", testutil.GetTestTrace(1, 1, false)}, } for _, tc := range testCases { @@ -201,7 +201,7 @@ func TestReceiverJSONDecoder(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.traces: + case rt := <-tc.r.Out: assert.Len(rt, 1) span := rt[0] assert.Equal(uint64(42), span.TraceID) @@ -225,18 +225,18 @@ func TestReceiverMsgpackDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use Msgpack decoding // or it should raise a 415 Unsupported media type assert := assert.New(t) - conf := NewTestReceiverConfig() + conf := newTestReceiverConfig() testCases := []struct { name string r *HTTPReceiver - apiVersion APIVersion + apiVersion Version contentType string traces model.Traces }{ - {"v01 with application/msgpack", NewTestReceiverFromConfig(conf), v01, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, - {"v02 with application/msgpack", NewTestReceiverFromConfig(conf), v02, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, - {"v03 with application/msgpack", NewTestReceiverFromConfig(conf), v03, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, - {"v04 with application/msgpack", NewTestReceiverFromConfig(conf), v04, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, + {"v01 with application/msgpack", newTestReceiverFromConfig(conf), v01, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, + {"v02 with application/msgpack", newTestReceiverFromConfig(conf), v02, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, + {"v03 with application/msgpack", newTestReceiverFromConfig(conf), v03, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, + {"v04 with application/msgpack", newTestReceiverFromConfig(conf), v04, "application/msgpack", testutil.GetTestTrace(1, 1, false)}, } for _, tc := range testCases { @@ -268,7 +268,7 @@ func TestReceiverMsgpackDecoder(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.traces: + case rt := <-tc.r.Out: assert.Len(rt, 1) span := rt[0] assert.Equal(uint64(42), span.TraceID) @@ -290,7 +290,7 @@ func TestReceiverMsgpackDecoder(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.traces: + case rt := <-tc.r.Out: assert.Len(rt, 1) span := rt[0] assert.Equal(uint64(42), span.TraceID) @@ -320,25 +320,25 @@ func TestReceiverMsgpackDecoder(t *testing.T) { func TestReceiverServiceJSONDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use JSON decoding assert := assert.New(t) - conf := NewTestReceiverConfig() + conf := newTestReceiverConfig() testCases := []struct { name string r *HTTPReceiver - apiVersion APIVersion + apiVersion Version contentType string }{ - {"v01 with empty content-type", NewTestReceiverFromConfig(conf), v01, ""}, - {"v02 with empty content-type", NewTestReceiverFromConfig(conf), v02, ""}, - {"v03 with empty content-type", NewTestReceiverFromConfig(conf), v03, ""}, - {"v04 with empty content-type", NewTestReceiverFromConfig(conf), v04, ""}, - {"v01 with application/json", NewTestReceiverFromConfig(conf), v01, "application/json"}, - {"v02 with application/json", NewTestReceiverFromConfig(conf), v02, "application/json"}, - {"v03 with application/json", NewTestReceiverFromConfig(conf), v03, "application/json"}, - {"v04 with application/json", NewTestReceiverFromConfig(conf), v04, "application/json"}, - {"v01 with text/json", NewTestReceiverFromConfig(conf), v01, "text/json"}, - {"v02 with text/json", NewTestReceiverFromConfig(conf), v02, "text/json"}, - {"v03 with text/json", NewTestReceiverFromConfig(conf), v03, "text/json"}, - {"v04 with text/json", NewTestReceiverFromConfig(conf), v04, "text/json"}, + {"v01 with empty content-type", newTestReceiverFromConfig(conf), v01, ""}, + {"v02 with empty content-type", newTestReceiverFromConfig(conf), v02, ""}, + {"v03 with empty content-type", newTestReceiverFromConfig(conf), v03, ""}, + {"v04 with empty content-type", newTestReceiverFromConfig(conf), v04, ""}, + {"v01 with application/json", newTestReceiverFromConfig(conf), v01, "application/json"}, + {"v02 with application/json", newTestReceiverFromConfig(conf), v02, "application/json"}, + {"v03 with application/json", newTestReceiverFromConfig(conf), v03, "application/json"}, + {"v04 with application/json", newTestReceiverFromConfig(conf), v04, "application/json"}, + {"v01 with text/json", newTestReceiverFromConfig(conf), v01, "text/json"}, + {"v02 with text/json", newTestReceiverFromConfig(conf), v02, "text/json"}, + {"v03 with text/json", newTestReceiverFromConfig(conf), v03, "text/json"}, + {"v04 with text/json", newTestReceiverFromConfig(conf), v04, "text/json"}, } for _, tc := range testCases { @@ -394,17 +394,17 @@ func TestReceiverServiceMsgpackDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use Msgpack decoding // or it should raise a 415 Unsupported media type assert := assert.New(t) - conf := NewTestReceiverConfig() + conf := newTestReceiverConfig() testCases := []struct { name string r *HTTPReceiver - apiVersion APIVersion + apiVersion Version contentType string }{ - {"v01 with application/msgpack", NewTestReceiverFromConfig(conf), v01, "application/msgpack"}, - {"v02 with application/msgpack", NewTestReceiverFromConfig(conf), v02, "application/msgpack"}, - {"v03 with application/msgpack", NewTestReceiverFromConfig(conf), v03, "application/msgpack"}, - {"v04 with application/msgpack", NewTestReceiverFromConfig(conf), v04, "application/msgpack"}, + {"v01 with application/msgpack", newTestReceiverFromConfig(conf), v01, "application/msgpack"}, + {"v02 with application/msgpack", newTestReceiverFromConfig(conf), v02, "application/msgpack"}, + {"v03 with application/msgpack", newTestReceiverFromConfig(conf), v03, "application/msgpack"}, + {"v04 with application/msgpack", newTestReceiverFromConfig(conf), v04, "application/msgpack"}, } for _, tc := range testCases { @@ -495,8 +495,8 @@ func TestHandleTraces(t *testing.T) { msgp.Encode(&buf, testutil.GetTestTrace(10, 10, true)) // prepare the receiver - conf := NewTestReceiverConfig() - receiver := NewTestReceiverFromConfig(conf) + conf := newTestReceiverConfig() + receiver := newTestReceiverFromConfig(conf) // response recorder handler := http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces)) @@ -504,7 +504,7 @@ func TestHandleTraces(t *testing.T) { for n := 0; n < 10; n++ { // consume the traces channel without doing anything select { - case <-receiver.traces: + case <-receiver.Out: default: } @@ -519,7 +519,7 @@ func TestHandleTraces(t *testing.T) { handler.ServeHTTP(rr, req) } - rs := receiver.stats + rs := receiver.Stats assert.Equal(5, len(rs.Stats)) // We have a tagStats struct for each application // We test stats for each app @@ -558,9 +558,9 @@ func TestReceiverPreSamplerCancel(t *testing.T) { n := 100 // Payloads need to be big enough, else bug is not triggered msgp.Encode(&buf, testutil.GetTestTrace(n, n, true)) - conf := NewTestReceiverConfig() + conf := newTestReceiverConfig() conf.PreSampleRate = 0.000001 // Make sure we sample aggressively - receiver := NewTestReceiverFromConfig(conf) + receiver := newTestReceiverFromConfig(conf) server := httptest.NewServer(http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces))) @@ -601,8 +601,8 @@ func BenchmarkHandleTracesFromOneApp(b *testing.B) { msgp.Encode(&buf, testutil.GetTestTrace(1, 1, true)) // prepare the receiver - conf := NewTestReceiverConfig() - receiver := NewTestReceiverFromConfig(conf) + conf := newTestReceiverConfig() + receiver := newTestReceiverFromConfig(conf) // response recorder handler := http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces)) @@ -614,7 +614,7 @@ func BenchmarkHandleTracesFromOneApp(b *testing.B) { b.StopTimer() // consume the traces channel without doing anything select { - case <-receiver.traces: + case <-receiver.Out: default: } @@ -641,8 +641,8 @@ func BenchmarkHandleTracesFromMultipleApps(b *testing.B) { msgp.Encode(&buf, testutil.GetTestTrace(1, 1, true)) // prepare the receiver - conf := NewTestReceiverConfig() - receiver := NewTestReceiverFromConfig(conf) + conf := newTestReceiverConfig() + receiver := newTestReceiverFromConfig(conf) // response recorder handler := http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces)) @@ -654,7 +654,7 @@ func BenchmarkHandleTracesFromMultipleApps(b *testing.B) { b.StopTimer() // consume the traces channel without doing anything select { - case <-receiver.traces: + case <-receiver.Out: default: } diff --git a/cmd/trace-agent/listener.go b/api/listener.go similarity index 68% rename from cmd/trace-agent/listener.go rename to api/listener.go index 28d21a5fc..6f52ef03c 100644 --- a/cmd/trace-agent/listener.go +++ b/api/listener.go @@ -1,4 +1,4 @@ -package main +package api import ( "errors" @@ -9,51 +9,51 @@ import ( log "github.com/cihub/seelog" ) -// RateLimitedListener wraps a regular TCPListener with rate limiting. -type RateLimitedListener struct { +// rateLimitedListener wraps a regular TCPListener with rate limiting. +type rateLimitedListener struct { connLease int32 // How many connections are available for this listener before rate-limiting kicks in *net.TCPListener } -// NewRateLimitedListener returns a new wrapped listener, which is non-initialized -func NewRateLimitedListener(l net.Listener, conns int) (*RateLimitedListener, error) { +// newRateLimitedListener returns a new wrapped listener, which is non-initialized +func newRateLimitedListener(l net.Listener, conns int) (*rateLimitedListener, error) { tcpL, ok := l.(*net.TCPListener) if !ok { return nil, errors.New("cannot wrap listener") } - sl := &RateLimitedListener{connLease: int32(conns), TCPListener: tcpL} + sl := &rateLimitedListener{connLease: int32(conns), TCPListener: tcpL} return sl, nil } // Refresh periodically refreshes the connection lease, and thus cancels any rate limits in place -func (sl *RateLimitedListener) Refresh(conns int) { +func (sl *rateLimitedListener) Refresh(conns int) { for range time.Tick(30 * time.Second) { atomic.StoreInt32(&sl.connLease, int32(conns)) log.Debugf("Refreshed the connection lease: %d conns available", conns) } } -// RateLimitedError indicates a user request being blocked by our rate limit +// rateLimitedError indicates a user request being blocked by our rate limit // It satisfies the net.Error interface -type RateLimitedError struct{} +type rateLimitedError struct{} // Error returns an error string -func (e *RateLimitedError) Error() string { return "request has been rate-limited" } +func (e *rateLimitedError) Error() string { return "request has been rate-limited" } // Temporary tells the HTTP server loop that this error is temporary and recoverable -func (e *RateLimitedError) Temporary() bool { return true } +func (e *rateLimitedError) Temporary() bool { return true } // Timeout tells the HTTP server loop that this error is not a timeout -func (e *RateLimitedError) Timeout() bool { return false } +func (e *rateLimitedError) Timeout() bool { return false } // Accept reimplements the regular Accept but adds rate limiting. -func (sl *RateLimitedListener) Accept() (net.Conn, error) { +func (sl *rateLimitedListener) Accept() (net.Conn, error) { if atomic.LoadInt32(&sl.connLease) <= 0 { // we've reached our cap for this lease period, reject the request - return nil, &RateLimitedError{} + return nil, &rateLimitedError{} } for { diff --git a/cmd/trace-agent/receiver_logger.go b/api/logger.go similarity index 98% rename from cmd/trace-agent/receiver_logger.go rename to api/logger.go index 6a712926c..d5ff0752c 100644 --- a/cmd/trace-agent/receiver_logger.go +++ b/api/logger.go @@ -1,4 +1,4 @@ -package main +package api import ( "sync" diff --git a/cmd/trace-agent/receiver_responses.go b/api/responses.go similarity index 99% rename from cmd/trace-agent/receiver_responses.go rename to api/responses.go index e7db50d01..e78760d45 100644 --- a/cmd/trace-agent/receiver_responses.go +++ b/api/responses.go @@ -1,4 +1,4 @@ -package main +package api import ( "encoding/json" diff --git a/cmd/trace-agent/agent.go b/cmd/trace-agent/agent.go index e17721c0f..851c67903 100644 --- a/cmd/trace-agent/agent.go +++ b/cmd/trace-agent/agent.go @@ -7,6 +7,7 @@ import ( log "github.com/cihub/seelog" + "github.com/DataDog/datadog-trace-agent/api" "github.com/DataDog/datadog-trace-agent/config" "github.com/DataDog/datadog-trace-agent/filters" "github.com/DataDog/datadog-trace-agent/info" @@ -46,7 +47,7 @@ func (pt *processedTrace) getSamplingPriority() (int, bool) { // Agent struct holds all the sub-routines structs and make the data flow between them type Agent struct { - Receiver *HTTPReceiver + Receiver *api.HTTPReceiver Concentrator *Concentrator Blacklister *filters.Blacklister Replacer *filters.Replacer @@ -87,7 +88,7 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent { filteredServiceChan := make(chan model.ServicesMetadata, 50) // create components - r := NewHTTPReceiver(conf, dynConf, rawTraceChan, serviceChan) + r := api.NewHTTPReceiver(conf, dynConf, rawTraceChan, serviceChan) c := NewConcentrator( conf.ExtraAggregators, conf.BucketInterval.Nanoseconds(), @@ -136,7 +137,7 @@ func (a *Agent) Run() { defer watchdogTicker.Stop() // update the data served by expvar so that we don't expose a 0 sample rate - info.UpdatePreSampler(*a.Receiver.preSampler.Stats()) + info.UpdatePreSampler(*a.Receiver.PreSampler.Stats()) // TODO: unify components APIs. Use Start/Stop as non-blocking ways of controlling the blocking Run loop. // Like we do with TraceWriter. @@ -152,7 +153,7 @@ func (a *Agent) Run() { for { select { - case t := <-a.Receiver.traces: + case t := <-a.Receiver.Out: a.Process(t) case <-watchdogTicker.C: a.watchdog() @@ -186,7 +187,7 @@ func (a *Agent) Process(t model.Trace) { // We get the address of the struct holding the stats associated to no tags // TODO: get the real tagStats related to this trace payload. - ts := a.Receiver.stats.GetTagStats(info.Tags{}) + ts := a.Receiver.Stats.GetTagStats(info.Tags{}) // All traces should go through either through the normal score sampler or // the one dedicated to errors @@ -235,7 +236,7 @@ func (a *Agent) Process(t model.Trace) { } rate := sampler.GetTraceAppliedSampleRate(root) - rate *= a.Receiver.preSampler.Rate() + rate *= a.Receiver.PreSampler.Rate() sampler.SetTraceAppliedSampleRate(root, rate) // Need to do this computation before entering the concentrator @@ -332,17 +333,17 @@ func (a *Agent) watchdog() { info.UpdateWatchdogInfo(wi) // Adjust pre-sampling dynamically - rate, err := sampler.CalcPreSampleRate(a.conf.MaxCPU, wi.CPU.UserAvg, a.Receiver.preSampler.RealRate()) + rate, err := sampler.CalcPreSampleRate(a.conf.MaxCPU, wi.CPU.UserAvg, a.Receiver.PreSampler.RealRate()) if rate > a.conf.PreSampleRate { rate = a.conf.PreSampleRate } if err != nil { log.Warnf("problem computing pre-sample rate: %v", err) } - a.Receiver.preSampler.SetRate(rate) - a.Receiver.preSampler.SetError(err) + a.Receiver.PreSampler.SetRate(rate) + a.Receiver.PreSampler.SetError(err) - preSamplerStats := a.Receiver.preSampler.Stats() + preSamplerStats := a.Receiver.PreSampler.Stats() statsd.Client.Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1) info.UpdatePreSampler(*preSamplerStats) } diff --git a/cmd/trace-agent/agent_test.go b/cmd/trace-agent/agent_test.go index b8b2ab695..d79fbb78e 100644 --- a/cmd/trace-agent/agent_test.go +++ b/cmd/trace-agent/agent_test.go @@ -163,7 +163,7 @@ func TestProcess(t *testing.T) { Duration: (500 * time.Millisecond).Nanoseconds(), } - stats := agent.Receiver.stats.GetTagStats(info.Tags{}) + stats := agent.Receiver.Stats.GetTagStats(info.Tags{}) assert := assert.New(t) agent.Process(model.Trace{spanValid})