From 49efcab4fe3cc21d1acc0e8c780cc887645e7420 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 2 Jun 2022 07:12:33 +0900 Subject: [PATCH] Revert "[core] Remove gcs addr updater in core worker. (#24747)" (#25375) Turns out https://github.com/ray-project/ray/pull/25342 wasn't the root cause of the ray shutdown flakiness. I realized there's another PR that could affect this test suite. Let's try reverting it and see if things get better. --- BUILD.bazel | 1 - python/ray/cluster_utils.py | 10 +- python/ray/node.py | 12 +-- python/ray/tests/test_scheduling_2.py | 10 +- src/mock/ray/raylet_client/raylet_client.h | 4 + src/ray/core_worker/core_worker.cc | 25 +++-- src/ray/core_worker/core_worker.h | 7 ++ .../core_worker/gcs_server_address_updater.cc | 96 +++++++++++++++++++ .../core_worker/gcs_server_address_updater.h | 52 ++++++++++ .../gcs_server/test/gcs_server_test_util.h | 4 + src/ray/protobuf/node_manager.proto | 10 ++ src/ray/raylet/node_manager.cc | 10 ++ src/ray/raylet/node_manager.h | 5 + src/ray/raylet_client/raylet_client.cc | 6 ++ src/ray/raylet_client/raylet_client.h | 6 ++ .../rpc/node_manager/node_manager_client.h | 6 ++ .../rpc/node_manager/node_manager_server.h | 5 +- src/ray/util/process.cc | 2 +- src/ray/util/util_test.cc | 21 ---- 19 files changed, 236 insertions(+), 56 deletions(-) create mode 100644 src/ray/core_worker/gcs_server_address_updater.cc create mode 100644 src/ray/core_worker/gcs_server_address_updater.h diff --git a/BUILD.bazel b/BUILD.bazel index c1440c1f35b7..3fea5aba53f0 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1459,7 +1459,6 @@ cc_test( deps = [ ":ray_util", "@boost//:asio", - "@boost//:process", "@com_google_googletest//:gtest_main", ], ) diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index 3cc3ffbde0cc..168aea34ab87 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -246,19 +246,13 @@ def remove_node(self, node, allow_graceful=True): ) if self.head_node == node: - # We have to wait to prevent the raylet becomes a zombie which will prevent - # worker from exiting self.head_node.kill_all_processes( - check_alive=False, allow_graceful=allow_graceful, wait=True + check_alive=False, allow_graceful=allow_graceful ) self.head_node = None # TODO(rliaw): Do we need to kill all worker processes? else: - # We have to wait to prevent the raylet becomes a zombie which will prevent - # worker from exiting - node.kill_all_processes( - check_alive=False, allow_graceful=allow_graceful, wait=True - ) + node.kill_all_processes(check_alive=False, allow_graceful=allow_graceful) self.worker_nodes.remove(node) assert ( diff --git a/python/ray/node.py b/python/ray/node.py index 3d2477736a0a..95e82f2d6ef4 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -1341,7 +1341,7 @@ def kill_reaper(self, check_alive: bool = True): ray_constants.PROCESS_TYPE_REAPER, check_alive=check_alive ) - def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False): + def kill_all_processes(self, check_alive=True, allow_graceful=False): """Kill all of the processes. Note that This is slower than necessary because it calls kill, wait, @@ -1350,8 +1350,6 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False) Args: check_alive: Raise an exception if any of the processes were already dead. - wait: If true, then this method will not return until the - process in question has exited. """ # Kill the raylet first. This is important for suppressing errors at # shutdown because we give the raylet a chance to exit gracefully and @@ -1363,7 +1361,6 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False) ray_constants.PROCESS_TYPE_RAYLET, check_alive=check_alive, allow_graceful=allow_graceful, - wait=wait, ) if ray_constants.PROCESS_TYPE_GCS_SERVER in self.all_processes: @@ -1371,7 +1368,6 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False) ray_constants.PROCESS_TYPE_GCS_SERVER, check_alive=check_alive, allow_graceful=allow_graceful, - wait=wait, ) # We call "list" to copy the keys because we are modifying the @@ -1381,10 +1377,7 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False) # while cleaning up. if process_type != ray_constants.PROCESS_TYPE_REAPER: self._kill_process_type( - process_type, - check_alive=check_alive, - allow_graceful=allow_graceful, - wait=wait, + process_type, check_alive=check_alive, allow_graceful=allow_graceful ) if ray_constants.PROCESS_TYPE_REAPER in self.all_processes: @@ -1392,7 +1385,6 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False) ray_constants.PROCESS_TYPE_REAPER, check_alive=check_alive, allow_graceful=allow_graceful, - wait=wait, ) def live_processes(self): diff --git a/python/ray/tests/test_scheduling_2.py b/python/ray/tests/test_scheduling_2.py index a1c619bee1ba..a530df037927 100644 --- a/python/ray/tests/test_scheduling_2.py +++ b/python/ray/tests/test_scheduling_2.py @@ -255,10 +255,7 @@ def func(): @pytest.mark.parametrize("connect_to_client", [True, False]) -def test_node_affinity_scheduling_strategy( - monkeypatch, ray_start_cluster, connect_to_client -): - monkeypatch.setenv("RAY_num_heartbeats_timeout", "4") +def test_node_affinity_scheduling_strategy(ray_start_cluster, connect_to_client): cluster = ray_start_cluster cluster.add_node(num_cpus=8, resources={"head": 1}) ray.init(address=cluster.address) @@ -519,10 +516,7 @@ def ping(self): @pytest.mark.skipif( platform.system() == "Windows", reason="FakeAutoscaler doesn't work on Windows" ) -def test_demand_report_for_node_affinity_scheduling_strategy( - monkeypatch, shutdown_only -): - monkeypatch.setenv("RAY_num_heartbeats_timeout", "4") +def test_demand_report_for_node_affinity_scheduling_strategy(shutdown_only): from ray.cluster_utils import AutoscalingCluster cluster = AutoscalingCluster( diff --git a/src/mock/ray/raylet_client/raylet_client.h b/src/mock/ray/raylet_client/raylet_client.h index dee2808e0fea..063253f58040 100644 --- a/src/mock/ray/raylet_client/raylet_client.h +++ b/src/mock/ray/raylet_client/raylet_client.h @@ -198,6 +198,10 @@ class MockRayletClientInterface : public RayletClientInterface { GetSystemConfig, (const rpc::ClientCallback &callback), (override)); + MOCK_METHOD(void, + GetGcsServerAddress, + (const rpc::ClientCallback &callback), + (override)); MOCK_METHOD(void, UpdateResourceUsage, (std::string & serialized_resource_usage_batch, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 3a93637f224a..fda8c7c736b7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -180,6 +180,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ << rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID() << ", raylet " << local_raylet_id; + // Begin to get gcs server address from raylet. + gcs_server_address_updater_ = std::make_unique( + options_.raylet_ip_address, + options_.node_manager_port, + [this](std::string ip, int port) { + absl::MutexLock lock(&gcs_server_address_mutex_); + gcs_server_address_.first = ip; + gcs_server_address_.second = port; + }); + gcs_client_ = std::make_shared(options_.gcs_options); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); @@ -236,7 +246,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ return std::shared_ptr( new rpc::CoreWorkerClient(addr, *client_call_manager_)); }); - if (options_.worker_type != WorkerType::DRIVER) { + + if (options_.worker_type == WorkerType::WORKER) { periodical_runner_.RunFnPeriodically( [this] { ExitIfParentRayletDies(); }, RayConfig::instance().raylet_death_check_interval_milliseconds()); @@ -748,14 +759,16 @@ void CoreWorker::RegisterToGcs() { } void CoreWorker::ExitIfParentRayletDies() { + RAY_CHECK(options_.worker_type == WorkerType::WORKER); RAY_CHECK(!RayConfig::instance().RAYLET_PID().empty()); - static auto raylet_pid = - static_cast(std::stoi(RayConfig::instance().RAYLET_PID())); + auto raylet_pid = static_cast(std::stoi(RayConfig::instance().RAYLET_PID())); bool should_shutdown = !IsProcessAlive(raylet_pid); if (should_shutdown) { - RAY_LOG(WARNING) << "Shutting down the core worker because the local raylet failed. " - << "Check out the raylet.out log file. Raylet pid: " << raylet_pid; - QuickExit(); + std::ostringstream stream; + stream << "Shutting down the core worker because the local raylet failed. " + << "Check out the raylet.out log file. Raylet pid: " << raylet_pid; + RAY_LOG(WARNING) << stream.str(); + task_execution_service_.post([this]() { Shutdown(); }, "CoreWorker.Shutdown"); } } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 326403f655a3..8b67002fb080 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -26,6 +26,7 @@ #include "ray/core_worker/core_worker_options.h" #include "ray/core_worker/core_worker_process.h" #include "ray/core_worker/future_resolver.h" +#include "ray/core_worker/gcs_server_address_updater.h" #include "ray/core_worker/lease_policy.h" #include "ray/core_worker/object_recovery_manager.h" #include "ray/core_worker/profiling.h" @@ -1112,6 +1113,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Client to the GCS shared by core worker interfaces. std::shared_ptr gcs_client_; + std::pair gcs_server_address_ GUARDED_BY(gcs_server_address_mutex_) = + std::make_pair("", 0); + /// To protect accessing the `gcs_server_address_`. + absl::Mutex gcs_server_address_mutex_; + std::unique_ptr gcs_server_address_updater_; + // Client to the raylet shared by core worker interfaces. This needs to be a // shared_ptr for direct calls because we can lease multiple workers through // one client, and we need to keep the connection alive until we return all diff --git a/src/ray/core_worker/gcs_server_address_updater.cc b/src/ray/core_worker/gcs_server_address_updater.cc new file mode 100644 index 000000000000..04fc09947c9f --- /dev/null +++ b/src/ray/core_worker/gcs_server_address_updater.cc @@ -0,0 +1,96 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/gcs_server_address_updater.h" + +namespace ray { +namespace core { + +GcsServerAddressUpdater::GcsServerAddressUpdater( + const std::string raylet_ip_address, + const int port, + std::function update_func) + : client_call_manager_(updater_io_service_), + raylet_client_(rpc::NodeManagerWorkerClient::make( + raylet_ip_address, port, client_call_manager_)), + update_func_(update_func), + updater_runner_(updater_io_service_), + updater_thread_([this] { + SetThreadName("gcs_addr_updater"); + std::thread::id this_id = std::this_thread::get_id(); + RAY_LOG(INFO) << "GCS Server updater thread id: " << this_id; + /// The asio work to keep io_service_ alive. + boost::asio::io_service::work io_service_work_(updater_io_service_); + updater_io_service_.run(); + }) { + // Start updating gcs server address. + updater_runner_.RunFnPeriodically( + [this] { UpdateGcsServerAddress(); }, + RayConfig::instance().gcs_service_address_check_interval_milliseconds(), + "GcsServerAddressUpdater.UpdateGcsServerAddress"); +} + +GcsServerAddressUpdater::~GcsServerAddressUpdater() { + updater_io_service_.stop(); + if (updater_thread_.joinable()) { + updater_thread_.join(); + } else { + RAY_LOG(WARNING) + << "Could not join updater thread. This can cause segfault upon destruction."; + } + RAY_LOG(DEBUG) << "GcsServerAddressUpdater is destructed"; +} + +void GcsServerAddressUpdater::UpdateGcsServerAddress() { + raylet_client_.GetGcsServerAddress([this](const Status &status, + const rpc::GetGcsServerAddressReply &reply) { + const int64_t max_retries = + RayConfig::instance().gcs_rpc_server_reconnect_timeout_s() * 1000 / + RayConfig::instance().gcs_service_address_check_interval_milliseconds(); + if (!status.ok()) { + failed_ping_count_ += 1; + auto warning_threshold = max_retries / 2; + RAY_LOG_EVERY_N(WARNING, warning_threshold) + << "Failed to get the gcs server address from raylet " << failed_ping_count_ + << " times in a row. If it keeps failing to obtain the address, " + "the worker might crash. Connection status " + << status; + if (failed_ping_count_ >= max_retries) { + std::stringstream os; + os << "Failed to receive the GCS address for " << failed_ping_count_ + << " times without success. The worker will exit ungracefully. It is because "; + if (status.IsGrpcUnavailable()) { + RAY_LOG(WARNING) << os.str() + << "raylet has died, and it couldn't obtain the GCS address " + "from the raylet anymore. Please check the log from " + "raylet.err on this address."; + } else { + RAY_LOG(ERROR) + << os.str() + << "GCS has died. It could be because there was an issue that " + "kills GCS, such as high memory usage triggering OOM killer " + "to kill GCS. Cluster will be highly likely unavailable if you see " + "this log. Please check the log from gcs_server.err."; + } + QuickExit(); + } + } else { + failed_ping_count_ = 0; + update_func_(reply.ip(), reply.port()); + } + }); +} + +} // namespace core +} // namespace ray diff --git a/src/ray/core_worker/gcs_server_address_updater.h b/src/ray/core_worker/gcs_server_address_updater.h new file mode 100644 index 000000000000..02e3dc90b046 --- /dev/null +++ b/src/ray/core_worker/gcs_server_address_updater.h @@ -0,0 +1,52 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/asio/periodical_runner.h" +#include "ray/raylet_client/raylet_client.h" + +namespace ray { +namespace core { + +class GcsServerAddressUpdater { + public: + /// Create a updater for gcs server address. + /// + /// \param raylet_ip_address Raylet ip address. + /// \param port Port to connect raylet. + /// \param address to store gcs server address. + GcsServerAddressUpdater(const std::string raylet_ip_address, + const int port, + std::function update_func); + + ~GcsServerAddressUpdater(); + + private: + /// Update gcs server address. + void UpdateGcsServerAddress(); + + instrumented_io_context updater_io_service_; + rpc::ClientCallManager client_call_manager_; + /// A client connection to the raylet. + raylet::RayletClient raylet_client_; + std::function update_func_; + PeriodicalRunner updater_runner_; + std::thread updater_thread_; + int32_t failed_ping_count_ = 0; +}; + +} // namespace core +} // namespace ray diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 27e5927386d5..6ab95b034cac 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -268,6 +268,10 @@ struct GcsServerMocker { void GetSystemConfig(const ray::rpc::ClientCallback &callback) override {} + void GetGcsServerAddress( + const ray::rpc::ClientCallback &callback) + override {} + /// ResourceUsageInterface void RequestResourceReport( const rpc::ClientCallback &callback) override { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 4f1e3ecfbd31..7ed23b18a85c 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -293,6 +293,14 @@ message UpdateResourceUsageRequest { message UpdateResourceUsageReply { } +message GetGcsServerAddressRequest { +} + +message GetGcsServerAddressReply { + string ip = 1; + int32 port = 2; +} + message GetTasksInfoRequest {} message GetTasksInfoReply { @@ -378,6 +386,8 @@ service NodeManagerService { returns (ReleaseUnusedBundlesReply); // Get the system config. rpc GetSystemConfig(GetSystemConfigRequest) returns (GetSystemConfigReply); + // Get gcs server address. + rpc GetGcsServerAddress(GetGcsServerAddressRequest) returns (GetGcsServerAddressReply); // [State API] Get the all task information of the node. rpc GetTasksInfo(GetTasksInfoRequest) returns (GetTasksInfoReply); // [State API] Get the all object information of the node. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e151839fb6d5..00e559bb509e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2409,6 +2409,16 @@ void NodeManager::HandleGetSystemConfig(const rpc::GetSystemConfigRequest &reque send_reply_callback(Status::OK(), nullptr, nullptr); } +void NodeManager::HandleGetGcsServerAddress( + const rpc::GetGcsServerAddressRequest &request, + rpc::GetGcsServerAddressReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto address = gcs_client_->GetGcsServerAddress(); + reply->set_ip(address.first); + reply->set_port(address.second); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index b2b701a5a7c0..00f6fca6bcc8 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -588,6 +588,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::GetSystemConfigReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `GetGcsServerAddress` request. + void HandleGetGcsServerAddress(const rpc::GetGcsServerAddressRequest &request, + rpc::GetGcsServerAddressReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `HandleGetTasksInfo` request. void HandleGetTasksInfo(const rpc::GetTasksInfoRequest &request, rpc::GetTasksInfoReply *reply, diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 536cedc32571..f7c3616a716e 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -542,4 +542,10 @@ void raylet::RayletClient::GetSystemConfig( grpc_client_->GetSystemConfig(request, callback); } +void raylet::RayletClient::GetGcsServerAddress( + const rpc::ClientCallback &callback) { + rpc::GetGcsServerAddressRequest request; + grpc_client_->GetGcsServerAddress(request, callback); +} + } // namespace ray diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 0d8eea2c5591..9211d8b15604 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -185,6 +185,9 @@ class RayletClientInterface : public PinObjectsInterface, virtual void GetSystemConfig( const rpc::ClientCallback &callback) = 0; + virtual void GetGcsServerAddress( + const rpc::ClientCallback &callback) = 0; + virtual void NotifyGCSRestart( const rpc::ClientCallback &callback) = 0; @@ -458,6 +461,9 @@ class RayletClient : public RayletClientInterface { void GetSystemConfig( const rpc::ClientCallback &callback) override; + void GetGcsServerAddress( + const rpc::ClientCallback &callback) override; + void GlobalGC(const rpc::ClientCallback &callback); void UpdateResourceUsage( diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index 197eb658737a..b2382a7f6e4a 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -187,6 +187,12 @@ class NodeManagerWorkerClient grpc_client_, /*method_timeout_ms*/ -1, ) + /// Get gcs server address. + VOID_RPC_CLIENT_METHOD(NodeManagerService, + GetGcsServerAddress, + grpc_client_, + /*method_timeout_ms*/ -1, ) + /// Get all the task information from the node. VOID_RPC_CLIENT_METHOD(NodeManagerService, GetTasksInfo, diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index a690fb0b6b68..2769b46bd214 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -44,6 +44,7 @@ namespace rpc { RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage, -1) \ RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles, -1) \ RPC_SERVICE_HANDLER(NodeManagerService, GetSystemConfig, -1) \ + RPC_SERVICE_HANDLER(NodeManagerService, GetGcsServerAddress, -1) \ RPC_SERVICE_HANDLER(NodeManagerService, ShutdownRaylet, -1) \ RPC_SERVICE_HANDLER(NodeManagerService, GetTasksInfo, -1) \ RPC_SERVICE_HANDLER(NodeManagerService, GetObjectsInfo, -1) @@ -146,10 +147,12 @@ class NodeManagerServiceHandler { GetSystemConfigReply *reply, SendReplyCallback send_reply_callback) = 0; + virtual void HandleGetGcsServerAddress(const GetGcsServerAddressRequest &request, + GetGcsServerAddressReply *reply, + SendReplyCallback send_reply_callback) = 0; virtual void HandleGetTasksInfo(const GetTasksInfoRequest &request, GetTasksInfoReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleGetObjectsInfo(const GetObjectsInfoRequest &request, GetObjectsInfoReply *reply, SendReplyCallback send_reply_callback) = 0; diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 71b4de91b684..8d634dff9fb4 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -602,7 +602,7 @@ pid_t GetPID() { bool IsParentProcessAlive() { return GetParentPID() != 1; } bool IsProcessAlive(pid_t pid) { -#if defined _WIN32 +#ifdef _WIN32 if (HANDLE handle = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, static_cast(pid))) { DWORD exit_code; diff --git a/src/ray/util/util_test.cc b/src/ray/util/util_test.cc index c309e40c550a..3e13dedb10bf 100644 --- a/src/ray/util/util_test.cc +++ b/src/ray/util/util_test.cc @@ -17,15 +17,9 @@ #include #include -#include -#include -#include #include "gtest/gtest.h" #include "ray/util/logging.h" -#include "ray/util/process.h" - -using namespace std::chrono_literals; static const char *argv0 = NULL; @@ -190,21 +184,6 @@ TEST(UtilTest, CreateCommandLineTest) { } } -TEST(UtilTest, IsProcessAlive) { - namespace bp = boost::process; - bp::child c("bash"); - auto pid = c.id(); - c.join(); - for (int i = 0; i < 5; ++i) { - if (IsProcessAlive(pid)) { - std::this_thread::sleep_for(1s); - } else { - break; - } - } - RAY_CHECK(!IsProcessAlive(pid)); -} - } // namespace ray int main(int argc, char **argv) {