forked from pytorch/data
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update
mux_longest
data pipe (pytorch#372)
Summary: Pull Request resolved: pytorch#372 OSS issue discussion: pytorch#346 This diff updates `mux_longest` data pipe. `mux_longest`: Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux_longest``). As in, one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration, and so on. It skips over DataPipes that are exhausted, and ends when all input DataPipes are exhausted. This is same as current `MultiplexerIterDataPipe` in pytorch (https://github.com/pytorch/pytorch/blob/4fb7fa081e4fb5df3bf7bc85dcb9a3a9a3ac7133/torch/utils/data/datapipes/iter/combining.py#L375-L390) `mux_longest` example: ``` >>> from torchdata.datapipes.iter import IterableWrapper >>> dp1, dp2, dp3 = IterableWrapper(range(5)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25)) >>> list(dp1.mux_longest(dp2, dp3)) [0, 10, 20, 1, 11, 21, 2, 12, 22, 3, 13, 23, 4, 14, 24] ``` Reviewed By: NivekT, ejguan Differential Revision: D35805772 fbshipit-source-id: 095409427cf3714fb5f94bd99a090a6603526225
- Loading branch information
1 parent
41e16d2
commit 271300d
Showing
5 changed files
with
96 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# Copyright (c) Meta Platforms, Inc. and affiliates. | ||
# All rights reserved. | ||
# | ||
# This source code is licensed under the BSD-style license found in the | ||
# LICENSE file in the root directory of this source tree. | ||
|
||
from torch.utils.data.datapipes._decorator import functional_datapipe | ||
from torch.utils.data.datapipes.datapipe import IterDataPipe | ||
from typing import Sized, Set, Optional | ||
|
||
|
||
@functional_datapipe('mux_longest') | ||
class MultiplexerLongestIterDataPipe(IterDataPipe): | ||
r""" | ||
Yields one element at a time from each of the input Iterable DataPipes (functional name: ``mux_longest``). As in, | ||
one element from the 1st input DataPipe, then one element from the 2nd DataPipe in the next iteration, | ||
and so on. It skips over DataPipes that are exhausted, and ends when all input DataPipes are exhausted. | ||
Args: | ||
datapipes: Iterable DataPipes that will take turn to yield their elements, until they are all exhausted | ||
Example: | ||
>>> from torchdata.datapipes.iter import IterableWrapper | ||
>>> dp1, dp2, dp3 = IterableWrapper(range(5)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25)) | ||
>>> list(dp1.mux_longest(dp2, dp3)) | ||
[0, 10, 20, 1, 11, 21, 2, 12, 22, 3, 13, 23, 4, 14, 24] | ||
""" | ||
def __init__(self, *datapipes): | ||
self.datapipes = datapipes | ||
self.length: Optional[int] = None | ||
|
||
def __iter__(self): | ||
iterators = [iter(x) for x in self.datapipes] | ||
finished: Set[int] = set() | ||
while len(finished) < len(iterators): | ||
for i in range(len(iterators)): | ||
if i not in finished: | ||
try: | ||
value = next(iterators[i]) | ||
yield value | ||
except StopIteration: | ||
finished.add(i) | ||
|
||
def __len__(self): | ||
if self.length is not None: | ||
if self.length == -1: | ||
raise TypeError("{} instance doesn't have valid length".format(type(self).__name__)) | ||
return self.length | ||
if all(isinstance(dp, Sized) for dp in self.datapipes): | ||
self.length = sum(len(dp) for dp in self.datapipes) | ||
else: | ||
self.length = -1 | ||
return len(self) |