kgo: add LeaveGroupContext #569

Merged · 1 commit · Sep 21, 2023
15 changes: 12 additions & 3 deletions pkg/kgo/client.go
@@ -929,7 +929,10 @@ func (cl *Client) CloseAllowingRebalance() {
cl.Close()
}

// Close leaves any group and closes all connections and goroutines.
// Close leaves any group and closes all connections and goroutines. This
// function waits for the group to be left. If you want to force leave a group
// immediately and ensure a speedy shutdown you can use LeaveGroupContext first
// (and then Close will be immediate).
//
// If you are group consuming and have overridden the default
// OnPartitionsRevoked, you must manually commit offsets before closing the
@@ -942,6 +945,10 @@ func (cl *Client) CloseAllowingRebalance() {
// notification of revoked partitions. If you want to automatically allow
// rebalancing, use CloseAllowingRebalance.
func (cl *Client) Close() {
cl.close(cl.ctx)
}

func (cl *Client) close(ctx context.Context) (rerr error) {
defer cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookClientClosed); ok {
h.OnClientClosed(cl)
@@ -951,7 +958,7 @@ func (cl *Client) Close() {
c := &cl.consumer
c.kill.Store(true)
if c.g != nil {
cl.LeaveGroup()
rerr = cl.LeaveGroupContext(ctx)
} else if c.d != nil {
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, nil, "") // we do not use a log message when not in a group
@@ -963,7 +970,7 @@ func (cl *Client) Close() {
// loopFetch from starting. Assigning also waits for the prior session
// to be complete, meaning loopFetch cannot be running.

sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second)
sessCloseCtx, sessCloseCancel := context.WithTimeout(ctx, time.Second)
var wg sync.WaitGroup
cl.allSinksAndSources(func(sns sinkAndSource) {
if sns.source.session.id != 0 {
@@ -1015,6 +1022,8 @@ func (cl *Client) Close() {
closing.Close()
}
}

return rerr
}

// Request issues a request to Kafka, waiting for and returning the response.
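The updated Close documentation pairs LeaveGroupContext with Close for a speedy shutdown. Below is a minimal sketch of that pattern, assuming a five second leave budget and log-based error handling (both illustrative choices, not part of this change):

package example

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Shutdown force-leaves the group with a bounded wait and then closes the
// client; per the updated Close documentation, Close is then immediate.
func Shutdown(cl *kgo.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := cl.LeaveGroupContext(ctx); err != nil {
		// The leave failed or timed out; the broker evicts this member
		// only after the session timeout elapses.
		log.Printf("leave group: %v", err)
	}
	cl.Close()
}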
131 changes: 91 additions & 40 deletions pkg/kgo/consumer_group.go
@@ -150,36 +150,82 @@ type groupConsumer struct {
// We set this once to manage the group lifecycle once.
managing bool

dying bool // set when closing, read in findNewAssignments
dying bool // set when closing, read in findNewAssignments
left chan struct{}
leaveErr error // set before left is closed
}

// LeaveGroup leaves a group. Close automatically leaves the group, so this is
// only necessary to call if you plan to leave the group but continue to use
// the client. If a rebalance is in progress, this function waits for the
// rebalance to complete before the group can be left. This is necessary to
// allow you to safely issue one final offset commit in OnPartitionsRevoked. If
// you have overridden the default revoke, you must manually commit offsets
// before leaving the group.
//
// If you have configured the group with an InstanceID, this does not leave the
// group. With instance IDs, it is expected that clients will restart and
// re-use the same instance ID. To leave a group using an instance ID, you must
// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
// scripts or kcl).
//
// It is recommended to use LeaveGroupContext to see if the leave was
// successful.
func (cl *Client) LeaveGroup() {
cl.LeaveGroupContext(cl.ctx)
}

// LeaveGroup leaves a group if in one. Calling the client's Close function
// also leaves a group, so this is only necessary to call if you plan to leave
// the group and continue using the client. Note that if a rebalance is in
// progress, this function waits for the rebalance to complete before the group
// can be left. This is necessary to allow you to safely issue one final offset
// commit in OnPartitionsRevoked. If you have overridden the default revoke,
// you must manually commit offsets before leaving the group.
// LeaveGroupContext leaves a group. Close automatically leaves the group, so this is
// only necessary to call if you plan to leave the group but continue to use
// the client. If a rebalance is in progress, this function waits for the
// rebalance to complete before the group can be left. This is necessary to
// allow you to safely issue one final offset commit in OnPartitionsRevoked. If
// you have overridden the default revoke, you must manually commit offsets
// before leaving the group.
//
// The context can be used to avoid waiting for the client to leave the group.
// Not waiting may result in your client being stuck in the group and the
// partitions this client was consuming being stuck until the session timeout.
// This function returns any leave group error or context cancel error. If the
// context is nil, this immediately leaves the group and does not wait and does
// not return an error.
//
// If you have configured the group with an InstanceID, this does not leave the
// group. With instance IDs, it is expected that clients will restart and
// re-use the same instance ID. To leave a group using an instance ID, you must
// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
// scripts or kcl).
func (cl *Client) LeaveGroup() {
func (cl *Client) LeaveGroupContext(ctx context.Context) error {
c := &cl.consumer
if c.g == nil {
return
return nil
}
var immediate bool
if ctx == nil {
var cancel func()
ctx, cancel = context.WithCancel(context.Background())
cancel()
immediate = true
}

c.waitAndAddRebalance()
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
wait := c.g.leave()
c.mu.Unlock()
c.unaddRebalance()
go func() {
c.waitAndAddRebalance()
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
c.g.leave(ctx)
c.mu.Unlock()
c.unaddRebalance()
}()

wait() // wait after we unlock
select {
case <-ctx.Done():
if immediate {
return nil
}
return ctx.Err()
case <-c.g.left:
return c.g.leaveErr
}
}
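A minimal usage sketch of LeaveGroupContext as documented above, leaving the group while keeping the client alive; the broker address, group, topic, and two second timeout are illustrative assumptions. Passing a nil context instead would, per the doc comment, fire the leave without waiting and return nil.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker, group, and topic names.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// ... poll and process records here ...

	// Leave the group but keep the client, waiting at most two seconds
	// for the LeaveGroup response.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := cl.LeaveGroupContext(ctx); err != nil {
		// Either the broker rejected the leave or the context expired;
		// in the latter case the group holds this member until the
		// session timeout.
		fmt.Println("leave group:", err)
	}
}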

// GroupMetadata returns the current group member ID and generation, or an
@@ -214,6 +260,8 @@ func (c *consumer) initGroup() {
rejoinCh: make(chan string, 1),
heartbeatForceCh: make(chan func(error)),
using: make(map[string]int),

left: make(chan struct{}),
}
c.g = g
if !g.cfg.setCommitCallback {
@@ -411,7 +459,7 @@ func (g *groupConsumer) manage() {
}
}

func (g *groupConsumer) leave() (wait func()) {
func (g *groupConsumer) leave(ctx context.Context) {
// If g.using is nonzero before this check, then a manage goroutine has
// started. If not, it will never start because we set dying.
g.mu.Lock()
@@ -421,43 +469,46 @@ func (g *groupConsumer) leave() (wait func()) {
g.cancel()
g.mu.Unlock()

done := make(chan struct{})

go func() {
defer close(done)

if wasManaging {
// We want to wait for the manage goroutine to be done
// so that we call the user's on{Assign,RevokeLost}.
<-g.manageDone
}

if wasDead {
// If we already called leave(), then we just wait for
// the prior leave to finish and we avoid re-issuing a
// LeaveGroup request.
return
}

if g.cfg.instanceID == nil {
g.cfg.logger.Log(LogLevelInfo, "leaving group",
"group", g.cfg.group,
"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
)
// If we error when leaving, there is not much
// we can do. We may as well just return.
req := kmsg.NewPtrLeaveGroupRequest()
req.Group = g.cfg.group
req.MemberID = g.memberID
member := kmsg.NewLeaveGroupRequestMember()
member.MemberID = g.memberID
member.Reason = kmsg.StringPtr("client leaving group per normal operation")
req.Members = append(req.Members, member)
req.RequestWith(g.cl.ctx, g.cl)
defer close(g.left)

if g.cfg.instanceID != nil {
return
}
}()

return func() { <-done }
g.cfg.logger.Log(LogLevelInfo, "leaving group",
"group", g.cfg.group,
"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
)
// If we error when leaving, there is not much
// we can do. We may as well just return.
req := kmsg.NewPtrLeaveGroupRequest()
req.Group = g.cfg.group
req.MemberID = g.memberID
member := kmsg.NewLeaveGroupRequestMember()
member.MemberID = g.memberID
member.Reason = kmsg.StringPtr("client leaving group per normal operation")
req.Members = append(req.Members, member)

resp, err := req.RequestWith(ctx, g.cl)
if err != nil {
g.leaveErr = err
return
}
g.leaveErr = kerr.ErrorForCode(resp.ErrorCode)
}()
}

// returns the difference of g.nowAssigned and g.lastAssigned.
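The doc comments above note that a client configured with an InstanceID does not leave its group via LeaveGroup or Close, and that you must issue a kmsg.LeaveGroupRequest yourself. Below is a hedged sketch of doing so, reusing the request construction shown in this diff; the InstanceID field name, the empty member ID, and the per-member error check are assumptions made for illustration, not part of this change.

package example

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// LeaveStatic issues a LeaveGroupRequest for a static (instance ID) member.
func LeaveStatic(ctx context.Context, cl *kgo.Client, group, instanceID string) error {
	req := kmsg.NewPtrLeaveGroupRequest()
	req.Group = group
	member := kmsg.NewLeaveGroupRequestMember()
	member.InstanceID = &instanceID // assumed field for group.instance.id
	member.Reason = kmsg.StringPtr("static member leaving for maintenance")
	req.Members = append(req.Members, member)

	resp, err := req.RequestWith(ctx, cl)
	if err != nil {
		return err
	}
	if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
		return err
	}
	for _, m := range resp.Members {
		if err := kerr.ErrorForCode(m.ErrorCode); err != nil {
			return fmt.Errorf("member %q: %w", instanceID, err)
		}
	}
	return nil
}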