Skip to content

Commit

Permalink
Merge pull request PaddlePaddle#9523 from jacquesqiao/fix-test_send_recv
Browse files Browse the repository at this point in the history
fix send_recv_op_test
  • Loading branch information
jacquesqiao authored Mar 30, 2018
2 parents 527e658 + 9583dc3 commit 63cd5fb
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
10 changes: 5 additions & 5 deletions paddle/fluid/operators/detail/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ void AsyncGRPCServer::ShutdownQueue() {
std::unique_lock<std::mutex> lock(cq_mutex_);
cq_send_->Shutdown();
cq_get_->Shutdown();
is_shut_down_ = true;
}

// This URL explains why shutdown is complicate:
void AsyncGRPCServer::ShutDown() {
server_->Shutdown();
is_shut_down_ = true;
ShutdownQueue();
server_->Shutdown();
}

void AsyncGRPCServer::TryToRegisterNewSendOne() {
Expand Down Expand Up @@ -213,14 +213,14 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq,
bool ok = false;
while (true) {
if (!cq->Next(&tag, &ok)) {
LOG(INFO) << cq_name << " get CompletionQueue shutdown!";
LOG(INFO) << cq_name << " CompletionQueue shutdown!";
break;
}

PADDLE_ENFORCE(tag);
// FIXME(typhoonzero): de-couple the barriers with recv_op
if (cq_name == "cq_get") WaitCond(1);
if (cq_name == "cq_send") WaitCond(0);
if (!is_shut_down_ && cq_name == "cq_get") WaitCond(1);
if (!is_shut_down_ && cq_name == "cq_send") WaitCond(0);

RequestBase* base = (RequestBase*)tag;
// reference:
Expand Down
1 change: 0 additions & 1 deletion paddle/fluid/operators/listen_and_serv_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class ListenAndServOp : public framework::OperatorBase {

void Stop() override {
rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
rpc_service_->ShutDown();
server_thread_->join();
}

Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/operators/send_recv_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ void StartServerNet(bool is_sparse) {

// sub program run in listen_and_serv_op, for simple test we use sum
f::ProgramDesc program;
f::BlockDesc *optimize_block = program.MutableBlock(0);
const auto &root_block = program.Block(0);
auto *optimize_block = program.AppendBlock(root_block);
// X for server side tensors, RX for received tensers, must be of same shape.
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);

Expand Down

0 comments on commit 63cd5fb

Please sign in to comment.