Skip to content

Commit

Permalink
Add info for ExchangeReceiver logs (#7499)
Browse files Browse the repository at this point in the history
close #7498
  • Loading branch information
xzhangxian1008 authored May 18, 2023
1 parent f54a1d4 commit c43b82e
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 14 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/AsyncRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class AsyncRequestHandler : public AsyncRequestHandlerBase
: cq(&(GRPCCompletionQueuePool::global_instance->pickQueue()))
, rpc_context(context)
, request(std::move(req))
, req_info(fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id))
, req_info(fmt::format("async tunnel{}+{}", req.send_task_id, req.recv_task_id))
, has_data(false)
, retry_times(0)
, stage(AsyncRequestStage::NEED_INIT)
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,20 +590,21 @@ void ExchangeReceiverBase<RPCContext>::setUpLocalConnections(std::vector<Request
else
{
LOG_DEBUG(exc_log, "refined local tunnel is enabled");
String req_info = fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id);
String req_info = fmt::format("local tunnel{}+{}", req.send_task_id, req.recv_task_id);
LoggerPtr local_log = Logger::get(fmt::format("{} {}", exc_log->identifier(), req_info));

LocalRequestHandler local_request_handler(
getMemoryTracker(),
[this](bool meet_error, const String & local_err_msg) {
this->connectionDone(meet_error, local_err_msg, exc_log);
[this, log = local_log](bool meet_error, const String & local_err_msg) {
this->connectionDone(meet_error, local_err_msg, log);
},
[this]() {
this->connectionLocalDone();
},
[this]() {
this->addLocalConnectionNum();
},
ReceiverChannelWriter(&(getMsgChannels()), req_info, exc_log, getDataSizeInQueue(), ReceiverMode::Local));
ReceiverChannelWriter(&(getMsgChannels()), req_info, local_log, getDataSizeInQueue(), ReceiverMode::Local));

rpc_context->establishMPPConnectionLocalV2(
req,
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ void Event::switchStatus(EventStatus from, EventStatus to)
magic_enum::enum_name(from),
magic_enum::enum_name(to),
magic_enum::enum_name(status.load()));
#ifndef NDEBUG
LOG_TRACE(log, "switch status: {} --> {}", magic_enum::enum_name(from), magic_enum::enum_name(to));
#endif // !NDEBUG

LOG_DEBUG(log, "switch status: {} --> {}", magic_enum::enum_name(from), magic_enum::enum_name(to));
}

void Event::assertStatus(EventStatus expect)
Expand Down
14 changes: 8 additions & 6 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ Task::Task(MemoryTrackerPtr mem_tracker_, const String & req_id)

Task::~Task()
{
RUNTIME_ASSERT(
task_status == ExecTaskStatus::FINALIZE,
log,
"The state of the Task must be {} before it is destructed, but it is actually {}",
magic_enum::enum_name(ExecTaskStatus::FINALIZE),
magic_enum::enum_name(task_status));
if unlikely (task_status != ExecTaskStatus::FINALIZE)
{
LOG_WARNING(
log,
"The state of the Task should be {} before it is destructed, but it is actually {}",
magic_enum::enum_name(ExecTaskStatus::FINALIZE),
magic_enum::enum_name(task_status));
}
}

#define CHECK_FINISHED \
Expand Down

0 comments on commit c43b82e

Please sign in to comment.