Skip to content

Commit

Permalink
Improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Sep 26, 2022
1 parent b6380c7 commit 0808455
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import heapq
import logging
import math
import operator
import random
import sys
Expand Down Expand Up @@ -1643,26 +1644,42 @@ def _select_keys_for_gather(

while available:
ts = available.peek()
updated_total_nbytes = total_nbytes + ts.get_nbytes()
# When there is no other traffic, the top-priority task is fetched
# regardless of its size to ensure progress
if (
self.transfer_incoming_bytes_limit is not None
and (self.transfer_incoming_bytes or to_gather)
and self.transfer_incoming_bytes + updated_total_nbytes
> self.transfer_incoming_bytes_limit
):
break
if to_gather and updated_total_nbytes > self.transfer_message_target_bytes:
if self._task_exceeds_transfer_limits(ts, total_nbytes):
break
for worker in ts.who_has:
# This also effectively pops from available
self.data_needed[worker].remove(ts)
to_gather.append(ts)
total_nbytes = updated_total_nbytes
total_nbytes += ts.get_nbytes()

return to_gather, total_nbytes

def _task_exceeds_transfer_limits(self, ts: TaskState, total_nbytes: int) -> bool:
"""Return True if gathering the task together with `total_nbytes` of other data in the same message, False otherwise."""
if not self.transfer_incoming_bytes and not total_nbytes:
# When there is no other traffic, the top-priority task is fetched
# regardless of its size to ensure progress
return False

transfer_incoming_bytes_remaining = (
self.transfer_incoming_bytes_limit or math.inf
) - self.transfer_incoming_bytes

if not total_nbytes:
# Ignore the message target for the top-priority task of each worker
# to ensure progress
bytes_left_to_fetch = transfer_incoming_bytes_remaining
else:
bytes_left_to_fetch = (
min(
transfer_incoming_bytes_remaining,
self.transfer_message_target_bytes,
)
- total_nbytes
)

return ts.get_nbytes() > bytes_left_to_fetch

def _ensure_computing(self) -> RecsInstrs:
if not self.running:
return {}, []
Expand Down

0 comments on commit 0808455

Please sign in to comment.