Add round robin sharding documentation (#1050)
Summary:
Pull Request resolved: #1050

Also fixed some wording and typos

Test Plan: Imported from OSS

Reviewed By: ejguan

Differential Revision: D43625322

Pulled By: NivekT

fbshipit-source-id: 37adee2d72fdf20c12560eb2134ef93749ac0293
NivekT authored and ejguan committed Feb 28, 2023
1 parent 63e5732 commit 68dc43b
Showing 5 changed files with 36 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/source/dlv2_tutorial.rst
@@ -19,7 +19,7 @@ Here is an example of a ``DataPipe`` graph:
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
-datapipe = datapiep.map(fn).batch(8)
+datapipe = datapipe.map(fn).batch(8)
Multiprocessing
----------------
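For illustration only, a minimal sketch of running the graph above with ``DataLoader2`` and a multiprocessing ``ReadingService`` (the worker count is arbitrary and ``fn`` is a placeholder transform, not defined in this excerpt):

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterableWrapper

def fn(row):
    # Hypothetical per-row transform standing in for the tutorial's `fn`.
    return row

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)

rs = MultiProcessingReadingService(num_workers=2)
dl = DataLoader2(datapipe, reading_service=rs)
for batch in dl:
    pass  # consume each batch of up to 8 parsed CSV rows
dl.shutdown()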
10 changes: 5 additions & 5 deletions docs/source/dp_tutorial.rst
@@ -67,9 +67,9 @@ Working with DataLoader
---------------------------------------------

In this section, we will demonstrate how you can use ``DataPipe`` with ``DataLoader``.
-For the most part, you should be able to use it just by passing ``dataset=datapipe`` as an input arugment
+For the most part, you should be able to use it just by passing ``dataset=datapipe`` as an input argument
into the ``DataLoader``. For detailed documentation related to ``DataLoader``,
-please visit `this page <https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading>`_.
+please visit `this PyTorch Core page <https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading>`_.
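As a hedged illustration of this usage (the toy pipeline, batch size, and worker count below are arbitrary stand-ins, not part of the tutorial):

from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper

# A toy IterDataPipe standing in for the CSV pipeline built later in this section.
datapipe = IterableWrapper(range(100)).shuffle().sharding_filter()

dl = DataLoader(dataset=datapipe, batch_size=16, num_workers=2)
for batch in dl:
    pass  # each `batch` is a collated tensor of 16 samples (the last one may be smaller)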


Please refer to `this page <dlv2_tutorial.html>`_ about using ``DataPipe`` with ``DataLoader2``.
@@ -102,7 +102,7 @@ pass defined functions to DataPipes rather than lambda functions because the for
def filter_for_data(filename):
return "sample_data" in filename and filename.endswith(".csv")
-def row_processer(row):
+def row_processor(row):
return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}
def build_datapipes(root_dir="."):
@@ -112,7 +112,7 @@ pass defined functions to DataPipes rather than lambda functions because the for
datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
# Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
datapipe = datapipe.shuffle()
-datapipe = datapipe.map(row_processer)
+datapipe = datapipe.map(row_processor)
return datapipe
Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader. Note that
@@ -180,7 +180,7 @@ Note:
- Place ``ShardingFilter`` (``datapipe.sharding_filter``) as early as possible in the pipeline, especially before expensive
operations such as decoding, in order to avoid repeating these expensive operations across worker/distributed processes.
- For the data source that needs to be sharded, it is crucial to add ``Shuffler`` before ``ShardingFilter``
-to ensure data are globally shuffled before splitted into shards. Otherwise, each worker process would
+to ensure data are globally shuffled before being split into shards. Otherwise, each worker process would
always process the same shard of data for all epochs. As a result, each batch would only consist of data
from the same shard, which leads to low accuracy during training. However, this doesn't apply to a data
source that has already been sharded for each multi-/distributed process, since ``ShardingFilter`` is no
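To make the note above concrete, here is a hedged sketch of the recommended ordering (``expensive_decode`` and the file list are hypothetical placeholders):

from torchdata.datapipes.iter import IterableWrapper

def expensive_decode(path):
    # Hypothetical stand-in for a costly per-sample operation such as decoding.
    return path.upper()

file_list = [f"shard_{i}.bin" for i in range(8)]  # hypothetical file paths

dp = IterableWrapper(file_list)
dp = dp.shuffle()              # global shuffle happens before the data is split into shards
dp = dp.sharding_filter()      # each worker/distributed process keeps only its own shard
dp = dp.map(expensive_decode)  # the expensive step runs after sharding, once per kept sample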
12 changes: 7 additions & 5 deletions docs/source/reading_service.rst
@@ -1,5 +1,7 @@
:tocdepth: 3

.. currentmodule:: torchdata.datapipes.iter

ReadingService
===============

@@ -13,9 +15,9 @@ Dynamic Sharding

Dynamic sharding is achieved by ``MultiProcessingReadingService`` and ``DistributedReadingService``, which shard the pipeline based on the information of the corresponding multiprocessing and distributed workers. TorchData offers two types of ``DataPipe`` that let users define where sharding takes place within the pipeline.

-- ``sharding_filter``: When the pipeline is replicable, each distributed/multiprocessing worker loads data from one replica of the ``DataPipe`` graph, and skip the data not blonged to the corresponding worker at the place of ``sharding_filter``.
+- ``sharding_filter`` (:class:`ShardingFilter`): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the ``DataPipe`` graph, while skipping samples that do not belong to the corresponding worker at the point where ``sharding_filter`` is placed.

-- ``sharding_round_robin_dispatch``: When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch will be treated as a non-replicable branch. Then, a single dispatching process will be created to load data from the non-repliable branch and distributed data to the subsequent worker processes.
+- ``sharding_round_robin_dispatch`` (:class:`ShardingRoundRobinDispatcher`): When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch (i.e. all DataPipes prior to ``sharding_round_robin_dispatch``) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes.

The following is an example of having two types of sharding strategies in the pipeline.
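As a hedged sketch only (not necessarily the example referenced above), a pipeline mixing both strategies could zip a non-replicable branch with a replicable one:

from torchdata.datapipes.iter import IterableWrapper
from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES

# Non-replicable branch: everything before sharding_round_robin_dispatch runs in a
# single dispatching process, which hands samples to worker processes round-robin.
dp1 = IterableWrapper(range(10)).shuffle()
dp1 = dp1.sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)

# Replicable branch: each worker holds its own replica and keeps only its shard.
dp2 = IterableWrapper(range(10, 20)).shuffle().sharding_filter()

# The combined pipeline contains both sharding strategies.
dp = dp1.zip(dp2)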

@@ -116,12 +118,14 @@ When multiprocessing takes place, the graph becomes:

``Client`` in the graph is a ``DataPipe`` that sends requests to and receives responses from multiprocessing queues.

.. module:: torchdata.dataloader2

Determinism
^^^^^^^^^^^^

In ``DataLoader2``, a ``SeedGenerator`` becomes the single source of randomness, and each ``ReadingService`` accesses it via ``initialize_iteration()`` to generate corresponding random seeds for random ``DataPipe`` operations.

-In order to make sure that the Dataset shards are mutually exclusive and collectively exhaunsitve on multiprocessing processes and distributed nodes, ``MultiProcessingReadingService`` and ``DistributedReadingService`` would help ``DataLoader2`` to synchronize random states for any random ``DataPipe`` operation prior to ``sharding_filter`` or ``sharding_round_robin_dispatch``. For the remaining ``DataPipe`` operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ``ReadingService``, in order to perform different random transformations.
+In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive on multiprocessing processes and distributed nodes, ``MultiProcessingReadingService`` and ``DistributedReadingService`` would help :class:`DataLoader2` to synchronize random states for any random ``DataPipe`` operation prior to ``sharding_filter`` or ``sharding_round_robin_dispatch``. For the remaining ``DataPipe`` operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ``ReadingService``, in order to perform different random transformations.
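For illustration, a minimal sketch under a multiprocessing setup (worker count arbitrary): the shuffle placed before ``sharding_filter`` receives a synchronized seed, so all workers agree on the global order and the shards remain disjoint.

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterableWrapper

# `shuffle()` sits before `sharding_filter()`, so its random state is synchronized
# across workers by the ReadingService; per-worker seeds only apply after sharding.
dp = IterableWrapper(range(1000)).shuffle().sharding_filter()

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(dp, reading_service=rs)
for x in dl:
    pass  # consume samples; each element appears exactly once per epoch
dl.shutdown()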

Graph Mode
^^^^^^^^^^^
@@ -131,8 +135,6 @@ This also allows easier transition of data-preprocessing pipeline from research
Extend ReadingService
----------------------

.. module:: torchdata.dataloader2

The following are the interfaces for a custom ``ReadingService``.

.. autoclass:: ReadingServiceInterface
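For illustration, a hedged sketch of a trivial custom ``ReadingService`` (hook names per the interface above; ``InProcessReadingService`` is a hypothetical name):

from torchdata.dataloader2 import DataLoader2, ReadingServiceInterface

class InProcessReadingService(ReadingServiceInterface):
    """A do-nothing sketch: hands the graph back unchanged and runs it in-process."""

    def initialize(self, datapipe):
        # A real ReadingService would adapt/shard the graph and spawn workers here.
        return datapipe

    def finalize(self):
        # Tear down any resources created in initialize(); nothing to allocate here.
        pass

Passing an instance via ``DataLoader2(dp, reading_service=InProcessReadingService())`` would then iterate the graph in the main process.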
1 change: 1 addition & 0 deletions docs/source/torchdata.datapipes.iter.rst
@@ -187,6 +187,7 @@ A miscellaneous set of DataPipes with different functionalities.
Prefetcher
RandomSplitter
ShardingFilter
ShardingRoundRobinDispatcher

Selecting DataPipes
-------------------------
26 changes: 22 additions & 4 deletions torchdata/datapipes/iter/util/sharding.py
@@ -16,17 +16,35 @@
@functional_datapipe("sharding_round_robin_dispatch")
class ShardingRoundRobinDispatcherIterDataPipe(IterDataPipe):
r"""
-Wrapper that indicates the prior ``DataPipe`` graph is non-replicable and will be
-iterated in a separate dispatching process to coordinate data to worker processes
-in a round-robin manner, when multiprocessing takes place
+Wrapper that indicates the prior section of ``DataPipe`` graph is non-replicable and will be
+iterated in a separate, single dispatching process to distribute data to worker processes
+in a round-robin manner when multiprocessing is being used.
(functional name: ``sharding_round_robin_dispatch``).
Args:
source_datapipe: Iterable DataPipe that will be sharded
sharding_group_filter: Optional ``SHARDING_PRIORITIES`` value
Note:
-- ``sharding_group_filter`` only accepts ``SHARDING_PRIORITIES.MULTIPROCESSING`` for now
+- ``sharding_group_filter`` only accepts ``SHARDING_PRIORITIES.MULTIPROCESSING`` for now
- When using distributed training, you can add a ``sharding_filter()`` prior to this DataPipe
to distribute samples among worker nodes.
Examples:
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> dp = IterableWrapper(range(10))
>>> # `.shuffle()` will be executed in a single dispatching process, then the samples are distributed
>>> # to worker processes
>>> dp = dp.shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> # `.map()` will be executed within each worker process
>>> dp = dp.map(lambda x: x + 1)
>>> # Distributed case: the 10 samples will be distributed among the nodes
>>> dp = IterableWrapper(range(10)).sharding_filter()
>>> # `.map()` will be executed in a single dispatching process in each node
>>> # You may apply further transformations afterwards within each worker process
>>> dp = dp.map(lambda x: x + 1).sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
"""

def __init__(self, source_datapipe: IterDataPipe, sharding_group_filter: Optional[SHARDING_PRIORITIES] = None):
