Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Rename ClientFactoryFn to CoreWorkerClientFactoryFn #48576

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ class ObjectRefGenerator:
return False
else:
return True
else:
return False

"""
Private APIs
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli

void NormalTaskSubmitter::PushNormalTask(
const rpc::Address &addr,
shared_ptr<rpc::CoreWorkerClientInterface> client,
std::shared_ptr<rpc::CoreWorkerClientInterface> client,
const SchedulingKey &scheduling_key,
const TaskSpecification &task_spec,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ GcsActorManager::GcsActorManager(
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory)
const rpc::CoreWorkerClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
const rpc::CoreWorkerClientFactoryFn &worker_client_factory = nullptr);

~GcsActorManager() = default;

Expand Down Expand Up @@ -692,7 +692,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;
rpc::CoreWorkerClientFactoryFn worker_client_factory_;
/// A callback that is used to destroy placemenet group owned by the actor.
/// This method MUST BE IDEMPOTENT because it can be called multiple times during
/// actor destroy process.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GcsActorScheduler::GcsActorScheduler(
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory,
rpc::CoreWorkerClientFactoryFn client_factory,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback)
: io_context_(io_context),
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory = nullptr,
rpc::CoreWorkerClientFactoryFn client_factory = nullptr,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback = nullptr);
virtual ~GcsActorScheduler() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class GcsJobManager : public rpc::JobInfoHandler {
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
InternalKVInterface &internal_kv,
rpc::ClientFactoryFn client_factory = nullptr)
rpc::CoreWorkerClientFactoryFn client_factory = nullptr)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
runtime_env_manager_(runtime_env_manager),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class GcsJobManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
std::unique_ptr<gcs::FakeInternalKVInterface> fake_kv_;
rpc::ClientFactoryFn client_factory_;
rpc::CoreWorkerClientFactoryFn client_factory_;
RuntimeEnvManager runtime_env_manager_;
const std::chrono::milliseconds timeout_ms_{5000};
std::string log_dir_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class GcsJobManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
std::unique_ptr<gcs::FakeInternalKVInterface> fake_kv_;
rpc::ClientFactoryFn client_factory_;
rpc::CoreWorkerClientFactoryFn client_factory_;
RuntimeEnvManager runtime_env_manager_;
const std::chrono::milliseconds timeout_ms_{5000};
};
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ int main(int argc, char *argv[]) {
auto signal_handler = [&raylet, shutdown_raylet_gracefully_internal](
const boost::system::error_code &error, int signal_number) {
ray::rpc::NodeDeathInfo node_death_info;
optional<ray::rpc::DrainRayletRequest> drain_request =
std::optional<ray::rpc::DrainRayletRequest> drain_request =
raylet->node_manager().GetLocalDrainRequest();
RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = "
<< (drain_request.has_value() ? drain_request->DebugString() : "None");
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
}

/// Get the local drain request.
optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
std::optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
return cluster_resource_scheduler_->GetLocalResourceManager().GetLocalDrainRequest();
}

Expand Down
6 changes: 3 additions & 3 deletions src/ray/rpc/node_manager/node_manager_client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace ray {
namespace rpc {

shared_ptr<ray::RayletClientInterface> NodeManagerClientPool::GetOrConnectByAddress(
std::shared_ptr<ray::RayletClientInterface> NodeManagerClientPool::GetOrConnectByAddress(
const rpc::Address &address) {
RAY_CHECK(address.raylet_id() != "");
absl::MutexLock lock(&mu_);
Expand All @@ -36,8 +36,8 @@ shared_ptr<ray::RayletClientInterface> NodeManagerClientPool::GetOrConnectByAddr
return connection;
}

optional<shared_ptr<ray::RayletClientInterface>> NodeManagerClientPool::GetOrConnectByID(
ray::NodeID id) {
std::optional<std::shared_ptr<ray::RayletClientInterface>>
NodeManagerClientPool::GetOrConnectByID(ray::NodeID id) {
absl::MutexLock lock(&mu_);
auto it = client_map_.find(id);
if (it == client_map_.end()) {
Expand Down
12 changes: 5 additions & 7 deletions src/ray/rpc/node_manager/node_manager_client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"

using absl::optional;
using std::shared_ptr;

namespace ray {
namespace rpc {

Expand All @@ -36,12 +33,13 @@ class NodeManagerClientPool {

/// Return an existing NodeManagerWorkerClient if exists, and connect to one if it does
/// not. The returned pointer is borrowed, and expected to be used briefly.
optional<shared_ptr<ray::RayletClientInterface>> GetOrConnectByID(ray::NodeID id);
std::optional<std::shared_ptr<ray::RayletClientInterface>> GetOrConnectByID(
ray::NodeID id);

/// Return an existing NodeManagerWorkerClient if exists, and connect to one if it does
/// not. The returned pointer is borrowed, and expected to be used briefly.
/// The function is guaranteed to return the non-nullptr.
shared_ptr<ray::RayletClientInterface> GetOrConnectByAddress(
std::shared_ptr<ray::RayletClientInterface> GetOrConnectByAddress(
const rpc::Address &address);

/// Removes a connection to the worker from the pool, if one exists. Since the
Expand Down Expand Up @@ -77,8 +75,8 @@ class NodeManagerClientPool {

/// A pool of open connections by host:port. Clients can reuse the connection
/// objects in this pool by requesting them
absl::flat_hash_map<ray::NodeID, shared_ptr<ray::RayletClientInterface>> client_map_
ABSL_GUARDED_BY(mu_);
absl::flat_hash_map<ray::NodeID, std::shared_ptr<ray::RayletClientInterface>>
client_map_ ABSL_GUARDED_BY(mu_);
};

} // namespace rpc
Expand Down
14 changes: 2 additions & 12 deletions src/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@

#pragma once

#ifdef __clang__
// TODO(mehrdadn): Remove this when the warnings are addressed
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wunused-result"
#endif

#include <grpcpp/grpcpp.h>

#include <deque>
Expand Down Expand Up @@ -473,12 +467,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
int64_t max_finished_seq_no_ ABSL_GUARDED_BY(mutex_) = -1;
};

typedef std::function<std::shared_ptr<CoreWorkerClientInterface>(const rpc::Address &)>
ClientFactoryFn;
using CoreWorkerClientFactoryFn =
std::function<std::shared_ptr<CoreWorkerClientInterface>(const rpc::Address &)>;

} // namespace rpc
} // namespace ray

#ifdef __clang__
#pragma clang diagnostic pop
#endif
2 changes: 1 addition & 1 deletion src/ray/rpc/worker/core_worker_client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace ray {
namespace rpc {

shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
std::shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
const Address &addr_proto) {
RAY_CHECK_NE(addr_proto.worker_id(), "");
absl::MutexLock lock(&mu_);
Expand Down
15 changes: 6 additions & 9 deletions src/ray/rpc/worker/core_worker_client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
#include "ray/common/id.h"
#include "ray/rpc/worker/core_worker_client.h"

using absl::optional;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my....

using std::shared_ptr;

namespace ray {
namespace rpc {

Expand All @@ -35,13 +32,13 @@ class CoreWorkerClientPool {
: client_factory_(defaultClientFactory(ccm)){};

/// Creates a CoreWorkerClientPool by a given connection function.
CoreWorkerClientPool(ClientFactoryFn client_factory)
CoreWorkerClientPool(CoreWorkerClientFactoryFn client_factory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicit and move semantics

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix in my next PR

: client_factory_(client_factory){};

/// Returns an open CoreWorkerClientInterface if one exists, and connect to one
/// if it does not. The returned pointer is borrowed, and expected to be used
/// briefly.
shared_ptr<CoreWorkerClientInterface> GetOrConnect(const Address &addr_proto);
std::shared_ptr<CoreWorkerClientInterface> GetOrConnect(const Address &addr_proto);

/// Removes a connection to the worker from the pool, if one exists. Since the
/// shared pointer will no longer be retained in the pool, the connection will
Expand All @@ -59,7 +56,7 @@ class CoreWorkerClientPool {
/// Provides the default client factory function. Providing this function to the
/// construtor aids migration but is ultimately a thing that should be
/// deprecated and brought internal to the pool, so this is our bridge.
ClientFactoryFn defaultClientFactory(rpc::ClientCallManager &ccm) const {
CoreWorkerClientFactoryFn defaultClientFactory(rpc::ClientCallManager &ccm) const {
return [&](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(new rpc::CoreWorkerClient(addr, ccm));
};
Expand All @@ -76,19 +73,19 @@ class CoreWorkerClientPool {
/// This factory function does the connection to CoreWorkerClient, and is
/// provided by the constructor (either the default implementation, above, or a
/// provided one)
ClientFactoryFn client_factory_;
CoreWorkerClientFactoryFn client_factory_;

absl::Mutex mu_;

struct CoreWorkerClientEntry {
public:
CoreWorkerClientEntry() {}
CoreWorkerClientEntry(ray::WorkerID worker_id,
shared_ptr<CoreWorkerClientInterface> core_worker_client)
std::shared_ptr<CoreWorkerClientInterface> core_worker_client)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be moved?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix in my next PR

: worker_id(worker_id), core_worker_client(core_worker_client) {}

ray::WorkerID worker_id;
shared_ptr<CoreWorkerClientInterface> core_worker_client;
std::shared_ptr<CoreWorkerClientInterface> core_worker_client;
};

/// A list of open connections from the most recent accessed to the least recent
Expand Down