From 5a7f4729c7cbf60d257181c5b1cccb4189a9caa6 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 16 Nov 2021 21:20:49 +0900 Subject: [PATCH] [Core] Better logs job message failure (#20363) ## Why are these changes needed? There's one user who has an issue that one of raylets cannot schedule tasks anymore because `num_worker_not_started_by_job_config_not_exist ` > 0. This PR adds better log messages to figure out if the root cause is the job information is not properly propagated from GCS to raylet through Redis pubsub. ## Related issue number ## Checks - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --- src/ray/raylet/node_manager.cc | 4 +++- src/ray/raylet/worker_pool.cc | 17 +++++++++++++++-- src/ray/raylet/worker_pool.h | 6 ++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2c43bd7612d7..c36c72a6c25a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -538,7 +538,9 @@ void NodeManager::DestroyWorker(std::shared_ptr worker, } void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) { - RAY_LOG(DEBUG) << "HandleJobStarted for job " << job_id; + RAY_LOG(INFO) << "New job has started. Job id " << job_id << " Driver pid " + << job_data.driver_pid() << " is dead: " << job_data.is_dead() + << " driver address: " << job_data.driver_ip_address(); worker_pool_.HandleJobStarted(job_id, job_data.config()); // NOTE: Technically `HandleJobStarted` isn't idempotent because we'll // increment the ref count multiple times. This is fine because diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 64836c91fe22..e303f0152302 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -193,6 +193,7 @@ Process WorkerPool::StartWorkerProcess( RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet."; // Will reschedule ready tasks in `NodeManager::HandleJobStarted`. *status = PopWorkerStatus::JobConfigMissing; + process_failed_job_config_missing_++; return Process(); } job_config = &it->second; @@ -215,6 +216,7 @@ Process WorkerPool::StartWorkerProcess( << " workers of language type " << static_cast(language) << " pending registration"; *status = PopWorkerStatus::TooManyStartingWorkerProcesses; + process_failed_rate_limited_++; return Process(); } // Either there are no workers pending registration or the worker start is being forced. @@ -447,6 +449,7 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId() << ") have not registered to raylet within timeout."; PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration; + process_failed_pending_registration_++; bool found; bool used; TaskID task_id; @@ -1067,13 +1070,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, // create runtime env. CreateRuntimeEnv( task_spec.SerializedRuntimeEnv(), task_spec.JobId(), - [start_worker_process_fn, callback, &state, task_spec, dynamic_options]( + [this, start_worker_process_fn, callback, &state, task_spec, dynamic_options]( bool successful, const std::string &serialized_runtime_env_context) { if (successful) { start_worker_process_fn(task_spec, state, dynamic_options, true, task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context, callback); } else { + process_failed_runtime_env_setup_failed_++; callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed); RAY_LOG(WARNING) << "Create runtime env failed for task " << task_spec.TaskId() @@ -1125,13 +1129,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, // create runtime env. CreateRuntimeEnv( task_spec.SerializedRuntimeEnv(), task_spec.JobId(), - [start_worker_process_fn, callback, &state, task_spec]( + [this, start_worker_process_fn, callback, &state, task_spec]( bool successful, const std::string &serialized_runtime_env_context) { if (successful) { start_worker_process_fn(task_spec, state, {}, false, task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context, callback); } else { + process_failed_runtime_env_setup_failed_++; callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed); RAY_LOG(WARNING) << "Create runtime env failed for task " << task_spec.TaskId() @@ -1381,6 +1386,14 @@ std::unordered_set> WorkerPool::GetWorkersByPro std::string WorkerPool::DebugString() const { std::stringstream result; result << "WorkerPool:"; + result << "\n- registered jobs: " << all_jobs_.size() - finished_jobs_.size(); + result << "\n- process_failed_job_config_missing: " + << process_failed_job_config_missing_; + result << "\n- process_failed_rate_limited: " << process_failed_rate_limited_; + result << "\n- process_failed_pending_registration: " + << process_failed_pending_registration_; + result << "\n- process_failed_runtime_env_setup_failed: " + << process_failed_runtime_env_setup_failed_; for (const auto &entry : states_by_lang_) { result << "\n- num " << Language_Name(entry.first) << " workers: " << entry.second.registered_workers.size(); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 2035047022c9..31e0c61c2973 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -681,6 +681,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { const std::function get_time_; /// Agent manager. std::shared_ptr agent_manager_; + + /// Stats + int64_t process_failed_job_config_missing_ = 0; + int64_t process_failed_rate_limited_ = 0; + int64_t process_failed_pending_registration_ = 0; + int64_t process_failed_runtime_env_setup_failed_ = 0; }; } // namespace raylet