Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kadm: update CalculateGroupLag docs, add LeaveGroup APIs; kgo: improve log #230

Merged
merged 3 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 163 additions & 18 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,143 @@ func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGro
})
}

// LeaveGroupBuilder helps build a leave group request, rather than having
// a function signature (string, string, ...string).
//
// All functions on this type accept and return the same pointer, allowing
// for easy build-and-use usage. Create one with LeaveGroup, optionally chain
// Reason and InstanceIDs, and pass the result to Client.LeaveGroup.
type LeaveGroupBuilder struct {
group string // group is the group that members are removed from
reason *string // reason is attached to every member in the request; nil unless Reason was called
instanceIDs []*string // instanceIDs are the non-empty instance IDs accumulated through InstanceIDs
}

// LeaveGroup starts a new LeaveGroupBuilder that targets the input group.
// The returned builder has no reason and no instance IDs set.
func LeaveGroup(group string) *LeaveGroupBuilder {
	builder := new(LeaveGroupBuilder)
	builder.group = group
	return builder
}

// Reason sets the reason that is sent along with every member in the leave
// group request, replacing any previously set reason. This requires Kafka
// 3.2+.
//
// The builder itself is returned to allow chaining.
func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder {
	ptr := StringPtr(reason)
	b.reason = ptr
	return b
}

// InstanceIDs adds the instance IDs of members to remove from the group.
// Empty strings are ignored, and repeated calls accumulate IDs rather than
// replacing earlier ones.
//
// The builder itself is returned to allow chaining.
func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder {
	for i := range ids {
		if ids[i] == "" {
			continue
		}
		b.instanceIDs = append(b.instanceIDs, StringPtr(ids[i]))
	}
	return b
}

// LeaveGroupResponse contains the response for an individual instance ID that
// left a group. Values of this type are stored in LeaveGroupResponses, keyed
// by InstanceID.
type LeaveGroupResponse struct {
Group string // Group is the group that was left.
InstanceID string // InstanceID is the instance ID that left the group.
MemberID string // MemberID is the member ID that left the group.
Err error // Err is non-nil if this member did not exist.
}

// LeaveGroupResponses contains responses for each member of a leave group
// request. The map key is the instance ID that was removed from the group.
//
// The Each, EachError, and Error helpers range over the map directly and so
// visit members in an unspecified order; use Sorted when a deterministic
// order is needed.
type LeaveGroupResponses map[string]LeaveGroupResponse

// Sorted returns every response in the map as a slice ordered ascending by
// instance ID. The returned slice is never nil, even when the map is empty.
func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse {
	sorted := make([]LeaveGroupResponse, len(ls))
	next := 0
	for id := range ls {
		sorted[next] = ls[id]
		next++
	}
	byInstanceID := func(a, b int) bool {
		return sorted[a].InstanceID < sorted[b].InstanceID
	}
	sort.Slice(sorted, byInstanceID)
	return sorted
}

// EachError invokes fn once for each removed member whose Err is non-nil.
// Members without an error are skipped; the visiting order is unspecified.
func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse)) {
	for _, resp := range ls {
		if resp.Err == nil {
			continue
		}
		fn(resp)
	}
}

// Each invokes fn once for every removed member, regardless of whether the
// member has an error. The visiting order is unspecified.
func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse)) {
	for id := range ls {
		fn(ls[id])
	}
}

// Error walks all removed members and returns the first non-nil member error
// it comes across, or nil if no member has an error. Because map iteration
// order is unspecified, which error is returned when several members failed
// is not deterministic.
func (ls LeaveGroupResponses) Error() error {
	var found error
	for _, resp := range ls {
		if found = resp.Err; found != nil {
			break
		}
	}
	return found
}

// Ok reports whether no removed member has an error. It is shorthand for
// checking that ls.Error() returns nil.
func (ls LeaveGroupResponses) Ok() bool {
	err := ls.Error()
	return err == nil
}

// LeaveGroup causes instance IDs to leave a group.
//
// This function allows manually removing members using instance IDs from a
// group, which allows for fast scale down / host replacement (see KIP-345 for
// more detail). This returns an *AuthErr if the user is not authorized to
// remove members from groups.
//
// If b is nil or contains no instance IDs, no request is issued and this
// returns (nil, nil). If the request fails, or the response carries a
// top-level error code, the error is returned and the responses are nil.
// Otherwise the error is nil and any per-member failure is reported in the Err
// field of that member's LeaveGroupResponse.
func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error) {
	if b == nil || len(b.instanceIDs) == 0 {
		return nil, nil
	}

	req := kmsg.NewPtrLeaveGroupRequest()
	req.Group = b.group
	for _, id := range b.instanceIDs {
		m := kmsg.NewLeaveGroupRequestMember()
		m.InstanceID = id
		m.Reason = b.reason // the same reason (possibly nil) is attached to every member
		req.Members = append(req.Members, m)
	}

	resp, err := req.RequestWith(ctx, cl.cl)
	if err != nil {
		return nil, err
	}
	// The top-level error code applies to the request as a whole; check it
	// before looking at individual members.
	if err := maybeAuthErr(resp.ErrorCode); err != nil {
		return nil, err
	}
	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
		return nil, err
	}

	resps := make(LeaveGroupResponses, len(resp.Members))
	for _, m := range resp.Members {
		if m.InstanceID == nil {
			continue // highly unexpected, buggy kafka
		}
		resps[*m.InstanceID] = LeaveGroupResponse{
			Group:      b.group,
			MemberID:   m.MemberID,
			InstanceID: *m.InstanceID,
			// Use the member's own error code: the top-level code is
			// known to be zero here and would always yield a nil Err.
			Err: kerr.ErrorForCode(m.ErrorCode),
		}
	}
	return resps, nil
}

// OffsetResponse contains the response for an individual offset for offset
// methods.
type OffsetResponse struct {
Expand Down Expand Up @@ -1045,31 +1182,39 @@ func (l GroupTopicsLag) Sorted() []TopicLag {
}

// CalculateGroupLag returns the per-partition lag of all members in a group.
// The input to this method is the returns from the three following methods,
// The input to this method is the returns from the following methods (make
// sure to check shard errors):
//
// described := DescribeGroups(ctx, group)
// fetched := FetchOffsets(ctx, group)
// toList := described.AssignedPartitions()
// toList.Merge(fetched.CommittedPartitions()
// ListEndOffsets(ctx, toList.Topics())
// // Note that FetchOffsets exists to fetch only one group's offsets,
// // but some of the code below slightly changes.
// groups := DescribeGroups(ctx, group)
// commits := FetchManyOffsets(ctx, group)
// var endOffsets ListedOffsets
// listPartitions := groups.AssignedPartitions()
// listPartitions.Merge(commits.CommittedPartitions())
// if topics := listPartitions.Topics(); len(topics) > 0 {
// endOffsets = ListEndOffsets(ctx, listPartitions.Topics())
// }
// for _, group := range groups {
// lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets)
// }
//
// If assigned partitions are missing in the listed end offsets listed end
// offsets, the partition will have an error indicating it is missing. A
// missing topic or partition in the commits is assumed to be nothing
// committing yet.
// If assigned partitions are missing in the listed end offsets, the partition
// will have an error indicating it is missing. A missing topic or partition in
// the commits is assumed to mean that nothing has been committed yet.
func CalculateGroupLag(
group DescribedGroup,
commit OffsetResponses,
offsets ListedOffsets,
endOffsets ListedOffsets,
) GroupLag {
if group.State == "Empty" {
return calculateEmptyLag(commit, offsets)
return calculateEmptyLag(commit, endOffsets)
}
if commit == nil { // avoid panics below
commit = make(OffsetResponses)
}
if offsets == nil {
offsets = make(ListedOffsets)
if endOffsets == nil {
endOffsets = make(ListedOffsets)
}

l := make(map[string]map[int32]GroupMemberLag)
Expand All @@ -1086,7 +1231,7 @@ func CalculateGroupLag(
}

tcommit := commit[t.Topic]
tend := offsets[t.Topic]
tend := endOffsets[t.Topic]
for _, p := range t.Partitions {
var (
pcommit OffsetResponse
Expand Down Expand Up @@ -1141,15 +1286,15 @@ func CalculateGroupLag(
return l
}

func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag {
func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLag {
l := make(map[string]map[int32]GroupMemberLag)
for t, ps := range commit {
lt := l[t]
if lt == nil {
lt = make(map[int32]GroupMemberLag)
l[t] = lt
}
tend := offsets[t]
tend := endOffsets[t]
for p, pcommit := range ps {
var (
pend ListedOffset
Expand Down Expand Up @@ -1191,7 +1336,7 @@ func calculateEmptyLag(commit OffsetResponses, offsets ListedOffsets) GroupLag {
// lag calculations above, the partitions were not committed to and we
// count that as entirely lagging.
for t, lt := range l {
tend := offsets[t]
tend := endOffsets[t]
for p, pend := range tend {
if _, ok := lt[p]; ok {
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ start:
}
authenticate = req.Version == 1
}
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "mechanism", mechanism.Name(), "authenticate", authenticate)
cxn.cl.cfg.logger.Log(LogLevelDebug, "beginning sasl authentication", "broker", logID(cxn.b.meta.NodeID), "addr", cxn.addr, "mechanism", mechanism.Name(), "authenticate", authenticate)
cxn.mechanism = mechanism
return cxn.doSasl(authenticate)
}
Expand Down Expand Up @@ -813,7 +813,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
binary.BigEndian.PutUint32(buf, uint32(len(clientWrite)))
buf = append(buf, clientWrite...)

cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "step", step)
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing raw sasl authenticate", "broker", logID(cxn.b.meta.NodeID), "addr", cxn.addr, "step", step)
_, _, _, _, err = cxn.writeConn(context.Background(), buf, wt, time.Now())

cxn.cl.bufPool.put(buf)
Expand Down