Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PrototypeRS] Adding 'pause' and 'resume' operations to halt DataPipes #879

Closed
wants to merge 45 commits into from

Conversation

NivekT
Copy link
Contributor

@NivekT NivekT commented Nov 3, 2022

Stack from ghstack:

The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with sharding_round_robin_dispatch. This will be fixed in an upcoming PR and is tracked by #991.


Implementation note (feel free to discuss):
_pause exists for both DataLoader2Iterator and DataLoader2 as private methods.
resume exists for both DataLoader2Iterator as public but it is private for DataLoader2.

limit and clear_limit exist only for DataLoader2Iterator as a public methods.

  • Limit persists through iterations (with the iterator) until it is manually cleared via clear_limit.
  • Creating a new iterator (iter(dl2)) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: D41744759

NivekT added a commit that referenced this pull request Nov 3, 2022
ghstack-source-id: 75446ff3de241b6f06f17375f064d1bdcf4d020e
Pull Request resolved: #879
@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Nov 3, 2022
@NivekT NivekT marked this pull request as draft November 3, 2022 22:45
Comment on lines 142 to 146
graph = traverse_dps(source_datapipe)
for dp, _ in graph.values():
if hasattr(dp, "full_stop") and callable(dp.full_stop):
dp.full_stop()
protocol.response_full_stop()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any potential issue with traversing through the graph and calling .full_stop() on all of them?

torchdata/dataloader2/reading_service.py Outdated Show resolved Hide resolved
torchdata/datapipes/iter/util/prefetcher.py Outdated Show resolved Hide resolved
@NivekT
Copy link
Contributor Author

NivekT commented Nov 4, 2022

Another thing to discuss: should we do double underscore or something to protect __reset`` and __full_stop` from users?

The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 7, 2022
ghstack-source-id: f7074399cac090f0c2821bd2f54cf5d74fbf350d
Pull Request resolved: #879
Copy link
Contributor Author

@NivekT NivekT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the implementation but I still need some test cases. Suggestions are welcomed.

torchdata/datapipes/iter/util/prefetcher.py Outdated Show resolved Hide resolved
Copy link
Contributor

@ejguan ejguan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the name to pause? full stop seems like an operation ran at the end epoch

torchdata/datapipes/iter/util/prefetcher.py Outdated Show resolved Hide resolved
torchdata/datapipes/iter/util/prefetcher.py Outdated Show resolved Hide resolved
The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 10, 2022
ghstack-source-id: 15842d2f4fdf6fff618dbcca03769debb1ada883
Pull Request resolved: #879
The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 11, 2022
ghstack-source-id: 9184795806b214e9adb6e786d28988d251bc63f5
Pull Request resolved: #879
@NivekT NivekT changed the title [PrototypeRS] Adding 'full stop' to halt DataPipes [PrototypeRS] Adding 'pause' and 'resume' operations to halt DataPipes Nov 11, 2022
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 14, 2022
ghstack-source-id: 9164020ea440fb43eab6c8902c878efddef1ddb9
Pull Request resolved: #879
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 16, 2022
ghstack-source-id: a99c23a0ee69082acea7ed0b8d9d5793d2fd42ff
Pull Request resolved: #879
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 16, 2022
ghstack-source-id: 023aa6972a9006dc349481105c392c2a219f2649
Pull Request resolved: #879
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
NivekT added a commit that referenced this pull request Nov 16, 2022
ghstack-source-id: dfb19de6a1d4cf4d4ab70fa3f83a5616107106b6
Pull Request resolved: #879
@ejguan
Copy link
Contributor

ejguan commented Nov 17, 2022

I forget to mention about one scenario. If I iterate datapipe after pause but before resume, I expect a StopIteration is raised. Am I right?

>>> dp.pause()
>>> for d in dp:
...     print(d)  # Nothing should be printed
>>> dp.resume()
>>> for d in dp:
...     print(d)
# Values

@NivekT
Copy link
Contributor Author

NivekT commented Nov 17, 2022

I forget to mention about one scenario. If I iterate datapipe after pause but before resume, I expect a StopIteration is raised. Am I right?

>>> dp.pause()
>>> for d in dp:
...     print(d)  # Nothing should be printed
>>> dp.resume()
>>> for d in dp:
...     print(d)
# Values

If you try to call next after pause, this implementation raises RuntimeError.

@ejguan
Copy link
Contributor

ejguan commented Nov 17, 2022

If you try to call next after pause, this implementation raises RuntimeError.

If we want to achieve mini-epoch for OSS as well, we might need it to be StopIteration.

@NivekT
Copy link
Contributor Author

NivekT commented Nov 17, 2022

I see. I think what we can do is:

Inside the internal of PrototypeRS, if it cannot submit a GetNextRequest after pause without calling resume first. This is implemented in this PR.

Publicly, DataLoader2 can behave different (maybe like you said return StopIteration), but it needs to do so without triggering reset when a new iterator is made? That might get a bit tricky and I need to think more about it. This part is not covered by this PR (since I'm not exposing public API of pause or resume here).

…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.


[ghstack-poisoned]
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
Copy link
Contributor

@ejguan ejguan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks good in terms of test cases. I left a few comments below

torchdata/dataloader2/reading_service.py Outdated Show resolved Hide resolved
torchdata/dataloader2/reading_service.py Outdated Show resolved Hide resolved
torchdata/dataloader2/reading_service.py Outdated Show resolved Hide resolved
torchdata/dataloader2/reading_service.py Outdated Show resolved Hide resolved
self._executor = None

def resume(self):
self._executor = _PrefetchExecutor(iter(self.datapipe), 1, self._callback_fn, self.timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the resume for FullSync is going to re-read from the beginning. Is this expected? Or, we can raise Exception for fullsync saying it doesn't support checkpointing yet.

Copy link
Contributor Author

@NivekT NivekT Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way for it not to read from the beginning again? Halting the executor by changing __iter__?

For now, I can raise an Exception in both pause and resume.

torchdata/dataloader2/communication/iter.py Outdated Show resolved Hide resolved
torchdata/dataloader2/communication/iter.py Outdated Show resolved Hide resolved
torchdata/dataloader2/communication/iter.py Outdated Show resolved Hide resolved
test/dataloader2/test_proto_multi_rs.py Outdated Show resolved Hide resolved
@NivekT
Copy link
Contributor Author

NivekT commented Feb 7, 2023

CI is expected to fail until PyTorch core nightly updates

…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
@NivekT
Copy link
Contributor Author

NivekT commented Feb 8, 2023

@NivekT has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
@NivekT
Copy link
Contributor Author

NivekT commented Feb 8, 2023

Addressed and resolved most of the comments but 2. CI is being re-ran after nightly updated.

Copy link
Contributor

@ejguan ejguan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Thank you. I left a few comments to help simplify the test and a comment for internal users

test/dataloader2/test_proto_multi_rs.py Outdated Show resolved Hide resolved
test/dataloader2/test_proto_multi_rs.py Show resolved Hide resolved
test/dataloader2/test_proto_multi_rs.py Outdated Show resolved Hide resolved
test/dataloader2/test_proto_multi_rs.py Outdated Show resolved Hide resolved
torchdata/dataloader2/dataloader2.py Outdated Show resolved Hide resolved
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
@NivekT
Copy link
Contributor Author

NivekT commented Feb 8, 2023

@NivekT has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
@NivekT
Copy link
Contributor Author

NivekT commented Feb 9, 2023

@NivekT has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

…lt DataPipes"


The goal of this PR is fully stop DataPipe activities in preparation of snapshotting (which requires a halted state), so buffers will not be changing while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---
Implementation note (feel free to discuss):
`_pause` exists for both `DataLoader2Iterator` and `DataLoader2` as private methods.
`resume` exists for both `DataLoader2Iterator` as public but it is private for `DataLoader2`.

`limit` and `clear_limit` exist only for `DataLoader2Iterator` as a public methods.
- Limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)

[ghstack-poisoned]
@NivekT
Copy link
Contributor Author

NivekT commented Feb 10, 2023

@NivekT has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@NivekT
Copy link
Contributor Author

NivekT commented Feb 13, 2023

@NivekT has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot
Copy link
Contributor

@NivekT merged this pull request in cfcdf6e.

self.thread = thread
self.thread.start()

while prefetch_data.run_prefetcher:
Copy link
Contributor

@ejguan ejguan Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing: Should this line be while not prefetch_data.stop_iteration:? The current implementation means that iterator exits earlier than it's supposed to be.

It's a little bit fuzzy to me if we are able to call next over the iterator of this DataPipe after pause.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my main question is if pause of DataPipe is exposed to users.

Copy link
Contributor Author

@NivekT NivekT Feb 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right. Though from a glance changing that while condition will also require changing the exception conditions.

Particularly, it should become except StopIteration: prefetch_data.run_prefetcher = False and the same for except communication.iter.InvalidStateResetRequired.

Overall, it should be:
prefetch_data.run_prefetcher - indicates whether or not we should be prefetching, it should become False when the source runs out of elements, invalid state, termination, and maybe other exception raised.
prefetch_data.stop_iteration - this should. indicate when we should stop yielding elements, it should mostly be the same as above except when the source is out of elements, it should keep yielding until the buffer is empty.

There maybe other corner cases I have not stated here. We can test it out and see.

Comment on lines +72 to +75
except StopIteration:
prefetch_data.stop_iteration = True
except communication.iter.InvalidStateResetRequired:
prefetch_data.stop_iteration = True
Copy link
Contributor

@ejguan ejguan Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my last comment. stop_iteration should dominate run_prefetcher. When stop_iterator is met, there is no reason to process with prefetching, right?

Whenever stop_iteration turns to True, run_prefetcher should be False

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you stated should be True, but it requires changing other parts as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. Merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants