Skip to content

Commit

Permalink
stats/cloud: stop sending cloud metrics when limit reached
Browse files Browse the repository at this point in the history
Once the cloud test gets aborted by limit, we should not send metrics
anymore. The test and collector will still be running, though.

Close #1074
  • Loading branch information
cuonglm committed Aug 29, 2019
1 parent 14c8177 commit 082adaa
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 14 deletions.
50 changes: 36 additions & 14 deletions stats/cloud/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"encoding/json"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -54,9 +55,10 @@ type Collector struct {
anonymous bool
runStatus lib.RunStatus

bufferMutex sync.Mutex
bufferHTTPTrails []*httpext.Trail
bufferSamples []*Sample
mu sync.Mutex // guard fields below
bufferHTTPTrails []*httpext.Trail
bufferSamples []*Sample
stopSendingMetrics bool

opts lib.Options

Expand Down Expand Up @@ -291,18 +293,18 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
}

if len(newSamples) > 0 || len(newHTTPTrails) > 0 {
c.bufferMutex.Lock()
c.mu.Lock()
c.bufferSamples = append(c.bufferSamples, newSamples...)
c.bufferHTTPTrails = append(c.bufferHTTPTrails, newHTTPTrails...)
c.bufferMutex.Unlock()
c.mu.Unlock()
}
}

func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) {
c.bufferMutex.Lock()
c.mu.Lock()
newHTTPTrails := c.bufferHTTPTrails
c.bufferHTTPTrails = nil
c.bufferMutex.Unlock()
c.mu.Unlock()

aggrPeriod := int64(c.config.AggregationPeriod.Duration)

Expand Down Expand Up @@ -415,15 +417,15 @@ func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) {
}

if len(newSamples) > 0 {
c.bufferMutex.Lock()
c.mu.Lock()
c.bufferSamples = append(c.bufferSamples, newSamples...)
c.bufferMutex.Unlock()
c.mu.Unlock()
}
}

func (c *Collector) flushHTTPTrails() {
c.bufferMutex.Lock()
defer c.bufferMutex.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

newSamples := []*Sample{}
for _, trail := range c.bufferHTTPTrails {
Expand All @@ -441,15 +443,28 @@ func (c *Collector) flushHTTPTrails() {
c.aggrBuckets = map[int64]aggregationBucket{}
c.bufferSamples = append(c.bufferSamples, newSamples...)
}

// shouldStopSendingMetrics reports whether the given push error means the
// cloud has aborted the test by limit — the backend signals this with an
// error message starting with the "(403/E4)" code — in which case the
// collector must stop sending any further metrics.
func (c *Collector) shouldStopSendingMetrics(err error) bool {
	// A nil error never stops the pipeline; any other error stops it only
	// when it carries the well-known "(403/E4)" prefix.
	return err != nil && strings.HasPrefix(err.Error(), "(403/E4)")
}

func (c *Collector) pushMetrics() {
c.bufferMutex.Lock()
c.mu.Lock()
if c.stopSendingMetrics {
c.mu.Unlock()
return
}
if len(c.bufferSamples) == 0 {
c.bufferMutex.Unlock()
c.mu.Unlock()
return
}
buffer := c.bufferSamples
c.bufferSamples = nil
c.bufferMutex.Unlock()
c.mu.Unlock()

logrus.WithFields(logrus.Fields{
"samples": len(buffer),
Expand All @@ -465,6 +480,13 @@ func (c *Collector) pushMetrics() {
logrus.WithFields(logrus.Fields{
"error": err,
}).Warn("Failed to send metrics to cloud")
if c.shouldStopSendingMetrics(err) {
logrus.Warn("Stop sending metrics to cloud due to error.")
c.mu.Lock()
c.stopSendingMetrics = true
c.mu.Unlock()
break
}
}
buffer = buffer[size:]
}
Expand Down
101 changes: 101 additions & 0 deletions stats/cloud/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,104 @@ func TestCloudCollectorMaxPerPacket(t *testing.T) {
wg.Wait()
require.True(t, gotTheLimit)
}

// TestCloudCollectorStopSendingMetric verifies that the cloud collector
// permanently stops pushing metrics once the backend answers a push with
// the "(403/E4)" test-aborted error (see Collector.shouldStopSendingMetrics).
func TestCloudCollectorStopSendingMetric(t *testing.T) {
	t.Parallel()
	tb := testutils.NewHTTPMultiBin(t)
	// The test-creation endpoint hands back a reference ID plus very short
	// aggregation/push intervals so the collector flushes metrics quickly.
	tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, err := fmt.Fprint(w, `{
			"reference_id": "12",
			"config": {
				"metricPushInterval": "200ms",
				"aggregationPeriod": "100ms",
				"maxMetricSamplesPerPackage": 20,
				"aggregationCalcInterval": "100ms",
				"aggregationWaitPeriod": "100ms"
			}
		}`)
		require.NoError(t, err)
	}))
	defer tb.Cleanup()

	script := &loader.SourceData{
		Data: []byte(""),
		URL:  &url.URL{Path: "/script.js"},
	}

	options := lib.Options{
		Duration: types.NullDurationFrom(1 * time.Second),
	}

	config := NewConfig().Apply(Config{
		Host:       null.StringFrom(tb.ServerHTTP.URL),
		NoCompress: null.BoolFrom(true),
	})
	collector, err := New(config, script, options, "1.0")
	require.NoError(t, err)
	now := time.Now()
	tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})

	count := 1
	// The metrics endpoint accepts pushes normally, but once count reaches 5
	// it replies with HTTP 403 and error code 4 — rendered as "(403/E4)" —
	// which must make the collector stop sending metrics for good.
	tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID),
		func(w http.ResponseWriter, r *http.Request) {
			count++
			if count == 5 {
				type payload struct {
					Error ErrorResponse `json:"error"`
				}
				res := &payload{}
				res.Error = ErrorResponse{Code: 4}
				w.Header().Set("Content-Type", "application/json")
				data, err := json.Marshal(res)
				if err != nil {
					// t.Fatal must only be called from the goroutine running
					// the test function (per the testing package docs); in
					// this handler goroutine, report the failure and bail out.
					t.Error(err)
					return
				}
				w.WriteHeader(http.StatusForbidden)
				_, _ = w.Write(data)
				return
			}
			body, err := ioutil.ReadAll(r.Body)
			assert.NoError(t, err)
			receivedSamples := []Sample{}
			assert.NoError(t, json.Unmarshal(body, &receivedSamples))
		})

	require.NoError(t, collector.Init())
	ctx, cancel := context.WithCancel(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		collector.Run(ctx)
		wg.Done()
	}()

	// Feed the collector enough samples that it keeps pushing well past the
	// point where the server starts returning the abort error.
	collector.Collect([]stats.SampleContainer{stats.Sample{
		Time:   now,
		Metric: metrics.VUs,
		Tags:   stats.NewSampleTags(tags.CloneTags()),
		Value:  1.0,
	}})
	for j := time.Duration(1); j <= 200; j++ {
		var container = make([]stats.SampleContainer, 0, 500)
		for i := time.Duration(1); i <= 50; i++ {
			container = append(container, &httpext.Trail{
				Blocked:        i % 200 * 100 * time.Millisecond,
				Connecting:     i % 200 * 200 * time.Millisecond,
				TLSHandshaking: i % 200 * 300 * time.Millisecond,
				Sending:        i * i * 400 * time.Millisecond,
				Waiting:        500 * time.Millisecond,
				Receiving:      600 * time.Millisecond,

				EndTime:      now.Add(i * 100),
				ConnDuration: 500 * time.Millisecond,
				Duration:     j * i * 1500 * time.Millisecond,
				Tags:         stats.NewSampleTags(tags.CloneTags()),
			})
		}
		collector.Collect(container)
	}

	cancel()
	wg.Wait()
	// The flag must have latched: no more pushes after the (403/E4) reply.
	require.True(t, collector.stopSendingMetrics)
}

0 comments on commit 082adaa

Please sign in to comment.