Skip to content

Parallel Computing for Machine Learning(2月14日)

lirui edited this page Feb 14, 2021 · 1 revision

异步梯度下降 (Parameter Server)

从理论收敛率来看,异步算法比同步算法要慢,需要更多的迭代次数,但是具体实现时,异步算法避免了同步造成的时间上的浪费,所以实际上异步算法要比同步算法更快一些

map reduce不能实现异步算法

Parameter Server还是Client-server架构,有多个节点,其中一个或几个节点作为server,用来协调其它的节点,其它的节点用来做worker,用来做计算,计算几乎都是worker在做,worker来做计算,server来更新梯度参数,worker和server之间是可以通信的,server把最新的模型参数发给worker,worker将计算出来的梯度发给server,看起来和map reduce还是很像的,主要区别在于同步和异步

性质:

1、是Client-server架构,这一点与map reduce是一样的

2、通信是message passing,这一点和map reduce也是一样的

3、区别在于parameter server是异步的,而map reduce是bulk synchronous

建议用Ray这个开源系统,Ray:A distributed framework for emerging AI application OSDI 2018

Map reduce 计算的时间是由最慢的那个worker决定,加速比很低,时间会浪费在等待中

parameter server,一个worker完成了工作就可以和server进行通信,立刻进行下一轮计算,由于没有同步,这些worker无需等待其它worker完成计算,用异步有很明显的好处,worker几乎不会空转,整个系统的运行效率非常高

异步梯度下降:

和Map reduce一样,还是做数据并发,将数据划分到节点上,每个worker有一部分数据样本,假设m个worker节点,数据样本被划分成m份,每个节点上有一份数据

如果用Ray或类似的软件来实现异步梯度下降,需要给worker端的计算和server端的计算写代码,首先是worker端,每个worker都不停重复以下操作:

1、向server发送请求,要最新的模型参数w,这需要一次通信

2、worker在本地根据w算出本地的梯度gi向量

3、将gi向量发送给server,这又是一次通信

server端的运算,

1、server监听worker发来的消息,

2、如果worker想要参数w,就立刻将w发给worker,

3、如果worker传来梯度gi,server就立刻用gi来更新参数w

server端做这样一个随机梯度下降,这里α叫做步长或者学习率,learning rate

异步算法经典:Hogwild:A lock-free approach to parallelizing stochastic gradient descent NIPS 2011

从实际效果来看,异步算法远比同步算法要快

从理论上来看,异步算法的收敛要比同步算法要慢,异步算法需要更多的迭代次数才能收敛,

异步算法还是有一些限制的,不会那么完美,比如说出现一个worker远比其它worker要慢,收敛就会受影响,

举例:假如3个worker在to时刻同时开始计算,在t0时刻拿到的都是t0时刻的参数w,worker1和worker2都能很正常完成计算,然后将结果发送给server,然后向server索要最新的参数,server收到梯度后也会更新server端的参数,到了t1这个时刻,server端的参数已经更新了好多次了,但是worker3碰巧运行不正常,变得非常慢,1号和2号都更新了好几轮了,参数也已经更新了好几次了,3号这个时候才算出一个梯度,3号参数是基于t0时刻算的,现在参数已经完全不一样了,所以3号参数算出的梯度已经过时了,没用了,如果这个时候server拿3号算出的梯度更新参数,只会让参数变得更差,所以异步算法的实现是有要求的,这些worker都必须比较稳定,worker可以稍微快那么一点点,也可以稍微慢那么一点点,但是假如有worker出现慢很多倍的情况,收敛就会出问题

Decentralized Network 去中心化的网络

Peer to Peer,点对点,这种架构没有server,所有的节点都是worker

特点:

1、点对点架构,没有server

2、message-passing方式通信

3、节点只跟邻居节点通信

编程需要写每个节点怎么做计算和通信

和之前两种编程方法一样,都是data parallelism,数据被划分到节点上,每一个节点都只有一部分数据,节点之间彼此看不到对方的数据,每个节点都有自己本地的数据,每个节点都有对模型参数w的一个copy,但是每个节点上的w都不太一样,直到最后它们才能收敛到相同的地方去

每个节点独立重复以下四个步骤:

1、用节点本地的数据和本地的参数wi来计算本地的梯度,

2、需要邻居节点的参数wk,这一步需要通信

3、将自己的参数和邻居节点的参数做加权平均,用这个加权平均做自己新的参数

4、这个加权平均的参数保证了大家的参数都会收敛到相同的参数上去,节点本地做一次梯度下降,用来更新自己的参数,

去中心化的梯度下降,随机梯度下降都是可以收敛的

去中心化网络构成了一个图,收敛率和图的状况有关,图的连接越紧密,算法收敛越快,如果这个图是个完全图,算法收敛会非常快,如果这个图不是强连接的,也就是说这个图可以拆成两个部分,那么算法几乎根本就不会收敛

总结:

1、机器学习现在用大模型大数据,因此计算量很大,wall-clock runtime很长,并行计算可以降低这个值

2、方法1、在一个主机上插多个处理器,这样最简单,但是一个主机上插不了太多处理器;方法二,很多节点,每个节点有多个处理器,节点协同工作,一起计算

3、重要概念:通信——共享内存和信息传递;系统架构:client-server和peer to peer;同步和异步;并发:数据并发和模型并发,模型并发是模型参数划分到m个worker上,每个worker都有全部的数据,但只有部分的模型参数

4、三种编程模型:MapReduce:Message passing、Client-server,synchronous;Parameter Server:Message passing,client-server,and asynchronous;Decentralized:Message passing,Peer to Peer,Synchronous or asynchronous

5、并行计算 vs 分布式计算 可以认为分布式计算就是一种并行计算,分布式计算本身就是并行的,有很多处理器同时在工作,区别在于,两者并非泾渭分明,没有明显的界限,在学术界并没有共识,在HPC人的眼中,如果节点都放在一个地方,用网线连起来,这就是并行计算,不叫分布式计算,如果节点不在一起,就可以认为是分布式计算,在机器学习人眼中,更宽泛一些,如果数据或者模型被划分到多个节点上,这就叫分布式计算,假如计算都是在一个节点上完成,即使这个节点上有很多处理器,也不能叫分布式计算,只能叫并行计算

机器学习的人比较喜欢分布式计算这个词

如何用TensorFlow做并行计算?

TensorFlow的distribute库里面实现了6种并行计算的框架,TensorFlow称为Strategies,用户需要根据硬件来选择最合适的框架,其中最简单的是MirroredStrategy,是用于一台机器上插了多个GPU,很类似Map-Reduce编程模型,

其它五种:TPUStrategy、MultiWorkerMirroredStrategy、CentralStorageStrategy、ParameterServerStrategy、OneDeviceStrategy

1、Parallel Training CNN on MNIST

安装TensorFlow的GPU版本

Find Devices

from tensorflow.python.client import device_lib

device_lib.list_local_devices()

For example, my server has 1 CPU and 4 GPUs

/device:CPU:0

/device:GPU:0

/device:GPU:1

/device:GPU:2

/device:GPU:3

如果要用TensorFlow中的Mirrored Strategy训练神经网络,代码如下;

from tensorflow import distribute

strategy = distribute.MirroredStrategy()

后面要用Strategy来做并行计算,可以在MirroredStrategy后面的括号中指定用那几块GPU,如果不指定的话,系统会把能用到的GPU全部都用到,

#number of processors

m = strategy.num_replicas_in_sync //得到GPU的数量

print('Number of devices: {}'.format(m))

Number of devices:4

用MNIST数据来训练CNN

Load and Process MNIST Data

import tensorflow as tf

def scale(image, label): //自己定义一个函数,对图像和标签进行预处理

image = tf.cast(image, tf.float32) //图片变成32位浮点数

image /=255

return image,label

需要安装tensorflow_datasets这个库,然后导入这个库,这个库提供了常用的数据集,

import tensorflow_datasets as tfds

datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True) //如果要用mnist数据集,在这里指定就可以了,

mnist_train = datasets['train'].map(scale).cache() //然后是对数据做预处理,需要调用前面定义的scale函数,

mnist_test = datasets['test'].map(scale)

接下来需要将数据划分成很多batch

load and process MNIST Data

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 128 //GPU每次用128个样本来计算随机梯度

BATCH_SIZE = BATCH_SIZE_PER_REPLICA*m

data_train = mnist_train.shuffle(BUFFER_SIZE).batch(BATCH_SIZE) //将刚刚得到的数据做shuffle操作,将它随机打乱,shuffle函数中可以指定Buffer Size,对于小数据集,不设置buffer size也可以,用buffer size的原因是大数据在内存中存放不下,只能将一部分数据放在内存中,所以要指定buffer size,batch函数目的是将数据划分成很多个batch,batch size是每个batch的大小,随机梯度中用太大的batch size不好,会影响在测试集上的准确度,

data_test = mnist_test.batch(BATCH_SIZE) //测试集只需要用batch函数划分成很多个batch就好了,不需要用shuffle函数随机打乱

训练和测试的时候会用到划分过的数据

数据处理好了,并行计算也设置好了,现在用keras搭一个卷积网络

Build Neural Network

from tensorflow import keras //导入keras神经网络

from tensorflow.keras.layers import Conv2D, MaxPooling2D,Flatten,Dense //导入各种层

with strategy.scope(): //用并行计算,唯一的区别在这一行,想做并行计算,多做这么一行就可以了

 model = keras.Sequential()  //搭建神经网络的方法与不用并行计算的方法完全一样,初始化一个sequential model,然后将所有的层逐个搭上去就可以了

 model.add(Conv2D(32,3,activation='relu', input_shape=(28,28,1)))

 model.add(MaxPooling2D())

 model.add(Conv2D(64,3,activation='relu'))

 model.add(MaxPooling2D())

 model.add(Flatten())

 model.add(Dense(64, activation = 'relu'))

 model.add(Dense(10, activation = 'softmax'))

可以用model.summary输出神经网络的概要,不论用并行计算,还是不用并行计算,输出的概要都完全一样,

用karas搭好了神经网络,下一步就是编译这个模型,用model.compile函数

with strategy.scope():

 model.compile(loss='sparse_categorical_crossentropy',      //指定损失函数是crossentropy

               optimizer=keras.optimizers.RMSprop(learning_rate=1E-3),  //优化算法是RMSprop

               metrics=['accuracy'])    //评价标准是accuracy

然后用model.fit(data_train, epochs=10)拿划分成batch的训练数据来拟合模型,这一步跟单CPU或单GPU一模一样,用一台机器上的多块GPU非常容易,只需要加上一句with strategy.scope():就可以了,其余和单GPU一模一样

训练结束之后可以在测试模型上评价模型的loss和accuracy

eval_loss, eval_acc = model.evaluate(data_test)

将测试数据data_test传递给model.evaluate函数,会返回loss和accuracy,这一步跟单CPU或单GPU一模一样,没有任何区别,

以上就是用minorred Strategy来并行训练神经网络

接下来是minorred strategy的技术原理,尤其是怎么样让GPU通信,

minorred strategy的主要原理是Ring All-Reduce,与Map Reduce中的Reduce函数有些区别,

Reduce与All-Reduce的区别:Reduce的目的是让server获得reduce的结果,Map-Reduce中用到了Reduce函数,reduce可以做求和、求平均以及计数等操作

如果做求和操作,server就会得到求和的结果,举个例子,有4个worker,它们分别输出7,3,4,1,这四个数字,如果执行求和这个Reduce操作,worker就会把结果发给server,server就会知道7+3+4+1=15,但是reduce之后,这4个worker并不知道求和的结果是15,如果我们用All-Reduce操作,那么reduce之后,这4个worker就都知道求和的结果是15了,因此,区别就是reduce,只有server知道结果,all-reduce,不仅server知道结果,worker也知道结果

有不同的办法来实现All-Reduce,比如可以是reduce+broadcast的结合,在reduce之后做一个broadcast操作;还有一种方法是,all-to-all Communication,没有server只有4个worker,直接让workers去做all to all Communication,worker1的结果是7,它将7发送给其余3个worker,同样的道理,其它worker也做同样的操作,每个worker都知道其余所有worker的输出了,大家就都可以分别算出结果了。这种方式可行,但是非常没有效率,通信量太大

更高效的通信方式是ring all-reduce。它的网络连接方式是一个环形,这样不需要单独的一个GPU或CPU作为server,这4个GPU独立做计算,它们各自用各自的数据算出各自的随机梯度,它们算出各自的随机梯度之后,我们需要将它们的随机梯度加起来,拿四个梯度的加和来更新模型参数,为了求4个随机梯度之和,需要做all-reduce,在全部的GPU都完成计算之后,通过ring all-reduce转一两圈,每个GPU就有了4个梯度的加和,注意算法必须是同步算法,因为all reduce需要是Synchronization,做reduce之前,算法需要同步,即要等待所有的GPU计算出它们的梯度,假设当前时刻它们已经计算出梯度了,这个时候可以开始做all reduce操作了,目的是将它们相加得到g,并且需要将g发送到每一个GPU上,让每一个GPU都得到g的一个copy,一种简单但是低效的算法,tensorflow并不会使用这种方法,仅方便理解,GPU0将向量g0发送给GPU1,GPU1得到g0之后可以将g0和g1加起来,这样GPU1就有了g0+g1,GPU1将g0+g1发送给GPU2,GPU1不是将g0和g1分别发出去,它发送的是两者的加和,只是发送一个向量,这样GPU2就可以计算出g0+g1+g2,GPU2再将g0+g1+g2的加和发送给GPU3,GPU3就可以得到g0+g1+g2+g3,到了这步,我们已经得到了4个梯度的加和,记作g,已经达到了第一个目的,计算加和,如果是做reduce,到这一步就已经结束了,但是all-reduce要求每个GPU都得到一个g,现在只有GPU3有这4个梯度的加和,下一步是将g传播到其它3个GPU,现在GPU3将g发送给GPU0,这样GPU0也有了梯度g,GPU0再将梯度发送给GPU1,GPU1也有了g,最后再将g发送给GPU2,到这一步,4块GPU全都得到了g,现在已经完成了all-reduce。大家全部都做梯度下降,4块GPU做完全相同的操作,得到的新的参数一样。

缺点:这个环形网络有4个通道,当GPU0给GPU1传递的时候,其余3条通道都在闲置的状态,任意时刻,4条传输通道只有一条在使用

Communication time = md/b (ignore latency) m:number of GPUs; d:number of parameters; b:network bandwidth,通信时间正比于GPU数量m,GPU越多,通信代价越大,

另一种更加复杂,但是更高效的方法,每块GPU都计算出各自的向量g0=[a0;b0;c0;d0];g1=[a1;b1;c1;d1];g2=[a2;b2;c2;d2];g3=[a3;b3;c3;d3],可以将每个梯度再切成4块,用a、b、c、d来表示这4块,切成4块是因为有4块CPU,第一轮通信,GPU0可以传输给GPU1,GPU1可以传输给GPU2,GPU2可以传输给GPU3,GPU3可以传输给GPU0,环状的通信网络,这四条通道可以同时使用,GPU0将a0发送给GPU1,与此同时GPU1将b1发送给GPU2,GPU2将c2发送给GPU3,GPU3将d3发送给GPU0,这四条通信是同时进行的,第二轮通信,GPU1将a0+a1的加和发送给GPU2,GPU2将b1+b2的加和发送给GPU3,GPU3将c2+c3的加和发送给GPU0,GPU0将d0+d3的加和发送给GPU1,这时GPU2就可以算出a0+a1+a2的加和,第三轮通信,GPU2将a0+a1+a2的加和发送给GPU3,GPU3将b1+b2+b3的加和发送给GPU0,GPU0将c0+c1+c2的加和发送给GPU1,GPU1将d0+d1+d3的加和发送给GPU2,GPU3得到了所有a的加和,第四轮通信,GPU3将所有a的加和发送给GPU0,GPU0将所有b的加和发送给GPU1,GPU1将所有c的加和发送给GPU2,GPU2将所有d的加和发送给GPU3,继续做这种通信,第五轮通信,第六轮通信结束后,所有GPU都有了这4个梯度的加和

所有通道都被利用

Communication time:d/b d:number of parameters. b:network bandwidth,通信时间与m无关

Clone this wiki locally