GCS client test failure flakiness (#34656)
Why are these changes needed?

Right now the theory is as follows.

The pubsub io service is created and run inside the GcsServer. That means that if the pubsub io service is accessed after the GcsServer is GC'ed, it will segfault.
Right now, upon teardown, when we call rpc::DrainAndResetServerCallExecutor, it recreates the executor thread pool.
So upon teardown, if DrainAndResetServerCallExecutor runs -> the GcsServer's internal pubsub posts a new SendReply to the newly created thread pool -> the GcsServer is reset -> the pubsub io service is GC'ed -> SendReply is invoked from the newly created thread pool, it will segfault.
NOTE: if you look at the failure, the segfault comes from the pubsub service:

#2 0x7f92034d9129 in ray::rpc::ServerCallImpl<ray::rpc::InternalPubSubGcsServiceHandler, ray::rpc::GcsSubscriberPollRequest, ray::rpc::GcsSubscriberPollReply>::HandleRequestImpl()::'lambda'(ray::Status, std::__1::function<void ()>, std::__1::function<void ()>)::operator()(ray::Status, std::__1::function<void ()>, std::__1::function<void ()>) const::'lambda'()::operator()() const /proc/self/cwd/bazel-out/k8-opt/bin/_virtual_includes/grpc_common_lib/ray/rpc/server_call.h:212:48
As a fix, I only drain the thread pool, and then reset it after all operations are fully cleaned up (tests only). I think there's no need to reset it for regular process termination such as raylet, GCS, and core workers.
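
To make the ordering concrete, here is a minimal, self-contained sketch of the teardown order the fix enforces (illustrative only; FakeGcsServer and the one-thread pool are stand-ins for the real GcsServer and the shared server call executor):

// Sketch, not Ray code: drain the reply executor while the server is still
// alive, destroy the server, and only then recreate the executor.
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <memory>

struct FakeGcsServer {
  boost::asio::io_context pubsub_io_service;  // dies together with the server
};

int main() {
  auto executor = std::make_unique<boost::asio::thread_pool>(1);
  auto server = std::make_unique<FakeGcsServer>();

  // A pending reply that touches memory owned by the server.
  boost::asio::post(*executor, [io = &server->pubsub_io_service] { io->poll(); });

  executor->join();  // DrainServerCallExecutor(): the reply finishes while the server is alive
  server.reset();    // gcs_server_.reset(): the pubsub io service is destroyed here
  executor = std::make_unique<boost::asio::thread_pool>(1);  // ResetServerCallExecutor()
  executor->join();
  return 0;
}

With the old DrainAndResetServerCallExecutor, the pool was recreated before the server was destroyed, so a reply like the one above could still be posted to the new pool and then run against freed memory.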

Related issue number

Closes #34344

Signed-off-by: SangBin Cho <[email protected]>
rkooo567 authored Apr 22, 2023
1 parent 6c35629 commit 26a9201
Showing 7 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.cc
@@ -783,7 +783,7 @@ void CoreWorker::Exit(
exit_type,
detail = std::move(detail),
creation_task_exception_pb_bytes]() {
- rpc::DrainAndResetServerCallExecutor();
+ rpc::DrainServerCallExecutor();
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
KillChildProcs();
Shutdown();
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -105,13 +105,14 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
gcs_client_.reset();

server_io_service_->stop();
- rpc::DrainAndResetServerCallExecutor();
+ rpc::DrainServerCallExecutor();
server_io_service_thread_->join();
gcs_server_->Stop();
gcs_server_.reset();
if (!no_redis_) {
TestSetupUtil::FlushAllRedisServers();
}
+ rpc::ResetServerCallExecutor();
}

void RestartGcsServer() {
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -107,7 +107,7 @@ int main(int argc, char *argv[]) {
int signal_number) {
RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down...";
main_service.stop();
- ray::rpc::DrainAndResetServerCallExecutor();
+ ray::rpc::DrainServerCallExecutor();
gcs_server.Stop();
ray::stats::Shutdown();
};
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
@@ -59,11 +59,12 @@ class GcsServerTest : public ::testing::Test {

void TearDown() override {
io_service_.stop();
- rpc::DrainAndResetServerCallExecutor();
+ rpc::DrainServerCallExecutor();
gcs_server_->Stop();
thread_io_service_->join();
gcs_server_.reset();
ray::gcs::RedisCallbackManager::instance().Clear();
+ rpc::ResetServerCallExecutor();
}

bool AddJob(const rpc::AddJobRequest &request) {
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
@@ -2028,7 +2028,7 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
return;
}
auto shutdown_after_reply = []() {
- rpc::DrainAndResetServerCallExecutor();
+ rpc::DrainServerCallExecutor();
// Note that the callback is posted to the io service after the shutdown GRPC request
// is replied. Otherwise, the RPC might not be replied to GCS before it shuts down
// itself. Implementation note: When raylet is shutdown by ray stop, the CLI sends a
5 changes: 3 additions & 2 deletions src/ray/rpc/server_call.cc
@@ -30,8 +30,9 @@ std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {

boost::asio::thread_pool &GetServerCallExecutor() { return *_GetServerCallExecutor(); }

- void DrainAndResetServerCallExecutor() {
-   GetServerCallExecutor().join();
+ void DrainServerCallExecutor() { GetServerCallExecutor().join(); }
+
+ void ResetServerCallExecutor() {
_GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(
::RayConfig::instance().num_server_call_thread());
}
11 changes: 8 additions & 3 deletions src/ray/rpc/server_call.h
@@ -32,9 +32,14 @@ namespace rpc {
/// This pool is shared across gRPC servers.
boost::asio::thread_pool &GetServerCallExecutor();

- /// For testing
- /// Drain the executor and reset it.
- void DrainAndResetServerCallExecutor();
+ /// Drain the executor.
+ void DrainServerCallExecutor();
+
+ /// Reset the server call executor.
+ /// Testing only. After you drain the executor
+ /// you need to regenerate the executor
+ /// because they are global.
+ void ResetServerCallExecutor();

/// Represents the callback function to be called when a `ServiceHandler` finishes
/// handling a request.
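
As a side note on why the reset step exists at all, here is a standalone sketch of plain boost::asio behavior (not Ray code): once thread_pool::join() returns, the pool's worker threads have exited, so anything posted afterwards is never executed; the only way to get a usable executor again is to construct a fresh pool, which is what ResetServerCallExecutor() does with the global unique_ptr.

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <iostream>
#include <memory>

int main() {
  auto pool = std::make_unique<boost::asio::thread_pool>(2);
  boost::asio::post(*pool, [] { std::cout << "runs before join\n"; });
  pool->join();  // drains pending work; the joined pool will not run new work

  boost::asio::post(*pool, [] { std::cout << "never runs\n"; });  // silently dropped

  pool = std::make_unique<boost::asio::thread_pool>(2);  // what ResetServerCallExecutor() does
  boost::asio::post(*pool, [] { std::cout << "runs on the new pool\n"; });
  pool->join();
  return 0;
}

This is also why the production shutdown paths (raylet, GCS, core worker) can stop at the drain: the process exits right after, so nothing needs the executor again.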
