diff --git a/src/go/rpk/pkg/cli/cluster/logdirs.go b/src/go/rpk/pkg/cli/cluster/logdirs.go index 3b3044a28a90..1c30250ae003 100644 --- a/src/go/rpk/pkg/cli/cluster/logdirs.go +++ b/src/go/rpk/pkg/cli/cluster/logdirs.go @@ -79,7 +79,7 @@ where revision is a Redpanda internal concept. listed, err := adm.ListTopics(context.Background(), topics...) out.MaybeDie(err, "unable to describe topics: %v", err) listed.EachError(func(d kadm.TopicDetail) { - fmt.Fprintf(os.Stderr, "unable to discover the partitions on topic %q: %v", d.Topic, d.Err) + fmt.Fprintf(os.Stderr, "unable to discover the partitions on topic %q: %v\n", d.Topic, d.Err) }) s = listed.TopicsSet() } diff --git a/src/go/rpk/pkg/cli/profile/clear.go b/src/go/rpk/pkg/cli/profile/clear.go new file mode 100644 index 000000000000..1632b134a132 --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/clear.go @@ -0,0 +1,40 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newClearCommand(fs afero.Fs, p *config.Params) *cobra.Command { + return &cobra.Command{ + Use: "clear", + Short: "Clear the current profile", + Long: `Clear the current profile + +This small command clears the current profile, which can be useful to unset an +prod cluster profile. +`, + Args: cobra.ExactArgs(0), + Run: func(_ *cobra.Command, args []string) { + cfg, err := p.Load(fs) + out.MaybeDie(err, "unable to load config: %v", err) + y, ok := cfg.ActualRpkYaml() + if !ok { + return + } + y.CurrentProfile = "" + y.Write(fs) + }, + } +} diff --git a/src/go/rpk/pkg/cli/profile/create.go b/src/go/rpk/pkg/cli/profile/create.go index cea090ccbe0f..b6c3f65cdba8 100644 --- a/src/go/rpk/pkg/cli/profile/create.go +++ b/src/go/rpk/pkg/cli/profile/create.go @@ -11,7 +11,9 @@ package profile import ( "context" + "errors" "fmt" + "os" "time" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cloudapi" @@ -26,10 +28,11 @@ import ( func newCreateCommand(fs afero.Fs, p *config.Params) *cobra.Command { var ( - set []string - fromSimple string - fromCloud string - description string + set []string + fromRedpanda string + fromProfile string + fromCloud string + description string ) cmd := &cobra.Command{ @@ -40,8 +43,14 @@ func newCreateCommand(fs afero.Fs, p *config.Params) *cobra.Command { There are multiple ways to create a profile. If no name is provided, the name "default" is used. -* If you have an older redpanda.yaml, you can use --from-simple to generate - a new profile from the existing redpanda.yaml file. +* You can use --from-redpanda to generate a new profile from an existing + redpanda.yaml file. The special values "current" create a profile from the + current redpanda.yaml as it is loaded within rpk. + +* You can use --from-profile to generate a profile from an existing profile or + from from a profile in a yaml file. First, the filename is checked, then an + existing profile name is checked. The special value "current" creates a new + profile from the existing profile. * You can use --from-cloud to generate a profile from an existing cloud cluster id. Note that you must be logged in with 'rpk cloud login' first. @@ -53,7 +62,7 @@ name "default" is used. latter corresponds to the path kafka_api.tls.enabled in the profile's YAML. The --set flag is always applied last and can be used to set additional fields -in tandem with --from-simple or --from-cloud. +in tandem with --from-redpanda or --from-cloud. The --set flag supports autocompletion, suggesting the -X key format. If you begin writing a YAML path, the flag will suggest the rest of the path. @@ -81,7 +90,7 @@ rpk always switches to the newly created profile. ctx, cancel := context.WithTimeout(cmd.Context(), 10*time.Second) defer cancel() - cloudMTLS, cloudSASL, err := createCtx(ctx, fs, y, cfg, fromSimple, fromCloud, set, name, description) + cloudMTLS, cloudSASL, err := createCtx(ctx, fs, y, cfg, fromRedpanda, fromProfile, fromCloud, set, name, description) out.MaybeDieErr(err) fmt.Printf("Created and switched to new profile %q.\n", name) @@ -96,7 +105,8 @@ rpk always switches to the newly created profile. } cmd.Flags().StringSliceVarP(&set, "set", "s", nil, "Create and switch to a new profile, setting profile fields with key=value pairs") - cmd.Flags().StringVar(&fromSimple, "from-simple", "", "Create and switch to a new profile from a (simpler to define) redpanda.yaml file") + cmd.Flags().StringVar(&fromRedpanda, "from-redpanda", "", "Create and switch to a new profile from a redpanda.yaml file") + cmd.Flags().StringVar(&fromProfile, "from-profile", "", "Create and switch to a new profile from an existing profile or from a profile in a yaml file") cmd.Flags().StringVar(&fromCloud, "from-cloud", "", "Create and switch to a new profile generated from a Redpanda Cloud cluster ID") cmd.Flags().StringVarP(&description, "description", "d", "", "Optional description of the profile") @@ -111,14 +121,15 @@ func createCtx( fs afero.Fs, y *config.RpkYaml, cfg *config.Config, - fromSimple string, + fromRedpanda string, + fromProfile string, fromCloud string, set []string, name string, description string, ) (cloudMTLS, cloudSASL bool, err error) { - if fromCloud != "" && fromSimple != "" { - return false, false, fmt.Errorf("cannot use --from-cloud and --from-simple together") + if (fromCloud != "" && fromRedpanda != "") || (fromCloud != "" && fromProfile != "") || (fromRedpanda != "" && fromProfile != "") { + return false, false, fmt.Errorf("can only use one of --from-cloud, --from-redpanda, or --from-profile") } if p := y.Profile(name); p != nil { return false, false, fmt.Errorf("profile %q already exists", name) @@ -133,19 +144,43 @@ func createCtx( return false, false, err } - case fromSimple != "": + case fromProfile != "": + switch { + case fromProfile == "current": + p = *cfg.VirtualProfile() + default: + raw, err := afero.ReadFile(fs, fromProfile) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return false, false, fmt.Errorf("unable to read file %q: %v", fromProfile, err) + } + y, err := cfg.ActualRpkYamlOrEmpty() + if err != nil { + return false, false, fmt.Errorf("file %q does not exist, and we cannot read rpk.yaml: %v", fromProfile, err) + } + src := y.Profile(fromProfile) + if src == nil { + return false, false, fmt.Errorf("unable to find profile %q", fromProfile) + } + p = *src + } else if err := yaml.Unmarshal(raw, &p); err != nil { + return false, false, fmt.Errorf("unable to yaml decode file %q: %v", fromProfile, err) + } + } + + case fromRedpanda != "": var nodeCfg config.RpkNodeConfig switch { - case fromSimple == "loaded" || fromSimple == "current": + case fromRedpanda == "current": nodeCfg = cfg.VirtualRedpandaYaml().Rpk default: - raw, err := afero.ReadFile(fs, fromSimple) + raw, err := afero.ReadFile(fs, fromRedpanda) if err != nil { - return false, false, fmt.Errorf("unable to read file %q: %v", fromSimple, err) + return false, false, fmt.Errorf("unable to read file %q: %v", fromRedpanda, err) } var rpyaml config.RedpandaYaml if err := yaml.Unmarshal(raw, &rpyaml); err != nil { - return false, false, fmt.Errorf("unable to yaml decode file %q: %v", fromSimple, err) + return false, false, fmt.Errorf("unable to yaml decode file %q: %v", fromRedpanda, err) } nodeCfg = rpyaml.Rpk } diff --git a/src/go/rpk/pkg/cli/profile/current.go b/src/go/rpk/pkg/cli/profile/current.go new file mode 100644 index 000000000000..1507354cffe6 --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/current.go @@ -0,0 +1,48 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "fmt" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newCurrentCommand(fs afero.Fs, p *config.Params) *cobra.Command { + var noNewline bool + cmd := &cobra.Command{ + Use: "current", + Short: "Print the current profile name", + Long: `Print the current profile name. + +This is a tiny command that simply prints the current profile name, which may +be useful in scripts, or a PS1, or to confirm what you have selected. +`, + Args: cobra.ExactArgs(0), + Run: func(_ *cobra.Command, args []string) { + cfg, err := p.Load(fs) + out.MaybeDie(err, "unable to load config: %v", err) + + if !noNewline { + defer fmt.Println() + } + y, ok := cfg.ActualRpkYaml() + if !ok { + return + } + fmt.Print(y.CurrentProfile) + }, + } + cmd.Flags().BoolVarP(&noNewline, "no-newline", "n", false, "Do not print a newline after the profile name") + return cmd +} diff --git a/src/go/rpk/pkg/cli/profile/delete.go b/src/go/rpk/pkg/cli/profile/delete.go index d0cbc1fdefb1..61738134f619 100644 --- a/src/go/rpk/pkg/cli/profile/delete.go +++ b/src/go/rpk/pkg/cli/profile/delete.go @@ -29,7 +29,7 @@ was the selected profile, rpk will use in-memory defaults until a new profile is selected. `, Args: cobra.ExactArgs(1), - ValidArgsFunction: validProfiles(fs, p), + ValidArgsFunction: ValidProfiles(fs, p), Run: func(_ *cobra.Command, args []string) { cfg, err := p.Load(fs) out.MaybeDie(err, "unable to load config: %v", err) diff --git a/src/go/rpk/pkg/cli/profile/edit.go b/src/go/rpk/pkg/cli/profile/edit.go index 9011b1cc5cc9..5607de77b6b1 100644 --- a/src/go/rpk/pkg/cli/profile/edit.go +++ b/src/go/rpk/pkg/cli/profile/edit.go @@ -35,7 +35,7 @@ If you want to edit the current raw profile as it exists in rpk.yaml, you can use the --raw flag. `, Args: cobra.MaximumNArgs(1), - ValidArgsFunction: validProfiles(fs, p), + ValidArgsFunction: ValidProfiles(fs, p), Run: func(_ *cobra.Command, args []string) { cfg, err := p.Load(fs) out.MaybeDie(err, "unable to load config: %v", err) @@ -63,7 +63,7 @@ can use the --raw flag. out.MaybeDieErr(err) var renamed, updatedCurrent bool - if update.Name != name { + if update.Name != name && update.Name != "" { renamed = true if y.CurrentProfile == name { updatedCurrent = true diff --git a/src/go/rpk/pkg/cli/profile/edit_defaults.go b/src/go/rpk/pkg/cli/profile/edit_defaults.go new file mode 100644 index 000000000000..85286157cb98 --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/edit_defaults.go @@ -0,0 +1,47 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "fmt" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + rpkos "github.com/redpanda-data/redpanda/src/go/rpk/pkg/os" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newEditDefaultsCommand(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "edit-defaults", + Short: "Edit rpk defaults", + Long: `Edit rpk defaults. + +This command opens your default editor to edit the specified profile, or +the current profile if no profile is specified. +`, + Args: cobra.ExactArgs(0), + Run: func(*cobra.Command, []string) { + cfg, err := p.Load(fs) + out.MaybeDie(err, "unable to load config: %v", err) + y, err := cfg.ActualRpkYamlOrEmpty() + out.MaybeDie(err, "unable to load config: %v", err) + + y.Defaults, err = rpkos.EditTmpYAMLFile(fs, y.Defaults) + out.MaybeDieErr(err) + + err = y.Write(fs) + out.MaybeDie(err, "unable to write rpk.yaml: %v", err) + fmt.Println("Defaults updated successfully.") + }, + } + return cmd +} diff --git a/src/go/rpk/pkg/cli/profile/print.go b/src/go/rpk/pkg/cli/profile/print.go index f7bcd10697d0..da0772304ff7 100644 --- a/src/go/rpk/pkg/cli/profile/print.go +++ b/src/go/rpk/pkg/cli/profile/print.go @@ -32,7 +32,7 @@ variables applied. If you wish to print the current raw profile as it exists in rpk.yaml, you can use the --raw flag. `, Args: cobra.MaximumNArgs(1), - ValidArgsFunction: validProfiles(fs, p), + ValidArgsFunction: ValidProfiles(fs, p), Run: func(_ *cobra.Command, args []string) { cfg, err := p.Load(fs) out.MaybeDie(err, "unable to load config: %v", err) diff --git a/src/go/rpk/pkg/cli/profile/print_defaults.go b/src/go/rpk/pkg/cli/profile/print_defaults.go new file mode 100644 index 000000000000..26a421efb557 --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/print_defaults.go @@ -0,0 +1,38 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "fmt" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" +) + +func newPrintDefaultsCommand(fs afero.Fs, p *config.Params) *cobra.Command { + return &cobra.Command{ + Use: "print-defaults", + Short: "Print rpk default", + Long: `Print rpk defaults.`, + Args: cobra.ExactArgs(0), + Run: func(_ *cobra.Command, args []string) { + cfg, err := p.Load(fs) + out.MaybeDie(err, "unable to load config: %v", err) + + y := cfg.VirtualRpkYaml() + m, err := yaml.Marshal(y.Defaults) + out.MaybeDie(err, "unable to encode profile: %v", err) + fmt.Println(string(m)) + }, + } +} diff --git a/src/go/rpk/pkg/cli/profile/profile.go b/src/go/rpk/pkg/cli/profile/profile.go index 4e243f46b9b5..c2edc2305bb8 100644 --- a/src/go/rpk/pkg/cli/profile/profile.go +++ b/src/go/rpk/pkg/cli/profile/profile.go @@ -33,19 +33,25 @@ your configuration in one place. cmd.AddCommand( newCreateCommand(fs, p), + newClearCommand(fs, p), + newCurrentCommand(fs, p), newDeleteCommand(fs, p), newEditCommand(fs, p), + newEditDefaultsCommand(fs, p), newListCommand(fs, p), newPrintCommand(fs, p), + newPrintDefaultsCommand(fs, p), + newPromptCommand(fs, p), newRenameToCommand(fs, p), newSetCommand(fs, p), + newSetDefaultsCommand(fs, p), newUseCommand(fs, p), ) return cmd } -func validProfiles(fs afero.Fs, p *config.Params) func(*cobra.Command, []string, string) ([]string, cobra.ShellCompDirective) { +func ValidProfiles(fs afero.Fs, p *config.Params) func(*cobra.Command, []string, string) ([]string, cobra.ShellCompDirective) { return func(cmd *cobra.Command, _ []string, toComplete string) ([]string, cobra.ShellCompDirective) { cfg, err := p.Load(fs) if err != nil { diff --git a/src/go/rpk/pkg/cli/profile/prompt.go b/src/go/rpk/pkg/cli/profile/prompt.go new file mode 100644 index 000000000000..38172f54e649 --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/prompt.go @@ -0,0 +1,288 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "bytes" + "errors" + "fmt" + "strings" + + "github.com/fatih/color" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newPromptCommand(fs afero.Fs, p *config.Params) *cobra.Command { + var validate bool + cmd := &cobra.Command{ + Use: "prompt", + Short: "Prompt a profile name formatted for a PS1 prompt", + Long: `Prompt a profile name formatted for a PS1 prompt. + +This command prints ANSI-escaped text per your current profile's "prompt" +field. If the current profile does not have a prompt, this prints nothing. +If the prompt is invalid, this exits 0 with no message. To validate the +current prompt, use the --validate flag. + +This command may introduce other % variables in the future, if you want to +print a % directly, use %% to escape it. + +To use this in zsh, be sure to add setopt PROMPT_SUBST to your .zshrc. +To edit your PS1, use something like PS1='$(rpk profile prompt)' in your +shell rc file. + +FORMAT + +The "prompt" field supports space or comma separated modifiers and a quoted +string that is be modified. Inside the string, the variable %p or %n refers to +the profile name. As a few examples: + + prompt: hi-white, bg-red, bold, "[%p]" + prompt: hi-red "PROD" + prompt: white, "dev-%n + +If you want to have multiple formats, you can wrap each formatted section in +parentheses. + + prompt: ("--") (hi-white bg-red bold "[%p]") + +COLORS + +All ANSI colors are supported, with names matching the color name: +"black", "red", "green", "yellow", "blue", "magenta", "cyan", "white". + +The "hi-" prefix indicates a high-intensity color: "hi-black", "hi-red", etc. +The "bg-" prefix modifies the background color: "bg-black", "bg-hi-red", etc. + +MODIFIERS + +Four modifiers are supported, "bold", "faint", "underline", and "invert". +`, + Args: cobra.ExactArgs(0), + Run: func(_ *cobra.Command, args []string) { + cfg, err := p.Load(fs) + out.MaybeDie(err, "unable to load config: %v", err) + + var errmsg string + if validate { + defer func() { + if errmsg != "" { + out.DieString(errmsg) + } + }() + } + + y, ok := cfg.ActualRpkYaml() + if !ok { + errmsg = "rpk.yaml file is missing or cannot be loaded" + return + } + p := y.Profile(y.CurrentProfile) + if p == nil { + errmsg = fmt.Sprintf("current profile %q does not exist", y.CurrentProfile) + return + } + + prompt := cfg.VirtualRpkYaml().Defaults.Prompt + if p.Prompt != "" { + prompt = p.Prompt + } + + parens, err := splitPromptParens(prompt) + if err != nil { + errmsg = err.Error() + return + } + + var sb strings.Builder + for _, g := range parens { + text, attrs, err := parsePrompt(g, p.Name) + if err != nil { + errmsg = err.Error() + return + } + c := color.New(attrs...) + c.EnableColor() + sb.WriteString(c.Sprint(text)) + } + if validate { + fmt.Print("Prompt ok! Output:\nvvv\n") + defer fmt.Print("\n^^^\n") + } + fmt.Print(sb.String()) + }, + } + cmd.Flags().BoolVar(&validate, "validate", false, "Exit with an error message if the prompt is invalid") + return cmd +} + +func parsePrompt(s string, name string) (string, []color.Attribute, error) { + var ( + b = make([]byte, 0, 16) // current text or attribute buffer + text []byte // if non nil, this has been initialized; we cannot have multiple quoted strings + attrs []color.Attribute + inQuote bool + ) + for i := 0; i < len(s); i++ { + c := s[i] + switch c { + case '\\': + if !inQuote { + return "", nil, errors.New("backslash is only allowed inside a quoted string") + } + if len(s) > i+1 { + b = append(b, s[i+1]) + i++ + } + case '"': + if inQuote { + text = append(text, b...) + b = b[:0] + inQuote = false + } else if text != nil { + return "", nil, errors.New("only one quoted string can appear in a prompt, we saw a second") + } else { + b = bytes.TrimSpace(b) + if len(b) > 0 { + return "", nil, fmt.Errorf("unexpected text %q before quoted string", string(b)) + } + inQuote = true + } + case ' ', '\t', ',': + if inQuote { + b = append(b, c) + continue + } + b = bytes.TrimSpace(b) + if len(b) == 0 { + continue + } + attr, ok := out.ParseColor(string(b)) + if !ok { + return "", nil, fmt.Errorf("invalid color or attribute %q", string(b)) + } + attrs = append(attrs, attr) + b = b[:0] + default: + b = append(b, c) + } + } + + // If b is non-empty, the prompt either ended in unneeded spaces or it + // ended in an attr -- any ending quote is handled in the above block. + if b = bytes.TrimSpace(b); len(b) > 0 { + attr, ok := out.ParseColor(string(b)) + if !ok { + return "", nil, fmt.Errorf("invalid color or attribute %q", string(b)) + } + attrs = append(attrs, attr) + } + + output := make([]byte, 0, len(s)+len(text)) + for i := 0; i < len(text); i++ { + c := text[i] + switch c { + case '%': + if len(text) > i+1 { + switch text[i+1] { + case 'p', 'n': + output = append(output, name...) + i++ + case '%': + output = append(output, '%') + i++ + default: + return "", nil, fmt.Errorf("unknown escape %%%c", text[i+1]) + } + } + default: + output = append(output, c) + } + } + return string(output), attrs, nil +} + +func splitPromptParens(s string) ([]string, error) { + s = strings.TrimSpace(s) + if len(s) == 0 { + return nil, nil + } + + var ( + parens []string + current []byte + onlyParen = s[0] != '(' + inParen = onlyParen + inQuote bool + inEsc bool + ) + + for i := 0; i < len(s); i++ { + c := s[i] + + if !inParen { + switch c { + case ' ', '\t', ',': + continue + case '(': + inParen = true + continue + default: + return nil, fmt.Errorf("unexpected character %c while looking for paren group", c) + } + } + + if inEsc { + inEsc = false + current = append(current, c) + continue + } + + if inQuote { + if c == '\\' { + inEsc = true + } else if c == '"' { + inQuote = false + } + current = append(current, c) + continue + } + + switch c { + case '\\': + inEsc = true + case '"': + inQuote = true + case ')': + if onlyParen { + return nil, errors.New("unexpected closing paren )") + } + inParen = false + current = bytes.TrimSpace(current) + if len(current) > 0 { + parens = append(parens, string(current)) + } + current = current[:0] + continue + } + current = append(current, c) + } + if onlyParen { + if len(current) > 0 { + parens = append(parens, string(current)) + } + } else if len(current) > 0 { + return nil, errors.New("prompt is missing closing paren") + } + return parens, nil +} diff --git a/src/go/rpk/pkg/cli/profile/prompt_test.go b/src/go/rpk/pkg/cli/profile/prompt_test.go new file mode 100644 index 000000000000..3e5127159083 --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/prompt_test.go @@ -0,0 +1,86 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "reflect" + "testing" + + "github.com/fatih/color" +) + +func TestSplitPromptParens(t *testing.T) { + for _, test := range []struct { + in string + exp []string + expErr bool + }{ + {"", nil, false}, + {"()", nil, false}, + {" ( ) ", nil, false}, + {" ", nil, false}, + {` blue red ,gr\een,\"text"`, []string{`blue red ,gr\een,\"text"`}, false}, // we just parse parens, not contents + {` unexpected end paren) `, nil, true}, + {` (unclosed open paren`, nil, true}, + {` ("\)")`, []string{`"\)"`}, false}, + {` ( ) asdf `, nil, true}, + {` ( ) (red blue green ) (bog)`, []string{"red blue green", "bog"}, false}, + } { + t.Run(test.in, func(t *testing.T) { + got, err := splitPromptParens(test.in) + gotErr := err != nil + if gotErr != test.expErr { + t.Errorf("got err? %v (%v), exp %v", gotErr, err, test.expErr) + return + } + if !reflect.DeepEqual(got, test.exp) { + t.Errorf("got %v != exp %v", got, test.exp) + } + }) + } +} + +func TestParsePrompt(t *testing.T) { + const name = "foo" + for _, test := range []struct { + in string + expText string + expAttr []color.Attribute + expErr bool + }{ + {"", "", nil, false}, // empty is ok + {`blue , green , bg-hi-blue, "%n"`, "foo", []color.Attribute{color.FgBlue, color.FgGreen, color.BgHiBlue}, false}, // somewhat complete + {`\"blue\"`, "", nil, true}, // backslash only allowed in quoted str + {` "prompt" `, "prompt", nil, false}, // simple + {`unknown-thing `, "", nil, true}, // unknown keyword stripped + {`blue green red, bg-hi-blue`, "", []color.Attribute{color.FgBlue, color.FgGreen, color.FgRed, color.BgHiBlue}, false}, // attr at end is kept + {` " %n " `, " foo ", nil, false}, // name swapped in + {`"\\\%%%%n"`, "\\%%n", nil, false}, // escaping works, and %% works + {`"text1" "text2"`, "", nil, true}, // one quoted string + {`b"text"`, "", nil, true}, // unexpected text before quote + {`blue unknown`, "", nil, true}, // unknown attr at end + {`"%u"`, "", nil, true}, // unknown % escape + } { + t.Run(test.in, func(t *testing.T) { + gotText, gotAttr, err := parsePrompt(test.in, name) + gotErr := err != nil + if gotErr != test.expErr { + t.Errorf("got err? %v (%v), exp %v", gotErr, err, test.expErr) + return + } + if gotText != test.expText { + t.Errorf("got text %v != exp %v", gotText, test.expText) + } + if !reflect.DeepEqual(gotAttr, test.expAttr) { + t.Errorf("got attr %v != exp %v", gotAttr, test.expAttr) + } + }) + } +} diff --git a/src/go/rpk/pkg/cli/profile/set_defaults.go b/src/go/rpk/pkg/cli/profile/set_defaults.go new file mode 100644 index 000000000000..75386f511bcf --- /dev/null +++ b/src/go/rpk/pkg/cli/profile/set_defaults.go @@ -0,0 +1,90 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package profile + +import ( + "fmt" + "strings" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newSetDefaultsCommand(fs afero.Fs, p *config.Params) *cobra.Command { + return &cobra.Command{ + Use: "set-defaults [KEY=VALUE]+", + Short: "Set rpk default fields", + Long: `Set rpk default fields. + +This command takes a list of key=value pairs to write to the defaults section +of rpk.yaml. The defaults section contains a set of settings that apply to all +profiles and changes the way that rpk acts. For a list of default flags and +what they mean, check 'rpk -X help' and look for any key that begins with +"defaults". + +This command supports autocompletion of valid keys. You can also use the +format 'set key value' if you intend to only set one key. +`, + + Args: cobra.MinimumNArgs(1), + ValidArgsFunction: validSetDefaultsArgs, + Run: func(_ *cobra.Command, args []string) { + cfg, err := p.Load(fs) + out.MaybeDie(err, "unable to load config: %v", err) + + // Other set commands are `set key value`, if people + // use that older form by force of habit, we support + // it. + if len(args) == 2 && !strings.Contains(args[0], "=") { + args = []string{args[0] + "=" + args[1]} + } + + y, err := cfg.ActualRpkYamlOrEmpty() + out.MaybeDie(err, "unable to load rpk.yaml: %v", err) + + err = doSetDefaults(y, args) + out.MaybeDieErr(err) + err = y.Write(fs) + out.MaybeDieErr(err) + fmt.Println("rpk.yaml updated successfully.") + }, + } +} + +func doSetDefaults(y *config.RpkYaml, set []string) error { + for _, kv := range set { + split := strings.SplitN(kv, "=", 2) + if len(split) != 2 { + return fmt.Errorf("invalid key=value pair %q", kv) + } + k, v := split[0], split[1] + if y, ok := config.XFlagYamlPath(k); ok { + k = y + } + err := config.Set(&y, k, v) + if err != nil { + return err + } + } + return nil +} + +func validSetDefaultsArgs(_ *cobra.Command, _ []string, toComplete string) (ps []string, d cobra.ShellCompDirective) { + var possibilities []string + _, ypaths := config.XRpkDefaultsFlags() + for _, p := range ypaths { + if strings.HasPrefix(p, toComplete) { + possibilities = append(possibilities, p+"=") + } + } + return possibilities, cobra.ShellCompDirectiveNoSpace +} diff --git a/src/go/rpk/pkg/cli/profile/use.go b/src/go/rpk/pkg/cli/profile/use.go index 9705f78e0109..835e950753a9 100644 --- a/src/go/rpk/pkg/cli/profile/use.go +++ b/src/go/rpk/pkg/cli/profile/use.go @@ -23,7 +23,7 @@ func newUseCommand(fs afero.Fs, p *config.Params) *cobra.Command { Use: "use [NAME]", Short: "Select the rpk profile to use", Args: cobra.ExactArgs(1), - ValidArgsFunction: validProfiles(fs, p), + ValidArgsFunction: ValidProfiles(fs, p), Run: func(_ *cobra.Command, args []string) { cfg, err := p.Load(fs) out.MaybeDie(err, "unable to load config: %v", err) diff --git a/src/go/rpk/pkg/cli/root.go b/src/go/rpk/pkg/cli/root.go index e5056f68b91e..f86aeee26839 100644 --- a/src/go/rpk/pkg/cli/root.go +++ b/src/go/rpk/pkg/cli/root.go @@ -75,9 +75,9 @@ func Execute() { } pf := root.PersistentFlags() pf.StringVar(&p.ConfigFlag, "config", "", "Redpanda or rpk config file; default search paths are ~/.config/rpk/rpk.yaml, $PWD, and /etc/redpanda/redpanda.yaml") + pf.StringVar(&p.Profile, "profile", "", "rpk profile to use") pf.StringArrayVarP(&p.FlagOverrides, "config-opt", "X", nil, "Override rpk configuration settings; '-X help' for detail or '-X list' for terser detail") - pf.StringVarP(&p.LogLevel, "verbose", "v", "none", "Log level (none, error, warn, info, debug)") - pf.Lookup("verbose").NoOptDefVal = "info" + pf.BoolVarP(&p.DebugLogs, "verbose", "v", false, "Enable verbose logging") root.RegisterFlagCompletionFunc("config-opt", func(_ *cobra.Command, _ []string, toComplete string) ([]string, cobra.ShellCompDirective) { var opts []string @@ -93,6 +93,7 @@ func Execute() { } return opts, cobra.ShellCompDirectiveNoSpace }) + root.RegisterFlagCompletionFunc("profile", profile.ValidProfiles(fs, p)) root.AddCommand( acl.NewCommand(fs, p), diff --git a/src/go/rpk/pkg/cli/topic/describe.go b/src/go/rpk/pkg/cli/topic/describe.go index a9ff6c74004b..ee3703014b1b 100644 --- a/src/go/rpk/pkg/cli/topic/describe.go +++ b/src/go/rpk/pkg/cli/topic/describe.go @@ -33,6 +33,7 @@ func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command { summary bool configs bool partitions bool + stable bool ) cmd := &cobra.Command{ Use: "describe [TOPIC]", @@ -152,7 +153,7 @@ partitions section. By default, the summary and configs sections are printed. } header("PARTITIONS", partitions, func() { - offsets := listStartEndOffsets(cl, topic, len(t.Partitions)) + offsets := listStartEndOffsets(cl, topic, len(t.Partitions), stable) tw := out.NewTable(describePartitionsHeaders( t.Partitions, @@ -183,6 +184,8 @@ partitions section. By default, the summary and configs sections are printed. cmd.Flags().BoolVarP(&partitions, "print-partitions", "p", false, "Print the detailed partitions section") cmd.Flags().BoolVarP(&all, "print-all", "a", false, "Print all sections") + cmd.Flags().BoolVar(&stable, "stable", false, "Include the stable offsets column in the partitions section; only relevant if you produce to this topic transactionally") + return cmd } @@ -200,6 +203,9 @@ func getDescribeUsed(partitions []kmsg.MetadataResponseTopicPartition, offsets [ } } for _, o := range offsets { + // The default stableErr is errUnlisted. We avoid listing + // stable offsets unless the user asks, so by default, we do + // not print the stable column. if o.stableErr == nil && o.endErr == nil && o.stable != o.end { useStable = true } @@ -304,9 +310,7 @@ var errUnlisted = errors.New("list failed") // always contain the one topic we asked for, and it will contain all // partitions we asked for. The logic below will panic redpanda replies // incorrectly. -func listStartEndOffsets( - cl *kgo.Client, topic string, numPartitions int, -) []startStableEndOffset { +func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset { offsets := make([]startStableEndOffset, 0, numPartitions) for i := 0; i < numPartitions; i++ { @@ -352,41 +356,45 @@ func listStartEndOffsets( return offsets } - // Next we ask for the latest offsets (special timestamp -1). + // Both HWM and stable offset checks require Timestamp = -1. for i := range req.Topics[0].Partitions { req.Topics[0].Partitions[i].Timestamp = -1 } - shards = cl.RequestSharded(context.Background(), req) - allFailed = kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { - resp := shard.Resp.(*kmsg.ListOffsetsResponse) - if len(resp.Topics) > 0 { - for _, partition := range resp.Topics[0].Partitions { - o := &offsets[partition.Partition] - o.end = partition.Offset - o.endErr = kerr.ErrorForCode(partition.ErrorCode) + + // If the user requested stable offsets, we ask for them second. If we + // requested these before requesting the HWM, then we could show stable + // being higher than the HWM. Stable offsets are only relevant if + // transactions are in play. + if stable { + req.IsolationLevel = 1 + shards = cl.RequestSharded(context.Background(), req) + allFailed = kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { + resp := shard.Resp.(*kmsg.ListOffsetsResponse) + if len(resp.Topics) > 0 { + for _, partition := range resp.Topics[0].Partitions { + o := &offsets[partition.Partition] + o.stable = partition.Offset + o.stableErr = kerr.ErrorForCode(partition.ErrorCode) + } } + }) + if allFailed { + return offsets } - }) - // It is less likely to succeed on the first attempt and fail the second, - // but we may as well avoid trying a third if we do. - if allFailed { - return offsets } - // Finally, we ask for the last stable offsets (relevant for transactions). - req.IsolationLevel = 1 + // Finally, the HWM. shards = cl.RequestSharded(context.Background(), req) kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { for _, partition := range resp.Topics[0].Partitions { o := &offsets[partition.Partition] - o.stable = partition.Offset - o.stableErr = kerr.ErrorForCode(partition.ErrorCode) + o.end = partition.Offset + o.endErr = kerr.ErrorForCode(partition.ErrorCode) } } }) - return offsets } diff --git a/src/go/rpk/pkg/config/params.go b/src/go/rpk/pkg/config/params.go index 47d886be498f..990f64abe9c4 100644 --- a/src/go/rpk/pkg/config/params.go +++ b/src/go/rpk/pkg/config/params.go @@ -71,6 +71,7 @@ const ( const ( xkindProfile = iota // configuration for the current profile xkindCloudAuth // configuration for the current cloud_auth + xkindDefault // configuration for rpk.yaml defaults ) type xflag struct { @@ -266,6 +267,73 @@ var xflags = map[string]xflag{ return nil }, }, + + "defaults.prompt": { + "defaults.prompt", + "bg-red \"%n\"", + xkindDefault, + func(v string, y *RpkYaml) error { + y.Defaults.Prompt = v + return nil + }, + }, + + "defaults.no_default_cluster": { + "defaults.no_default_cluster", + "false", + xkindDefault, + func(v string, y *RpkYaml) error { + b, err := strconv.ParseBool(v) + y.Defaults.NoDefaultCluster = b + return err + }, + }, + + "defaults.dial_timeout": { + "defaults.dial_timeout", + "3s", + xkindDefault, + func(v string, y *RpkYaml) error { + return y.Defaults.DialTimeout.UnmarshalText([]byte(v)) + }, + }, + + "defaults.request_timeout_overhead": { + "defaults.request_timeout_overhead", + "10s", + xkindDefault, + func(v string, y *RpkYaml) error { + return y.Defaults.RequestTimeoutOverhead.UnmarshalText([]byte(v)) + }, + }, + + "defaults.retry_timeout": { + "defaults.retry_timeout", + "30s", + xkindDefault, + func(v string, y *RpkYaml) error { + return y.Defaults.RetryTimeout.UnmarshalText([]byte(v)) + }, + }, + + "defaults.fetch_max_wait": { + "defaults.fetch_max_wait", + "5s", + xkindDefault, + func(v string, y *RpkYaml) error { + return y.Defaults.FetchMaxWait.UnmarshalText([]byte(v)) + }, + }, + + "defaults.redpanda_client_id": { + "defaults.redpanda_client_id", + "rpk", + xkindDefault, + func(v string, y *RpkYaml) error { + y.Defaults.RedpandaClientID = v + return nil + }, + }, } // XFlags returns the list of -X flags that are supported by rpk. @@ -299,6 +367,19 @@ func XCloudAuthFlags() (xs, yamlPaths []string) { return } +// XRpkDefaultsFlags returns all X flags that modify rpk defaults, and their +// corresponding yaml paths. Note that for rpk defaults, the X flags always +// have the same name as the yaml path and always begin with "defaults.". +func XRpkDefaultsFlags() (xs, yamlPaths []string) { + for k, v := range xflags { + if v.kind == xkindDefault { + xs = append(xs, k) + yamlPaths = append(yamlPaths, v.path) + } + } + return +} + // XFlagYamlPath returns the yaml path for the given x flag, if the // flag exists. func XFlagYamlPath(x string) (string, bool) { @@ -312,20 +393,18 @@ func XFlagYamlPath(x string) (string, bool) { // Params contains rpk-wide configuration parameters. type Params struct { // ConfigFlag is any flag-specified config path. - // - // This is unused until step (2) in the refactoring process. ConfigFlag string - // LogLevel can be either none (default), error, warn, info, or debug, - // or any prefix of those strings, upper or lower case. + // Profile is any flag-specified profile name. + Profile string + + // DebugLogs opts into debug logging. // - // This field is meant to be set, to actually get a logger after the + // This field only for setting, to actually get a logger after the // field is set, use Logger(). - LogLevel string + DebugLogs bool // FlagOverrides are any flag-specified config overrides. - // - // This is unused until step (2) in the refactoring process. FlagOverrides []string loggerOnce sync.Once @@ -423,6 +502,41 @@ cloud.client_id=somestring cloud.client_secret=somelongerstring An oauth client secret to use for authenticating with the Redpanda Cloud API. + +defaults.prompt="%n" + A format string to use for the default prompt; see 'rpk profile prompt' for + more information. + +defaults.no_default_cluster=false + A boolean that disables rpk from talking to localhost:9092 if no other + cluster is specified. + +defaults.dial_timeout=3s + A duration that rpk will wait for a connection to be established before + timing out. + +defaults.request_timeout_overhead=10s + A duration that limits how long rpk waits for responses, *on top* of any + request-internal timeout. For example, ListOffsets has no Timeout field so + if request_timeout_overhead is 10s, rpk will wait for 10s for a response. + However, JoinGroup has a RebalanceTimeoutMillis field, so the 10s is applied + on top of the rebalance timeout. + +defaults.retry_timeout=30s + This timeout specifies how long rpk will retry Kafka API requests. This + timeout is evaluated before any backoff -- if a request fails, we first check + if the retry timeout has elapsed and if so, we stop retrying. If not, we wait + for the backoff and then retry. + +defaults.fetch_max_wait=5s + This timeout specifies the maximum time that brokers will wait before + replying to a fetch request with whatever data is available. + +defaults.redpanda_client_id=rpk + This string value is the client ID that rpk uses when issuing Kafka protocol + requests to Redpanda. This client ID shows up in Redpanda logs and metrics, + changing it can be useful if you want to have your own rpk client stand out + from others that may be hitting the cluster. ` } @@ -443,6 +557,13 @@ admin.tls.cert=/path/to/cert.pem admin.tls.key=/path/to/key.pem cloud.client_id=somestring cloud.client_secret=somelongerstring +defaults.prompt="%n" +defaults.no_default_cluster=boolean +defaults.dial_timeout=duration(3s,1m,2h) +defaults.request_timeout_overhead=duration(10s,1m,2h) +defaults.retry_timeout=duration(30s,1m,2h) +defaults.fetch_max_wait=duration(5s,1m,2h) +defaults.redpanda_client_id=rpk ` } @@ -460,7 +581,17 @@ func (p *Params) InstallKafkaFlags(cmd *cobra.Command) { pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication") pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)") + pf.MarkHidden("brokers") + pf.MarkHidden(FlagSASLUser) + pf.MarkHidden("password") + pf.MarkHidden("sasl-mechanism") + p.InstallTLSFlags(cmd) + + pf.MarkHidden(FlagEnableTLS) + pf.MarkHidden(FlagTLSCA) + pf.MarkHidden(FlagTLSCert) + pf.MarkHidden(FlagTLSKey) } // InstallTLSFlags adds the original rpk Kafka API TLS set of flags to this @@ -489,6 +620,14 @@ func (p *Params) InstallAdminFlags(cmd *cobra.Command) { pf.StringVar(&p.adminCAFile, "admin-api-tls-truststore", "", "The CA certificate to be used for TLS communication with the admin API") pf.StringVar(&p.adminCertFile, "admin-api-tls-cert", "", "The certificate to be used for TLS authentication with the admin API") pf.StringVar(&p.adminKeyFile, "admin-api-tls-key", "", "The certificate key to be used for TLS authentication with the admin API") + + pf.MarkHidden("api-urls") + pf.MarkHidden("hosts") + pf.MarkHidden("admin-url") + pf.MarkHidden("admin-api-tls-enabled") + pf.MarkHidden("admin-api-tls-truststore") + pf.MarkHidden("admin-api-tls-cert") + pf.MarkHidden("admin-api-tls-key") } // InstallCloudFlags adds the --client-id and --client-secret flags that @@ -599,6 +738,11 @@ func (p *Params) Load(fs afero.Fs) (*Config, error) { c.fixSchemePorts() // strip any scheme, default any missing ports c.addConfigToProfiles() c.parseDevOverrides() + + if !c.rpkYaml.Defaults.NoDefaultCluster { + c.ensureBrokerAddrs() + } + return c, nil } @@ -607,41 +751,13 @@ func (p *Params) SugarLogger() *zap.SugaredLogger { return p.Logger().Sugar() } -// Logger parses p.LogLevel and returns the corresponding zap logger or -// a NopLogger if the log level is invalid. +// Logger parses returns the corresponding zap logger or a NopLogger. func (p *Params) Logger() *zap.Logger { p.loggerOnce.Do(func() { - // First we normalize the level. We support prefixes such - // that "w" means warn. - p.LogLevel = strings.TrimSpace(strings.ToLower(p.LogLevel)) - if p.LogLevel == "" { - p.LogLevel = "none" - } - var ok bool - for _, level := range []string{"none", "error", "warn", "info", "debug"} { - if strings.HasPrefix(level, p.LogLevel) { - p.LogLevel, ok = level, true - break - } - } - if !ok { + if !p.DebugLogs { p.logger = zap.NewNop() return } - var level zapcore.Level - switch p.LogLevel { - case "none": - p.logger = zap.NewNop() - return - case "error": - level = zap.ErrorLevel - case "warn": - level = zap.WarnLevel - case "info": - level = zap.InfoLevel - case "debug": - level = zap.DebugLevel - } // Now the zap config. We want to to the console and make the logs // somewhat nice. The log time is effectively time.TimeMillisOnly. @@ -649,7 +765,7 @@ func (p *Params) Logger() *zap.Logger { // level to three letters, and we only add color if this is a // terminal. zcfg := zap.NewProductionConfig() - zcfg.Level = zap.NewAtomicLevelAt(level) + zcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) zcfg.DisableCaller = true zcfg.DisableStacktrace = true zcfg.Sampling = nil @@ -712,15 +828,12 @@ func readFile(fs afero.Fs, path string) (string, []byte, error) { return abs, file, err } -func (p *Params) backcompatOldCloudYaml(fs afero.Fs) error { +func (*Params) backcompatOldCloudYaml(fs afero.Fs) error { def, err := DefaultRpkYamlPath() if err != nil { - // If the user has deliberately unset HOME and is using a --config - // flag, we will just avoid backcompatting the __cloud.yaml file. - if p.ConfigFlag != "" { - return nil - } - return err + //nolint:nilerr // This error only happens if the user unset $HOME, and + // if they do that, we will avoid failing / avoid backcompat here. + return nil } // Read and parse the old file. If it does not exist, that's great. @@ -799,7 +912,11 @@ func (p *Params) readRpkConfig(fs afero.Fs, c *Config) error { if p.ConfigFlag != "" { path = p.ConfigFlag } else if err != nil { - return err + //nolint:nilerr // If $HOME is unset, we do not read any file. If the user + // eventually tries to write, we fail in Write. Allowing + // $HOME to not exists allows rpk to work in CI settings + // where all config flags are being specified. + return nil } abs, file, err := readFile(fs, path) if err != nil { @@ -831,6 +948,13 @@ func (p *Params) readRpkConfig(fs afero.Fs, c *Config) error { } yaml.Unmarshal(file, &c.rpkYamlActual) + if p.Profile != "" { + if c.rpkYaml.Profile(p.Profile) == nil { + return fmt.Errorf("selected profile %q does not exist", p.Profile) + } + c.rpkYaml.CurrentProfile = p.Profile + c.rpkYamlActual.CurrentProfile = p.Profile + } c.rpkYamlExists = true c.rpkYaml.fileLocation = abs c.rpkYamlActual.fileLocation = abs @@ -933,6 +1057,27 @@ func (c *Config) ensureRpkCloudAuth() { dst.PushAuth(def) } +func (c *Config) ensureBrokerAddrs() { + { + dst := &c.redpandaYaml + if len(dst.Rpk.KafkaAPI.Brokers) == 0 { + dst.Rpk.KafkaAPI.Brokers = []string{net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultKafkaPort))} + } + if len(dst.Rpk.AdminAPI.Addresses) == 0 { + dst.Rpk.AdminAPI.Addresses = []string{net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultAdminPort))} + } + } + { + dst := c.rpkYaml.Profile(c.rpkYaml.CurrentProfile) // must exist by this function + if len(dst.KafkaAPI.Brokers) == 0 { + dst.KafkaAPI.Brokers = []string{net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultKafkaPort))} + } + if len(dst.AdminAPI.Addresses) == 0 { + dst.AdminAPI.Addresses = []string{net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultAdminPort))} + } + } +} + // We merge redpanda.yaml's rpk section back into rpk.yaml's profile. This // picks up any extras from addUnsetRedpandaDefaults that were not set in the // rpk file. We call this after ensureRpkProfile, so we do not need to @@ -1055,14 +1200,6 @@ func (c *Config) addUnsetRedpandaDefaults(actual bool) { dst.Rpk.AdminAPI.TLS = dst.Rpk.KafkaAPI.TLS } } - - if len(dst.Rpk.KafkaAPI.Brokers) == 0 { - dst.Rpk.KafkaAPI.Brokers = []string{net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultKafkaPort))} - } - - if len(dst.Rpk.AdminAPI.Addresses) == 0 { - dst.Rpk.AdminAPI.Addresses = []string{net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultAdminPort))} - } } func (c *Config) fixSchemePorts() error { diff --git a/src/go/rpk/pkg/config/params_test.go b/src/go/rpk/pkg/config/params_test.go index 71bcaead195d..fdbf717df5c6 100644 --- a/src/go/rpk/pkg/config/params_test.go +++ b/src/go/rpk/pkg/config/params_test.go @@ -303,21 +303,6 @@ func TestAddUnsetRedpandaDefaults(t *testing.T) { inCfg *RedpandaYaml expCfg *RedpandaYaml }{ - { - name: "default kafka broker and default admin api", - inCfg: &RedpandaYaml{}, - expCfg: &RedpandaYaml{ - Rpk: RpkNodeConfig{ - KafkaAPI: RpkKafkaAPI{ - Brokers: []string{"127.0.0.1:9092"}, - }, - AdminAPI: RpkAdminAPI{ - Addresses: []string{"127.0.0.1:9644"}, - }, - }, - }, - }, - { name: "rpk configuration left alone if present", inCfg: &RedpandaYaml{ @@ -1056,6 +1041,7 @@ func TestXSetExamples(t *testing.T) { for _, fn := range []func() (xs, yamlPaths []string){ XProfileFlags, XCloudAuthFlags, + XRpkDefaultsFlags, } { xs, yamlPaths := fn() for i, x := range xs { @@ -1073,6 +1059,8 @@ func TestXSetExamples(t *testing.T) { err = Set(new(RpkProfile), yamlPath, xf.testExample) case xkindCloudAuth: err = Set(new(RpkCloudAuth), yamlPath, xf.testExample) + case xkindDefault: + err = Set(new(RpkYaml), yamlPath, xf.testExample) default: t.Errorf("unrecognized xflag kind %v", xf.kind) continue @@ -1087,3 +1075,15 @@ func TestXSetExamples(t *testing.T) { t.Errorf("xflags still contains keys %v after checking all examples in this test", maps.Keys(m)) } } + +func TestXSetDefaultsPaths(t *testing.T) { + xs, paths := XRpkDefaultsFlags() + for i, x := range xs { + if paths[i] != x { + t.Errorf("XRpkDefaultsFlags() returned different xflag %s and path %s", x, paths[i]) + } + if !strings.HasPrefix(x, "defaults.") { + t.Errorf("XRpkDefaultsFlags() returned xflag %s that doesn't start with defaults.", x) + } + } +} diff --git a/src/go/rpk/pkg/config/rpk_yaml.go b/src/go/rpk/pkg/config/rpk_yaml.go index 95d2da637165..3418e16dcd88 100644 --- a/src/go/rpk/pkg/config/rpk_yaml.go +++ b/src/go/rpk/pkg/config/rpk_yaml.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "reflect" + "time" "github.com/spf13/afero" "go.uber.org/zap" @@ -34,10 +35,7 @@ func DefaultRpkYamlPath() (string, error) { } func defaultVirtualRpkYaml() (RpkYaml, error) { - path, err := DefaultRpkYamlPath() - if err != nil { - return RpkYaml{}, err - } + path, _ := DefaultRpkYamlPath() // if err is non-nil, we fail in Write y := RpkYaml{ fileLocation: path, Version: 1, @@ -88,15 +86,49 @@ type ( // upgrade rpk". Version int `yaml:"version"` + Defaults RpkDefaults `yaml:"defaults,omitempty"` + CurrentProfile string `yaml:"current_profile"` CurrentCloudAuth string `yaml:"current_cloud_auth"` Profiles []RpkProfile `yaml:"profiles,omitempty"` CloudAuths []RpkCloudAuth `yaml:"cloud_auth,omitempty"` } + RpkDefaults struct { + // Prompt is the prompt to use for all profiles, unless the + // profile itself overrides it. + Prompt string `yaml:"prompt"` + + // NoDefaultCluster disables localhost:{9092,9644} as a default + // profile when no other is selected. + NoDefaultCluster bool `yaml:"no_default_cluster"` + + // DialTimeout is how long we allow for initiating a connection + // to brokers for the Admin API and Kafka API. + DialTimeout Duration `yaml:"dial_timeout"` + + // RequestTimeoutOverhead, for Kafka API requests, how long do + // we give the request on top of any request's timeout field. + RequestTimeoutOverhead Duration `yaml:"request_timeout_overhead"` + + // RetryTimeout allows us to retry requests. If see we need to + // retry before the retry timeout has elapsed, we do -- even if + // backing off after we know to retry pushes us past the + // timeout. + RetryTimeout Duration `yaml:"retry_timeout"` + + // FetchMaxWait is how long we give the broker to respond to + // fetch requests. + FetchMaxWait Duration `yaml:"fetch_max_wait"` + + // RedpandaClientID is the client ID to use for the Kafka API. + RedpandaClientID string `yaml:"redpanda_client_id"` + } + RpkProfile struct { - Name string `yaml:"name,omitempty"` + Name string `yaml:"name"` Description string `yaml:"description,omitempty"` + Prompt string `yaml:"prompt,omitempty"` FromCloud bool `yaml:"from_cloud,omitempty"` CloudCluster *RpkCloudCluster `yaml:"cloud_cluster,omitempty"` KafkaAPI RpkKafkaAPI `yaml:"kafka_api,omitempty"` @@ -121,6 +153,8 @@ type ( ClientID string `yaml:"client_id,omitempty"` ClientSecret string `yaml:"client_secret,omitempty"` } + + Duration struct{ time.Duration } ) // Profile returns the given profile, or nil if it does not exist. @@ -214,6 +248,11 @@ func (p *RpkProfile) SugarLogger() *zap.SugaredLogger { return p.Logger().Sugar() } +// Defaults returns the virtual defaults for the rpk.yaml. +func (p *RpkProfile) Defaults() *RpkDefaults { + return &p.c.rpkYaml.Defaults +} + // HasClientCredentials returns if both ClientID and ClientSecret are empty. func (a *RpkCloudAuth) HasClientCredentials() bool { k, _ := a.Kind() @@ -267,3 +306,21 @@ func (y *RpkYaml) WriteAt(fs afero.Fs, path string) error { } return rpkos.ReplaceFile(fs, path, b, 0o644) } + +//////////////// +// MISC TYPES // +//////////////// + +// MarshalText implements encoding.TextMarshaler. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d.Duration.String()), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler. +func (d *Duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return err +} + +func (*Duration) YamlTypeNameForTest() string { return "duration" } diff --git a/src/go/rpk/pkg/config/rpk_yaml_test.go b/src/go/rpk/pkg/config/rpk_yaml_test.go index b3a306a97ef2..c029b043784f 100644 --- a/src/go/rpk/pkg/config/rpk_yaml_test.go +++ b/src/go/rpk/pkg/config/rpk_yaml_test.go @@ -78,10 +78,34 @@ func TestRpkYamlVersion(t *testing.T) { } fmt.Fprintf(sb, "%s%s: ", spaces(), sf.Name) - if !walk(sf.Type) { - return false + addr := sf.Type + typ := sf.Type + if addr.Kind() != reflect.Ptr { + addr = reflect.PointerTo(addr) + } + for typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + + _, hasUnmarshalText := addr.MethodByName("UnmarshalText") + if hasUnmarshalText { + fnt, ok := addr.MethodByName("YamlTypeNameForTest") + if !ok { + t.Errorf("field %s.%s at %s has UnmarshalText no YamlTypeNameForTest", typ.Name(), sf.Name, sb.String()) + return false + } + if fnt.Type.NumIn() != 1 || fnt.Type.NumOut() != 1 || fnt.Type.Out(0).Kind() != reflect.String { + t.Errorf("field %s.%s at %s YamlTypeNameForTest: wrong signature", typ.Name(), sf.Name, sb.String()) + return false + } + name := reflect.New(typ).MethodByName("YamlTypeNameForTest").Call(nil)[0].String() + fmt.Fprintf(sb, "%s", name) + } else { + if !walk(sf.Type) { + return false + } } - fmt.Fprintf(sb, ", `yaml:\"%s\"`\n", tag) + fmt.Fprintf(sb, " `yaml:\"%s\"`\n", tag) } return true } @@ -95,10 +119,11 @@ func TestRpkYamlVersion(t *testing.T) { shastr := hex.EncodeToString(sha[:]) const ( - v1sha = "0a7e145ddf8b86aea0478bf976b0149b6a7df57cae19d89b87b12f5ccdeddb14" // 23-05-25 + v1sha = "2ad0d7eee688f2a38a0ca126c919bbc5af2c42b6c848fa40947dfc6dd5a2fce3" // 23-06-08 ) if shastr != v1sha { t.Errorf("rpk.yaml type shape has changed (got sha %s != exp %s, if fields were reordered, update the valid v1 sha, otherwise bump the rpk.yaml version number", shastr, v1sha) + t.Errorf("current shape:\n%s\n", sb.String()) } } diff --git a/src/go/rpk/pkg/kafka/client_franz.go b/src/go/rpk/pkg/kafka/client_franz.go index 7ebdf3a49756..ffb6d3feb022 100644 --- a/src/go/rpk/pkg/kafka/client_franz.go +++ b/src/go/rpk/pkg/kafka/client_franz.go @@ -31,6 +31,11 @@ import ( func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*kgo.Client, error) { k := &p.KafkaAPI + d := p.Defaults() + if len(k.Brokers) == 0 && d.NoDefaultCluster { + return nil, errors.New("no brokers specified and rpk.yaml is configured to not use a default cluster") + } + opts := []kgo.Opt{ kgo.SeedBrokers(k.Brokers...), kgo.ClientID("rpk"), @@ -68,6 +73,25 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k kgo.MetadataMinAge(250 * time.Millisecond), } + // We apply user overrides after our defaults above. Options are + // applied in order, so appending at the end overrides anything + // above. + if d := d.DialTimeout; d.Duration != 0 { + opts = append(opts, kgo.DialTimeout(d.Duration)) + } + if d := d.RequestTimeoutOverhead; d.Duration != 0 { + opts = append(opts, kgo.RequestTimeoutOverhead(d.Duration)) + } + if d := d.RetryTimeout; d.Duration != 0 { + opts = append(opts, kgo.RetryTimeout(d.Duration)) + } + if d := d.FetchMaxWait; d.Duration != 0 { + opts = append(opts, kgo.FetchMaxWait(d.Duration)) + } + if id := d.RedpandaClientID; id != "" { + opts = append(opts, kgo.ClientID(id)) + } + if k.SASL != nil { mech := scram.Auth{ User: k.SASL.User, @@ -97,9 +121,7 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k } // NewAdmin returns a franz-go admin client. -func NewAdmin( - fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt, -) (*kadm.Client, error) { +func NewAdmin(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*kadm.Client, error) { cl, err := NewFranzClient(fs, p, extraOpts...) if err != nil { return nil, err @@ -116,9 +138,7 @@ func MetaString(meta kgo.BrokerMetadata) string { // EachShard calls fn for each non-erroring response in shards. If some, but not // all, requests fail, this prints a summary message. -func EachShard( - req kmsg.Request, shards []kgo.ResponseShard, fn func(kgo.ResponseShard), -) (allFailed bool) { +func EachShard(req kmsg.Request, shards []kgo.ResponseShard, fn func(kgo.ResponseShard)) (allFailed bool) { if len(shards) == 1 && shards[0].Err != nil { shard := shards[0] meta := "" diff --git a/src/go/rpk/pkg/os/file.go b/src/go/rpk/pkg/os/file.go index 16c4f4c02779..a25dd499608a 100644 --- a/src/go/rpk/pkg/os/file.go +++ b/src/go/rpk/pkg/os/file.go @@ -143,6 +143,9 @@ func EditTmpYAMLFile[T any](fs afero.Fs, v T) (T, error) { if err != nil { return update, fmt.Errorf("unable to edit: %w", err) } + if len(read) == 0 || string(read) == "\n" { + return update, fmt.Errorf("no changes made") + } if err := yaml.Unmarshal(read, &update); err != nil { return update, fmt.Errorf("unable to parse edited file: %w", err) } diff --git a/src/go/rpk/pkg/out/color.go b/src/go/rpk/pkg/out/color.go new file mode 100644 index 000000000000..2dcc27358356 --- /dev/null +++ b/src/go/rpk/pkg/out/color.go @@ -0,0 +1,99 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package out + +import ( + "strings" + + "github.com/fatih/color" +) + +// ParseColor parses a string into a color.Attribute. Foreground colors are +// simply named ("red", "black"), highlights have a "hi-" prefix, and +// background colors have an additional "bg-" prefix. Bold, faint, underline, +// and invert attributes are also supported by name directly. +func ParseColor(c string) (color.Attribute, bool) { + switch strings.ToLower(strings.TrimSpace(c)) { + case "black": + return color.FgBlack, true + case "red": + return color.FgRed, true + case "green": + return color.FgGreen, true + case "yellow": + return color.FgYellow, true + case "blue": + return color.FgBlue, true + case "magenta": + return color.FgMagenta, true + case "cyan": + return color.FgCyan, true + case "white": + return color.FgWhite, true + case "hi-black": + return color.FgHiBlack, true + case "hi-red": + return color.FgHiRed, true + case "hi-green": + return color.FgHiGreen, true + case "hi-yellow": + return color.FgHiYellow, true + case "hi-blue": + return color.FgHiBlue, true + case "hi-magenta": + return color.FgHiMagenta, true + case "hi-cyan": + return color.FgHiCyan, true + case "hi-white": + return color.FgHiWhite, true + case "bg-black": + return color.BgBlack, true + case "bg-red": + return color.BgRed, true + case "bg-green": + return color.BgGreen, true + case "bg-yellow": + return color.BgYellow, true + case "bg-blue": + return color.BgBlue, true + case "bg-magenta": + return color.BgMagenta, true + case "bg-cyan": + return color.BgCyan, true + case "bg-white": + return color.BgWhite, true + case "bg-hi-black": + return color.BgHiBlack, true + case "bg-hi-red": + return color.BgHiRed, true + case "bg-hi-green": + return color.BgHiGreen, true + case "bg-hi-yellow": + return color.BgHiYellow, true + case "bg-hi-blue": + return color.BgHiBlue, true + case "bg-hi-magenta": + return color.BgHiMagenta, true + case "bg-hi-cyan": + return color.BgHiCyan, true + case "bg-hi-white": + return color.BgHiWhite, true + case "bold": + return color.Bold, true + case "faint": + return color.Faint, true + case "underline": + return color.Underline, true + case "invert": + return color.ReverseVideo, true + default: + return 0, false + } +} diff --git a/src/go/rpk/pkg/out/out.go b/src/go/rpk/pkg/out/out.go index 49e2e7c7f08c..362f4c077300 100644 --- a/src/go/rpk/pkg/out/out.go +++ b/src/go/rpk/pkg/out/out.go @@ -73,6 +73,13 @@ func Die(msg string, args ...interface{}) { os.Exit(1) } +// DieString is like Die, but does not format the message. This still adds +// a newline. +func DieString(msg string) { + fmt.Fprintln(os.Stderr, msg) + os.Exit(1) +} + // MaybeDie calls Die if err is non-nil. func MaybeDie(err error, msg string, args ...interface{}) { if err != nil { diff --git a/tests/rptest/clients/rpk_remote.py b/tests/rptest/clients/rpk_remote.py index c3ea5ff8aaab..6d4452309026 100644 --- a/tests/rptest/clients/rpk_remote.py +++ b/tests/rptest/clients/rpk_remote.py @@ -74,9 +74,9 @@ def create_profile(self, name): cmd = ["create", name] return self._run_profile(cmd) - def create_profile_simple(self, name, cfg_location): + def create_profile_redpanda(self, name, cfg_location): return self._run_profile( - ['create', name, "--from-simple", cfg_location]) + ['create', name, "--from-redpanda", cfg_location]) def use_profile(self, name): cmd = ["use", name] diff --git a/tests/rptest/tests/rpk_profile_test.py b/tests/rptest/tests/rpk_profile_test.py index 584cc6d4dda0..1e086919c7b8 100644 --- a/tests/rptest/tests/rpk_profile_test.py +++ b/tests/rptest/tests/rpk_profile_test.py @@ -88,9 +88,9 @@ def test_use_profile(self): assert "no-flag-test" in topic_list @cluster(num_nodes=3) - def test_create_profile_from_simple(self): + def test_create_profile_from_redpanda(self): """ - Create redpanda.yaml, use create rpk profile --from simple + Create redpanda.yaml, use create rpk profile --from-redpanda """ node = self.redpanda.get_node(0) rpk = RpkRemoteTool(self.redpanda, node) @@ -99,8 +99,8 @@ def test_create_profile_from_simple(self): rpk.config_set("rpk.kafka_api.brokers", self.redpanda.brokers_list()) # Then we create the profile based on the redpanda.yaml - rpk.create_profile_simple("simple_test", - RedpandaService.NODE_CONFIG_FILE) + rpk.create_profile_redpanda("simple_test", + RedpandaService.NODE_CONFIG_FILE) rpk_cfg = read_rpk_cfg(node) redpanda_cfg = read_redpanda_cfg(node)