Skip to content

Commit

Permalink
rpk group describe: use new kadm Client.Lag function, add -c
Browse files Browse the repository at this point in the history
* For describing groups, often people (me) want to only see the commits
  / lag; -c makes that feasible
* Changes the logic to use kadm's new Lag function, which was modeled
  after this logic but has more expanded error handling
  • Loading branch information
twmb committed Jul 14, 2023
1 parent b37e629 commit 61e2512
Showing 1 changed file with 22 additions and 46 deletions.
68 changes: 22 additions & 46 deletions src/go/rpk/pkg/cli/group/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
)

func NewDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var summary bool

var summary, commits bool
cmd := &cobra.Command{
Use: "describe [GROUPS...]",
Short: "Describe group offset status & lag",
Expand All @@ -46,39 +45,20 @@ information about the members.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

described, err := adm.DescribeGroups(ctx, groups...)
out.HandleShardError("DescribeGroups", err)

lags, err := adm.Lag(ctx, groups...)
if err != nil {
out.Die("unable to describe groups: %v", err)
}
if summary {
printDescribedSummary(described)
printDescribedSummary(lags)
return
}

fetched := adm.FetchManyOffsets(ctx, groups...)
fetched.EachError(func(r kadm.FetchOffsetsResponse) {
fmt.Printf("unable to fetch offsets for group %q: %v\n", r.Group, r.Err)
delete(fetched, r.Group)
})
if fetched.AllFailed() {
out.Die("unable to fetch offsets for any group")
}

var listed kadm.ListedOffsets
listPartitions := described.AssignedPartitions()
listPartitions.Merge(fetched.CommittedPartitions())
if topics := listPartitions.Topics(); len(topics) > 0 {
listed, err = adm.ListEndOffsets(ctx, topics...)
out.HandleShardError("ListOffsets", err)
}

printDescribed(
described,
fetched,
listed,
)
printDescribed(commits, lags)
},
}
cmd.Flags().BoolVarP(&summary, "print-summary", "s", false, "Print only the group summary section")
cmd.Flags().BoolVarP(&commits, "print-commits", "c", false, "Print only the group commits section")
cmd.MarkFlagsMutuallyExclusive("print-summary", "print-commits")
return cmd
}

Expand All @@ -101,17 +81,11 @@ type describeRow struct {
err error
}

func printDescribed(
groups kadm.DescribedGroups,
fetched map[string]kadm.FetchOffsetsResponse,
listed kadm.ListedOffsets,
) {
for _, group := range groups.Sorted() {
lag := kadm.CalculateGroupLag(group, fetched[group.Group].Fetched, listed)

func printDescribed(commits bool, lags kadm.DescribedGroupLags) {
for _, group := range lags.Sorted() {
var rows []describeRow
var useInstanceID, useErr bool
for _, l := range lag.Sorted() {
for _, l := range group.Lag.Sorted() {
row := describeRow{
topic: l.End.Topic,
partition: l.End.Partition,
Expand Down Expand Up @@ -143,38 +117,40 @@ func printDescribed(
rows = append(rows, row)
}

printDescribedGroup(group, rows, useInstanceID, useErr)
printDescribedGroup(commits, group, rows, useInstanceID, useErr)
fmt.Println()
}
}

func printDescribedSummary(groups kadm.DescribedGroups) {
func printDescribedSummary(groups kadm.DescribedGroupLags) {
for _, group := range groups.Sorted() {
printDescribedGroupSummary(group)
}
}

func printDescribedGroupSummary(group kadm.DescribedGroup) {
func printDescribedGroupSummary(group kadm.DescribedGroupLag) {
tw := out.NewTabWriter()
defer tw.Flush()
fmt.Fprintf(tw, "GROUP\t%s\n", group.Group)
fmt.Fprintf(tw, "COORDINATOR\t%d\n", group.Coordinator.NodeID)
fmt.Fprintf(tw, "STATE\t%s\n", group.State)
fmt.Fprintf(tw, "BALANCER\t%s\n", group.Protocol)
fmt.Fprintf(tw, "MEMBERS\t%d\n", len(group.Members))
if group.Err != nil {
fmt.Fprintf(tw, "ERROR\t%s\n", group.Err)
if group.Error() != nil {
fmt.Fprintf(tw, "ERROR\t%s\n", group.Error())
}
}

func printDescribedGroup(
group kadm.DescribedGroup,
commits bool,
group kadm.DescribedGroupLag,
rows []describeRow,
useInstanceID bool,
useErr bool,
) {
printDescribedGroupSummary(group)

if !commits {
printDescribedGroupSummary(group)
}
if len(rows) == 0 {
return
}
Expand Down

0 comments on commit 61e2512

Please sign in to comment.