Skip to content

Commit

Permalink
[Core] Better logs job message failure (#20363)
Browse files Browse the repository at this point in the history
<!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. -->

## Why are these changes needed?

There's one user who has an issue that one of raylets cannot schedule tasks anymore because `num_worker_not_started_by_job_config_not_exist ` > 0.

This PR adds better log messages to figure out if the root cause is the job information is not properly propagated from GCS to raylet through Redis pubsub. 

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
  • Loading branch information
rkooo567 authored and Alex committed Nov 21, 2021
1 parent 0ea156b commit 5a7f472
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,9 @@ void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
}

void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) {
RAY_LOG(DEBUG) << "HandleJobStarted for job " << job_id;
RAY_LOG(INFO) << "New job has started. Job id " << job_id << " Driver pid "
<< job_data.driver_pid() << " is dead: " << job_data.is_dead()
<< " driver address: " << job_data.driver_ip_address();
worker_pool_.HandleJobStarted(job_id, job_data.config());
// NOTE: Technically `HandleJobStarted` isn't idempotent because we'll
// increment the ref count multiple times. This is fine because
Expand Down
17 changes: 15 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Process WorkerPool::StartWorkerProcess(
RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet.";
// Will reschedule ready tasks in `NodeManager::HandleJobStarted`.
*status = PopWorkerStatus::JobConfigMissing;
process_failed_job_config_missing_++;
return Process();
}
job_config = &it->second;
Expand All @@ -215,6 +216,7 @@ Process WorkerPool::StartWorkerProcess(
<< " workers of language type " << static_cast<int>(language)
<< " pending registration";
*status = PopWorkerStatus::TooManyStartingWorkerProcesses;
process_failed_rate_limited_++;
return Process();
}
// Either there are no workers pending registration or the worker start is being forced.
Expand Down Expand Up @@ -447,6 +449,7 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
<< ") have not registered to raylet within timeout.";
PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration;
process_failed_pending_registration_++;
bool found;
bool used;
TaskID task_id;
Expand Down Expand Up @@ -1067,13 +1070,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
// create runtime env.
CreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
[start_worker_process_fn, callback, &state, task_spec, dynamic_options](
[this, start_worker_process_fn, callback, &state, task_spec, dynamic_options](
bool successful, const std::string &serialized_runtime_env_context) {
if (successful) {
start_worker_process_fn(task_spec, state, dynamic_options, true,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context, callback);
} else {
process_failed_runtime_env_setup_failed_++;
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
RAY_LOG(WARNING)
<< "Create runtime env failed for task " << task_spec.TaskId()
Expand Down Expand Up @@ -1125,13 +1129,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
// create runtime env.
CreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
[start_worker_process_fn, callback, &state, task_spec](
[this, start_worker_process_fn, callback, &state, task_spec](
bool successful, const std::string &serialized_runtime_env_context) {
if (successful) {
start_worker_process_fn(task_spec, state, {}, false,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context, callback);
} else {
process_failed_runtime_env_setup_failed_++;
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
RAY_LOG(WARNING)
<< "Create runtime env failed for task " << task_spec.TaskId()
Expand Down Expand Up @@ -1381,6 +1386,14 @@ std::unordered_set<std::shared_ptr<WorkerInterface>> WorkerPool::GetWorkersByPro
std::string WorkerPool::DebugString() const {
std::stringstream result;
result << "WorkerPool:";
result << "\n- registered jobs: " << all_jobs_.size() - finished_jobs_.size();
result << "\n- process_failed_job_config_missing: "
<< process_failed_job_config_missing_;
result << "\n- process_failed_rate_limited: " << process_failed_rate_limited_;
result << "\n- process_failed_pending_registration: "
<< process_failed_pending_registration_;
result << "\n- process_failed_runtime_env_setup_failed: "
<< process_failed_runtime_env_setup_failed_;
for (const auto &entry : states_by_lang_) {
result << "\n- num " << Language_Name(entry.first)
<< " workers: " << entry.second.registered_workers.size();
Expand Down
6 changes: 6 additions & 0 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
const std::function<double()> get_time_;
/// Agent manager.
std::shared_ptr<AgentManager> agent_manager_;
/// Stats
int64_t process_failed_job_config_missing_ = 0;
int64_t process_failed_rate_limited_ = 0;
int64_t process_failed_pending_registration_ = 0;
int64_t process_failed_runtime_env_setup_failed_ = 0;
};
} // namespace raylet
Expand Down

0 comments on commit 5a7f472

Please sign in to comment.