Skip to content

Commit

Permalink
refine log for cop/batch cop (pingcap#8182)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Oct 11, 2023
1 parent 65bf5af commit 7e44baf
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 38 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
7 changes: 4 additions & 3 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ extern const int NOT_IMPLEMENTED;
BatchCoprocessorHandler::BatchCoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::BatchRequest * cop_request_,
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_)
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
const String & identifier)
: cop_context(cop_context_)
, cop_request(cop_request_)
, writer(writer_)
, resource_group_name(cop_request->context().resource_control_context().resource_group_name())
, log(Logger::get("BatchCoprocessorHandler, resource_group: " + resource_group_name))
, log(Logger::get(identifier))
{}

grpc::Status BatchCoprocessorHandler::execute()
Expand Down Expand Up @@ -83,7 +84,7 @@ grpc::Status BatchCoprocessorHandler::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
DAGRequestKind::BatchCop,
resource_group_name,
Logger::get("BatchCoprocessorHandler, resource_group: " + resource_group_name));
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

DAGDriver<DAGRequestKind::BatchCop> driver(
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/BatchCoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class BatchCoprocessorHandler
BatchCoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::BatchRequest * cop_request_,
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_);
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
const String & identifier);

grpc::Status execute();

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace pingcap
{
namespace common
{

template <typename T>
class CopIterMPMCQueue : public IMPMCQueue<T>
{
Expand Down Expand Up @@ -181,7 +180,8 @@ class CoprocessorReader
bool enable_cop_stream_,
size_t queue_size,
UInt64 cop_timeout,
const pingcap::kv::LabelFilter & tiflash_label_filter_)
const pingcap::kv::LabelFilter & tiflash_label_filter_,
const String & source_identifier)
: schema(schema_)
, has_enforce_encode_type(has_enforce_encode_type_)
, concurrency(concurrency_)
Expand All @@ -191,7 +191,7 @@ class CoprocessorReader
std::move(tasks),
cluster,
concurrency_,
&Poco::Logger::get("pingcap/coprocessor"),
&Poco::Logger::get(fmt::format("{} pingcap/coprocessor", source_identifier)),
cop_timeout,
tiflash_label_filter_)
{}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ CoprocessorReaderPtr DAGStorageInterpreter::buildCoprocessorReader(const std::ve
enable_cop_stream,
queue_size,
cop_timeout,
tiflash_label_filter);
tiflash_label_filter,
log->identifier());
context.getDAGContext()->addCoprocessorReader(coprocessor_reader);

return coprocessor_reader;
Expand Down
19 changes: 7 additions & 12 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,26 @@ template <>
CoprocessorHandler<false>::CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
coprocessor::Response * cop_response_)
coprocessor::Response * cop_response_,
const String & identifier)
: cop_context(cop_context_)
, cop_request(cop_request_)
, cop_response(cop_response_)
, resource_group_name(cop_request->context().resource_control_context().resource_group_name())
, log(Logger::get("CoprocessorHandler, resource_group: " + resource_group_name))
, log(Logger::get(identifier))
{}

template <>
CoprocessorHandler<true>::CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
grpc::ServerWriter<coprocessor::Response> * cop_writer_)
grpc::ServerWriter<coprocessor::Response> * cop_writer_,
const String & identifier)
: cop_context(cop_context_)
, cop_request(cop_request_)
, cop_writer(cop_writer_)
, resource_group_name(cop_request->context().resource_control_context().resource_group_name())
, log(Logger::get("CoprocessorHandler(stream), resource_group: " + resource_group_name))
, log(Logger::get(identifier))
{}

template <bool is_stream>
Expand Down Expand Up @@ -141,13 +143,6 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
genCopKeyRange(cop_request->ranges()),
&bypass_lock_ts));

String msg;
if constexpr (is_stream)
msg = "CoprocessorHandler(stream), resource_group: ";
else
msg = "CoprocessorHandler, resource_group: ";
msg += resource_group_name;

DAGRequestKind kind;
if constexpr (is_stream)
kind = DAGRequestKind::CopStream;
Expand All @@ -161,7 +156,7 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
kind,
resource_group_name,
Logger::get(msg));
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

if constexpr (is_stream)
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/CoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class CoprocessorHandler
CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
coprocessor::Response * response_);
coprocessor::Response * response_,
const String & identifier);
CoprocessorHandler(
CoprocessorContext & cop_context_,
const coprocessor::Request * cop_request_,
grpc::ServerWriter<coprocessor::Response> * cop_writer_);
grpc::ServerWriter<coprocessor::Response> * cop_writer_,
const String & identifier);

grpc::Status execute();

Expand Down
52 changes: 37 additions & 15 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,19 @@ grpc::Status FlashService::Coprocessor(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
auto region_info = fmt::format(
"{{{}, {}, {}}}",
request->context().region_id(),
request->context().region_epoch().conf_ver(),
request->context().region_epoch().version());
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(
log,
log_level,
"Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}, region epoch: {}",
"Handling coprocessor request, is_remote_read: {}, start ts: {}, region info: {}",
is_remote_read,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down Expand Up @@ -275,11 +279,10 @@ grpc::Status FlashService::Coprocessor(
LOG_IMPL(
log,
log_level,
"Begin process cop request after wait {} ms, start ts: {}, region info: {}, region epoch: {}",
"Begin process cop request after wait {} ms, start ts: {}, region info: {}",
wait_ms,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);
auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
Expand All @@ -292,7 +295,13 @@ grpc::Status FlashService::Coprocessor(
GET_METRIC(tiflash_coprocessor_handling_request_count, type_remote_read_executing).Decrement();
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
CoprocessorHandler<false> cop_handler(cop_context, request, response);
auto request_identifier = fmt::format(
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
CoprocessorHandler<false> cop_handler(cop_context, request, response, request_identifier);
return cop_handler.execute();
});

Expand Down Expand Up @@ -330,7 +339,11 @@ grpc::Status FlashService::BatchCoprocessor(
return status;
}
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
BatchCoprocessorHandler cop_handler(cop_context, request, writer);
auto request_identifier = fmt::format(
"BatchCoprocessor, start_ts: {}, resource_group: {}",
request->start_ts(),
request->context().resource_control_context().resource_group_name());
BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});

Expand All @@ -345,15 +358,19 @@ grpc::Status FlashService::CoprocessorStream(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
bool is_remote_read = getClientMetaVarWithDefault(grpc_context, "is_remote_read", "") == "true";
auto region_info = fmt::format(
"{{{}, {}, {}}}",
request->context().region_id(),
request->context().region_epoch().conf_ver(),
request->context().region_epoch().version());
auto log_level = is_remote_read ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(
log,
log_level,
"Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}, region epoch: {}",
"Handling coprocessor stream request, is_remote_read: {}, start ts: {}, region info: {}",
is_remote_read,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);

auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
Expand Down Expand Up @@ -419,11 +436,10 @@ grpc::Status FlashService::CoprocessorStream(
LOG_IMPL(
log,
log_level,
"Begin process cop stream request after wait {} ms, start ts: {}, region info: {}, region epoch: {}",
"Begin process cop stream request after wait {} ms, start ts: {}, region info: {}",
wait_ms,
request->start_ts(),
request->context().region_id(),
request->context().region_epoch().DebugString());
region_info);
auto [db_context, status] = createDBContext(grpc_context);
if (!status.ok())
{
Expand All @@ -436,7 +452,13 @@ grpc::Status FlashService::CoprocessorStream(
GET_METRIC(tiflash_coprocessor_handling_request_count, type_remote_read_executing).Decrement();
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
CoprocessorHandler<true> cop_handler(cop_context, request, writer);
auto request_identifier = fmt::format(
"Coprocessor(stream), is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
CoprocessorHandler<true> cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});

Expand Down

0 comments on commit 7e44baf

Please sign in to comment.