Broker failover - consumer groups #556
Hi @twmb. Thank you for this lib.

We have a scenario with N different upstream brokers, each with topics and partitions, all replicated to one aggregated downstream broker node. Our goal is to replicate data from nodeUpstream1 to nodeDownstream and, if nodeUpstream1 goes down, to switch over to nodeUpstream2, nodeUpstream3, and so on in round-robin fashion. This approach lets us deduplicate data downstream without requiring additional stores.

However, we have encountered a few issues with our current implementation. I would like to discuss and confirm whether we are following the right approach or whether there are better alternatives:

- The OnBrokerDisconnect() hook gets called multiple times when a broker goes down. It also fires when idle connections are reaped internally (see the sketch below). Is this behavior expected, or should the hook be called only once per broker disconnect event?
- Close() cannot be called for the consumer group, because it issues a fresh LeaveGroup request that gets stuck when the broker is already down. Is there a recommended way to handle this gracefully and ensure the consumer group is closed properly?
- If autocommit is enabled, there seems to be no way to clean up the associated goroutines. Is there a recommended approach for this?
- PurgeTopicsFromClient seems to be the closest cleanup option we found in the library. However, when we round-robin back to the same node, committing offsets through kadm.CommitOffsets to resume the consumer group where it left off fails with `UNKNOWN_MEMBER_ID: The coordinator is not aware of this member.`

Implementation: https://github.com/joeirimpan/kaf-relay/blob/fallbacks/consumer.go
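For reference, a minimal sketch of how such a disconnect hook can be registered; the broker address and log wording are placeholders. As noted above, the hook also fires when the client reaps idle connections internally, so it may run many times per broker:

```go
package main

import (
	"log"
	"net"

	"github.com/twmb/franz-go/pkg/kgo"
)

// disconnectLogger implements kgo.HookBrokerDisconnect. It fires both for
// real broker outages and for idle connections the client reaps
// internally, so it cannot be used alone as a broker-down signal.
type disconnectLogger struct{}

func (disconnectLogger) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
	log.Printf("disconnected from broker %d (%s:%d)", meta.NodeID, meta.Host, meta.Port)
}

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("nodeUpstream1:9092"), // placeholder address
		kgo.WithHooks(disconnectLogger{}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// ... consume and relay ...
}
```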
Yes. A ForceClose() method would be useful for graceful shutdowns and early termination of autocommit goroutines. I was committing offsets to the group after it was started, and the commits were going through for the new groups. Maybe the commits were going through because the consumer groups hadn't joined yet (#167 (comment)). Let's say we need to resume an existing consumer group from a different offset: should we first close the group, commit offsets through kadm, and then reinit the consumer group?
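A minimal sketch of that close, commit, reinit sequence, with placeholder broker, group, topic, and partition values. Note that Close here still blocks on LeaveGroup if the broker is down, which is the limitation this issue tracks:

```go
package relay

import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// resumeGroupAt closes the running group client so the member leaves the
// group, rewrites the group's committed offset through kadm, and returns
// a fresh client that resumes from the new offset.
func resumeGroupAt(ctx context.Context, old *kgo.Client, at int64) (*kgo.Client, error) {
	old.Close() // leave the group first, so the commit is not fenced by an active member

	adm, err := kadm.NewOptClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		return nil, err
	}
	defer adm.Close()

	os := make(kadm.Offsets)
	os.Add(kadm.Offset{Topic: "my-topic", Partition: 0, At: at, LeaderEpoch: -1})
	if _, err := adm.CommitOffsets(ctx, "my-group", os); err != nil {
		return nil, err
	}

	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
	)
}
```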
This worked.
Shall I close this issue? Like you said, a ForceClose() method would be useful.
We can leave this open to track ForceClose, although I'm leaning a bit towards CloseForced, so that it sits next to the other Close functions in godoc (an unfortunate limitation, since godoc sorts everything alphabetically). Another option is CloseContext. wdyt?

```go
// CloseForced is the same as CloseAllowingRebalance, but does not wait for the
// group to be left once the context is closed. If you are consuming in a
// group, this function attempts to leave the group but only waits until the
// context is closed.
//
// A nil context is valid and forces the client to quit immediately. This returns any
// context error or leave group error.
func (*Client) CloseForced(context.Context) error
```

Realistically, I should also add …
This allows more control over timeouts when leaving a group or closing the client, and also gives more insight into group leave errors. Closes #556.
After implementing it, I'll just add LeaveGroupContext and document that you can speed up Close by first using LeaveGroupContext.
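A minimal sketch of that pattern, assuming the LeaveGroupContext described here; the 5s timeout is an arbitrary choice:

```go
package relay

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// closeWithTimeout bounds how long we wait to leave the group before
// closing the client. If the coordinator is down, the context expires
// instead of blocking forever, and the final Close is fast because the
// group has already been left (or abandoned).
func closeWithTimeout(cl *kgo.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := cl.LeaveGroupContext(ctx); err != nil {
		log.Printf("leave group: %v", err) // surfaces group leave errors
	}
	cl.Close()
}
```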
Thank you @twmb