Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some comments to the SmartQueue #357

Merged
merged 4 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/executor/test_smartq.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def invalid_worker(q):
with q.new_consumer() as c:
async for item in c:
print("item2=", item.data)
assert c.last_item == item.data
assert c.current_item == item.data
outputs.add(item.data)
await q.mark_done(item)
print("w end")
Expand Down
2 changes: 1 addition & 1 deletion yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
datetime.now(timezone.utc) + batch.timeout if batch.timeout else None
)
try:
current_worker_task = consumer.last_item
current_worker_task = consumer.current_item
if current_worker_task:
emit(
events.TaskStarted(
Expand Down
31 changes: 27 additions & 4 deletions yapapi/executor/_smartq.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@
Item = TypeVar("Item")


class Handle(Generic[Item], object):
class Handle(Generic[Item]):
"""
Handle of the queue item, iow, binding between a queue item and a specific consumer.

Additionally it keeps track of the previously used consumers of the given item
to prevent them from being assigned to this item again.
"""

__slots__ = ("_data", "_prev_consumers", "_consumer")

def __init__(self, data: Item, *, consumer: Optional["Consumer[Item]"] = None):
Expand All @@ -52,9 +59,16 @@ def data(self) -> Item:


class SmartQueue(Generic[Item], object):
def __init__(self, items: Iterable[Item], *, retry_cnt: int = 2):
def __init__(self, items: Iterable[Item]):
"""
:param items: the items to be iterated over
"""
self._items: Iterator[Item] = peekable(items)

"""the items scheduled for reassignment to another consumer"""
self._rescheduled_items: Set[Handle[Item]] = set()

"""the items currently assigned to consumers"""
self._in_progress: Set[Handle[Item]] = set()

# Synchronization primitives
Expand All @@ -72,7 +86,7 @@ def has_unassigned_items(self) -> bool:
An item is _unassigned_ if it's new (hasn't been retrieved yet by any consumer)
or it has been rescheduled and is not in progress.

A queue has unassigned items iff `get()` will immediately return some item,
A queue has unassigned items iff `get()` immediately returns some item,
without waiting for an item that is currently "in progress" to be rescheduled.
"""
return self.has_new_items() or bool(self._rescheduled_items)
Expand Down Expand Up @@ -126,13 +140,15 @@ async def mark_done(self, handle: Handle[Item]) -> None:
)

async def reschedule(self, handle: Handle[Item]) -> None:
"""Free the item for reassignment to another consumer."""
assert handle in self._in_progress, "handle is not in progress"
async with self._lock:
self._in_progress.remove(handle)
self._rescheduled_items.add(handle)
self._new_items.notify_all()

async def reschedule_all(self, consumer: "Consumer[Item]"):
"""Make all items currently assigned to the consumer available for reassignment."""
async with self._lock:
handles = [handle for handle in self._in_progress if handle.consumer == consumer]
for handle in handles:
Expand All @@ -149,6 +165,7 @@ def stats(self) -> Dict:
}

async def wait_until_done(self) -> None:
"""Wait until all items in the queue are processed."""
async with self._lock:
while self.__has_data():
await self._eof.wait()
Expand All @@ -160,6 +177,11 @@ class Consumer(
AsyncIterable[Handle[Item]],
ContextManager["Consumer[Item]"],
):
"""
Provides an interface to asynchronously iterate over items in the given queue
while cooperating with other consumers attached to this queue.
"""

def __init__(self, queue: SmartQueue[Item]):
self._queue = queue
self._fetched: Optional[Handle[Item]] = None
Expand All @@ -177,7 +199,8 @@ def __exit__(
return None

@property
def last_item(self) -> Optional[Item]:
def current_item(self) -> Optional[Item]:
"""The most-recent queue item that has been fetched to be processed by this consumer."""
return self._fetched.data if self._fetched else None

async def __anext__(self) -> Handle[Item]:
Expand Down