Skip to content

Commit

Permalink
ref: Use message.position_to_commit from arroyo (#3246)
Browse files Browse the repository at this point in the history
This simplifies getting the right position to commit from Arroyo. Also fixes a bug where the wrong offset was being committed on shutdown (it should be message.next_offset but we were actually committing message.offset)
  • Loading branch information
lynnagara authored Oct 13, 2022
1 parent 6908733 commit 101e866
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions snuba/subscriptions/executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from arroyo.processing import StreamProcessor
from arroyo.processing.strategies import MessageRejected, ProcessingStrategy
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
from arroyo.types import Commit, Position
from arroyo.types import Commit

from snuba import state
from snuba.consumers.utils import get_partition_count
Expand Down Expand Up @@ -431,9 +431,7 @@ def poll(self) -> None:

self.__queue.popleft()

self.__commit(
{message.partition: Position(message.next_offset, message.timestamp)}
)
self.__commit({message.partition: message.position_to_commit})

def submit(self, message: Message[SubscriptionTaskResult]) -> None:
assert not self.__closed
Expand Down Expand Up @@ -468,7 +466,7 @@ def join(self, timeout: Optional[float] = None) -> None:

future.result(remaining)

offset = {message.partition: Position(message.offset, message.timestamp)}
offset = {message.partition: message.position_to_commit}

logger.info("Committing offset: %r", offset)
self.__commit(offset, force=True)

0 comments on commit 101e866

Please sign in to comment.