Revert "[core] Remove gcs addr updater in core worker. (#24747)" (#25375
Browse files Browse the repository at this point in the history
)

Turns out #25342 wasn't the root cause of the ray shutdown flakiness. I realized there's another PR that could affect this test suite. Let's try reverting it and see if things get better.
rkooo567 authored Jun 1, 2022
1 parent b9874f5 commit 49efcab
Showing 19 changed files with 236 additions and 56 deletions.
1 change: 0 additions & 1 deletion BUILD.bazel
@@ -1459,7 +1459,6 @@ cc_test(
     deps = [
         ":ray_util",
         "@boost//:asio",
-        "@boost//:process",
         "@com_google_googletest//:gtest_main",
     ],
 )
10 changes: 2 additions & 8 deletions python/ray/cluster_utils.py
@@ -246,19 +246,13 @@ def remove_node(self, node, allow_graceful=True):
         )
 
         if self.head_node == node:
-            # We have to wait to prevent the raylet becomes a zombie which will prevent
-            # worker from exiting
             self.head_node.kill_all_processes(
-                check_alive=False, allow_graceful=allow_graceful, wait=True
+                check_alive=False, allow_graceful=allow_graceful
             )
             self.head_node = None
             # TODO(rliaw): Do we need to kill all worker processes?
         else:
-            # We have to wait to prevent the raylet becomes a zombie which will prevent
-            # worker from exiting
-            node.kill_all_processes(
-                check_alive=False, allow_graceful=allow_graceful, wait=True
-            )
+            node.kill_all_processes(check_alive=False, allow_graceful=allow_graceful)
             self.worker_nodes.remove(node)
 
         assert (
12 changes: 2 additions & 10 deletions python/ray/node.py
@@ -1341,7 +1341,7 @@ def kill_reaper(self, check_alive: bool = True):
             ray_constants.PROCESS_TYPE_REAPER, check_alive=check_alive
         )
 
-    def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False):
+    def kill_all_processes(self, check_alive=True, allow_graceful=False):
         """Kill all of the processes.
 
         Note that This is slower than necessary because it calls kill, wait,
@@ -1350,8 +1350,6 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False)
         Args:
             check_alive: Raise an exception if any of the processes were
                 already dead.
-            wait: If true, then this method will not return until the
-                process in question has exited.
         """
         # Kill the raylet first. This is important for suppressing errors at
         # shutdown because we give the raylet a chance to exit gracefully and
@@ -1363,15 +1361,13 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False)
                 ray_constants.PROCESS_TYPE_RAYLET,
                 check_alive=check_alive,
                 allow_graceful=allow_graceful,
-                wait=wait,
             )
 
         if ray_constants.PROCESS_TYPE_GCS_SERVER in self.all_processes:
             self._kill_process_type(
                 ray_constants.PROCESS_TYPE_GCS_SERVER,
                 check_alive=check_alive,
                 allow_graceful=allow_graceful,
-                wait=wait,
             )
 
         # We call "list" to copy the keys because we are modifying the
@@ -1381,18 +1377,14 @@ def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False)
             # while cleaning up.
             if process_type != ray_constants.PROCESS_TYPE_REAPER:
                 self._kill_process_type(
-                    process_type,
-                    check_alive=check_alive,
-                    allow_graceful=allow_graceful,
-                    wait=wait,
+                    process_type, check_alive=check_alive, allow_graceful=allow_graceful
                 )
 
         if ray_constants.PROCESS_TYPE_REAPER in self.all_processes:
             self._kill_process_type(
                 ray_constants.PROCESS_TYPE_REAPER,
                 check_alive=check_alive,
                 allow_graceful=allow_graceful,
-                wait=wait,
             )
 
     def live_processes(self):
10 changes: 2 additions & 8 deletions python/ray/tests/test_scheduling_2.py
@@ -255,10 +255,7 @@ def func():
 
 
 @pytest.mark.parametrize("connect_to_client", [True, False])
-def test_node_affinity_scheduling_strategy(
-    monkeypatch, ray_start_cluster, connect_to_client
-):
-    monkeypatch.setenv("RAY_num_heartbeats_timeout", "4")
+def test_node_affinity_scheduling_strategy(ray_start_cluster, connect_to_client):
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=8, resources={"head": 1})
     ray.init(address=cluster.address)
@@ -519,10 +516,7 @@ def ping(self):
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="FakeAutoscaler doesn't work on Windows"
 )
-def test_demand_report_for_node_affinity_scheduling_strategy(
-    monkeypatch, shutdown_only
-):
-    monkeypatch.setenv("RAY_num_heartbeats_timeout", "4")
+def test_demand_report_for_node_affinity_scheduling_strategy(shutdown_only):
     from ray.cluster_utils import AutoscalingCluster
 
     cluster = AutoscalingCluster(
4 changes: 4 additions & 0 deletions src/mock/ray/raylet_client/raylet_client.h
@@ -198,6 +198,10 @@ class MockRayletClientInterface : public RayletClientInterface {
               GetSystemConfig,
               (const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback),
               (override));
+  MOCK_METHOD(void,
+              GetGcsServerAddress,
+              (const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback),
+              (override));
   MOCK_METHOD(void,
               UpdateResourceUsage,
               (std::string & serialized_resource_usage_batch,
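The new mock method mirrors the RPC this revert restores. For reference, a minimal self-contained sketch of how such a mock is typically exercised with GoogleMock; the `RayletClientLike` and `AddressCallback` types below are simplified stand-ins invented for illustration, not the real Ray interfaces:

```cpp
#include <functional>
#include <string>

#include "gmock/gmock.h"
#include "gtest/gtest.h"

// Simplified stand-ins for the Ray types; invented for illustration only.
struct GetGcsServerAddressReply {
  std::string ip;
  int port = 0;
};
using AddressCallback = std::function<void(bool ok, const GetGcsServerAddressReply &reply)>;

class RayletClientLike {
 public:
  virtual ~RayletClientLike() = default;
  virtual void GetGcsServerAddress(const AddressCallback &callback) = 0;
};

class MockRayletClientLike : public RayletClientLike {
 public:
  MOCK_METHOD(void, GetGcsServerAddress, (const AddressCallback &callback), (override));
};

TEST(GcsAddressMock, CallbackReceivesCannedReply) {
  MockRayletClientLike mock;
  // Stub the RPC to invoke the caller's callback with a canned reply.
  EXPECT_CALL(mock, GetGcsServerAddress(::testing::_))
      .WillOnce([](const AddressCallback &cb) {
        cb(true, GetGcsServerAddressReply{"127.0.0.1", 6379});
      });

  std::string ip;
  mock.GetGcsServerAddress([&](bool ok, const GetGcsServerAddressReply &r) {
    if (ok) ip = r.ip;
  });
  EXPECT_EQ(ip, "127.0.0.1");
}
```

Wrapping the callback type in a `using` alias keeps the comma-bearing template type out of `MOCK_METHOD`'s argument list, which would otherwise need extra parentheses.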
25 changes: 19 additions & 6 deletions src/ray/core_worker/core_worker.cc
@@ -180,6 +180,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
                 << rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID()
                 << ", raylet " << local_raylet_id;
 
+  // Begin to get gcs server address from raylet.
+  gcs_server_address_updater_ = std::make_unique<GcsServerAddressUpdater>(
+      options_.raylet_ip_address,
+      options_.node_manager_port,
+      [this](std::string ip, int port) {
+        absl::MutexLock lock(&gcs_server_address_mutex_);
+        gcs_server_address_.first = ip;
+        gcs_server_address_.second = port;
+      });
+
   gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options);
 
   RAY_CHECK_OK(gcs_client_->Connect(io_service_));
@@ -236,7 +246,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
         return std::shared_ptr<rpc::CoreWorkerClient>(
             new rpc::CoreWorkerClient(addr, *client_call_manager_));
       });
-  if (options_.worker_type != WorkerType::DRIVER) {
+
+  if (options_.worker_type == WorkerType::WORKER) {
     periodical_runner_.RunFnPeriodically(
         [this] { ExitIfParentRayletDies(); },
         RayConfig::instance().raylet_death_check_interval_milliseconds());
@@ -748,14 +759,16 @@ void CoreWorker::RegisterToGcs() {
 }
 
 void CoreWorker::ExitIfParentRayletDies() {
+  RAY_CHECK(options_.worker_type == WorkerType::WORKER);
   RAY_CHECK(!RayConfig::instance().RAYLET_PID().empty());
-  static auto raylet_pid =
-      static_cast<pid_t>(std::stoi(RayConfig::instance().RAYLET_PID()));
+  auto raylet_pid = static_cast<pid_t>(std::stoi(RayConfig::instance().RAYLET_PID()));
   bool should_shutdown = !IsProcessAlive(raylet_pid);
   if (should_shutdown) {
-    RAY_LOG(WARNING) << "Shutting down the core worker because the local raylet failed. "
-                     << "Check out the raylet.out log file. Raylet pid: " << raylet_pid;
-    QuickExit();
+    std::ostringstream stream;
+    stream << "Shutting down the core worker because the local raylet failed. "
+           << "Check out the raylet.out log file. Raylet pid: " << raylet_pid;
+    RAY_LOG(WARNING) << stream.str();
+    task_execution_service_.post([this]() { Shutdown(); }, "CoreWorker.Shutdown");
  }
 }
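Two behavioral notes on the restored `ExitIfParentRayletDies`: the raylet pid is re-read on every invocation rather than cached in a `static`, and the abrupt `QuickExit()` becomes a `Shutdown()` posted to the task execution service, so the worker exits through its normal teardown path. The liveness test itself delegates to `IsProcessAlive`; a hypothetical POSIX stand-in (not Ray's actual implementation) could look like this:

```cpp
#include <cerrno>
#include <signal.h>
#include <sys/types.h>

// Hypothetical stand-in for IsProcessAlive(), for illustration only.
// kill() with signal 0 performs the existence and permission checks
// without actually delivering a signal.
bool ProcessAlive(pid_t pid) {
  if (::kill(pid, 0) == 0) {
    return true;
  }
  return errno == EPERM;  // the process exists, but we may not signal it
}
```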
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -26,6 +26,7 @@
 #include "ray/core_worker/core_worker_options.h"
 #include "ray/core_worker/core_worker_process.h"
 #include "ray/core_worker/future_resolver.h"
+#include "ray/core_worker/gcs_server_address_updater.h"
 #include "ray/core_worker/lease_policy.h"
 #include "ray/core_worker/object_recovery_manager.h"
 #include "ray/core_worker/profiling.h"
@@ -1112,6 +1113,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   // Client to the GCS shared by core worker interfaces.
   std::shared_ptr<gcs::GcsClient> gcs_client_;
 
+  std::pair<std::string, int> gcs_server_address_ GUARDED_BY(gcs_server_address_mutex_) =
+      std::make_pair<std::string, int>("", 0);
+  /// To protect accessing the `gcs_server_address_`.
+  absl::Mutex gcs_server_address_mutex_;
+  std::unique_ptr<GcsServerAddressUpdater> gcs_server_address_updater_;
+
   // Client to the raylet shared by core worker interfaces. This needs to be a
   // shared_ptr for direct calls because we can lease multiple workers through
   // one client, and we need to keep the connection alive until we return all
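These three members form a common pattern: the background updater writes the address through a callback while other threads copy it out under the mutex. A standalone sketch of the same pattern, using `std::mutex` in place of `absl::Mutex` for brevity (the CoreWorker read accessor itself is not shown in this diff):

```cpp
#include <mutex>
#include <string>
#include <utility>

class GcsAddressHolder {
 public:
  // Writer side: what the updater's callback does.
  void Update(std::string ip, int port) {
    std::lock_guard<std::mutex> lock(mu_);
    address_ = {std::move(ip), port};
  }

  // Reader side: copy out under the lock so callers never see a torn pair.
  std::pair<std::string, int> Get() const {
    std::lock_guard<std::mutex> lock(mu_);
    return address_;
  }

 private:
  mutable std::mutex mu_;
  std::pair<std::string, int> address_{"", 0};
};
```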
96 changes: 96 additions & 0 deletions src/ray/core_worker/gcs_server_address_updater.cc
@@ -0,0 +1,96 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ray/core_worker/gcs_server_address_updater.h"
+
+namespace ray {
+namespace core {
+
+GcsServerAddressUpdater::GcsServerAddressUpdater(
+    const std::string raylet_ip_address,
+    const int port,
+    std::function<void(std::string, int)> update_func)
+    : client_call_manager_(updater_io_service_),
+      raylet_client_(rpc::NodeManagerWorkerClient::make(
+          raylet_ip_address, port, client_call_manager_)),
+      update_func_(update_func),
+      updater_runner_(updater_io_service_),
+      updater_thread_([this] {
+        SetThreadName("gcs_addr_updater");
+        std::thread::id this_id = std::this_thread::get_id();
+        RAY_LOG(INFO) << "GCS Server updater thread id: " << this_id;
+        /// The asio work to keep io_service_ alive.
+        boost::asio::io_service::work io_service_work_(updater_io_service_);
+        updater_io_service_.run();
+      }) {
+  // Start updating gcs server address.
+  updater_runner_.RunFnPeriodically(
+      [this] { UpdateGcsServerAddress(); },
+      RayConfig::instance().gcs_service_address_check_interval_milliseconds(),
+      "GcsServerAddressUpdater.UpdateGcsServerAddress");
+}
+
+GcsServerAddressUpdater::~GcsServerAddressUpdater() {
+  updater_io_service_.stop();
+  if (updater_thread_.joinable()) {
+    updater_thread_.join();
+  } else {
+    RAY_LOG(WARNING)
+        << "Could not join updater thread. This can cause segfault upon destruction.";
+  }
+  RAY_LOG(DEBUG) << "GcsServerAddressUpdater is destructed";
+}
+
+void GcsServerAddressUpdater::UpdateGcsServerAddress() {
+  raylet_client_.GetGcsServerAddress([this](const Status &status,
+                                            const rpc::GetGcsServerAddressReply &reply) {
+    const int64_t max_retries =
+        RayConfig::instance().gcs_rpc_server_reconnect_timeout_s() * 1000 /
+        RayConfig::instance().gcs_service_address_check_interval_milliseconds();
+    if (!status.ok()) {
+      failed_ping_count_ += 1;
+      auto warning_threshold = max_retries / 2;
+      RAY_LOG_EVERY_N(WARNING, warning_threshold)
+          << "Failed to get the gcs server address from raylet " << failed_ping_count_
+          << " times in a row. If it keeps failing to obtain the address, "
+             "the worker might crash. Connection status "
+          << status;
+      if (failed_ping_count_ >= max_retries) {
+        std::stringstream os;
+        os << "Failed to receive the GCS address for " << failed_ping_count_
+           << " times without success. The worker will exit ungracefully. It is because ";
+        if (status.IsGrpcUnavailable()) {
+          RAY_LOG(WARNING) << os.str()
+                           << "raylet has died, and it couldn't obtain the GCS address "
+                              "from the raylet anymore. Please check the log from "
+                              "raylet.err on this address.";
+        } else {
+          RAY_LOG(ERROR)
+              << os.str()
+              << "GCS has died. It could be because there was an issue that "
+                 "kills GCS, such as high memory usage triggering OOM killer "
+                 "to kill GCS. Cluster will be highly likely unavailable if you see "
+                 "this log. Please check the log from gcs_server.err.";
+        }
+        QuickExit();
+      }
+    } else {
+      failed_ping_count_ = 0;
+      update_func_(reply.ip(), reply.port());
+    }
+  });
+}
+
+}  // namespace core
+}  // namespace ray
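The exit condition above is a retry budget: `max_retries = gcs_rpc_server_reconnect_timeout_s * 1000 / gcs_service_address_check_interval_milliseconds`, with a warning logged every `max_retries / 2` consecutive failures. A quick sketch of the arithmetic with illustrative values (not necessarily the shipped defaults):

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Illustrative values only; the real numbers come from RayConfig.
  const int64_t reconnect_timeout_s = 60;  // gcs_rpc_server_reconnect_timeout_s
  const int64_t check_interval_ms = 1000;  // gcs_service_address_check_interval_milliseconds

  const int64_t max_retries = reconnect_timeout_s * 1000 / check_interval_ms;  // 60 pings
  const int64_t warning_threshold = max_retries / 2;                           // every 30

  std::cout << "QuickExit() after " << max_retries
            << " consecutive failures; warning every " << warning_threshold
            << " failures\n";
  return 0;
}
```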
52 changes: 52 additions & 0 deletions src/ray/core_worker/gcs_server_address_updater.h
@@ -0,0 +1,52 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include "ray/common/asio/instrumented_io_context.h"
+#include "ray/common/asio/periodical_runner.h"
+#include "ray/raylet_client/raylet_client.h"
+
+namespace ray {
+namespace core {
+
+class GcsServerAddressUpdater {
+ public:
+  /// Create a updater for gcs server address.
+  ///
+  /// \param raylet_ip_address Raylet ip address.
+  /// \param port Port to connect raylet.
+  /// \param address to store gcs server address.
+  GcsServerAddressUpdater(const std::string raylet_ip_address,
+                          const int port,
+                          std::function<void(std::string, int)> update_func);
+
+  ~GcsServerAddressUpdater();
+
+ private:
+  /// Update gcs server address.
+  void UpdateGcsServerAddress();
+
+  instrumented_io_context updater_io_service_;
+  rpc::ClientCallManager client_call_manager_;
+  /// A client connection to the raylet.
+  raylet::RayletClient raylet_client_;
+  std::function<void(std::string, int)> update_func_;
+  PeriodicalRunner updater_runner_;
+  std::thread updater_thread_;
+  int32_t failed_ping_count_ = 0;
+};
+
+}  // namespace core
+}  // namespace ray
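The `updater_io_service_` / `updater_thread_` pair declared here is the classic dedicated event-loop-thread idiom. A minimal standalone sketch of the same lifecycle in plain Boost.Asio, without the Ray wrappers (`instrumented_io_context`, `PeriodicalRunner`):

```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <thread>

int main() {
  boost::asio::io_service io;
  // Work guard: keeps io.run() from returning while the handler queue is
  // empty, mirroring io_service_work_ inside the updater's thread function.
  boost::asio::io_service::work keep_alive(io);
  std::thread loop([&io] { io.run(); });

  // Handlers posted from any thread execute on the loop thread.
  io.post([] { std::cout << "periodic address check would run here\n"; });
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  // Teardown mirrors ~GcsServerAddressUpdater(): stop the loop, then join.
  io.stop();
  loop.join();
  return 0;
}
```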
4 changes: 4 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
@@ -268,6 +268,10 @@ struct GcsServerMocker {
     void GetSystemConfig(const ray::rpc::ClientCallback<ray::rpc::GetSystemConfigReply>
                              &callback) override {}
 
+    void GetGcsServerAddress(
+        const ray::rpc::ClientCallback<ray::rpc::GetGcsServerAddressReply> &callback)
+        override {}
+
     /// ResourceUsageInterface
     void RequestResourceReport(
         const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) override {
10 changes: 10 additions & 0 deletions src/ray/protobuf/node_manager.proto
@@ -293,6 +293,14 @@ message UpdateResourceUsageRequest {
 message UpdateResourceUsageReply {
 }
 
+message GetGcsServerAddressRequest {
+}
+
+message GetGcsServerAddressReply {
+  string ip = 1;
+  int32 port = 2;
+}
+
 message GetTasksInfoRequest {}
 
 message GetTasksInfoReply {
@@ -378,6 +386,8 @@ service NodeManagerService {
       returns (ReleaseUnusedBundlesReply);
   // Get the system config.
   rpc GetSystemConfig(GetSystemConfigRequest) returns (GetSystemConfigReply);
+  // Get gcs server address.
+  rpc GetGcsServerAddress(GetGcsServerAddressRequest) returns (GetGcsServerAddressReply);
   // [State API] Get the all task information of the node.
   rpc GetTasksInfo(GetTasksInfoRequest) returns (GetTasksInfoReply);
   // [State API] Get the all object information of the node.
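For context, the new messages compile into the usual generated C++ accessors. A hypothetical snippet of their use (the include path is an assumption about the generated output, not taken from this diff):

```cpp
#include <iostream>
#include <string>

#include "src/ray/protobuf/node_manager.pb.h"  // assumed generated header path

int main() {
  ray::rpc::GetGcsServerAddressReply reply;
  reply.set_ip("10.0.0.5");  // string ip = 1;
  reply.set_port(6379);      // int32 port = 2;

  std::string wire;
  reply.SerializeToString(&wire);  // what gRPC carries on the wire

  ray::rpc::GetGcsServerAddressReply parsed;
  parsed.ParseFromString(wire);
  std::cout << parsed.ip() << ":" << parsed.port() << "\n";
  return 0;
}
```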
10 changes: 10 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -2409,6 +2409,16 @@ void NodeManager::HandleGetSystemConfig(const rpc::GetSystemConfigRequest &reque
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
 
+void NodeManager::HandleGetGcsServerAddress(
+    const rpc::GetGcsServerAddressRequest &request,
+    rpc::GetGcsServerAddressReply *reply,
+    rpc::SendReplyCallback send_reply_callback) {
+  auto address = gcs_client_->GetGcsServerAddress();
+  reply->set_ip(address.first);
+  reply->set_port(address.second);
+  send_reply_callback(Status::OK(), nullptr, nullptr);
+}
+
 void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
                                      rpc::GetNodeStatsReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) {