Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some comments to the SmartQueue #357

Merged
merged 4 commits into from
May 11, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions yapapi/executor/_smartq.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
Item = TypeVar("Item")


class Handle(Generic[Item], object):
class Handle(
Generic[Item], object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we're here, let's remove this object, so Python2-ic

): # Handling? QueueItem? ConsumedItem? ConsumerItem? ConsumerBinding?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got used to Handle, it's not that bad. Either way, let's not forget to remove this comment.

"""
Binding between a queue item and a specific consumer.

Additionally it keeps track of the previously used consumers of the given item
to prevent them from being assigned to this item again.
"""

__slots__ = ("_data", "_prev_consumers", "_consumer")

def __init__(self, data: Item, *, consumer: Optional["Consumer[Item]"] = None):
Expand All @@ -52,9 +61,17 @@ def data(self) -> Item:


class SmartQueue(Generic[Item], object):
def __init__(self, items: Iterable[Item], *, retry_cnt: int = 2):
def __init__(self, items: Iterable[Item]):
"""
:param items: the items to be iterated over
:param retry_cnt:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we drop this argument from __init__() (I understand that's because it's unused) then let's not add a docstring for it.

"""
self._items: Iterator[Item] = peekable(items)

"""the items scheduled for reassignment to another consumer"""
self._rescheduled_items: Set[Handle[Item]] = set()

"""the items currently assigned to consumers"""
self._in_progress: Set[Handle[Item]] = set()

# Synchronization primitives
Expand All @@ -72,7 +89,7 @@ def has_unassigned_items(self) -> bool:
An item is _unassigned_ if it's new (hasn't been retrieved yet by any consumer)
or it has been rescheduled and is not in progress.

A queue has unassigned items iff `get()` will immediately return some item,
A queue has unassigned items iff `get()` immediately returns some item,
without waiting for an item that is currently "in progress" to be rescheduled.
"""
return self.has_new_items() or bool(self._rescheduled_items)
Expand Down Expand Up @@ -126,13 +143,15 @@ async def mark_done(self, handle: Handle[Item]) -> None:
)

async def reschedule(self, handle: Handle[Item]) -> None:
"""free the item for reassignment to another consumer"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I we ever run yapapi code through flake8 it will complain about method docstrings not starting with a capital letter and not ending with a full stop.

assert handle in self._in_progress, "handle is not in progress"
async with self._lock:
self._in_progress.remove(handle)
self._rescheduled_items.add(handle)
self._new_items.notify_all()

async def reschedule_all(self, consumer: "Consumer[Item]"):
"""make all items currently assigned to the consumer available for reassignment"""
async with self._lock:
handles = [handle for handle in self._in_progress if handle.consumer == consumer]
for handle in handles:
Expand All @@ -149,6 +168,7 @@ def stats(self) -> Dict:
}

async def wait_until_done(self) -> None:
"""wait until all items in the queue are processed"""
async with self._lock:
while self.__has_data():
await self._eof.wait()
Expand All @@ -160,6 +180,11 @@ class Consumer(
AsyncIterable[Handle[Item]],
ContextManager["Consumer[Item]"],
):
"""
Provides an interface to asynchronously iterate over items in the given queue
while cooperating with other consumers attached to this queue.
"""

def __init__(self, queue: SmartQueue[Item]):
self._queue = queue
self._fetched: Optional[Handle[Item]] = None
Expand All @@ -178,6 +203,7 @@ def __exit__(

@property
def last_item(self) -> Optional[Item]:
"""shouldn't it be called `current_item` ?"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I'd call it current_item as well.

return self._fetched.data if self._fetched else None

async def __anext__(self) -> Handle[Item]:
Expand Down