Skip to content

Commit

Permalink
Revert Buffer._backgroun_cycle
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Nov 27, 2023
1 parent 327dfa8 commit ef0b8fe
Showing 1 changed file with 30 additions and 35 deletions.
65 changes: 30 additions & 35 deletions distributed/shuffle/_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,45 +137,40 @@ def empty(self) -> bool:
return not self.shards

async def _background_task(self) -> None:
done = False
while not done:
with context_meter.meter("idle"):
done = await self._background_cycle()

async def _background_cycle(self) -> bool:
def _continue() -> bool:
return bool(self.shards or self._inputs_done)

async with self._shards_available:
await self._shards_available.wait_for(_continue)
if self._inputs_done and not self.shards:
return True
part_id = max(self.sizes, key=self.sizes.__getitem__)
if self.max_message_size > 0:
size = 0
shards = []
while size < self.max_message_size:
try:
shard = self.shards[part_id].pop()
shards.append(shard)
s = self.sizes_detail[part_id].pop()
size += s
self.sizes[part_id] -= s
except IndexError:
while True:
with context_meter.meter("idle"):
async with self._shards_available:
await self._shards_available.wait_for(_continue)
if self._inputs_done and not self.shards:
break
finally:
if not self.shards[part_id]:
del self.shards[part_id]
assert not self.sizes[part_id]
del self.sizes[part_id]
assert not self.sizes_detail[part_id]
del self.sizes_detail[part_id]
else:
shards = self.shards.pop(part_id)
size = self.sizes.pop(part_id)
self._shards_available.notify_all()
await self.process(part_id, shards, size)
return False
part_id = max(self.sizes, key=self.sizes.__getitem__)
if self.max_message_size > 0:
size = 0
shards = []
while size < self.max_message_size:
try:
shard = self.shards[part_id].pop()
shards.append(shard)
s = self.sizes_detail[part_id].pop()
size += s
self.sizes[part_id] -= s
except IndexError:
break
finally:
if not self.shards[part_id]:
del self.shards[part_id]
assert not self.sizes[part_id]
del self.sizes[part_id]
assert not self.sizes_detail[part_id]
del self.sizes_detail[part_id]
else:
shards = self.shards.pop(part_id)
size = self.sizes.pop(part_id)
self._shards_available.notify_all()
await self.process(part_id, shards, size)

async def write(self, data: dict[str, ShardType]) -> None:
"""
Expand Down

0 comments on commit ef0b8fe

Please sign in to comment.