Skip to content

Commit

Permalink
[Core] Rename ClientFactoryFn to CoreWorkerClientFactoryFn (ray-proje…
Browse files Browse the repository at this point in the history
…ct#48576)

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: mohitjain2504 <[email protected]>
  • Loading branch information
jjyao authored and mohitjain2504 committed Nov 15, 2024
1 parent e6d3f52 commit a868478
Show file tree
Hide file tree
Showing 16 changed files with 30 additions and 43 deletions.
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ class ObjectRefGenerator:
return False
else:
return True
else:
return False

"""
Private APIs
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli

void NormalTaskSubmitter::PushNormalTask(
const rpc::Address &addr,
shared_ptr<rpc::CoreWorkerClientInterface> client,
std::shared_ptr<rpc::CoreWorkerClientInterface> client,
const SchedulingKey &scheduling_key,
const TaskSpecification &task_spec,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ GcsActorManager::GcsActorManager(
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory)
const rpc::CoreWorkerClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
const rpc::CoreWorkerClientFactoryFn &worker_client_factory = nullptr);

~GcsActorManager() = default;

Expand Down Expand Up @@ -692,7 +692,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;
rpc::CoreWorkerClientFactoryFn worker_client_factory_;
/// A callback that is used to destroy placemenet group owned by the actor.
/// This method MUST BE IDEMPOTENT because it can be called multiple times during
/// actor destroy process.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GcsActorScheduler::GcsActorScheduler(
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory,
rpc::CoreWorkerClientFactoryFn client_factory,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback)
: io_context_(io_context),
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory = nullptr,
rpc::CoreWorkerClientFactoryFn client_factory = nullptr,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback = nullptr);
virtual ~GcsActorScheduler() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class GcsJobManager : public rpc::JobInfoHandler {
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
InternalKVInterface &internal_kv,
rpc::ClientFactoryFn client_factory = nullptr)
rpc::CoreWorkerClientFactoryFn client_factory = nullptr)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
runtime_env_manager_(runtime_env_manager),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class GcsJobManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
std::unique_ptr<gcs::FakeInternalKVInterface> fake_kv_;
rpc::ClientFactoryFn client_factory_;
rpc::CoreWorkerClientFactoryFn client_factory_;
RuntimeEnvManager runtime_env_manager_;
const std::chrono::milliseconds timeout_ms_{5000};
std::string log_dir_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class GcsJobManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
std::unique_ptr<gcs::FakeInternalKVInterface> fake_kv_;
rpc::ClientFactoryFn client_factory_;
rpc::CoreWorkerClientFactoryFn client_factory_;
RuntimeEnvManager runtime_env_manager_;
const std::chrono::milliseconds timeout_ms_{5000};
};
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ int main(int argc, char *argv[]) {
auto signal_handler = [&raylet, shutdown_raylet_gracefully_internal](
const boost::system::error_code &error, int signal_number) {
ray::rpc::NodeDeathInfo node_death_info;
optional<ray::rpc::DrainRayletRequest> drain_request =
std::optional<ray::rpc::DrainRayletRequest> drain_request =
raylet->node_manager().GetLocalDrainRequest();
RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = "
<< (drain_request.has_value() ? drain_request->DebugString() : "None");
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
}

/// Get the local drain request.
optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
std::optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
return cluster_resource_scheduler_->GetLocalResourceManager().GetLocalDrainRequest();
}

Expand Down
6 changes: 3 additions & 3 deletions src/ray/rpc/node_manager/node_manager_client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace ray {
namespace rpc {

shared_ptr<ray::RayletClientInterface> NodeManagerClientPool::GetOrConnectByAddress(
std::shared_ptr<ray::RayletClientInterface> NodeManagerClientPool::GetOrConnectByAddress(
const rpc::Address &address) {
RAY_CHECK(address.raylet_id() != "");
absl::MutexLock lock(&mu_);
Expand All @@ -36,8 +36,8 @@ shared_ptr<ray::RayletClientInterface> NodeManagerClientPool::GetOrConnectByAddr
return connection;
}

optional<shared_ptr<ray::RayletClientInterface>> NodeManagerClientPool::GetOrConnectByID(
ray::NodeID id) {
std::optional<std::shared_ptr<ray::RayletClientInterface>>
NodeManagerClientPool::GetOrConnectByID(ray::NodeID id) {
absl::MutexLock lock(&mu_);
auto it = client_map_.find(id);
if (it == client_map_.end()) {
Expand Down
12 changes: 5 additions & 7 deletions src/ray/rpc/node_manager/node_manager_client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"

using absl::optional;
using std::shared_ptr;

namespace ray {
namespace rpc {

Expand All @@ -36,12 +33,13 @@ class NodeManagerClientPool {

/// Return an existing NodeManagerWorkerClient if exists, and connect to one if it does
/// not. The returned pointer is borrowed, and expected to be used briefly.
optional<shared_ptr<ray::RayletClientInterface>> GetOrConnectByID(ray::NodeID id);
std::optional<std::shared_ptr<ray::RayletClientInterface>> GetOrConnectByID(
ray::NodeID id);

/// Return an existing NodeManagerWorkerClient if exists, and connect to one if it does
/// not. The returned pointer is borrowed, and expected to be used briefly.
/// The function is guaranteed to return the non-nullptr.
shared_ptr<ray::RayletClientInterface> GetOrConnectByAddress(
std::shared_ptr<ray::RayletClientInterface> GetOrConnectByAddress(
const rpc::Address &address);

/// Removes a connection to the worker from the pool, if one exists. Since the
Expand Down Expand Up @@ -77,8 +75,8 @@ class NodeManagerClientPool {

/// A pool of open connections by host:port. Clients can reuse the connection
/// objects in this pool by requesting them
absl::flat_hash_map<ray::NodeID, shared_ptr<ray::RayletClientInterface>> client_map_
ABSL_GUARDED_BY(mu_);
absl::flat_hash_map<ray::NodeID, std::shared_ptr<ray::RayletClientInterface>>
client_map_ ABSL_GUARDED_BY(mu_);
};

} // namespace rpc
Expand Down
14 changes: 2 additions & 12 deletions src/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@

#pragma once

#ifdef __clang__
// TODO(mehrdadn): Remove this when the warnings are addressed
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wunused-result"
#endif

#include <grpcpp/grpcpp.h>

#include <deque>
Expand Down Expand Up @@ -473,12 +467,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
int64_t max_finished_seq_no_ ABSL_GUARDED_BY(mutex_) = -1;
};

typedef std::function<std::shared_ptr<CoreWorkerClientInterface>(const rpc::Address &)>
ClientFactoryFn;
using CoreWorkerClientFactoryFn =
std::function<std::shared_ptr<CoreWorkerClientInterface>(const rpc::Address &)>;

} // namespace rpc
} // namespace ray

#ifdef __clang__
#pragma clang diagnostic pop
#endif
2 changes: 1 addition & 1 deletion src/ray/rpc/worker/core_worker_client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace ray {
namespace rpc {

shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
std::shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
const Address &addr_proto) {
RAY_CHECK_NE(addr_proto.worker_id(), "");
absl::MutexLock lock(&mu_);
Expand Down
15 changes: 6 additions & 9 deletions src/ray/rpc/worker/core_worker_client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
#include "ray/common/id.h"
#include "ray/rpc/worker/core_worker_client.h"

using absl::optional;
using std::shared_ptr;

namespace ray {
namespace rpc {

Expand All @@ -35,13 +32,13 @@ class CoreWorkerClientPool {
: client_factory_(defaultClientFactory(ccm)){};

/// Creates a CoreWorkerClientPool by a given connection function.
CoreWorkerClientPool(ClientFactoryFn client_factory)
CoreWorkerClientPool(CoreWorkerClientFactoryFn client_factory)
: client_factory_(client_factory){};

/// Returns an open CoreWorkerClientInterface if one exists, and connect to one
/// if it does not. The returned pointer is borrowed, and expected to be used
/// briefly.
shared_ptr<CoreWorkerClientInterface> GetOrConnect(const Address &addr_proto);
std::shared_ptr<CoreWorkerClientInterface> GetOrConnect(const Address &addr_proto);

/// Removes a connection to the worker from the pool, if one exists. Since the
/// shared pointer will no longer be retained in the pool, the connection will
Expand All @@ -59,7 +56,7 @@ class CoreWorkerClientPool {
/// Provides the default client factory function. Providing this function to the
/// construtor aids migration but is ultimately a thing that should be
/// deprecated and brought internal to the pool, so this is our bridge.
ClientFactoryFn defaultClientFactory(rpc::ClientCallManager &ccm) const {
CoreWorkerClientFactoryFn defaultClientFactory(rpc::ClientCallManager &ccm) const {
return [&](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(new rpc::CoreWorkerClient(addr, ccm));
};
Expand All @@ -76,19 +73,19 @@ class CoreWorkerClientPool {
/// This factory function does the connection to CoreWorkerClient, and is
/// provided by the constructor (either the default implementation, above, or a
/// provided one)
ClientFactoryFn client_factory_;
CoreWorkerClientFactoryFn client_factory_;

absl::Mutex mu_;

struct CoreWorkerClientEntry {
public:
CoreWorkerClientEntry() {}
CoreWorkerClientEntry(ray::WorkerID worker_id,
shared_ptr<CoreWorkerClientInterface> core_worker_client)
std::shared_ptr<CoreWorkerClientInterface> core_worker_client)
: worker_id(worker_id), core_worker_client(core_worker_client) {}

ray::WorkerID worker_id;
shared_ptr<CoreWorkerClientInterface> core_worker_client;
std::shared_ptr<CoreWorkerClientInterface> core_worker_client;
};

/// A list of open connections from the most recent accessed to the least recent
Expand Down

0 comments on commit a868478

Please sign in to comment.