Skip to content

Commit

Permalink
Merge pull request #230 from twmb/kadm
Browse files Browse the repository at this point in the history
kadm: update CalculateGroupLag docs, add LeaveGroup APIs; kgo: improve log
  • Loading branch information
twmb authored Oct 20, 2022
2 parents a7f5d0d + d3ee144 commit a4a2aaf
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 20 deletions.
181 changes: 163 additions & 18 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,143 @@ func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGro
})
}

// LeaveGroupBuilder helps build a leave group request, rather than having
// a function signature (string, string, ...string).
//
// All functions on this type accept and return the same pointer, allowing
// for easy build-and-use usage. Create a builder with LeaveGroup, optionally
// chain Reason and InstanceIDs, and pass the result to Client.LeaveGroup.
type LeaveGroupBuilder struct {
	// group is the name of the group that members are removed from.
	group string
	// reason is the optional reason attached to every member in the
	// request; it stays nil unless Reason is called.
	reason *string
	// instanceIDs are the instance IDs of the members to remove,
	// accumulated by InstanceIDs.
	instanceIDs []*string
}

// LeaveGroup starts a LeaveGroupBuilder that targets the given group. The
// returned builder has no reason and no instance IDs attached yet.
func LeaveGroup(group string) *LeaveGroupBuilder {
	b := new(LeaveGroupBuilder)
	b.group = group
	return b
}

// Reason sets the reason that is sent for every member in the leave group
// request, replacing any previously set reason. The builder itself is
// returned to allow chaining.
//
// This requires Kafka 3.2+.
func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder {
	ptr := StringPtr(reason)
	b.reason = ptr
	return b
}

// InstanceIDs adds the instance IDs of members to remove from the group.
// Empty strings are skipped. Repeated calls append to the IDs already on the
// builder, and the builder itself is returned to allow chaining.
func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder {
	for i := range ids {
		if ids[i] == "" {
			continue
		}
		b.instanceIDs = append(b.instanceIDs, StringPtr(ids[i]))
	}
	return b
}

// LeaveGroupResponse contains the response for an individual instance ID that
// left a group. It is the value type stored in LeaveGroupResponses.
type LeaveGroupResponse struct {
	Group      string // Group is the group that was left.
	InstanceID string // InstanceID is the instance ID that left the group.
	MemberID   string // MemberID is the member ID that left the group.
	Err        error  // Err is non-nil if this member did not exist.
}

// LeaveGroupResponses contains responses for each member of a leave group
// request. The map key is the instance ID that was removed from the group.
//
// Client.LeaveGroup skips any member in the broker response that has no
// instance ID, so such members never appear in this map.
type LeaveGroupResponses map[string]LeaveGroupResponse

// Sorted returns every response in the map as a slice ordered by ascending
// instance ID. The returned slice is always non-nil.
func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse {
	out := make([]LeaveGroupResponse, len(ls))
	next := 0
	for _, resp := range ls {
		out[next] = resp
		next++
	}
	byInstanceID := func(a, b int) bool {
		return out[a].InstanceID < out[b].InstanceID
	}
	sort.Slice(out, byInstanceID)
	return out
}

// EachError invokes fn once for each response whose Err is non-nil.
// Responses are visited in map iteration order, which is unspecified.
func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse)) {
	for _, resp := range ls {
		if resp.Err == nil {
			continue
		}
		fn(resp)
	}
}

// Each invokes fn once for every response in the map, regardless of whether
// it carries an error. Responses are visited in map iteration order, which is
// unspecified.
func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse)) {
	for _, resp := range ls {
		visit := resp
		fn(visit)
	}
}

// Error walks the responses and returns the first non-nil Err it comes
// across, or nil if no response has an error. Because map iteration order is
// unspecified, which error is returned is not deterministic when more than
// one response failed.
func (ls LeaveGroupResponses) Error() error {
	for _, resp := range ls {
		if err := resp.Err; err != nil {
			return err
		}
	}
	return nil
}

// Ok reports whether no response carries an error. It is shorthand for
// ls.Error() == nil.
func (ls LeaveGroupResponses) Ok() bool {
	if err := ls.Error(); err != nil {
		return false
	}
	return true
}

// LeaveGroup causes instance IDs to leave a group.
//
// This function allows manually removing members using instance IDs from a
// group, which allows for fast scale down / host replacement (see KIP-345 for
// more detail). This returns an *AuthErr if the user is not authorized to
// remove members from groups.
//
// If b is nil or has no instance IDs, no request is issued and this returns
// (nil, nil). A request failure or a non-zero top-level response error code
// is returned as the error with a nil map. Otherwise, the returned map is
// keyed by instance ID and each entry's Err reflects that individual member's
// error code.
func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error) {
	if b == nil || len(b.instanceIDs) == 0 {
		return nil, nil
	}
	req := kmsg.NewPtrLeaveGroupRequest()
	req.Group = b.group
	for _, id := range b.instanceIDs {
		m := kmsg.NewLeaveGroupRequestMember()
		m.InstanceID = id
		m.Reason = b.reason
		req.Members = append(req.Members, m)
	}

	resp, err := req.RequestWith(ctx, cl.cl)
	if err != nil {
		return nil, err
	}
	// The top-level error code covers the request as a whole; check it
	// for an auth failure first, then for any other error.
	if err := maybeAuthErr(resp.ErrorCode); err != nil {
		return nil, err
	}
	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
		return nil, err
	}

	resps := make(LeaveGroupResponses, len(resp.Members))
	for _, m := range resp.Members {
		if m.InstanceID == nil {
			continue // highly unexpected, buggy kafka
		}
		resps[*m.InstanceID] = LeaveGroupResponse{
			Group:      b.group,
			MemberID:   m.MemberID,
			InstanceID: *m.InstanceID,
			// Use the member's own error code: the top-level code
			// is known to be zero by this point, so using it would
			// hide every per-member failure.
			Err: kerr.ErrorForCode(m.ErrorCode),
		}
	}
	return resps, nil
}

// OffsetResponse contains the response for an individual offset for offset
// methods.
type OffsetResponse struct {
Expand Down Expand Up @@ -1045,31 +1182,39 @@ func (l GroupTopicsLag) Sorted() []TopicLag {
}

// CalculateGroupLag returns the per-partition lag of all members in a group.
// The input to this method is the returns from the three following methods,
// The input to this method is the returns from the following methods (make
// sure to check shard errors):
//
// described := DescribeGroups(ctx, group)
// fetched := FetchOffsets(ctx, group)
// toList := described.AssignedPartitions()
// toList.Merge(fetched.CommittedPartitions()
// ListEndOffsets(ctx, toList.Topics())
// // Note that FetchOffsets exists to fetch only one group's offsets,
// // but some of the code below slightly changes.
// groups := DescribeGroups(ctx, group)
// commits := FetchManyOffsets(ctx, group)
// var endOffsets ListedOffsets
// listPartitions := groups.AssignedPartitions()
// listPartitions.Merge(commits.CommittedPartitions())
// if topics := listPartitions.Topics(); len(topics) > 0 {
// endOffsets = ListEndOffsets(ctx, listPartitions.Topics())
// }
// for _, group := range groups {
// lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets)
// }
//
// If assigned partitions are missing in the listed end offsets listed end
// offsets, the partition will have an error indicating it is missing. A
// missing topic or partition in the commits is assumed to be nothing
// committing yet.
// If assigned partitions are missing in the listed end offsets, the partition
// will have an error indicating it is missing. A missing topic or partition in
// the commits is assumed to be nothing committed yet.
func CalculateGroupLag(
group DescribedGroup,
commit OffsetResponses,
offsets ListedOffsets,
endOffsets ListedOffsets,
) GroupLag {
if group.State == "Empty" {
return calculateEmptyLag(commit, offsets)
return calculateEmptyLag(commit, endOffsets)
}
if commit == nil { // avoid panics below
commit = make(OffsetResponses)
}
if offsets == nil {
offsets = make(ListedOffsets)
if endOffsets == nil {
endOffsets = make(ListedOffsets)
}

l := make(map[string]map[int32]GroupMemberLag)
Expand All @@ -1086,7 +1231,7 @@ func CalculateGroupLag(
}

tcommit := commit[t.Topic]
tend := offsets[t.Topic]
tend := endOffsets[t.Topic]
for _, p := range t.Partitions {
var (
pcommit OffsetResponse
Expand Down Expand Up @@ -1141,15 +1286,15 @@ func CalculateGroupLag(
return l
}

func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag {
func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLag {
l := make(map[string]map[int32]GroupMemberLag)
for t, ps := range commit {
lt := l[t]
if lt == nil {
lt = make(map[int32]GroupMemberLag)
l[t] = lt
}
tend := offsets[t]
tend := endOffsets[t]
for p, pcommit := range ps {
var (
pend ListedOffset
Expand Down Expand Up @@ -1191,7 +1336,7 @@ func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag {
// lag calculations above, the partitions were not committed to and we
// count that as entirely lagging.
for t, lt := range l {
tend := offsets[t]
tend := endOffsets[t]
for p, pend := range tend {
if _, ok := lt[p]; ok {
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ start:
}
authenticate = req.Version == 1
}
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "mechanism", mechanism.Name(), "authenticate", authenticate)
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "addr", cxn.addr, "mechanism", mechanism.Name(), "authenticate", authenticate)
cxn.mechanism = mechanism
return cxn.doSasl(authenticate)
}
Expand Down Expand Up @@ -813,7 +813,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
binary.BigEndian.PutUint32(buf, uint32(len(clientWrite)))
buf = append(buf, clientWrite...)

cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "step", step)
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "addr", cxn.addr, "step", step)
_, _, _, _, err = cxn.writeConn(context.Background(), buf, wt, time.Now())

cxn.cl.bufPool.put(buf)
Expand Down

0 comments on commit a4a2aaf

Please sign in to comment.