
Commit

Assert spawned processes terminating in distributed tests (pytorch#13071)

Summary:
Pull Request resolved: pytorch#13071

In the case where a process got stuck and timed out on joining, we would see a "None != 1" assertion error in the code path where the exit statuses are compared, implying that the first process exited with exit code 1 while another process never exited at all. With this commit the error message is more descriptive: it names the rank that failed to terminate within the join timeout (a minimal standalone sketch of the old failure mode follows the commit trailers below).

Differential Revision: D10785266

fbshipit-source-id: c8cc02d07ea4fdc6f5374afd9a0aac72218fe61d
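
For illustration only, here is a minimal standalone sketch (not part of this commit) of the failure mode described above, using only Python's multiprocessing module: a child that never exits keeps exitcode == None even after join() times out, so a bare exit-code comparison surfaces as "None != 1", whereas checking is_alive() after the timed join points directly at the stuck process.

import multiprocessing
import time


def hangs():
    # Simulates a rank that gets stuck and never terminates.
    time.sleep(60)


def fails():
    # Simulates a rank that exits with a nonzero status.
    raise SystemExit(1)


if __name__ == "__main__":
    stuck = multiprocessing.Process(target=hangs)
    failed = multiprocessing.Process(target=fails)
    stuck.start()
    failed.start()

    stuck.join(timeout=1)    # times out; the child is still running
    failed.join(timeout=1)   # returns; the child has exited

    # exitcode stays None while a process is alive, so comparing exit
    # codes across processes reports the confusing "None != 1".
    print(stuck.exitcode, failed.exitcode)   # None 1

    # Checking liveness after the timed join identifies the stuck process.
    print(stuck.is_alive())                  # True

    stuck.terminate()
    stuck.join()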
pietern authored and facebook-github-bot committed Oct 24, 2018
1 parent 2ac7b6b commit 917b203
Showing 2 changed files with 12 additions and 6 deletions.
9 changes: 6 additions & 3 deletions test/test_distributed.py
@@ -1362,9 +1362,12 @@ def _join_and_reduce(self, fn):
             getattr(fn, "skip_if_no_gpu", False) or
             getattr(fn, "skip_if_small_worldsize", False)
         )
-        self.JOIN_TIMEOUT = get_timeout(self.id())
-        for p in self.processes:
-            p.join(self.JOIN_TIMEOUT)
+        join_timeout = get_timeout(self.id())
+        for rank, process in enumerate(self.processes):
+            process.join(join_timeout)
+            self.assertFalse(
+                process.is_alive(),
+                "Timeout waiting for rank %d to terminate" % rank)
 
         first_process = self.processes[0]
         for p in self.processes:
9 changes: 6 additions & 3 deletions test/test_thd_distributed.py
@@ -1110,9 +1110,12 @@ def _join_and_reduce(self, fn):
             getattr(fn, "skip_if_no_gpu", False) or
             getattr(fn, "skip_if_small_worldsize", False)
         )
-        self.JOIN_TIMEOUT = get_timeout(self.id())
-        for p in self.processes:
-            p.join(self.JOIN_TIMEOUT)
+        join_timeout = get_timeout(self.id())
+        for rank, process in enumerate(self.processes):
+            process.join(join_timeout)
+            self.assertFalse(
+                process.is_alive(),
+                "Timeout waiting for rank %d to terminate" % rank)
 
         first_process = self.processes[0]
         for p in self.processes:
