Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paddle cluster design #1696

Merged
merged 13 commits into from
Apr 24, 2017
161 changes: 161 additions & 0 deletions doc/design/cluster_design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Paddle大规模分布式训练设计

## 概览
参考[这里](https://github.com/PaddlePaddle/Paddle/pull/1620/files)

## 分布式训练架构

常见的深度学习分布式训练的架构如图:

<img src="images/trainer.png"/>

为了完成一个深度学习的训练任务,集群中会运行多个trainer和parameter server,集群会把模型的参
数分布式的存储在多个parameter server上,trainer完成每个mini-batch数据训练之后会把梯度发送
Copy link
Contributor

@helinwang helinwang Mar 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"trainer完成每个mini-batch数据训练之后会把梯度发送 给parameter server,parameter server将某个分片的模型参数和梯度执行整合和优化。然后trainer 从所有的parameter server下载模型参数并开始下一轮mini-batch的训练。":这个是synchronous SDG,即trainer每一轮计算完了立即与parameter server同步所有的gradient,parameter等所有trainer计算完一起更新parameter,然后trainer再进行下一轮。

现在咱们parameter server架构支持的另一种方法是asynchronous SDG,parameter同步并不是每一轮计算都需要进行,单个trainer也不需要等待其他trainer。这里面讲的比较详细,data parallel部分:
screen shot 2017-03-23 at 2 44 30 pm

建议这里可以不这么详细的说更新过程,或者分两类synchronous SDG, asynchronous SDG来详细的说更新过程。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

分synchronous SGD, asynchronous SGD分别描述了更新过程。

给parameter server,parameter server将某个分片的模型参数和梯度执行整合和优化。然后trainer
从所有的parameter server下载模型参数并开始下一轮mini-batch的训练。

可以看到,可以进一步的优化以下方面:
1. 模型的参数是保存在parameter server进程的内存中的。在一个训练任务过程中任意一台
parameter server不能异常退出,否则训练不能继续执行
1. 不能在一个训练任务中动态的增加Trainer个数或parameter个数
1. parameter server保存模型参数考虑多个备份防止单点故障
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为了简单起见,能否parameter server的模型参数定期存到分布式文件系统中,又分布式文件系统负责多个备份。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有道理!我疏忽了!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, 但如果在pserver层实现一个基于内存的分布式存储不如直接使用外部的存储服务,例如Redis的主从。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果训练任务参数同步会占满网络带宽,使用分布式存储则会和训练任务互相抢占带宽。使用不同的物理网卡区分训练网络带宽和存储网络带宽又会带来额外成本。所以还是改成一个可配置参数?或者要求用户自己mount一个分布式存储?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

定时存储已经把这个影响降低了很多

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@typhoonzero 赞同分布式存储作为一个补充的选项。
pserver的存储的数据会被分为多个shard,每个shard会存在0到多个replic,要保证每次更新都能使集群中的数据达成一致,同时保证高可用性,建议分为两种方式:

  • 默认不启动checkpointing机制,采用本地磁盘做存储
    可以不需要外置的分布式存储服务,pserver的存储直接挂在某个节点的磁盘上,使用多replic的机制保证数据的高可用
    pserver01

更新pserver数据的过程分为写入master shad=>master shard同步至slave shard=>返回master shard=>返回trainer
为了保证数据在内存和磁盘的一致性,每个shard的写入过程需要写入buffer=>写入tranlog=>flush到磁盘=>写入成功。

  • 启动checkpointing机制
    通过 @typhoonzero 提到的每个pserver挂载到一个分布式的Volume,可定期将数据snapshot到存储服务上,这个分布式存储服务会存储每个shard的数据信息,为了保证数据能够恢复到每个pserver上,所以还需要元数据信息。元数据信息中保存了shard数量,pserver的数量等信息。
    pserver02

1. 为了使训练任务至少可以抵御“单点故障”(任意时刻只可能同时有一台服务器故障),模型参数的更新和分发
需要保证原子性操作或满足事务性操作
1. 可以同时调度大量的训练任务和使用模型的应用在一个集群上
1. 支持训练任务的前置任务和后置任务,支持训练任务的定时调度和对在线流式数据的处理

## 模型参数数据备份
为了实现parameter server集群可以容忍单点故障,必须将每个模型参数的分片在集群中存储多个副本。虽然
Copy link
Contributor

@gongweibao gongweibao Mar 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我的理解:parameterserver保存多个备份是不经济的。比如两备份。主从备份的参数是要同步更新的,这个参数量比较大的情况下,加大网络传输压力。

采用单副本,checkpoint恢复的方式是不是更好一些?可以隔一段时间生成一个checkpoint,网络传输压力小很多

也可以考虑使用校验和的技术减少副本大小,但为了整体系统的简单可靠,优先选择使用副本的方式。

<img src="images/replica.png"/>

上图显示了在2台parameter server中实现每个模型参数的分片均保存两个副本的状态。parameter 负责存储
所有参数分片副本并在etcd中同步每个副本的状态。每个分片的多个副本中同时只有一个处于"master"状态,
处于"master"状态的副本是当前活动的副本。当一台parameter server故障时,集群中剩下的parameter server
会重新选举出新的"master"副本并继续提供服务。

用户在启动parameter server是可以指定副本的个数(>=1),副本越多容灾能力越强,越少性能越好。但通常不会
使用>3个的副本配置。

etcd中数据存储格式为:
1. pserver集群状态`[CLUSTER_CHROOT]/pserver_cluster_status`
```json
{
"cluster_status": "OK|UNHEALTHY|UNKNOWN"
"reason": "",
"nodes": [0,1,2,3]
}
```

1. 每个pserver的状态: [CLUSTER_CHROOT]/pservers/[pserverid]
```json
{
"id": 0,
"instance": "pserver1",
"status": "up",
"start_time": 1490184573.25,
"sync": true,
}
```
1. mini-batch计数器,记录此id对应的parameter server正在执行的mini batch id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为何parameter server需要跟mini batch有相关性?照我理解它只用管gradient和parameter。trainer负责mini batch。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我理解错了,在sync SGD是记录的batchid只用来打印性能记录的日志 。

[CLUSTER_CHROOT]/pservers/[pserverid]/mini-batch-id
1. parameter分片信息: [CLUSTER_CHROOT]/pshards/[shardid]/[replicaid]
比如上图显示的分片将生成下面的4个etcd路径:
```bash
/pshards/0/0
/pshards/0/1
/pshards/1/0
/pshards/1/1
```
每个replica的信息如下:
```json
{
"id": 0,
"shardid": 0,
"created": 1490184573.25,
"modified": 1490184573.25,
"status": "master", # indicates the replica is in use
}
```

## 数据一致性
存在多个副本数据的情况下就需要考虑,多个副本之间的数据一致性。如果使用数据强一致性(例如paxos/raft或两段式提交),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有没有可能不用多个副本。比如一个parameter server挂了之后,对应这些parameter的更新暂停,先算其它的parameter。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以的,在下面有介绍。可以配置只使用一个副本,并开启检查点。或者都不开

则在故障恢复时可以获得一个完整的数据集,但每次更新模型参数的性能会下降,因为需要保证多个副本都完全更新之后才算更新
成功。如果使用异步同步(最终一致性),则在重新选举"master"副本时,可能得到的副本并没有完成数据同步。

本文档讨论使用两阶段提交实现模型副本数据的更新。
* 每个副本通常由多个parameter block组成,多个block之间可以并发更新,但更新同一个block需要保证顺序性。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parameter block是什么,需要给出定义。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在附录中增加了术语,也在这里增加了简要的解释

* 每次需要更新一个block的时候,trainer首先向存放"master"副本的服务器提交“准备更新”请求,"master"副本检查其他副本的状态并创建一个更新事务,然后返回OK。
* trainer再向"master"发送变化部分的梯度数据和这份数据的id,然后"master"并发的更新本地和其他副本的模型数据,更新成功返回OK,如果有更新失败的节点,则执行"rollback",退回到更新前状态并返回错误代码。

<img src="images/two_phase_commit.png"/>

## 模型数据检查点(Checkpointing)
模型数据检查点,可以在磁盘上保存一份存储在parameter server内存中的模型数据的完整镜像。在一个不可中断并缺少备份的训练任务中,可以通过阶段性的在每个parameter server的本地磁盘保存检查点快照达到容灾的目的,比如每个pass保存一次快照。在出现单点故障时,只需要恢复这台节点,或者将这台节点迁移到另一个节点并启动即可恢复训练任务。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉还需要每隔一段时间(比如几分钟)存一次,覆盖上一次的。每个pass存一次有点太慢了,比如百度的语音数据很大,一个pass就可能需要1天。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkpoint序列化出来的数据最好也是在一个分布式的存储上,防止磁盘或者raid卡坏掉导致无法恢复到某检查点的状态。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修改了描述,可以配置检查点存储周期。


## 训练数据的存储和分发
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉咱们可以考虑一下master怎么自动把数据集分片:
如果是用户自定义格式,master如何知道怎么分片。
如果是我们定义格式,接口是什么(用户如何把他们的数据转换成我们的格式)。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我理解这里分两个部分:数据接入和数据分片

  • 数据接入

    考虑到数据源可能会有Kafka,HDFS,图片或者用户自定义的文件,所以需要支持自定义数据处理的函数,参考之前reader的设计,提供一个支持RPC调用的reader:

    def dist_reader(trainerid):
        return data
    
    • trainerid:
      每个trainer启动时会拥有一个trainerid,reader根据trainerid来决策返回什么数据
    • RPC:
      trainer通过RPC的方式远程调用reader,获取训练数据。
    • 线程安全:
      dist_reader会同时被多个trainer进行调用,必须保证线程安全
  • 数据分片

    在数据接入层里,我们需要自定义数据分片的方式(可以在Paddle中提供一些常用的分片方式),一般的理解,trainer读取到的数据可以通过:traienr_count(trainer进程数量),trainerid(当前请求数据trainerID)来确定,以读取一个大文件举例:

    queue_list = []
    trainer_count = 4
    queue_list = [Queue.Queue(maxsize=10) for i in xrange(trainer_count)]
    
    def load_file():
        global trainer_count
        global queue_list
        line_count = 0
        with open("data.txt", "r") as f:
            for line in f:
                line_count += 1
                queue_list[line_count%trainer_count].put(line)
    
    t = threading.Thread(target=load_file, args=())
    t.start()
    
    def dist_read(trainer_id):
        return queue_list[trainer_id].get()
    
  • 结构图
    picture1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

实际应用中,用户会根据自己的需求分片数据么?这里不太了解。目前只考虑了直接使用HDFS的的分片机制,trainer就近寻找HDFS分片好的数据副本。

Copy link
Contributor

@helinwang helinwang Mar 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@typhoonzero 实际应用中,用户上传了一个大的数据集到云上,默认情况咱们集群无法知道这个数据是怎么存的,也就不知道怎么分片。需要用户以某种方法指定。(比如训练图像分类的时候,要是用户指定了一个列表文件,每一行是一个图像的相对路径以及label,集群就能自动根据行数来分片)
另一种方法,是先让用户把数据全部提供给集群一次,集群把数据存成自己的格式。因为是自己的格式,就知道怎么分片,并且可以把格式设计成对顺序读取有利的格式、甚至设计成支持hdfs自动分片的格式。这里一种实现方法。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yancey1989 我要是没理解错的话,这个方法是单点读数据再通过rpc serve数据。感觉这个可以是一种读数据的方法,不过要是把它作为读数据的默认方法,我有点担心网络性能:那个读数据的单点很容易成为瓶颈。

Copy link
Contributor

@Yancey1989 Yancey1989 Apr 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@helinwang 不好意思回复晚了,后来和 @typhoonzero 线下讨论的时候也提到这个问题了,一种理想的情况是分布式存储部署在计算节点之中,Trainner启动在数据片所在的节点上(或是同一交换机下的节点),但是这么做需要直接读取分布式存储的元数据信息,可能无法同时支持多种分布式存储。我再想一下,然后更新这个doc吧:)

生产环境中的训练数据集通常体积很大,并被存储在诸如Hadoop HDFS, Ceph, AWS S3之类的分布式存储之上。这些分布式存储服务通常会把数据切割成多个分片分布式的存储在多个节点之上,而多个trainer通常也需要预先完成文件的切割。但通常的方法是从HDFS上将数据拷贝到训练集群,然后切割到多个trainer服务器上,如图(Mount/Copy):
Copy link
Contributor

@gongweibao gongweibao Mar 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

预先拷贝数据,如果数据量比较大,trainer节点就变成了有状态的重节点,不利于trainer的迅速启动。
是否采用master分片,分发数据handle, trainer拉取数据,在计算的过程中cache需要的部分,把网络流量隐藏在计算过程中会更好一些?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是现状的描述,设计参考图下面的描述。


<img src="images/trainer_data.png"/>

考虑到HDFS实际上已经完成了数据切割的任务,而且如果存在前置的数据预处理任务(Map-Reduce或Spark SQL),这些任务的输出也都存放于HDFS之上,则trainer可以直接调用HDFS LowLevel API,从元数据节点获得每个数据分片存储的位置,直接获得分片。

***注:每个数据分片保存多个mini_batch***

进一步优化,trainer可以寻找在物理网络拓扑中离自己最近的一个分片副本获取数据。

trainer和训练数据分片的均衡:
* 当trainer >= 数据分片:
trainer个数和数据分片个数相同时,可以获得最高的吞吐量。当trainer个数再大于分片数量时,必然有Trainer获取不到数据分片,处于等待状态。但对整体任务运行没有影响,等待的trainer也会消耗很小的资源。

<img src="images/more_trainer.png"/>

* 当trainer < 数据分片
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个可能性不是特大,这本书里的figure1.8总结了一下常见的数据集尺寸。10^4条样本已经算很小的数据集了。小数据集分片不多,但一般也不需要大量机器训练。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

明白了。这个先保留?毕竟无法确定使用时用户究竟会怎么配置?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯好的。

每个trainer负责多个数据分片,轮询方式完成一个分片训练之后开始下一个分片。

<img src="images/less_trainer.png"/>

## 故障恢复
在通用集群上运行的应用和任务,通常需要有能够自动伸缩的能力,这样在在线集群进行扩容时,可以适当的减小训练任务的资源(进程数/并发数),而不需要直接停止训练任务,修改参数后重新提交任务。

然而对于常见的在线服务(比如Web服务,RPC服务等),是可以“无状态”伸缩的,即扩容和缩容只需要增删对应的节点,集群能力就可以自动伸缩,Web服务的每个节点不会维护自身的状态变化和自身的数据,这些数据通常会借由外部的存储或服务完成,如MySQL,Redis等。而对于训练任务来说,每个parameter server都需要保存状态(mini-batch id)和数据(parameters),在增删节点的时候都会涉及到数据重新分布(re-sharding)和处理数据同步的问题。

用户只要根据实际训练任务场景,配置parameter server和trainer的初始节点个数,最大节点个数和最小节点个数,模型副本个数,是否开启检查点等配置项,即可配置并启动一个可以容灾的训练集群。具体的过程如下:

1. 配置parameter server和trainer的初始节点个数、最大节点个数、最小节点个数、模型副本个数、是否开启检查点等配置以及训练任务相关配置。
1. 启动parameter server和trainer,每个实例会在etcd中注册一个临时节点。这样当某个parameter server或trainer失效是,etcd中的节点会反应这个示例的状态。每个parameter server在所有的parameter server上会使用etcd watcher监听节点的变化状态,已完成后续处理。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对etcd不是特熟悉,"etcd中的节点会反应这个示例的状态"是不是指etcd的lock是lease方式实现的(我的猜测),所以能够知道parameter server或trainer的实例的状态(挂了没有)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

etcd中可以创建一个节点(文件?)并设置一个较短的TTL过期时间。客户端会不断的写入这个节点并更新TTL,这样如果客户端故障(挂掉),这个节点会很快消失,用于判断客户端的存活状态。和心跳类似。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

明白了,谢谢!

1. parameter server如果开启了检查点,则先判断是否已经存在本地检查点快照数据,如果有,则从快照数据中加载状态和数据,并开始提供服务。如果没有则执行初始化启动步骤。
1. 提交用户定义的深度学习网络(topology),并根据网络中参数完成pre-sharding,将参数block哈希到512或1024个slot中,每个slot即为一个参数分片。根据实际存在的parameter server个数,将slot和parameter server完成对应的映射,使slot可以平均存储在这些parameter server上。
1. parameter server开始监听端口并接收数据。每次接收到数据,都使用两段式提交方式同步到所有的副本。如果需要存储检查点,则在同步所有副本之后,保存检查点。
1. 当故障发生后,parameter server会收到etcd发送的watcher信号,此时将暂停trainer的训练(此时要检查最后一次更新的mini_batch id,如果处于不同步状态,需要执行rollback),执行re-sharding步骤:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

能否举个例子,什么故障发生,以及watcher信号的内容是什么。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

1. 根据现有存活的parameter server的个数,找出丢失master分片的参数slot,重新标记成为master,然后确保集群中一个分片只选择出一个master。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

什么是“丢失master分片的参数slot”?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经修改这段描述。

2. 重新分布每个slot,将slot平均分布在所有parameter server上,保证负载均衡。
3. 重新开始trainer的训练。

新增节点的方法类似,此处不再赘述。

## 动态扩容/缩容
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

动态扩容第一个貌似版本不需要支持。咱们可以先把前面的讨论清楚,实现第一个版本,之后有空了再讨论这些。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

虽然故障恢复可以提供任意时刻的节点新增和删除仍然可以保证任务正常运行,但通常这样是比较暴力的。为了能graceful的关闭多个节点,parameter server需要提供对应的API接口:

```python
def resize(n):
# do resize
return success
```

接口完成先发送信号暂停训练任务,re-shard数据分片,然后重新开启训练。这样可以避免程序bug导致的数据不同步问题出现。

## 性能考虑
由于每次数据提交都需要完成分片同步,而且在每个pass之后执行检查点保存,必然会带来parameter server性能下降。可以根据不同的场景配置不同的容灾方案。

* 测试任务/极短训练任务:如果训练任务在几十分钟或小时级别可以运行完成,可以考虑不开启副本也不开启检查点。
* 短期训练任务/测试任务:训练任务运行时间如果在数小时或数天范围,可以考虑只使用一个副本(每个slot只保存一份),并开启检查点。在这个时长内出现不可恢复的硬件故障的概率极低。
* 大型训练任务:训练时间以周或月为单位。建议开启多个副本和检查点。这样可以在任意一个pass停止任务,并重新从这个pass开始训练。或者在通用集群运行时,可以考虑动态扩容和缩容。

## 实现考虑
由于两阶段提交和数据备份同步、选举部分实现比较复杂,可以考虑使用一些开源库函数,比如2pc,raft库等,后期在优化过程中逐步替换。


## TODO:
All-Reduce和Ring的不同设计考虑
Binary file added doc/design/images/arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/less_trainer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/more_trainer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/replica.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/trainer.graffle
Binary file not shown.
Binary file added doc/design/images/trainer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/trainer_data.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/design/images/two_phase_commit.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.