Skip to content

Commit

Permalink
use an agent-id rather than the process PID (#24968)
Browse files Browse the repository at this point in the history
When using Ray inside a virtualenv on Windows, the python.exe reported by sys.executable is a PEP 397 launcher that starts the actual Python interpreter as a child process, and os.getpid() reports the PID of that child rather than of the launcher:

>>> import sys, os, psutil
>>> print(sys.executable)
C:\temp\issue24361\Scripts\python.exe
>>> os.getpid()
2208
>>> child = psutil.Process(2208)
>>> child.cmdline()
['C:\\oss\\CPython38\\python.exe']
>>> child.parent().cmdline()
['C:\\temp\\issue24361\\Scripts\\python.exe']
>>> child.parent().pid
6424
When the agent_manager launches the agent process via Process::Process(), it gets the PID of the launcher process (6424), and the raylet expects that PID as the agent's ID when the agent registers through the gRPC callback. Inside agent.py, however, the child process reports its own PID via os.getpid(), which is 2208, so the agent registers with a PID that does not match the one the raylet is waiting for.

The solution proposed here is another version of #24905: it creates an int agent_id = rand(); before starting the Python process and passes that agent_id to the process.
  • Loading branch information
mattip authored May 27, 2022
1 parent 3234fd3 commit 02f220b
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 42 deletions.
26 changes: 18 additions & 8 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ def __init__(
dashboard_agent_port,
gcs_address,
minimal,
temp_dir=None,
session_dir=None,
runtime_env_dir=None,
log_dir=None,
metrics_export_port=None,
node_manager_port=None,
listen_port=0,
object_store_name=None,
raylet_name=None,
logging_params=None,
disable_metrics_collection: bool = False,
*, # the following are required kwargs
object_store_name: str,
raylet_name: str,
log_dir: str,
temp_dir: str,
session_dir: str,
runtime_env_dir: str,
logging_params: dict,
agent_id: int,
):
"""Initialize the DashboardAgent object."""
# Public attributes are accessible for all agent modules.
Expand All @@ -76,6 +78,7 @@ def __init__(
self.logging_params = logging_params
self.node_id = os.environ["RAY_NODE_ID"]
self.metrics_collection_disabled = disable_metrics_collection
self.agent_id = agent_id
# TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is
# only used for fate-sharing with the raylet and we need a different
# fate-sharing mechanism for Windows anyways.
Expand Down Expand Up @@ -203,7 +206,7 @@ async def _check_parent():

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_pid=os.getpid(),
agent_id=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
)
Expand Down Expand Up @@ -354,6 +357,12 @@ async def _check_parent():
action="store_true",
help=("If this arg is set, metrics report won't be enabled from the agent."),
)
parser.add_argument(
"--agent-id",
required=True,
type=int,
help="ID to report when registering with raylet",
)

args = parser.parse_args()
try:
Expand Down Expand Up @@ -383,6 +392,7 @@ async def _check_parent():
raylet_name=args.raylet_name,
logging_params=logging_params,
disable_metrics_collection=args.disable_metrics_collection,
agent_id=args.agent_id,
)
if os.environ.get("_RAY_AGENT_FAILING"):
raise Exception("Failure injection failure.")
Expand Down
2 changes: 1 addition & 1 deletion src/ray/protobuf/agent_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum AgentRpcStatus {
}

message RegisterAgentRequest {
int32 agent_pid = 1;
int32 agent_id = 1;
int32 agent_port = 2;
string agent_ip_address = 3;
}
Expand Down
81 changes: 51 additions & 30 deletions src/ray/raylet/agent_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ namespace raylet {
void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
rpc::RegisterAgentReply *reply,
rpc::SendReplyCallback send_reply_callback) {
agent_ip_address_ = request.agent_ip_address();
agent_port_ = request.agent_port();
agent_pid_ = request.agent_pid();
reported_agent_ip_address_ = request.agent_ip_address();
reported_agent_port_ = request.agent_port();
reported_agent_id_ = request.agent_id();
// TODO(SongGuyang): We should remove this after we find better port resolution.
// Note: `reported_agent_port_` should be 0 if the grpc port of agent is in conflict.
if (agent_port_ != 0) {
runtime_env_agent_client_ =
runtime_env_agent_client_factory_(agent_ip_address_, agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_
<< ", port: " << agent_port_ << ", pid: " << agent_pid_;
if (reported_agent_port_ != 0) {
runtime_env_agent_client_ = runtime_env_agent_client_factory_(
reported_agent_ip_address_, reported_agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_ip_address_
<< ", port: " << reported_agent_port_ << ", id: " << reported_agent_id_;
} else {
RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: "
<< agent_ip_address_ << ", pid: " << agent_pid_
<< reported_agent_ip_address_ << ", id: " << reported_agent_id_
<< ". The agent client in the raylet has been disabled.";
disable_agent_client_ = true;
}
Expand All @@ -56,58 +56,79 @@ void AgentManager::StartAgent() {
return;
}

if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : options_.agent_commands) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
// Create a non-zero random agent_id to pass to the child process.
// We cannot use the pid as an id because os.getpid() from the python process is not
// reliable when using a launcher.
// See https://github.com/ray-project/ray/issues/24361 and Python issue
// https://github.com/python/cpython/issues/83086
int agent_id = 0;
while (agent_id == 0) {
agent_id = rand();
}

// Launch the process to create the agent.
std::error_code ec;
const std::string agent_id_str = std::to_string(agent_id);
std::vector<const char *> argv;
for (const std::string &arg : options_.agent_commands) {
argv.push_back(arg.c_str());
}
argv.push_back("--agent-id");
argv.push_back(agent_id_str.c_str());

// Disable metrics report if needed.
if (!RayConfig::instance().enable_metrics_collection()) {
argv.push_back("--disable-metrics-collection");
}
argv.push_back(NULL);

if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : argv) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}

// Set node id to agent.
ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});

// Launch the process to create the agent.
std::error_code ec;
Process child(argv.data(), nullptr, ec, false, env);
if (!child.IsValid() || ec) {
// The agent failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start agent with return value " << ec << ": "
<< ec.message();
}

std::thread monitor_thread([this, child]() mutable {
std::thread monitor_thread([this, child, agent_id]() mutable {
SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId()
<< ", register timeout "
RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms.";
auto timer = delay_executor_(
[this, child]() mutable {
if (agent_pid_ != child.GetId()) {
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " has not registered. ip " << agent_ip_address_
<< ", pid " << agent_pid_;
[this, child, agent_id]() mutable {
if (reported_agent_id_ != agent_id) {
if (reported_agent_id_ == 0) {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " timed out before registering. ip "
<< reported_agent_ip_address_ << ", id "
<< reported_agent_id_;
} else {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " but got id " << reported_agent_id_
<< ", this is a fatal error";
}
child.Kill();
}
},
RayConfig::instance().agent_register_timeout_ms());

int exit_code = child.Wait();
timer->cancel();
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " exit, return value " << exit_code << ". ip "
<< agent_ip_address_ << ". pid " << agent_pid_;
RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exit, return value "
<< exit_code << ". ip " << reported_agent_ip_address_ << ". id "
<< reported_agent_id_;
RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the "
Expand Down
6 changes: 3 additions & 3 deletions src/ray/raylet/agent_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ class AgentManager : public rpc::AgentManagerServiceHandler {

private:
Options options_;
pid_t agent_pid_ = 0;
int agent_port_ = 0;
pid_t reported_agent_id_ = 0;
int reported_agent_port_ = 0;
/// Whether or not we intend to start the agent. This is false if we
/// are missing Ray Dashboard dependencies, for example.
bool should_start_agent_ = true;
std::string agent_ip_address_;
std::string reported_agent_ip_address_;
DelayExecutorFn delay_executor_;
RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;
std::shared_ptr<rpc::RuntimeEnvAgentClientInterface> runtime_env_agent_client_;
Expand Down

0 comments on commit 02f220b

Please sign in to comment.