Skip to content

Commit

Permalink
Improve logging in P2P's scheduler plugin (dask#8410)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Dec 18, 2023
1 parent e1b57a8 commit 0ce1ee5
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions distributed/shuffle/_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ def get_or_create(
self.active_shuffles[spec.id] = state
self._shuffles[spec.id].add(state)
state.participating_workers.add(worker)
logger.warning(
"Shuffle %s initialized by task %r executed on worker %s",
spec.id,
key,
worker,
)
return ToPickle(state.run_spec)

def _raise_if_barrier_unknown(self, id: ShuffleId) -> None:
Expand Down Expand Up @@ -377,7 +383,7 @@ def transition(
self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)
logger.debug(
"Shuffle %s forgotten because task '%s' transitioned to %s due to "
"Shuffle %s forgotten because task %r transitioned to %s due to "
"stimulus '%s'",
shuffle_id,
key,
Expand Down Expand Up @@ -408,9 +414,10 @@ def _fail_on_workers(self, shuffle: SchedulerShuffleState, message: str) -> None
}
self.scheduler.send_all({}, worker_msgs)

def _clean_on_scheduler(self, id: ShuffleId, stimulus_id: str | None) -> None:
def _clean_on_scheduler(self, id: ShuffleId, stimulus_id: str) -> None:
shuffle = self.active_shuffles.pop(id)
if not shuffle._archived_by and stimulus_id:
logger.warning("Shuffle %s deactivated due to stimulus '%s'", id, stimulus_id)
if not shuffle._archived_by:
shuffle._archived_by = stimulus_id
self._archived_by_stimulus[stimulus_id].add(shuffle)

Expand Down

0 comments on commit 0ce1ee5

Please sign in to comment.