Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "use an agent-id rather than the process PID (#24968)"… #25376

Merged
merged 1 commit into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ def __init__(
dashboard_agent_port,
gcs_address,
minimal,
temp_dir=None,
session_dir=None,
runtime_env_dir=None,
log_dir=None,
metrics_export_port=None,
node_manager_port=None,
listen_port=0,
object_store_name=None,
raylet_name=None,
logging_params=None,
disable_metrics_collection: bool = False,
*, # the following are required kwargs
object_store_name: str,
raylet_name: str,
log_dir: str,
temp_dir: str,
session_dir: str,
runtime_env_dir: str,
logging_params: dict,
agent_id: int,
):
"""Initialize the DashboardAgent object."""
# Public attributes are accessible for all agent modules.
Expand All @@ -76,6 +78,7 @@ def __init__(
self.logging_params = logging_params
self.node_id = os.environ["RAY_NODE_ID"]
self.metrics_collection_disabled = disable_metrics_collection
self.agent_id = agent_id
# TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is
# only used for fate-sharing with the raylet and we need a different
# fate-sharing mechanism for Windows anyways.
Expand Down Expand Up @@ -203,7 +206,7 @@ async def _check_parent():

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_pid=os.getpid(),
agent_id=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
)
Expand Down Expand Up @@ -354,6 +357,12 @@ async def _check_parent():
action="store_true",
help=("If this arg is set, metrics report won't be enabled from the agent."),
)
parser.add_argument(
"--agent-id",
required=True,
type=int,
help="ID to report when registering with raylet",
)

args = parser.parse_args()
try:
Expand Down Expand Up @@ -383,6 +392,7 @@ async def _check_parent():
raylet_name=args.raylet_name,
logging_params=logging_params,
disable_metrics_collection=args.disable_metrics_collection,
agent_id=args.agent_id,
)
if os.environ.get("_RAY_AGENT_FAILING"):
raise Exception("Failure injection failure.")
Expand Down
2 changes: 1 addition & 1 deletion src/ray/protobuf/agent_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum AgentRpcStatus {
}

message RegisterAgentRequest {
int32 agent_pid = 1;
int32 agent_id = 1;
int32 agent_port = 2;
string agent_ip_address = 3;
}
Expand Down
81 changes: 51 additions & 30 deletions src/ray/raylet/agent_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ namespace raylet {
void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
rpc::RegisterAgentReply *reply,
rpc::SendReplyCallback send_reply_callback) {
agent_ip_address_ = request.agent_ip_address();
agent_port_ = request.agent_port();
agent_pid_ = request.agent_pid();
reported_agent_ip_address_ = request.agent_ip_address();
reported_agent_port_ = request.agent_port();
reported_agent_id_ = request.agent_id();
// TODO(SongGuyang): We should remove this after we find better port resolution.
// Note: `agent_port_` should be 0 if the grpc port of agent is in conflict.
if (agent_port_ != 0) {
runtime_env_agent_client_ =
runtime_env_agent_client_factory_(agent_ip_address_, agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_
<< ", port: " << agent_port_ << ", pid: " << agent_pid_;
if (reported_agent_port_ != 0) {
runtime_env_agent_client_ = runtime_env_agent_client_factory_(
reported_agent_ip_address_, reported_agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_ip_address_
<< ", port: " << reported_agent_port_ << ", id: " << reported_agent_id_;
} else {
RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: "
<< agent_ip_address_ << ", pid: " << agent_pid_
<< reported_agent_ip_address_ << ", id: " << reported_agent_id_
<< ". The agent client in the raylet has been disabled.";
disable_agent_client_ = true;
}
Expand All @@ -56,58 +56,79 @@ void AgentManager::StartAgent() {
return;
}

if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : options_.agent_commands) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
// Create a non-zero random agent_id to pass to the child process
// We cannot use pid an id because os.getpid() from the python process is not
// reliable when using a launcher.
// See https://github.com/ray-project/ray/issues/24361 and Python issue
// https://github.com/python/cpython/issues/83086
int agent_id = 0;
while (agent_id == 0) {
agent_id = rand();
}

// Launch the process to create the agent.
std::error_code ec;
const std::string agent_id_str = std::to_string(agent_id);
std::vector<const char *> argv;
for (const std::string &arg : options_.agent_commands) {
argv.push_back(arg.c_str());
}
argv.push_back("--agent-id");
argv.push_back(agent_id_str.c_str());

// Disable metrics report if needed.
if (!RayConfig::instance().enable_metrics_collection()) {
argv.push_back("--disable-metrics-collection");
}
argv.push_back(NULL);

if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : argv) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}

// Set node id to agent.
ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});

// Launch the process to create the agent.
std::error_code ec;
Process child(argv.data(), nullptr, ec, false, env);
if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start agent with return value " << ec << ": "
<< ec.message();
}

std::thread monitor_thread([this, child]() mutable {
std::thread monitor_thread([this, child, agent_id]() mutable {
SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId()
<< ", register timeout "
RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms.";
auto timer = delay_executor_(
[this, child]() mutable {
if (agent_pid_ != child.GetId()) {
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " has not registered. ip " << agent_ip_address_
<< ", pid " << agent_pid_;
[this, child, agent_id]() mutable {
if (reported_agent_id_ != agent_id) {
if (reported_agent_id_ == 0) {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " timed out before registering. ip "
<< reported_agent_ip_address_ << ", id "
<< reported_agent_id_;
} else {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " but got id " << reported_agent_id_
<< ", this is a fatal error";
}
child.Kill();
}
},
RayConfig::instance().agent_register_timeout_ms());

int exit_code = child.Wait();
timer->cancel();
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " exit, return value " << exit_code << ". ip "
<< agent_ip_address_ << ". pid " << agent_pid_;
RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exit, return value "
<< exit_code << ". ip " << reported_agent_ip_address_ << ". id "
<< reported_agent_id_;
RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the "
Expand Down
6 changes: 3 additions & 3 deletions src/ray/raylet/agent_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ class AgentManager : public rpc::AgentManagerServiceHandler {

private:
Options options_;
pid_t agent_pid_ = 0;
int agent_port_ = 0;
pid_t reported_agent_id_ = 0;
int reported_agent_port_ = 0;
/// Whether or not we intend to start the agent. This is false if we
/// are missing Ray Dashboard dependencies, for example.
bool should_start_agent_ = true;
std::string agent_ip_address_;
std::string reported_agent_ip_address_;
DelayExecutorFn delay_executor_;
RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;
std::shared_ptr<rpc::RuntimeEnvAgentClientInterface> runtime_env_agent_client_;
Expand Down