Log Improvement Back Port (#8569)
close #8568
xzhangxian1008 authored Dec 27, 2023
1 parent 1226495 commit 660b23a
Showing 13 changed files with 95 additions and 77 deletions.
49 changes: 21 additions & 28 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
@@ -161,7 +161,7 @@ void CreatingSetsBlockInputStream::createAll()
exception_from_workers.size());
std::rethrow_exception(exception_from_workers.front());
}
LOG_DEBUG(
LOG_INFO(
log,
"Creating all tasks takes {} sec. ",
watch.elapsedSeconds());
@@ -184,7 +184,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
Stopwatch watch;
try
{
LOG_DEBUG(log, "{}", gen_log_msg());
LOG_INFO(log, "{}", gen_log_msg());
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});
@@ -203,7 +203,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
LOG_WARNING(log, "Query was cancelled during set / join or temporary table creation.");
return;
}

@@ -263,31 +263,24 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (subquery.join)
head_rows = subquery.join->getTotalBuildInputRows();

if (head_rows != 0)
{
// avoid generate log message when log level > DEBUG.
auto gen_debug_log_msg = [&] {
FmtBuffer msg;
msg.append("Created. ");

if (subquery.set)
msg.fmtAppend("Set with {} entries from {} rows. ", subquery.set->getTotalRowCount(), head_rows);
if (subquery.join)
msg.fmtAppend("Join with {} entries from {} rows. ", subquery.join->getTotalRowCount(), head_rows);
if (subquery.table)
msg.fmtAppend("Table with {} rows. ", head_rows);

msg.fmtAppend("In {:.3f} sec. ", watch.elapsedSeconds());
msg.fmtAppend("using {} threads.", subquery.join ? subquery.join->getBuildConcurrency() : 1);
return msg.toString();
};

LOG_DEBUG(log, "{}", gen_debug_log_msg());
}
else
{
LOG_DEBUG(log, "Subquery has empty result.");
}
// avoid generate log message when log level > INFO.
auto gen_finish_log_msg = [&] {
FmtBuffer msg;
msg.append("Created. ");

if (subquery.set)
msg.fmtAppend("Set with {} entries from {} rows. ", head_rows > 0 ? subquery.set->getTotalRowCount() : 0, head_rows);
if (subquery.join)
msg.fmtAppend("Join with {} entries from {} rows. ", head_rows > 0 ? subquery.join->getTotalRowCount() : 0, head_rows);
if (subquery.table)
msg.fmtAppend("Table with {} rows. ", head_rows);

msg.fmtAppend("In {:.3f} sec. ", watch.elapsedSeconds());
msg.fmtAppend("using {} threads.", subquery.join ? subquery.join->getBuildConcurrency() : 1);
return msg.toString();
};

LOG_INFO(log, "{}", gen_finish_log_msg());
}
catch (...)
{
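
A quick illustration of the pattern the hunk above relies on: the finish message is built inside a lambda that the logging macro only evaluates after the level check passes, so the FmtBuffer work is skipped when the record is filtered. Below is a minimal, self-contained sketch of that idea (the Logger and LOG_AT names are hypothetical stand-ins, not the TiFlash logging API):

#include <iostream>
#include <sstream>
#include <string>

enum class LogLevel { Debug = 0, Info = 1, Warning = 2 };

struct Logger
{
    LogLevel threshold = LogLevel::Info;
    bool enabled(LogLevel level) const { return level >= threshold; }
    void write(const std::string & msg) const { std::cout << msg << '\n'; }
};

// The message expression is only evaluated when the level check passes,
// mirroring how the gen_finish_log_msg lambda defers the formatting cost.
#define LOG_AT(logger, level, expr)      \
    do                                   \
    {                                    \
        if ((logger).enabled(level))     \
            (logger).write(expr);        \
    } while (false)

int main()
{
    Logger log;
    size_t head_rows = 12345;

    auto gen_finish_log_msg = [&] {
        std::ostringstream msg;
        msg << "Created. Join with " << head_rows << " rows.";
        return msg.str();
    };

    LOG_AT(log, LogLevel::Info, gen_finish_log_msg()); // printed, lambda runs
    log.threshold = LogLevel::Warning;
    LOG_AT(log, LogLevel::Info, gen_finish_log_msg()); // filtered, lambda never runs
}
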
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -998,7 +998,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto [storages, locks, storage_schema_versions, ok] = get_and_lock_storages(false);
if (ok)
{
LOG_INFO(log, "{}", log_schema_version("OK, no syncing required.", storage_schema_versions));
LOG_DEBUG(log, "{}", log_schema_version("OK, no syncing required.", storage_schema_versions));
}
else
/// If first try failed, sync schema and try again.
@@ -1010,7 +1010,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
std::tie(storages, locks, storage_schema_versions, ok) = get_and_lock_storages(true);
if (ok)
{
LOG_INFO(log, "{}", log_schema_version("OK after syncing.", storage_schema_versions));
LOG_DEBUG(log, "{}", log_schema_version("OK after syncing.", storage_schema_versions));
}
else
throw TiFlashException("Shouldn't reach here", Errors::Coprocessor::Internal);
8 changes: 7 additions & 1 deletion dbms/src/Flash/EstablishCall.cpp
@@ -206,7 +206,13 @@ void EstablishCallData::writeDone(String msg, const grpc::Status & status)

if (async_tunnel_sender)
{
LOG_INFO(async_tunnel_sender->getLogger(), "connection for {} cost {} ms, including {} ms to waiting task.", async_tunnel_sender->getTunnelId(), stopwatch->elapsedMilliseconds(), waiting_task_time_ms);
auto time = stopwatch->elapsedMilliseconds();
LOG_IMPL(async_tunnel_sender->getLogger(),
msg.empty() && time < 1000 ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION,
"connection for {} cost {} ms, including {} ms to waiting task.",
async_tunnel_sender->getTunnelId(),
time,
waiting_task_time_ms);

RUNTIME_ASSERT(!async_tunnel_sender->isConsumerFinished(), async_tunnel_sender->getLogger(), "tunnel {} consumer finished in advance", async_tunnel_sender->getTunnelId());

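
The EstablishCall.cpp hunk above chooses the log level at runtime: quiet connections stay at DEBUG, while slow or failed ones are promoted to INFO. A minimal sketch of that decision, with hypothetical names (Priority, log_at) standing in for the Poco priorities and LOG_IMPL used by the real code:

#include <chrono>
#include <iostream>
#include <string>

enum class Priority { Debug, Information };

// Stand-in for LOG_IMPL; a real logger would also filter by its own threshold.
void log_at(Priority prio, const std::string & msg)
{
    std::cout << (prio == Priority::Debug ? "[DEBUG] " : "[INFO] ") << msg << '\n';
}

void write_done(const std::string & err_msg, std::chrono::milliseconds elapsed)
{
    // No error and finished in under a second -> DEBUG; anything else -> INFO.
    auto prio = (err_msg.empty() && elapsed.count() < 1000) ? Priority::Debug
                                                            : Priority::Information;
    log_at(prio, "connection cost " + std::to_string(elapsed.count()) + " ms");
}

int main()
{
    write_done("", std::chrono::milliseconds(200));  // logged at DEBUG
    write_done("", std::chrono::milliseconds(2500)); // logged at INFO
}
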
36 changes: 23 additions & 13 deletions dbms/src/Flash/FlashService.cpp
@@ -113,13 +113,14 @@ grpc::Status FlashService::Coprocessor(
coprocessor::Response * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());
bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(log, log_level, "Handling coprocessor request, is_remote_read: {}, start ts: {}, region id: {}, region epoch: {}", is_remote_read, request->start_ts(), request->context().region_id(), request->context().region_epoch().DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
GET_METRIC(tiflash_coprocessor_request_count, type_cop).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_cop).Increment();
if (is_remote_read)
@@ -140,7 +141,7 @@ grpc::Status FlashService::Coprocessor(

const auto & settings = context->getSettingsRef();
auto handle_limit = settings.cop_pool_handle_limit != 0 ? settings.cop_pool_handle_limit.get() : 10 * cop_pool->size();
auto max_queued_duration_seconds = std::min(settings.cop_pool_max_queued_seconds, 20);
auto max_queued_duration_ms = std::min(settings.cop_pool_max_queued_seconds, 20) * 1000;

if (handle_limit > 0)
{
@@ -155,14 +156,20 @@


grpc::Status ret = executeInThreadPool(*cop_pool, [&] {
if (max_queued_duration_seconds > 0)
auto wait_ms = watch.elapsedMilliseconds();
if (max_queued_duration_ms > 0)
{
if (auto current = watch.elapsedSeconds(); current > max_queued_duration_seconds)
if (wait_ms > static_cast<UInt64>(max_queued_duration_ms))
{
response->mutable_region_error()->mutable_server_is_busy()->set_reason(fmt::format("this task queued in tiflash cop pool too long, current = {}, limit = {}", current, max_queued_duration_seconds));
response->mutable_region_error()->mutable_server_is_busy()->set_reason(fmt::format("this task queued in tiflash cop pool too long, current = {}ms, limit = {}ms", wait_ms, max_queued_duration_ms));
return grpc::Status::OK;
}
}

if (wait_ms > 1000)
log_level = Poco::Message::PRIO_INFORMATION;
LOG_IMPL(log, log_level, "Begin process cop request after wait {} ms, start ts: {}, region id: {}, region epoch: {}", wait_ms, request->start_ts(), request->context().region_id(), request->context().region_epoch().DebugString());

auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
@@ -179,14 +186,14 @@ grpc::Status FlashService::Coprocessor(
return cop_handler.execute();
});

LOG_DEBUG(log, "Handle coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
LOG_IMPL(log, log_level, "Handle coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
return ret;
}

grpc::Status FlashService::BatchCoprocessor(grpc::ServerContext * grpc_context, const coprocessor::BatchRequest * request, grpc::ServerWriter<coprocessor::BatchResponse> * writer)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());
LOG_INFO(log, "Handling batch coprocessor request, start ts: {}", request->start_ts());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
@@ -202,6 +209,9 @@ grpc::Status FlashService::BatchCoprocessor(grpc::ServerContext * grpc_context,
});

grpc::Status ret = executeInThreadPool(*batch_cop_pool, [&] {
auto wait_ms = watch.elapsedMilliseconds();
LOG_INFO(log, "Begin process batch cop request after wait {} ms, start ts: {}", wait_ms, request->start_ts());

auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
@@ -212,7 +222,7 @@ grpc::Status FlashService::BatchCoprocessor(grpc::ServerContext * grpc_context,
return cop_handler.execute();
});

LOG_DEBUG(log, "Handle coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
LOG_INFO(log, "Handle batch coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
return ret;
}

@@ -222,7 +232,7 @@ grpc::Status FlashService::DispatchMPPTask(
mpp::DispatchTaskResponse * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_DEBUG(log, "Handling mpp dispatch request: {}", request->DebugString());
LOG_INFO(log, "Handling mpp dispatch request, task meta: {}", request->meta().DebugString());
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;
@@ -278,7 +288,7 @@ grpc::Status AsyncFlashService::establishMPPConnectionAsync(grpc::ServerContext
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// Establish a pipe for data transferring. The pipes have registered by the task in advance.
// We need to find it out and bind the grpc stream with it.
LOG_DEBUG(log, "Handling establish mpp connection request: {}", request->DebugString());
LOG_INFO(log, "Handling establish mpp connection request: {}", request->DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
@@ -297,7 +307,7 @@ grpc::Status FlashService::EstablishMPPConnection(grpc::ServerContext * grpc_con
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// Establish a pipe for data transferring. The pipes have registered by the task in advance.
// We need to find it out and bind the grpc stream with it.
LOG_DEBUG(log, "Handling establish mpp connection request: {}", request->DebugString());
LOG_INFO(log, "Handling establish mpp connection request: {}", request->DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
@@ -350,7 +360,7 @@ grpc::Status FlashService::CancelMPPTask(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// CancelMPPTask cancels the query of the task.
LOG_DEBUG(log, "cancel mpp task request: {}", request->DebugString());
LOG_INFO(log, "cancel mpp task request: {}", request->DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
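
The FlashService.cpp hunks above measure, inside the pool worker, how long a coprocessor request waited in the queue, reject requests that exceed the configured limit, and promote the log level when the wait exceeds a second. A simplified sketch of the queue-time check, using hypothetical names rather than the FlashService types:

#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>

using Clock = std::chrono::steady_clock;

// Runs inside the pool worker; enqueue_time was captured before the task was queued.
std::string handle_cop_request(Clock::time_point enqueue_time, std::uint64_t max_queued_ms)
{
    auto wait_ms = static_cast<std::uint64_t>(
        std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - enqueue_time).count());
    if (max_queued_ms > 0 && wait_ms > max_queued_ms)
        return "server_is_busy: queued " + std::to_string(wait_ms) + "ms, limit "
            + std::to_string(max_queued_ms) + "ms";

    // ... execute the coprocessor request here ...
    return "ok after waiting " + std::to_string(wait_ms) + "ms";
}

int main()
{
    auto enqueued = Clock::now(); // started before the task enters the pool
    std::cout << handle_cop_request(enqueued, /*max_queued_ms=*/20000) << '\n';
}
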
23 changes: 8 additions & 15 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -550,7 +550,7 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
if (handler->finished())
{
--alive_async_connections;
connectionDone(handler->meetError(), handler->getErrMsg(), handler->getLog());
connectionDone(handler->meetError(), handler->getErrMsg());
}
}
else
@@ -650,7 +650,7 @@ void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
meet_error = true;
local_err_msg = getCurrentExceptionMessage(false);
}
connectionDone(meet_error, local_err_msg, log);
connectionDone(meet_error, local_err_msg);
}

template <typename RPCContext>
@@ -804,8 +804,7 @@ String ExchangeReceiverBase<RPCContext>::getStatusString()
template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::connectionDone(
bool meet_error,
const String & local_err_msg,
const LoggerPtr & log)
const String & local_err_msg)
{
Int32 copy_live_conn = -1;
{
@@ -819,22 +818,16 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
}
copy_live_conn = --live_connections;
}
LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_conn);

if (copy_live_conn == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
}
else if (copy_live_conn < 0)
if (copy_live_conn < 0)
throw Exception("live_connections should not be less than 0!");

if (meet_error || copy_live_conn == 0)
{
auto log_level = meet_error ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_INFORMATION;
LOG_IMPL(exc_log, log_level, "Finish receiver channels, meet error: {}, error message: {}", meet_error, local_err_msg);
finishAllMsgChannels();
}
}

template <typename RPCContext>
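
The ExchangeReceiver.cpp hunk above drops the per-connection DEBUG record and the per-connection logger parameter, and instead logs once, on the receiver's own logger, when an error is met or the last connection closes. A simplified sketch of that bookkeeping (hypothetical Receiver type, plain stdout instead of the real logger):

#include <iostream>
#include <mutex>
#include <string>

struct Receiver
{
    int live_connections;
    std::mutex mu;

    explicit Receiver(int conns) : live_connections(conns) {}

    void connection_done(bool meet_error, const std::string & err_msg)
    {
        int remaining;
        {
            std::lock_guard<std::mutex> lock(mu);
            remaining = --live_connections;
        }
        // Log only when something went wrong or the last connection finished,
        // at WARNING or INFORMATION respectively, instead of one DEBUG line per connection.
        if (meet_error || remaining == 0)
        {
            std::cout << (meet_error ? "[WARN] " : "[INFO] ")
                      << "Finish receiver channels, meet error: " << std::boolalpha << meet_error
                      << ", error message: " << err_msg << '\n';
            // finishAllMsgChannels() would run here in the real receiver.
        }
    }
};

int main()
{
    Receiver receiver(2);
    receiver.connection_done(false, ""); // silent: one connection still alive
    receiver.connection_done(false, ""); // INFO: all connections finished
}
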
3 changes: 1 addition & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
@@ -176,8 +176,7 @@ class ExchangeReceiverBase

void connectionDone(
bool meet_error,
const String & local_err_msg,
const LoggerPtr & log);
const String & local_err_msg);

void finishAllMsgChannels();
void cancelAllMsgChannels();
17 changes: 12 additions & 5 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -71,7 +71,7 @@ MPPTask::~MPPTask()
if (process_list_entry != nullptr && current_memory_tracker != process_list_entry->get().getMemoryTrackerPtr().get())
current_memory_tracker = process_list_entry->get().getMemoryTrackerPtr().get();
abortTunnels("", true);
LOG_DEBUG(log, "finish MPPTask: {}", id.toString());
LOG_INFO(log, "finish MPPTask: {}", id.toString());
}

void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
@@ -361,14 +361,17 @@ void MPPTask::runImpl()
String err_msg;
try
{
LOG_INFO(log, "task starts preprocessing");
LOG_DEBUG(log, "task starts preprocessing");
preprocess();
auto time_cost_in_preprocess_ms = stopwatch.elapsedMilliseconds();
LOG_DEBUG(log, "task preprocess done");
schedule_entry.setNeededThreads(estimateCountOfNewThreads());
LOG_DEBUG(log, "Estimate new thread count of query: {} including tunnel_threads: {}, receiver_threads: {}", schedule_entry.getNeededThreads(), dag_context->tunnel_set->getExternalThreadCnt(), new_thread_count_of_mpp_receiver);

scheduleOrWait();

LOG_INFO(log, "task starts running");
auto time_cost_in_schedule_ms = stopwatch.elapsedMilliseconds() - time_cost_in_preprocess_ms;
LOG_INFO(log, "task starts running, time cost in schedule: {} ms, time cost in preprocess {} ms", time_cost_in_schedule_ms, time_cost_in_preprocess_ms);
if (status.load() != RUNNING)
{
/// when task is in running state, canceling the task will call sendCancelToQuery to do the cancellation, however
@@ -379,6 +382,10 @@ void MPPTask::runImpl()
mpp_task_statistics.start();

auto result = query_executor_holder->execute();
auto log_level = Poco::Message::PRIO_DEBUG;
if (!result.is_success || status != RUNNING)
log_level = Poco::Message::PRIO_INFORMATION;
LOG_IMPL(log, log_level, "mpp task finish execute, success: {}, status: {}", result.is_success, magic_enum::enum_name(status.load()));
if (likely(result.is_success))
{
// finish receiver
@@ -408,7 +415,7 @@ void MPPTask::runImpl()
if (err_msg.empty())
{
if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
LOG_DEBUG(log, "finish task");
else
LOG_WARNING(log, "finish task which is in {} state", magic_enum::enum_name(status.load()));
if (status == FINISHED)
@@ -442,7 +449,7 @@ void MPPTask::runImpl()
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.logTracingJson();

LOG_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
unregisterTask();
}

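
The MPPTask::runImpl hunks above derive both durations in the "task starts running" line from the one stopwatch the function already owns: the preprocess cost is the first reading, and the schedule wait is the difference between the second reading and the first. A minimal sketch of that bookkeeping (the sleeps stand in for preprocess() and scheduleOrWait()):

#include <chrono>
#include <iostream>
#include <thread>

int main()
{
    using namespace std::chrono;
    auto start = steady_clock::now();

    // preprocess() would run here.
    std::this_thread::sleep_for(milliseconds(30));
    auto preprocess_ms = duration_cast<milliseconds>(steady_clock::now() - start).count();

    // scheduleOrWait() would run here.
    std::this_thread::sleep_for(milliseconds(50));
    auto total_ms = duration_cast<milliseconds>(steady_clock::now() - start).count();
    auto schedule_ms = total_ms - preprocess_ms;

    std::cout << "task starts running, time cost in schedule: " << schedule_ms
              << " ms, time cost in preprocess " << preprocess_ms << " ms\n";
}
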
7 changes: 4 additions & 3 deletions dbms/src/Flash/Mpp/MPPTaskScheduleEntry.cpp
@@ -41,7 +41,8 @@ bool MPPTaskScheduleEntry::schedule(ScheduleState state)
std::unique_lock lock(schedule_mu);
if (schedule_state == ScheduleState::WAITING)
{
LOG_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
auto log_level = state == ScheduleState::SCHEDULED ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_WARNING;
LOG_IMPL(log, log_level, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
schedule_state = state;
schedule_cv.notify_one();
return true;
@@ -51,7 +52,7 @@

void MPPTaskScheduleEntry::waitForSchedule()
{
LOG_INFO(log, "task waits for schedule");
LOG_DEBUG(log, "task waits for schedule");
Stopwatch stopwatch;
double time_cost = 0;
{
@@ -69,7 +70,7 @@ void MPPTaskScheduleEntry::waitForSchedule()
throw Exception(fmt::format("{} is failed to schedule because of being cancelled in min-tso scheduler after waiting for {}s.", id.toString(), time_cost));
}
}
LOG_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
LOG_DEBUG(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
}

const MPPTaskId & MPPTaskScheduleEntry::getMPPTaskId() const
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskStatistics.cpp
@@ -94,8 +94,10 @@ const BaseRuntimeStatistics & MPPTaskStatistics::collectRuntimeStatistics()

void MPPTaskStatistics::logTracingJson()
{
LOG_INFO(
LOG_IMPL(
logger,
/// don't use info log for initializing status since it does not contains too many information
status == INITIALIZING ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION,
R"({{"query_tso":{},"task_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")"
R"(,"task_init_timestamp":{},"task_start_timestamp":{},"task_end_timestamp":{})"
R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})"