-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DataPipe] Automatically close parent streams and discarded streams #560
Conversation
[ghstack-poisoned]
ghstack-source-id: 88237d7a9aa2f2e473bbfbe7311fdec6e067f0a9 Pull Request resolved: #560
…d streams" [ghstack-poisoned]
ghstack-source-id: b0a21facf64c47650e43048e59f2d482ced0228d Pull Request resolved: #560
Domain tests are broken because of out-of-sync torch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -107,7 +107,9 @@ def __iter__(self) -> Iterator[Tuple[str, io.BufferedIOBase]]: | |||
inner_path = os.path.join(path, info.filename) | |||
file_obj = rar.open(info) | |||
|
|||
yield inner_path, StreamWrapper(file_obj) # type: ignore[misc] | |||
yield inner_path, StreamWrapper(file_obj, stream) # type: ignore[misc] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should we pass in the name
here to StreamWrapper
? Or there is a good reason to leave it out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no reason to skip names, they are convenient for debugging. Will add in both cases.
@@ -56,10 +56,13 @@ def __iter__(self) -> Iterator[Tuple[str, BufferedIOBase]]: | |||
try: | |||
extracted_fobj = lzma.open(data_stream, mode="rb") # type: ignore[call-overload] | |||
new_pathname = pathname.rstrip(".xz") | |||
yield new_pathname, StreamWrapper(extracted_fobj) # type: ignore[misc] | |||
yield new_pathname, StreamWrapper(extracted_fobj, data_stream) # type: ignore[misc] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Pass in name
to StreamWrapper
?
…d streams" [ghstack-poisoned]
ghstack-source-id: 60a7f917991aa545487a88bbf6c147b6faff50e0 Pull Request resolved: #560
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with one case.
for remaining in ref_it: | ||
janitor(remaining) | ||
|
||
# TODO(VItalyFedyunin): This should be Exception or warn when debug mode is enabled | ||
if len(self.buffer) > 0: | ||
for k, v in self.buffer.items(): | ||
janitor(v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should put it into try-finally
since we might raise ValueError
in the middle of iteration.
…d streams" [ghstack-poisoned]
ghstack-source-id: b796cfbf986dd01362a0d84b18915725428f0fa5 Pull Request resolved: #560
@VitalyFedyunin has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator. |
Stack from ghstack (oldest at bottom):
Differential Revision: D37625298