Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ps quick start #4848

Merged
merged 10 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 66 additions & 11 deletions docs/guides/06_distributed_training/cluster_quick_start_ps_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

因此参数服务器模式对于存储超大规模模型参数的训练场景十分友好,常被用于训练拥有海量稀疏参数的搜索推荐领域模型。

2.1 任务介绍
1.1 任务介绍
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

本节将采用推荐领域非常经典的模型wide_and_deep为例,介绍如何使用飞桨分布式完成参数服务器训练任务,本次快速开始的完整示例代码位于 https://github.com/PaddlePaddle/FleetX/tree/develop/examples/wide_and_deep_dataset。
在编写分布式训练程序之前,用户需要确保已经安装PaddlePaddle2.3及以上版本的飞桨开源框架。

2.2 操作方法
1.2 操作方法
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

参数服务器训练的基本代码主要包括如下几个部分:
Expand All @@ -37,7 +37,7 @@

下面将逐一进行讲解。

2.2.1 导入依赖
1.2.1 导入依赖
""""""""""""

导入必要的依赖,例如分布式训练专用的Fleet API(paddle.distributed.fleet)。
Expand All @@ -47,7 +47,7 @@
import paddle
import paddle.distributed.fleet as fleet

2.2.2 定义分布式模式并初始化分布式训练环境
1.2.2 定义分布式模式并初始化分布式训练环境
""""""""""""

通过 ``fleet.init()`` 接口,用户可以定义训练相关的环境,注意此环境是用户预先在环境变量中配置好的,包括:训练节点个数,服务节点个数,当前节点的序号,服务节点完整的IP:PORT列表等。
Expand All @@ -58,7 +58,8 @@
paddle.enable_static()
fleet.init(is_collective=False)

2.2.3 加载模型
1.2.3 加载模型
""""""""""""

.. code-block:: python

Expand All @@ -67,27 +68,81 @@
model = WideDeepModel()
model.net(is_train=True)

2.2.4 构建dataset加载数据
1.2.4 构建dataset加载数据
""""""""""""

由于搜索推荐场景涉及到的训练数据通常较大,为提升训练中的数据读取效率,参数服务器采用InMemoryDataset/QueueDataset进行高性能的IO。

InMemoryDataset/QueueDataset所对应的数据处理脚本参考examples/wide_and_deep_dataset/reader.py,与单机DataLoader相比,存在如下区别:

1. 继承自 ``fleet.MultiSlotDataGenerator`` 基类。
2. 实现基类中的 ``generate_sample()`` 函数,逐行读取数据进行处理(不需要对数据文件进行操作),并返回一个可以迭代的reader方法。
3. reader方法需返回一个list,其中的每个元素都是一个元组,元组的第一个元素为特征名(string类型),第二个元素为特征值(list类型)

一个完整的reader.py伪代码如下:

.. code-block:: python

import paddle
# 导入所需要的fleet依赖
import paddle.distributed.fleet as fleet

# 需要继承fleet.MultiSlotDataGenerator
class WideDeepDatasetReader(fleet.MultiSlotDataGenerator):
def line_process(self, line):
features = line.rstrip('\n').split('\t')
# 省略数据处理过程,具体可参考单机reader
# 返回值为一个list,其中的每个元素均为一个list,不需要转成np.array格式
return [dense_feature] + sparse_feature + [label]

# 实现generate_sample()函数
# 该方法有一个名为line的参数,只需要逐行处理数据,不需要对数据文件进行操作
def generate_sample(self, line):
def wd_reader():
# 按行处理数据
input_data = self.line_process(line)

# 构造特征名数组feature_name
feature_name = ["dense_input"]
for idx in categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("label")

# 返回一个list,其中的每个元素都是一个元组
# 元组的第一个元素为特征名(string类型),第二个元素为特征值(list类型)
yield zip(feature_name, input_data)

# generate_sample()函数需要返回一个可以迭代的reader方法
return wd_reader

if __name__ == "__main__":
my_data_generator = WideDeepDatasetReader()
my_data_generator.run_from_stdin()

在训练脚本中,构建dataset加载数据:

.. code-block:: python

# 具体数据处理参考examples/wide_and_deep_dataset中reader.py
dataset = paddle.distributed.QueueDataset()
thread_num = 1

# use_var指定网络中的输入数据,pipe_command指定数据处理脚本
# 要求use_var中输入数据的顺序与数据处理脚本输出的特征顺序一一对应
dataset.init(use_var=model.inputs,
pipe_command="python reader.py",
batch_size=batch_size,
thread_num=thread_num)

train_files_list = [os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)]

# set_filelist指定dataset读取的训练文件的列表
dataset.set_filelist(train_files_list)

备注:dataset具体用法参见\ `使用InMemoryDataset/QueueDataset进行训练 <https://fleet-x.readthedocs.io/en/latest/paddle_fleet_rst/parameter_server/performance/dataset.html>`_\。
备注:dataset更详细用法参见\ `使用InMemoryDataset/QueueDataset进行训练 <https://fleet-x.readthedocs.io/en/latest/paddle_fleet_rst/parameter_server/performance/dataset.html>`_\。


2.2.5 定义同步训练 Strategy 及 Optimizer
1.2.5 定义同步训练 Strategy 及 Optimizer
""""""""""""

在Fleet API中,用户可以使用 ``fleet.DistributedStrategy()`` 接口定义自己想要使用的分布式策略。
Expand All @@ -104,7 +159,7 @@
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

2.2.6 开始训练
1.2.6 开始训练
""""""""""""

完成模型及训练策略以后,我们就可以开始训练模型了。因为在参数服务器模式下会有不同的角色,所以根据不同节点分配不同的任务。
Expand Down Expand Up @@ -138,7 +193,7 @@
备注:Paddle2.3版本及以后,ParameterServer训练将废弃掉dataloader + exe.run()方式,请切换到dataset + exe.train_from_dataset()方式。


2.3 运行训练脚本
1.3 运行训练脚本
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

定义完训练脚本后,我们就可以用 ``fleetrun`` 指令运行分布式任务了。其中 ``server_num`` , ``worker_num`` 分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有2个。
Expand Down
7 changes: 3 additions & 4 deletions docs/guides/06_distributed_training/index_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
您可以通过以下内容,了解飞桨分布式训练的特性和使用指南:

- `环境部署 <./deployment_cn.html>`_ : 部署环境以使用飞桨框架进行分布式训练。
- `分布式训练快速开始 <./cluster_quick_start_cn.html>`_ : 使用飞桨框架快速开始分布式训练
- `快速开始-数据并行 <./cluster_quick_start_collective_cn.html>`_ : 使用飞桨数据并行快速开始分布式训练
- `快速开始-参数服务器 <./cluster_quick_start_ps_cn.html>`_ : 使用飞桨参数服务器快速开始分布式训练。
- `使用FleetAPI进行分布式训练 <./fleet_api_howto_cn.html>`_ : 使用飞桨框架FleetAPI完成分布式训练。

.. toctree::
:hidden:

deployment_cn.rst
cluster_quick_start_cn.rst
fleet_api_howto_cn.rst
cluster_quick_start_collective_cn.rst
cluster_quick_start_ps_cn.rst