Skip to content

Commit

Permalink
kadm: add LeaveGroup api
Browse files Browse the repository at this point in the history
In support of CLI tooling, re: KIP-345
  • Loading branch information
twmb committed Oct 20, 2022
1 parent 2ee43e3 commit d3ee144
Showing 1 changed file with 137 additions and 0 deletions.
137 changes: 137 additions & 0 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,143 @@ func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGro
})
}

// LeaveGroupBuilder helps build a leave group request, rather than having
// a function signature (string, string, ...string).
//
// All functions on this type accept and return the same pointer, allowing
// for easy build-and-use usage.
type LeaveGroupBuilder struct {
group string
reason *string
instanceIDs []*string
}

// LeaveGroup returns a LeaveGroupBuilder for the input group.
func LeaveGroup(group string) *LeaveGroupBuilder {
return &LeaveGroupBuilder{
group: group,
}
}

// Reason attaches a reason to all members in the leave group request.
// This requires Kafka 3.2+.
func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder {
b.reason = StringPtr(reason)
return b
}

// InstanceIDs are members to remove from a group.
func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder {
for _, id := range ids {
if id != "" {
b.instanceIDs = append(b.instanceIDs, StringPtr(id))
}
}
return b
}

// LeaveGroupResponse contains the response for an individual instance ID that
// left a group.
type LeaveGroupResponse struct {
Group string // Group is the group that was left.
InstanceID string // InstanceID is the instance ID that left the group.
MemberID string // MemberID is the member ID that left the group.
Err error // Err is non-nil if this member did not exist.
}

// LeaveGroupResponses contains responses for each member of a leave group
// request. The map key is the instance ID that was removed from the group.
type LeaveGroupResponses map[string]LeaveGroupResponse

// Sorted returns all removed group members by instance ID.
func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse {
s := make([]LeaveGroupResponse, 0, len(ls))
for _, l := range ls {
s = append(s, l)
}
sort.Slice(s, func(i, j int) bool { return s[i].InstanceID < s[j].InstanceID })
return s
}

// EachError calls fn for every removed member that has a non-nil error.
func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse)) {
for _, l := range ls {
if l.Err != nil {
fn(l)
}
}
}

// Each calls fn for every removed member.
func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse)) {
for _, l := range ls {
fn(l)
}
}

// Error iterates over all removed members and returns the first error
// encountered, if any.
func (ls LeaveGroupResponses) Error() error {
for _, l := range ls {
if l.Err != nil {
return l.Err
}
}
return nil
}

// Ok returns true if there are no errors. This is a shortcut for ls.Error() ==
// nil.
func (ls LeaveGroupResponses) Ok() bool {
return ls.Error() == nil
}

// LeaveGroup causes instance IDs to leave a group.
//
// This function allows manually removing members using instance IDs from a
// group, which allows for fast scale down / host replacement (see KIP-345 for
// more detail). This returns an *AuthErr if the use is not authorized to
// remove members from groups.
func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error) {
if b == nil || len(b.instanceIDs) == 0 {
return nil, nil
}
req := kmsg.NewPtrLeaveGroupRequest()
req.Group = b.group
for _, id := range b.instanceIDs {
m := kmsg.NewLeaveGroupRequestMember()
id := id
m.InstanceID = id
m.Reason = b.reason
req.Members = append(req.Members, m)
}

resp, err := req.RequestWith(ctx, cl.cl)
if err != nil {
return nil, err
}
if err := maybeAuthErr(resp.ErrorCode); err != nil {
return nil, err
}
if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
return nil, err
}

resps := make(LeaveGroupResponses)
for _, m := range resp.Members {
if m.InstanceID == nil {
continue // highly unexpected, buggy kafka
}
resps[*m.InstanceID] = LeaveGroupResponse{
Group: b.group,
MemberID: m.MemberID,
InstanceID: *m.InstanceID,
Err: kerr.ErrorForCode(resp.ErrorCode),
}
}
return resps, err
}

// OffsetResponse contains the response for an individual offset for offset
// methods.
type OffsetResponse struct {
Expand Down

0 comments on commit d3ee144

Please sign in to comment.