Add design doc: concurrent data transfer and kernel execution #7276
Conversation
doc/design/Double_Buffering.md (Outdated)

    buffer_size = 2
    for pass_id in range(5):
        for i in range(buffer_size):
From this API, we cannot see that `stage_program` will run in a separate thread.
CUDA can use the stream mechanism to overlap kernel execution with data transfers:
https://devblogs.nvidia.com/parallelforall/how-overlap-data-transfers-cuda-cc/
The description here may not be very clear; I will refine it.
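For reference, a minimal sketch of the two-stream overlap pattern described in the linked post; the kernel, the chunk layout, and all sizes below are illustrative and not part of this PR:

```
// Sketch: overlap host-to-device copies with kernel execution using two
// CUDA streams. scale_kernel and the chunking are illustrative only;
// n is assumed to be divisible by num_chunks.
__global__ void scale_kernel(float* data, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) data[i] *= 2.0f;
}

void run_pipelined(float* h_data, float* d_data, int n, int num_chunks) {
  // h_data must be pinned (cudaMallocHost) or the copies will not be async.
  cudaStream_t streams[2];
  for (int s = 0; s < 2; ++s) cudaStreamCreate(&streams[s]);
  int chunk = n / num_chunks;
  for (int c = 0; c < num_chunks; ++c) {
    cudaStream_t stream = streams[c % 2];
    float* h = h_data + c * chunk;
    float* d = d_data + c * chunk;
    // The copy for chunk c overlaps with the kernel for chunk c-1,
    // which was issued on the other stream.
    cudaMemcpyAsync(d, h, chunk * sizeof(float), cudaMemcpyHostToDevice,
                    stream);
    scale_kernel<<<(chunk + 255) / 256, 256, 0, stream>>>(d, chunk);
  }
  for (int s = 0; s < 2; ++s) {
    cudaStreamSynchronize(streams[s]);
    cudaStreamDestroy(streams[s]);
  }
}
```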
doc/design/Double_Buffering.md (Outdated)

    for pass_id in range(5):
        for i in range(buffer_size):
            exe.run(fluid.stage_program)
        for data in train_reader():
Only two `for` loops are needed:

    for i in range(buffer_size):
        for data in train_reader():
            exe.run(fluid.stage_program, feed=feeder.feed(data))
    for i in range(buffer_size):
        exe.run(fluid.default_main_program())
Thanks! I have refined the code.
The process can be described as follows:
- The staging area should be warmed up first.
- Then data is fetched from the staging area, not from the data set (on the CPU side), for the forward calculation; meanwhile, new data is copied from the data set into the staging area.
- Finally, when the data set is empty, the program keeps fetching data from the staging area for the forward calculation until the staging area becomes empty.
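A minimal single-threaded sketch of these three phases, using plain `std::queue` stand-ins for the data set and the staging area (in the real design the staging step is an asynchronous CPU-to-GPU copy that overlaps with training):

```
#include <cstdio>
#include <queue>

int main() {
  std::queue<int> dataset;   // batches on the CPU side
  for (int i = 0; i < 8; ++i) dataset.push(i);

  std::queue<int> staging;   // stand-in for the staging area on the device
  const int kBufferSize = 2;

  // Phase 1: warm up the staging area.
  for (int i = 0; i < kBufferSize && !dataset.empty(); ++i) {
    staging.push(dataset.front());  // stands in for a CPU-to-GPU copy
    dataset.pop();
  }
  // Phase 2: train on a staged batch while staging the next one.
  while (!dataset.empty()) {
    int batch = staging.front();
    staging.pop();
    std::printf("train on batch %d\n", batch);
    staging.push(dataset.front());  // overlaps with training in the real design
    dataset.pop();
  }
  // Phase 3: the data set is empty; drain the staging area.
  while (!staging.empty()) {
    std::printf("train on batch %d\n", staging.front());
    staging.pop();
  }
  return 0;
}
```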
I see that `default_main_program()` runs twice, which means running forward and backward twice for each mini-batch. The final step should run `stage_program` instead of the main program.
I have updated this doc; maybe it is clearer than before.
Force-pushed: b6a4adb to de0de43
Force-pushed: b58c991 to 4df9cb5
Force-pushed: 4df9cb5 to 1a722b0
Force-pushed: 1a722b0 to e3dd104
Force-pushed: 6688915 to 23f44e6
Force-pushed: 23f44e6 to 4515f34
    To support the above description, we need to define a new class: `Channel`. Here, we borrow the concept of [`Channel`](https://www.golang-book.com/books/intro/10#section2) from the Go language.

    ```
    template <typename T>
I've pushed https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/operators/detail/simple_block_queue.h; you can reuse this code or change the name. It now acts as a "channel" internally.
How to set the capacity for this Queue?
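One way a capacity bound could be added, as a minimal sketch (the class and method names here are hypothetical, not the actual `simple_block_queue.h` API): take the capacity in the constructor and block the sender while the queue is full.

```
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical sketch of a capacity-bounded blocking queue; not the
// actual simple_block_queue.h interface.
template <typename T>
class BoundedBlockingQueue {
 public:
  explicit BoundedBlockingQueue(std::size_t capacity) : capacity_(capacity) {}

  void Send(T item) {
    std::unique_lock<std::mutex> lock(mu_);
    // Block the producer until there is room, bounding memory usage.
    not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push_back(std::move(item));
    not_empty_.notify_one();
  }

  T Receive() {
    std::unique_lock<std::mutex> lock(mu_);
    // Block the consumer until a staged element is available.
    not_empty_.wait(lock, [this] { return !queue_.empty(); });
    T item = std::move(queue_.front());
    queue_.pop_front();
    not_full_.notify_one();
    return item;
  }

 private:
  const std::size_t capacity_;
  std::mutex mu_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> queue_;
};
```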
    places = get_places()
    channel_list = create_channel_list(name="data_buffer")
    with parallel.for(places) as for_loop:
        cur_channel = create_channel(channel_list, for_loop.i, channel_size=2)
`parallel.for` is an operator that executes one block concurrently, so within the `with` block, there should be code that creates operators?
doc/design/Double_Buffering.md (Outdated)

    using MetaType = LoDTensor;
    using BufferElement = std::vector<MetaType>;

    class Buffer {
Buffer could be just a Variable.
doc/design/Double_Buffering.md (Outdated)

    Buffer* GetBuffer(const platform::Place place, const size_t capacity,
                      const size_t bytes_limit) {
      static std::map<platform::Place, Buffer*> buffering;
We should avoid global variables.
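For illustration, one way to avoid the function-local static map is to let an owning object hold the per-place buffers; `BufferRegistry` and the simplified `Place`/`Buffer` types below are hypothetical stand-ins:

```
#include <cstddef>
#include <map>
#include <memory>

// Hypothetical stand-ins for the types used in the doc.
struct Place { int device_id; };
inline bool operator<(const Place& a, const Place& b) {
  return a.device_id < b.device_id;
}
struct Buffer { std::size_t capacity; std::size_t bytes_limit; };

// The registry owns the buffers; its lifetime is controlled by whoever
// creates it (e.g. an executor), instead of lasting the whole process.
class BufferRegistry {
 public:
  Buffer* GetBuffer(const Place& place, std::size_t capacity,
                    std::size_t bytes_limit) {
    auto& buf = buffers_[place];
    if (!buf) buf.reset(new Buffer{capacity, bytes_limit});
    return buf.get();
  }

 private:
  std::map<Place, std::unique_ptr<Buffer>> buffers_;
};
```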
I have updated this design doc; your comments have been addressed.
doc/design/Double_Buffering.md (Outdated)

    In general, model training is divided into two steps: data preparation and model computation. Training a deep learning model requires a lot of computing resources; if only the CPU is used, completing one iteration takes a long time, and training a good model usually takes tens of thousands of iterations, which is clearly unacceptable. Therefore, we usually choose an accelerator (e.g. a GPU) for model training. But using an accelerator brings a new problem: since the training data is in CPU memory, the accelerator has to wait for the data to be copied from the CPU side before it can train. So the time to train the model on the accelerator is the sum of the data-loading time, the data-transfer time, and the accelerator's computation time. Therefore, although the accelerator computes very fast, the data transfer sometimes takes so long that training on the accelerator can be slower than training directly on the CPU.

    ## Problem
    The data transfer between host and device is synchronous by default. If the accelerator is a `GPU`, a timeline for model training on the `GPU` is shown in the following diagram. This is just a schematic.
Use async transfer, to fully utilize the GPUs.
    To support the above description, we need to define a new class: `Channel`. Here, we borrow the concept of [`Channel`](https://www.golang-book.com/books/intro/10#section2) from the Go language.

    ```
    template <typename T>
How to set the capacity for this Queue?
@tonyyang-svail can you please take a look at this?
        for_loop.input(label)], out=cur_channel)

    main_program = Program()
    with program(main_program):
Please refer to https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/tests/test_parallel_op.py#L79 for the current usage of the `parallel_do` op.
It's just pseudo-code describing data transfer and model training running concurrently. The code structure is only a logical structure for concurrent execution.
Can we discuss this comment before this is merged?
    ```
    ...
    concurrent_program = Program()
Do we need to expose the channel concept to the user?
We can do the following, and use a channel as the implementation, without exposing it to the user:

    main_program = Program()
    with program(main_program):
        # automatically create a separate thread for data loading.
        # automatically infer whether the data destination is a GPU place or a CPU place.
        image = fluid.layers.data_layer(..., buf_size=100)
        label = fluid.layers.data_layer(..., buf_size=100)
        with parallel.for(places) as for_loop:
            y_predict = fluid.layers.fc(input=image, size=1, act=None)
            cost = fluid.layers.square_error_cost(input=y_predict, label=label)
            avg_cost = fluid.layers.mean(x=cost)
    ....
    for pass_id in range(PASS):
        for data in train_reader():
            executor.run(main_program, fetch=[...])

Please compare the Python code in this PR with the above code; it feels like exposing the channel concept to the user only adds verbosity to the program.
> Do we need to expose the channel concept to the user?

I think we should have a deeper discussion about this.

> automatically create a separate thread for data loading.

Currently, data loading is done on the Python side, and we do not consider it in this PR. This PR mainly analyzes the concurrent execution of data transfer from the CPU side to the GPU and model computation on the GPU.

> feels like exposing the channel concept to the user only adds verbosity to the program

Exposing the channel concept may not be appropriate, and some users may never have used Go. But it can make the program more flexible.
    template <typename T>
    class Channel {
     private:
      using ChannelElement = std::vector<T>;
Maybe `ChannelElement` should be a pointer to `T`, since we could send/receive one element to/from a Channel, instead of sending/receiving a vector of elements.
Yes. Replacing `ChannelElement` with `T` will make the code more general.
When I started writing this doc, I thought we should put a batch of data into the `Channel`. For an image classification task, the batch includes `data` and `label`.
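For illustration, the element type can then model one mini-batch directly; the `Batch` struct and the tensor stand-in below are hypothetical:

```
#include <vector>

// Illustrative stand-in for the framework's tensor type.
struct LoDTensor { std::vector<float> values; };

// With elements of type T instead of std::vector<T>, one mini-batch for
// image classification can be modeled as a single struct:
struct Batch {
  LoDTensor data;
  LoDTensor label;
};

// Usage sketch, assuming a Channel<T> with Send/Receive as discussed:
//   Channel<Batch> ch(2);        // capacity 2, i.e. double buffering
//   ch.Send(std::move(batch));   // producer side: stage one batch
//   Batch b = ch.Receive();      // consumer side: train on one batch
```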
    image = fluid.layers.data_layer(...)
    label = fluid.layers.data_layer(...)
    places = get_places()
    channel_list = create_channel_list(name="data_buffer")
Sorry, I'm confused about the Python code:

    channel_list = create_channel_list(name="data_buffer")

Does it mean creating a list of channels?
Also, I think a `Channel` needs a type. If we allow the user to create a `Channel` in Python, how do we define the type of a `Channel`? Should it conform to `proto::VarDesc::VarType`?
> Does it mean creating a list of channels?

Yes; in the single-machine multi-card case, every GPU corresponds to one channel.

> Should it conform to proto::VarDesc::VarType?

We may need to create new types.
        executor.run(main_program, fetch=[...])
    ```
    In Python code, we define two `program`s: `concurrent_program`, used to send data into the `Channel`, and `main_program`, used to get data from the `Channel` and execute training. If you're familiar with [`Goroutine`](https://www.golang-book.com/books/intro/10#section1) in the Go language, you'll find that `main_program` and `concurrent_program` work just like two `Goroutine`s.
If we make `main_program` and `concurrent_program` like two Goroutines, I think we need to declare that the code block will run as a Goroutine and pass the channel as an argument. In my personal opinion, the Python code would look like:

    chan = fluid.channel.make(type=var.LoDTensor)

    with fluid.go(concurrent_program, chan):
        image = fluid.layers.data_layer(...)
        label = fluid.layers.data_layer(...)
        places = get_places()
        with parallel.for(places) as for_loop:
            fluid.channel.send(chan, data=[for_loop.input(image), for_loop.input(label)])

    with fluid.go(main_program, chan):
        places = get_places()
        with parallel.for(places) as for_loop:
            image, label = fluid.channel.recv(chan)
            y_predict = fluid.layers.fc(input=image, size=1, act=None)
            cost = fluid.layers.square_error_cost(input=y_predict, label=label)
            avg_cost = fluid.layers.mean(x=cost)
Great!
I think `fluid.channel.make(type=var.LoDTensor)` should be inside `fluid.go`.
And I will update this PR.
    with fluid.go(concurrent_program, chan_list_name):
    chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)

    with fluid.go(concurrent_program, chan_list_config) as go:
If we look at the `go` keyword in Go, it can be used to create a goroutine running any function:

    go foo()
    go bar(a, b, c)

Here we use `chan_list_config` as an argument, coupling the channel with the goroutine; is that necessary?
When I wrote this doc, I thought that one program corresponded to one goroutine, and that `chan_list_config` might be used by more programs, not just `concurrent_program` and `main_program`. So I regard `chan_list_config` as an argument of `fluid.go`. I am not very familiar with the Go language, and maybe this design is not appropriate. I will continue to update this part; if you have any advice, please tell me, many thanks.
    with fluid.go(concurrent_program, chan_list_name):
    chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)

    with fluid.go(concurrent_program, chan_list_config) as go:
Is this syntax valid? `concurrent_program` does not seem to be defined anywhere.
Yes, this is just pseudo-code.
    image = fluid.layers.data_layer(...)
    label = fluid.layers.data_layer(...)
    chan_list = fluid.channel_list.make(type=var.CHANNEL_LIST, name=chan_list_name)
    places = get_places()
    with parallel.for(places) as for_loop:
It seems to me that we don't need to read data concurrently; there is just one data source (the pair `(image, label)`), so concurrently reading it doesn't seem to help.
In this design doc, I want to overlap the data transfer (from CPU to GPU) with model training on the GPU. Data reading is not considered here.
I see, thanks. I think I understand what the code here is trying to do: load tensors onto different GPUs' memory through different channels. In the code below,

    image = fluid.layers.data_layer(...)
    label = fluid.layers.data_layer(...)
    places = get_places()
    with parallel.for(places) as for_loop:
        chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
        fluid.channel.send(chan, data=[for_loop.input(image), for_loop.input(label)])

do you mean that `parallel.for(places)` makes each for block run on the given GPU place? There is only `send` in the for block, but the data has already been read to the CPU place; there is no code that copies the data from the CPU place to the GPU place of each for block.
Btw, I think `parallel.for` should be unrelated to where each block runs; it's just concurrency syntax, and we need other syntax to specify where each block runs.
`parallel.for(places)` works just like `ParallelDoOp`; data copying can be done in `parallel.for(places)`.

Paddle/paddle/operators/parallel_do_op.cc, lines 105 to 106 in 23f5c18:

    SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
                                     Inputs(kInputs));

The difference is that `parallel.for(places)` does not need to merge output.
@chengduoZH Thanks for your help! Now I understand the situation better.
@helinwang Thanks for your review!
    places = get_places()
    with parallel.for(places) as for_loop:
        chan = fluid.channel_list.get_channel(chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
        chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
Maybe reading data into one channel, with multiple consumers consuming from that channel (a single channel, not multiple channels), is enough?
Data reading has been done on the Python side.
In the multi-GPU case, since different GPUs have different address spaces, each GPU should own one channel. So I use multiple channels here.
    @@ -0,0 +1,93 @@
    # Design Doc: Concurrent data transfer and kernel execution.
    Training deep learning model on GPU involves three stages: loading data from disk, copying the data from the CPU side to the GPU side, and executing training program on GPU. We need an efficient mechanism to take full use of hardware resources to make those stages executed concurrently. At present, Fluid uses producer-consumer mechanism at the python side to load data from disk. So this design doc mainly solves the time overlap of data transfer and kernel execution.
python ==> Python
    ```
    template <typename T>
    class Channel {
To implement CSP in Fluid, we need a channel type, which should work with our `fluid.Select` op, and we want `fluid.Select` to depend on Linux's epoll system call for an efficient implementation. This part of the work is still under design. So, I suggest removing the section about channel design from this design doc and making the doc more focused on how to overlap CPU-to-GPU data transfer and model update.
OK, I will fix this in the next commit.
    ```
    ...
    chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)
I think this program is the most interesting part of this doc, but I don't see the creation of a channel-typed variable like

    ch = fluid.channel(...)
In the multi-GPU case, since different GPUs have different address spaces, each GPU should own one channel. So I create a `channel_list`, and all the channels are created by `chan = fluid.channel_list.get_channel(go.chan_list...)`.
    ...
    chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)

    with fluid.go(concurrent_program, chan_list_config) as go:
I thought about this for a while; now I believe that `fluid.go` should start a thread to run a block in the current ProgramDesc, instead of a new ProgramDesc, because a ProgramDesc is a program, like a .go file, and Go's `go` statement doesn't run a .go file, but a lambda represented by a block.
This is interesting!
But `go`'s block may be different from the others because, in my view, `go`'s block does not need gradient ops. I don't know whether my view is right or not.
It would be more convenient to look here.