Possible race condition in kadm.CommitOffsets #167

Closed
Zach-Johnson opened this issue May 27, 2022 · 2 comments

@Zach-Johnson

Hi -
I've noticed some strange behavior when using kadm.CommitOffsets. If I use this function with a groupID that does not exist yet, I see

error UNKNOWN_MEMBER_ID: The coordinator is not aware of this member.

if the admin client is created sufficiently long after the regular client is set up. If I instead call this function immediately after creating the initial client, the request succeeds and the behavior is as I expect it: I get a new consumer group that begins consuming at the offsets I specified.

The context here is that previously I was trying to set up a new consumer group with specific offsets to start from and I used to do something like

client.PollRecords(ctx, 1)
client.SetOffsets(startingPositions)
...

to force the consumer group to be created initially. This turns out to work badly in some cases; I think I'm abusing SetOffsets here, since not all of the topics/partitions will necessarily have been consumed by the initial PollRecords.

So I suspect there is some race happening in kadm that I'm unaware of, although maybe I'm abusing the behavior here as well. In general, is there a good pattern for forcing a consumer group to start from a specific set of offsets? Perhaps I should do something like this instead?

client.PollRecords(ctx, 1) // Force the consumer group creation if it doesn't exist
client.CommitRecords(....)

The race in kadm seems easily reproducible with something like the following, using a groupID that doesn't exist yet:

	e := func(err error) {
		if err != nil {
			fmt.Println("error", err)
			os.Exit(1)
		}
	}

	cl, err := kgo.NewClient(opts...)
	e(err)

	defer cl.Close()

	adm := kadm.NewClient(cl)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel() // release the timeout's resources; an unused cancel also fails to compile

	// time.Sleep(time.Second) // Uncommenting this results in the error

	resp, err := adm.CommitOffsets(ctx, groupID, kadm.Offsets{topic: map[int32]kadm.Offset{
		0: kadm.Offset{
			Topic: topic, Partition: 0, At: 4, LeaderEpoch: -1,
		},
		1: kadm.Offset{
			Topic: topic, Partition: 1, At: 4, LeaderEpoch: -1,
		},
		2: kadm.Offset{
			Topic: topic, Partition: 2, At: 8, LeaderEpoch: -1,
		},
	}})
	e(err)
	e(resp.Error())

	fmt.Println("resp", resp)
@twmb
Owner

twmb commented May 29, 2022

The admin-level CommitOffsets cannot be used on an active group, so this is likely a race in your usage. I'd actually expect ILLEGAL_GENERATION, but perhaps there's some other race when committing with the first generation. If you commit immediately, it's likely that the kgo client guts have not yet joined the group, so your admin commit goes through first and forces a commit to exist that the group then starts from.

If you want a group to start from specific offsets, you need to commit before the group is joined for the first time (or when the group is completely empty). Otherwise, if you're only consuming with one member, you can use the kgo.Client SetOffsets function. This will set the partitions at the given offsets within the client. Once partitions are eventually "dirty", commits for dirty partitions will go through (but partitions that are not consumed are not dirty, so commits for those will not go through).

I recommend using the admin commit before starting the group.

@twmb twmb closed this as completed May 29, 2022
@Zach-Johnson
Author

Understood, thank you!
