diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index 25f16b17..8bae20d5 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -415,6 +415,143 @@ func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGro }) } +// LeaveGroupBuilder helps build a leave group request, rather than having +// a function signature (string, string, ...string). +// +// All functions on this type accept and return the same pointer, allowing +// for easy build-and-use usage. +type LeaveGroupBuilder struct { + group string + reason *string + instanceIDs []*string +} + +// LeaveGroup returns a LeaveGroupBuilder for the input group. +func LeaveGroup(group string) *LeaveGroupBuilder { + return &LeaveGroupBuilder{ + group: group, + } +} + +// Reason attaches a reason to all members in the leave group request. +// This requires Kafka 3.2+. +func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder { + b.reason = StringPtr(reason) + return b +} + +// InstanceIDs are members to remove from a group. +func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder { + for _, id := range ids { + if id != "" { + b.instanceIDs = append(b.instanceIDs, StringPtr(id)) + } + } + return b +} + +// LeaveGroupResponse contains the response for an individual instance ID that +// left a group. +type LeaveGroupResponse struct { + Group string // Group is the group that was left. + InstanceID string // InstanceID is the instance ID that left the group. + MemberID string // MemberID is the member ID that left the group. + Err error // Err is non-nil if this member did not exist. +} + +// LeaveGroupResponses contains responses for each member of a leave group +// request. The map key is the instance ID that was removed from the group. +type LeaveGroupResponses map[string]LeaveGroupResponse + +// Sorted returns all removed group members by instance ID. +func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse { + s := make([]LeaveGroupResponse, 0, len(ls)) + for _, l := range ls { + s = append(s, l) + } + sort.Slice(s, func(i, j int) bool { return s[i].InstanceID < s[j].InstanceID }) + return s +} + +// EachError calls fn for every removed member that has a non-nil error. +func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse)) { + for _, l := range ls { + if l.Err != nil { + fn(l) + } + } +} + +// Each calls fn for every removed member. +func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse)) { + for _, l := range ls { + fn(l) + } +} + +// Error iterates over all removed members and returns the first error +// encountered, if any. +func (ls LeaveGroupResponses) Error() error { + for _, l := range ls { + if l.Err != nil { + return l.Err + } + } + return nil +} + +// Ok returns true if there are no errors. This is a shortcut for ls.Error() == +// nil. +func (ls LeaveGroupResponses) Ok() bool { + return ls.Error() == nil +} + +// LeaveGroup causes instance IDs to leave a group. +// +// This function allows manually removing members using instance IDs from a +// group, which allows for fast scale down / host replacement (see KIP-345 for +// more detail). This returns an *AuthErr if the use is not authorized to +// remove members from groups. +func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error) { + if b == nil || len(b.instanceIDs) == 0 { + return nil, nil + } + req := kmsg.NewPtrLeaveGroupRequest() + req.Group = b.group + for _, id := range b.instanceIDs { + m := kmsg.NewLeaveGroupRequestMember() + id := id + m.InstanceID = id + m.Reason = b.reason + req.Members = append(req.Members, m) + } + + resp, err := req.RequestWith(ctx, cl.cl) + if err != nil { + return nil, err + } + if err := maybeAuthErr(resp.ErrorCode); err != nil { + return nil, err + } + if err := kerr.ErrorForCode(resp.ErrorCode); err != nil { + return nil, err + } + + resps := make(LeaveGroupResponses) + for _, m := range resp.Members { + if m.InstanceID == nil { + continue // highly unexpected, buggy kafka + } + resps[*m.InstanceID] = LeaveGroupResponse{ + Group: b.group, + MemberID: m.MemberID, + InstanceID: *m.InstanceID, + Err: kerr.ErrorForCode(resp.ErrorCode), + } + } + return resps, err +} + // OffsetResponse contains the response for an individual offset for offset // methods. type OffsetResponse struct { @@ -1045,31 +1182,39 @@ func (l GroupTopicsLag) Sorted() []TopicLag { } // CalculateGroupLag returns the per-partition lag of all members in a group. -// The input to this method is the returns from the three following methods, +// The input to this method is the returns from the following methods (make +// sure to check shard errors): // -// described := DescribeGroups(ctx, group) -// fetched := FetchOffsets(ctx, group) -// toList := described.AssignedPartitions() -// toList.Merge(fetched.CommittedPartitions() -// ListEndOffsets(ctx, toList.Topics()) +// // Note that FetchOffsets exists to fetch only one group's offsets, +// // but some of the code below slightly changes. +// groups := DescribeGroups(ctx, group) +// commits := FetchManyOffsets(ctx, group) +// var endOffsets ListedOffsets +// listPartitions := described.AssignedPartitions() +// listPartitions.Merge(commits.CommittedPartitions() +// if topics := listPartitions.Topics(); len(topics) > 0 { +// endOffsets = ListEndOffsets(ctx, listPartitions.Topics()) +// } +// for _, group := range groups { +// lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets) +// } // -// If assigned partitions are missing in the listed end offsets listed end -// offsets, the partition will have an error indicating it is missing. A -// missing topic or partition in the commits is assumed to be nothing -// committing yet. +// If assigned partitions are missing in the listed end offsets, the partition +// will have an error indicating it is missing. A missing topic or partition in +// the commits is assumed to be nothing committing yet. func CalculateGroupLag( group DescribedGroup, commit OffsetResponses, - offsets ListedOffsets, + endOffsets ListedOffsets, ) GroupLag { if group.State == "Empty" { - return calculateEmptyLag(commit, offsets) + return calculateEmptyLag(commit, endOffsets) } if commit == nil { // avoid panics below commit = make(OffsetResponses) } - if offsets == nil { - offsets = make(ListedOffsets) + if endOffsets == nil { + endOffsets = make(ListedOffsets) } l := make(map[string]map[int32]GroupMemberLag) @@ -1086,7 +1231,7 @@ func CalculateGroupLag( } tcommit := commit[t.Topic] - tend := offsets[t.Topic] + tend := endOffsets[t.Topic] for _, p := range t.Partitions { var ( pcommit OffsetResponse @@ -1141,7 +1286,7 @@ func CalculateGroupLag( return l } -func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag { +func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLag { l := make(map[string]map[int32]GroupMemberLag) for t, ps := range commit { lt := l[t] @@ -1149,7 +1294,7 @@ func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag { lt = make(map[int32]GroupMemberLag) l[t] = lt } - tend := offsets[t] + tend := endOffsets[t] for p, pcommit := range ps { var ( pend ListedOffset @@ -1191,7 +1336,7 @@ func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag { // lag calculations above, the partitions were not committed to and we // count that as entirely lagging. for t, lt := range l { - tend := offsets[t] + tend := endOffsets[t] for p, pend := range tend { if _, ok := lt[p]; ok { continue diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 98c394eb..c340a1c6 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -777,7 +777,7 @@ start: } authenticate = req.Version == 1 } - cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "mechanism", mechanism.Name(), "authenticate", authenticate) + cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "addr", cxn.addr, "mechanism", mechanism.Name(), "authenticate", authenticate) cxn.mechanism = mechanism return cxn.doSasl(authenticate) } @@ -813,7 +813,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { binary.BigEndian.PutUint32(buf, uint32(len(clientWrite))) buf = append(buf, clientWrite...) - cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "step", step) + cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "addr", cxn.addr, "step", step) _, _, _, _, err = cxn.writeConn(context.Background(), buf, wt, time.Now()) cxn.cl.bufPool.put(buf)