Skip to content

Commit

Permalink
Revert "Revert "Route core worker ERROR/FATAL logs to driver logs (#1… (
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl authored Sep 14, 2021
1 parent 31e1638 commit 15512c2
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 17 deletions.
19 changes: 19 additions & 0 deletions python/ray/tests/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ def foo(out_str, err_str):
assert err_str.split("\n")[-2].endswith("def")


# Checks that an error pushed to the driver from a local-mode Ray session
# ends up in the stderr output of the driver process.
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_core_worker_error_message():
script = """
import ray
import sys
ray.init(local_mode=True)
# In local mode this generates an ERROR level log.
ray._private.utils.push_error_to_driver(
ray.worker.global_worker, "type", "Hello there")
"""

# NOTE(review): presumably launches `script` as a separate driver process
# and returns a handle exposing its stderr pipe -- confirm against the helper.
proc = run_string_as_driver_nonblocking(script)
# Read everything the process wrote to stderr and decode it as ASCII text.
err_str = proc.stderr.read().decode("ascii")

# The pushed message text must appear in stderr; the captured output is
# passed as the assertion message so a failure shows what was printed.
assert "Hello there" in err_str, err_str


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_disable_driver_logs_breakpoint():
script = """
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ RAY_CONFIG(bool, preallocate_plasma_memory, false)
/// then spread via weighted (by critical resource usage).
RAY_CONFIG(bool, scheduler_hybrid_scheduling, true)

/// The fraction of resource utilization on a node after which the scheduler starts
/// to prefer spreading tasks to other nodes. This trades off data locality against
/// even load distribution. Low values (min 0.0) encourage more load spreading.
RAY_CONFIG(float, scheduler_spread_threshold,
getenv("RAY_SCHEDULER_SPREAD_THRESHOLD") != nullptr
? std::stof(getenv("RAY_SCHEDULER_SPREAD_THRESHOLD"))
Expand Down
8 changes: 4 additions & 4 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,13 +570,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
uint32_t delay = RayConfig::instance().task_retry_delay_ms();
RAY_LOG(ERROR) << "Will resubmit task after a " << delay
<< "ms delay: " << spec.DebugString();
RAY_LOG(INFO) << "Will resubmit task after a " << delay
<< "ms delay: " << spec.DebugString();
absl::MutexLock lock(&mutex_);
to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec));
} else {
RAY_LOG(ERROR) << "Resubmitting task that produced lost plasma object: "
<< spec.DebugString();
RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object: "
<< spec.DebugString();
if (spec.IsActorTask()) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject());
Expand Down
36 changes: 23 additions & 13 deletions src/ray/util/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ void RayLog::StartRayLog(const std::string &app_name, RayLogLevel severity_thres
app_name_ = app_name;
log_dir_ = log_dir;

// All the logging sinks to add.
std::vector<spdlog::sink_ptr> sinks;
auto level = static_cast<spdlog::level::level_enum>(severity_threshold_);

if (!log_dir_.empty()) {
// Enable log file if log_dir_ is not empty.
std::string dir_ends_with_slash = log_dir_;
Expand Down Expand Up @@ -243,26 +247,32 @@ void RayLog::StartRayLog(const std::string &app_name, RayLogLevel severity_thres
// logger.
spdlog::drop(RayLog::GetLoggerName());
}
file_logger = spdlog::rotating_logger_mt(
RayLog::GetLoggerName(),
auto file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
dir_ends_with_slash + app_name_without_path + "_" + std::to_string(pid) + ".log",
log_rotation_max_size_, log_rotation_file_num_);
spdlog::set_default_logger(file_logger);
sinks.push_back(file_sink);
} else {
auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
console_sink->set_pattern(log_format_pattern_);
auto level = static_cast<spdlog::level::level_enum>(severity_threshold_);
console_sink->set_level(level);

auto err_sink = std::make_shared<spdlog::sinks::stderr_color_sink_mt>();
err_sink->set_pattern(log_format_pattern_);
err_sink->set_level(spdlog::level::err);

auto logger = std::shared_ptr<spdlog::logger>(
new spdlog::logger(RayLog::GetLoggerName(), {console_sink, err_sink}));
logger->set_level(level);
spdlog::set_default_logger(logger);
sinks.push_back(console_sink);
}

// In all cases, log errors to the console log so they are in driver logs.
// https://github.com/ray-project/ray/issues/12893
auto err_sink = std::make_shared<spdlog::sinks::stderr_color_sink_mt>();
err_sink->set_pattern(log_format_pattern_);
err_sink->set_level(spdlog::level::err);
sinks.push_back(err_sink);

// Set the combined logger.
auto logger = std::make_shared<spdlog::logger>(RayLog::GetLoggerName(), sinks.begin(),
sinks.end());
logger->set_level(level);
logger->set_pattern(log_format_pattern_);
spdlog::set_level(static_cast<spdlog::level::level_enum>(severity_threshold_));
spdlog::set_pattern(log_format_pattern_);
spdlog::set_default_logger(logger);
}

void RayLog::UninstallSignalAction() {
Expand Down

0 comments on commit 15512c2

Please sign in to comment.