diff --git a/dashboard/agent.py b/dashboard/agent.py index 772af96ab7ed..2805059585be 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -43,17 +43,19 @@ def __init__( dashboard_agent_port, gcs_address, minimal, - temp_dir=None, - session_dir=None, - runtime_env_dir=None, - log_dir=None, metrics_export_port=None, node_manager_port=None, listen_port=0, - object_store_name=None, - raylet_name=None, - logging_params=None, disable_metrics_collection: bool = False, + *, # the following are required kwargs + object_store_name: str, + raylet_name: str, + log_dir: str, + temp_dir: str, + session_dir: str, + runtime_env_dir: str, + logging_params: dict, + agent_id: int, ): """Initialize the DashboardAgent object.""" # Public attributes are accessible for all agent modules. @@ -76,6 +78,7 @@ def __init__( self.logging_params = logging_params self.node_id = os.environ["RAY_NODE_ID"] self.metrics_collection_disabled = disable_metrics_collection + self.agent_id = agent_id # TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is # only used for fate-sharing with the raylet and we need a different # fate-sharing mechanism for Windows anyways. 
@@ -203,7 +206,7 @@ async def _check_parent(): await raylet_stub.RegisterAgent( agent_manager_pb2.RegisterAgentRequest( - agent_pid=os.getpid(), + agent_id=self.agent_id, agent_port=self.grpc_port, agent_ip_address=self.ip, ) @@ -354,6 +357,12 @@ async def _check_parent(): action="store_true", help=("If this arg is set, metrics report won't be enabled from the agent."), ) + parser.add_argument( + "--agent-id", + required=True, + type=int, + help="ID to report when registering with raylet", + ) args = parser.parse_args() try: @@ -383,6 +392,7 @@ async def _check_parent(): raylet_name=args.raylet_name, logging_params=logging_params, disable_metrics_collection=args.disable_metrics_collection, + agent_id=args.agent_id, ) if os.environ.get("_RAY_AGENT_FAILING"): raise Exception("Failure injection failure.") diff --git a/src/ray/protobuf/agent_manager.proto b/src/ray/protobuf/agent_manager.proto index cbbd12700453..bf438ece0464 100644 --- a/src/ray/protobuf/agent_manager.proto +++ b/src/ray/protobuf/agent_manager.proto @@ -25,7 +25,7 @@ enum AgentRpcStatus { } message RegisterAgentRequest { - int32 agent_pid = 1; + int32 agent_id = 1; int32 agent_port = 2; string agent_ip_address = 3; } diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 7076344325ee..c08cc0c2798a 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -29,19 +29,19 @@ namespace raylet { void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, rpc::RegisterAgentReply *reply, rpc::SendReplyCallback send_reply_callback) { - agent_ip_address_ = request.agent_ip_address(); - agent_port_ = request.agent_port(); - agent_pid_ = request.agent_pid(); + reported_agent_ip_address_ = request.agent_ip_address(); + reported_agent_port_ = request.agent_port(); + reported_agent_id_ = request.agent_id(); // TODO(SongGuyang): We should remove this after we find better port resolution. 
// Note: `agent_port_` should be 0 if the grpc port of agent is in conflict. - if (agent_port_ != 0) { - runtime_env_agent_client_ = - runtime_env_agent_client_factory_(agent_ip_address_, agent_port_); - RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_ - << ", port: " << agent_port_ << ", pid: " << agent_pid_; + if (reported_agent_port_ != 0) { + runtime_env_agent_client_ = runtime_env_agent_client_factory_( + reported_agent_ip_address_, reported_agent_port_); + RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_ip_address_ + << ", port: " << reported_agent_port_ << ", id: " << reported_agent_id_; } else { RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: " - << agent_ip_address_ << ", pid: " << agent_pid_ + << reported_agent_ip_address_ << ", id: " << reported_agent_id_ << ". The agent client in the raylet has been disabled."; disable_agent_client_ = true; } @@ -56,30 +56,45 @@ void AgentManager::StartAgent() { return; } - if (RAY_LOG_ENABLED(DEBUG)) { - std::stringstream stream; - stream << "Starting agent process with command:"; - for (const auto &arg : options_.agent_commands) { - stream << " " << arg; - } - RAY_LOG(DEBUG) << stream.str(); + // Create a non-zero random agent_id to pass to the child process + // We cannot use pid as an id because os.getpid() from the python process is not + // reliable when using a launcher. + // See https://github.com/ray-project/ray/issues/24361 and Python issue + // https://github.com/python/cpython/issues/83086 + int agent_id = 0; + while (agent_id == 0) { + agent_id = rand(); } - - // Launch the process to create the agent. - std::error_code ec; + const std::string agent_id_str = std::to_string(agent_id); std::vector argv; for (const std::string &arg : options_.agent_commands) { argv.push_back(arg.c_str()); } + argv.push_back("--agent-id"); + argv.push_back(agent_id_str.c_str()); + // Disable metrics report if needed. 
if (!RayConfig::instance().enable_metrics_collection()) { argv.push_back("--disable-metrics-collection"); } argv.push_back(NULL); + + if (RAY_LOG_ENABLED(DEBUG)) { + std::stringstream stream; + stream << "Starting agent process with command:"; + for (const auto &arg : argv) { + stream << " " << arg; + } + RAY_LOG(DEBUG) << stream.str(); + } + // Set node id to agent. ProcessEnvironment env; env.insert({"RAY_NODE_ID", options_.node_id.Hex()}); env.insert({"RAY_RAYLET_PID", std::to_string(getpid())}); + + // Launch the process to create the agent. + std::error_code ec; Process child(argv.data(), nullptr, ec, false, env); if (!child.IsValid() || ec) { // The worker failed to start. This is a fatal error. @@ -87,17 +102,23 @@ void AgentManager::StartAgent() { << ec.message(); } - std::thread monitor_thread([this, child]() mutable { + std::thread monitor_thread([this, child, agent_id]() mutable { SetThreadName("agent.monitor"); - RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId() - << ", register timeout " + RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout " << RayConfig::instance().agent_register_timeout_ms() << "ms."; auto timer = delay_executor_( - [this, child]() mutable { - if (agent_pid_ != child.GetId()) { - RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() - << " has not registered. ip " << agent_ip_address_ - << ", pid " << agent_pid_; + [this, child, agent_id]() mutable { + if (reported_agent_id_ != agent_id) { + if (reported_agent_id_ == 0) { + RAY_LOG(WARNING) << "Agent process expected id " << agent_id + << " timed out before registering. 
ip " + << reported_agent_ip_address_ << ", id " + << reported_agent_id_; + } else { + RAY_LOG(WARNING) << "Agent process expected id " << agent_id + << " but got id " << reported_agent_id_ + << ", this is a fatal error"; + } child.Kill(); } }, @@ -105,9 +126,9 @@ void AgentManager::StartAgent() { int exit_code = child.Wait(); timer->cancel(); - RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() - << " exit, return value " << exit_code << ". ip " - << agent_ip_address_ << ". pid " << agent_pid_; + RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exit, return value " + << exit_code << ". ip " << reported_agent_ip_address_ << ". id " + << reported_agent_id_; RAY_LOG(ERROR) << "The raylet exited immediately because the Ray agent failed. " "The raylet fate shares with the agent. This can happen because the " diff --git a/src/ray/raylet/agent_manager.h b/src/ray/raylet/agent_manager.h index 23fb6835dc38..ef460b96c9f6 100644 --- a/src/ray/raylet/agent_manager.h +++ b/src/ray/raylet/agent_manager.h @@ -93,12 +93,12 @@ class AgentManager : public rpc::AgentManagerServiceHandler { private: Options options_; - pid_t agent_pid_ = 0; - int agent_port_ = 0; + pid_t reported_agent_id_ = 0; + int reported_agent_port_ = 0; /// Whether or not we intend to start the agent. This is false if we /// are missing Ray Dashboard dependencies, for example. bool should_start_agent_ = true; - std::string agent_ip_address_; + std::string reported_agent_ip_address_; DelayExecutorFn delay_executor_; RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_; std::shared_ptr runtime_env_agent_client_;