diff --git a/Makefile b/Makefile index 50734ba4a..6a14ebb55 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ ci: GOOS=windows go build ./cmd/trace-agent # ensure windows builds go get -u golang.org/x/lint/golint golint -set_exit_status=1 ./cmd/trace-agent ./filters ./api ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil - go test -v ./... + go test -v -race ./... windows: # pre-packages resources needed for the windows release diff --git a/config/agent_test.go b/config/agent_test.go index 839b09bf4..f4ca10f67 100644 --- a/config/agent_test.go +++ b/config/agent_test.go @@ -166,7 +166,7 @@ func TestFullYamlConfig(t *testing.T) { assert.Equal(0.5, c.ExtraSampleRate) assert.Equal(5.0, c.MaxTPS) assert.Equal("0.0.0.0", c.ReceiverHost) - assert.EqualValues([]*Endpoint{ + assert.ElementsMatch([]*Endpoint{ {Host: "https://datadog.unittests", APIKey: "api_key_test"}, {Host: "https://my1.endpoint.com", APIKey: "apikey1"}, {Host: "https://my1.endpoint.com", APIKey: "apikey2"}, diff --git a/config/merge_yaml.go b/config/merge_yaml.go index 309aa4202..d3b261ed1 100644 --- a/config/merge_yaml.go +++ b/config/merge_yaml.go @@ -8,13 +8,12 @@ import ( "regexp" "time" - "gopkg.in/yaml.v2" - "github.com/DataDog/datadog-trace-agent/backoff" "github.com/DataDog/datadog-trace-agent/model" "github.com/DataDog/datadog-trace-agent/osutil" writerconfig "github.com/DataDog/datadog-trace-agent/writer/config" log "github.com/cihub/seelog" + "gopkg.in/yaml.v2" ) // apiEndpointPrefix is the URL prefix prepended to the default site value from YamlAgentConfig. diff --git a/writer/multi_writer_test.go b/writer/multi_writer_test.go index 8e3a09281..ab881a7da 100644 --- a/writer/multi_writer_test.go +++ b/writer/multi_writer_test.go @@ -94,11 +94,10 @@ func TestMultiSender(t *testing.T) { mock1.monitor <- "ping1" mock2.monitor <- "ping2" - msg1 := <-multi.mch - msg2 := <-multi.mch - - assert.Equal(t, "ping1", msg1.(string)) - assert.Equal(t, "ping2", msg2.(string)) + assert.ElementsMatch(t, + []string{"ping1", "ping2"}, + []string{(<-multi.mch).(string), (<-multi.mch).(string)}, + ) }) } diff --git a/writer/payload.go b/writer/payload.go index 51e2fd62f..2d51c823c 100644 --- a/writer/payload.go +++ b/writer/payload.go @@ -3,13 +3,13 @@ package writer import ( "container/list" "fmt" + "sync/atomic" "time" - log "github.com/cihub/seelog" - "github.com/DataDog/datadog-trace-agent/backoff" "github.com/DataDog/datadog-trace-agent/watchdog" writerconfig "github.com/DataDog/datadog-trace-agent/writer/config" + log "github.com/cihub/seelog" ) // Payload represents a data payload to be sent to some endpoint @@ -274,7 +274,7 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error { newPayloadSize, s.conf.MaxQueuedBytes) } - for s.conf.MaxQueuedBytes > 0 && s.currentQueuedSize+newPayloadSize > s.conf.MaxQueuedBytes { + for s.conf.MaxQueuedBytes > 0 && atomic.LoadInt64(&s.currentQueuedSize)+newPayloadSize > s.conf.MaxQueuedBytes { if _, err := s.dropOldestPayload("max queued bytes reached"); err != nil { // Should never happen because we know we can fit it in panic(fmt.Errorf("unable to find space for queueing payload of size %d: %v", newPayloadSize, err)) @@ -283,7 +283,7 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error { log.Tracef("Queuing new payload: %v", payload) s.queuedPayloads.PushBack(payload) - s.currentQueuedSize += newPayloadSize + atomic.AddInt64(&s.currentQueuedSize, newPayloadSize) return nil } @@ -336,7 +336,7 @@ func (s *QueuablePayloadSender) flushQueue() error { func (s *QueuablePayloadSender) removeQueuedPayload(e *list.Element) *list.Element { next := e.Next() payload := e.Value.(*Payload) - s.currentQueuedSize -= int64(len(payload.Bytes)) + atomic.AddInt64(&s.currentQueuedSize, -int64(len(payload.Bytes))) s.queuedPayloads.Remove(e) return next } diff --git a/writer/payload_test.go b/writer/payload_test.go index 76e2ee20e..362d1cdd1 100644 --- a/writer/payload_test.go +++ b/writer/payload_test.go @@ -2,6 +2,7 @@ package writer import ( "fmt" + "sync/atomic" "testing" "time" @@ -519,10 +520,10 @@ func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) { flakyEndpoint.SetError(nil) // Wait for a retry - time.Sleep(200 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // Then we should have no queued payloads - assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads") + assert.EqualValues(0, atomic.LoadInt64(&queuableSender.currentQueuedSize), "We should have nothing queued") // When we stop the sender queuableSender.Stop()