Commit

merge upstream and resolve conflicts
chenfeiyu committed May 18, 2022
2 parents 761be90 + d368637 commit 6bf8db3
Showing 490 changed files with 18,739 additions and 3,765 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -256,8 +256,8 @@ option(WITH_CUSTOM_DEVICE "Compile with custom device support" OFF)
option(WITH_ARM_BRPC "Support Brpc in Arm" OFF)

if(WITH_RECORD_BUILDTIME)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh")
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh")
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh ${CMAKE_CURRENT_BINARY_DIR}")
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh ${CMAKE_CURRENT_BINARY_DIR}")
else()
include(ccache) # set up ccache for compilation; ccache cannot be used when WITH_RECORD_BUILDTIME=ON
endif()
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ PaddlePaddle is originated from industrial practices with dedication and commitm

## Installation

### Latest PaddlePaddle Release: [v2.2](https://github.com/PaddlePaddle/Paddle/tree/release/2.2)
### Latest PaddlePaddle Release: [v2.3](https://github.com/PaddlePaddle/Paddle/tree/release/2.3)

Our vision is to enable deep learning for everyone via PaddlePaddle.
Please refer to our [release announcement](https://github.com/PaddlePaddle/Paddle/releases) to track the latest features of PaddlePaddle.
5 changes: 3 additions & 2 deletions cmake/external/ascend.cmake
@@ -90,9 +90,10 @@ endif()
if (WITH_ASCEND_CL)
macro(find_ascend_toolkit_version ascend_toolkit_version_info)
file(READ ${ascend_toolkit_version_info} ASCEND_TOOLKIT_VERSION_CONTENTS)
string(REGEX MATCH "version=([0-9]+\.[0-9]+\.(RC)?[0-9]+\.[a-z]*[0-9]*)" ASCEND_TOOLKIT_VERSION "${ASCEND_TOOLKIT_VERSION_CONTENTS}")
string(REGEX REPLACE "version=([0-9]+\.[0-9]+\.(RC)?[0-9]+\.[a-z]*[0-9]*)" "\\1" ASCEND_TOOLKIT_VERSION "${ASCEND_TOOLKIT_VERSION}")
string(REGEX MATCH "version=([0-9]+\.[0-9]+\.(RC)?[0-9][.a-z0-9]*)" ASCEND_TOOLKIT_VERSION "${ASCEND_TOOLKIT_VERSION_CONTENTS}")
string(REGEX REPLACE "version=([0-9]+\.[0-9]+\.(RC)?[0-9][.a-z0-9]*)" "\\1" ASCEND_TOOLKIT_VERSION "${ASCEND_TOOLKIT_VERSION}")
string(REGEX REPLACE "[A-Z]|[a-z|\.]" "" CANN_VERSION ${ASCEND_TOOLKIT_VERSION})
STRING(SUBSTRING "${CANN_VERSION}000" 0 6 CANN_VERSION)
add_definitions("-DCANN_VERSION_CODE=${CANN_VERSION}")
if(NOT ASCEND_TOOLKIT_VERSION)
set(ASCEND_TOOLKIT_VERSION "???")
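Note: the new regex lines accept toolkit versions such as 5.1.RC1.alpha001, and the SUBSTRING call pads the digit-only remainder with zeros and truncates it to six characters, producing a fixed-width integer code (for example, 5.0.4 becomes 504000). The sketch below mirrors that mapping in C++ under the assumption that only the digits matter; it is an illustration, not the build logic itself.

#include <cctype>
#include <iostream>
#include <string>

// Keep only the digits of the toolkit version, pad with zeros, and truncate
// to six characters -- the same fixed-width code the CMake above computes.
std::string CannVersionCode(const std::string& toolkit_version) {
  std::string digits;
  for (char c : toolkit_version) {
    if (std::isdigit(static_cast<unsigned char>(c))) digits += c;
  }
  digits += "000";
  return digits.substr(0, 6);
}

int main() {
  std::cout << CannVersionCode("5.0.4") << "\n";             // prints 504000
  std::cout << CannVersionCode("5.1.RC1.alpha001") << "\n";  // prints 511001
}

The fixed width is what lets the CANN_VERSION LESS 504000 comparison in cmake/operators.cmake below behave like a plain integer version check.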
2 changes: 1 addition & 1 deletion cmake/external/mkldnn.cmake
@@ -19,7 +19,7 @@ SET(MKLDNN_PREFIX_DIR ${THIRD_PARTY_PATH}/mkldnn)
SET(MKLDNN_INSTALL_DIR ${THIRD_PARTY_PATH}/install/mkldnn)
SET(MKLDNN_INC_DIR "${MKLDNN_INSTALL_DIR}/include" CACHE PATH "mkldnn include directory." FORCE)
SET(MKLDNN_REPOSITORY ${GIT_URL}/oneapi-src/oneDNN.git)
SET(MKLDNN_TAG 9a35435c18722ff17a48fb60bceac42bfdf78754)
SET(MKLDNN_TAG 9b186765dded79066e0cd9c17eb70b680b76fb8e)


# Introduce variables:
4 changes: 2 additions & 2 deletions cmake/external/xpu.cmake
@@ -9,15 +9,15 @@ SET(XPU_RT_LIB_NAME "libxpurt.so")

if(NOT DEFINED XPU_BASE_URL)
SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220425")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220511")
else()
SET(XPU_BASE_URL "${XPU_BASE_URL}")
endif()

# Ubuntu and CentOS: use the output from the XDNN API team
if(NOT DEFINED XPU_XDNN_BASE_URL)
SET(XPU_XDNN_BASE_URL_WITHOUT_DATE "https://klx-sdk-release-public.su.bcebos.com/xdnn/dev")
SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL_WITHOUT_DATE}/20220425")
SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL_WITHOUT_DATE}/20220511")
else()
SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL}")
endif()
6 changes: 6 additions & 0 deletions cmake/operators.cmake
@@ -261,6 +261,12 @@ function(op_library TARGET)
elseif (WITH_XPU_KP AND ${xpu_kp_cc_srcs_len} GREATER 0)
xpu_library(${TARGET} SRCS ${cc_srcs} ${mkldnn_cc_srcs} ${xpu_cc_srcs} ${xpu_kp_cc_srcs} DEPS ${op_library_DEPS} ${op_common_deps})
else()
# Handle CANN version differences when registering NPU operators before the build
if (WITH_ASCEND_CL)
if (CANN_VERSION LESS 504000)
list(REMOVE_ITEM npu_cc_srcs "multinomial_op_npu.cc")
endif()
endif()
# Unity Build relies on global option `WITH_UNITY_BUILD` and local option `UNITY`.
if(WITH_UNITY_BUILD AND op_library_UNITY)
# Combine the cc source files.
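Because ascend.cmake passes -DCANN_VERSION_CODE=<code> to the compiler (see the add_definitions line above), the same gate can also be applied inside C++ sources. A hypothetical sketch; the function name is illustrative, not a real symbol in the tree:

// Compile NPU-only code paths only when the toolkit is at least CANN 5.0.4.
#if defined(CANN_VERSION_CODE) && CANN_VERSION_CODE >= 504000
void RegisterMultinomialNpuKernel();  // illustrative declaration only
#endif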
4 changes: 4 additions & 0 deletions cmake/xpu_kp.cmake
@@ -16,6 +16,10 @@ if(NOT WITH_XPU_KP)
return()
endif()

set(LINK_FLAGS "-Wl,--allow-multiple-definition")
set(CMAKE_EXE_LINKER_FLAGS "${LINK_FLAGS}")
set(CMAKE_SHARED_LINKER_FLAGS "${LINK_FLAGS}")

if(NOT XPU_TOOLCHAIN)
set(XPU_TOOLCHAIN /workspace/output/XTDK-ubuntu_x86_64)
get_filename_component(XPU_TOOLCHAIN ${XPU_TOOLCHAIN} REALPATH)
59 changes: 55 additions & 4 deletions paddle/fluid/distributed/collective/reducer.cc
@@ -901,6 +901,9 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,

dev_ctx->Wait();

Tensor src_value_tensor(std::make_shared<phi::DenseTensor>(src->value()));
std::vector<int64_t> dst_shape = src_value_tensor.shape();

if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + size_,
[&](int64_t row) { return row == cpu_rows_num_ptr[0]; })) {
// During sparse communication, the number of rows on each card is the same.
@@ -940,8 +943,6 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
&dst_rows_vector);
dev_ctx->Wait();

Tensor src_value_tensor(std::make_shared<phi::DenseTensor>(src->value()));
std::vector<int64_t> dst_shape = src_value_tensor.shape();
dst_shape[dst_shape.size() - 2] = rows_num;
auto dst_dense_tensor = std::dynamic_pointer_cast<phi::DenseTensor>(
paddle::experimental::full(IntArray(dst_shape), 0,
@@ -971,8 +972,58 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
*(src->mutable_value()) =
*(std::dynamic_pointer_cast<phi::DenseTensor>(dst_value_tensor.impl()));
} else {
PADDLE_THROW(
platform::errors::Unimplemented("This case is not supported."));
std::vector<Tensor> rows_tensors;
std::vector<Tensor> values_tensors;

for (int i = 0; i < size_; ++i) {
std::vector<int64_t> value_tensor_shape = {
cpu_rows_num_ptr[i], dst_shape[dst_shape.size() - 1]};
Tensor rows_tensor = paddle::experimental::full(
IntArray({static_cast<int64_t>(cpu_rows_num_ptr[i])}), 0,
DataType::INT64, inner_place_);
Tensor values_tensor = paddle::experimental::full(
IntArray(value_tensor_shape), 0, src->value().dtype(), inner_place_);
std::vector<phi::DenseTensor> rows_dense_vector;
std::vector<phi::DenseTensor> values_dense_vector;

if (i == rank_) {
auto *rows_dense_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(rows_tensor.impl())
.get();
framework::TensorFromVector<int64_t>(src_rows, *dev_ctx,
rows_dense_tensor);
values_tensor.set_impl(
std::make_shared<phi::DenseTensor>(src->value()));
}
rows_dense_vector.push_back(
*std::dynamic_pointer_cast<phi::DenseTensor>(rows_tensor.impl()));
values_dense_vector.push_back(
*std::dynamic_pointer_cast<phi::DenseTensor>(values_tensor.impl()));

auto b_opts = BroadcastOptions();
b_opts.source_rank = i;
process_group_->Broadcast(rows_dense_vector, rows_dense_vector, b_opts);
process_group_
->Broadcast(values_dense_vector, values_dense_vector, b_opts)
->Wait();
rows_tensors.push_back(rows_tensor);
values_tensors.push_back(values_tensor);
}

Tensor dst_rows_tensor =
paddle::experimental::concat(rows_tensors, phi::Scalar(0));
framework::Vector<int64_t> dst_rows_vector(rows_num, 0);
auto *dst_rows_dense_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(dst_rows_tensor.impl())
.get();
framework::TensorToVector<int64_t>(*dst_rows_dense_tensor, *dev_ctx,
&dst_rows_vector);
src->set_rows(dst_rows_vector);

Tensor dst_values_tensor =
paddle::experimental::concat(values_tensors, phi::Scalar(0));
*(src->mutable_value()) = *(
std::dynamic_pointer_cast<phi::DenseTensor>(dst_values_tensor.impl()));
}
}

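Note: the new else branch replaces the old Unimplemented error for the case where cards hold different numbers of sparse rows. Each rank i allocates zero-filled rows/values tensors sized by cpu_rows_num_ptr[i], the owning rank fills in its real data, every tensor is broadcast from its source rank, and the per-rank results are concatenated in rank order. A single-process sketch of that concatenation-order invariant (no real communication; plain vectors stand in for tensors):

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Row ids contributed by three hypothetical ranks with unequal row counts.
  std::vector<std::vector<int64_t>> rows_per_rank = {{0, 3}, {1}, {2, 3, 5}};
  std::vector<int64_t> dst_rows;  // plays the role of concat(rows_tensors)
  for (const auto& rows : rows_per_rank) {
    dst_rows.insert(dst_rows.end(), rows.begin(), rows.end());
  }
  for (int64_t r : dst_rows) std::cout << r << " ";  // prints: 0 3 1 2 3 5
}

As the output shows, duplicate row ids contributed by different ranks (row 3 here) survive the concatenation; the values tensors are concatenated the same way along axis 0.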
21 changes: 9 additions & 12 deletions paddle/fluid/distributed/ps/service/heter_client.h
@@ -171,19 +171,16 @@ class HeterClient {
// switch client singleton
static std::shared_ptr<HeterClient> GetSwitchInstance(
const std::vector<std::string>& peer_endpoints, int32_t peer_role) {
std::unique_lock<std::mutex> lock(mtx_);
if (peer_endpoints.empty()) {
VLOG(4) << "init switch client failed, null peer_endpoints";
}
VLOG(4) << "peer role is: " << peer_role
<< ", addr is: " << peer_endpoints[0];
if (switch_s_instance_ == nullptr) {
std::unique_lock<std::mutex> lock(mtx_);
if (peer_endpoints.empty()) {
VLOG(4) << "init switch client failed, null peer_endpoints";
}
VLOG(4) << "peer role is: " << peer_role
<< ", addr is: " << peer_endpoints[0];
if (switch_s_instance_ == nullptr) {
switch_s_instance_.reset(new HeterClient());
switch_s_instance_->SetPeerSwitchList(peer_endpoints);
switch_s_instance_->InitClientChannels(false, peer_endpoints,
peer_role);
}
switch_s_instance_.reset(new HeterClient());
switch_s_instance_->SetPeerSwitchList(peer_endpoints);
switch_s_instance_->InitClientChannels(false, peer_endpoints, peer_role);
}
return switch_s_instance_;
}
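Note: the change above widens the critical section so the mutex is taken before switch_s_instance_ is tested; previously two threads could both observe nullptr and race to construct the client. A minimal sketch of the resulting lazy-initialization pattern (illustrative names):

#include <memory>
#include <mutex>

class Client {
 public:
  static std::shared_ptr<Client> GetInstance() {
    std::unique_lock<std::mutex> lock(mtx_);  // lock before the null check
    if (instance_ == nullptr) {
      instance_.reset(new Client());  // at most one thread constructs it
    }
    return instance_;
  }

 private:
  Client() = default;
  static std::mutex mtx_;
  static std::shared_ptr<Client> instance_;
};

std::mutex Client::mtx_;
std::shared_ptr<Client> Client::instance_;

Taking the lock unconditionally costs a little on the hot path but removes the unsynchronized read of the shared pointer.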
7 changes: 5 additions & 2 deletions paddle/fluid/distributed/ps/service/heter_server.cc
@@ -125,18 +125,21 @@ int SendAndRecvVariableHandler::SaveInSwitchWithShard(
brpc::Controller* cntl) {
VLOG(4) << "entering SaveInSwitchWithShard";
int32_t group_id = request->group_id();
if (group_id >= FLAGS_heter_world_size) {
LOG(ERROR) << "group id exceeds maximum";
}
auto& local_shard = _local_shards[group_id];
auto& request_io_buffer = cntl->request_attachment();
butil::IOBufBytesIterator io_buffer_itr(request_io_buffer);
for (int idx = 0; idx < request->send_var_names_size(); idx++) {
const auto& var_name = request->send_var_names(idx);
const auto& var_size = request->vars_len(idx);
WaitForVarsConsumed(group_id, var_name);
std::unique_lock<std::mutex> lk(scope_mutex_);
auto& value = local_shard[var_name];
value.resize(var_size);
io_buffer_itr.copy_and_forward(reinterpret_cast<void*>(value.data()),
var_size);
std::unique_lock<std::mutex> lk(scope_mutex_);
vars_ready_flag[group_id][var_name] = 1;
VLOG(4) << "saved var_name: " << var_name << "is saved ready!";
}
@@ -162,11 +165,11 @@ int SendAndRecvVariableHandler::QueryInSwitchWithShard(
VLOG(4) << "req var name: " << req_var_name;
response->add_send_var_names(req_var_name);
WaitForVarsProduced(group_id, req_var_name);
std::unique_lock<std::mutex> lk(scope_mutex_);
auto itr = local_shard.find(req_var_name);
auto& value = itr.value();
response_io_buffer.append(value.data(), value.size());
value.resize(0); // clear the memory
std::unique_lock<std::mutex> lk(scope_mutex_);
vars_ready_flag[group_id][req_var_name] = 0;
VLOG(4) << "query var_name: " << req_var_name << "is consumed ready!";
}
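Note: in both handlers the std::unique_lock now precedes the shard access instead of following it, so the buffer copy and the vars_ready_flag update form one critical section; the added group_id bound check also guards the _local_shards indexing. A minimal sketch of the invariant (illustrative names):

#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

std::mutex scope_mutex_;
std::unordered_map<std::string, std::vector<char>> shard;
std::unordered_map<std::string, int> ready;

void Save(const std::string& name, const std::vector<char>& bytes) {
  std::unique_lock<std::mutex> lk(scope_mutex_);
  shard[name] = bytes;  // write the data under the lock...
  ready[name] = 1;      // ...so the flag is never visible before the data
}

With the old ordering, a consumer that checked the ready flag could read a buffer that was still being resized or filled.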
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/ps/service/ps_client.h
@@ -109,7 +109,7 @@ class PSClient {
size_t table_id) = 0; // reserved

// firstly push dense param for parameter server
// this is neccessary because dense weight initialized in trainer on cold
// this is necessary because dense weight initialized in trainer on cold
// start
virtual std::future<int32_t> PushDenseParam(const Region *regions,
size_t region_num,
52 changes: 31 additions & 21 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -80,16 +80,16 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
}
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
paddle::framework::GpuPsCommGraph res;
unsigned int tot_len = 0;
int64_t tot_len = 0;
for (int i = 0; i < task_pool_size_; i++) {
tot_len += edge_array[i].size();
}
// res.neighbor_size = tot_len;
// res.node_size = ids.size();
// res.neighbor_list = new int64_t[tot_len];
// res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
res.init_on_cpu(tot_len, (unsigned int)ids.size());
unsigned int offset = 0, ind = 0;
res.init_on_cpu(tot_len, ids.size());
int64_t offset = 0, ind = 0;
for (int i = 0; i < task_pool_size_; i++) {
for (int j = 0; j < (int)node_array[i].size(); j++) {
res.node_list[ind] = node_array[i][j];
@@ -126,15 +126,27 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
_db->put(src_id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), (char *)data, len);
}
_db->flush(src_id % shard_num % task_pool_size_);
std::string x;
// _db->flush(src_id % shard_num % task_pool_size_);
// std::string x;
// if (_db->get(src_id % shard_num % task_pool_size_, ch, sizeof(int64_t) +
// 2 * sizeof(int), x) ==0){
// VLOG(0)<<"put result";
// for(int i = 0;i < x.size();i+=8){
// VLOG(0)<<"get an id "<<*((int64_t *)(x.c_str() + i));
// }
//}
// if(src_id == 429){
// str = "";
// _db->get(src_id % shard_num % task_pool_size_, ch,
// sizeof(int) * 2 + sizeof(int64_t), str);
// int64_t *stored_data = ((int64_t *)str.c_str());
// int n = str.size() / sizeof(int64_t);
// VLOG(0)<<"429 has "<<n<<"neighbors";
// for(int i =0;i< n;i++){
// VLOG(0)<<"get an id "<<*((int64_t *)(str.c_str() +
// i*sizeof(int64_t)));
// }
// }
}
return 0;
}
@@ -146,6 +158,7 @@ char *GraphTable::random_sample_neighbor_from_ssd(
return NULL;
}
std::string str;
VLOG(2) << "sample ssd for key " << id;
char ch[sizeof(int) * 2 + sizeof(int64_t)];
memset(ch, 0, sizeof(int));
memcpy(ch + sizeof(int), &idx, sizeof(int));
@@ -178,6 +191,9 @@ char *GraphTable::random_sample_neighbor_from_ssd(
memcpy(buff + i * Node::id_size, &data[pos], Node::id_size);
// res.push_back(data[pos]);
}
for (int i = 0; i < actual_size; i += 8) {
VLOG(2) << "sampled an neighbor " << *(int64_t *)&buff[i];
}
return buff;
}
actual_size = 0;
@@ -376,7 +392,7 @@ int32_t GraphTable::load_edges_to_ssd(const std::string &path,
}

int32_t GraphTable::dump_edges_to_ssd(int idx) {
VLOG(0) << "calling dump edges to ssd";
VLOG(2) << "calling dump edges to ssd";
const int64_t fixed_size = 10000;
// std::vector<int64_t> edge_array[task_pool_size_];
std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
Expand All @@ -387,9 +403,9 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) {
[&, i, this]() -> int64_t {
int64_t cost = 0;
std::vector<Node *> &v = shards[i]->get_bucket();
std::vector<int64_t> s;
size_t ind = i % this->task_pool_size_;
for (size_t j = 0; j < v.size(); j++) {
std::vector<int64_t> s;
for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
s.push_back(v[j]->get_neighbor_id(k));
}
@@ -405,7 +421,7 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) {
}
int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
VLOG(0) << "make_complementary_graph";
const int64_t fixed_size = 10000;
const int64_t fixed_size = byte_size / 8;
// std::vector<int64_t> edge_array[task_pool_size_];
std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
std::vector<std::future<int>> tasks;
@@ -416,27 +432,20 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
std::vector<Node *> &v = shards[i]->get_bucket();
size_t ind = i % this->task_pool_size_;
for (size_t j = 0; j < v.size(); j++) {
size_t location = v[j]->get_id();
// size_t location = v[j]->get_id();
for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
count[ind][v[j]->get_neighbor_id(k)]++;
}
}
return 0;
}));
}

for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
std::unordered_map<int64_t, int> final_count;
std::map<int, std::vector<int64_t>> count_to_id;
std::vector<int64_t> buffer;
for (auto p : edge_shards[idx]) {
delete p;
}
clear_graph(idx);

edge_shards[idx].clear();
for (size_t i = 0; i < shard_num_per_server; i++) {
edge_shards[idx].push_back(new GraphShard());
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
for (int i = 0; i < task_pool_size_; i++) {
for (auto &p : count[i]) {
final_count[p.first] = final_count[p.first] + p.second;
@@ -447,13 +456,13 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
count_to_id[p.second].push_back(p.first);
VLOG(2) << p.first << " appear " << p.second << " times";
}
// std::map<int,std::vector<int64_t>>::iterator iter= count_to_id.rbegin();
auto iter = count_to_id.rbegin();
while (iter != count_to_id.rend() && byte_size > 0) {
for (auto x : iter->second) {
buffer.push_back(x);
if (buffer.size() >= fixed_size) {
int64_t res = load_graph_to_memory_from_ssd(idx, buffer);
buffer.clear();
byte_size -= res;
}
if (byte_size <= 0) break;
@@ -1265,13 +1274,14 @@ int32_t GraphTable::random_sample_neighbors(
if (node == nullptr) {
#ifdef PADDLE_WITH_HETERPS
if (search_level == 2) {
VLOG(2) << "enter sample from ssd";
VLOG(2) << "enter sample from ssd for node_id " << node_id;
char *buffer_addr = random_sample_neighbor_from_ssd(
idx, node_id, sample_size, rng, actual_size);
if (actual_size != 0) {
std::shared_ptr<char> &buffer = buffers[idx];
std::shared_ptr<char> &buffer = buffers[idy];
buffer.reset(buffer_addr, char_del);
}
VLOG(2) << "actual sampled size from ssd = " << actual_sizes[idy];
continue;
}
#endif
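Note on the make_complementary_graph changes above: the batch size is now derived from the byte budget (byte_size / 8, assuming 8-byte int64_t node ids) instead of the hard-coded 10000, and the added buffer.clear() after each flush fixes a bug where ids already loaded from SSD were re-submitted with every later batch. (The earlier make_gpu_ps_graph hunk similarly widens tot_len from unsigned int to int64_t so large edge counts do not overflow.) A minimal sketch of the corrected flush loop; the load call is the assumed call site, left commented out:

#include <cstdint>
#include <vector>

void LoadInBatches(const std::vector<int64_t>& ids, int64_t fixed_size) {
  std::vector<int64_t> buffer;
  for (int64_t id : ids) {
    buffer.push_back(id);
    if (static_cast<int64_t>(buffer.size()) >= fixed_size) {
      // load_graph_to_memory_from_ssd(idx, buffer);
      buffer.clear();  // the fix: the next batch starts empty
    }
  }
  // A final partial batch would be flushed after the loop.
}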

1 comment on commit 6bf8db3

@paddle-bot-old


Congratulations! Your pull request passed all required CI checks. You can ask the reviewer(s) to approve and merge. 🎉
