diff --git a/cmd/cloud.go b/cmd/cloud.go index 502ff72e410..de279baa5bf 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -22,11 +22,14 @@ package cmd import ( "bytes" + "context" "encoding/json" "fmt" "os" "os/signal" "path/filepath" + "strconv" + "sync" "syscall" "time" @@ -51,7 +54,10 @@ const ( ) //nolint:gochecknoglobals -var exitOnRunning = os.Getenv("K6_EXIT_ON_RUNNING") != "" +var ( + exitOnRunning = os.Getenv("K6_EXIT_ON_RUNNING") != "" + showCloudLogs = true +) //nolint:gochecknoglobals var cloudCmd = &cobra.Command{ @@ -59,11 +65,26 @@ var cloudCmd = &cobra.Command{ Short: "Run a test on the cloud", Long: `Run a test on the cloud. -This will execute the test on the Load Impact cloud service. Use "k6 login cloud" to authenticate.`, +This will execute the test on the k6 cloud service. Use "k6 login cloud" to authenticate.`, Example: ` k6 cloud script.js`[1:], Args: exactArgsWithMsg(1, "arg should either be \"-\", if reading script from stdin, or a path to a script file"), RunE: func(cmd *cobra.Command, args []string) error { + // TODO: don't use the Global logger + logger := logrus.StandardLogger() + // we specifically first parse it and return an error if it has bad value and then check if + // we are going to set it ... so we always parse it instead of it breaking the command if + // the cli flag is removed + if showCloudLogsEnv, ok := os.LookupEnv("K6_SHOW_CLOUD_LOGS"); ok { + showCloudLogsValue, err := strconv.ParseBool(showCloudLogsEnv) + if err != nil { + return fmt.Errorf("parsing K6_SHOW_CLOUD_LOGS returned an error: %w", err) + } + if !cmd.Flags().Changed("show-logs") { + showCloudLogs = showCloudLogsValue + } + + } // TODO: disable in quiet mode? _, _ = BannerColor.Fprintf(stdout, "\n%s\n\n", consts.Banner()) @@ -81,8 +102,6 @@ This will execute the test on the Load Impact cloud service. Use "k6 login cloud filename := args[0] filesystems := loader.CreateFilesystems() - // TODO: don't use the Global logger - logger := logrus.StandardLogger() src, err := loader.ReadSource(logger, filename, pwd, filesystems, os.Stdin) if err != nil { return err @@ -207,6 +226,16 @@ This will execute the test on the Load Impact cloud service. Use "k6 login cloud pb.WithConstProgress(0, "Initializing the cloud test"), ) + progressCtx, progressCancel := context.WithCancel(context.Background()) + progressBarWG := &sync.WaitGroup{} + progressBarWG.Add(1) + defer progressBarWG.Wait() + defer progressCancel() + go func() { + showProgress(progressCtx, conf, []*pb.ProgressBar{progressBar}, logger) + progressBarWG.Done() + }() + // The quiet option hides the progress bar and disallow aborting the test if quiet { return nil @@ -247,6 +276,15 @@ This will execute the test on the Load Impact cloud service. Use "k6 login cloud var progressErr error ticker := time.NewTicker(time.Millisecond * 2000) shouldExitLoop := false + if showCloudLogs { + go func() { + logger.Debug("Connecting to cloud logs server...") + // TODO replace with another context + if err := cloudConfig.StreamLogsToLogger(context.Background(), logger, refID, 0); err != nil { + logger.WithError(err).Error("error while tailing cloud logs") + } + }() + } runningLoop: for { @@ -257,7 +295,6 @@ This will execute the test on the Load Impact cloud service. Use "k6 login cloud if (testProgress.RunStatus > lib.RunStatusRunning) || (exitOnRunning && testProgress.RunStatus == lib.RunStatusRunning) { shouldExitLoop = true } - printBar(progressBar) } else { logger.WithError(progressErr).Error("Test progress error") } @@ -305,6 +342,10 @@ func cloudCmdFlagSet() *pflag.FlagSet { // K6_EXIT_ON_RUNNING=true won't affect the usage message flags.Lookup("exit-on-running").DefValue = "false" + // read the comments above for explanation why this is done this way and what are the problems + flags.BoolVar(&showCloudLogs, "show-logs", showCloudLogs, + "enable showing of logs when a test is executed in the cloud") + return flags } diff --git a/cmd/run.go b/cmd/run.go index 02f80725ab9..124a049a117 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -187,7 +187,11 @@ a commandline interface for interacting with it.`, progressBarWG := &sync.WaitGroup{} progressBarWG.Add(1) go func() { - showProgress(progressCtx, conf, execScheduler, logger) + pbs := []*pb.ProgressBar{execScheduler.GetInitProgressBar()} + for _, s := range execScheduler.GetExecutors() { + pbs = append(pbs, s.GetProgress()) + } + showProgress(progressCtx, conf, pbs, logger) progressBarWG.Done() }() diff --git a/cmd/ui.go b/cmd/ui.go index e0042cf51ce..424844e015b 100644 --- a/cmd/ui.go +++ b/cmd/ui.go @@ -35,7 +35,6 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh/terminal" - "github.com/loadimpact/k6/core/local" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/ui" "github.com/loadimpact/k6/ui/pb" @@ -242,17 +241,12 @@ func renderMultipleBars( // nolint:funlen func showProgress( ctx context.Context, conf Config, - execScheduler *local.ExecutionScheduler, logger *logrus.Logger, + pbs []*pb.ProgressBar, logger *logrus.Logger, ) { - if quiet || conf.HTTPDebug.Valid && conf.HTTPDebug.String != "" { + if quiet { return } - pbs := []*pb.ProgressBar{execScheduler.GetInitProgressBar()} - for _, s := range execScheduler.GetExecutors() { - pbs = append(pbs, s.GetProgress()) - } - var errTermGetSize bool termWidth := defaultTermWidth if stdoutTTY { diff --git a/stats/cloud/cloud_easyjson.go b/stats/cloud/cloud_easyjson.go index 41b733fce78..d0d5a75d476 100644 --- a/stats/cloud/cloud_easyjson.go +++ b/stats/cloud/cloud_easyjson.go @@ -82,7 +82,362 @@ func (v samples) MarshalEasyJSON(w *jwriter.Writer) { func (v *samples) UnmarshalEasyJSON(l *jlexer.Lexer) { easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud(l, v) } -func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud1(in *jlexer.Lexer, out *SampleDataSingle) { +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud1(in *jlexer.Lexer, out *msgStreams) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "stream": + if in.IsNull() { + in.Skip() + } else { + in.Delim('{') + out.Stream = make(map[string]string) + for !in.IsDelim('}') { + key := string(in.String()) + in.WantColon() + var v4 string + v4 = string(in.String()) + (out.Stream)[key] = v4 + in.WantComma() + } + in.Delim('}') + } + case "values": + if in.IsNull() { + in.Skip() + out.Values = nil + } else { + in.Delim('[') + if out.Values == nil { + if !in.IsDelim(']') { + out.Values = make([][2]string, 0, 2) + } else { + out.Values = [][2]string{} + } + } else { + out.Values = (out.Values)[:0] + } + for !in.IsDelim(']') { + var v5 [2]string + if in.IsNull() { + in.Skip() + } else { + in.Delim('[') + v6 := 0 + for !in.IsDelim(']') { + if v6 < 2 { + (v5)[v6] = string(in.String()) + v6++ + } else { + in.SkipRecursive() + } + in.WantComma() + } + in.Delim(']') + } + out.Values = append(out.Values, v5) + in.WantComma() + } + in.Delim(']') + } + default: + in.SkipRecursive() + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud1(out *jwriter.Writer, in msgStreams) { + out.RawByte('{') + first := true + _ = first + { + const prefix string = ",\"stream\":" + out.RawString(prefix[1:]) + if in.Stream == nil && (out.Flags&jwriter.NilMapAsEmpty) == 0 { + out.RawString(`null`) + } else { + out.RawByte('{') + v7First := true + for v7Name, v7Value := range in.Stream { + if v7First { + v7First = false + } else { + out.RawByte(',') + } + out.String(string(v7Name)) + out.RawByte(':') + out.String(string(v7Value)) + } + out.RawByte('}') + } + } + { + const prefix string = ",\"values\":" + out.RawString(prefix) + if in.Values == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { + out.RawString("null") + } else { + out.RawByte('[') + for v8, v9 := range in.Values { + if v8 > 0 { + out.RawByte(',') + } + out.RawByte('[') + for v10 := range v9 { + if v10 > 0 { + out.RawByte(',') + } + out.String(string((v9)[v10])) + } + out.RawByte(']') + } + out.RawByte(']') + } + } + out.RawByte('}') +} + +// MarshalEasyJSON supports easyjson.Marshaler interface +func (v msgStreams) MarshalEasyJSON(w *jwriter.Writer) { + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud1(w, v) +} + +// UnmarshalEasyJSON supports easyjson.Unmarshaler interface +func (v *msgStreams) UnmarshalEasyJSON(l *jlexer.Lexer) { + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud1(l, v) +} +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud2(in *jlexer.Lexer, out *msgDroppedEntries) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "labels": + if in.IsNull() { + in.Skip() + } else { + in.Delim('{') + out.Labels = make(map[string]string) + for !in.IsDelim('}') { + key := string(in.String()) + in.WantColon() + var v11 string + v11 = string(in.String()) + (out.Labels)[key] = v11 + in.WantComma() + } + in.Delim('}') + } + case "timestamp": + out.Timestamp = string(in.String()) + default: + in.SkipRecursive() + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud2(out *jwriter.Writer, in msgDroppedEntries) { + out.RawByte('{') + first := true + _ = first + { + const prefix string = ",\"labels\":" + out.RawString(prefix[1:]) + if in.Labels == nil && (out.Flags&jwriter.NilMapAsEmpty) == 0 { + out.RawString(`null`) + } else { + out.RawByte('{') + v12First := true + for v12Name, v12Value := range in.Labels { + if v12First { + v12First = false + } else { + out.RawByte(',') + } + out.String(string(v12Name)) + out.RawByte(':') + out.String(string(v12Value)) + } + out.RawByte('}') + } + } + { + const prefix string = ",\"timestamp\":" + out.RawString(prefix) + out.String(string(in.Timestamp)) + } + out.RawByte('}') +} + +// MarshalEasyJSON supports easyjson.Marshaler interface +func (v msgDroppedEntries) MarshalEasyJSON(w *jwriter.Writer) { + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud2(w, v) +} + +// UnmarshalEasyJSON supports easyjson.Unmarshaler interface +func (v *msgDroppedEntries) UnmarshalEasyJSON(l *jlexer.Lexer) { + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud2(l, v) +} +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud3(in *jlexer.Lexer, out *msg) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "streams": + if in.IsNull() { + in.Skip() + out.Streams = nil + } else { + in.Delim('[') + if out.Streams == nil { + if !in.IsDelim(']') { + out.Streams = make([]msgStreams, 0, 2) + } else { + out.Streams = []msgStreams{} + } + } else { + out.Streams = (out.Streams)[:0] + } + for !in.IsDelim(']') { + var v13 msgStreams + (v13).UnmarshalEasyJSON(in) + out.Streams = append(out.Streams, v13) + in.WantComma() + } + in.Delim(']') + } + case "dropped_entries": + if in.IsNull() { + in.Skip() + out.DroppedEntries = nil + } else { + in.Delim('[') + if out.DroppedEntries == nil { + if !in.IsDelim(']') { + out.DroppedEntries = make([]msgDroppedEntries, 0, 2) + } else { + out.DroppedEntries = []msgDroppedEntries{} + } + } else { + out.DroppedEntries = (out.DroppedEntries)[:0] + } + for !in.IsDelim(']') { + var v14 msgDroppedEntries + (v14).UnmarshalEasyJSON(in) + out.DroppedEntries = append(out.DroppedEntries, v14) + in.WantComma() + } + in.Delim(']') + } + default: + in.SkipRecursive() + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud3(out *jwriter.Writer, in msg) { + out.RawByte('{') + first := true + _ = first + { + const prefix string = ",\"streams\":" + out.RawString(prefix[1:]) + if in.Streams == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { + out.RawString("null") + } else { + out.RawByte('[') + for v15, v16 := range in.Streams { + if v15 > 0 { + out.RawByte(',') + } + (v16).MarshalEasyJSON(out) + } + out.RawByte(']') + } + } + { + const prefix string = ",\"dropped_entries\":" + out.RawString(prefix) + if in.DroppedEntries == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { + out.RawString("null") + } else { + out.RawByte('[') + for v17, v18 := range in.DroppedEntries { + if v17 > 0 { + out.RawByte(',') + } + (v18).MarshalEasyJSON(out) + } + out.RawByte(']') + } + } + out.RawByte('}') +} + +// MarshalEasyJSON supports easyjson.Marshaler interface +func (v msg) MarshalEasyJSON(w *jwriter.Writer) { + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud3(w, v) +} + +// UnmarshalEasyJSON supports easyjson.Unmarshaler interface +func (v *msg) UnmarshalEasyJSON(l *jlexer.Lexer) { + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud3(l, v) +} +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in *jlexer.Lexer, out *SampleDataSingle) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -131,7 +486,7 @@ func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud1(in *jlexer.Lexer, ou in.Consumed() } } -func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud1(out *jwriter.Writer, in SampleDataSingle) { +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out *jwriter.Writer, in SampleDataSingle) { out.RawByte('{') first := true _ = first @@ -160,14 +515,14 @@ func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud1(out *jwriter.Writer, // MarshalEasyJSON supports easyjson.Marshaler interface func (v SampleDataSingle) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud1(w, v) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(w, v) } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *SampleDataSingle) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud1(l, v) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(l, v) } -func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud2(in *jlexer.Lexer, out *SampleDataMap) { +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud5(in *jlexer.Lexer, out *SampleDataMap) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -217,9 +572,9 @@ func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud2(in *jlexer.Lexer, ou for !in.IsDelim('}') { key := string(in.String()) in.WantColon() - var v4 float64 - v4 = float64(in.Float64()) - (out.Values)[key] = v4 + var v19 float64 + v19 = float64(in.Float64()) + (out.Values)[key] = v19 in.WantComma() } in.Delim('}') @@ -234,7 +589,7 @@ func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud2(in *jlexer.Lexer, ou in.Consumed() } } -func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud2(out *jwriter.Writer, in SampleDataMap) { +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud5(out *jwriter.Writer, in SampleDataMap) { out.RawByte('{') first := true _ = first @@ -258,16 +613,16 @@ func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud2(out *jwriter.Writer, out.RawString(prefix) { out.RawByte('{') - v5First := true - for v5Name, v5Value := range in.Values { - if v5First { - v5First = false + v20First := true + for v20Name, v20Value := range in.Values { + if v20First { + v20First = false } else { out.RawByte(',') } - out.String(string(v5Name)) + out.String(string(v20Name)) out.RawByte(':') - out.Float64(float64(v5Value)) + out.Float64(float64(v20Value)) } out.RawByte('}') } @@ -277,14 +632,14 @@ func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud2(out *jwriter.Writer, // MarshalEasyJSON supports easyjson.Marshaler interface func (v SampleDataMap) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud2(w, v) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud5(w, v) } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *SampleDataMap) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud2(l, v) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud5(l, v) } -func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud3(in *jlexer.Lexer, out *SampleDataAggregatedHTTPReqs) { +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud6(in *jlexer.Lexer, out *SampleDataAggregatedHTTPReqs) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -333,7 +688,7 @@ func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud3(in *jlexer.Lexer, ou in.Consumed() } } -func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud3(out *jwriter.Writer, in SampleDataAggregatedHTTPReqs) { +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud6(out *jwriter.Writer, in SampleDataAggregatedHTTPReqs) { out.RawByte('{') first := true _ = first @@ -367,12 +722,12 @@ func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud3(out *jwriter.Writer, // MarshalEasyJSON supports easyjson.Marshaler interface func (v SampleDataAggregatedHTTPReqs) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud3(w, v) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud6(w, v) } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *SampleDataAggregatedHTTPReqs) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud3(l, v) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud6(l, v) } func easyjson9def2ecdDecode(in *jlexer.Lexer, out *struct { Duration AggregatedMetric `json:"http_req_duration"` @@ -402,19 +757,19 @@ func easyjson9def2ecdDecode(in *jlexer.Lexer, out *struct { } switch key { case "http_req_duration": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.Duration) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.Duration) case "http_req_blocked": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.Blocked) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.Blocked) case "http_req_connecting": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.Connecting) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.Connecting) case "http_req_tls_handshaking": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.TLSHandshaking) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.TLSHandshaking) case "http_req_sending": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.Sending) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.Sending) case "http_req_waiting": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.Waiting) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.Waiting) case "http_req_receiving": - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in, &out.Receiving) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in, &out.Receiving) default: in.SkipRecursive() } @@ -440,41 +795,41 @@ func easyjson9def2ecdEncode(out *jwriter.Writer, in struct { { const prefix string = ",\"http_req_duration\":" out.RawString(prefix[1:]) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.Duration) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.Duration) } { const prefix string = ",\"http_req_blocked\":" out.RawString(prefix) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.Blocked) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.Blocked) } { const prefix string = ",\"http_req_connecting\":" out.RawString(prefix) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.Connecting) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.Connecting) } { const prefix string = ",\"http_req_tls_handshaking\":" out.RawString(prefix) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.TLSHandshaking) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.TLSHandshaking) } { const prefix string = ",\"http_req_sending\":" out.RawString(prefix) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.Sending) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.Sending) } { const prefix string = ",\"http_req_waiting\":" out.RawString(prefix) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.Waiting) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.Waiting) } { const prefix string = ",\"http_req_receiving\":" out.RawString(prefix) - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out, in.Receiving) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out, in.Receiving) } out.RawByte('}') } -func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in *jlexer.Lexer, out *AggregatedMetric) { +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud7(in *jlexer.Lexer, out *AggregatedMetric) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -509,7 +864,7 @@ func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud4(in *jlexer.Lexer, ou in.Consumed() } } -func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out *jwriter.Writer, in AggregatedMetric) { +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud7(out *jwriter.Writer, in AggregatedMetric) { out.RawByte('{') first := true _ = first @@ -530,7 +885,7 @@ func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud4(out *jwriter.Writer, } out.RawByte('}') } -func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud5(in *jlexer.Lexer, out *Sample) { +func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud8(in *jlexer.Lexer, out *Sample) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -571,7 +926,7 @@ func easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud5(in *jlexer.Lexer, ou in.Consumed() } } -func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud5(out *jwriter.Writer, in Sample) { +func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud8(out *jwriter.Writer, in Sample) { out.RawByte('{') first := true _ = first @@ -601,10 +956,10 @@ func easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud5(out *jwriter.Writer, // MarshalEasyJSON supports easyjson.Marshaler interface func (v Sample) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud5(w, v) + easyjson9def2ecdEncodeGithubComLoadimpactK6StatsCloud8(w, v) } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *Sample) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud5(l, v) + easyjson9def2ecdDecodeGithubComLoadimpactK6StatsCloud8(l, v) } diff --git a/stats/cloud/config.go b/stats/cloud/config.go index f74367b7af6..1c13f168f4f 100644 --- a/stats/cloud/config.go +++ b/stats/cloud/config.go @@ -38,6 +38,7 @@ type Config struct { Name null.String `json:"name" envconfig:"K6_CLOUD_NAME"` Host null.String `json:"host" envconfig:"K6_CLOUD_HOST"` + LogsHost null.String `json:"-" envconfig:"K6_CLOUD_LOGS_HOST"` PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"` WebAppURL null.String `json:"webAppURL" envconfig:"K6_CLOUD_WEB_APP_URL"` NoCompress null.Bool `json:"noCompress" envconfig:"K6_CLOUD_NO_COMPRESS"` @@ -156,6 +157,7 @@ type Config struct { func NewConfig() Config { return Config{ Host: null.NewString("https://ingest.k6.io", false), + LogsHost: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false), WebAppURL: null.NewString("https://app.k6.io", false), MetricPushInterval: types.NewNullDuration(1*time.Second, false), MetricPushConcurrency: null.NewInt(1, false), @@ -190,6 +192,9 @@ func (c Config) Apply(cfg Config) Config { if cfg.Host.Valid && cfg.Host.String != "" { c.Host = cfg.Host } + if cfg.LogsHost.Valid && cfg.LogsHost.String != "" { + c.LogsHost = cfg.LogsHost + } if cfg.WebAppURL.Valid { c.WebAppURL = cfg.WebAppURL } diff --git a/stats/cloud/logs.go b/stats/cloud/logs.go new file mode 100644 index 00000000000..ffd17de7970 --- /dev/null +++ b/stats/cloud/logs.go @@ -0,0 +1,176 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2020 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package cloud + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/gorilla/websocket" + "github.com/mailru/easyjson" + "github.com/sirupsen/logrus" +) + +//easyjson:json +type msg struct { + Streams []msgStreams `json:"streams"` + DroppedEntries []msgDroppedEntries `json:"dropped_entries"` +} + +//easyjson:json +type msgStreams struct { + Stream map[string]string `json:"stream"` + Values [][2]string `json:"values"` // this can be optimized +} + +//easyjson:json +type msgDroppedEntries struct { + Labels map[string]string `json:"labels"` + Timestamp string `json:"timestamp"` +} + +func (m *msg) Log(logger logrus.FieldLogger) { + var level string + + for _, stream := range m.Streams { + fields := labelsToLogrusFields(stream.Stream) + var ok bool + if level, ok = stream.Stream["level"]; ok { + delete(fields, "level") + } + + for _, value := range stream.Values { + nsec, _ := strconv.Atoi(value[0]) + e := logger.WithFields(fields).WithTime(time.Unix(0, int64(nsec))) + lvl, err := logrus.ParseLevel(level) + if err != nil { + e.Info(value[1]) + e.Warn("last message had unknown level " + level) + } else { + e.Log(lvl, value[1]) + } + } + } + + for _, dropped := range m.DroppedEntries { + nsec, _ := strconv.Atoi(dropped.Timestamp) + logger.WithFields(labelsToLogrusFields(dropped.Labels)).WithTime(time.Unix(0, int64(nsec))).Warn("dropped") + } +} + +func labelsToLogrusFields(labels map[string]string) logrus.Fields { + fields := make(logrus.Fields, len(labels)) + + for key, val := range labels { + fields[key] = val + } + + return fields +} + +func (c *Config) getRequest(referenceID string, start time.Duration) (*url.URL, error) { + u, err := url.Parse(c.LogsHost.String) + if err != nil { + return nil, fmt.Errorf("couldn't parse cloud logs host %w", err) + } + + u.RawQuery = fmt.Sprintf(`query={test_run_id="%s"}&start=%d`, + referenceID, + time.Now().Add(-start).UnixNano(), + ) + + return u, nil +} + +// StreamLogsToLogger streams the logs for the configured test to the provided logger until ctx is +// Done or an error occurs. +func (c *Config) StreamLogsToLogger( + ctx context.Context, logger logrus.FieldLogger, referenceID string, start time.Duration, +) error { + u, err := c.getRequest(referenceID, start) + if err != nil { + return err + } + + headers := make(http.Header) + headers.Add("Sec-WebSocket-Protocol", "token="+c.Token.String) + + // We don't need to close the http body or use it for anything until we want to actually log + // what the server returned as body when it errors out + conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose + if err != nil { + return err + } + + go func() { + <-ctx.Done() + + _ = conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseGoingAway, "closing"), + time.Now().Add(time.Second)) + + _ = conn.Close() + }() + + msgBuffer := make(chan []byte, 10) + + defer close(msgBuffer) + + go func() { + for message := range msgBuffer { + var m msg + err := easyjson.Unmarshal(message, &m) + if err != nil { + logger.WithError(err).Errorf("couldn't unmarshal a message from the cloud: %s", string(message)) + + continue + } + + m.Log(logger) + } + }() + + for { + _, message, err := conn.ReadMessage() + select { // check if we should stop before continuing + case <-ctx.Done(): + return nil + default: + } + + if err != nil { + logger.WithError(err).Warn("error reading a message from the cloud") + + return err + } + + select { + case <-ctx.Done(): + return nil + case msgBuffer <- message: + } + } +} diff --git a/stats/cloud/logs_test.go b/stats/cloud/logs_test.go new file mode 100644 index 00000000000..840829ed71d --- /dev/null +++ b/stats/cloud/logs_test.go @@ -0,0 +1,124 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2020 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package cloud + +import ( + "io/ioutil" + "testing" + "time" + + "github.com/loadimpact/k6/lib/testutils" + "github.com/mailru/easyjson" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMsgParsing(t *testing.T) { + m := `{ + "streams": [ + { + "stream": { + "key1": "value1", + "key2": "value2" + }, + "values": [ + [ + "1598282752000000000", + "something to log" + ] + ] + } + ], + "dropped_entries": [ + { + "labels": { + "key3": "value1", + "key4": "value2" + }, + "timestamp": "1598282752000000000" + } + ] +} +` + expectMsg := msg{ + Streams: []msgStreams{ + { + Stream: map[string]string{"key1": "value1", "key2": "value2"}, + Values: [][2]string{{"1598282752000000000", "something to log"}}, + }, + }, + DroppedEntries: []msgDroppedEntries{ + { + Labels: map[string]string{"key3": "value1", "key4": "value2"}, + Timestamp: "1598282752000000000", + }, + }, + } + var message msg + require.NoError(t, easyjson.Unmarshal([]byte(m), &message)) + require.Equal(t, expectMsg, message) +} + +func TestMSGLog(t *testing.T) { + expectMsg := msg{ + Streams: []msgStreams{ + { + Stream: map[string]string{"key1": "value1", "key2": "value2"}, + Values: [][2]string{{"1598282752000000000", "something to log"}}, + }, + { + Stream: map[string]string{"key1": "value1", "key2": "value2", "level": "warn"}, + Values: [][2]string{{"1598282752000000000", "something else log"}}, + }, + }, + DroppedEntries: []msgDroppedEntries{ + { + Labels: map[string]string{"key3": "value1", "key4": "value2", "level": "panic"}, + Timestamp: "1598282752000000000", + }, + }, + } + + logger := logrus.New() + logger.Out = ioutil.Discard + hook := &testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels} + logger.AddHook(hook) + expectMsg.Log(logger) + logLines := hook.Drain() + assert.Equal(t, 4, len(logLines)) + expectTime := time.Unix(0, 1598282752000000000) + for i, entry := range logLines { + var expectedMsg string + switch i { + case 0: + expectedMsg = "something to log" + case 1: + expectedMsg = "last message had unknown level " + case 2: + expectedMsg = "something else log" + case 3: + expectedMsg = "dropped" + } + require.Equal(t, expectedMsg, entry.Message) + require.Equal(t, expectTime, entry.Time) + } +}