Skip to content

Commit

Permalink
fix: support negative begin_offset in brod_consumer
Browse files Browse the repository at this point in the history
Previously, when a user subscribes to a topic using a consumer group
that has committed `0` as its last offset and uses
`brod_group_subscriber_v2` to consume messages from this topic the
following logic is triggered:

1. `brod_group_subscriber_worker.resolve_committed_offsets/3` is used to
   determine the' asked offset`. If the offset has a value of `0`, this
   means this function returns `-1`.
2. `brod_topic_subscriber.resolve_begin_offset/1` leaves this value
   as-is because `-1` is considered a special offset.
3. `brod_consumer.resolve_begin_offset/1` receives `-1` and passes it to
   `brod_utils:resolve_offset/4`.

Unfortunately, based on my tests, the behaviour of `resolve_offset/4` is
undefined when it comes to negative offsets. In my case, it returns the
latest offset of the topic. This is wrong and causes the consumer to get
stuck.

As a solution, I have implemented a simple mapper from special offsets
(`-1`, `-2`) to a semantically correct value `:earliest` that
`brod_utils:resolve_offset/4` can resolve
  • Loading branch information
Tasyp committed Nov 19, 2024
1 parent 5172dbe commit f6ff79a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

- 4.3.3
- Fixed `brod_consumer` so that it could correctly start consumption when the begin offset is -1

- 4.3.2
- Upgrade kafka_protocol from 4.1.9 to 4.1.10 for partition leader discover/connect timeout fix.

Expand Down
11 changes: 10 additions & 1 deletion src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,16 @@ resolve_begin_offset(#state{ begin_offset = BeginOffset
, topic = Topic
, partition = Partition
} = State) when ?IS_SPECIAL_OFFSET(BeginOffset) ->
case resolve_offset(Connection, Topic, Partition, BeginOffset) of
AdjustedBeginOffset =
case BeginOffset of
-1 ->
?OFFSET_EARLIEST;
-2 ->
?OFFSET_EARLIEST;
Value ->
Value
end,
case resolve_offset(Connection, Topic, Partition, AdjustedBeginOffset) of
{ok, NewBeginOffset} ->
{ok, State#state{begin_offset = NewBeginOffset}};
{error, Reason} ->
Expand Down

0 comments on commit f6ff79a

Please sign in to comment.