From f82b364a41aaf1a5731b8d44f77601fbb4eab634 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Tue, 8 Mar 2022 15:02:37 +0200 Subject: [PATCH] Flatten some long functions in cmd/ and enable splitting them apart --- cmd/archive.go | 96 ++++---- cmd/cloud.go | 524 ++++++++++++++++++++++-------------------- cmd/convert.go | 3 +- cmd/inspect.go | 3 +- cmd/login.go | 9 +- cmd/login_cloud.go | 2 +- cmd/login_influxdb.go | 2 +- cmd/pause.go | 2 +- cmd/resume.go | 2 +- cmd/root.go | 19 +- cmd/run.go | 461 +++++++++++++++++++------------------ cmd/scale.go | 2 +- cmd/stats.go | 2 +- cmd/status.go | 2 +- cmd/version.go | 2 +- 15 files changed, 588 insertions(+), 543 deletions(-) diff --git a/cmd/archive.go b/cmd/archive.go index bd57e53468a..9751c09998a 100644 --- a/cmd/archive.go +++ b/cmd/archive.go @@ -25,9 +25,58 @@ import ( "github.com/spf13/pflag" ) -func getArchiveCmd(gs *globalState) *cobra.Command { - archiveOut := "archive.tar" - // archiveCmd represents the archive command +// cmdArchive handles the `k6 archive` sub-command +type cmdArchive struct { + gs *globalState + + archiveOut string +} + +func (c *cmdArchive) run(cmd *cobra.Command, args []string) error { + test, err := loadTest(c.gs, cmd, args, getPartialConfig) + if err != nil { + return err + } + + // It's important to NOT set the derived options back to the runner + // here, only the consolidated ones. Otherwise, if the script used + // an execution shortcut option (e.g. `iterations` or `duration`), + // we will have multiple conflicting execution options since the + // derivation will set `scenarios` as well. + err = test.initRunner.SetOptions(test.consolidatedConfig.Options) + if err != nil { + return err + } + + // Archive. + arc := test.initRunner.MakeArchive() + f, err := c.gs.fs.Create(c.archiveOut) + if err != nil { + return err + } + + err = arc.Write(f) + if cerr := f.Close(); err == nil && cerr != nil { + err = cerr + } + return err +} + +func (c *cmdArchive) flagSet() *pflag.FlagSet { + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.SortFlags = false + flags.AddFlagSet(optionFlagSet()) + flags.AddFlagSet(runtimeOptionFlagSet(false)) + flags.StringVarP(&c.archiveOut, "archive-out", "O", c.archiveOut, "archive output filename") + return flags +} + +func getCmdArchive(gs *globalState) *cobra.Command { + c := &cmdArchive{ + gs: gs, + archiveOut: "archive.tar", + } + archiveCmd := &cobra.Command{ Use: "archive", Short: "Create an archive", @@ -41,48 +90,11 @@ An archive is a fully self-contained test run, and can be executed identically e # Run the resulting archive. k6 run myarchive.tar`[1:], Args: cobra.ExactArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - test, err := loadTest(gs, cmd, args, getPartialConfig) - if err != nil { - return err - } - - // It's important to NOT set the derived options back to the runner - // here, only the consolidated ones. Otherwise, if the script used - // an execution shortcut option (e.g. `iterations` or `duration`), - // we will have multiple conflicting execution options since the - // derivation will set `scenarios` as well. - err = test.initRunner.SetOptions(test.consolidatedConfig.Options) - if err != nil { - return err - } - - // Archive. - arc := test.initRunner.MakeArchive() - f, err := gs.fs.Create(archiveOut) - if err != nil { - return err - } - - err = arc.Write(f) - if cerr := f.Close(); err == nil && cerr != nil { - err = cerr - } - return err - }, + RunE: c.run, } archiveCmd.Flags().SortFlags = false - archiveCmd.Flags().AddFlagSet(archiveCmdFlagSet(&archiveOut)) + archiveCmd.Flags().AddFlagSet(c.flagSet()) return archiveCmd } - -func archiveCmdFlagSet(archiveOut *string) *pflag.FlagSet { - flags := pflag.NewFlagSet("", pflag.ContinueOnError) - flags.SortFlags = false - flags.AddFlagSet(optionFlagSet()) - flags.AddFlagSet(runtimeOptionFlagSet(false)) - flags.StringVarP(archiveOut, "archive-out", "O", *archiveOut, "archive output filename") - return flags -} diff --git a/cmd/cloud.go b/cmd/cloud.go index b51df6df3a2..c264d719bae 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -44,300 +44,316 @@ import ( "go.k6.io/k6/ui/pb" ) -//nolint:funlen,gocognit,gocyclo,cyclop -func getCloudCmd(globalState *globalState) *cobra.Command { - showCloudLogs := true - exitOnRunning := false +// cmdCloud handles the `k6 cloud` sub-command +type cmdCloud struct { + gs *globalState - cloudCmd := &cobra.Command{ - Use: "cloud", - Short: "Run a test on the cloud", - Long: `Run a test on the cloud. + showCloudLogs bool + exitOnRunning bool +} -This will execute the test on the k6 cloud service. Use "k6 login cloud" to authenticate.`, - Example: ` - k6 cloud script.js`[1:], - Args: exactArgsWithMsg(1, "arg should either be \"-\", if reading script from stdin, or a path to a script file"), - PreRunE: func(cmd *cobra.Command, args []string) error { - // TODO: refactor (https://github.com/loadimpact/k6/issues/883) - // - // We deliberately parse the env variables, to validate for wrong - // values, even if we don't subsequently use them (if the respective - // CLI flag was specified, since it has a higher priority). - if showCloudLogsEnv, ok := globalState.envVars["K6_SHOW_CLOUD_LOGS"]; ok { - showCloudLogsValue, err := strconv.ParseBool(showCloudLogsEnv) - if err != nil { - return fmt.Errorf("parsing K6_SHOW_CLOUD_LOGS returned an error: %w", err) - } - if !cmd.Flags().Changed("show-logs") { - showCloudLogs = showCloudLogsValue - } - } +func (c *cmdCloud) preRun(cmd *cobra.Command, args []string) error { + // TODO: refactor (https://github.com/loadimpact/k6/issues/883) + // + // We deliberately parse the env variables, to validate for wrong + // values, even if we don't subsequently use them (if the respective + // CLI flag was specified, since it has a higher priority). + if showCloudLogsEnv, ok := c.gs.envVars["K6_SHOW_CLOUD_LOGS"]; ok { + showCloudLogsValue, err := strconv.ParseBool(showCloudLogsEnv) + if err != nil { + return fmt.Errorf("parsing K6_SHOW_CLOUD_LOGS returned an error: %w", err) + } + if !cmd.Flags().Changed("show-logs") { + c.showCloudLogs = showCloudLogsValue + } + } - if exitOnRunningEnv, ok := globalState.envVars["K6_EXIT_ON_RUNNING"]; ok { - exitOnRunningValue, err := strconv.ParseBool(exitOnRunningEnv) - if err != nil { - return fmt.Errorf("parsing K6_EXIT_ON_RUNNING returned an error: %w", err) - } - if !cmd.Flags().Changed("exit-on-running") { - exitOnRunning = exitOnRunningValue - } - } + if exitOnRunningEnv, ok := c.gs.envVars["K6_EXIT_ON_RUNNING"]; ok { + exitOnRunningValue, err := strconv.ParseBool(exitOnRunningEnv) + if err != nil { + return fmt.Errorf("parsing K6_EXIT_ON_RUNNING returned an error: %w", err) + } + if !cmd.Flags().Changed("exit-on-running") { + c.exitOnRunning = exitOnRunningValue + } + } - return nil - }, - RunE: func(cmd *cobra.Command, args []string) error { - printBanner(globalState) + return nil +} - progressBar := pb.New( - pb.WithConstLeft("Init"), - pb.WithConstProgress(0, "Loading test script..."), - ) - printBar(globalState, progressBar) +// TODO: split apart some more +// nolint: funlen,gocognit,cyclop +func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { + printBanner(c.gs) - test, err := loadTest(globalState, cmd, args, getPartialConfig) - if err != nil { - return err - } + progressBar := pb.New( + pb.WithConstLeft("Init"), + pb.WithConstProgress(0, "Loading test script..."), + ) + printBar(c.gs, progressBar) - // It's important to NOT set the derived options back to the runner - // here, only the consolidated ones. Otherwise, if the script used - // an execution shortcut option (e.g. `iterations` or `duration`), - // we will have multiple conflicting execution options since the - // derivation will set `scenarios` as well. - err = test.initRunner.SetOptions(test.consolidatedConfig.Options) - if err != nil { - return err - } + test, err := loadTest(c.gs, cmd, args, getPartialConfig) + if err != nil { + return err + } - // TODO: validate for usage of execution segment - // TODO: validate for externally controlled executor (i.e. executors that aren't distributable) - // TODO: move those validations to a separate function and reuse validateConfig()? - - modifyAndPrintBar(globalState, progressBar, pb.WithConstProgress(0, "Building the archive...")) - arc := test.initRunner.MakeArchive() - - // TODO: Fix this - // We reuse cloud.Config for parsing options.ext.loadimpact, but this probably shouldn't be - // done, as the idea of options.ext is that they are extensible without touching k6. But in - // order for this to happen, we shouldn't actually marshall cloud.Config on top of it, because - // it will be missing some fields that aren't actually mentioned in the struct. - // So in order for use to copy the fields that we need for loadimpact's api we unmarshal in - // map[string]interface{} and copy what we need if it isn't set already - var tmpCloudConfig map[string]interface{} - if val, ok := arc.Options.External["loadimpact"]; ok { - dec := json.NewDecoder(bytes.NewReader(val)) - dec.UseNumber() // otherwise float64 are used - if err = dec.Decode(&tmpCloudConfig); err != nil { - return err - } - } + // It's important to NOT set the derived options back to the runner + // here, only the consolidated ones. Otherwise, if the script used + // an execution shortcut option (e.g. `iterations` or `duration`), + // we will have multiple conflicting execution options since the + // derivation will set `scenarios` as well. + err = test.initRunner.SetOptions(test.consolidatedConfig.Options) + if err != nil { + return err + } - // Cloud config - cloudConfig, err := cloudapi.GetConsolidatedConfig( - test.derivedConfig.Collectors["cloud"], globalState.envVars, "", arc.Options.External) - if err != nil { - return err - } - if !cloudConfig.Token.Valid { - return errors.New("Not logged in, please use `k6 login cloud`.") //nolint:golint,revive,stylecheck - } - if tmpCloudConfig == nil { - tmpCloudConfig = make(map[string]interface{}, 3) - } + // TODO: validate for usage of execution segment + // TODO: validate for externally controlled executor (i.e. executors that aren't distributable) + // TODO: move those validations to a separate function and reuse validateConfig()? + + modifyAndPrintBar(c.gs, progressBar, pb.WithConstProgress(0, "Building the archive...")) + arc := test.initRunner.MakeArchive() + + // TODO: Fix this + // We reuse cloud.Config for parsing options.ext.loadimpact, but this probably shouldn't be + // done, as the idea of options.ext is that they are extensible without touching k6. But in + // order for this to happen, we shouldn't actually marshall cloud.Config on top of it, because + // it will be missing some fields that aren't actually mentioned in the struct. + // So in order for use to copy the fields that we need for loadimpact's api we unmarshal in + // map[string]interface{} and copy what we need if it isn't set already + var tmpCloudConfig map[string]interface{} + if val, ok := arc.Options.External["loadimpact"]; ok { + dec := json.NewDecoder(bytes.NewReader(val)) + dec.UseNumber() // otherwise float64 are used + if err = dec.Decode(&tmpCloudConfig); err != nil { + return err + } + } - if cloudConfig.Token.Valid { - tmpCloudConfig["token"] = cloudConfig.Token - } - if cloudConfig.Name.Valid { - tmpCloudConfig["name"] = cloudConfig.Name - } - if cloudConfig.ProjectID.Valid { - tmpCloudConfig["projectID"] = cloudConfig.ProjectID - } + // Cloud config + cloudConfig, err := cloudapi.GetConsolidatedConfig( + test.derivedConfig.Collectors["cloud"], c.gs.envVars, "", arc.Options.External) + if err != nil { + return err + } + if !cloudConfig.Token.Valid { + return errors.New("Not logged in, please use `k6 login cloud`.") //nolint:golint,revive,stylecheck + } + if tmpCloudConfig == nil { + tmpCloudConfig = make(map[string]interface{}, 3) + } - if arc.Options.External == nil { - arc.Options.External = make(map[string]json.RawMessage) - } - arc.Options.External["loadimpact"], err = json.Marshal(tmpCloudConfig) - if err != nil { - return err - } + if cloudConfig.Token.Valid { + tmpCloudConfig["token"] = cloudConfig.Token + } + if cloudConfig.Name.Valid { + tmpCloudConfig["name"] = cloudConfig.Name + } + if cloudConfig.ProjectID.Valid { + tmpCloudConfig["projectID"] = cloudConfig.ProjectID + } - name := cloudConfig.Name.String - if !cloudConfig.Name.Valid || cloudConfig.Name.String == "" { - name = filepath.Base(test.testPath) - } + if arc.Options.External == nil { + arc.Options.External = make(map[string]json.RawMessage) + } + arc.Options.External["loadimpact"], err = json.Marshal(tmpCloudConfig) + if err != nil { + return err + } - globalCtx, globalCancel := context.WithCancel(globalState.ctx) - defer globalCancel() + name := cloudConfig.Name.String + if !cloudConfig.Name.Valid || cloudConfig.Name.String == "" { + name = filepath.Base(test.testPath) + } - logger := globalState.logger + globalCtx, globalCancel := context.WithCancel(c.gs.ctx) + defer globalCancel() - // Start cloud test run - modifyAndPrintBar(globalState, progressBar, pb.WithConstProgress(0, "Validating script options")) - client := cloudapi.NewClient( - logger, cloudConfig.Token.String, cloudConfig.Host.String, consts.Version, cloudConfig.Timeout.TimeDuration()) - if err = client.ValidateOptions(arc.Options); err != nil { - return err - } + logger := c.gs.logger - modifyAndPrintBar(globalState, progressBar, pb.WithConstProgress(0, "Uploading archive")) - refID, err := client.StartCloudTestRun(name, cloudConfig.ProjectID.Int64, arc) - if err != nil { - return err - } + // Start cloud test run + modifyAndPrintBar(c.gs, progressBar, pb.WithConstProgress(0, "Validating script options")) + client := cloudapi.NewClient( + logger, cloudConfig.Token.String, cloudConfig.Host.String, consts.Version, cloudConfig.Timeout.TimeDuration()) + if err = client.ValidateOptions(arc.Options); err != nil { + return err + } - // Trap Interrupts, SIGINTs and SIGTERMs. - gracefulStop := func(sig os.Signal) { - logger.WithField("sig", sig).Print("Stopping cloud test run in response to signal...") - // Do this in a separate goroutine so that if it blocks, the - // second signal can still abort the process execution. - go func() { - stopErr := client.StopCloudTestRun(refID) - if stopErr != nil { - logger.WithError(stopErr).Error("Stop cloud test error") - } else { - logger.Info("Successfully sent signal to stop the cloud test, now waiting for it to actually stop...") - } - globalCancel() - }() - } - hardStop := func(sig os.Signal) { - logger.WithField("sig", sig).Error("Aborting k6 in response to signal, we won't wait for the test to end.") - } - stopSignalHandling := handleTestAbortSignals(globalState, gracefulStop, hardStop) - defer stopSignalHandling() + modifyAndPrintBar(c.gs, progressBar, pb.WithConstProgress(0, "Uploading archive")) + refID, err := client.StartCloudTestRun(name, cloudConfig.ProjectID.Int64, arc) + if err != nil { + return err + } - et, err := lib.NewExecutionTuple(test.derivedConfig.ExecutionSegment, test.derivedConfig.ExecutionSegmentSequence) - if err != nil { - return err - } - testURL := cloudapi.URLForResults(refID, cloudConfig) - executionPlan := test.derivedConfig.Scenarios.GetFullExecutionRequirements(et) - printExecutionDescription( - globalState, "cloud", test.testPath, testURL, test.derivedConfig, et, executionPlan, nil, - ) - - modifyAndPrintBar( - globalState, progressBar, - pb.WithConstLeft("Run "), pb.WithConstProgress(0, "Initializing the cloud test"), - ) - - progressCtx, progressCancel := context.WithCancel(globalCtx) - progressBarWG := &sync.WaitGroup{} - progressBarWG.Add(1) - defer progressBarWG.Wait() - defer progressCancel() - go func() { - showProgress(progressCtx, globalState, []*pb.ProgressBar{progressBar}, logger) - progressBarWG.Done() - }() - - var ( - startTime time.Time - maxDuration time.Duration - ) - maxDuration, _ = lib.GetEndOffset(executionPlan) - - testProgressLock := &sync.Mutex{} - var testProgress *cloudapi.TestProgressResponse - progressBar.Modify( - pb.WithProgress(func() (float64, []string) { - testProgressLock.Lock() - defer testProgressLock.Unlock() - - if testProgress == nil { - return 0, []string{"Waiting..."} - } - - statusText := testProgress.RunStatusText - - if testProgress.RunStatus == lib.RunStatusFinished { - testProgress.Progress = 1 - } else if testProgress.RunStatus == lib.RunStatusRunning { - if startTime.IsZero() { - startTime = time.Now() - } - spent := time.Since(startTime) - if spent > maxDuration { - statusText = maxDuration.String() - } else { - statusText = fmt.Sprintf("%s/%s", pb.GetFixedLengthDuration(spent, maxDuration), maxDuration) - } - } - - return testProgress.Progress, []string{statusText} - }), - ) - - ticker := time.NewTicker(time.Millisecond * 2000) - if showCloudLogs { - go func() { - logger.Debug("Connecting to cloud logs server...") - if err := cloudConfig.StreamLogsToLogger(globalCtx, logger, refID, 0); err != nil { - logger.WithError(err).Error("error while tailing cloud logs") - } - }() + // Trap Interrupts, SIGINTs and SIGTERMs. + gracefulStop := func(sig os.Signal) { + logger.WithField("sig", sig).Print("Stopping cloud test run in response to signal...") + // Do this in a separate goroutine so that if it blocks, the + // second signal can still abort the process execution. + go func() { + stopErr := client.StopCloudTestRun(refID) + if stopErr != nil { + logger.WithError(stopErr).Error("Stop cloud test error") + } else { + logger.Info("Successfully sent signal to stop the cloud test, now waiting for it to actually stop...") } + globalCancel() + }() + } + hardStop := func(sig os.Signal) { + logger.WithField("sig", sig).Error("Aborting k6 in response to signal, we won't wait for the test to end.") + } + stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, hardStop) + defer stopSignalHandling() - for range ticker.C { - newTestProgress, progressErr := client.GetTestProgress(refID) - if progressErr != nil { - logger.WithError(progressErr).Error("Test progress error") - continue - } + et, err := lib.NewExecutionTuple(test.derivedConfig.ExecutionSegment, test.derivedConfig.ExecutionSegmentSequence) + if err != nil { + return err + } + testURL := cloudapi.URLForResults(refID, cloudConfig) + executionPlan := test.derivedConfig.Scenarios.GetFullExecutionRequirements(et) + printExecutionDescription( + c.gs, "cloud", test.testPath, testURL, test.derivedConfig, et, executionPlan, nil, + ) + + modifyAndPrintBar( + c.gs, progressBar, + pb.WithConstLeft("Run "), pb.WithConstProgress(0, "Initializing the cloud test"), + ) + + progressCtx, progressCancel := context.WithCancel(globalCtx) + progressBarWG := &sync.WaitGroup{} + progressBarWG.Add(1) + defer progressBarWG.Wait() + defer progressCancel() + go func() { + showProgress(progressCtx, c.gs, []*pb.ProgressBar{progressBar}, logger) + progressBarWG.Done() + }() + + var ( + startTime time.Time + maxDuration time.Duration + ) + maxDuration, _ = lib.GetEndOffset(executionPlan) + + testProgressLock := &sync.Mutex{} + var testProgress *cloudapi.TestProgressResponse + progressBar.Modify( + pb.WithProgress(func() (float64, []string) { + testProgressLock.Lock() + defer testProgressLock.Unlock() - testProgressLock.Lock() - testProgress = newTestProgress - testProgressLock.Unlock() + if testProgress == nil { + return 0, []string{"Waiting..."} + } + + statusText := testProgress.RunStatusText - if (newTestProgress.RunStatus > lib.RunStatusRunning) || - (exitOnRunning && newTestProgress.RunStatus == lib.RunStatusRunning) { - globalCancel() - break + if testProgress.RunStatus == lib.RunStatusFinished { + testProgress.Progress = 1 + } else if testProgress.RunStatus == lib.RunStatusRunning { + if startTime.IsZero() { + startTime = time.Now() + } + spent := time.Since(startTime) + if spent > maxDuration { + statusText = maxDuration.String() + } else { + statusText = fmt.Sprintf("%s/%s", pb.GetFixedLengthDuration(spent, maxDuration), maxDuration) } } - if testProgress == nil { - //nolint:stylecheck,golint - return errext.WithExitCodeIfNone(errors.New("Test progress error"), exitcodes.CloudFailedToGetProgress) - } + return testProgress.Progress, []string{statusText} + }), + ) - if !globalState.flags.quiet { - valueColor := getColor(globalState.flags.noColor || !globalState.stdOut.isTTY, color.FgCyan) - printToStdout(globalState, fmt.Sprintf( - " test status: %s\n", valueColor.Sprint(testProgress.RunStatusText), - )) - } else { - logger.WithField("run_status", testProgress.RunStatusText).Debug("Test finished") + ticker := time.NewTicker(time.Millisecond * 2000) + if c.showCloudLogs { + go func() { + logger.Debug("Connecting to cloud logs server...") + if err := cloudConfig.StreamLogsToLogger(globalCtx, logger, refID, 0); err != nil { + logger.WithError(err).Error("error while tailing cloud logs") } + }() + } - if testProgress.ResultStatus == cloudapi.ResultStatusFailed { - // TODO: use different exit codes for failed thresholds vs failed test (e.g. aborted by system/limit) - //nolint:stylecheck,golint - return errext.WithExitCodeIfNone(errors.New("The test has failed"), exitcodes.CloudTestRunFailed) - } + for range ticker.C { + newTestProgress, progressErr := client.GetTestProgress(refID) + if progressErr != nil { + logger.WithError(progressErr).Error("Test progress error") + continue + } + + testProgressLock.Lock() + testProgress = newTestProgress + testProgressLock.Unlock() + + if (newTestProgress.RunStatus > lib.RunStatusRunning) || + (c.exitOnRunning && newTestProgress.RunStatus == lib.RunStatusRunning) { + globalCancel() + break + } + } - return nil - }, + if testProgress == nil { + //nolint:stylecheck,golint + return errext.WithExitCodeIfNone(errors.New("Test progress error"), exitcodes.CloudFailedToGetProgress) } - cloudCmd.Flags().SortFlags = false - cloudCmd.Flags().AddFlagSet(cloudCmdFlagSet(&showCloudLogs, &exitOnRunning)) - return cloudCmd + + if !c.gs.flags.quiet { + valueColor := getColor(c.gs.flags.noColor || !c.gs.stdOut.isTTY, color.FgCyan) + printToStdout(c.gs, fmt.Sprintf( + " test status: %s\n", valueColor.Sprint(testProgress.RunStatusText), + )) + } else { + logger.WithField("run_status", testProgress.RunStatusText).Debug("Test finished") + } + + if testProgress.ResultStatus == cloudapi.ResultStatusFailed { + // TODO: use different exit codes for failed thresholds vs failed test (e.g. aborted by system/limit) + //nolint:stylecheck,golint + return errext.WithExitCodeIfNone(errors.New("The test has failed"), exitcodes.CloudTestRunFailed) + } + + return nil } -func cloudCmdFlagSet(showCloudLogs, exitOnRunning *bool) *pflag.FlagSet { +func (c *cmdCloud) flagSet() *pflag.FlagSet { flags := pflag.NewFlagSet("", pflag.ContinueOnError) flags.SortFlags = false flags.AddFlagSet(optionFlagSet()) flags.AddFlagSet(runtimeOptionFlagSet(false)) // TODO: Figure out a better way to handle the CLI flags - flags.BoolVar(exitOnRunning, "exit-on-running", *exitOnRunning, + flags.BoolVar(&c.exitOnRunning, "exit-on-running", c.exitOnRunning, "exits when test reaches the running status") - flags.BoolVar(showCloudLogs, "show-logs", *showCloudLogs, + flags.BoolVar(&c.showCloudLogs, "show-logs", c.showCloudLogs, "enable showing of logs when a test is executed in the cloud") return flags } + +func getCmdCloud(gs *globalState) *cobra.Command { + c := &cmdCloud{ + gs: gs, + showCloudLogs: true, + exitOnRunning: false, + } + + cloudCmd := &cobra.Command{ + Use: "cloud", + Short: "Run a test on the cloud", + Long: `Run a test on the cloud. + +This will execute the test on the k6 cloud service. Use "k6 login cloud" to authenticate.`, + Example: ` + k6 cloud script.js`[1:], + Args: exactArgsWithMsg(1, "arg should either be \"-\", if reading script from stdin, or a path to a script file"), + PreRunE: c.preRun, + RunE: c.run, + } + cloudCmd.Flags().SortFlags = false + cloudCmd.Flags().AddFlagSet(c.flagSet()) + return cloudCmd +} diff --git a/cmd/convert.go b/cmd/convert.go index 62d3a8ded07..74530222943 100644 --- a/cmd/convert.go +++ b/cmd/convert.go @@ -32,8 +32,9 @@ import ( "go.k6.io/k6/lib" ) +// TODO: split apart like `k6 run` and `k6 archive`? //nolint:funlen,gocognit -func getConvertCmd(globalState *globalState) *cobra.Command { +func getCmdConvert(globalState *globalState) *cobra.Command { var ( convertOutput string optionsFilePath string diff --git a/cmd/inspect.go b/cmd/inspect.go index e154665ad15..ab7fcb1b01a 100644 --- a/cmd/inspect.go +++ b/cmd/inspect.go @@ -29,7 +29,8 @@ import ( "go.k6.io/k6/lib/types" ) -func getInspectCmd(gs *globalState) *cobra.Command { +// TODO: split apart like `k6 run` and `k6 archive` +func getCmdInspect(gs *globalState) *cobra.Command { var addExecReqs bool // inspectCmd represents the inspect command diff --git a/cmd/login.go b/cmd/login.go index e2a1e11eddc..18ecf9d7319 100644 --- a/cmd/login.go +++ b/cmd/login.go @@ -24,8 +24,8 @@ import ( "github.com/spf13/cobra" ) -func getLoginCmd() *cobra.Command { - // loginCmd represents the login command +// getCmdLogin returns the `k6 login` sub-command, together with its children. +func getCmdLogin(gs *globalState) *cobra.Command { loginCmd := &cobra.Command{ Use: "login", Short: "Authenticate with a service", @@ -38,5 +38,10 @@ on the commandline.`, return cmd.Usage() }, } + loginCmd.AddCommand( + getCmdLoginCloud(gs), + getCmdLoginInfluxDB(gs), + ) + return loginCmd } diff --git a/cmd/login_cloud.go b/cmd/login_cloud.go index 927ae8250cc..5f3ff82d5cb 100644 --- a/cmd/login_cloud.go +++ b/cmd/login_cloud.go @@ -37,7 +37,7 @@ import ( ) //nolint:funlen,gocognit -func getLoginCloudCommand(globalState *globalState) *cobra.Command { +func getCmdLoginCloud(globalState *globalState) *cobra.Command { // loginCloudCommand represents the 'login cloud' command loginCloudCommand := &cobra.Command{ Use: "cloud", diff --git a/cmd/login_influxdb.go b/cmd/login_influxdb.go index b5d5544b090..4f34021fada 100644 --- a/cmd/login_influxdb.go +++ b/cmd/login_influxdb.go @@ -34,7 +34,7 @@ import ( ) //nolint:funlen -func getLoginInfluxDBCommand(globalState *globalState) *cobra.Command { +func getCmdLoginInfluxDB(globalState *globalState) *cobra.Command { // loginInfluxDBCommand represents the 'login influxdb' command loginInfluxDBCommand := &cobra.Command{ Use: "influxdb [uri]", diff --git a/cmd/pause.go b/cmd/pause.go index bc5629c6e0a..03c68c1bfd4 100644 --- a/cmd/pause.go +++ b/cmd/pause.go @@ -28,7 +28,7 @@ import ( "go.k6.io/k6/api/v1/client" ) -func getPauseCmd(globalState *globalState) *cobra.Command { +func getCmdPause(globalState *globalState) *cobra.Command { // pauseCmd represents the pause command pauseCmd := &cobra.Command{ Use: "pause", diff --git a/cmd/resume.go b/cmd/resume.go index d7737973f14..0b2501c1e91 100644 --- a/cmd/resume.go +++ b/cmd/resume.go @@ -28,7 +28,7 @@ import ( "go.k6.io/k6/api/v1/client" ) -func getResumeCmd(globalState *globalState) *cobra.Command { +func getCmdResume(globalState *globalState) *cobra.Command { // resumeCmd represents the resume command resumeCmd := &cobra.Command{ Use: "resume", diff --git a/cmd/root.go b/cmd/root.go index a1590c8dd26..2016474d7b4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -234,16 +234,15 @@ func newRootCommand(gs *globalState) *rootCommand { rootCmd.SetErr(gs.stdErr) // TODO: use gs.logger.WriterLevel(logrus.ErrorLevel)? rootCmd.SetIn(gs.stdIn) - loginCmd := getLoginCmd() - loginCmd.AddCommand( - getLoginCloudCommand(gs), - getLoginInfluxDBCommand(gs), - ) - rootCmd.AddCommand( - getArchiveCmd(gs), getCloudCmd(gs), getConvertCmd(gs), getInspectCmd(gs), - loginCmd, getPauseCmd(gs), getResumeCmd(gs), getScaleCmd(gs), getRunCmd(gs), - getStatsCmd(gs), getStatusCmd(gs), getVersionCmd(gs), - ) + subCommands := []func(*globalState) *cobra.Command{ + getCmdArchive, getCmdCloud, getCmdConvert, getCmdInspect, + getCmdLogin, getCmdPause, getCmdResume, getCmdScale, getCmdRun, + getCmdStats, getCmdStatus, getCmdVersion, + } + + for _, sc := range subCommands { + rootCmd.AddCommand(sc(gs)) + } c.cmd = rootCmd return c diff --git a/cmd/run.go b/cmd/run.go index 029e900c83f..4df4b44576e 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -48,9 +48,241 @@ import ( "go.k6.io/k6/ui/pb" ) +// cmdRun handles the `k6 run` sub-command +type cmdRun struct { + gs *globalState +} + +// TODO: split apart some more //nolint:funlen,gocognit,gocyclo,cyclop -func getRunCmd(globalState *globalState) *cobra.Command { - // runCmd represents the run command. +func (c *cmdRun) run(cmd *cobra.Command, args []string) error { + printBanner(c.gs) + + test, err := loadTest(c.gs, cmd, args, getConfig) + if err != nil { + return err + } + + // Write the full consolidated *and derived* options back to the Runner. + conf := test.derivedConfig + if err = test.initRunner.SetOptions(conf.Options); err != nil { + return err + } + + // We prepare a bunch of contexts: + // - The runCtx is cancelled as soon as the Engine's run() lambda finishes, + // and can trigger things like the usage report and end of test summary. + // Crucially, metrics processing by the Engine will still work after this + // context is cancelled! + // - The lingerCtx is cancelled by Ctrl+C, and is used to wait for that + // event when k6 was ran with the --linger option. + // - The globalCtx is cancelled only after we're completely done with the + // test execution and any --linger has been cleared, so that the Engine + // can start winding down its metrics processing. + globalCtx, globalCancel := context.WithCancel(c.gs.ctx) + defer globalCancel() + lingerCtx, lingerCancel := context.WithCancel(globalCtx) + defer lingerCancel() + runCtx, runCancel := context.WithCancel(lingerCtx) + defer runCancel() + + logger := c.gs.logger + // Create a local execution scheduler wrapping the runner. + logger.Debug("Initializing the execution scheduler...") + execScheduler, err := local.NewExecutionScheduler(test.initRunner, logger) + if err != nil { + return err + } + + // This is manually triggered after the Engine's Run() has completed, + // and things like a single Ctrl+C don't affect it. We use it to make + // sure that the progressbars finish updating with the latest execution + // state one last time, after the test run has finished. + progressCtx, progressCancel := context.WithCancel(globalCtx) + defer progressCancel() + initBar := execScheduler.GetInitProgressBar() + progressBarWG := &sync.WaitGroup{} + progressBarWG.Add(1) + go func() { + pbs := []*pb.ProgressBar{execScheduler.GetInitProgressBar()} + for _, s := range execScheduler.GetExecutors() { + pbs = append(pbs, s.GetProgress()) + } + showProgress(progressCtx, c.gs, pbs, logger) + progressBarWG.Done() + }() + + // Create all outputs. + executionPlan := execScheduler.GetExecutionPlan() + outputs, err := createOutputs(c.gs, test, executionPlan) + if err != nil { + return err + } + + // Create the engine. + initBar.Modify(pb.WithConstProgress(0, "Init engine")) + engine, err := core.NewEngine( + execScheduler, conf.Options, test.runtimeOptions, + outputs, logger, test.builtInMetrics, + ) + if err != nil { + return err + } + + // Spin up the REST API server, if not disabled. + if c.gs.flags.address != "" { + initBar.Modify(pb.WithConstProgress(0, "Init API server")) + go func() { + logger.Debugf("Starting the REST API server on %s", c.gs.flags.address) + if aerr := api.ListenAndServe(c.gs.flags.address, engine, logger); aerr != nil { + // Only exit k6 if the user has explicitly set the REST API address + if cmd.Flags().Lookup("address").Changed { + logger.WithError(aerr).Error("Error from API server") + c.gs.osExit(int(exitcodes.CannotStartRESTAPI)) + } else { + logger.WithError(aerr).Warn("Error from API server") + } + } + }() + } + + // We do this here so we can get any output URLs below. + initBar.Modify(pb.WithConstProgress(0, "Starting outputs")) + err = engine.StartOutputs() + if err != nil { + return err + } + defer engine.StopOutputs() + + printExecutionDescription( + c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, + ) + + // Trap Interrupts, SIGINTs and SIGTERMs. + gracefulStop := func(sig os.Signal) { + logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...") + lingerCancel() // stop the test run, metric processing is cancelled below + } + hardStop := func(sig os.Signal) { + logger.WithField("sig", sig).Error("Aborting k6 in response to signal") + globalCancel() // not that it matters, given the following command... + } + stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, hardStop) + defer stopSignalHandling() + + // Initialize the engine + initBar.Modify(pb.WithConstProgress(0, "Init VUs...")) + engineRun, engineWait, err := engine.Init(globalCtx, runCtx) + if err != nil { + err = common.UnwrapGojaInterruptedError(err) + // Add a generic engine exit code if we don't have a more specific one + return errext.WithExitCodeIfNone(err, exitcodes.GenericEngine) + } + + // Init has passed successfully, so unless disabled, make sure we send a + // usage report after the context is done. + if !conf.NoUsageReport.Bool { + reportDone := make(chan struct{}) + go func() { + <-runCtx.Done() + _ = reportUsage(execScheduler) + close(reportDone) + }() + defer func() { + select { + case <-reportDone: + case <-time.After(3 * time.Second): + } + }() + } + + // Start the test run + initBar.Modify(pb.WithConstProgress(0, "Starting test...")) + var interrupt error + err = engineRun() + if err != nil { + err = common.UnwrapGojaInterruptedError(err) + if common.IsInterruptError(err) { + // Don't return here since we need to work with --linger, + // show the end-of-test summary and exit cleanly. + interrupt = err + } + if !conf.Linger.Bool && interrupt == nil { + return errext.WithExitCodeIfNone(err, exitcodes.GenericEngine) + } + } + runCancel() + logger.Debug("Engine run terminated cleanly") + + progressCancel() + progressBarWG.Wait() + + executionState := execScheduler.GetState() + // Warn if no iterations could be completed. + if executionState.GetFullIterationCount() == 0 { + logger.Warn("No script iterations finished, consider making the test duration longer") + } + + // Handle the end-of-test summary. + if !test.runtimeOptions.NoSummary.Bool { + summaryResult, err := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ + Metrics: engine.Metrics, + RootGroup: engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), + TestRunDuration: executionState.GetCurrentTestRunDuration(), + NoColor: c.gs.flags.noColor, + UIState: lib.UIState{ + IsStdOutTTY: c.gs.stdOut.isTTY, + IsStdErrTTY: c.gs.stdErr.isTTY, + }, + }) + if err == nil { + err = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult) + } + if err != nil { + logger.WithError(err).Error("failed to handle the end-of-test summary") + } + } + + if conf.Linger.Bool { + select { + case <-lingerCtx.Done(): + // do nothing, we were interrupted by Ctrl+C already + default: + logger.Debug("Linger set; waiting for Ctrl+C...") + if !c.gs.flags.quiet { + printToStdout(c.gs, "Linger set; waiting for Ctrl+C...") + } + <-lingerCtx.Done() + logger.Debug("Ctrl+C received, exiting...") + } + } + globalCancel() // signal the Engine that it should wind down + logger.Debug("Waiting for engine processes to finish...") + engineWait() + logger.Debug("Everything has finished, exiting k6!") + if interrupt != nil { + return interrupt + } + if engine.IsTainted() { + return errext.WithExitCodeIfNone(errors.New("some thresholds have failed"), exitcodes.ThresholdsHaveFailed) + } + return nil +} + +func (c *cmdRun) flagSet() *pflag.FlagSet { + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.SortFlags = false + flags.AddFlagSet(optionFlagSet()) + flags.AddFlagSet(runtimeOptionFlagSet(true)) + flags.AddFlagSet(configFlagSet()) + return flags +} + +func getCmdRun(gs *globalState) *cobra.Command { + c := &cmdRun{ + gs: gs, + } + runCmd := &cobra.Command{ Use: "run", Short: "Start a load test", @@ -77,223 +309,11 @@ a commandline interface for interacting with it.`, # Send metrics to an influxdb server k6 run -o influxdb=http://1.2.3.4:8086/k6`[1:], Args: exactArgsWithMsg(1, "arg should either be \"-\", if reading script from stdin, or a path to a script file"), - RunE: func(cmd *cobra.Command, args []string) error { - printBanner(globalState) - - test, err := loadTest(globalState, cmd, args, getConfig) - if err != nil { - return err - } - - // Write the full consolidated *and derived* options back to the Runner. - conf := test.derivedConfig - if err = test.initRunner.SetOptions(conf.Options); err != nil { - return err - } - - // We prepare a bunch of contexts: - // - The runCtx is cancelled as soon as the Engine's run() lambda finishes, - // and can trigger things like the usage report and end of test summary. - // Crucially, metrics processing by the Engine will still work after this - // context is cancelled! - // - The lingerCtx is cancelled by Ctrl+C, and is used to wait for that - // event when k6 was ran with the --linger option. - // - The globalCtx is cancelled only after we're completely done with the - // test execution and any --linger has been cleared, so that the Engine - // can start winding down its metrics processing. - globalCtx, globalCancel := context.WithCancel(globalState.ctx) - defer globalCancel() - lingerCtx, lingerCancel := context.WithCancel(globalCtx) - defer lingerCancel() - runCtx, runCancel := context.WithCancel(lingerCtx) - defer runCancel() - - logger := globalState.logger - // Create a local execution scheduler wrapping the runner. - logger.Debug("Initializing the execution scheduler...") - execScheduler, err := local.NewExecutionScheduler(test.initRunner, logger) - if err != nil { - return err - } - - // This is manually triggered after the Engine's Run() has completed, - // and things like a single Ctrl+C don't affect it. We use it to make - // sure that the progressbars finish updating with the latest execution - // state one last time, after the test run has finished. - progressCtx, progressCancel := context.WithCancel(globalCtx) - defer progressCancel() - initBar := execScheduler.GetInitProgressBar() - progressBarWG := &sync.WaitGroup{} - progressBarWG.Add(1) - go func() { - pbs := []*pb.ProgressBar{execScheduler.GetInitProgressBar()} - for _, s := range execScheduler.GetExecutors() { - pbs = append(pbs, s.GetProgress()) - } - showProgress(progressCtx, globalState, pbs, logger) - progressBarWG.Done() - }() - - // Create all outputs. - executionPlan := execScheduler.GetExecutionPlan() - outputs, err := createOutputs(globalState, test, executionPlan) - if err != nil { - return err - } - - // Create the engine. - initBar.Modify(pb.WithConstProgress(0, "Init engine")) - engine, err := core.NewEngine( - execScheduler, conf.Options, test.runtimeOptions, - outputs, logger, test.builtInMetrics, - ) - if err != nil { - return err - } - - // Spin up the REST API server, if not disabled. - if globalState.flags.address != "" { - initBar.Modify(pb.WithConstProgress(0, "Init API server")) - go func() { - logger.Debugf("Starting the REST API server on %s", globalState.flags.address) - if aerr := api.ListenAndServe(globalState.flags.address, engine, logger); aerr != nil { - // Only exit k6 if the user has explicitly set the REST API address - if cmd.Flags().Lookup("address").Changed { - logger.WithError(aerr).Error("Error from API server") - globalState.osExit(int(exitcodes.CannotStartRESTAPI)) - } else { - logger.WithError(aerr).Warn("Error from API server") - } - } - }() - } - - // We do this here so we can get any output URLs below. - initBar.Modify(pb.WithConstProgress(0, "Starting outputs")) - err = engine.StartOutputs() - if err != nil { - return err - } - defer engine.StopOutputs() - - printExecutionDescription( - globalState, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, - ) - - // Trap Interrupts, SIGINTs and SIGTERMs. - gracefulStop := func(sig os.Signal) { - logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...") - lingerCancel() // stop the test run, metric processing is cancelled below - } - hardStop := func(sig os.Signal) { - logger.WithField("sig", sig).Error("Aborting k6 in response to signal") - globalCancel() // not that it matters, given the following command... - } - stopSignalHandling := handleTestAbortSignals(globalState, gracefulStop, hardStop) - defer stopSignalHandling() - - // Initialize the engine - initBar.Modify(pb.WithConstProgress(0, "Init VUs...")) - engineRun, engineWait, err := engine.Init(globalCtx, runCtx) - if err != nil { - err = common.UnwrapGojaInterruptedError(err) - // Add a generic engine exit code if we don't have a more specific one - return errext.WithExitCodeIfNone(err, exitcodes.GenericEngine) - } - - // Init has passed successfully, so unless disabled, make sure we send a - // usage report after the context is done. - if !conf.NoUsageReport.Bool { - reportDone := make(chan struct{}) - go func() { - <-runCtx.Done() - _ = reportUsage(execScheduler) - close(reportDone) - }() - defer func() { - select { - case <-reportDone: - case <-time.After(3 * time.Second): - } - }() - } - - // Start the test run - initBar.Modify(pb.WithConstProgress(0, "Starting test...")) - var interrupt error - err = engineRun() - if err != nil { - err = common.UnwrapGojaInterruptedError(err) - if common.IsInterruptError(err) { - // Don't return here since we need to work with --linger, - // show the end-of-test summary and exit cleanly. - interrupt = err - } - if !conf.Linger.Bool && interrupt == nil { - return errext.WithExitCodeIfNone(err, exitcodes.GenericEngine) - } - } - runCancel() - logger.Debug("Engine run terminated cleanly") - - progressCancel() - progressBarWG.Wait() - - executionState := execScheduler.GetState() - // Warn if no iterations could be completed. - if executionState.GetFullIterationCount() == 0 { - logger.Warn("No script iterations finished, consider making the test duration longer") - } - - // Handle the end-of-test summary. - if !test.runtimeOptions.NoSummary.Bool { - summaryResult, err := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ - Metrics: engine.Metrics, - RootGroup: engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), - TestRunDuration: executionState.GetCurrentTestRunDuration(), - NoColor: globalState.flags.noColor, - UIState: lib.UIState{ - IsStdOutTTY: globalState.stdOut.isTTY, - IsStdErrTTY: globalState.stdErr.isTTY, - }, - }) - if err == nil { - err = handleSummaryResult(globalState.fs, globalState.stdOut, globalState.stdErr, summaryResult) - } - if err != nil { - logger.WithError(err).Error("failed to handle the end-of-test summary") - } - } - - if conf.Linger.Bool { - select { - case <-lingerCtx.Done(): - // do nothing, we were interrupted by Ctrl+C already - default: - logger.Debug("Linger set; waiting for Ctrl+C...") - if !globalState.flags.quiet { - printToStdout(globalState, "Linger set; waiting for Ctrl+C...") - } - <-lingerCtx.Done() - logger.Debug("Ctrl+C received, exiting...") - } - } - globalCancel() // signal the Engine that it should wind down - logger.Debug("Waiting for engine processes to finish...") - engineWait() - logger.Debug("Everything has finished, exiting k6!") - if interrupt != nil { - return interrupt - } - if engine.IsTainted() { - return errext.WithExitCodeIfNone(errors.New("some thresholds have failed"), exitcodes.ThresholdsHaveFailed) - } - return nil - }, + RunE: c.run, } runCmd.Flags().SortFlags = false - runCmd.Flags().AddFlagSet(runCmdFlagSet()) + runCmd.Flags().AddFlagSet(c.flagSet()) return runCmd } @@ -329,15 +349,6 @@ func reportUsage(execScheduler *local.ExecutionScheduler) error { return err } -func runCmdFlagSet() *pflag.FlagSet { - flags := pflag.NewFlagSet("", pflag.ContinueOnError) - flags.SortFlags = false - flags.AddFlagSet(optionFlagSet()) - flags.AddFlagSet(runtimeOptionFlagSet(true)) - flags.AddFlagSet(configFlagSet()) - return flags -} - func handleSummaryResult(fs afero.Fs, stdOut, stdErr io.Writer, result map[string]io.Reader) error { var errs []error diff --git a/cmd/scale.go b/cmd/scale.go index d2e29da838f..e64a4942033 100644 --- a/cmd/scale.go +++ b/cmd/scale.go @@ -29,7 +29,7 @@ import ( "go.k6.io/k6/api/v1/client" ) -func getScaleCmd(globalState *globalState) *cobra.Command { +func getCmdScale(globalState *globalState) *cobra.Command { // scaleCmd represents the scale command scaleCmd := &cobra.Command{ Use: "scale", diff --git a/cmd/stats.go b/cmd/stats.go index e375926b7f1..8a36efd277b 100644 --- a/cmd/stats.go +++ b/cmd/stats.go @@ -26,7 +26,7 @@ import ( "go.k6.io/k6/api/v1/client" ) -func getStatsCmd(globalState *globalState) *cobra.Command { +func getCmdStats(globalState *globalState) *cobra.Command { // statsCmd represents the stats command statsCmd := &cobra.Command{ Use: "stats", diff --git a/cmd/status.go b/cmd/status.go index 3f90fc67ef1..ffa73b7a3ec 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -26,7 +26,7 @@ import ( "go.k6.io/k6/api/v1/client" ) -func getStatusCmd(globalState *globalState) *cobra.Command { +func getCmdStatus(globalState *globalState) *cobra.Command { // statusCmd represents the status command statusCmd := &cobra.Command{ Use: "status", diff --git a/cmd/version.go b/cmd/version.go index f1cf37b9725..a7747536568 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -26,7 +26,7 @@ import ( "go.k6.io/k6/lib/consts" ) -func getVersionCmd(globalState *globalState) *cobra.Command { +func getCmdVersion(globalState *globalState) *cobra.Command { // versionCmd represents the version command. versionCmd := &cobra.Command{ Use: "version",