
Add ThreadPoolMapper #1052

Closed
wants to merge 20 commits into from

Conversation

SvenDS9
Contributor

@SvenDS9 SvenDS9 commented Feb 27, 2023

Fixes #1045

Changes

  • Add ThreadPoolMapper datapipe
  • Add tests

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Feb 27, 2023
@SvenDS9
Contributor Author

SvenDS9 commented Feb 27, 2023

Currently I am unhappy with a few things and am open to suggestions:

  • the naming is confusing: BatchMapper applies a function to the whole batch, while BatchAsyncMapper and BatchThreadPoolMapper apply a function to every element in the batch. Should we use the preposition 'in'?
  • By using flatmap we lose access to len
  • test coverage for ThreadPoolMapper must be improved; we should be able to copy all tests of the normal Mapper (+ additional tests?)
  • code duplication in merge_batch_with_result with AsyncMapper -> extract a shared function?

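As a sketch of the per-element threading under discussion (the function name `threadpool_map_batch` is illustrative, not the PR's actual API), the core idea can be expressed with `concurrent.futures`:

```python
from concurrent.futures import ThreadPoolExecutor

def threadpool_map_batch(batch, fn, max_workers=4):
    # Apply fn to every element *in* the batch concurrently. This differs
    # from a batch-level mapper, which would receive the whole batch as one
    # argument. pool.map preserves input order in its results.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, batch))

print(threadpool_map_batch([1, 2, 3], lambda x: x * 10))  # [10, 20, 30]
```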
I also noticed two unrelated things that should be improved:

@SvenDS9 SvenDS9 marked this pull request as ready for review February 27, 2023 15:51
@ejguan
Contributor

ejguan commented Feb 27, 2023

  • the naming is confusing: BatchMapper applies a function to the whole batch, while BatchAsyncMapper and BatchThreadPoolMapper apply a function to every element in the batch. Should we use the preposition 'in'?

Good catch. It might be better to just name them ThreadPoolMapper and AsyncMapper. WDYT?
cc: @wenleix @NivekT

  • By using flatmap we lose access to len

Yeah. IMHO, len is currently not well supported in the DataPipe regime. To resolve it, we might want to make those two DataPipes self-contain all of the batch and flatmap functionality.

  • test coverage for ThreadPoolMapper must be improved; we should be able to copy all tests of the normal Mapper (+ additional tests?)

I think one extra test to validate multiple ThreadPoolMapper running at the same time should be good.

  • code duplication in merge_batch_with_result with AsyncMapper -> extract a shared function?

Yea, that would be an ideal case.

flatmap without a function and unbatch do the same thing

I think they are not exactly the same: unbatch supports multiple levels of unnesting.
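The distinction can be illustrated in plain Python (not the actual DataPipe API): a function-less flatmap flattens exactly one nesting level, while unbatch-style flattening can repeat for a requested number of levels.

```python
def flatten_once(nested):
    # flatmap with no function: flattens exactly one level of nesting
    return [x for inner in nested for x in inner]

def unbatch(nested, level=1):
    # unbatch-style flattening: repeat for the requested number of levels
    out = nested
    for _ in range(level):
        out = flatten_once(out)
    return out

data = [[[1, 2], [3]], [[4]]]
print(flatten_once(data))      # [[1, 2], [3], [4]]
print(unbatch(data, level=2))  # [1, 2, 3, 4]
```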

Thank you for catching that. I updated the issue. There has always been a project in my mind to do auto-detection of those uncovered test cases.

@SvenDS9
Contributor Author

SvenDS9 commented Feb 28, 2023

Yeah. IMHO, len is currently not well supported in the DataPipe regime. To resolve it, we might want to make those two DataPipes self-contain all of the batch and flatmap functionality.

Alternatively, we could modify LengthSetter to accept not only an int but also an Exception to be raised when len is called, circumventing the issue. TBH I don't particularly like this approach.

Noob question:
Is test_map_tuple_list_with_col_iterdatapipe actually executed? When I run it locally, it fails due to mismatched types. E.g.:

_helper(lambda data: (data[0], data[1], data[0] + data[1]), fn_n1_def, [0, 1], 2)

The return value of the lambda function is always a tuple, but in _helper we also test with lists.

@ejguan
Contributor

ejguan commented Feb 28, 2023

Alternatively, we could modify LengthSetter to accept not only an int but also an Exception to be raised when len is called, circumventing the issue. TBH I don't particularly like this approach.

Actually, we could extend flatmap to accept a len_fn.
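A rough sketch of that len_fn idea (the class name `FlatMapSketch` and its parameters are hypothetical, not the actual flatmap API): the caller supplies a callable that derives the flattened length from the source length, so __len__ can work without materializing the pipe.

```python
class FlatMapSketch:
    def __init__(self, source, fn=None, len_fn=None):
        self.source = source
        self.fn = fn or (lambda x: x)   # default: treat each item as an iterable
        self.len_fn = len_fn            # maps len(source) -> flattened length

    def __iter__(self):
        for item in self.source:
            yield from self.fn(item)

    def __len__(self):
        if self.len_fn is None:
            raise TypeError("length is not known without a len_fn")
        return self.len_fn(len(self.source))

# e.g. if every source item expands to exactly 3 elements:
dp = FlatMapSketch([[1, 2, 3], [4, 5, 6]], len_fn=lambda n: n * 3)
print(len(dp))    # 6
print(list(dp))   # [1, 2, 3, 4, 5, 6]
```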

Is the test_map_tuple_list_with_col_iterdatapipe actually executed? When I run it locally it fails due to mismatched types.

I think it's executed. This test passes locally on my machine.

torchdata/datapipes/iter/transform/callable.py (review thread, resolved)
test/test_iterdatapipe.py (review thread, resolved)
test/test_iterdatapipe.py (review thread, resolved)
@SvenDS9 SvenDS9 changed the title Add BatchThreadPoolMapper Add ThreadPoolMapper Mar 2, 2023
@SvenDS9 SvenDS9 requested a review from ejguan March 4, 2023 10:46
@SvenDS9
Contributor Author

SvenDS9 commented Mar 6, 2023

After pytorch/pytorch#95067 got merged, some tests are currently broken in torchvision (transforms.Identity(), AttributeError: 'Identity' object has no attribute 'name')

Contributor

@ejguan ejguan left a comment

I just realized this is almost the same DataPipe as prefetch, except that we provide a function to be applied.
https://github.com/pytorch/data/blob/main/torchdata/datapipes/iter/util/prefetcher.py

This could be a base class for both prefetch and pin_memory, where prefetch takes a no-op function while pin_memory takes a default pin_memory_fn and is not replicable.

Even fullsync takes a similar approach.

Would you please take a look at those to see if we can consolidate them?

torchdata/datapipes/iter/transform/callable.py (review thread, resolved)
torchdata/datapipes/iter/transform/callable.py (review thread, resolved)
@SvenDS9
Contributor Author

SvenDS9 commented Mar 7, 2023

I just realized this is almost the same DataPipe as prefetch, except that we provide a function to be applied.
https://github.com/pytorch/data/blob/main/torchdata/datapipes/iter/util/prefetcher.py
This could be a base class for both prefetch and pin_memory, where prefetch takes a no-op function while pin_memory takes a default pin_memory_fn and is not replicable.

Are you sure? Prefetcher calls next on the previous dp in the background, while here next is blocking and we execute a function in parallel.
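For contrast, here is a minimal prefetcher sketch in plain Python (not the actual Prefetcher DataPipe): it pulls items from the upstream iterable in a background thread into a bounded queue, whereas the mapper in this PR keeps iteration blocking and only parallelizes the function application.

```python
import queue
import threading

def prefetch(iterable, buffer_size=2):
    # Background thread pulls from the upstream iterable into a bounded
    # queue; the consumer yields from the queue until a sentinel arrives.
    q = queue.Queue(maxsize=buffer_size)
    END = object()  # sentinel marking exhaustion of the upstream iterable

    def worker():
        for item in iterable:
            q.put(item)  # blocks when the buffer is full
        q.put(END)

    threading.Thread(target=worker, daemon=True).start()
    while (item := q.get()) is not END:
        yield item

print(list(prefetch(range(5))))  # [0, 1, 2, 3, 4]
```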

@ejguan
Contributor

ejguan commented Mar 9, 2023

Are you sure? Prefetcher calls next on the previous dp in the background, while here next is blocking and we execute a function in parallel.

Hmm, you are right. Thanks for correcting me! Please let me know when the PR is ready to be reviewed. You can re-request a review from the reviewer section.

@SvenDS9
Contributor Author

SvenDS9 commented Mar 9, 2023

I have made two more comments (see above), but in principle I think the PR is ready for review.

@SvenDS9 SvenDS9 requested a review from ejguan March 9, 2023 17:07
Contributor

@ejguan ejguan left a comment

Thank you. I think the implementation and tests look good now. My only remaining comment is to do more benchmarking so that we can give users guidance on settings for max_workers and scheduled_tasks.
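One possible micro-benchmark sketch for picking max_workers (the scheduled_tasks knob from the PR is not modeled here; `io_bound_fn` and `bench` are hypothetical names, and the sleep stands in for real I/O latency such as a network fetch):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_bound_fn(x):
    time.sleep(0.01)  # simulate ~10 ms of I/O latency per element
    return x

def bench(max_workers, n_items=100):
    # Time how long it takes to map io_bound_fn over n_items elements
    # with the given thread-pool size.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(io_bound_fn, range(n_items)))
    return time.perf_counter() - start

for w in (1, 4, 16):
    print(f"max_workers={w}: {bench(w):.3f}s")
```

For I/O-bound functions like this, wall time should drop roughly in proportion to the worker count until overhead dominates; for CPU-bound functions the GIL limits the benefit, which is worth calling out in any user-facing guidance.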

Contributor

@ejguan ejguan left a comment

Thank you so much! This is awesome!

@facebook-github-bot
Contributor

@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot
Contributor

@ejguan merged this pull request in fea20d4.

Successfully merging this pull request may close these issues.

Add ThreadMapperIterDatapipe