-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Forward worker exceptions and exit with it #1003
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For lint error, can you please follow https://github.com/pytorch/data/blob/main/CONTRIBUTING.md#code-style to fix them automatically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might want to rebase onto main
branch again. I have already renamed PrototypeMultiProcessingReadingService
to MultiProcessingReadingService
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall with some questions:
dp = MakeMistakeDataPipe(dp) | ||
for worker_prefetch_cnt in [0, 5, 10]: | ||
for num_workers in [1, 4]: | ||
rs = MultiProcessingReadingService(num_workers=num_workers, worker_prefetch_cnt=worker_prefetch_cnt) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if there is a DataPipe after Prefetcher
? Does it still work properly?
e.g. dp.map().prefetch().map()
@priyaramani has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
@priyaramani merged this pull request in b717187. |
When a worker thread fails in PrototypeMultiProcessingReadingService, exception was silently neglected and the thread hanged. This PR fixes that by catching and propagating exception to response queue, showing that to user and exiting the process instead of hanging.