Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor missing-data command #6332

Merged
merged 3 commits into from
May 13, 2022
Merged

Conversation

crusaderky
Copy link
Collaborator

Partially closes #5896

@crusaderky crusaderky self-assigned this May 12, 2022
@@ -4679,53 +4679,48 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
self.send_all(client_msgs, worker_msgs)

def handle_missing_data(
self, key: str, errant_worker: str, stimulus_id: str, **kwargs
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all changes to scheduler.py are purely cosmetic

@crusaderky crusaderky marked this pull request as ready for review May 12, 2022 23:33
@github-actions
Copy link
Contributor

github-actions bot commented May 13, 2022

Unit Test Results

       15 files  +       3         15 suites  +3   7h 9m 55s ⏱️ + 1h 20m 53s
  2 775 tests +       1    2 695 ✔️ +     14    79 💤  -   12  1  - 1 
20 587 runs  +3 969  19 678 ✔️ +3 856  907 💤 +113  2 ±0 

For more details on these failures, see this check.

Results for commit 6551803. ± Comparison against base commit 4b81f06.

♻️ This comment has been updated with latest results.

@@ -4683,53 +4683,48 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
self.send_all(client_msgs, worker_msgs)

def handle_missing_data(
self, key: str, errant_worker: str, stimulus_id: str, **kwargs
self, key: str, worker: str, errant_worker: str, stimulus_id: str
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my clarification: how does worker get passed in? I don't see it as a field in MissingDataMsg.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's automatically added by self.batched_stream.send

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see logic for that anywhere in BatchedSend.send, or within the _background_send coroutine?

def send(self, *msgs) -> None:
"""Schedule a message for sending to the other side
This completes quickly and synchronously
"""
if self.comm is not None and self.comm.closed():
raise CommClosedError(f"Comm {self.comm!r} already closed.")
self.message_count += len(msgs)
self.buffer.extend(msgs)
# Avoid spurious wakeups if possible
if self.next_deadline is None:
self.waker.set()

Copy link
Collaborator Author

@crusaderky crusaderky May 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's here, on the receiving end:

await self.handle_stream(comm=comm, extra={"worker": worker})

)
recommendations[ts] = "fetch"
del data, response
self.transitions(recommendations, stimulus_id=stimulus_id)
self._handle_instructions(instructions)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This MissingDataMsg will now come after any messages generated by transitions. I think that shouldn't matter, but want to double check.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be seriously worried if order mattered

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ordering does sometimes matter and I suggest to not reject such a possibility just because it would be bad design. The scheduler should be able to handle all possible kind of orderings but this can still introduce subtle and unintended changes in behavior and I would advise caution either way

@crusaderky crusaderky merged commit 50d2911 into dask:main May 13, 2022
@crusaderky crusaderky deleted the WSMR/missing_data branch May 13, 2022 20:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Migrate ensure_communicating transitions to new WorkerState event mechanism
3 participants