Skip to content

Commit

Permalink
Improve cloud aggregation for different URLs (#1220)
Browse files Browse the repository at this point in the history
stats/cloud: improve cloud aggregation for different URLs

For cloud output, we currently don't aggregate HTTP metrics with
different url tag, but with the same name tag.

This PR improves aggregation by setting url tag to be the same as name
tag. It's not ideal, but seems like the least bad option in term of UX.

Fixes #1166
  • Loading branch information
cuonglm authored Oct 30, 2019
1 parent 78858ef commit a948b4d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
19 changes: 19 additions & 0 deletions stats/cloud/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,24 @@ func (c *Collector) Run(ctx context.Context) {
}
}

func useCloudTags(source *httpext.Trail) *httpext.Trail {
name, nameExist := source.Tags.Get("name")
url, urlExist := source.Tags.Get("url")
if !nameExist || !urlExist || name == url {
return source
}

newTags := source.Tags.CloneTags()
newTags["url"] = name

dest := new(httpext.Trail)
*dest = *source
dest.Tags = stats.IntoSampleTags(&newTags)
dest.Samples = nil

return dest
}

// Collect receives a set of samples. This method is never called concurrently, and only while
// the context for Run() is valid, but should defer as much work as possible to Run().
func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
Expand All @@ -269,6 +287,7 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
for _, sampleContainer := range sampleContainers {
switch sc := sampleContainer.(type) {
case *httpext.Trail:
sc = useCloudTags(sc)
// Check if aggregation is enabled,
if c.config.AggregationPeriod.Duration > 0 {
newHTTPTrails = append(newHTTPTrails, sc)
Expand Down
30 changes: 27 additions & 3 deletions stats/cloud/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ import (
"github.com/loadimpact/k6/stats"
)

func tagEqual(expected, got *stats.SampleTags) bool {
expectedMap := expected.CloneTags()
gotMap := got.CloneTags()

if len(expectedMap) != len(gotMap) {
return false
}

for k, v := range gotMap {
if k == "url" {
if expectedMap["name"] != v {
return false
}
} else if expectedMap[k] != v {
return false
}
}
return true
}

func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
Expand Down Expand Up @@ -84,7 +104,7 @@ func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc
case *SampleDataMap:
receivedData, ok := receivedSample.Data.(*SampleDataMap)
assert.True(t, ok)
assert.True(t, expData.Tags.IsEqual(receivedData.Tags))
assert.True(t, tagEqual(expData.Tags, receivedData.Tags))
assert.True(t, expData.Time.Equal(receivedData.Time))
assert.Equal(t, expData.Type, receivedData.Type)
assert.Equal(t, expData.Values, receivedData.Values)
Expand Down Expand Up @@ -170,7 +190,11 @@ func TestCloudCollector(t *testing.T) {
assert.Equal(t, types.Duration(5*time.Millisecond), collector.config.AggregationWaitPeriod.Duration)

now := time.Now()
tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
tagMap := map[string]string{"test": "mest", "a": "b", "name": "name", "url": "url"}
tags := stats.IntoSampleTags(&tagMap)
expectedTagMap := tags.CloneTags()
expectedTagMap["url"], _ = tags.Get("name")
expectedTags := stats.IntoSampleTags(&expectedTagMap)

expSamples := make(chan []Sample)
tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), getSampleChecker(t, expSamples))
Expand Down Expand Up @@ -243,7 +267,7 @@ func TestCloudCollector(t *testing.T) {
Data: func(data interface{}) {
aggrData, ok := data.(*SampleDataAggregatedHTTPReqs)
assert.True(t, ok)
assert.True(t, aggrData.Tags.IsEqual(tags))
assert.True(t, aggrData.Tags.IsEqual(expectedTags))
assert.Equal(t, collector.config.AggregationMinSamples.Int64, int64(aggrData.Count))
assert.Equal(t, "aggregated_trend", aggrData.Type)
assert.InDelta(t, now.UnixNano(), time.Time(aggrData.Time).UnixNano(), float64(collector.config.AggregationPeriod.Duration))
Expand Down

0 comments on commit a948b4d

Please sign in to comment.