Skip to content

Commit

Permalink
Merge pull request #1874 from loadimpact/move-cloud-output-to-new-int…
Browse files Browse the repository at this point in the history
…erface

Move the cloud output to the new Output interface
  • Loading branch information
na-- authored Mar 4, 2021
2 parents a44f5dc + 97d2416 commit 57f7244
Show file tree
Hide file tree
Showing 17 changed files with 471 additions and 515 deletions.
9 changes: 2 additions & 7 deletions cloudapi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@

package cloudapi

import "fmt"

// URLForResults returns the cloud URL with the test run results.
func URLForResults(refID string, config Config) string {
path := "runs"
if config.Token.String == "" {
path = "anonymous"
}
return fmt.Sprintf("%s/%s/%s", config.WebAppURL.String, path, refID)
return config.WebAppURL.String + "/runs/" + refID
}
3 changes: 3 additions & 0 deletions cmd/login_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ This will set the default token used when just "k6 run -o cloud" is passed.`,
newCloudConf.Token = null.StringFrom(res.Token)
}

if currentDiskConf.Collectors == nil {
currentDiskConf.Collectors = make(map[string]json.RawMessage)
}
currentDiskConf.Collectors["cloud"], err = json.Marshal(newCloudConf)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cmd/login_influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ This will set the default server used when just "-o influxdb" is passed.`,
return err
}

if config.Collectors == nil {
config.Collectors = make(map[string]json.RawMessage)
}
config.Collectors["influxdb"], err = json.Marshal(conf)
if err != nil {
return err
Expand Down
55 changes: 10 additions & 45 deletions cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/afero"

"github.com/loadimpact/k6/cloudapi"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/consts"
"github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/output/cloud"
"github.com/loadimpact/k6/output/json"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/csv"
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/influxdb"
Expand All @@ -49,7 +47,8 @@ import (
func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) {
// Start with the built-in outputs
result := map[string]func(output.Params) (output.Output, error){
"json": json.New,
"json": json.New,
"cloud": cloud.New,

// TODO: remove all of these
"influxdb": func(params output.Params) (output.Output, error) {
Expand All @@ -61,20 +60,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, influxc)
},
"cloud": func(params output.Params) (output.Output, error) {
conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
if err != nil {
return nil, err
}
cloudc, err := cloud.New(
params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version,
)
if err != nil {
return nil, err
}
return newCollectorAdapter(params, cloudc)
return newCollectorAdapter(params, influxc), nil
},
"kafka": func(params output.Params) (output.Output, error) {
conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
Expand All @@ -85,7 +71,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, kafkac)
return newCollectorAdapter(params, kafkac), nil
},
"statsd": func(params output.Params) (output.Output, error) {
conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment)
Expand All @@ -96,7 +82,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, statsdc)
return newCollectorAdapter(params, statsdc), nil
},
"datadog": func(params output.Params) (output.Output, error) {
conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment)
Expand All @@ -107,7 +93,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, datadogc)
return newCollectorAdapter(params, datadogc), nil
},
"csv": func(params output.Params) (output.Output, error) {
conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
Expand All @@ -118,7 +104,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, csvc)
return newCollectorAdapter(params, csvc), nil
},
}

Expand Down Expand Up @@ -202,27 +188,12 @@ func parseOutputArgument(s string) (t, arg string) {

// TODO: remove this after we transition every collector to the output interface

func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) {
// Check if all required tags are present
missingRequiredTags := []string{}
requiredTags := collector.GetRequiredSystemTags()
for _, tag := range stats.SystemTagSetValues() {
if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) {
missingRequiredTags = append(missingRequiredTags, tag.String())
}
}
if len(missingRequiredTags) > 0 {
return nil, fmt.Errorf(
"the specified output '%s' needs the following system tags enabled: %s",
params.OutputType, strings.Join(missingRequiredTags, ", "),
)
}

func newCollectorAdapter(params output.Params, collector lib.Collector) output.Output {
return &collectorAdapter{
outputType: params.OutputType,
collector: collector,
stopCh: make(chan struct{}),
}, nil
}
}

// collectorAdapter is a _temporary_ fix until we move all of the old
Expand Down Expand Up @@ -259,15 +230,9 @@ func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) {
ca.collector.Collect(samples)
}

func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) {
ca.collector.SetRunStatus(latestStatus)
}

// Stop implements the new output interface.
func (ca *collectorAdapter) Stop() error {
ca.runCtxCancel()
<-ca.stopCh
return nil
}

var _ output.WithRunStatusUpdates = &collectorAdapter{}
6 changes: 0 additions & 6 deletions lib/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,4 @@ type Collector interface {

// Optionally return a link that is shown to the user.
Link() string

// Return the required system sample tags for the specific collector
GetRequiredSystemTags() stats.SystemTagSet

// Set run status
SetRunStatus(status RunStatus)
}
53 changes: 29 additions & 24 deletions stats/cloud/bench_test.go → output/cloud/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package cloud
import (
"bytes"
"compress/gzip"
json "encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -36,29 +37,28 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/cloudapi"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/netext/httpext"
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/stats"
)

func BenchmarkAggregateHTTP(b *testing.B) {
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}

config := cloudapi.NewConfig().Apply(cloudapi.Config{
NoCompress: null.BoolFrom(true),
AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200),
out, err := newOutput(output.Params{
Logger: testutils.NewLogger(b),
JSONConfig: json.RawMessage(`{"noCompress": true, "aggregationCalcInterval": "200ms","aggregationPeriod": "200ms"}`),
ScriptOptions: lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
SystemTags: &stats.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
})
collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(b, err)
now := time.Now()
collector.referenceID = "something"
out.referenceID = "something"
containersCount := 500000

for _, tagCount := range []int{1, 5, 35, 315, 3645} {
Expand All @@ -79,10 +79,10 @@ func BenchmarkAggregateHTTP(b *testing.B) {
tags := generateTags(i, tagCount, map[string]string{"status": status})
container[i-1] = generateHTTPExtTrail(now, time.Duration(i), tags)
}
collector.Collect(container)
out.AddMetricSamples(container)
b.StartTimer()
collector.aggregateHTTPTrails(time.Millisecond * 200)
collector.bufferSamples = nil
out.aggregateHTTPTrails(time.Millisecond * 200)
out.bufferSamples = nil
}
})
}
Expand Down Expand Up @@ -289,9 +289,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags
}

func BenchmarkHTTPPush(b *testing.B) {
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
tb := httpmultibin.NewHTTPMultiBin(b)
tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := fmt.Fprint(w, `{
Expand All @@ -307,14 +304,22 @@ func BenchmarkHTTPPush(b *testing.B) {
},
)

config := cloudapi.NewConfig().Apply(cloudapi.Config{
Host: null.StringFrom(tb.ServerHTTP.URL),
AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200),
out, err := newOutput(output.Params{
Logger: testutils.NewLogger(b),
JSONConfig: json.RawMessage(fmt.Sprintf(`{
"host": "%s",
"noCompress": true,
"aggregationCalcInterval": "200ms",
"aggregationPeriod": "200ms"
}`, tb.ServerHTTP.URL)),
ScriptOptions: lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
SystemTags: &stats.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
})
collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(b, err)
collector.referenceID = "fake"
out.referenceID = "fake"

for _, count := range []int{1000, 5000, 50000, 100000, 250000} {
count := count
Expand All @@ -325,7 +330,7 @@ func BenchmarkHTTPPush(b *testing.B) {
b.StopTimer()
toSend := append([]*Sample{}, samples...)
b.StartTimer()
require.NoError(b, collector.PushMetric("fake", false, toSend))
require.NoError(b, out.PushMetric("fake", false, toSend))
}
})
}
Expand Down
File renamed without changes.
8 changes: 5 additions & 3 deletions stats/cloud/data.go → output/cloud/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ func (d durations) Less(i, j int) bool { return d[i] < d[j] }
// Used when there are fewer samples in the bucket (so we can interpolate)
// and for benchmark comparisons and verification of the quickselect
// algorithm (it should return exactly the same results if interpolation isn't used).
func (d durations) SortGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float64, interpolate bool) (min, max time.Duration) {
func (d durations) SortGetNormalBounds(
radius, iqrLowerCoef, iqrUpperCoef float64, interpolate bool,
) (min, max time.Duration) {
if len(d) == 0 {
return
}
Expand Down Expand Up @@ -276,7 +278,7 @@ func (d durations) SortGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float6

min = q1 - time.Duration(iqrLowerCoef*iqr) // lower fence, anything below this is an outlier
max = q3 + time.Duration(iqrUpperCoef*iqr) // upper fence, anything above this is an outlier
return
return min, max
}

// Reworked and translated in Go from:
Expand All @@ -288,7 +290,7 @@ func (d durations) SortGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float6
// that only depends on the sort.Interface methods, but that would
// probably introduce some performance overhead because of the
// dynamic dispatch.
func (d durations) quickSelect(k int) time.Duration {
func (d durations) quickSelect(k int) time.Duration { //nolint:gocognit
n := len(d)
l := 0
ir := n - 1
Expand Down
Loading

0 comments on commit 57f7244

Please sign in to comment.