Skip to content

Commit

Permalink
Move the cloud output to the new Output interface
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Mar 4, 2021
1 parent a44f5dc commit a67b3a1
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 346 deletions.
7 changes: 2 additions & 5 deletions cloudapi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ package cloudapi

import "fmt"

// URLForResults returns the cloud URL with the test run results.
func URLForResults(refID string, config Config) string {
path := "runs"
if config.Token.String == "" {
path = "anonymous"
}
return fmt.Sprintf("%s/%s/%s", config.WebAppURL.String, path, refID)
return fmt.Sprintf("%s/runs/%s", config.WebAppURL.String, refID)
}
55 changes: 10 additions & 45 deletions cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/afero"

"github.com/loadimpact/k6/cloudapi"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/consts"
"github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/output/cloud"
"github.com/loadimpact/k6/output/json"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/cloud"
"github.com/loadimpact/k6/stats/csv"
"github.com/loadimpact/k6/stats/datadog"
"github.com/loadimpact/k6/stats/influxdb"
Expand All @@ -49,7 +47,8 @@ import (
func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) {
// Start with the built-in outputs
result := map[string]func(output.Params) (output.Output, error){
"json": json.New,
"json": json.New,
"cloud": cloud.New,

// TODO: remove all of these
"influxdb": func(params output.Params) (output.Output, error) {
Expand All @@ -61,20 +60,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, influxc)
},
"cloud": func(params output.Params) (output.Output, error) {
conf, err := cloudapi.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
if err != nil {
return nil, err
}
cloudc, err := cloud.New(
params.Logger, conf, params.ScriptPath, params.ScriptOptions, params.ExecutionPlan, consts.Version,
)
if err != nil {
return nil, err
}
return newCollectorAdapter(params, cloudc)
return newCollectorAdapter(params, influxc), nil
},
"kafka": func(params output.Params) (output.Output, error) {
conf, err := kafka.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
Expand All @@ -85,7 +71,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, kafkac)
return newCollectorAdapter(params, kafkac), nil
},
"statsd": func(params output.Params) (output.Output, error) {
conf, err := statsd.GetConsolidatedConfig(params.JSONConfig, params.Environment)
Expand All @@ -96,7 +82,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, statsdc)
return newCollectorAdapter(params, statsdc), nil
},
"datadog": func(params output.Params) (output.Output, error) {
conf, err := datadog.GetConsolidatedConfig(params.JSONConfig, params.Environment)
Expand All @@ -107,7 +93,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, datadogc)
return newCollectorAdapter(params, datadogc), nil
},
"csv": func(params output.Params) (output.Output, error) {
conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
Expand All @@ -118,7 +104,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
if err != nil {
return nil, err
}
return newCollectorAdapter(params, csvc)
return newCollectorAdapter(params, csvc), nil
},
}

Expand Down Expand Up @@ -202,27 +188,12 @@ func parseOutputArgument(s string) (t, arg string) {

// TODO: remove this after we transition every collector to the output interface

func newCollectorAdapter(params output.Params, collector lib.Collector) (output.Output, error) {
// Check if all required tags are present
missingRequiredTags := []string{}
requiredTags := collector.GetRequiredSystemTags()
for _, tag := range stats.SystemTagSetValues() {
if requiredTags.Has(tag) && !params.ScriptOptions.SystemTags.Has(tag) {
missingRequiredTags = append(missingRequiredTags, tag.String())
}
}
if len(missingRequiredTags) > 0 {
return nil, fmt.Errorf(
"the specified output '%s' needs the following system tags enabled: %s",
params.OutputType, strings.Join(missingRequiredTags, ", "),
)
}

func newCollectorAdapter(params output.Params, collector lib.Collector) output.Output {
return &collectorAdapter{
outputType: params.OutputType,
collector: collector,
stopCh: make(chan struct{}),
}, nil
}
}

// collectorAdapter is a _temporary_ fix until we move all of the old
Expand Down Expand Up @@ -259,15 +230,9 @@ func (ca *collectorAdapter) AddMetricSamples(samples []stats.SampleContainer) {
ca.collector.Collect(samples)
}

func (ca *collectorAdapter) SetRunStatus(latestStatus lib.RunStatus) {
ca.collector.SetRunStatus(latestStatus)
}

// Stop implements the new output interface.
func (ca *collectorAdapter) Stop() error {
ca.runCtxCancel()
<-ca.stopCh
return nil
}

var _ output.WithRunStatusUpdates = &collectorAdapter{}
6 changes: 0 additions & 6 deletions lib/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,4 @@ type Collector interface {

// Optionally return a link that is shown to the user.
Link() string

// Return the required system sample tags for the specific collector
GetRequiredSystemTags() stats.SystemTagSet

// Set run status
SetRunStatus(status RunStatus)
}
43 changes: 24 additions & 19 deletions stats/cloud/bench_test.go → output/cloud/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package cloud
import (
"bytes"
"compress/gzip"
json "encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -36,26 +37,25 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/cloudapi"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/netext/httpext"
"github.com/loadimpact/k6/lib/testutils"
"github.com/loadimpact/k6/lib/testutils/httpmultibin"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/stats"
)

func BenchmarkAggregateHTTP(b *testing.B) {
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}

config := cloudapi.NewConfig().Apply(cloudapi.Config{
NoCompress: null.BoolFrom(true),
AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200),
collector, err := newOutput(output.Params{
Logger: testutils.NewLogger(b),
JSONConfig: json.RawMessage(`{"noCompress": true, "aggregationCalcInterval": "200ms","aggregationPeriod": "200ms"}`),
ScriptOptions: lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
SystemTags: &stats.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
})
collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(b, err)
now := time.Now()
collector.referenceID = "something"
Expand All @@ -79,7 +79,7 @@ func BenchmarkAggregateHTTP(b *testing.B) {
tags := generateTags(i, tagCount, map[string]string{"status": status})
container[i-1] = generateHTTPExtTrail(now, time.Duration(i), tags)
}
collector.Collect(container)
collector.AddMetricSamples(container)
b.StartTimer()
collector.aggregateHTTPTrails(time.Millisecond * 200)
collector.bufferSamples = nil
Expand Down Expand Up @@ -289,9 +289,6 @@ func generateHTTPExtTrail(now time.Time, i time.Duration, tags *stats.SampleTags
}

func BenchmarkHTTPPush(b *testing.B) {
options := lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
}
tb := httpmultibin.NewHTTPMultiBin(b)
tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := fmt.Fprint(w, `{
Expand All @@ -307,12 +304,20 @@ func BenchmarkHTTPPush(b *testing.B) {
},
)

config := cloudapi.NewConfig().Apply(cloudapi.Config{
Host: null.StringFrom(tb.ServerHTTP.URL),
AggregationCalcInterval: types.NullDurationFrom(time.Millisecond * 200),
AggregationPeriod: types.NullDurationFrom(time.Millisecond * 200),
collector, err := newOutput(output.Params{
Logger: testutils.NewLogger(b),
JSONConfig: json.RawMessage(fmt.Sprintf(`{
"host": "%s",
"noCompress": true,
"aggregationCalcInterval": "200ms",
"aggregationPeriod": "200ms"
}`, tb.ServerHTTP.URL)),
ScriptOptions: lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
SystemTags: &stats.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
})
collector, err := New(testutils.NewLogger(b), config, &url.URL{Path: "/script.js"}, options, []lib.ExecutionStep{}, "1.0")
require.NoError(b, err)
collector.referenceID = "fake"

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit a67b3a1

Please sign in to comment.