From edcfcad53064a201a923d0099dcd36cfa34a7c58 Mon Sep 17 00:00:00 2001
From: qiaolongfei
Date: Thu, 5 Apr 2018 16:00:13 +0800
Subject: [PATCH 01/26] init

---
 paddle/fluid/operators/lookup_table_op.cc     |   3 +
 python/paddle/fluid/distribute_transpiler.py  |  76 +++++----
 python/paddle/fluid/layers/nn.py              |   8 +-
 .../tests/book/dist_test/test_word2vec.env    |   7 +
 .../tests/book/dist_test/test_word2vec.py     | 150 ++++++++++++++++++
 .../book/dist_test/test_word2vec_pserver.py   |  28 ++++
 .../book/dist_test/test_word2vec_pserver.sh   |   9 ++
 .../book/dist_test/test_word2vec_trainer.py   |  28 ++++
 .../book/dist_test/test_word2vec_trainer.sh   |   9 ++
 9 files changed, 288 insertions(+), 30 deletions(-)
 create mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec.env
 create mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec.py
 create mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py
 create mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh
 create mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py
 create mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh

diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc
index deabcdc99f819..b97f0c3d7ef62 100644
--- a/paddle/fluid/operators/lookup_table_op.cc
+++ b/paddle/fluid/operators/lookup_table_op.cc
@@ -93,6 +93,9 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
               "(boolean, default false) "
               "Sparse update.")
         .SetDefault(false);
+    AddAttr<bool>("is_distributed",
+                  "(boolean, default false) distributed lookup table.")
+        .SetDefault(false);
     AddAttr<int64_t>("padding_idx",
                      "(int64, default -1) "
                      "If the value is -1, it makes no effect to lookup. "
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index 9311fc9904eb7..252b85b51afd2 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -13,14 +13,13 @@
 # limitations under the License.
 
 from __future__ import print_function
+
+import math
+
 import framework
-from framework import Program, default_main_program, default_startup_program, Parameter, Variable
-import optimizer
-from layer_helper import LayerHelper
 from distributed_spliter import *
-import math
+from framework import Program, default_main_program, Variable
 from . import core
-import debuger
 
 
 class VarBlock:
@@ -130,6 +129,27 @@ def split_dense_variable(var_list,
     return blocks
 
 
+# TODO
+# 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op
+# 2. create inputs and outputs for prefetch_op
+# 3. delete w of lookup_table, delete its init_op in startup_program
+# 4. create prefetch_block in pserver_program
+# 5. create w in pserver_main_program
+# 6. create init_op for w in pserver_init_program
+# 7. the optimize op does not need to change for now
+# 8. we only support one table parameter; if there is more than one
+#    lookup_table_op with the same table parameter, the pserver will
+#    only have one lookup_table_op
+
+
+def split_sparse_ids():
+    pass
+
+
+def get_prefetch_outputs():
+    pass
+
+
 class DistributeTranspiler:
     def transpile(self,
                   optimize_ops,
@@ -137,7 +157,7 @@ def transpile(self,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
-                  trainers=1,
+                  trainer_num=1,
                   split_method=round_robin):
         """
             Transpile the program to distributed data-parallelism programs.
@@ -174,8 +194,8 @@ def transpile(self, :type program: Program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string - :param trainers: total number of workers/trainers in the job - :type trainers: int + :param trainer_num: total number of workers/trainers in the job + :type trainer_num: int :param split_method: A function to determin how to split variables to different servers equally. :type split_method: function @@ -183,8 +203,8 @@ def transpile(self, assert (callable(split_method)) if program is None: program = default_main_program() - self.program = program - self.trainers = trainers + self.origin_program = program + self.trainer_num = trainer_num self.optimize_ops = optimize_ops # TODO(typhoonzero): currently trainer_id is fetched from cluster system # like Kubernetes, we should port this to use etcd later when developing @@ -250,10 +270,10 @@ def transpile(self, def get_trainer_program(self): # remove optimize ops and add a send op to main_program - self.program.global_block().delete_ops(self.optimize_ops) + self.origin_program.global_block().delete_ops(self.optimize_ops) # FIXME(typhoonzero): serialize once will fix error occurs when clone. - self.program.__str__() - return self.program + self.origin_program.__str__() + return self.origin_program def get_pserver_program(self, endpoint): """ @@ -284,8 +304,8 @@ def get_pserver_program(self, endpoint): type=v.type, dtype=v.dtype, shape=v.shape) - if self.trainers > 1: - for trainer_id in xrange(self.trainers): + if self.trainer_num > 1: + for trainer_id in xrange(self.trainer_num): var = pserver_program.global_block().create_var( name="%s.trainer_%d" % (orig_var_name, trainer_id), persistable=False, @@ -382,7 +402,7 @@ def __append_optimize_op__(op, block): attrs={ "OptimizeBlock": optimize_block, "endpoint": endpoint, - "Fanin": self.trainers + "Fanin": self.trainer_num }) pserver_program.sync_with_cpp() return pserver_program @@ -517,7 +537,7 @@ def _clone_var(self, block, var): def _append_split_op(self, program, gradblocks): # Split variables that need to be split and append respective ops add_suffix = False - if self.trainers > 1: + if self.trainer_num > 1: add_suffix = True var_mapping = self._create_vars_from_blocklist( program, gradblocks, add_trainer_suffix=add_suffix) @@ -608,9 +628,9 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint, return merged_var = \ pserver_block.vars[self._orig_varname(grad_block.name)] - if self.trainers > 1: + if self.trainer_num > 1: vars2merge = [] - for i in xrange(self.trainers): + for i in xrange(self.trainer_num): per_trainer_name = "%s.trainer_%d" % \ (self._orig_varname(grad_block.name), i) vars2merge.append(pserver_block.vars[per_trainer_name]) @@ -624,7 +644,7 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint, type="scale", inputs={"X": merged_var}, outputs={"Out": merged_var}, - attrs={"scale": 1.0 / float(self.trainers)}) + attrs={"scale": 1.0 / float(self.trainer_num)}) new_inputs[key] = merged_var elif key == "Param": # param is already created on global program @@ -660,7 +680,7 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint, new_shape = None if key in ["Param", "Grad", "LearningRate"]: continue - var = self.program.global_block().vars[opt_op.input(key)[0]] + var = self.origin_program.global_block().vars[opt_op.input(key)[0]] # update accumulator variable shape param_shape = new_inputs["Param"].shape new_shape = self._get_optimizer_input_shape(opt_op.type, key, @@ -673,8 +693,8 @@ def 
_append_pserver_ops(self, optimize_block, opt_op, endpoint, new_inputs[key] = tmpvar # change output's ParamOut variable - outputs = self._get_output_map_from_op(self.program.global_block().vars, - opt_op) + outputs = self._get_output_map_from_op( + self.origin_program.global_block().vars, opt_op) outputs["ParamOut"] = new_inputs["Param"] optimize_block.append_op( @@ -686,8 +706,8 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint, def _append_pserver_non_opt_ops(self, optimize_block, opt_op): program = optimize_block.program # Append the ops for parameters that do not need to be optimized/updated - inputs = self._get_input_map_from_op(self.program.global_block().vars, - opt_op) + inputs = self._get_input_map_from_op( + self.origin_program.global_block().vars, opt_op) for varlist in inputs.itervalues(): if not isinstance(varlist, list): varlist = [varlist] @@ -700,8 +720,8 @@ def _append_pserver_non_opt_ops(self, optimize_block, opt_op): dtype=var.dtype, shape=var.shape) - outputs = self._get_output_map_from_op(self.program.global_block().vars, - opt_op) + outputs = self._get_output_map_from_op( + self.origin_program.global_block().vars, opt_op) for varlist in outputs.itervalues(): if not isinstance(varlist, list): @@ -814,7 +834,7 @@ def _get_lr_ops(self): find_ops = [] # find ops which output is lr var - block = self.program.global_block() + block = self.origin_program.global_block() for op in block.ops: if set(op.output_arg_names) & lr_vars: find_ops.append(op) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 3d13133bf25aa..abf39e15542ab 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -190,6 +190,7 @@ def fc(input, def embedding(input, size, is_sparse=False, + is_distributed=False, padding_idx=None, param_attr=None, dtype='float32'): @@ -240,8 +241,11 @@ def embedding(input, inputs={'Ids': input, 'W': w}, outputs={'Out': tmp}, - attrs={'is_sparse': is_sparse, - 'padding_idx': padding_idx}) + attrs={ + 'is_sparse': is_sparse, + 'is_distributed': is_distributed, + 'padding_idx': padding_idx + }) return tmp diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.env b/python/paddle/fluid/tests/book/dist_test/test_word2vec.env new file mode 100644 index 0000000000000..220e68a1a6966 --- /dev/null +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.env @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +export PADDLE_INIT_PSERVERS="127.0.0.1" +export TRAINERS=1 +export POD_IP="127.0.0.1" +export PADDLE_INIT_TRAINER_ID=0 + diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py new file mode 100644 index 0000000000000..b4a01f6b880cd --- /dev/null +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py @@ -0,0 +1,150 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import paddle +import paddle.fluid as fluid +import paddle.v2 +import unittest +import os +import numpy as np +import math +import sys + + +def train(use_cuda=False, is_sparse=False, is_local=False): + PASS_NUM = 100 + EMBED_SIZE = 32 + HIDDEN_SIZE = 256 + N = 5 + BATCH_SIZE = 32 + IS_SPARSE = is_sparse + + def __network__(words): + embed_first = fluid.layers.embedding( + input=words[0], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') + embed_second = fluid.layers.embedding( + input=words[1], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') + embed_third = fluid.layers.embedding( + input=words[2], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') + embed_forth = fluid.layers.embedding( + input=words[3], + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') + + concat_embed = fluid.layers.concat( + input=[embed_first, embed_second, embed_third, embed_forth], axis=1) + hidden1 = fluid.layers.fc(input=concat_embed, + size=HIDDEN_SIZE, + act='sigmoid') + predict_word = fluid.layers.fc(input=hidden1, + size=dict_size, + act='softmax') + cost = fluid.layers.cross_entropy(input=predict_word, label=words[4]) + avg_cost = fluid.layers.mean(cost) + return avg_cost, predict_word + + word_dict = paddle.v2.dataset.imikolov.build_dict() + dict_size = len(word_dict) + + first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') + second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64') + third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') + forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') + next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') + + avg_cost, predict_word = __network__( + [first_word, second_word, third_word, forth_word, next_word]) + + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) + optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) + + train_reader = paddle.v2.batch( + paddle.v2.dataset.imikolov.train(word_dict, N), BATCH_SIZE) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + feeder = fluid.DataFeeder( + feed_list=[first_word, second_word, third_word, forth_word, next_word], + place=place) + + def train_loop(main_program): + exe.run(fluid.default_startup_program()) + + batch_num = 0 + for pass_id in range(PASS_NUM): + for data in train_reader(): + avg_cost_np = exe.run(main_program, + feed=feeder.feed(data), + fetch_list=[avg_cost]) + if batch_num == 1: + return + if avg_cost_np[0] < 5.0: + return + if math.isnan(float(avg_cost_np[0])): + sys.exit("got NaN loss, training failed.") + batch_num += 1 + + raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0])) + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... 
+ trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) + + +class W2VTest(unittest.TestCase): + def test_main(self): + train() + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py new file mode 100644 index 0000000000000..2581859a810ed --- /dev/null +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py @@ -0,0 +1,28 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import test_word2vec + +os.environ["GLOG_v"] = '3' +os.environ["GLOG_logtostderr"] = '1' + +os.environ["PADDLE_INIT_PSERVERS"] = "127.0.0.1" +os.environ["TRAINERS"] = "1" +os.environ["POD_IP"] = "127.0.0.1" +os.environ["PADDLE_INIT_TRAINER_ID"] = "0" + +os.environ["TRAINING_ROLE"] = "PSERVER" + +test_word2vec.train() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh new file mode 100644 index 0000000000000..d1535eb6b5173 --- /dev/null +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +export GLOG_v=3 +export GLOG_logtostderr=1 + +source test_word2vec.env + +export TRAINING_ROLE=PSERVER +python test_word2vec.py diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py new file mode 100644 index 0000000000000..382de6e0a029c --- /dev/null +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py @@ -0,0 +1,28 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import test_word2vec + +os.environ["GLOG_v"] = '3' +os.environ["GLOG_logtostderr"] = '1' + +os.environ["PADDLE_INIT_PSERVERS"] = "127.0.0.1" +os.environ["TRAINERS"] = "1" +os.environ["POD_IP"] = "127.0.0.1" +os.environ["PADDLE_INIT_TRAINER_ID"] = "0" + +os.environ["TRAINING_ROLE"] = "TRAINER" + +test_word2vec.train() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh new file mode 100644 index 0000000000000..4c37b3726ed9d --- /dev/null +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +export GLOG_v=3 +export GLOG_logtostderr=1 + +source test_word2vec.env + +export TRAINING_ROLE=TRAINER +python test_word2vec.py From 66ab88a8b0eb9024dbe1445da88a4a8afabcca3b Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Thu, 5 Apr 2018 21:57:27 +0800 Subject: [PATCH 02/26] add some check --- python/paddle/fluid/distribute_transpiler.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 252b85b51afd2..c767e2c52076c 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -268,6 +268,25 @@ def transpile(self, outputs={"Out": [orig_param]}, attrs={"axis": 0}) + # process lookup_table_op + # 1. check all lookup_table_op is distributed + # 2. check all lookup_table_op share the same table. + lookup_table_ops = [] + have_distributed_lookup_table_op = None + for op in program.global_block().ops: + if op.type == "lookup_table": + if have_distributed_lookup_table_op is None: + have_distributed_lookup_table_op = op.attr['is_distributed'] + # all lookup_table op's attribute is_distributed should be the same + assert have_distributed_lookup_table_op == op.attr[ + 'is_distributed'] + lookup_table_ops.append(op) + + if have_distributed_lookup_table_op: + table_param_name = lookup_table_ops[0].input("W")[0] + for lookup_table_op in lookup_table_ops: + assert table_param_name == lookup_table_op.input("W")[0] + def get_trainer_program(self): # remove optimize ops and add a send op to main_program self.origin_program.global_block().delete_ops(self.optimize_ops) From 29174df1c66c0b416b4edd636909abec2249b6bb Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 7 Apr 2018 21:35:41 +0800 Subject: [PATCH 03/26] add dist transpile logic --- paddle/fluid/framework/block_desc.h | 3 +- paddle/fluid/operators/prefetch_op.cc | 4 +- python/paddle/fluid/distribute_transpiler.py | 101 +++++++++++++++--- .../tests/book/dist_test/test_word2vec.py | 16 ++- 4 files changed, 101 insertions(+), 23 deletions(-) diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h index 468423e0e8e7b..1faa3c8bb2fd8 100644 --- a/paddle/fluid/framework/block_desc.h +++ b/paddle/fluid/framework/block_desc.h @@ -17,6 +17,7 @@ limitations under the License. */ #include #include #include +#include #include #include @@ -91,7 +92,7 @@ class BlockDesc { /* * Remove Op and its input/output variables. - * Note that for either input or ouput variable, if it is also an input or + * Note that for either input or output variable, if it is also an input or * output variable of other ops, we should remain it. 
*/ void RemoveOp(size_t s, size_t e); diff --git a/paddle/fluid/operators/prefetch_op.cc b/paddle/fluid/operators/prefetch_op.cc index 09ab7da663b5e..c1e41f25e8fab 100644 --- a/paddle/fluid/operators/prefetch_op.cc +++ b/paddle/fluid/operators/prefetch_op.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include +#include // NOLINT #include #include "paddle/fluid/framework/data_type.h" @@ -71,7 +71,7 @@ class PrefetchOpMaker : public framework::OpProtoAndCheckerMaker { "(RPCClient) The RPC client object which will be" "initialized at most once."); AddOutput("Out", - "(SelectedRows) result " + "(LoDTensor) result " "to be fetched from parameter server") .AsDuplicable(); AddAttr>( diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index c767e2c52076c..980a9cccda0b8 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -21,6 +21,9 @@ from framework import Program, default_main_program, Variable from . import core +LOOKUP_TABLE_TYPE = "lookup_table" +RPC_CLIENT_VAR_NAME = "RPC_CLIENT_VAR" + class VarBlock: def __init__(self, varname, offset, size): @@ -245,7 +248,7 @@ def transpile(self, self.param_grad_ep_mapping[ep]["grads"].append(grad) rpc_client_var = program.global_block().create_var( - name="RPC_CLIENT_VAR", + name=RPC_CLIENT_VAR_NAME, persistable=True, type=core.VarDesc.VarType.RAW) @@ -271,21 +274,89 @@ def transpile(self, # process lookup_table_op # 1. check all lookup_table_op is distributed # 2. check all lookup_table_op share the same table. - lookup_table_ops = [] - have_distributed_lookup_table_op = None + distributed_lookup_table_ops = [] + # support only one distributed_lookup_table now + self.table_name = None for op in program.global_block().ops: - if op.type == "lookup_table": - if have_distributed_lookup_table_op is None: - have_distributed_lookup_table_op = op.attr['is_distributed'] - # all lookup_table op's attribute is_distributed should be the same - assert have_distributed_lookup_table_op == op.attr[ - 'is_distributed'] - lookup_table_ops.append(op) - - if have_distributed_lookup_table_op: - table_param_name = lookup_table_ops[0].input("W")[0] - for lookup_table_op in lookup_table_ops: - assert table_param_name == lookup_table_op.input("W")[0] + if op.type == LOOKUP_TABLE_TYPE: + if op.attrs['is_distributed'] is True: + if self.table_name is None: + self.table_name = op.input("W")[0] + if self.table_name != op.input("W")[0]: + raise RuntimeError("all distributed lookup_table_ops" + " should have only one table") + distributed_lookup_table_ops.append(op) + + self.has_distributed_lookup_table = len( + distributed_lookup_table_ops) > 0 + if self.has_distributed_lookup_table: + + def create_var(block, name): + return block.create_var( + name=name, type=core.VarDesc.VarType.LOD_TENSOR) + + # replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op + self.prefetch_input_vars = [ + create_var( + block=program.global_block(), + name=str(self.table_name + "_prefetch_in_" + str(index))) + for index in range(len(pserver_endpoints)) + ] + self.prefetch_output_vars = [ + create_var( + block=program.global_block(), + name=str(self.table_name + "_prefetch_out_" + str(index))) + for index in range(len(pserver_endpoints)) + ] + while True: + all_ops = program.global_block().ops + for op in all_ops: + if op.type == 
LOOKUP_TABLE_TYPE: + op_index = list(all_ops).index(op) + ids_name = op.input("Ids") + out_name = op.output("Out") + + # insert split_ids_op + split_ids_op = program.global_block().desc.insert_op( + op_index) + split_ids_op.set_type("split_ids") + split_ids_op.set_input("Ids", ids_name) + split_ids_op.set_output( + "Out", + [var.name for var in self.prefetch_input_vars]) + + # insert prefetch_op + prefetch_op = program.global_block().desc.insert_op( + op_index + 1) + prefetch_op.set_type("prefetch") + prefetch_op.set_input( + "X", + [var.name for var in self.prefetch_input_vars]) + prefetch_op.set_output( + "Out", + [var.name for var in self.prefetch_output_vars]) + prefetch_op.set_output("RPCClient", + [RPC_CLIENT_VAR_NAME]) + prefetch_op.set_output("Out", out_name) + prefetch_op.set_attr("epmap", eplist) + + # insert concat_op + concat_op = program.global_block().desc.insert_op( + op_index + 2) + concat_op.set_type("concat") + concat_op.set_input( + "X", + [var.name for var in self.prefetch_output_vars]) + concat_op.set_attr("axis", 0) + + program.sync_with_cpp() + # delete lookup_table_op + program.global_block().delete_ops([op]) + program.sync_with_cpp() + + continue + # if for loop did not break, it means that there is no more lookup_table_op + break def get_trainer_program(self): # remove optimize ops and add a send op to main_program diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py index b4a01f6b880cd..76e1ce170cbc2 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py @@ -36,25 +36,29 @@ def __network__(words): size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, - param_attr='shared_w') + param_attr='shared_w', + is_distributed=True) embed_second = fluid.layers.embedding( input=words[1], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, - param_attr='shared_w') + param_attr='shared_w', + is_distributed=True) embed_third = fluid.layers.embedding( input=words[2], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, - param_attr='shared_w') + param_attr='shared_w', + is_distributed=True) embed_forth = fluid.layers.embedding( input=words[3], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, - param_attr='shared_w') + param_attr='shared_w', + is_distributed=True) concat_embed = fluid.layers.concat( input=[embed_first, embed_second, embed_third, embed_forth], axis=1) @@ -130,7 +134,9 @@ def train_loop(main_program): params_grads, trainer_id, pservers=pserver_endpoints, - trainers=trainers) + trainer_num=trainers) + with open("program.proto", "w") as f: + f.write(str(fluid.default_main_program())) if training_role == "PSERVER": pserver_prog = t.get_pserver_program(current_endpoint) pserver_startup = t.get_startup_program(current_endpoint, From 54656a12eb35b768490b875253fb043a67c615d3 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 7 Apr 2018 22:42:54 +0800 Subject: [PATCH 04/26] add insert op for block --- paddle/fluid/operators/concat_op.cc | 4 +- python/paddle/fluid/distribute_transpiler.py | 58 ++++++++++---------- python/paddle/fluid/framework.py | 15 +++-- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/paddle/fluid/operators/concat_op.cc b/paddle/fluid/operators/concat_op.cc index 0eedd8ee51ebf..ec96ba656c4ec 100644 --- a/paddle/fluid/operators/concat_op.cc +++ b/paddle/fluid/operators/concat_op.cc @@ -13,6 +13,8 @@ See the License for 
the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/operators/concat_op.h" + +#include #include namespace paddle { @@ -33,7 +35,7 @@ class ConcatOp : public framework::OperatorWithKernel { size_t axis = static_cast(ctx->Attrs().Get("axis")); const size_t n = ins.size(); - PADDLE_ENFORCE_GT(n, 1, "Input tensors count should > 1."); + // PADDLE_ENFORCE_GT(n, 1, "Input tensors count should > 1."); auto out_dims = ins[0]; size_t in_zero_dims_size = out_dims.size(); diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 980a9cccda0b8..a8bbe05bcf103 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -317,39 +317,41 @@ def create_var(block, name): out_name = op.output("Out") # insert split_ids_op - split_ids_op = program.global_block().desc.insert_op( - op_index) - split_ids_op.set_type("split_ids") - split_ids_op.set_input("Ids", ids_name) - split_ids_op.set_output( - "Out", - [var.name for var in self.prefetch_input_vars]) + program.global_block().insert_op( + index=op_index, + type="split_ids", + inputs={ + 'Ids': [ + program.global_block().vars[varname] + for varname in ids_name + ] + }, + outputs={"Out": self.prefetch_input_vars}) # insert prefetch_op - prefetch_op = program.global_block().desc.insert_op( - op_index + 1) - prefetch_op.set_type("prefetch") - prefetch_op.set_input( - "X", - [var.name for var in self.prefetch_input_vars]) - prefetch_op.set_output( - "Out", - [var.name for var in self.prefetch_output_vars]) - prefetch_op.set_output("RPCClient", - [RPC_CLIENT_VAR_NAME]) - prefetch_op.set_output("Out", out_name) - prefetch_op.set_attr("epmap", eplist) + program.global_block().insert_op( + index=op_index + 1, + type="prefetch", + inputs={'X': self.prefetch_input_vars}, + outputs={ + "Out": self.prefetch_output_vars, + "RPCClient": rpc_client_var + }, + attrs={"epmap": eplist}) # insert concat_op - concat_op = program.global_block().desc.insert_op( - op_index + 2) - concat_op.set_type("concat") - concat_op.set_input( - "X", - [var.name for var in self.prefetch_output_vars]) - concat_op.set_attr("axis", 0) + program.global_block().insert_op( + index=op_index + 2, + type="concat", + inputs={'X': self.prefetch_output_vars}, + outputs={ + "Out": [ + program.global_block().vars[varname] + for varname in out_name + ] + }, + attrs={"axis": 0}) - program.sync_with_cpp() # delete lookup_table_op program.global_block().delete_ops([op]) program.sync_with_cpp() diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index e15456bfc0835..1973002eca3d4 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -645,7 +645,7 @@ class Block(object): def __init__(self, program, idx): self.desc = program.desc.block(idx) self.vars = dict() # var_name --> var - self.ops = collections.deque() # operator list + self.ops = list() # operator list self.program = program self.removed_vars = dict() @@ -817,6 +817,13 @@ def append_op(self, *args, **kwargs): self.ops.append(op) return op + def insert_op(self, index, *args, **kwargs): + self.sync_with_cpp() + op_desc = self.desc.insert_op(index) + op = Operator(block=self, desc=op_desc, *args, **kwargs) + self.ops.insert(index, op) + return op + def delete_ops(self, ops): # remove from cpp # FIXME(typhoonzero): remove only the first occurrence. 
@@ -828,12 +835,12 @@ def delete_ops(self, ops): self.desc.remove_op(start, end + 1) def slice_ops(self, start, end): - return list(self.ops)[start:end] + return self.ops[start:end] def prepend_op(self, *args, **kwargs): op_desc = self.desc.prepend_op() op = Operator(self, op_desc, *args, **kwargs) - self.ops.appendleft(op) + self.ops.insert(0, op) return op def sync_with_cpp(self): @@ -878,7 +885,7 @@ def sync_with_cpp(self): for index in range((start_index - 1 - 1), -1, -1): op_desc = ops_in_cpp[index] op = Operator(self, op_desc) - self.ops.appendleft(op) + self.ops.insert(0, op) # sync ops append to the end of cpp_ops for index in range((end_index + 1), len(ops_in_cpp)): From 171560bd34636d70da7684c99ee6f13559e22c1c Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 8 Apr 2018 00:06:15 +0800 Subject: [PATCH 05/26] init change get_pserver_program --- python/paddle/fluid/distribute_transpiler.py | 26 +++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index a8bbe05bcf103..40874454f059e 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -214,6 +214,7 @@ def transpile(self, # fluid distributed training with fault-tolerance. self.trainer_id = trainer_id pserver_endpoints = pservers.split(",") + self.pserver_endpoints = pserver_endpoints # step1 param_list = [pg[0] for pg in params_grads] @@ -411,7 +412,7 @@ def get_pserver_program(self, endpoint): # step3 optimize_block = pserver_program.create_block(0) # step 4 - # Create a union-find data struct from optimize ops, + # Create a union-find data structure from optimize ops, # If two ops are connected, we could add these two ops # into one set. 
ufind = self._create_ufind(self.optimize_ops) @@ -486,6 +487,28 @@ def __append_optimize_op__(op, block): # __append_optimize_op__(glb_op, optimize_block) # break + # process distributed lookup_table + if self.has_distributed_lookup_table: + table_var = pserver_program.global_block().vars[self.table_name] + prefetch_block = pserver_program.create_block(optimize_block.idx) + pserver_index = self.pserver_endpoints.index(endpoint) + trainer_ids = self.prefetch_input_vars[pserver_index] + pserver_ids = pserver_program.global_block().create_var( + name=trainer_ids.name, type=trainer_ids.type) + trainer_out = self.prefetch_output_vars[pserver_index] + pserver_out = pserver_program.global_block().create_var( + name=trainer_out.name, type=trainer_out.type) + prefetch_block.append_op( + type="lookup_table", + inputs={'Ids': pserver_ids, + "W": table_var}, + outputs={"Out": pserver_out}, + attrs={ + "is_sparse": True, # has no effect on lookup_table op + "is_distributed": True, + "padding_idx": -1 + }) + # step5 append the listen_and_serv op pserver_program.global_block().append_op( type="listen_and_serv", @@ -496,6 +519,7 @@ def __append_optimize_op__(op, block): "endpoint": endpoint, "Fanin": self.trainer_num }) + pserver_program.sync_with_cpp() return pserver_program From 38ed3e8922c897872ec14555be05d7dd426b8db4 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 8 Apr 2018 14:36:17 +0800 Subject: [PATCH 06/26] optimize code --- python/paddle/fluid/distribute_transpiler.py | 54 +++++++++++--------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 400d4064136b7..1702bd3650e8f 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -16,14 +16,9 @@ import math +import distributed_splitter as splitter import framework -from distributed_spliter import * from framework import Program, default_main_program, Variable -from framework import Program, default_main_program, default_startup_program, Parameter, Variable -import optimizer -from layer_helper import LayerHelper -import distributed_splitter as splitter -import math from . 
import core LOOKUP_TABLE_TYPE = "lookup_table" @@ -297,23 +292,20 @@ def transpile(self, distributed_lookup_table_ops) > 0 if self.has_distributed_lookup_table: - def create_var(block, name): - return block.create_var( - name=name, type=core.VarDesc.VarType.LOD_TENSOR) + def create_splited_vars(source_var, block, tag): + return [ + block.create_var( + name=str(self.table_name + tag + str(index)), + type=source_var.type, + shape=source_var.shape, + dtype=source_var.dtype) + for index in range(len(pserver_endpoints)) + ] # replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op - self.prefetch_input_vars = [ - create_var( - block=program.global_block(), - name=str(self.table_name + "_prefetch_in_" + str(index))) - for index in range(len(pserver_endpoints)) - ] - self.prefetch_output_vars = [ - create_var( - block=program.global_block(), - name=str(self.table_name + "_prefetch_out_" + str(index))) - for index in range(len(pserver_endpoints)) - ] + self.prefetch_input_vars = None + self.prefetch_output_vars = None + while True: all_ops = program.global_block().ops for op in all_ops: @@ -322,6 +314,19 @@ def create_var(block, name): ids_name = op.input("Ids") out_name = op.output("Out") + if self.prefetch_input_vars is None: + ids_var = program.global_block().vars[ids_name[0]] + self.prefetch_input_vars = create_splited_vars( + source_var=ids_var, + block=program.global_block(), + tag="_prefetch_in_") + if self.prefetch_output_vars is None: + out_var = program.global_block().vars[out_name[0]] + self.prefetch_output_vars = create_splited_vars( + source_var=out_var, + block=program.global_block(), + tag="_prefetch_out_") + # insert split_ids_op program.global_block().insert_op( index=op_index, @@ -499,12 +504,15 @@ def __append_optimize_op__(op, block): pserver_index = self.pserver_endpoints.index(endpoint) trainer_ids = self.prefetch_input_vars[pserver_index] pserver_ids = pserver_program.global_block().create_var( - name=trainer_ids.name, type=trainer_ids.type) + name=trainer_ids.name, + type=trainer_ids.type, + shape=trainer_ids.shape, + dtype=trainer_ids.dtype) trainer_out = self.prefetch_output_vars[pserver_index] pserver_out = pserver_program.global_block().create_var( name=trainer_out.name, type=trainer_out.type) prefetch_block.append_op( - type="lookup_table", + type=LOOKUP_TABLE_TYPE, inputs={'Ids': pserver_ids, "W": table_var}, outputs={"Out": pserver_out}, From eb31b66b19d84d8222ce6be194a4e8745231c18d Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 8 Apr 2018 15:45:28 +0800 Subject: [PATCH 07/26] fix a bug --- python/paddle/fluid/distribute_transpiler.py | 5 ++++- python/paddle/fluid/tests/book/dist_test/test_word2vec.py | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 1702bd3650e8f..d08a271b6ac2d 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -510,7 +510,10 @@ def __append_optimize_op__(op, block): dtype=trainer_ids.dtype) trainer_out = self.prefetch_output_vars[pserver_index] pserver_out = pserver_program.global_block().create_var( - name=trainer_out.name, type=trainer_out.type) + name=trainer_out.name, + type=trainer_out.type, + shape=trainer_out.shape, + dtype=trainer_out.dtype) prefetch_block.append_op( type=LOOKUP_TABLE_TYPE, inputs={'Ids': pserver_ids, diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py 
index 76e1ce170cbc2..c47d98297c7e9 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py @@ -139,8 +139,12 @@ def train_loop(main_program): f.write(str(fluid.default_main_program())) if training_role == "PSERVER": pserver_prog = t.get_pserver_program(current_endpoint) + with open("pserver.proto", "w") as f: + f.write(str(pserver_prog)) pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) + with open("startup.proto", "w") as f: + f.write(str(pserver_startup)) exe.run(pserver_startup) exe.run(pserver_prog) elif training_role == "TRAINER": From d6725926158cb3c51bd8d5ac7ebfcfa00e24737d Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 8 Apr 2018 20:27:19 +0800 Subject: [PATCH 08/26] can run now --- paddle/fluid/operators/detail/grpc_server.cc | 1 + paddle/fluid/operators/listen_and_serv_op.cc | 45 ++++++++++++------- paddle/fluid/operators/listen_and_serv_op.h | 2 + paddle/fluid/operators/prefetch_op.cc | 4 +- python/paddle/fluid/distribute_transpiler.py | 3 +- .../tests/book/dist_test/test_word2vec.py | 18 +++++--- 6 files changed, 48 insertions(+), 25 deletions(-) diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index d5fc163bc2540..0b582a08bc0bf 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -161,6 +161,7 @@ class RequestPrefetch final : public RequestBase { ::grpc::ByteBuffer reply; std::string var_name = request_->OutVarname(); + VLOG(3) << "prefetch var " << var_name; auto var_desc = program_->Block(0).FindVar(var_name); framework::Scope* local_scope = &scope_->NewScope(); auto* var = local_scope->FindVar(var_name); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 9188f2d989e60..5d293665f0bcc 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -13,7 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #include -#include +#include // NOLINT +#include #include "paddle/fluid/operators/listen_and_serv_op.h" @@ -88,8 +89,9 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, auto ins = Inputs("X"); auto fan_in = Attr("Fanin"); - auto *block = Attr(kOptimizeBlock); - auto *program = block->Program(); + auto *optimize_block = Attr(kOptimizeBlock); + auto *prefetch_block = Attr(kPrefetchBlock); + auto *program = optimize_block->Program(); size_t num_blocks = program->Size(); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); @@ -97,18 +99,25 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, framework::Executor executor(dev_place); std::vector block_list; for (size_t blkid = 1; blkid < num_blocks; ++blkid) { - block_list.push_back(blkid); + if (blkid != prefetch_block->ID()) { + block_list.push_back(blkid); + } } - auto prepared = executor.Prepare(*program, block_list); + auto optimize_prepared = executor.Prepare(*program, block_list); // Insert placeholder for block0 which holds current op itself. 
- prepared.insert(prepared.begin(), - std::shared_ptr(nullptr)); + optimize_prepared.insert( + optimize_prepared.begin(), + std::shared_ptr(nullptr)); rpc_service_->SetScope(&recv_scope); rpc_service_->SetDevCtx(&dev_ctx); // TODO(qiao) set proper fields for table lookup and update rpc_service_->SetExecutor(&executor); - rpc_service_->SetPrefetchBlkdId(0); + VLOG(3) << "prefetch block id is " << prefetch_block->ID(); + auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID()); + rpc_service_->SetPrefetchBlkdId(prefetch_block->ID()); + rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get()); + prefetch_prepared.release(); rpc_service_->SetProgram(program); // start the server listening after all member initialized. server_thread_.reset(new std::thread(RunServer, rpc_service_)); @@ -166,16 +175,18 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, parallel_blkids.push_back(1); double ts = detail::GetTimestamp(); for (size_t blkid = 2; blkid < num_blocks; ++blkid) { - if (program->Block(blkid).Parent() != last_parent_blkid) { - ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program, - &recv_scope); - parallel_blkids.clear(); - last_parent_blkid = program->Block(blkid).Parent(); + if (blkid != prefetch_block->ID()) { + if (program->Block(blkid).Parent() != last_parent_blkid) { + ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, + program, &recv_scope); + parallel_blkids.clear(); + last_parent_blkid = program->Block(blkid).Parent(); + } + parallel_blkids.push_back(blkid); } - parallel_blkids.push_back(blkid); } - ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program, - &recv_scope); + ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, + program, &recv_scope); VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)"; // Reset the received sparse variables, the sum operator would not @@ -211,6 +222,8 @@ from send_op and send back variables to recv_op. .AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); AddAttr(kOptimizeBlock, "BlockID to run on server side."); + AddAttr(kPrefetchBlock, + "prefetch block to run on server side."); AddAttr("Fanin", "How many clients send to this server.") .SetDefault(1); } diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 0da87afc961e8..759b2a462ba5b 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -16,6 +16,7 @@ limitations under the License. 
*/ #include #include +#include #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -27,6 +28,7 @@ namespace paddle { namespace operators { constexpr char kOptimizeBlock[] = "OptimizeBlock"; +constexpr char kPrefetchBlock[] = "PrefetchBlock"; void RunServer(std::shared_ptr service); diff --git a/paddle/fluid/operators/prefetch_op.cc b/paddle/fluid/operators/prefetch_op.cc index c1e41f25e8fab..f9ae01ab5d297 100644 --- a/paddle/fluid/operators/prefetch_op.cc +++ b/paddle/fluid/operators/prefetch_op.cc @@ -50,8 +50,8 @@ class PrefetchOp : public framework::OperatorBase { for (size_t i = 0; i < ins.size(); i++) { if (NeedSend(scope, ins[i])) { - VLOG(3) << "sending " << ins[i] << " to " << epmap[i] << "to get " - << outs[i] << "back"; + VLOG(3) << "sending " << ins[i] << " to " << epmap[i] << " to get " + << outs[i] << " back"; rpc_client->AsyncPrefetchVariable(epmap[i], ctx, scope, ins[i], outs[i]); } else { diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index d08a271b6ac2d..252e8c2b922b2 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -533,7 +533,8 @@ def __append_optimize_op__(op, block): attrs={ "OptimizeBlock": optimize_block, "endpoint": endpoint, - "Fanin": self.trainer_num + "Fanin": self.trainer_num, + "PrefetchBlock": prefetch_block }) pserver_program.sync_with_cpp() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py index c47d98297c7e9..d4af2be18d2bb 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py @@ -31,34 +31,35 @@ def train(use_cuda=False, is_sparse=False, is_local=False): IS_SPARSE = is_sparse def __network__(words): + id_distributed = True embed_first = fluid.layers.embedding( input=words[0], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr='shared_w', - is_distributed=True) + is_distributed=id_distributed) embed_second = fluid.layers.embedding( input=words[1], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr='shared_w', - is_distributed=True) + is_distributed=id_distributed) embed_third = fluid.layers.embedding( input=words[2], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr='shared_w', - is_distributed=True) + is_distributed=id_distributed) embed_forth = fluid.layers.embedding( input=words[3], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr='shared_w', - is_distributed=True) + is_distributed=id_distributed) concat_embed = fluid.layers.concat( input=[embed_first, embed_second, embed_third, embed_forth], axis=1) @@ -99,14 +100,16 @@ def __network__(words): def train_loop(main_program): exe.run(fluid.default_startup_program()) + TRAINING_BATCHES = 5 batch_num = 0 for pass_id in range(PASS_NUM): for data in train_reader(): avg_cost_np = exe.run(main_program, feed=feeder.feed(data), fetch_list=[avg_cost]) - if batch_num == 1: + if batch_num == TRAINING_BATCHES: return + #print("batch_id=" + str(batch_num) + ", cost=" + str(avg_cost_np[0])) if avg_cost_np[0] < 5.0: return if math.isnan(float(avg_cost_np[0])): @@ -148,7 +151,10 @@ def train_loop(main_program): exe.run(pserver_startup) exe.run(pserver_prog) elif training_role == "TRAINER": - train_loop(t.get_trainer_program()) + trainer_program = t.get_trainer_program() + with 
open("trainer.proto", "w") as f: + f.write(str(trainer_program)) + train_loop(trainer_program) class W2VTest(unittest.TestCase): From 2e69b776d38cdee8a7c54d8a7003e08678bc2112 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 9 Apr 2018 11:43:33 +0800 Subject: [PATCH 09/26] start to do table split --- python/paddle/fluid/distribute_transpiler.py | 37 ++++++++++--------- .../tests/book/dist_test/test_word2vec.py | 2 +- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 252e8c2b922b2..390dced7a5220 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -216,6 +216,25 @@ def transpile(self, pserver_endpoints = pservers.split(",") self.pserver_endpoints = pserver_endpoints + # process lookup_table_op + # 1. check all lookup_table_op is distributed + # 2. check all lookup_table_op share the same table. + distributed_lookup_table_ops = [] + # support only one distributed_lookup_table now + self.table_name = None + for op in program.global_block().ops: + if op.type == LOOKUP_TABLE_TYPE: + if op.attrs['is_distributed'] is True: + if self.table_name is None: + self.table_name = op.input("W")[0] + if self.table_name != op.input("W")[0]: + raise RuntimeError("all distributed lookup_table_ops" + " should have only one table") + distributed_lookup_table_ops.append(op) + + self.has_distributed_lookup_table = len( + distributed_lookup_table_ops) > 0 + # step1 param_list = [pg[0] for pg in params_grads] grad_list = [pg[1] for pg in params_grads] @@ -272,24 +291,6 @@ def transpile(self, outputs={"Out": [orig_param]}, attrs={"axis": 0}) - # process lookup_table_op - # 1. check all lookup_table_op is distributed - # 2. check all lookup_table_op share the same table. 
- distributed_lookup_table_ops = [] - # support only one distributed_lookup_table now - self.table_name = None - for op in program.global_block().ops: - if op.type == LOOKUP_TABLE_TYPE: - if op.attrs['is_distributed'] is True: - if self.table_name is None: - self.table_name = op.input("W")[0] - if self.table_name != op.input("W")[0]: - raise RuntimeError("all distributed lookup_table_ops" - " should have only one table") - distributed_lookup_table_ops.append(op) - - self.has_distributed_lookup_table = len( - distributed_lookup_table_ops) > 0 if self.has_distributed_lookup_table: def create_splited_vars(source_var, block, tag): diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py index d4af2be18d2bb..4abd3ee21431f 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py @@ -22,7 +22,7 @@ import sys -def train(use_cuda=False, is_sparse=False, is_local=False): +def train(use_cuda=False, is_sparse=True, is_local=False): PASS_NUM = 100 EMBED_SIZE = 32 HIDDEN_SIZE = 256 From 3ad3eeaef85a5d63861b9ce6c7e072e12bf2fee0 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 9 Apr 2018 22:20:05 +0800 Subject: [PATCH 10/26] start to process table gradient --- paddle/fluid/operators/split_ids_op.cc | 10 +- python/paddle/fluid/distribute_transpiler.py | 98 +++++++++++++++----- 2 files changed, 78 insertions(+), 30 deletions(-) diff --git a/paddle/fluid/operators/split_ids_op.cc b/paddle/fluid/operators/split_ids_op.cc index a54f8a2878c86..9957bf968dc8f 100644 --- a/paddle/fluid/operators/split_ids_op.cc +++ b/paddle/fluid/operators/split_ids_op.cc @@ -48,11 +48,11 @@ class SplitIdsOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must has output Out."); auto ids_var_type = ctx->GetInputsVarType("Ids").front(); - PADDLE_ENFORCE_EQ(ids_var_type, framework::proto::VarType::LOD_TENSOR); - - auto ids_dims = ctx->GetInputDim("Ids"); - PADDLE_ENFORCE_EQ(ids_dims.size(), 2); - PADDLE_ENFORCE_EQ(ids_dims[1], 1); + if (ids_var_type == framework::proto::VarType::LOD_TENSOR) { + auto ids_dims = ctx->GetInputDim("Ids"); + PADDLE_ENFORCE_EQ(ids_dims.size(), 2); + PADDLE_ENFORCE_EQ(ids_dims[1], 1); + } } }; diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 390dced7a5220..da99f0505e8c9 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -22,6 +22,7 @@ from . import core LOOKUP_TABLE_TYPE = "lookup_table" +LOOKUP_TABLE_GRAD_TYPE = "lookup_table_grad" RPC_CLIENT_VAR_NAME = "RPC_CLIENT_VAR" @@ -37,9 +38,9 @@ def __str__(self): class UnionFind(object): - """ Union-find data struct. + """ Union-find data structure. - Union-find is a data struct that keeps track of a set of elements partitioned + Union-find is a data structure that keeps track of a set of elements partitioned into a number of disjoint (non-overlapping) subsets. 
Reference: @@ -145,14 +146,6 @@ def split_dense_variable(var_list, # only have one lookup_table_op -def split_sparse_ids(): - pass - - -def get_prefetch_outputs(): - pass - - class DistributeTranspiler: def transpile(self, optimize_ops, @@ -231,6 +224,9 @@ def transpile(self, raise RuntimeError("all distributed lookup_table_ops" " should have only one table") distributed_lookup_table_ops.append(op) + else: + if self.table_name is not None: + assert op.input("W")[0] != self.table_name self.has_distributed_lookup_table = len( distributed_lookup_table_ops) > 0 @@ -238,6 +234,28 @@ def transpile(self, # step1 param_list = [pg[0] for pg in params_grads] grad_list = [pg[1] for pg in params_grads] + + if self.has_distributed_lookup_table: + param_list = [ + param for param in param_list if param.name != self.table_name + ] + grad_list = [ + grad for grad in grad_list + if grad.name != framework.grad_var_name(self.table_name) + ] + self.table_param_grad = [ + param_grad for param_grad in params_grads + if param_grad[0].name == self.table_name + ] + self.table_param_list = self.create_splited_vars( + source_var=self.table_param_grad[0], + block=program.global_block(), + tag="_") + self.table_grad_list = self.create_splited_vars( + source_var=self.table_param_grad[1], + block=program.global_block(), + tag="_") + grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints)) param_blocks = split_dense_variable(param_list, len(pserver_endpoints)) # step2 @@ -293,21 +311,12 @@ def transpile(self, if self.has_distributed_lookup_table: - def create_splited_vars(source_var, block, tag): - return [ - block.create_var( - name=str(self.table_name + tag + str(index)), - type=source_var.type, - shape=source_var.shape, - dtype=source_var.dtype) - for index in range(len(pserver_endpoints)) - ] - - # replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op + # 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op self.prefetch_input_vars = None self.prefetch_output_vars = None while True: + should_break = True all_ops = program.global_block().ops for op in all_ops: if op.type == LOOKUP_TABLE_TYPE: @@ -317,13 +326,13 @@ def create_splited_vars(source_var, block, tag): if self.prefetch_input_vars is None: ids_var = program.global_block().vars[ids_name[0]] - self.prefetch_input_vars = create_splited_vars( + self.prefetch_input_vars = self.create_splited_vars( source_var=ids_var, block=program.global_block(), tag="_prefetch_in_") if self.prefetch_output_vars is None: out_var = program.global_block().vars[out_name[0]] - self.prefetch_output_vars = create_splited_vars( + self.prefetch_output_vars = self.create_splited_vars( source_var=out_var, block=program.global_block(), tag="_prefetch_out_") @@ -368,9 +377,38 @@ def create_splited_vars(source_var, block, tag): program.global_block().delete_ops([op]) program.sync_with_cpp() - continue + should_break = False + break + should_break = True # if for loop did not break, it means that there is no more lookup_table_op - break + if should_break: + break + else: + continue + + # 2. 
add split_ids_op and send_vars_op to send gradient to pservers + # there should only be one table_name + all_ops = program.global_block().ops + table_grad_name = framework.grad_var_name(self.table_name) + for op in all_ops: + if table_grad_name in op.output_arg_names: + op_index = list(all_ops).index(op) + # insert split_ids_op + program.global_block().insert_op( + index=op_index + 1, + type="split_ids", + inputs={ + 'Ids': + [program.global_block().vars[table_grad_name]] + }, + outputs={"Out": self.table_grad_list}) + program.global_block().insert_op( + index=op_index + 2, + type="send_vars", + inputs={'X': self.table_grad_list}, + outputs={"RPCClient": rpc_client_var}, + attrs={"sync_send": True}) + break def get_trainer_program(self): # remove optimize ops and add a send op to main_program @@ -654,6 +692,16 @@ def _create_vars_from_blocklist(self, program.global_block().sync_with_cpp() return var_mapping + def create_splited_vars(self, source_var, block, tag): + return [ + block.create_var( + name=str(source_var.name + tag + str(index)), + type=source_var.type, + shape=source_var.shape, + dtype=source_var.dtype) + for index in range(len(self.pserver_endpoints)) + ] + def _clone_var(self, block, var): assert isinstance(var, Variable) return block.create_var( From a07a0630138dd68bc39ce99cb1111bcde0f362ad Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 10 Apr 2018 10:53:43 +0800 Subject: [PATCH 11/26] complete pserver part --- paddle/fluid/operators/sgd_op.cc | 4 +- paddle/fluid/operators/sum_op.cc | 8 +- python/paddle/fluid/distribute_transpiler.py | 87 +++++++++++++++++--- 3 files changed, 83 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/operators/sgd_op.cc b/paddle/fluid/operators/sgd_op.cc index 074fa9e00f2ec..06cb0550ad7d4 100644 --- a/paddle/fluid/operators/sgd_op.cc +++ b/paddle/fluid/operators/sgd_op.cc @@ -35,8 +35,8 @@ class SGDOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1, "Learning rate should have 1 element"); auto param_dim = ctx->GetInputDim("Param"); - // TODO(qijun): check dimensions of Param and Grad at complie - // and run time. + // TODO(qijun): check dimensions of Param and Grad at compile + // and runtime. ctx->SetOutputDim("ParamOut", param_dim); } diff --git a/paddle/fluid/operators/sum_op.cc b/paddle/fluid/operators/sum_op.cc index d3d5c8a3429e2..9324486003486 100644 --- a/paddle/fluid/operators/sum_op.cc +++ b/paddle/fluid/operators/sum_op.cc @@ -10,7 +10,11 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ #include "paddle/fluid/operators/sum_op.h" + +#include +#include #include + #include "paddle/fluid/framework/var_type_inference.h" #include "paddle/fluid/operators/detail/safe_ref.h" @@ -34,8 +38,8 @@ class SumOp : public framework::OperatorWithKernel { } auto x_dims = ctx->GetInputsDim("X"); - size_t N = x_dims.size(); - PADDLE_ENFORCE_GT(N, 1, "Input tensors count should > 1."); + // size_t N = x_dims.size(); + // PADDLE_ENFORCE_GT(N, 1, "Input tensors count should > 1."); framework::DDim in_dim({0}); for (auto& x_dim : x_dims) { diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index da99f0505e8c9..7b0c3475dbcd0 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -246,15 +246,17 @@ def transpile(self, self.table_param_grad = [ param_grad for param_grad in params_grads if param_grad[0].name == self.table_name + ][0] + table_grad_var = self.table_param_grad[1] + self.table_grad_list = [ + program.global_block().create_var( + name="%s.trainer_%d.pserver_%d" % + (table_grad_var.name, trainer_id, index), + type=table_grad_var.type, + shape=table_grad_var.shape, + dtype=table_grad_var.dtype) + for index in range(len(self.pserver_endpoints)) ] - self.table_param_list = self.create_splited_vars( - source_var=self.table_param_grad[0], - block=program.global_block(), - tag="_") - self.table_grad_list = self.create_splited_vars( - source_var=self.table_param_grad[1], - block=program.global_block(), - tag="_") grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints)) param_blocks = split_dense_variable(param_list, len(pserver_endpoints)) @@ -538,9 +540,71 @@ def __append_optimize_op__(op, block): # process distributed lookup_table if self.has_distributed_lookup_table: + pserver_index = self.pserver_endpoints.index(endpoint) + + def _clone_var(block, var, persistable=True): + assert isinstance(var, Variable) + return block.create_var( + name=var.name, + shape=var.shape, + dtype=var.dtype, + type=var.type, + persistable=persistable) + + # STEP: create table optimize block + # create table param and grad var in pserver program + param_var = _clone_var( + pserver_program.global_block(), + self.origin_program.global_block().vars[self.table_name]) + grad_var = _clone_var( + pserver_program.global_block(), + self.origin_program.global_block().vars[framework.grad_var_name( + self.table_name)], + persistable=False) + + # create grad vars in pserver program + table_grad_var = self.table_param_grad[1] + table_grad_list = [ + pserver_program.global_block().create_var( + name="%s.trainer_%d.pserver_%d" % + (table_grad_var.name, index, pserver_index), + type=table_grad_var.type, + shape=table_grad_var.shape, + dtype=table_grad_var.dtype) + for index in range(self.trainer_num) + ] + + # create table optimize block in pserver program + table_opt_op = [ + op for op in self.optimize_ops + if op.input("Param")[0] == self.table_name + ][0] + table_opt_block = pserver_program.create_block(append_block.idx) + assert table_opt_op.type == "sgd" + + # append sum op for table_grad_list + table_opt_block.append_op( + type="sum", + inputs={"X": table_grad_list}, + outputs={"Out": [grad_var]}) + + lr_var = pserver_program.global_block().vars[table_opt_op.input( + "LearningRate")[0]] + inputs = { + "Param": [param_var], + "Grad": [grad_var], + "LearningRate": [lr_var] + } + outputs = {"ParamOut": [param_var]} + table_opt_block.append_op( + type=table_opt_op.type, + inputs=inputs, + outputs=outputs, + 
attrs=table_opt_op.attrs) + + # STEP: create prefetch block table_var = pserver_program.global_block().vars[self.table_name] prefetch_block = pserver_program.create_block(optimize_block.idx) - pserver_index = self.pserver_endpoints.index(endpoint) trainer_ids = self.prefetch_input_vars[pserver_index] pserver_ids = pserver_program.global_block().create_var( name=trainer_ids.name, @@ -702,7 +766,7 @@ def create_splited_vars(self, source_var, block, tag): for index in range(len(self.pserver_endpoints)) ] - def _clone_var(self, block, var): + def _clone_var(self, block, var, persistable=True): assert isinstance(var, Variable) return block.create_var( name=var.name, @@ -710,7 +774,7 @@ def _clone_var(self, block, var): dtype=var.dtype, type=var.type, lod_level=var.lod_level, - persistable=True) + persistable=persistable) def _append_split_op(self, program, gradblocks): # Split variables that need to be split and append respective ops @@ -972,7 +1036,6 @@ def _is_opt_op_on_pserver(self, endpoint, op): if same_or_split_var(n, param) and n != param: return True return False - return False def _get_input_map_from_op(self, varmap, op): iomap = dict() From 53d64594f32231e51dba3a21a04069327f3f31f9 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 10 Apr 2018 15:34:09 +0800 Subject: [PATCH 12/26] can send_vars now --- paddle/fluid/operators/send_vars_op.cc | 4 +- paddle/fluid/operators/split_ids_op.cc | 8 ++- paddle/fluid/operators/split_ids_op.h | 69 ++++++++++++++------ python/paddle/fluid/distribute_transpiler.py | 1 + 4 files changed, 56 insertions(+), 26 deletions(-) diff --git a/paddle/fluid/operators/send_vars_op.cc b/paddle/fluid/operators/send_vars_op.cc index 2cbd9e2394800..56b3713d6af28 100644 --- a/paddle/fluid/operators/send_vars_op.cc +++ b/paddle/fluid/operators/send_vars_op.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ -#include +#include // NOLINT #include #include "paddle/fluid/framework/data_type.h" @@ -36,7 +36,7 @@ class SendVarsOp : public framework::OperatorBase { auto ins = Inputs("X"); std::vector epmap = Attr>("epmap"); - int sync_send = Attr("sync_sent"); + int sync_send = Attr("sync_send"); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); diff --git a/paddle/fluid/operators/split_ids_op.cc b/paddle/fluid/operators/split_ids_op.cc index 9957bf968dc8f..a53cbc8ac5199 100644 --- a/paddle/fluid/operators/split_ids_op.cc +++ b/paddle/fluid/operators/split_ids_op.cc @@ -48,8 +48,8 @@ class SplitIdsOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must has output Out."); auto ids_var_type = ctx->GetInputsVarType("Ids").front(); + auto ids_dims = ctx->GetInputDim("Ids"); if (ids_var_type == framework::proto::VarType::LOD_TENSOR) { - auto ids_dims = ctx->GetInputDim("Ids"); PADDLE_ENFORCE_EQ(ids_dims.size(), 2); PADDLE_ENFORCE_EQ(ids_dims[1], 1); } @@ -60,8 +60,9 @@ class SplitIdsOpInferVarType : public framework::VarTypeInference { public: void operator()(const framework::OpDesc &op_desc, framework::BlockDesc *block) const override { + auto *input_var = block->Var(op_desc.Input("Ids")[0]); for (auto &out_var : op_desc.Output("Out")) { - block->Var(out_var)->SetType(framework::proto::VarType::LOD_TENSOR); + block->Var(out_var)->SetType(input_var->GetType()); } } }; @@ -73,4 +74,5 @@ namespace ops = paddle::operators; REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker, ops::SplitIdsOpInferVarType); REGISTER_OP_CPU_KERNEL( - split_ids, ops::SplitIdsOpKernel); + split_ids, ops::SplitIdsOpKernel, + ops::SplitIdsOpKernel); diff --git a/paddle/fluid/operators/split_ids_op.h b/paddle/fluid/operators/split_ids_op.h index d36ed398ebce6..5b351d32c6efc 100644 --- a/paddle/fluid/operators/split_ids_op.h +++ b/paddle/fluid/operators/split_ids_op.h @@ -24,35 +24,62 @@ namespace operators { template class SplitIdsOpKernel : public framework::OpKernel { public: - void Compute(const framework::ExecutionContext& ctx) const override { + void Compute(const framework::ExecutionContext &ctx) const override { auto place = ctx.GetPlace(); if (!platform::is_cpu_place(place)) { PADDLE_THROW("SplitIds do not support GPU kernel"); } - auto& ids_dims = ctx.Input("Ids")->dims(); - const T* ids = ctx.Input("Ids")->data(); - auto outs = ctx.MultiOutput("Out"); - const size_t shard_num = outs.size(); + const auto *ids_var = ctx.InputVar("Ids"); + if (ids_var->IsType()) { + const auto &ids_dims = ctx.Input("Ids")->dims(); + const T *ids = ctx.Input("Ids")->data(); + auto outs = ctx.MultiOutput("Out"); + const size_t shard_num = outs.size(); - std::vector> out_ids; - out_ids.resize(outs.size()); + std::vector> out_ids; + out_ids.resize(outs.size()); - // split id by their shard_num. - for (int i = 0; i < ids_dims[0]; ++i) { - T id = ids[i]; - size_t shard_id = static_cast(id) % shard_num; - out_ids[shard_id].push_back(id); - } + // split id by their shard_num. 
+ for (int i = 0; i < ids_dims[0]; ++i) { + T id = ids[i]; + size_t shard_id = static_cast(id) % shard_num; + out_ids[shard_id].push_back(id); + } + + // create tensor for each shard and send to parameter server + for (size_t i = 0; i < out_ids.size(); ++i) { + auto *shard_t = outs[i]; + std::vector ids = out_ids[i]; + auto *shard_data = shard_t->mutable_data( + framework::make_ddim({static_cast(ids.size()), 1}), place); + for (size_t i = 0; i < ids.size(); ++i) { + shard_data[i] = ids[i]; + } + } + } else if (ids_var->IsType()) { + const auto *ids_selected_rows = ctx.Input("Ids"); + auto &ids_dims = ids_selected_rows->value().dims(); + PADDLE_ENFORCE_EQ(ids_dims[0], ids_selected_rows->rows().size(), ""); + const T *ids = ids_selected_rows->value().data(); + const auto &rows = ids_selected_rows->rows(); + auto outs = ctx.MultiOutput("Out"); + const size_t shard_num = outs.size(); + // get rows for outputs + for (auto &id : rows) { + size_t shard_id = static_cast(id) % shard_num; + outs[shard_id]->mutable_rows()->push_back(id); + } - // create tensor for each shard and send to parameter server - for (size_t i = 0; i < out_ids.size(); ++i) { - auto* shard_t = outs[i]; - std::vector ids = out_ids[i]; - auto* shard_data = shard_t->mutable_data( - framework::make_ddim({static_cast(ids.size()), 1}), place); - for (size_t i = 0; i < ids.size(); ++i) { - shard_data[i] = ids[i]; + int64_t row_width = ids_dims[1]; + for (auto &out : outs) { + framework::DDim ddim = framework::make_ddim( + {static_cast(out->rows().size()), row_width}); + T *output = out->mutable_value()->mutable_data(ddim, place); + for (size_t i = 0; i < ddim[0]; ++i) { + memcpy(output + i * row_width, ids + out->rows()[i] * row_width, + row_width * sizeof(T)); + } } } } diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 7b0c3475dbcd0..985edb253062d 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -580,6 +580,7 @@ def _clone_var(block, var, persistable=True): if op.input("Param")[0] == self.table_name ][0] table_opt_block = pserver_program.create_block(append_block.idx) + # only support sgd now assert table_opt_op.type == "sgd" # append sum op for table_grad_list From b1e398dc3288d42212014b315c38e442dd082370 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 10 Apr 2018 15:48:51 +0800 Subject: [PATCH 13/26] revert cpplint --- .pre-commit-config.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7c570e6d0d6ee..6140340890c0e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,6 +26,14 @@ repos: entry: bash ./.clang_format.hook -i language: system files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto)$ +- repo: local + hooks: + - id: cpplint-cpp-source + name: cpplint + description: Check C++ code style using cpplint.py. 
+ entry: bash ./tools/codestyle/cpplint_pre_commit.hook + language: system + files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx)$ - repo: https://github.com/PaddlePaddle/pre-commit-golang sha: 8337620115c25ff8333f1b1a493bd031049bd7c0 hooks: From cf9d25fa60c9562828cb33dba5623b1dfcad235a Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 10 Apr 2018 16:31:15 +0800 Subject: [PATCH 14/26] fix a bug --- python/paddle/fluid/distribute_transpiler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 1b609d93bf583..96c7cfccffcb7 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -457,8 +457,8 @@ def get_pserver_program(self, endpoint): type=v.type, dtype=v.dtype, shape=v.shape) - if self.trainers > 1: - for trainer_id in xrange(self.trainers): + if self.trainer_num > 1: + for trainer_id in xrange(self.trainer_num): var = pserver_program.global_block().create_var( name="%s.trainer_%d" % (orig_var_name, trainer_id), persistable=False, From 064a913a2536cb0d462dc22b4e3d8433f5ecd7fe Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 10 Apr 2018 18:04:43 +0800 Subject: [PATCH 15/26] optimize code --- paddle/fluid/framework/operator.cc | 2 +- paddle/fluid/operators/split_ids_op.h | 5 +++-- python/paddle/fluid/distribute_transpiler.py | 3 ++- .../paddle/fluid/tests/book/dist_test/test_word2vec.py | 10 ++++++---- .../tests/book/dist_test/test_word2vec_pserver.sh | 4 ++-- .../tests/book/dist_test/test_word2vec_trainer.sh | 4 ++-- 6 files changed, 16 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc index a3b4a8c0829ae..7ad9c2a64edc3 100644 --- a/paddle/fluid/framework/operator.cc +++ b/paddle/fluid/framework/operator.cc @@ -55,7 +55,7 @@ static DDim GetDims(const Scope& scope, const std::string& name) { if (var->IsType()) { return var->Get().dims(); } else if (var->IsType()) { - return var->Get().GetCompleteDims(); + return var->Get().value().dims(); } else { return DDim({-1}); } diff --git a/paddle/fluid/operators/split_ids_op.h b/paddle/fluid/operators/split_ids_op.h index 5b351d32c6efc..ba1e903dbb6da 100644 --- a/paddle/fluid/operators/split_ids_op.h +++ b/paddle/fluid/operators/split_ids_op.h @@ -62,17 +62,18 @@ class SplitIdsOpKernel : public framework::OpKernel { auto &ids_dims = ids_selected_rows->value().dims(); PADDLE_ENFORCE_EQ(ids_dims[0], ids_selected_rows->rows().size(), ""); const T *ids = ids_selected_rows->value().data(); - const auto &rows = ids_selected_rows->rows(); + const auto &ids_rows = ids_selected_rows->rows(); auto outs = ctx.MultiOutput("Out"); const size_t shard_num = outs.size(); // get rows for outputs - for (auto &id : rows) { + for (auto &id : ids_rows) { size_t shard_id = static_cast(id) % shard_num; outs[shard_id]->mutable_rows()->push_back(id); } int64_t row_width = ids_dims[1]; for (auto &out : outs) { + out->set_height(ids_selected_rows->height()); framework::DDim ddim = framework::make_ddim( {static_cast(out->rows().size()), row_width}); T *output = out->mutable_value()->mutable_data(ddim, place); diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 96c7cfccffcb7..96713e3860fdf 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -413,7 +413,8 @@ def transpile(self, type="send_vars", inputs={'X': self.table_grad_list}, 
outputs={"RPCClient": rpc_client_var}, - attrs={"sync_send": True}) + attrs={"sync_send": True, + "epmap": pserver_endpoints}) break def get_trainer_program(self): diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py index 4abd3ee21431f..35df2d6a1e59b 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py @@ -23,7 +23,7 @@ def train(use_cuda=False, is_sparse=True, is_local=False): - PASS_NUM = 100 + PASS_NUM = 1000 EMBED_SIZE = 32 HIDDEN_SIZE = 256 N = 5 @@ -107,9 +107,11 @@ def train_loop(main_program): avg_cost_np = exe.run(main_program, feed=feeder.feed(data), fetch_list=[avg_cost]) - if batch_num == TRAINING_BATCHES: - return - #print("batch_id=" + str(batch_num) + ", cost=" + str(avg_cost_np[0])) + #if batch_num == TRAINING_BATCHES: + # return + if batch_num % 10 == 0: + print("pass_id=" + str(pass_id) + ", batch_id=" + str( + batch_num) + ", cost=" + str(avg_cost_np[0])) if avg_cost_np[0] < 5.0: return if math.isnan(float(avg_cost_np[0])): diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh index d1535eb6b5173..ba3fceedf1066 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash -export GLOG_v=3 -export GLOG_logtostderr=1 +#export GLOG_v=3 +#export GLOG_logtostderr=1 source test_word2vec.env diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh index 4c37b3726ed9d..30844eb88402d 100644 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh +++ b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash -export GLOG_v=3 -export GLOG_logtostderr=1 +#export GLOG_v=3 +#export GLOG_logtostderr=1 source test_word2vec.env From f81d6b3f3390e871b0fa0b54191036e7e969cd61 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 10 Apr 2018 23:25:43 +0800 Subject: [PATCH 16/26] move dist test to models --- .../tests/book/dist_test/test_word2vec.env | 7 - .../tests/book/dist_test/test_word2vec.py | 168 ------------------ .../book/dist_test/test_word2vec_pserver.py | 28 --- .../book/dist_test/test_word2vec_pserver.sh | 9 - .../book/dist_test/test_word2vec_trainer.py | 28 --- .../book/dist_test/test_word2vec_trainer.sh | 9 - 6 files changed, 249 deletions(-) delete mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec.env delete mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec.py delete mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py delete mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh delete mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py delete mode 100644 python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.env b/python/paddle/fluid/tests/book/dist_test/test_word2vec.env deleted file mode 100644 index 220e68a1a6966..0000000000000 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.env +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -export PADDLE_INIT_PSERVERS="127.0.0.1" -export TRAINERS=1 -export POD_IP="127.0.0.1" -export PADDLE_INIT_TRAINER_ID=0 - diff 
--git a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec.py deleted file mode 100644 index 35df2d6a1e59b..0000000000000 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import paddle -import paddle.fluid as fluid -import paddle.v2 -import unittest -import os -import numpy as np -import math -import sys - - -def train(use_cuda=False, is_sparse=True, is_local=False): - PASS_NUM = 1000 - EMBED_SIZE = 32 - HIDDEN_SIZE = 256 - N = 5 - BATCH_SIZE = 32 - IS_SPARSE = is_sparse - - def __network__(words): - id_distributed = True - embed_first = fluid.layers.embedding( - input=words[0], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w', - is_distributed=id_distributed) - embed_second = fluid.layers.embedding( - input=words[1], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w', - is_distributed=id_distributed) - embed_third = fluid.layers.embedding( - input=words[2], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w', - is_distributed=id_distributed) - embed_forth = fluid.layers.embedding( - input=words[3], - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w', - is_distributed=id_distributed) - - concat_embed = fluid.layers.concat( - input=[embed_first, embed_second, embed_third, embed_forth], axis=1) - hidden1 = fluid.layers.fc(input=concat_embed, - size=HIDDEN_SIZE, - act='sigmoid') - predict_word = fluid.layers.fc(input=hidden1, - size=dict_size, - act='softmax') - cost = fluid.layers.cross_entropy(input=predict_word, label=words[4]) - avg_cost = fluid.layers.mean(cost) - return avg_cost, predict_word - - word_dict = paddle.v2.dataset.imikolov.build_dict() - dict_size = len(word_dict) - - first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') - second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64') - third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') - forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') - next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') - - avg_cost, predict_word = __network__( - [first_word, second_word, third_word, forth_word, next_word]) - - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) - - train_reader = paddle.v2.batch( - paddle.v2.dataset.imikolov.train(word_dict, N), BATCH_SIZE) - - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - feeder = fluid.DataFeeder( - feed_list=[first_word, second_word, third_word, forth_word, next_word], - place=place) - - def train_loop(main_program): - exe.run(fluid.default_startup_program()) - - TRAINING_BATCHES = 5 - batch_num 
= 0 - for pass_id in range(PASS_NUM): - for data in train_reader(): - avg_cost_np = exe.run(main_program, - feed=feeder.feed(data), - fetch_list=[avg_cost]) - #if batch_num == TRAINING_BATCHES: - # return - if batch_num % 10 == 0: - print("pass_id=" + str(pass_id) + ", batch_id=" + str( - batch_num) + ", cost=" + str(avg_cost_np[0])) - if avg_cost_np[0] < 5.0: - return - if math.isnan(float(avg_cost_np[0])): - sys.exit("got NaN loss, training failed.") - batch_num += 1 - - raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0])) - - if is_local: - train_loop(fluid.default_main_program()) - else: - port = os.getenv("PADDLE_INIT_PORT", "6174") - pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... - eplist = [] - for ip in pserver_ips.split(","): - eplist.append(':'.join([ip, port])) - pserver_endpoints = ",".join(eplist) # ip:port,ip:port... - trainers = int(os.getenv("TRAINERS")) - current_endpoint = os.getenv("POD_IP") + ":" + port - trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) - training_role = os.getenv("TRAINING_ROLE", "TRAINER") - t = fluid.DistributeTranspiler() - t.transpile( - optimize_ops, - params_grads, - trainer_id, - pservers=pserver_endpoints, - trainer_num=trainers) - with open("program.proto", "w") as f: - f.write(str(fluid.default_main_program())) - if training_role == "PSERVER": - pserver_prog = t.get_pserver_program(current_endpoint) - with open("pserver.proto", "w") as f: - f.write(str(pserver_prog)) - pserver_startup = t.get_startup_program(current_endpoint, - pserver_prog) - with open("startup.proto", "w") as f: - f.write(str(pserver_startup)) - exe.run(pserver_startup) - exe.run(pserver_prog) - elif training_role == "TRAINER": - trainer_program = t.get_trainer_program() - with open("trainer.proto", "w") as f: - f.write(str(trainer_program)) - train_loop(trainer_program) - - -class W2VTest(unittest.TestCase): - def test_main(self): - train() - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py deleted file mode 100644 index 2581859a810ed..0000000000000 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import os -import test_word2vec - -os.environ["GLOG_v"] = '3' -os.environ["GLOG_logtostderr"] = '1' - -os.environ["PADDLE_INIT_PSERVERS"] = "127.0.0.1" -os.environ["TRAINERS"] = "1" -os.environ["POD_IP"] = "127.0.0.1" -os.environ["PADDLE_INIT_TRAINER_ID"] = "0" - -os.environ["TRAINING_ROLE"] = "PSERVER" - -test_word2vec.train() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh b/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh deleted file mode 100644 index ba3fceedf1066..0000000000000 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec_pserver.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env bash - -#export GLOG_v=3 -#export GLOG_logtostderr=1 - -source test_word2vec.env - -export TRAINING_ROLE=PSERVER -python test_word2vec.py diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py deleted file mode 100644 index 382de6e0a029c..0000000000000 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import test_word2vec - -os.environ["GLOG_v"] = '3' -os.environ["GLOG_logtostderr"] = '1' - -os.environ["PADDLE_INIT_PSERVERS"] = "127.0.0.1" -os.environ["TRAINERS"] = "1" -os.environ["POD_IP"] = "127.0.0.1" -os.environ["PADDLE_INIT_TRAINER_ID"] = "0" - -os.environ["TRAINING_ROLE"] = "TRAINER" - -test_word2vec.train() diff --git a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh b/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh deleted file mode 100644 index 30844eb88402d..0000000000000 --- a/python/paddle/fluid/tests/book/dist_test/test_word2vec_trainer.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env bash - -#export GLOG_v=3 -#export GLOG_logtostderr=1 - -source test_word2vec.env - -export TRAINING_ROLE=TRAINER -python test_word2vec.py From f467b1852045b3c4ec30d0b2e7c8d21d736184e2 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 09:46:01 +0800 Subject: [PATCH 17/26] revert the interface of distribute_transpiler.transpile --- python/paddle/fluid/distribute_transpiler.py | 21 ++++---------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 96713e3860fdf..a01346d42bb26 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -135,19 +135,6 @@ def split_dense_variable(var_list, return blocks -# TODO -# 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op -# 2. create inputs and outputs for prefetch_op -# 3. delete w of lookup_table, delete it's init_op in startup_program -# 4. create prefetch_block in pserver_program -# 5. create w in pserver_main_program -# 6. create init_op for w in pservev_init_program -# 7. 
optimize op did not need to change now -# 8. we only support one table parameter, if there are more than one -# lookup_table_op with the same table parameter, pserver will -# only have one lookup_table_op - - class DistributeTranspiler: def transpile(self, optimize_ops, @@ -155,7 +142,7 @@ def transpile(self, trainer_id, program=None, pservers="127.0.0.1:6174", - trainer_num=1, + trainers=1, split_method=splitter.round_robin): """ Transpile the program to distributed data-parallelism programs. @@ -192,8 +179,8 @@ def transpile(self, :type program: Program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string - :param trainer_num: total number of workers/trainers in the job - :type trainer_num: int + :param trainers: total number of workers/trainers in the job + :type trainers: int :param split_method: A function to determin how to split variables to different servers equally. :type split_method: function @@ -202,7 +189,7 @@ def transpile(self, if program is None: program = default_main_program() self.origin_program = program - self.trainer_num = trainer_num + self.trainer_num = trainers self.optimize_ops = optimize_ops # TODO(typhoonzero): currently trainer_id is fetched from cluster system # like Kubernetes, we should port this to use etcd later when developing From 4b8189fe301280e0017635bd175e773ef1487f51 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 10:55:43 +0800 Subject: [PATCH 18/26] fix prefetch_block --- python/paddle/fluid/distribute_transpiler.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index a01346d42bb26..09c167aca3aef 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -536,6 +536,7 @@ def __append_optimize_op__(op, block): # break # process distributed lookup_table + prefetch_block = None if self.has_distributed_lookup_table: pserver_index = self.pserver_endpoints.index(endpoint) @@ -626,6 +627,14 @@ def _clone_var(block, var, persistable=True): "padding_idx": -1 }) + # NOTE: if has_distributed_lookup_table is False, then prefetch_block will + # not be executed, so it's safe to use optimize_block to hold the place + if self.has_distributed_lookup_table: + assert prefetch_block is not None + else: + assert prefetch_block is None + prefetch_block = optimize_block + # step5 append the listen_and_serv op pserver_program.global_block().append_op( type="listen_and_serv", From d1c8f4ba2d9c543cbb464feb8248ea431dd3eca7 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 15:03:02 +0800 Subject: [PATCH 19/26] optimize trainspiler code --- python/paddle/fluid/distribute_transpiler.py | 386 ++++++++++--------- 1 file changed, 199 insertions(+), 187 deletions(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 09c167aca3aef..e80e47a84fc6e 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -303,106 +303,10 @@ def transpile(self, attrs={"axis": 0}) if self.has_distributed_lookup_table: - - # 1. 
replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op - self.prefetch_input_vars = None - self.prefetch_output_vars = None - - while True: - should_break = True - all_ops = program.global_block().ops - for op in all_ops: - if op.type == LOOKUP_TABLE_TYPE: - op_index = list(all_ops).index(op) - ids_name = op.input("Ids") - out_name = op.output("Out") - - if self.prefetch_input_vars is None: - ids_var = program.global_block().vars[ids_name[0]] - self.prefetch_input_vars = self.create_splited_vars( - source_var=ids_var, - block=program.global_block(), - tag="_prefetch_in_") - if self.prefetch_output_vars is None: - out_var = program.global_block().vars[out_name[0]] - self.prefetch_output_vars = self.create_splited_vars( - source_var=out_var, - block=program.global_block(), - tag="_prefetch_out_") - - # insert split_ids_op - program.global_block().insert_op( - index=op_index, - type="split_ids", - inputs={ - 'Ids': [ - program.global_block().vars[varname] - for varname in ids_name - ] - }, - outputs={"Out": self.prefetch_input_vars}) - - # insert prefetch_op - program.global_block().insert_op( - index=op_index + 1, - type="prefetch", - inputs={'X': self.prefetch_input_vars}, - outputs={ - "Out": self.prefetch_output_vars, - "RPCClient": rpc_client_var - }, - attrs={"epmap": eplist}) - - # insert concat_op - program.global_block().insert_op( - index=op_index + 2, - type="concat", - inputs={'X': self.prefetch_output_vars}, - outputs={ - "Out": [ - program.global_block().vars[varname] - for varname in out_name - ] - }, - attrs={"axis": 0}) - - # delete lookup_table_op - program.global_block().delete_ops([op]) - program.sync_with_cpp() - - should_break = False - break - should_break = True - # if for loop did not break, it means that there is no more lookup_table_op - if should_break: - break - else: - continue - - # 2. 
add split_ids_op and send_vars_op to send gradient to pservers - # there should only be one table_name - all_ops = program.global_block().ops - table_grad_name = framework.grad_var_name(self.table_name) - for op in all_ops: - if table_grad_name in op.output_arg_names: - op_index = list(all_ops).index(op) - # insert split_ids_op - program.global_block().insert_op( - index=op_index + 1, - type="split_ids", - inputs={ - 'Ids': - [program.global_block().vars[table_grad_name]] - }, - outputs={"Out": self.table_grad_list}) - program.global_block().insert_op( - index=op_index + 2, - type="send_vars", - inputs={'X': self.table_grad_list}, - outputs={"RPCClient": rpc_client_var}, - attrs={"sync_send": True, - "epmap": pserver_endpoints}) - break + self._replace_lookup_table_op_with_prefetch(program, rpc_client_var, + eplist) + self._split_table_grad_and_add_send_vars(program, rpc_client_var, + pserver_endpoints) def get_trainer_program(self): # remove optimize ops and add a send op to main_program @@ -539,93 +443,10 @@ def __append_optimize_op__(op, block): prefetch_block = None if self.has_distributed_lookup_table: pserver_index = self.pserver_endpoints.index(endpoint) - - def _clone_var(block, var, persistable=True): - assert isinstance(var, Variable) - return block.create_var( - name=var.name, - shape=var.shape, - dtype=var.dtype, - type=var.type, - persistable=persistable) - - # STEP: create table optimize block - # create table param and grad var in pserver program - param_var = _clone_var( - pserver_program.global_block(), - self.origin_program.global_block().vars[self.table_name]) - grad_var = _clone_var( - pserver_program.global_block(), - self.origin_program.global_block().vars[framework.grad_var_name( - self.table_name)], - persistable=False) - - # create grad vars in pserver program - table_grad_var = self.table_param_grad[1] - table_grad_list = [ - pserver_program.global_block().create_var( - name="%s.trainer_%d.pserver_%d" % - (table_grad_var.name, index, pserver_index), - type=table_grad_var.type, - shape=table_grad_var.shape, - dtype=table_grad_var.dtype) - for index in range(self.trainer_num) - ] - - # create table optimize block in pserver program - table_opt_op = [ - op for op in self.optimize_ops - if op.input("Param")[0] == self.table_name - ][0] - table_opt_block = pserver_program.create_block(append_block.idx) - # only support sgd now - assert table_opt_op.type == "sgd" - - # append sum op for table_grad_list - table_opt_block.append_op( - type="sum", - inputs={"X": table_grad_list}, - outputs={"Out": [grad_var]}) - - lr_var = pserver_program.global_block().vars[table_opt_op.input( - "LearningRate")[0]] - inputs = { - "Param": [param_var], - "Grad": [grad_var], - "LearningRate": [lr_var] - } - outputs = {"ParamOut": [param_var]} - table_opt_block.append_op( - type=table_opt_op.type, - inputs=inputs, - outputs=outputs, - attrs=table_opt_op.attrs) - - # STEP: create prefetch block - table_var = pserver_program.global_block().vars[self.table_name] - prefetch_block = pserver_program.create_block(optimize_block.idx) - trainer_ids = self.prefetch_input_vars[pserver_index] - pserver_ids = pserver_program.global_block().create_var( - name=trainer_ids.name, - type=trainer_ids.type, - shape=trainer_ids.shape, - dtype=trainer_ids.dtype) - trainer_out = self.prefetch_output_vars[pserver_index] - pserver_out = pserver_program.global_block().create_var( - name=trainer_out.name, - type=trainer_out.type, - shape=trainer_out.shape, - dtype=trainer_out.dtype) - prefetch_block.append_op( - 
type=LOOKUP_TABLE_TYPE, - inputs={'Ids': pserver_ids, - "W": table_var}, - outputs={"Out": pserver_out}, - attrs={ - "is_sparse": True, # has no effect on lookup_table op - "is_distributed": True, - "padding_idx": -1 - }) + self._create_table_optimize_block(pserver_index, pserver_program, + append_block) + prefetch_block = self._create_prefetch_block( + pserver_index, pserver_program, optimize_block) # NOTE: if has_distributed_lookup_table is False, then prefetch_block will # not be executed, so it's safe to use optimize_block to hold the place @@ -704,6 +525,197 @@ def _get_splited_name_and_shape(varname): attrs=op.attrs) return s_prog + # transpiler function for dis lookup_table + def _replace_lookup_table_op_with_prefetch(self, program, rpc_client_var, + eplist): + # 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op + self.prefetch_input_vars = None + self.prefetch_output_vars = None + + continue_search_lookup_table_op = True + while continue_search_lookup_table_op: + continue_search_lookup_table_op = False + all_ops = program.global_block().ops + for op in all_ops: + if op.type == LOOKUP_TABLE_TYPE: + continue_search_lookup_table_op = True + + op_index = list(all_ops).index(op) + ids_name = op.input("Ids") + out_name = op.output("Out") + + if self.prefetch_input_vars is None: + ids_var = program.global_block().vars[ids_name[0]] + self.prefetch_input_vars = self.create_splited_vars( + source_var=ids_var, + block=program.global_block(), + tag="_prefetch_in_") + if self.prefetch_output_vars is None: + out_var = program.global_block().vars[out_name[0]] + self.prefetch_output_vars = self.create_splited_vars( + source_var=out_var, + block=program.global_block(), + tag="_prefetch_out_") + + # insert split_ids_op + program.global_block().insert_op( + index=op_index, + type="split_ids", + inputs={ + 'Ids': [ + program.global_block().vars[varname] + for varname in ids_name + ] + }, + outputs={"Out": self.prefetch_input_vars}) + + # insert prefetch_op + program.global_block().insert_op( + index=op_index + 1, + type="prefetch", + inputs={'X': self.prefetch_input_vars}, + outputs={ + "Out": self.prefetch_output_vars, + "RPCClient": rpc_client_var + }, + attrs={"epmap": eplist}) + + # insert concat_op + program.global_block().insert_op( + index=op_index + 2, + type="concat", + inputs={'X': self.prefetch_output_vars}, + outputs={ + "Out": [ + program.global_block().vars[varname] + for varname in out_name + ] + }, + attrs={"axis": 0}) + + # delete lookup_table_op + program.global_block().delete_ops([op]) + program.sync_with_cpp() + # break for loop + break + + def _split_table_grad_and_add_send_vars(self, program, rpc_client_var, + pserver_endpoints): + # 2. 
add split_ids_op and send_vars_op to send gradient to pservers + # there should only be one table_name + all_ops = program.global_block().ops + table_grad_name = framework.grad_var_name(self.table_name) + for op in all_ops: + if table_grad_name in op.output_arg_names: + op_index = list(all_ops).index(op) + # insert split_ids_op + program.global_block().insert_op( + index=op_index + 1, + type="split_ids", + inputs={ + 'Ids': [program.global_block().vars[table_grad_name]] + }, + outputs={"Out": self.table_grad_list}) + program.global_block().insert_op( + index=op_index + 2, + type="send_vars", + inputs={'X': self.table_grad_list}, + outputs={"RPCClient": rpc_client_var}, + attrs={"sync_send": True, + "epmap": pserver_endpoints}) + break + + def _create_prefetch_block(self, pserver_index, pserver_program, + optimize_block): + # STEP: create prefetch block + table_var = pserver_program.global_block().vars[self.table_name] + prefetch_block = pserver_program.create_block(optimize_block.idx) + trainer_ids = self.prefetch_input_vars[pserver_index] + pserver_ids = pserver_program.global_block().create_var( + name=trainer_ids.name, + type=trainer_ids.type, + shape=trainer_ids.shape, + dtype=trainer_ids.dtype) + trainer_out = self.prefetch_output_vars[pserver_index] + pserver_out = pserver_program.global_block().create_var( + name=trainer_out.name, + type=trainer_out.type, + shape=trainer_out.shape, + dtype=trainer_out.dtype) + prefetch_block.append_op( + type=LOOKUP_TABLE_TYPE, + inputs={'Ids': pserver_ids, + "W": table_var}, + outputs={"Out": pserver_out}, + attrs={ + "is_sparse": True, # has no effect on lookup_table op + "is_distributed": True, + "padding_idx": -1 + }) + return prefetch_block + + def _create_table_optimize_block(self, pserver_index, pserver_program, + append_block): + def _clone_var(block, var, persistable=True): + assert isinstance(var, Variable) + return block.create_var( + name=var.name, + shape=var.shape, + dtype=var.dtype, + type=var.type, + persistable=persistable) + + # STEP: create table optimize block + # create table param and grad var in pserver program + param_var = _clone_var( + pserver_program.global_block(), + self.origin_program.global_block().vars[self.table_name]) + grad_var = _clone_var( + pserver_program.global_block(), + self.origin_program.global_block().vars[framework.grad_var_name( + self.table_name)], + persistable=False) + + # create grad vars in pserver program + table_grad_var = self.table_param_grad[1] + table_grad_list = [ + pserver_program.global_block().create_var( + name="%s.trainer_%d.pserver_%d" % + (table_grad_var.name, index, pserver_index), + type=table_grad_var.type, + shape=table_grad_var.shape, + dtype=table_grad_var.dtype) for index in range(self.trainer_num) + ] + + # create table optimize block in pserver program + table_opt_op = [ + op for op in self.optimize_ops + if op.input("Param")[0] == self.table_name + ][0] + table_opt_block = pserver_program.create_block(append_block.idx) + # only support sgd now + assert table_opt_op.type == "sgd" + + # append sum op for table_grad_list + table_opt_block.append_op( + type="sum", + inputs={"X": table_grad_list}, + outputs={"Out": [grad_var]}) + + lr_var = pserver_program.global_block().vars[table_opt_op.input( + "LearningRate")[0]] + inputs = { + "Param": [param_var], + "Grad": [grad_var], + "LearningRate": [lr_var] + } + outputs = {"ParamOut": [param_var]} + table_opt_block.append_op( + type=table_opt_op.type, + inputs=inputs, + outputs=outputs, + attrs=table_opt_op.attrs) + # 
====================== private transpiler functions ===================== def _create_vars_from_blocklist(self, program, From dff691c8076f8364d2dbf777444106699ea01dc9 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 15:40:34 +0800 Subject: [PATCH 20/26] add comment to sum_op --- paddle/fluid/operators/sum_op.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/paddle/fluid/operators/sum_op.cc b/paddle/fluid/operators/sum_op.cc index 9324486003486..071278969fb53 100644 --- a/paddle/fluid/operators/sum_op.cc +++ b/paddle/fluid/operators/sum_op.cc @@ -38,6 +38,7 @@ class SumOp : public framework::OperatorWithKernel { } auto x_dims = ctx->GetInputsDim("X"); + // TODO(qiao) maybe the check need to add back in the future. // size_t N = x_dims.size(); // PADDLE_ENFORCE_GT(N, 1, "Input tensors count should > 1."); From bb27df1c5e39a987d8df66b94c120589824b1142 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 15:56:35 +0800 Subject: [PATCH 21/26] add warning log --- paddle/fluid/operators/concat_op.cc | 5 ++++- paddle/fluid/operators/sum_op.cc | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/operators/concat_op.cc b/paddle/fluid/operators/concat_op.cc index ec96ba656c4ec..36f92b18b619c 100644 --- a/paddle/fluid/operators/concat_op.cc +++ b/paddle/fluid/operators/concat_op.cc @@ -35,7 +35,10 @@ class ConcatOp : public framework::OperatorWithKernel { size_t axis = static_cast(ctx->Attrs().Get("axis")); const size_t n = ins.size(); - // PADDLE_ENFORCE_GT(n, 1, "Input tensors count should > 1."); + PADDLE_ENFORCE_GT(n, 0, "Input tensors count should > 1."); + if (n == 1) { + VLOG(3) << "Warning: concat op have only one input, may waste memory"; + } auto out_dims = ins[0]; size_t in_zero_dims_size = out_dims.size(); diff --git a/paddle/fluid/operators/sum_op.cc b/paddle/fluid/operators/sum_op.cc index 071278969fb53..108f26fafe7af 100644 --- a/paddle/fluid/operators/sum_op.cc +++ b/paddle/fluid/operators/sum_op.cc @@ -38,9 +38,11 @@ class SumOp : public framework::OperatorWithKernel { } auto x_dims = ctx->GetInputsDim("X"); - // TODO(qiao) maybe the check need to add back in the future. 
- // size_t N = x_dims.size(); - // PADDLE_ENFORCE_GT(N, 1, "Input tensors count should > 1."); + size_t N = x_dims.size(); + PADDLE_ENFORCE_GT(N, 0, "Input tensors count should > 0."); + if (N == 1) { + VLOG(3) << "Warning: sum have only one input, may waste memory"; + } framework::DDim in_dim({0}); for (auto& x_dim : x_dims) { From 356b9e6037fc7e6f2e8d3b08e1f9a597772df55d Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 15:58:54 +0800 Subject: [PATCH 22/26] fix comment --- paddle/fluid/operators/concat_op.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/operators/concat_op.cc b/paddle/fluid/operators/concat_op.cc index 36f92b18b619c..4a36b03cb63ac 100644 --- a/paddle/fluid/operators/concat_op.cc +++ b/paddle/fluid/operators/concat_op.cc @@ -35,7 +35,7 @@ class ConcatOp : public framework::OperatorWithKernel { size_t axis = static_cast(ctx->Attrs().Get("axis")); const size_t n = ins.size(); - PADDLE_ENFORCE_GT(n, 0, "Input tensors count should > 1."); + PADDLE_ENFORCE_GT(n, 0, "Input tensors count should > 0."); if (n == 1) { VLOG(3) << "Warning: concat op have only one input, may waste memory"; } From 2f4962d0ebf30c15bc0c1479797fa8d6e7abd97e Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 17:12:12 +0800 Subject: [PATCH 23/26] fix test_send_recv --- paddle/fluid/operators/send_recv_op_test.cc | 25 +++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index 542bc3fde2a36..c3b07920c38e0 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -14,7 +14,7 @@ limitations under the License. */ #include #include -#include +#include // NOLINT #include "gtest/gtest.h" #include "paddle/fluid/framework/op_registry.h" @@ -37,11 +37,11 @@ namespace m = paddle::operators::math; std::unique_ptr listen_and_serv_op; int selected_port; -void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) { +void InitTensorsInScope(const p::CPUPlace &place, f::Scope *scope) { p::CPUDeviceContext ctx(place); for (int i = 0; i < 2; ++i) { auto var_name = paddle::string::Sprintf("x%d", i); - auto var = scope.Var(var_name); + auto var = scope->Var(var_name); auto tensor = var->GetMutable(); tensor->Resize({10, 10}); float *expect = tensor->mutable_data(place); @@ -50,20 +50,20 @@ void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) { } } - auto out_var = scope.Var("Out"); + auto out_var = scope->Var("Out"); auto out_tensor = out_var->GetMutable(); out_tensor->Resize({10, 10}); out_tensor->mutable_data(place); // allocate } -void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) { +void InitSelectedRowsInScope(const p::CPUPlace &place, f::Scope *scope) { p::CPUDeviceContext ctx(place); int64_t height = 10; int64_t row_numel = 10; m::SetConstant set_one; // init x0 std::vector rows0{0, 4, 7}; - auto x0_var = scope.Var("x0"); + auto x0_var = scope->Var("x0"); auto x0 = x0_var->GetMutable(); x0->set_rows(rows0); x0->set_height(height); @@ -74,7 +74,7 @@ void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) { // init x1 std::vector rows1{2, 9}; - auto x1_var = scope.Var("x1"); + auto x1_var = scope->Var("x1"); auto x1 = x1_var->GetMutable(); x1->set_rows(rows1); x1->set_height(height); @@ -83,7 +83,7 @@ void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) { f::make_ddim({static_cast(rows1.size()), row_numel}), place); set_one(ctx, x1_value, 
1.0); - auto out_var = scope.Var("Out"); + auto out_var = scope->Var("Out"); auto out = out_var->GetMutable(); auto out_value = out->mutable_value(); out->set_height(height); @@ -117,9 +117,9 @@ void StartServerNet(bool is_sparse) { f::Scope scope; p::CPUPlace place; if (is_sparse) { - InitSelectedRowsInScope(scope, place); + InitSelectedRowsInScope(place, &scope); } else { - InitTensorsInScope(scope, place); + InitTensorsInScope(place, &scope); } // sub program run in listen_and_serv_op, for simple test we use sum @@ -135,6 +135,7 @@ void StartServerNet(bool is_sparse) { attrs.insert({"ParamList", std::vector({"Out"})}); attrs.insert({"GradList", std::vector({"x1"})}); attrs.insert({"OptimizeBlock", optimize_block}); + attrs.insert({"PrefetchBlock", optimize_block}); listen_and_serv_op = f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs); LOG(INFO) << "selected port before run " << selected_port; @@ -148,7 +149,7 @@ TEST(SendRecvOp, CPUDense) { // local net f::Scope scope; p::CPUPlace place; - InitTensorsInScope(scope, place); + InitTensorsInScope(place, &scope); // create rpc client var scope.Var("RPC_CLIENT_VAR"); @@ -191,7 +192,7 @@ TEST(SendRecvOp, CPUSparse) { f::Scope scope; p::CPUPlace place; p::CPUDeviceContext ctx(place); - InitSelectedRowsInScope(scope, place); + InitSelectedRowsInScope(place, &scope); scope.Var("RPC_CLIENT_VAR"); f::AttributeMap attrs; selected_port = static_cast( From 063a956ff329c79a62ccaf5c57275daf559773f1 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 18:49:14 +0800 Subject: [PATCH 24/26] fix test_send_recv --- paddle/fluid/operators/send_recv_op_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index c3b07920c38e0..3bf5d57809019 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -126,6 +126,7 @@ void StartServerNet(bool is_sparse) { f::ProgramDesc program; const auto &root_block = program.Block(0); auto *optimize_block = program.AppendBlock(root_block); + auto *prefetch_block = program.AppendBlock(root_block); // X for server side tensors, RX for received tensers, must be of same shape. 
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block); @@ -135,7 +136,7 @@ void StartServerNet(bool is_sparse) { attrs.insert({"ParamList", std::vector({"Out"})}); attrs.insert({"GradList", std::vector({"x1"})}); attrs.insert({"OptimizeBlock", optimize_block}); - attrs.insert({"PrefetchBlock", optimize_block}); + attrs.insert({"PrefetchBlock", prefetch_block}); listen_and_serv_op = f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs); LOG(INFO) << "selected port before run " << selected_port; From 8eea574cf33f65fd7ee6e813faefdb58d0d7381b Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 11 Apr 2018 23:33:48 +0800 Subject: [PATCH 25/26] fix train with no distributed table --- python/paddle/fluid/distribute_transpiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py index 475d71fb48331..b0522b49f44d8 100644 --- a/python/paddle/fluid/distribute_transpiler.py +++ b/python/paddle/fluid/distribute_transpiler.py @@ -455,7 +455,7 @@ def __append_optimize_op__(op, block): assert prefetch_block is not None else: assert prefetch_block is None - prefetch_block = optimize_block + prefetch_block = pserver_program.global_block() # step5 append the listen_and_serv op pserver_program.global_block().append_op( From 4554e7b80a9bad6df4561e7b5d96d4bdc52c5a20 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Thu, 12 Apr 2018 13:00:59 +0800 Subject: [PATCH 26/26] optimize GetDims --- paddle/fluid/framework/operator.cc | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc index 7ad9c2a64edc3..f97bd0827428f 100644 --- a/paddle/fluid/framework/operator.cc +++ b/paddle/fluid/framework/operator.cc @@ -46,7 +46,8 @@ proto::VarType::Type GetDataTypeOfVar(const Variable* var) { } } -static DDim GetDims(const Scope& scope, const std::string& name) { +static DDim GetDims(const Scope& scope, const std::string& name, + bool get_actual_dim = false) { Variable* var = scope.FindVar(name); if (var == nullptr) { return DDim({-1}); @@ -55,7 +56,11 @@ static DDim GetDims(const Scope& scope, const std::string& name) { if (var->IsType()) { return var->Get().dims(); } else if (var->IsType()) { - return var->Get().value().dims(); + if (get_actual_dim) { + return var->Get().value().dims(); + } else { + return var->Get().GetCompleteDims(); + } } else { return DDim({-1}); } @@ -129,7 +134,7 @@ std::string OperatorBase::DebugStringEx(const Scope* scope) const { for (size_t i = 0; i < input.second.size(); ++i) { ss << input.second[i]; if (scope) { - ss << "[" << GetDims(*scope, input.second[i]) << "]"; + ss << "[" << GetDims(*scope, input.second[i], true) << "]"; ss << "(" << GetLoD(*scope, input.second[i]) << ")"; } if (i != input.second.size() - 1) { @@ -149,7 +154,7 @@ std::string OperatorBase::DebugStringEx(const Scope* scope) const { for (size_t i = 0; i < output.second.size(); ++i) { ss << output.second[i]; if (scope) { - ss << "[" << GetDims(*scope, output.second[i]) << "]"; + ss << "[" << GetDims(*scope, output.second[i], true) << "]"; ss << "(" << GetLoD(*scope, output.second[i]) << ")"; } if (i != output.second.size() - 1) {
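
Note on the data flow this series builds: for a distributed lookup table the transpiler shards ids across pservers with `id % shard_num` (the rule used by the split_ids kernel above), prefetches each shard from the pserver that owns those table rows, and concatenates the per-shard results back in id order. The snippet below is a plain-Python sketch of that split_ids -> prefetch -> concat pipeline, not the fluid operators themselves; the helper names (shard_ids, distributed_lookup) are invented for illustration.

import numpy as np


def shard_ids(ids, shard_num):
    # mirror split_ids: every id is routed to pserver `id % shard_num`
    shards = [[] for _ in range(shard_num)]
    for i in ids:
        shards[i % shard_num].append(i)
    return shards


def distributed_lookup(ids, table, shard_num):
    # each "pserver" holds the table rows whose id % shard_num equals its index
    slices = [{i: table[i] for i in range(len(table)) if i % shard_num == s}
              for s in range(shard_num)]
    gathered = {}
    for s, shard in enumerate(shard_ids(ids, shard_num)):
        # stands in for the prefetch op: fetch this shard's rows from pserver s
        gathered.update({i: slices[s][i] for i in shard})
    # stands in for the concat op: restore the original id order
    return np.stack([gathered[i] for i in ids])


if __name__ == "__main__":
    table = np.arange(20, dtype="float32").reshape(10, 2)
    ids = [3, 7, 1, 3]
    assert (distributed_lookup(ids, table, shard_num=2) == table[ids]).all()

The gradient path is the mirror image: the table gradient is split with the same modulo rule and sent (send_vars) to the pserver that owns those rows.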
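
On the pserver side, the table optimize block added in "complete pserver part" merges the per-trainer gradients with a sum op and then applies plain SGD to the table parameter (the block asserts the optimizer type is sgd). Below is a minimal sketch of that update, assuming sparse row gradients keyed by row id and one table slice per pserver; merge_table_grads_and_sgd is an invented name for illustration only.

import numpy as np


def merge_table_grads_and_sgd(table_slice, per_trainer_grads, lr):
    """table_slice: {row_id: row}; per_trainer_grads: one {row_id: grad_row} dict per trainer."""
    merged = {}
    for grads in per_trainer_grads:          # sum op over table_grad_list
        for row_id, g in grads.items():
            merged[row_id] = merged.get(row_id, 0.0) + g
    for row_id, g in merged.items():         # sgd op on the rows this pserver owns
        table_slice[row_id] = table_slice[row_id] - lr * g
    return table_slice


if __name__ == "__main__":
    owned = {0: np.ones(2, dtype="float32"), 2: np.ones(2, dtype="float32")}
    grads = [{0: np.full(2, 0.5, dtype="float32")},
             {0: np.full(2, 0.5, dtype="float32"), 2: np.ones(2, dtype="float32")}]
    print(merge_table_grads_and_sgd(owned, grads, lr=0.1))  # rows 0 and 2 become 0.9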