kgo: work around KIP-814 limitations
I noticed fork zywillc#1 attempting to
fix a problem that I didn't know existed. After a full day of looking
into it, it turns out that KIP-345's implementation has no solution for
restarting leaders while changing topic interests, and KIP-814 does not
close the gap either. As a client, we can leverage a little bit of
KIP-814 to close the gap ourselves.

We only do this for well known balancers because we cannot be sure if
custom balancers make weird choices depending on time, number of
invocations, instance id / member id ordering, etc., so we cannot
rely on balance plans to be the same from one run to the next.

We do rely on this for our own balancers. Technically this may not be
true if multiple member-id-only members are restarting at once. That's
ok. We expect instance id using groups to be filled with instance id
members and thus always have the same sort ordering in our balancers.

We could add another optional interface to opt in to this enhanced
re-check behavior, but I'm not sure how common it is for somebody to:
(a) write a custom balancer
(b) use instance IDs
(c) change topic interests across restarts
so we'll just leave the option out for now as it'd be complicated to
discover.

Regardless, there's a lot of documentation on what's being done and why.

Also see KAFKA-13435 and the unaddressed KAFKA-12759.
twmb committed Oct 20, 2022
1 parent 6cac810 commit b18341d
Showing 3 changed files with 87 additions and 10 deletions.
7 changes: 6 additions & 1 deletion pkg/kgo/config.go
@@ -1618,7 +1618,12 @@ func AutoCommitMarks() GroupOpt {
// issues a leave group request on behalf of this instance ID (see kcl), or you
// can manually use the kmsg package with a proper LeaveGroupRequest.
//
-// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4.0+.
+// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4+.
+//
+// NOTE: If you restart a consumer group leader that is using an instance ID,
+// it will not cause a rebalance even if you change which topics the leader is
+// consuming. If your cluster is 3.2+, this client internally works around this
+// limitation and you do not need to trigger a rebalance manually.
func InstanceID(id string) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.instanceID = &id }}
}
75 changes: 72 additions & 3 deletions pkg/kgo/consumer_group.go
@@ -1054,7 +1054,9 @@ start:
syncReq.InstanceID = g.cfg.instanceID
syncReq.ProtocolType = &g.cfg.protocol
syncReq.Protocol = &protocol
-syncReq.GroupAssignment = plan // nil unless we are the leader
+if !joinResp.SkipAssignment {
+syncReq.GroupAssignment = plan // nil unless we are the leader
+}
var (
syncResp *kmsg.SyncGroupResponse
synced = make(chan struct{})
@@ -1084,6 +1086,26 @@ start:
return err
}

+// KIP-814 fixes one limitation with KIP-345, but has another
+// fundamental limitation. When an instance ID leader restarts, its
+// first join always gets its old assignment *even if* the member's
+// topic interests have changed. The broker tells us to skip doing
+// assignment ourselves, but we ignore that for our well known
+// balancers. Instead, we balance (but avoid sending it while syncing,
+// as we are supposed to), and if our sync assignment differs from our
+// own calculated assignment, we know we have a stale broker assignment
+// and must trigger a rebalance.
+if plan != nil && joinResp.SkipAssignment {
+for _, assign := range plan {
+if assign.MemberID == g.memberID {
+if !bytes.Equal(assign.MemberAssignment, syncResp.MemberAssignment) {
+g.rejoin("instance group leader restarted and was reassigned old plan, our topic interests changed and we must rejoin to force a rebalance")
+}
+break
+}
+}
+}

return nil
}
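The re-check above boils down to one decision: when the broker told us to skip assignment (KIP-814) but we computed a plan anyway, compare our own entry in that plan against the assignment the broker returned from sync, and rejoin if they differ. A stdlib-only sketch of that decision, with a hypothetical simplified `assignment` type standing in for `kmsg.SyncGroupRequestGroupAssignment`:

```go
package main

import (
	"bytes"
	"fmt"
)

// assignment pairs a member ID with its serialized assignment; a hypothetical
// simplification of kmsg.SyncGroupRequestGroupAssignment.
type assignment struct {
	MemberID         string
	MemberAssignment []byte
}

// mustRejoin reports whether a restarted instance-ID leader should force a
// rebalance: the broker skipped assignment (KIP-814) and handed back an old
// plan that differs from what we would assign ourselves today.
func mustRejoin(skipAssignment bool, plan []assignment, memberID string, synced []byte) bool {
	if !skipAssignment || plan == nil {
		return false
	}
	for _, a := range plan {
		if a.MemberID == memberID {
			// Differing bytes mean the broker's plan is stale.
			return !bytes.Equal(a.MemberAssignment, synced)
		}
	}
	return false
}

func main() {
	plan := []assignment{{MemberID: "me", MemberAssignment: []byte("topics=foo")}}
	// Broker returned a plan from before our topic interests changed: rejoin.
	fmt.Println(mustRejoin(true, plan, "me", []byte("topics=bar"))) // true
	// Broker's plan matches what we computed ourselves: nothing to do.
	fmt.Println(mustRejoin(true, plan, "me", []byte("topics=foo"))) // false
}
```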

@@ -1117,30 +1139,77 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
protocol = *resp.Protocol
}

+// KIP-345 has a fundamental limitation that KIP-814 also does not
+// solve.
+//
+// When using instance IDs, if a leader restarts, its first join
+// receives its old assignment no matter what. KIP-345 resulted in
+// leaderless consumer groups; KIP-814 fixes this by notifying the
+// restarted leader that it is still leader but that it should not
+// balance.
+//
+// If the join response is <= v8, we hackily work around the leaderless
+// situation by checking if the LeaderID is prefixed with our
+// InstanceID. This is how Kafka and Redpanda are both implemented. At
+// worst, if we mis-predict the leader, then we may accidentally try to
+// cause a rebalance later and it will do nothing. That's fine. At
+// least we can cause rebalances now, rather than having a leaderless,
+// not-ever-rebalancing client.
+//
+// KIP-814 does not solve our problem fully: if we restart and rejoin,
+// we always get our old assignment even if we changed what topics we
+// were interested in. Because we have our old assignment, we think
+// that the plan is fine *even with* our new interests, and we wait for
+// some external rebalance trigger. We work around this limitation
+// above (see "KIP-814") only for well known balancers; we cannot work
+// around this limitation for not well known balancers because they may
+// do weird things we cannot control or reason about.
leader := resp.LeaderID == resp.MemberID
+leaderNoPlan := !leader && resp.Version <= 8 && g.cfg.instanceID != nil && strings.HasPrefix(resp.LeaderID, *g.cfg.instanceID+"-")
if leader {
g.leader.set(true)
g.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
"group", g.cfg.group,
"member_id", g.memberID,
-"instance_id", g.cfg.instanceID,
+"instance_id", strptr{g.cfg.instanceID},
"generation", g.generation,
"balance_protocol", protocol,
"leader", true,
)
plan, err = g.balanceGroup(protocol, resp.Members, resp.SkipAssignment)
+} else if leaderNoPlan {
+g.leader.set(true)
+g.cfg.logger.Log(LogLevelInfo, "joined as leader but unable to balance group due to KIP-345 limitations",
+"group", g.cfg.group,
+"member_id", g.memberID,
+"instance_id", strptr{g.cfg.instanceID},
+"generation", g.generation,
+"balance_protocol", protocol,
+"leader", true,
+)
} else {
g.cfg.logger.Log(LogLevelInfo, "joined",
"group", g.cfg.group,
"member_id", g.memberID,
-"instance_id", g.cfg.instanceID,
+"instance_id", strptr{g.cfg.instanceID},
"generation", g.generation,
"leader", false,
)
}
return
}
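The <= v8 heuristic in the comment above can be shown on its own: brokers (both Kafka and Redpanda) build static member IDs as the instance ID, a dash, then a generated suffix, so a LeaderID carrying our instance ID as a dash-separated prefix is almost certainly our own restarted self. A self-contained sketch with a simplified signature (the real code reads these fields off `*kmsg.JoinGroupResponse`):

```go
package main

import (
	"fmt"
	"strings"
)

// isRestartedLeader reports whether we should treat ourselves as group leader.
// Either the broker said so directly, or (on join responses <= v8, where
// KIP-814 does not exist) the leader's member ID is prefixed with our own
// instance ID plus "-", meaning the "leader" is our pre-restart incarnation.
func isRestartedLeader(leaderID, memberID string, instanceID *string, version int16) bool {
	if leaderID == memberID {
		return true // normal case: the broker told us we are leader
	}
	return version <= 8 && instanceID != nil && strings.HasPrefix(leaderID, *instanceID+"-")
}

func main() {
	id := "my-instance"
	fmt.Println(isRestartedLeader("my-instance-3f9a", "new-member-id", &id, 8))  // true
	fmt.Println(isRestartedLeader("someone-else-1b2c", "new-member-id", &id, 8)) // false
}
```

As the comment notes, a mis-prediction here is harmless: the worst outcome is one extra rebalance attempt that does nothing.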

+type strptr struct {
+s *string
+}
+
+func (s strptr) String() string {
+if s.s == nil {
+return "<nil>"
+}
+return *s.s
+}

// If other group members consume topics we are not interested in, we track the
// entire group's topics in this groupExternal type. On metadata update, we see
// if any partitions for any of these topics have changed, and if so, we as
15 changes: 9 additions & 6 deletions pkg/kgo/group_balancer.go
@@ -338,11 +338,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
// own metadata update to see if partition counts have changed for these random
// topics.
func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember, skipBalance bool) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
-if skipBalance {
-g.cl.cfg.logger.Log(LogLevelInfo, "parsing group balance as leader but not assigning (KIP-814)")
-} else {
-g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
-}
+g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")

b, err := g.findBalancer("balance group", proto)
if err != nil {
@@ -443,7 +439,14 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
// have logged the current interests, we do not need to actually
// balance.
if skipBalance {
-return nil, nil
+switch proto := b.ProtocolName(); proto {
+case RangeBalancer().ProtocolName(),
+RoundRobinBalancer().ProtocolName(),
+StickyBalancer().ProtocolName(),
+CooperativeStickyBalancer().ProtocolName():
+default:
+return nil, nil
+}
}
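The switch above gates the re-check to the four built-in balancers by protocol name: well known balancers fall through and continue balancing, anything else returns early. Conceptually that is a set-membership test; a sketch using the standard Kafka assignor names, which these balancers are assumed to advertise as their protocol names:

```go
package main

import "fmt"

// wellKnown lists balancer protocol names whose plans we assume are
// deterministic across restarts, so comparing a freshly computed plan against
// the broker's returned plan is meaningful. Names mirror Kafka's standard
// assignors (assumption, for illustration).
var wellKnown = map[string]bool{
	"range":              true,
	"roundrobin":         true,
	"sticky":             true,
	"cooperative-sticky": true,
}

func main() {
	fmt.Println(wellKnown["cooperative-sticky"]) // true
	fmt.Println(wellKnown["my-custom-balancer"]) // false
}
```

A custom balancer could in principle opt in to the same re-check, but as the commit message explains, the option is left out for now because it would be hard to discover.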

// If the returned IntoSyncAssignment is a BalancePlan, which it likely