From ab11d33a2f3767a2b13bc37e4fcdcf32f6d8c4a9 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 5 Nov 2024 11:55:24 -0800 Subject: [PATCH 1/3] Test runtime env overhead Signed-off-by: Jiajun Yao --- release/benchmarks/distributed/test_many_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 6f524cf4e3ea..941795d19b27 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -18,7 +18,7 @@ def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 - @ray.remote(num_cpus=cpus_per_task) + @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) def task(): time.sleep(sleep_time) From 69a7d078d8d76e500852c1b2c00c1eb7aa105e56 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 7 Nov 2024 13:48:52 -0800 Subject: [PATCH 2/3] Revert "Test runtime env overhead" This reverts commit ab11d33a2f3767a2b13bc37e4fcdcf32f6d8c4a9. --- release/benchmarks/distributed/test_many_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 941795d19b27..6f524cf4e3ea 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -18,7 +18,7 @@ def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 - @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) + @ray.remote(num_cpus=cpus_per_task) def task(): time.sleep(sleep_time) From 506daa20e985018fc7b71c8ee72664f9186ef44c Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 7 Nov 2024 14:19:54 -0800 Subject: [PATCH 3/3] up Signed-off-by: Jiajun Yao --- python/ray/_raylet.pyx | 2 ++ .../transport/normal_task_submitter.cc | 2 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 2 +- src/ray/gcs/gcs_server/gcs_actor_manager.h | 4 ++-- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 2 +- src/ray/gcs/gcs_server/gcs_actor_scheduler.h | 2 +- src/ray/gcs/gcs_server/gcs_job_manager.h | 2 +- .../gcs_job_manager_export_event_test.cc | 2 +- .../gcs/gcs_server/test/gcs_job_manager_test.cc | 2 +- src/ray/raylet/main.cc | 2 +- src/ray/raylet/node_manager.h | 2 +- .../rpc/node_manager/node_manager_client_pool.cc | 6 +++--- .../rpc/node_manager/node_manager_client_pool.h | 12 +++++------- src/ray/rpc/worker/core_worker_client.h | 14 ++------------ src/ray/rpc/worker/core_worker_client_pool.cc | 2 +- src/ray/rpc/worker/core_worker_client_pool.h | 15 ++++++--------- 16 files changed, 30 insertions(+), 43 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index fa49a88c691f..d93b5f379780 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -417,6 +417,8 @@ class ObjectRefGenerator: return False else: return True + else: + return False """ Private APIs diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index c3c95262fc21..84a1eb92f5ab 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -546,7 +546,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli void NormalTaskSubmitter::PushNormalTask( const rpc::Address &addr, - shared_ptr client, + std::shared_ptr client, const SchedulingKey &scheduling_key, const TaskSpecification &task_spec, const google::protobuf::RepeatedPtrField &assigned_resources) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 946769884a8a..aa609bacb445 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -329,7 +329,7 @@ GcsActorManager::GcsActorManager( RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager, std::function destroy_owned_placement_group_if_needed, - const rpc::ClientFactoryFn &worker_client_factory) + const rpc::CoreWorkerClientFactoryFn &worker_client_factory) : gcs_actor_scheduler_(std::move(scheduler)), gcs_table_storage_(std::move(gcs_table_storage)), gcs_publisher_(std::move(gcs_publisher)), diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index f31943ccb717..dadb90498379 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -319,7 +319,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager, std::function destroy_owned_placement_group_if_needed, - const rpc::ClientFactoryFn &worker_client_factory = nullptr); + const rpc::CoreWorkerClientFactoryFn &worker_client_factory = nullptr); ~GcsActorManager() = default; @@ -692,7 +692,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { std::shared_ptr gcs_publisher_; /// Factory to produce clients to workers. This is used to communicate with /// actors and their owners. - rpc::ClientFactoryFn worker_client_factory_; + rpc::CoreWorkerClientFactoryFn worker_client_factory_; /// A callback that is used to destroy placemenet group owned by the actor. /// This method MUST BE IDEMPOTENT because it can be called multiple times during /// actor destroy process. diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 3582152c5f66..f562c0f9034e 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -31,7 +31,7 @@ GcsActorScheduler::GcsActorScheduler( GcsActorSchedulerFailureCallback schedule_failure_handler, GcsActorSchedulerSuccessCallback schedule_success_handler, std::shared_ptr raylet_client_pool, - rpc::ClientFactoryFn client_factory, + rpc::CoreWorkerClientFactoryFn client_factory, std::function normal_task_resources_changed_callback) : io_context_(io_context), diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index 25ee4b7687b0..1ea66d0ddbe0 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -133,7 +133,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { GcsActorSchedulerFailureCallback schedule_failure_handler, GcsActorSchedulerSuccessCallback schedule_success_handler, std::shared_ptr raylet_client_pool, - rpc::ClientFactoryFn client_factory = nullptr, + rpc::CoreWorkerClientFactoryFn client_factory = nullptr, std::function normal_task_resources_changed_callback = nullptr); virtual ~GcsActorScheduler() = default; diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.h b/src/ray/gcs/gcs_server/gcs_job_manager.h index 95f43c7e27ad..b558cbc9c64d 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.h +++ b/src/ray/gcs/gcs_server/gcs_job_manager.h @@ -55,7 +55,7 @@ class GcsJobManager : public rpc::JobInfoHandler { RuntimeEnvManager &runtime_env_manager, GcsFunctionManager &function_manager, InternalKVInterface &internal_kv, - rpc::ClientFactoryFn client_factory = nullptr) + rpc::CoreWorkerClientFactoryFn client_factory = nullptr) : gcs_table_storage_(std::move(gcs_table_storage)), gcs_publisher_(std::move(gcs_publisher)), runtime_env_manager_(runtime_env_manager), diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc index a37db046804e..832490d2f81e 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_job_manager_export_event_test.cc @@ -85,7 +85,7 @@ class GcsJobManagerTest : public ::testing::Test { std::unique_ptr function_manager_; std::unique_ptr kv_; std::unique_ptr fake_kv_; - rpc::ClientFactoryFn client_factory_; + rpc::CoreWorkerClientFactoryFn client_factory_; RuntimeEnvManager runtime_env_manager_; const std::chrono::milliseconds timeout_ms_{5000}; std::string log_dir_; diff --git a/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc index b120c6d631ef..107af6752a5d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc @@ -81,7 +81,7 @@ class GcsJobManagerTest : public ::testing::Test { std::unique_ptr function_manager_; std::unique_ptr kv_; std::unique_ptr fake_kv_; - rpc::ClientFactoryFn client_factory_; + rpc::CoreWorkerClientFactoryFn client_factory_; RuntimeEnvManager runtime_env_manager_; const std::chrono::milliseconds timeout_ms_{5000}; }; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index f72e8d2661b2..5ae488e6e372 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -449,7 +449,7 @@ int main(int argc, char *argv[]) { auto signal_handler = [&raylet, shutdown_raylet_gracefully_internal]( const boost::system::error_code &error, int signal_number) { ray::rpc::NodeDeathInfo node_death_info; - optional drain_request = + std::optional drain_request = raylet->node_manager().GetLocalDrainRequest(); RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = " << (drain_request.has_value() ? drain_request->DebugString() : "None"); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f16f2b4174b5..cef5e66aa26f 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -224,7 +224,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, } /// Get the local drain request. - optional GetLocalDrainRequest() const { + std::optional GetLocalDrainRequest() const { return cluster_resource_scheduler_->GetLocalResourceManager().GetLocalDrainRequest(); } diff --git a/src/ray/rpc/node_manager/node_manager_client_pool.cc b/src/ray/rpc/node_manager/node_manager_client_pool.cc index 097ad168d631..b473c468bae5 100644 --- a/src/ray/rpc/node_manager/node_manager_client_pool.cc +++ b/src/ray/rpc/node_manager/node_manager_client_pool.cc @@ -17,7 +17,7 @@ namespace ray { namespace rpc { -shared_ptr NodeManagerClientPool::GetOrConnectByAddress( +std::shared_ptr NodeManagerClientPool::GetOrConnectByAddress( const rpc::Address &address) { RAY_CHECK(address.raylet_id() != ""); absl::MutexLock lock(&mu_); @@ -36,8 +36,8 @@ shared_ptr NodeManagerClientPool::GetOrConnectByAddr return connection; } -optional> NodeManagerClientPool::GetOrConnectByID( - ray::NodeID id) { +std::optional> +NodeManagerClientPool::GetOrConnectByID(ray::NodeID id) { absl::MutexLock lock(&mu_); auto it = client_map_.find(id); if (it == client_map_.end()) { diff --git a/src/ray/rpc/node_manager/node_manager_client_pool.h b/src/ray/rpc/node_manager/node_manager_client_pool.h index 0e575a460cfb..687be2b57317 100644 --- a/src/ray/rpc/node_manager/node_manager_client_pool.h +++ b/src/ray/rpc/node_manager/node_manager_client_pool.h @@ -22,9 +22,6 @@ #include "ray/raylet_client/raylet_client.h" #include "ray/rpc/node_manager/node_manager_client.h" -using absl::optional; -using std::shared_ptr; - namespace ray { namespace rpc { @@ -36,12 +33,13 @@ class NodeManagerClientPool { /// Return an existing NodeManagerWorkerClient if exists, and connect to one if it does /// not. The returned pointer is borrowed, and expected to be used briefly. - optional> GetOrConnectByID(ray::NodeID id); + std::optional> GetOrConnectByID( + ray::NodeID id); /// Return an existing NodeManagerWorkerClient if exists, and connect to one if it does /// not. The returned pointer is borrowed, and expected to be used briefly. /// The function is guaranteed to return the non-nullptr. - shared_ptr GetOrConnectByAddress( + std::shared_ptr GetOrConnectByAddress( const rpc::Address &address); /// Removes a connection to the worker from the pool, if one exists. Since the @@ -77,8 +75,8 @@ class NodeManagerClientPool { /// A pool of open connections by host:port. Clients can reuse the connection /// objects in this pool by requesting them - absl::flat_hash_map> client_map_ - ABSL_GUARDED_BY(mu_); + absl::flat_hash_map> + client_map_ ABSL_GUARDED_BY(mu_); }; } // namespace rpc diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 3dc55da1c8db..db5df1c0ce15 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -14,12 +14,6 @@ #pragma once -#ifdef __clang__ -// TODO(mehrdadn): Remove this when the warnings are addressed -#pragma clang diagnostic push -#pragma clang diagnostic warning "-Wunused-result" -#endif - #include #include @@ -473,12 +467,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, int64_t max_finished_seq_no_ ABSL_GUARDED_BY(mutex_) = -1; }; -typedef std::function(const rpc::Address &)> - ClientFactoryFn; +using CoreWorkerClientFactoryFn = + std::function(const rpc::Address &)>; } // namespace rpc } // namespace ray - -#ifdef __clang__ -#pragma clang diagnostic pop -#endif diff --git a/src/ray/rpc/worker/core_worker_client_pool.cc b/src/ray/rpc/worker/core_worker_client_pool.cc index 3850f474b842..19ee34497cc7 100644 --- a/src/ray/rpc/worker/core_worker_client_pool.cc +++ b/src/ray/rpc/worker/core_worker_client_pool.cc @@ -17,7 +17,7 @@ namespace ray { namespace rpc { -shared_ptr CoreWorkerClientPool::GetOrConnect( +std::shared_ptr CoreWorkerClientPool::GetOrConnect( const Address &addr_proto) { RAY_CHECK_NE(addr_proto.worker_id(), ""); absl::MutexLock lock(&mu_); diff --git a/src/ray/rpc/worker/core_worker_client_pool.h b/src/ray/rpc/worker/core_worker_client_pool.h index ec6b12516227..704ed82361b8 100644 --- a/src/ray/rpc/worker/core_worker_client_pool.h +++ b/src/ray/rpc/worker/core_worker_client_pool.h @@ -20,9 +20,6 @@ #include "ray/common/id.h" #include "ray/rpc/worker/core_worker_client.h" -using absl::optional; -using std::shared_ptr; - namespace ray { namespace rpc { @@ -35,13 +32,13 @@ class CoreWorkerClientPool { : client_factory_(defaultClientFactory(ccm)){}; /// Creates a CoreWorkerClientPool by a given connection function. - CoreWorkerClientPool(ClientFactoryFn client_factory) + CoreWorkerClientPool(CoreWorkerClientFactoryFn client_factory) : client_factory_(client_factory){}; /// Returns an open CoreWorkerClientInterface if one exists, and connect to one /// if it does not. The returned pointer is borrowed, and expected to be used /// briefly. - shared_ptr GetOrConnect(const Address &addr_proto); + std::shared_ptr GetOrConnect(const Address &addr_proto); /// Removes a connection to the worker from the pool, if one exists. Since the /// shared pointer will no longer be retained in the pool, the connection will @@ -59,7 +56,7 @@ class CoreWorkerClientPool { /// Provides the default client factory function. Providing this function to the /// construtor aids migration but is ultimately a thing that should be /// deprecated and brought internal to the pool, so this is our bridge. - ClientFactoryFn defaultClientFactory(rpc::ClientCallManager &ccm) const { + CoreWorkerClientFactoryFn defaultClientFactory(rpc::ClientCallManager &ccm) const { return [&](const rpc::Address &addr) { return std::shared_ptr(new rpc::CoreWorkerClient(addr, ccm)); }; @@ -76,7 +73,7 @@ class CoreWorkerClientPool { /// This factory function does the connection to CoreWorkerClient, and is /// provided by the constructor (either the default implementation, above, or a /// provided one) - ClientFactoryFn client_factory_; + CoreWorkerClientFactoryFn client_factory_; absl::Mutex mu_; @@ -84,11 +81,11 @@ class CoreWorkerClientPool { public: CoreWorkerClientEntry() {} CoreWorkerClientEntry(ray::WorkerID worker_id, - shared_ptr core_worker_client) + std::shared_ptr core_worker_client) : worker_id(worker_id), core_worker_client(core_worker_client) {} ray::WorkerID worker_id; - shared_ptr core_worker_client; + std::shared_ptr core_worker_client; }; /// A list of open connections from the most recent accessed to the least recent