Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
writer: fix race in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr committed Oct 22, 2018
1 parent 48f6c4c commit 7776b45
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ci:
GOOS=windows go build ./cmd/trace-agent # ensure windows builds
go get -u golang.org/x/lint/golint
golint -set_exit_status=1 ./cmd/trace-agent ./filters ./api ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil
go test -v ./...
go test -v -race ./...

windows:
# pre-packages resources needed for the windows release
Expand Down
2 changes: 1 addition & 1 deletion config/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestFullYamlConfig(t *testing.T) {
assert.Equal(0.5, c.ExtraSampleRate)
assert.Equal(5.0, c.MaxTPS)
assert.Equal("0.0.0.0", c.ReceiverHost)
assert.EqualValues([]*Endpoint{
assert.ElementsMatch([]*Endpoint{
{Host: "https://datadog.unittests", APIKey: "api_key_test"},
{Host: "https://my1.endpoint.com", APIKey: "apikey1"},
{Host: "https://my1.endpoint.com", APIKey: "apikey2"},
Expand Down
3 changes: 1 addition & 2 deletions config/merge_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"regexp"
"time"

"gopkg.in/yaml.v2"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/osutil"
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
log "github.com/cihub/seelog"
"gopkg.in/yaml.v2"
)

// apiEndpointPrefix is the URL prefix prepended to the default site value from YamlAgentConfig.
Expand Down
9 changes: 4 additions & 5 deletions writer/multi_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ func TestMultiSender(t *testing.T) {
mock1.monitor <- "ping1"
mock2.monitor <- "ping2"

msg1 := <-multi.mch
msg2 := <-multi.mch

assert.Equal(t, "ping1", msg1.(string))
assert.Equal(t, "ping2", msg2.(string))
assert.ElementsMatch(t,
[]string{"ping1", "ping2"},
[]string{(<-multi.mch).(string), (<-multi.mch).(string)},
)
})
}

Expand Down
10 changes: 5 additions & 5 deletions writer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package writer
import (
"container/list"
"fmt"
"sync/atomic"
"time"

log "github.com/cihub/seelog"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/watchdog"
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
log "github.com/cihub/seelog"
)

// Payload represents a data payload to be sent to some endpoint
Expand Down Expand Up @@ -274,7 +274,7 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
newPayloadSize, s.conf.MaxQueuedBytes)
}

for s.conf.MaxQueuedBytes > 0 && s.currentQueuedSize+newPayloadSize > s.conf.MaxQueuedBytes {
for s.conf.MaxQueuedBytes > 0 && atomic.LoadInt64(&s.currentQueuedSize)+newPayloadSize > s.conf.MaxQueuedBytes {
if _, err := s.dropOldestPayload("max queued bytes reached"); err != nil {
// Should never happen because we know we can fit it in
panic(fmt.Errorf("unable to find space for queueing payload of size %d: %v", newPayloadSize, err))
Expand All @@ -283,7 +283,7 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {

log.Tracef("Queuing new payload: %v", payload)
s.queuedPayloads.PushBack(payload)
s.currentQueuedSize += newPayloadSize
atomic.AddInt64(&s.currentQueuedSize, newPayloadSize)

return nil
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func (s *QueuablePayloadSender) flushQueue() error {
func (s *QueuablePayloadSender) removeQueuedPayload(e *list.Element) *list.Element {
next := e.Next()
payload := e.Value.(*Payload)
s.currentQueuedSize -= int64(len(payload.Bytes))
atomic.AddInt64(&s.currentQueuedSize, -int64(len(payload.Bytes)))
s.queuedPayloads.Remove(e)
return next
}
Expand Down
5 changes: 3 additions & 2 deletions writer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package writer

import (
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -519,10 +520,10 @@ func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) {
flakyEndpoint.SetError(nil)

// Wait for a retry
time.Sleep(200 * time.Millisecond)
time.Sleep(500 * time.Millisecond)

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")
assert.EqualValues(0, atomic.LoadInt64(&queuableSender.currentQueuedSize), "We should have nothing queued")

// When we stop the sender
queuableSender.Stop()
Expand Down

0 comments on commit 7776b45

Please sign in to comment.