Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection_id and connection_alias in logs #8348

Merged
merged 11 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ tipb::SelectResponse executeDAGRequest(
region_id,
RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", log);
DAGContext
dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log);
context.setDAGContext(&dag_context);

DAGDriver<DAGRequestKind::Cop> driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down Expand Up @@ -433,7 +434,8 @@ bool runAndCompareDagReq(
region_id,
RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", log);
DAGContext
dag_context(dag_request, std::move(tables_regions_info), NullspaceID, "", DAGRequestKind::Cop, "", 0, "", log);
context.setDAGContext(&dag_context);
DAGDriver<DAGRequestKind::Cop>
driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ grpc::Status BatchCoprocessorHandler::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
DAGRequestKind::BatchCop,
resource_group_name,
cop_request->connection_id(),
cop_request->connection_alias(),
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ DAGContext::DAGContext(
const String & tidb_host_,
DAGRequestKind kind_,
const String & resource_group_name_,
UInt64 connection_id_,
const String & connection_alias_,
LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->ShortDebugString())
Expand All @@ -71,6 +73,8 @@ DAGContext::DAGContext(
, warning_count(0)
, keyspace_id(keyspace_id_)
, resource_group_name(resource_group_name_)
, connection_id(connection_id_)
, connection_alias(connection_alias_)
{
RUNTIME_ASSERT(kind != DAGRequestKind::MPP, log, "DAGContext non-mpp constructor get a mpp kind");
initOutputInfo();
Expand All @@ -95,6 +99,8 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
, resource_group_name(meta_.resource_group_name())
, connection_id(meta_.connection_id())
, connection_alias(meta_.connection_alias())
{
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
Expand Down Expand Up @@ -127,6 +133,8 @@ DAGContext::DAGContext(
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(task_meta_))
, connection_id(task_meta_.connection_id())
, connection_alias(task_meta_.connection_alias())
{
initOutputInfo();
}
Expand All @@ -144,6 +152,7 @@ DAGContext::DAGContext(UInt64 max_error_count_)
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
, connection_id(0)
{}

// for tests need to run query tasks.
Expand All @@ -163,8 +172,10 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, connection_id(0)
{
query_operator_spill_contexts = std::make_shared<QueryOperatorSpillContexts>(MPPQueryId(0, 0, 0, 0, ""), 100);
query_operator_spill_contexts
= std::make_shared<QueryOperatorSpillContexts>(MPPQueryId(0, 0, 0, 0, "", 0, ""), 100);
initOutputInfo();
}

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class DAGContext
const String & tidb_host_,
DAGRequestKind cop_kind_,
const String & resource_group_name,
UInt64 connection_id_,
const String & connection_alias_,
LoggerPtr log_);

// for mpp
Expand Down Expand Up @@ -340,6 +342,9 @@ class DAGContext
void setAutoSpillMode() { in_auto_spill_mode = true; }
bool isInAutoSpillMode() const { return in_auto_spill_mode; }

UInt64 getConnectionID() const { return connection_id; }
const String & getConnectionAlias() const { return connection_alias; }

public:
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
Expand Down Expand Up @@ -447,6 +452,11 @@ class DAGContext
// - Stream: execute with block input stream
// - Pipeline: execute with pipeline model
ExecutionMode execution_mode = ExecutionMode::None;

// It's the session id between mysql client and tidb
UInt64 connection_id;
// It's the session alias between mysql client and tidb
String connection_alias;
};

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ std::vector<pingcap::coprocessor::CopTask> DAGStorageInterpreter::buildCopTasks(
req,
store_type,
dagContext().getKeyspaceID(),
remote_request.connection_id,
remote_request.connection_alias,
&Poco::Logger::get("pingcap/coprocessor"),
std::move(meta_data),
[&] { GET_METRIC(tiflash_coprocessor_request_count, type_remote_read_sent).Increment(); });
Expand Down Expand Up @@ -1551,6 +1553,8 @@ std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests(const DM::
retry_regions_map[region_id_to_table_id_map[r.get().region_id]].emplace_back(r);
}

UInt64 connection_id = dagContext().getConnectionID();
const String & connection_alias = dagContext().getConnectionAlias();
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
{
const auto & retry_regions = retry_regions_map[physical_table_id];
Expand All @@ -1568,6 +1572,8 @@ std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests(const DM::
table_scan,
storages_with_structure_lock[physical_table_id].storage->getTableInfo(),
filter_conditions,
connection_id,
connection_alias,
log));
}
LOG_DEBUG(log, "remote request size: {}", remote_requests.size());
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ RemoteRequest RemoteRequest::build(
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const FilterConditions & filter_conditions,
UInt64 connection_id,
const String & connection_alias,
const LoggerPtr & log)
{
LOG_INFO(log, "{}", printRetryRegions(retry_regions, table_info.id));
Expand Down Expand Up @@ -86,7 +88,7 @@ RemoteRequest RemoteRequest::build(
dag_req.set_time_zone_offset(original_dag_req.time_zone_offset());

std::vector<pingcap::coprocessor::KeyRange> key_ranges = buildKeyRanges(retry_regions);
return {std::move(dag_req), std::move(schema), std::move(key_ranges)};
return {std::move(dag_req), std::move(schema), std::move(key_ranges), connection_id, connection_alias};
}

std::vector<pingcap::coprocessor::KeyRange> RemoteRequest::buildKeyRanges(const RegionRetryList & retry_regions)
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ struct RemoteRequest
RemoteRequest(
tipb::DAGRequest && dag_request_,
DAGSchema && schema_,
std::vector<pingcap::coprocessor::KeyRange> && key_ranges_)
std::vector<pingcap::coprocessor::KeyRange> && key_ranges_,
UInt64 connection_id_,
const String & connection_alias_)
: dag_request(std::move(dag_request_))
, schema(std::move(schema_))
, key_ranges(std::move(key_ranges_))
, connection_id(connection_id_)
, connection_alias(connection_alias_)
{}

static RemoteRequest build(
Expand All @@ -52,6 +56,8 @@ struct RemoteRequest
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const FilterConditions & filter_conditions,
UInt64 connection_id,
const String & connection_alias,
const LoggerPtr & log);
static std::vector<pingcap::coprocessor::KeyRange> buildKeyRanges(const RegionRetryList & retry_regions);
static std::string printRetryRegions(const RegionRetryList & retry_regions, TableID table_id);
Expand All @@ -60,5 +66,8 @@ struct RemoteRequest
DAGSchema schema;
/// the sorted key ranges
std::vector<pingcap::coprocessor::KeyRange> key_ranges;

UInt64 connection_id;
String connection_alias;
};
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ grpc::Status CoprocessorHandler<is_stream>::execute()
cop_context.db_context.getClientInfo().current_address.toString(),
kind,
resource_group_name,
cop_request->connection_id(),
cop_request->connection_alias(),
Logger::get(log->identifier()));
cop_context.db_context.setDAGContext(&dag_context);

Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,14 @@ grpc::Status FlashService::Coprocessor(
});
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}",
"Coprocessor, is_remote_read: {}, start_ts: {}, region_info: {}, resource_group: {}, conn_id: {}, "
"conn_alias: {}",
is_remote_read,
request->start_ts(),
region_info,
request->context().resource_control_context().resource_group_name());
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());
CoprocessorHandler<false> cop_handler(cop_context, request, response, request_identifier);
return cop_handler.execute();
};
Expand Down Expand Up @@ -340,9 +343,11 @@ grpc::Status FlashService::BatchCoprocessor(
}
CoprocessorContext cop_context(*db_context, request->context(), *grpc_context);
auto request_identifier = fmt::format(
"BatchCoprocessor, start_ts: {}, resource_group: {}",
"BatchCoprocessor, start_ts: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
request->start_ts(),
request->context().resource_control_context().resource_group_name());
request->context().resource_control_context().resource_group_name(),
request->connection_id(),
request->connection_alias());
BatchCoprocessorHandler cop_handler(cop_context, request, writer, request_identifier);
return cop_handler.execute();
});
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ String MPPTaskId::toString() const
const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "");
const MPPQueryId MPPTaskId::Max_Query_Id
= MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64, "", MAX_UINT64, "");

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
Expand Down
46 changes: 36 additions & 10 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,33 @@ struct MPPQueryId
UInt64 server_id;
UInt64 start_ts;
String resource_group_name;
UInt64 connection_id;
String connection_alias;
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

MPPQueryId(
UInt64 query_ts,
UInt64 local_query_id,
UInt64 server_id,
UInt64 start_ts,
const String & resource_group_name_)
const String & resource_group_name_,
UInt64 connection_id_,
const String & connection_alias_)
: query_ts(query_ts)
, local_query_id(local_query_id)
, server_id(server_id)
, start_ts(start_ts)
, resource_group_name(resource_group_name_)
, connection_id(connection_id_)
, connection_alias(connection_alias_)
{}
explicit MPPQueryId(const mpp::TaskMeta & task_meta)
: query_ts(task_meta.query_ts())
, local_query_id(task_meta.local_query_id())
, server_id(task_meta.server_id())
, start_ts(task_meta.start_ts())
, resource_group_name(task_meta.resource_group_name())
, connection_id(task_meta.connection_id())
, connection_alias(task_meta.connection_alias())
{}
bool operator<(const MPPQueryId & mpp_query_id) const;
bool operator==(const MPPQueryId & rid) const;
Expand All @@ -56,12 +64,15 @@ struct MPPQueryId
String toString() const
{
return fmt::format(
"<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}>",
"<query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}, conn_id: {}, conn_alias: "
"{}>",
Comment on lines +67 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(not related to this PR)
Must we log the resource_group into logging? I think the MPPTaskId is quite long now. It may bring performance regression when there are lots of logging.

@guo-shaoge

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I think for information like conn_id/conn_alias/resource_group, we can only log it on receiving the request, not log it for every log of during the query

query_ts,
local_query_id,
server_id,
start_ts,
resource_group_name);
resource_group_name,
connection_id,
connection_alias);
}
};

Expand Down Expand Up @@ -89,9 +100,11 @@ struct MPPGatherId
UInt64 local_query_id,
UInt64 server_id,
UInt64 start_ts,
const String & resource_group_name)
const String & resource_group_name,
UInt64 connection_id,
const String & connection_alias)
: gather_id(gather_id_)
, query_id(query_ts, local_query_id, server_id, start_ts, resource_group_name)
, query_id(query_ts, local_query_id, server_id, start_ts, resource_group_name, connection_id, connection_alias)
{}
explicit MPPGatherId(const mpp::TaskMeta & task_meta)
: gather_id(task_meta.gather_id())
Expand All @@ -100,13 +113,16 @@ struct MPPGatherId
String toString() const
{
return fmt::format(
"<gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}>",
"<gather_id:{}, query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}, resource_group: {}, conn_id: "
"{}, conn_alias: {}>",
gather_id,
query_id.query_ts,
query_id.local_query_id,
query_id.server_id,
query_id.start_ts,
query_id.resource_group_name);
query_id.resource_group_name,
query_id.connection_id,
query_id.connection_alias);
}
bool hasMeaningfulGatherId() const { return gather_id > 0; }
bool operator==(const MPPGatherId & rid) const;
Expand All @@ -122,7 +138,7 @@ struct MPPTaskId
{
MPPTaskId()
: task_id(unknown_task_id)
, gather_id(0, 0, 0, 0, 0, ""){};
, gather_id(0, 0, 0, 0, 0, "", 0, ""){};

MPPTaskId(
UInt64 start_ts,
Expand All @@ -131,9 +147,19 @@ struct MPPTaskId
Int64 gather_id,
UInt64 query_ts,
UInt64 local_query_id,
const String resource_group_name)
const String resource_group_name,
UInt64 connection_id,
const String & connection_alias)
: task_id(task_id_)
, gather_id(gather_id, query_ts, local_query_id, server_id, start_ts, resource_group_name)
, gather_id(
gather_id,
query_ts,
local_query_id,
server_id,
start_ts,
resource_group_name,
connection_id,
connection_alias)
{}

explicit MPPTaskId(const mpp::TaskMeta & task_meta)
Expand Down
Loading