Update comparison logic for worker state #3321

Merged
merged 4 commits into from
Feb 11, 2020
10 changes: 8 additions & 2 deletions distributed/scheduler.py
@@ -271,6 +271,12 @@ def __init__(

         self.extra = extra or {}
 
+    def __hash__(self):
+        return hash((self.name, self.host))
+
+    def __eq__(self, other):
+        return type(self) == type(other) and hash(self) == hash(other)

Sorry for being late to the party. There is a bug here: two WorkerState objects are not guaranteed to be the same just because their hash values are equal. Hashing is not an injective function, so distinct (name, host) pairs can share a hash.

I think we need to go the extra mile here and compare the fields directly: (self.name, self.host) == (other.name, other.host)
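
A minimal sketch of that suggestion, using a hypothetical stripped-down stand-in for WorkerState (only `name` and `host`, taken as plain constructor arguments for illustration; the real class derives `host` from its address). The collision shown relies on a CPython detail, `hash(-1) == hash(-2)`, and integer worker names are assumed purely for the demonstration:

```python
class WorkerState:
    """Hypothetical stand-in with only the fields used for comparison."""

    def __init__(self, name, host):
        self.name = name
        self.host = host

    def __hash__(self):
        return hash((self.name, self.host))

    def __eq__(self, other):
        # Compare the identifying fields themselves, not their hashes,
        # so a hash collision cannot make two different workers equal.
        return type(self) == type(other) and (self.name, self.host) == (
            other.name,
            other.host,
        )


# CPython detail: hash(-1) == hash(-2), so these two workers collide.
a = WorkerState(-1, "127.0.0.1")
b = WorkerState(-2, "127.0.0.1")

same_hash = hash(a) == hash(b)  # colliding hashes on CPython
equal = a == b                  # still unequal with field comparison
```

With the hash-based `__eq__` from the diff, `a == b` would be true for this pair on CPython; comparing the tuples keeps `__hash__` and `__eq__` consistent without relying on hashes being unique.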

StephanErb (Contributor), Feb 15, 2020

@TomAugspurger @bnaul please see above.

While I totally agree that hash collisions are very unlikely, Dask has a massive install base, sometimes with hundreds to thousands of workers per cluster. So for all we know, the one-in-a-million event could happen next Tuesday. 🤷‍♂


     @property
     def host(self):
         return get_address_host(self.address)
@@ -2557,7 +2563,7 @@ def handle_release_data(self, key=None, worker=None, client=None, **msg):
         if ts is None:
             return
         ws = self.workers[worker]
-        if ts.processing_on is not ws:
+        if ts.processing_on != ws:
             return
         r = self.stimulus_missing_data(key=key, ensure=False, **msg)
         self.transitions(r)
@@ -4020,7 +4026,7 @@ def transition_processing_memory(
             if ws is None:
                 return {key: "released"}
 
-            if ws is not ts.processing_on:  # someone else has this task
+            if ws != ts.processing_on:  # someone else has this task
                 logger.info(
                     "Unexpected worker completed task, likely due to"
                     " work stealing. Expected: %s, Got: %s, Key: %s",