diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 322e9f10..c992b397 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -929,7 +929,10 @@ func (cl *Client) CloseAllowingRebalance() { cl.Close() } -// Close leaves any group and closes all connections and goroutines. +// Close leaves any group and closes all connections and goroutines. This +// function waits for the group to be left. If you want to force leave a group +// immediately and ensure a speedy shutdown you can use LeaveGroupContext first +// (and then Close will be immediate). // // If you are group consuming and have overridden the default // OnPartitionsRevoked, you must manually commit offsets before closing the @@ -942,6 +945,10 @@ func (cl *Client) CloseAllowingRebalance() { // notification of revoked partitions. If you want to automatically allow // rebalancing, use CloseAllowingRebalance. func (cl *Client) Close() { + cl.close(cl.ctx) +} + +func (cl *Client) close(ctx context.Context) (rerr error) { defer cl.cfg.hooks.each(func(h Hook) { if h, ok := h.(HookClientClosed); ok { h.OnClientClosed(cl) @@ -951,7 +958,7 @@ func (cl *Client) Close() { c := &cl.consumer c.kill.Store(true) if c.g != nil { - cl.LeaveGroup() + rerr = cl.LeaveGroupContext(ctx) } else if c.d != nil { c.mu.Lock() // lock for assign c.assignPartitions(nil, assignInvalidateAll, nil, "") // we do not use a log message when not in a group @@ -963,7 +970,7 @@ func (cl *Client) Close() { // loopFetch from starting. Assigning also waits for the prior session // to be complete, meaning loopFetch cannot be running. - sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second) + sessCloseCtx, sessCloseCancel := context.WithTimeout(ctx, time.Second) var wg sync.WaitGroup cl.allSinksAndSources(func(sns sinkAndSource) { if sns.source.session.id != 0 { @@ -1015,6 +1022,8 @@ func (cl *Client) Close() { closing.Close() } } + + return rerr } // Request issues a request to Kafka, waiting for and returning the response. diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 4ca249f0..ec99efa5 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -150,36 +150,82 @@ type groupConsumer struct { // We set this once to manage the group lifecycle once. managing bool - dying bool // set when closing, read in findNewAssignments + dying bool // set when closing, read in findNewAssignments + left chan struct{} + leaveErr error // set before left is closed +} + +// LeaveGroup leaves a group. Close automatically leaves the group, so this is +// only necessary to call if you plan to leave the group but continue to use +// the client. If a rebalance is in progress, this function waits for the +// rebalance to complete before the group can be left. This is necessary to +// allow you to safely issue one final offset commit in OnPartitionsRevoked. If +// you have overridden the default revoke, you must manually commit offsets +// before leaving the group. +// +// If you have configured the group with an InstanceID, this does not leave the +// group. With instance IDs, it is expected that clients will restart and +// re-use the same instance ID. To leave a group using an instance ID, you must +// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka +// scripts or kcl). +// +// It is recommended to use LeaveGroupContext to see if the leave was +// successful. +func (cl *Client) LeaveGroup() { + cl.LeaveGroupContext(context.Background()) } -// LeaveGroup leaves a group if in one. Calling the client's Close function -// also leaves a group, so this is only necessary to call if you plan to leave -// the group and continue using the client. Note that if a rebalance is in -// progress, this function waits for the rebalance to complete before the group -// can be left. This is necessary to allow you to safely issue one final offset -// commit in OnPartitionsRevoked. If you have overridden the default revoke, -// you must manually commit offsets before leaving the group. +// LeaveGroup leaves a group. Close automatically leaves the group, so this is +// only necessary to call if you plan to leave the group but continue to use +// the client. If a rebalance is in progress, this function waits for the +// rebalance to complete before the group can be left. This is necessary to +// allow you to safely issue one final offset commit in OnPartitionsRevoked. If +// you have overridden the default revoke, you must manually commit offsets +// before leaving the group. +// +// The context can be used to avoid waiting for the client to leave the group. +// Not waiting may result in your client being stuck in the group and the +// partitions this client was consuming being stuck until the session timeout. +// This function returns any leave group error or context cancel error. If the +// context is nil, this immediately leaves the group and does not wait and does +// not return an error. // // If you have configured the group with an InstanceID, this does not leave the // group. With instance IDs, it is expected that clients will restart and // re-use the same instance ID. To leave a group using an instance ID, you must // manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka // scripts or kcl). -func (cl *Client) LeaveGroup() { +func (cl *Client) LeaveGroupContext(ctx context.Context) error { c := &cl.consumer if c.g == nil { - return + return nil + } + var immediate bool + if ctx == nil { + var cancel func() + ctx, cancel = context.WithCancel(context.Background()) + cancel() + immediate = true } - c.waitAndAddRebalance() - c.mu.Lock() // lock for assign - c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup") - wait := c.g.leave() - c.mu.Unlock() - c.unaddRebalance() + go func() { + c.waitAndAddRebalance() + c.mu.Lock() // lock for assign + c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup") + c.g.leave(ctx) + c.mu.Unlock() + c.unaddRebalance() + }() - wait() // wait after we unlock + select { + case <-ctx.Done(): + if immediate { + return nil + } + return ctx.Err() + case <-c.g.left: + return c.g.leaveErr + } } // GroupMetadata returns the current group member ID and generation, or an @@ -214,6 +260,8 @@ func (c *consumer) initGroup() { rejoinCh: make(chan string, 1), heartbeatForceCh: make(chan func(error)), using: make(map[string]int), + + left: make(chan struct{}), } c.g = g if !g.cfg.setCommitCallback { @@ -411,7 +459,7 @@ func (g *groupConsumer) manage() { } } -func (g *groupConsumer) leave() (wait func()) { +func (g *groupConsumer) leave(ctx context.Context) { // If g.using is nonzero before this check, then a manage goroutine has // started. If not, it will never start because we set dying. g.mu.Lock() @@ -421,43 +469,47 @@ func (g *groupConsumer) leave() (wait func()) { g.cancel() g.mu.Unlock() - done := make(chan struct{}) + if wasManaging { + // We want to wait for the manage goroutine to be done + // so that we call the user's on{Assign,RevokeLost}. + <-g.manageDone + } - go func() { - defer close(done) + if wasDead { + // If we already called leave(), then we just wait for + // the prior leave to finish and we avoid re-issuing a + // LeaveGroup request. + return + } - if wasManaging { - // We want to wait for the manage goroutine to be done - // so that we call the user's on{Assign,RevokeLost}. - <-g.manageDone - } + go func() { + defer close(g.left) - if wasDead { - // If we already called leave(), then we just wait for - // the prior leave to finish and we avoid re-issuing a - // LeaveGroup request. + if g.cfg.instanceID != nil { return } - if g.cfg.instanceID == nil { - g.cfg.logger.Log(LogLevelInfo, "leaving group", - "group", g.cfg.group, - "member_id", g.memberID, // lock not needed now since nothing can change it (manageDone) - ) - // If we error when leaving, there is not much - // we can do. We may as well just return. - req := kmsg.NewPtrLeaveGroupRequest() - req.Group = g.cfg.group - req.MemberID = g.memberID - member := kmsg.NewLeaveGroupRequestMember() - member.MemberID = g.memberID - member.Reason = kmsg.StringPtr("client leaving group per normal operation") - req.Members = append(req.Members, member) - req.RequestWith(g.cl.ctx, g.cl) + g.cfg.logger.Log(LogLevelInfo, "leaving group", + "group", g.cfg.group, + "member_id", g.memberID, // lock not needed now since nothing can change it (manageDone) + ) + // If we error when leaving, there is not much + // we can do. We may as well just return. + req := kmsg.NewPtrLeaveGroupRequest() + req.Group = g.cfg.group + req.MemberID = g.memberID + member := kmsg.NewLeaveGroupRequestMember() + member.MemberID = g.memberID + member.Reason = kmsg.StringPtr("client leaving group per normal operation") + req.Members = append(req.Members, member) + + resp, err := req.RequestWith(ctx, g.cl) + if err != nil { + g.leaveErr = err + return } + g.leaveErr = kerr.ErrorForCode(resp.ErrorCode) }() - - return func() { <-done } } // returns the difference of g.nowAssigned and g.lastAssigned.