Implement FullSyncIterDataPipe #713
Conversation
return self.error is not None

class _PrefetchExecutor:
This executor is based on the internal implementation: https://fburl.com/code/7dk6mvs4. On top of that implementation, I added `prefetch_size` and attached an `index` to the `Expected` object to make sure it can work with `Prefetch` in the future. Adding a test now.
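For readers without access to the internal reference, here is a rough sketch of the general pattern, with heavy caveats: the `Expected` record carrying an `index`, the `prefetch_size` bound, and the `has_error` check mirror the names discussed above, but the threading model below (a single background thread feeding a bounded queue) is a simplification assumed for illustration, not the `_PrefetchExecutor` implemented in this PR.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Expected:
    """One prefetch result: the fetched data or the exception raised while fetching."""
    index: int
    data: Any = None
    error: Optional[BaseException] = None

    def has_error(self) -> bool:
        return self.error is not None


class PrefetchSketch:
    """A background thread pulls up to `prefetch_size` items ahead of the consumer."""

    def __init__(self, iterable, prefetch_size: int = 10):
        self._source = iter(iterable)
        self._buffer: "queue.Queue[Expected]" = queue.Queue(maxsize=prefetch_size)
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self) -> None:
        index = 0
        while True:
            try:
                item = Expected(index, data=next(self._source))
            except BaseException as exc:  # StopIteration (or any error) ends the stream
                self._buffer.put(Expected(index, error=exc))
                return
            self._buffer.put(item)  # blocks once prefetch_size items are already waiting
            index += 1

    def __iter__(self):
        while True:
            expected = self._buffer.get()
            if expected.has_error():
                if isinstance(expected.error, StopIteration):
                    return
                raise expected.error
            yield expected.data


# Items are fetched ahead of consumption, bounded by prefetch_size.
for x in PrefetchSketch(range(5), prefetch_size=2):
    print(x)
```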
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
test/test_distributed.py (Outdated)
data_length = 23
dp = IterableWrapper(list(range(data_length))).sharding_filter().fullsync()
Without `fullsync`, this pipeline would hang forever.
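To make the failure mode concrete, here is a small sketch of the arithmetic; the world size and the per-element collective described in the comments are illustrative assumptions about the scenario, not the exact test code.

```python
# With data_length = 23 and world_size = 2, sharding_filter keeps every
# world_size-th element per rank, so the shards are uneven: 12 vs. 11 elements.
data_length, world_size = 23, 2
shard_sizes = [len(range(rank, data_length, world_size)) for rank in range(world_size)]
assert shard_sizes == [12, 11]

# Each element consumed through fullsync involves a collective that checks
# whether every rank still has data. Without such a check, the rank holding the
# longer shard eventually waits on a synchronization point (its own all_reduce,
# or a gradient sync in training) that the exhausted rank never reaches, so the
# pipeline hangs forever. FullSyncIterDataPipe instead stops all ranks as soon
# as the shortest shard runs out.
```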
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
# LICENSE file in the root directory of this source tree.

# Use the same timeout as PyTorch Distributed
default_timeout_in_s = 30 * 60
Just a comment (no change needed): we should put other things, such as the default buffer size, here too.
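Purely as an illustration of the shape that consolidation could take; the extra names and values below are assumptions, not existing torchdata constants.

```python
# Shared defaults collected in one place (hypothetical module, e.g. _constants.py).

# Use the same timeout as PyTorch Distributed
default_timeout_in_s = 30 * 60

# Candidates to move here as well:
default_buffer_size = 1000     # assumed value
default_prefetch_size = 10     # assumed value
```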
which is caused by uneven sharded data (functional name: ``fullsync``). It should
be appended at the end of the graph of ``DataPipe`` by ``DistributedReadingService``
automatically.
Question: do we recommend against usage of this DataPipe outside of a ReadingService? If not, can we potentially include an example?
Makes sense. Will add it, even though we should always recommend that users rely on the ReadingService.
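For the example, something along these lines could serve as a starting point. It is a sketch, assuming a gloo process group launched via torchrun and sharding applied through `torch.utils.data.graph_settings.apply_sharding`; it is not the wording that will land in the docs.

```python
import torch.distributed as dist
import torch.utils.data.graph_settings as graph_settings
from torchdata.datapipes.iter import IterableWrapper

# Assumes the script is launched with torchrun, so the process-group
# environment variables (MASTER_ADDR, RANK, ...) are already set.
dist.init_process_group(backend="gloo")
rank, world_size = dist.get_rank(), dist.get_world_size()

# Append fullsync manually instead of relying on DistributedReadingService.
dp = IterableWrapper(list(range(23))).sharding_filter().fullsync()
graph_settings.apply_sharding(dp, world_size, rank)

# Every rank stops once the shortest shard is exhausted, so no rank blocks
# on a synchronization point that another rank never reaches.
for item in dp:
    pass

dist.destroy_process_group()
```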
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
def __getstate__(self):
    if IterDataPipe.getstate_hook is not None:
        return IterDataPipe.getstate_hook(self)
    state = (
        self.datapipe,
        self.timeout,
    )
    return state
IMHO, checkpointing for fullsync or prefetch is a little tricky.
Let's confirm the expected behavior: when we checkpoint, we should pause any further prefetching and save all prefetched data into a buffer. Then we serialize the buffer and the inner datapipe (because we have to serialize the datapipe after prefetching is done). Only when we start iteration again would we resume prefetching.
WDYT @VitalyFedyunin @NivekT
The whole logic of fullsync would then need to change. This gets even more complicated when the data ends while the prefetched items are being moved into the buffer. I might open a new PR to implement serialization.
Yeah, I think we should stop the prefetch and capture the current data. This feels similar to the internal client snapshot, so https://fburl.com/code/6hrjawgh may be a helpful reference.
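To pin down the behavior being discussed, a hedged sketch of that checkpoint flow follows; the class, the `pause_and_drain` helper, and the buffer layout are illustrative assumptions, not the implementation in this PR or the internal snapshot code.

```python
from torch.utils.data import IterDataPipe


class _CheckpointablePrefetchSketch(IterDataPipe):
    """Illustration only: pause prefetching on serialization, resume lazily."""

    def __init__(self, datapipe, prefetch_size=10):
        self.datapipe = datapipe
        self.prefetch_size = prefetch_size
        self._buffer = []      # items prefetched but not yet yielded
        self._executor = None  # background prefetching machinery, if running

    def __getstate__(self):
        if IterDataPipe.getstate_hook is not None:
            return IterDataPipe.getstate_hook(self)
        # 1. Pause further prefetching and drain in-flight items into the buffer.
        if self._executor is not None:
            self._buffer.extend(self._executor.pause_and_drain())  # hypothetical helper
            self._executor = None
        # 2. Serialize the buffer together with the inner datapipe; this is only
        #    safe once prefetching has stopped.
        return (self.datapipe, self.prefetch_size, self._buffer)

    def __setstate__(self, state):
        self.datapipe, self.prefetch_size, self._buffer = state
        self._executor = None  # 3. Prefetching resumes only on the next iteration.

    def __iter__(self):
        # Replay checkpointed items first, then continue from the inner datapipe.
        yield from self._buffer
        self._buffer = []
        # Simplification: real code would restart the prefetch executor here.
        yield from self.datapipe
```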
I will land this PR for now. Listing two follow-up works:
Changes
- `_PrefetchExecutor` to run prefetching in multi-threading for `PrefetchIterDataPipe`
- `FullSyncIterDataPipe` (the expected serialization behavior for `FullSyncIterDataPipe` is unclear to me)