Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][state] Proper report of failure when job finishes #31761

Merged
merged 24 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions python/ray/tests/test_task_events.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from collections import defaultdict
from typing import Dict
import pytest

import ray
from ray._private.test_utils import (
raw_metrics,
run_string_as_driver_nonblocking,
wait_for_condition,
)
from ray.experimental.state.api import list_tasks
Expand Down Expand Up @@ -118,3 +120,140 @@ def verify():
timeout=10,
retry_interval_ms=500,
)


def test_fault_tolerance_parents_finish(shutdown_only):
    """Verify task state reporting when ancestors finish or fail first.

    Spawns a task tree: 1 parent -> 2 finished_child + 2 failed_child,
    where every child fires off 1 long-running grand_child. After the
    parent fails, the state API should report each finished_child as
    FINISHED and every other task (parent, failed_child, grand_child)
    as FAILED.
    """
    ray.init(num_cpus=8, _system_config=_SYSTEM_CONFIG)
    import time

    # Each parent task spins off 2 child task, where each child spins off
    # 1 grand_child task.
    NUM_CHILD = 2

    @ray.remote
    def grand_child():
        # Sleep "forever" so grandchildren are still running when their
        # ancestors finish or fail.
        time.sleep(999)

    @ray.remote
    def finished_child():
        # Fire-and-forget a grandchild, then return successfully.
        grand_child.remote()

    @ray.remote
    def failed_child():
        # Spawn a grandchild first, then fail deliberately.
        grand_child.remote()
        raise ValueError("task is expected to fail")

    @ray.remote
    def parent():
        for _ in range(NUM_CHILD):
            finished_child.remote()
        for _ in range(NUM_CHILD):
            failed_child.remote()

        raise ValueError("task is expected to fail")

    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(parent.remote())

    def verify():
        tasks = list_tasks()
        assert len(tasks) == 9, (
            "Incorrect number of tasks are reported. "
            "Expected length: 1 parent + 4 child + 4 grandchild tasks"
        )
        for task in tasks:
            print(task)
            # Only the successfully returning children should be FINISHED;
            # everything else must have been marked FAILED.
            if task["name"] == "finished_child":
                assert (
                    task["scheduling_state"] == "FINISHED"
                ), f"task {task['name']} has wrong state"
            else:
                assert (
                    task["scheduling_state"] == "FAILED"
                ), f"task {task['name']} has wrong state"

        return True

    wait_for_condition(
        verify,
        timeout=10,
        retry_interval_ms=500,
    )


def test_fault_tolerance_job_failed(shutdown_only):
    """All still-running tasks of a killed driver job should become FAILED.

    Launches a separate driver whose task tree never completes, waits for
    all of its tasks to show up in the state API, kills the driver, then
    checks that only the tasks that actually returned are FINISHED and
    everything else is FAILED.
    """
    ray.init(num_cpus=8, _system_config=_SYSTEM_CONFIG)
    script = """
import ray
import time

ray.init("auto")
NUM_CHILD = 2

@ray.remote
def grandchild():
    time.sleep(999)

@ray.remote
def child():
    ray.get(grandchild.remote())

@ray.remote
def finished_child():
    ray.put(1)
    return

@ray.remote
def parent():
    children = [child.remote() for _ in range(NUM_CHILD)]
    finished_children = ray.get([finished_child.remote() for _ in range(NUM_CHILD)])
    ray.get(children)

ray.get(parent.remote())

"""
    proc = run_string_as_driver_nonblocking(script)

    def all_tasks_reported():
        # While the driver is alive, all 7 tasks should already be known
        # to the state API.
        tasks = list_tasks()
        print(tasks)
        assert len(tasks) == 7, (
            "Incorrect number of tasks are reported. "
            "Expected length: 1 parent + 2 finished child + 2 failed child + "
            "2 failed grandchild tasks"
        )
        return True

    wait_for_condition(
        all_tasks_reported,
        timeout=10,
        retry_interval_ms=500,
    )

    # Kill the driver; GCS should then mark every still-running task FAILED.
    proc.kill()

    def states_correct_after_kill():
        tasks = list_tasks()
        assert len(tasks) == 7, (
            "Incorrect number of tasks are reported. "
            "Expected length: 1 parent + 2 finished child + 2 failed child + "
            "2 failed grandchild tasks"
        )
        for task in tasks:
            # Tasks that returned ("finished_*") stay FINISHED; the rest
            # must have been failed by the job-done handler.
            expected_state = (
                "FINISHED"
                if "finished" in task["func_or_class_name"]
                else "FAILED"
            )
            assert (
                task["scheduling_state"] == expected_state
            ), f"task {task['func_or_class_name']} has wrong state"

        return True

    wait_for_condition(
        states_correct_after_kill,
        timeout=10,
        retry_interval_ms=500,
    )
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ RAY_CONFIG(int64_t, task_events_max_num_task_events_in_buffer, 10000)
/// Setting the value to -1 allows unlimited profile events to be sent.
RAY_CONFIG(int64_t, task_events_max_num_profile_events_for_task, 100)

/// The delay in ms after which GCS should mark any running tasks from a finished job
/// as failed. Setting this value too small might result in some finished tasks being
/// incorrectly marked as failed by GCS.
RAY_CONFIG(uint64_t, gcs_mark_task_failed_on_job_done_delay_ms, /* 3 secs */ 1000 * 3)
rickyyx marked this conversation as resolved.
Show resolved Hide resolved

/// Whether or not we enable metrics collection.
RAY_CONFIG(bool, enable_metrics_collection, true)

Expand Down
9 changes: 4 additions & 5 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr));
runtime_env_manager_.RemoveURIReference(job_id.Hex());
ClearJobInfos(job_id);
ClearJobInfos(job_table_data);
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
}
function_manager_.RemoveJobReference(job_id);
Expand Down Expand Up @@ -121,10 +121,10 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
}
}

void GcsJobManager::ClearJobInfos(const JobID &job_id) {
void GcsJobManager::ClearJobInfos(const rpc::JobTableData &job_data) {
// Notify all listeners.
for (auto &listener : job_finished_listeners_) {
listener(std::make_shared<JobID>(job_id));
listener(job_data);
}
// Clear cache.
// TODO(qwang): This line will cause `test_actor_advanced.py::test_detached_actor`
Expand All @@ -137,8 +137,7 @@ void GcsJobManager::ClearJobInfos(const JobID &job_id) {
/// Add listener to monitor the finish of jobs.
///
/// \param listener The handler which processes the finish of jobs.
void GcsJobManager::AddJobFinishedListener(
std::function<void(std::shared_ptr<JobID>)> listener) {
void GcsJobManager::AddJobFinishedListener(JobFinishListenerCallback listener) {
RAY_CHECK(listener);
job_finished_listeners_.emplace_back(std::move(listener));
}
Expand Down
9 changes: 5 additions & 4 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
namespace ray {
namespace gcs {

using JobFinishListenerCallback = rpc::JobInfoHandler::JobFinishListenerCallback;

/// This implementation class of `JobInfoHandler`.
class GcsJobManager : public rpc::JobInfoHandler {
public:
Expand Down Expand Up @@ -58,8 +60,7 @@ class GcsJobManager : public rpc::JobInfoHandler {
rpc::GetNextJobIDReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void AddJobFinishedListener(
std::function<void(std::shared_ptr<JobID>)> listener) override;
void AddJobFinishedListener(JobFinishListenerCallback listener) override;

std::shared_ptr<rpc::JobConfig> GetJobConfig(const JobID &job_id) const;

Expand All @@ -68,14 +69,14 @@ class GcsJobManager : public rpc::JobInfoHandler {
std::shared_ptr<GcsPublisher> gcs_publisher_;

/// Listeners which monitor the finish of jobs.
std::vector<std::function<void(std::shared_ptr<JobID>)>> job_finished_listeners_;
std::vector<JobFinishListenerCallback> job_finished_listeners_;

/// A cached mapping from job id to job config.
absl::flat_hash_map<JobID, std::shared_ptr<rpc::JobConfig>> cached_job_configs_;

ray::RuntimeEnvManager &runtime_env_manager_;
GcsFunctionManager &function_manager_;
void ClearJobInfos(const JobID &job_id);
void ClearJobInfos(const rpc::JobTableData &job_data);

void MarkJobAsFinished(rpc::JobTableData job_table_data,
std::function<void(Status)> done_callback);
Expand Down
8 changes: 5 additions & 3 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,11 @@ void GcsServer::InstallEventListeners() {
});

// Install job event listeners.
gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) {
gcs_actor_manager_->OnJobFinished(*job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id);
gcs_job_manager_->AddJobFinishedListener([this](const rpc::JobTableData &job_data) {
auto job_id = JobID::FromBinary(job_data.job_id());
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
gcs_actor_manager_->OnJobFinished(job_id);
gcs_task_manager_->OnJobFinished(job_id, job_data.end_time());
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
});

// Install scheduling event listeners.
Expand Down
Loading