From a948b4dfc0e1be88a3f48206fe04a6fbf18b6301 Mon Sep 17 00:00:00 2001 From: Cuong Manh Le Date: Wed, 30 Oct 2019 17:31:24 +0700 Subject: [PATCH] Improve cloud aggregation for different URLs (#1220) stats/cloud: improve cloud aggregation for different URLs For cloud output, we currently don't aggregate HTTP metrics with different url tag, but with the same name tag. This PR improves aggregation by setting url tag to be the same as name tag. It's not ideal, but seems like the least bad option in term of UX. Fixes #1166 --- stats/cloud/collector.go | 19 +++++++++++++++++++ stats/cloud/collector_test.go | 30 +++++++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go index b2b8facb05d..dc56d4fa671 100644 --- a/stats/cloud/collector.go +++ b/stats/cloud/collector.go @@ -250,6 +250,24 @@ func (c *Collector) Run(ctx context.Context) { } } +func useCloudTags(source *httpext.Trail) *httpext.Trail { + name, nameExist := source.Tags.Get("name") + url, urlExist := source.Tags.Get("url") + if !nameExist || !urlExist || name == url { + return source + } + + newTags := source.Tags.CloneTags() + newTags["url"] = name + + dest := new(httpext.Trail) + *dest = *source + dest.Tags = stats.IntoSampleTags(&newTags) + dest.Samples = nil + + return dest +} + // Collect receives a set of samples. This method is never called concurrently, and only while // the context for Run() is valid, but should defer as much work as possible to Run(). func (c *Collector) Collect(sampleContainers []stats.SampleContainer) { @@ -269,6 +287,7 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) { for _, sampleContainer := range sampleContainers { switch sc := sampleContainer.(type) { case *httpext.Trail: + sc = useCloudTags(sc) // Check if aggregation is enabled, if c.config.AggregationPeriod.Duration > 0 { newHTTPTrails = append(newHTTPTrails, sc) diff --git a/stats/cloud/collector_test.go b/stats/cloud/collector_test.go index a9af6c16cc4..de964c0e2fe 100644 --- a/stats/cloud/collector_test.go +++ b/stats/cloud/collector_test.go @@ -47,6 +47,26 @@ import ( "github.com/loadimpact/k6/stats" ) +func tagEqual(expected, got *stats.SampleTags) bool { + expectedMap := expected.CloneTags() + gotMap := got.CloneTags() + + if len(expectedMap) != len(gotMap) { + return false + } + + for k, v := range gotMap { + if k == "url" { + if expectedMap["name"] != v { + return false + } + } else if expectedMap[k] != v { + return false + } + } + return true +} + func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) @@ -84,7 +104,7 @@ func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc case *SampleDataMap: receivedData, ok := receivedSample.Data.(*SampleDataMap) assert.True(t, ok) - assert.True(t, expData.Tags.IsEqual(receivedData.Tags)) + assert.True(t, tagEqual(expData.Tags, receivedData.Tags)) assert.True(t, expData.Time.Equal(receivedData.Time)) assert.Equal(t, expData.Type, receivedData.Type) assert.Equal(t, expData.Values, receivedData.Values) @@ -170,7 +190,11 @@ func TestCloudCollector(t *testing.T) { assert.Equal(t, types.Duration(5*time.Millisecond), collector.config.AggregationWaitPeriod.Duration) now := time.Now() - tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"}) + tagMap := map[string]string{"test": "mest", "a": "b", "name": "name", "url": "url"} + tags := stats.IntoSampleTags(&tagMap) + expectedTagMap := tags.CloneTags() + expectedTagMap["url"], _ = tags.Get("name") + expectedTags := stats.IntoSampleTags(&expectedTagMap) expSamples := make(chan []Sample) tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", collector.referenceID), getSampleChecker(t, expSamples)) @@ -243,7 +267,7 @@ func TestCloudCollector(t *testing.T) { Data: func(data interface{}) { aggrData, ok := data.(*SampleDataAggregatedHTTPReqs) assert.True(t, ok) - assert.True(t, aggrData.Tags.IsEqual(tags)) + assert.True(t, aggrData.Tags.IsEqual(expectedTags)) assert.Equal(t, collector.config.AggregationMinSamples.Int64, int64(aggrData.Count)) assert.Equal(t, "aggregated_trend", aggrData.Type) assert.InDelta(t, now.UnixNano(), time.Time(aggrData.Time).UnixNano(), float64(collector.config.AggregationPeriod.Duration))