Log Improvement Back Port #8569

Merged. 10 commits merged on Dec 27, 2023
49 changes: 21 additions & 28 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
@@ -161,7 +161,7 @@ void CreatingSetsBlockInputStream::createAll()
exception_from_workers.size());
std::rethrow_exception(exception_from_workers.front());
}
LOG_DEBUG(
LOG_INFO(
log,
"Creating all tasks takes {} sec. ",
watch.elapsedSeconds());
@@ -184,7 +184,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
Stopwatch watch;
try
{
LOG_DEBUG(log, "{}", gen_log_msg());
LOG_INFO(log, "{}", gen_log_msg());
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});
@@ -203,7 +203,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
LOG_WARNING(log, "Query was cancelled during set / join or temporary table creation.");
return;
}

@@ -263,31 +263,24 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (subquery.join)
head_rows = subquery.join->getTotalBuildInputRows();

if (head_rows != 0)
{
// avoid generate log message when log level > DEBUG.
auto gen_debug_log_msg = [&] {
FmtBuffer msg;
msg.append("Created. ");

if (subquery.set)
msg.fmtAppend("Set with {} entries from {} rows. ", subquery.set->getTotalRowCount(), head_rows);
if (subquery.join)
msg.fmtAppend("Join with {} entries from {} rows. ", subquery.join->getTotalRowCount(), head_rows);
if (subquery.table)
msg.fmtAppend("Table with {} rows. ", head_rows);

msg.fmtAppend("In {:.3f} sec. ", watch.elapsedSeconds());
msg.fmtAppend("using {} threads.", subquery.join ? subquery.join->getBuildConcurrency() : 1);
return msg.toString();
};

LOG_DEBUG(log, "{}", gen_debug_log_msg());
}
else
{
LOG_DEBUG(log, "Subquery has empty result.");
}
// avoid generating the log message when log level > INFO.
auto gen_finish_log_msg = [&] {
FmtBuffer msg;
msg.append("Created. ");

if (subquery.set)
msg.fmtAppend("Set with {} entries from {} rows. ", head_rows > 0 ? subquery.set->getTotalRowCount() : 0, head_rows);
if (subquery.join)
msg.fmtAppend("Join with {} entries from {} rows. ", head_rows > 0 ? subquery.join->getTotalRowCount() : 0, head_rows);
if (subquery.table)
msg.fmtAppend("Table with {} rows. ", head_rows);

msg.fmtAppend("In {:.3f} sec. ", watch.elapsedSeconds());
msg.fmtAppend("using {} threads.", subquery.join ? subquery.join->getBuildConcurrency() : 1);
return msg.toString();
};

LOG_INFO(log, "{}", gen_finish_log_msg());
}
catch (...)
{
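The lambda-wrapped message above only pays off because the LOG_* macros check the logger's level before evaluating their arguments. Below is a minimal, self-contained sketch of that pattern; the macro, the Logger type, and the levels are invented for illustration and are not the TiFlash implementation.

```cpp
#include <iostream>
#include <string>

enum class Level { DEBUG = 0, INFO = 1, WARNING = 2 };

struct Logger
{
    Level threshold = Level::INFO;
    bool accepts(Level l) const { return l >= threshold; }
    void write(const std::string & msg) const { std::cout << msg << '\n'; }
};

// The expression is only evaluated when the level check passes, so expensive
// message construction is skipped for suppressed levels.
#define LOG_AT(logger, level, expr)   \
    do                                \
    {                                 \
        if ((logger).accepts(level))  \
            (logger).write(expr);     \
    } while (false)

int main()
{
    Logger log;
    auto gen_finish_log_msg = [] {
        // Imagine expensive formatting (row counts, timings) here.
        return std::string("Created. Join with 42 entries from 100 rows.");
    };
    LOG_AT(log, Level::INFO, gen_finish_log_msg());  // built and printed
    LOG_AT(log, Level::DEBUG, gen_finish_log_msg()); // skipped entirely
}
```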
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -991,26 +991,26 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
GET_METRIC(tiflash_schema_trigger_count, type_cop_read).Increment();
tmt.getSchemaSyncer()->syncSchemas(context);
auto schema_sync_cost = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start_time).count();
LOG_INFO(log, "Table {} schema sync cost {}ms.", logical_table_id, schema_sync_cost);
LOG_DEBUG(log, "Table {} schema sync cost {}ms.", logical_table_id, schema_sync_cost);
};

/// Try get storage and lock once.
auto [storages, locks, storage_schema_versions, ok] = get_and_lock_storages(false);
if (ok)
{
LOG_INFO(log, "{}", log_schema_version("OK, no syncing required.", storage_schema_versions));
LOG_DEBUG(log, "{}", log_schema_version("OK, no syncing required.", storage_schema_versions));
}
else
/// If first try failed, sync schema and try again.
{
LOG_INFO(log, "not OK, syncing schemas.");
LOG_DEBUG(log, "not OK, syncing schemas.");

sync_schema();

std::tie(storages, locks, storage_schema_versions, ok) = get_and_lock_storages(true);
if (ok)
{
LOG_INFO(log, "{}", log_schema_version("OK after syncing.", storage_schema_versions));
LOG_DEBUG(log, "{}", log_schema_version("OK after syncing.", storage_schema_versions));
}
else
throw TiFlashException("Shouldn't reach here", Errors::Coprocessor::Internal);
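For context, the hunk above logs around a "try, lock, sync schema, retry once" flow. The sketch below shows that shape in isolation; get_and_lock_storages and sync_schema are reduced to hypothetical stand-ins with made-up signatures, not the real TiFlash functions.

```cpp
#include <chrono>
#include <iostream>
#include <stdexcept>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-ins: the first attempt fails until the schema is synced.
static bool get_and_lock_storages(bool schema_synced) { return schema_synced; }
static void sync_schema() { /* pull the newest table schema from the schema service */ }

int main()
{
    bool ok = get_and_lock_storages(/*schema_synced=*/false);
    if (!ok)
    {
        // Only pay the schema-sync cost when the first attempt fails.
        auto start = Clock::now();
        sync_schema();
        auto cost_ms = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
        std::cout << "schema sync cost " << cost_ms << "ms\n";
        ok = get_and_lock_storages(/*schema_synced=*/true);
    }
    if (!ok)
        throw std::runtime_error("Shouldn't reach here");
    std::cout << "storages locked\n";
}
```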
36 changes: 23 additions & 13 deletions dbms/src/Flash/FlashService.cpp
@@ -113,13 +113,14 @@ grpc::Status FlashService::Coprocessor(
coprocessor::Response * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());
bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(log, log_level, "Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}, region epoch: {}", is_remote_read, request->start_ts(), request->context().region_id(), request->context().region_epoch().DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
GET_METRIC(tiflash_coprocessor_request_count, type_cop).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_cop).Increment();
if (is_remote_read)
@@ -140,7 +141,7 @@ grpc::Status FlashService::Coprocessor(

const auto & settings = context->getSettingsRef();
auto handle_limit = settings.cop_pool_handle_limit != 0 ? settings.cop_pool_handle_limit.get() : 10 * cop_pool->size();
auto max_queued_duration_seconds = std::min(settings.cop_pool_max_queued_seconds, 20);
auto max_queued_duration_ms = std::min(settings.cop_pool_max_queued_seconds, 20) * 1000;

if (handle_limit > 0)
{
@@ -155,14 +156,20 @@


grpc::Status ret = executeInThreadPool(*cop_pool, [&] {
if (max_queued_duration_seconds > 0)
auto wait_ms = watch.elapsedMilliseconds();
if (max_queued_duration_ms > 0)
{
if (auto current = watch.elapsedSeconds(); current > max_queued_duration_seconds)
if (wait_ms > static_cast<UInt64>(max_queued_duration_ms))
{
response->mutable_region_error()->mutable_server_is_busy()->set_reason(fmt::format("this task queued in tiflash cop pool too long, current = {}, limit = {}", current, max_queued_duration_seconds));
response->mutable_region_error()->mutable_server_is_busy()->set_reason(fmt::format("this task queued in tiflash cop pool too long, current = {}ms, limit = {}ms", wait_ms, max_queued_duration_ms));
return grpc::Status::OK;
}
}

if (wait_ms > 1000)
log_level = Poco::Message::PRIO_INFORMATION;
LOG_IMPL(log, log_level, "Begin process cop request after wait {} ms, start ts: {}, region info: {}, region epoch: {}", wait_ms, request->start_ts(), request->context().region_id(), request->context().region_epoch().DebugString());

auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
@@ -179,14 +186,14 @@ return cop_handler.execute();
return cop_handler.execute();
});

LOG_DEBUG(log, "Handle coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
LOG_IMPL(log, log_level, "Handle coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
return ret;
}

grpc::Status FlashService::BatchCoprocessor(grpc::ServerContext * grpc_context, const coprocessor::BatchRequest * request, grpc::ServerWriter<coprocessor::BatchResponse> * writer)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());
LOG_INFO(log, "Handling batch coprocessor request, start ts: {}", request->start_ts());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
@@ -202,6 +209,9 @@ grpc::Status FlashService::BatchCoprocessor(grpc::ServerContext * grpc_context,
});

grpc::Status ret = executeInThreadPool(*batch_cop_pool, [&] {
auto wait_ms = watch.elapsedMilliseconds();
LOG_INFO(log, "Begin process batch cop request after wait {} ms, start ts: {}", wait_ms, request->start_ts());

auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
@@ -212,7 +222,7 @@ grpc::Status FlashService::BatchCoprocessor(grpc::ServerContext * grpc_context,
return cop_handler.execute();
});

LOG_DEBUG(log, "Handle coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
LOG_INFO(log, "Handle batch coprocessor request done: {}, {}", ret.error_code(), ret.error_message());
return ret;
}

@@ -222,7 +232,7 @@ grpc::Status FlashService::DispatchMPPTask(
mpp::DispatchTaskResponse * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_DEBUG(log, "Handling mpp dispatch request: {}", request->DebugString());
LOG_INFO(log, "Handling mpp dispatch request, task meta: {}", request->meta().DebugString());
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;
@@ -278,7 +288,7 @@ grpc::Status AsyncFlashService::establishMPPConnectionAsync(grpc::ServerContext
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// Establish a pipe for data transferring. The pipes have registered by the task in advance.
// We need to find it out and bind the grpc stream with it.
LOG_DEBUG(log, "Handling establish mpp connection request: {}", request->DebugString());
LOG_INFO(log, "Handling establish mpp connection request: {}", request->DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
@@ -297,7 +307,7 @@ grpc::Status FlashService::EstablishMPPConnection(grpc::ServerContext * grpc_con
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// Establish a pipe for data transferring. The pipes have registered by the task in advance.
// We need to find it out and bind the grpc stream with it.
LOG_DEBUG(log, "Handling establish mpp connection request: {}", request->DebugString());
LOG_INFO(log, "Handling establish mpp connection request: {}", request->DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
@@ -350,7 +360,7 @@ grpc::Status FlashService::CancelMPPTask(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
// CancelMPPTask cancels the query of the task.
LOG_DEBUG(log, "cancel mpp task request: {}", request->DebugString());
LOG_INFO(log, "cancel mpp task request: {}", request->DebugString());

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
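Two ideas recur in this file: choose the request's log level up front (remote reads log at INFO) and reject a request that sat too long in the cop thread-pool queue, escalating the level when the wait was notable. A rough standalone sketch of both follows, with simplified types standing in for the gRPC and settings machinery.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>

enum class Level { DEBUG, INFO };

int main()
{
    bool is_remote_read = true;               // would come from client metadata
    Level log_level = is_remote_read ? Level::INFO : Level::DEBUG;

    // Cap the configured queueing limit at 20 seconds, expressed in ms.
    const uint64_t max_queued_duration_ms = std::min<uint64_t>(/*setting*/ 15, 20) * 1000;

    auto enqueue_time = std::chrono::steady_clock::now();
    // ... request waits in the cop thread pool ...
    auto wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - enqueue_time)
                       .count();

    if (max_queued_duration_ms > 0 && static_cast<uint64_t>(wait_ms) > max_queued_duration_ms)
    {
        // Reply with a "server is busy" region error instead of running the request.
        std::cout << "server_is_busy: queued " << wait_ms << "ms, limit "
                  << max_queued_duration_ms << "ms\n";
        return 0;
    }

    if (wait_ms > 1000)
        log_level = Level::INFO; // long queueing is worth surfacing
    std::cout << "begin processing at level "
              << (log_level == Level::INFO ? "INFO" : "DEBUG") << '\n';
}
```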
23 changes: 8 additions & 15 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -550,7 +550,7 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
if (handler->finished())
{
--alive_async_connections;
connectionDone(handler->meetError(), handler->getErrMsg(), handler->getLog());
connectionDone(handler->meetError(), handler->getErrMsg());
}
}
else
@@ -650,7 +650,7 @@ void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
meet_error = true;
local_err_msg = getCurrentExceptionMessage(false);
}
connectionDone(meet_error, local_err_msg, log);
connectionDone(meet_error, local_err_msg);
}

template <typename RPCContext>
@@ -804,8 +804,7 @@ String ExchangeReceiverBase<RPCContext>::getStatusString()
template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::connectionDone(
bool meet_error,
const String & local_err_msg,
const LoggerPtr & log)
const String & local_err_msg)
{
Int32 copy_live_conn = -1;
{
Expand All @@ -819,22 +818,16 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
}
copy_live_conn = --live_connections;
}
LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_conn);

if (copy_live_conn == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
}
else if (copy_live_conn < 0)
if (copy_live_conn < 0)
throw Exception("live_connections should not be less than 0!");

if (meet_error || copy_live_conn == 0)
{
auto log_level = meet_error ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_INFORMATION;
LOG_IMPL(exc_log, log_level, "Finish receiver channels, meet error: {}, error message: {}", meet_error, local_err_msg);
finishAllMsgChannels();
}
}

template <typename RPCContext>
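The connectionDone() change drops the per-connection DEBUG lines and emits a single summary when a connection hits an error or when the last connection closes. A simplified illustration of that control flow; the Receiver struct below is a stand-in, not the real ExchangeReceiverBase.

```cpp
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>

struct Receiver
{
    std::mutex mu;
    int live_connections = 3;

    void connectionDone(bool meet_error, const std::string & err_msg)
    {
        int copy_live_conn = -1;
        {
            std::lock_guard lock(mu);
            copy_live_conn = --live_connections;
        }
        if (copy_live_conn < 0)
            throw std::runtime_error("live_connections should not be less than 0!");

        // Log once: either this connection failed, or it was the last one.
        if (meet_error || copy_live_conn == 0)
        {
            const char * level = meet_error ? "WARN" : "INFO";
            std::cout << level << ": finish receiver channels, meet error: "
                      << meet_error << ", error message: " << err_msg << '\n';
            // finishAllMsgChannels() would go here.
        }
    }
};

int main()
{
    Receiver r;
    r.connectionDone(false, "");
    r.connectionDone(false, "");
    r.connectionDone(false, ""); // last one: the summary line is printed
}
```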
3 changes: 1 addition & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
@@ -176,8 +176,7 @@ class ExchangeReceiverBase

void connectionDone(
bool meet_error,
const String & local_err_msg,
const LoggerPtr & log);
const String & local_err_msg);

void finishAllMsgChannels();
void cancelAllMsgChannels();
14 changes: 9 additions & 5 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -71,7 +71,7 @@ MPPTask::~MPPTask()
if (process_list_entry != nullptr && current_memory_tracker != process_list_entry->get().getMemoryTrackerPtr().get())
current_memory_tracker = process_list_entry->get().getMemoryTrackerPtr().get();
abortTunnels("", true);
LOG_DEBUG(log, "finish MPPTask: {}", id.toString());
LOG_INFO(log, "finish MPPTask: {}", id.toString());
}

void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
@@ -361,14 +361,17 @@ void MPPTask::runImpl()
String err_msg;
try
{
LOG_INFO(log, "task starts preprocessing");
LOG_DEBUG(log, "task starts preprocessing");
preprocess();
auto time_cost_in_preprocess_ms = stopwatch.elapsedMilliseconds();
LOG_DEBUG(log, "task preprocess done");
schedule_entry.setNeededThreads(estimateCountOfNewThreads());
LOG_DEBUG(log, "Estimate new thread count of query: {} including tunnel_threads: {}, receiver_threads: {}", schedule_entry.getNeededThreads(), dag_context->tunnel_set->getExternalThreadCnt(), new_thread_count_of_mpp_receiver);

scheduleOrWait();

LOG_INFO(log, "task starts running");
auto time_cost_in_schedule_ms = stopwatch.elapsedMilliseconds() - time_cost_in_preprocess_ms;
LOG_INFO(log, "task starts running, time cost in schedule: {} ms, time cost in preprocess", time_cost_in_schedule_ms, time_cost_in_preprocess_ms);
if (status.load() != RUNNING)
{
/// when task is in running state, canceling the task will call sendCancelToQuery to do the cancellation, however
@@ -379,6 +382,7 @@ void MPPTask::runImpl()
mpp_task_statistics.start();

auto result = query_executor_holder->execute();
LOG_INFO(log, "mpp task finish execute, success: {}", result.is_success);
if (likely(result.is_success))
{
// finish receiver
@@ -408,7 +412,7 @@ void MPPTask::runImpl()
if (err_msg.empty())
{
if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
LOG_DEBUG(log, "finish task");
else
LOG_WARNING(log, "finish task which is in {} state", magic_enum::enum_name(status.load()));
if (status == FINISHED)
@@ -442,7 +446,7 @@ void MPPTask::runImpl()
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.logTracingJson();

LOG_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
unregisterTask();
}

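The runImpl() hunk attributes time to two phases with one stopwatch: snapshot the elapsed time after preprocessing, then subtract it from the total after scheduling. A minimal sketch of that bookkeeping, with a toy Stopwatch assumed in place of the real one.

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

struct Stopwatch
{
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
    uint64_t elapsedMilliseconds() const
    {
        return std::chrono::duration_cast<std::chrono::milliseconds>(
                   std::chrono::steady_clock::now() - start)
            .count();
    }
};

int main()
{
    Stopwatch stopwatch;

    std::this_thread::sleep_for(std::chrono::milliseconds(30)); // stands in for preprocess()
    auto time_cost_in_preprocess_ms = stopwatch.elapsedMilliseconds();

    std::this_thread::sleep_for(std::chrono::milliseconds(50)); // stands in for scheduleOrWait()
    auto time_cost_in_schedule_ms = stopwatch.elapsedMilliseconds() - time_cost_in_preprocess_ms;

    std::cout << "task starts running, time cost in schedule: " << time_cost_in_schedule_ms
              << " ms, time cost in preprocess: " << time_cost_in_preprocess_ms << " ms\n";
}
```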
7 changes: 4 additions & 3 deletions dbms/src/Flash/Mpp/MPPTaskScheduleEntry.cpp
@@ -41,7 +41,8 @@ bool MPPTaskScheduleEntry::schedule(ScheduleState state)
std::unique_lock lock(schedule_mu);
if (schedule_state == ScheduleState::WAITING)
{
LOG_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
auto log_level = state == ScheduleState::SCHEDULED ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_WARNING;
LOG_IMPL(log, log_level, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
schedule_state = state;
schedule_cv.notify_one();
return true;
@@ -51,7 +52,7 @@

void MPPTaskScheduleEntry::waitForSchedule()
{
LOG_INFO(log, "task waits for schedule");
LOG_DEBUG(log, "task waits for schedule");
Stopwatch stopwatch;
double time_cost = 0;
{
@@ -69,7 +70,7 @@ void MPPTaskScheduleEntry::waitForSchedule()
throw Exception(fmt::format("{} is failed to schedule because of being cancelled in min-tso scheduler after waiting for {}s.", id.toString(), time_cost));
}
}
LOG_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
LOG_DEBUG(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
}

const MPPTaskId & MPPTaskScheduleEntry::getMPPTaskId() const
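schedule() and waitForSchedule() above form a condition-variable handshake: the waiting task blocks until the scheduler flips the state and notifies it. A compact sketch of that handshake with simplified, assumed types (not the MinTSO scheduler itself).

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum class ScheduleState { WAITING, SCHEDULED, FAILED_TO_SCHEDULE };

struct ScheduleEntry
{
    std::mutex schedule_mu;
    std::condition_variable schedule_cv;
    ScheduleState schedule_state = ScheduleState::WAITING;

    // Called by the scheduler; only the first transition out of WAITING counts.
    bool schedule(ScheduleState state)
    {
        std::unique_lock lock(schedule_mu);
        if (schedule_state == ScheduleState::WAITING)
        {
            schedule_state = state;
            schedule_cv.notify_one();
            return true;
        }
        return false;
    }

    // Called by the task; blocks until the state leaves WAITING.
    void waitForSchedule()
    {
        std::unique_lock lock(schedule_mu);
        schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
        std::cout << (schedule_state == ScheduleState::SCHEDULED ? "scheduled\n"
                                                                 : "failed to schedule\n");
    }
};

int main()
{
    ScheduleEntry entry;
    std::thread scheduler([&] { entry.schedule(ScheduleState::SCHEDULED); });
    entry.waitForSchedule();
    scheduler.join();
}
```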