Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <[email protected]>
  • Loading branch information
hehechen committed Dec 15, 2022
1 parent 3e70e2c commit ffd0afd
Show file tree
Hide file tree
Showing 18 changed files with 117 additions and 60 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ namespace DB
F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \
F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \
M(tiflash_task_scheduler, "Min-tso task scheduler", Gauge, \
F(type_min_query_ts, {"type", "min_query_ts"}), \
F(type_min_tso, {"type", "min_tso"}), \
F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \
F(type_active_queries_count, {"type", "active_queries_count"}), \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ inline size_t getReadTSOForLog(const String & line)
{
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
// Rely on that MPP task prefix "MPP<query:435802637197639681,task:1>"
// Rely on that MPP task prefix "MPP<query:query_ts:1671076528883980489, local_query_id:42356295, server_id:3340035, start_ts:438062669858603008,start_ts:438062669858603008,task_id:42356296>"
auto pos = line.find(", start_ts:");
if (pos != std::string::npos && regex_search(line.cbegin() + pos, line.cend(), m, rx))
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down Expand Up @@ -408,7 +408,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id(), task_meta.server_id(), task_meta.query_ts(), task_meta.local_query_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId(task_meta), tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
Expand Down Expand Up @@ -204,7 +204,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {nullptr, err_msg};
}

MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id(), request->receiver_meta().server_id(), request->receiver_meta().query_ts(), request->receiver_meta().local_query_id()};
MPPTaskId receiver_id(request->receiver_meta());
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id);
if (tunnel_ptr == nullptr)
Expand Down
47 changes: 43 additions & 4 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,56 @@
// limitations under the License.

#include <Flash/Mpp/MPPTaskId.h>
#include <common/likely.h>
#include <fmt/core.h>

namespace DB
{
bool isOldVersion(const MPPQueryId & mpp_query_id)
{
return mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0;
}


bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const
{
if (query_ts == 0 && local_query_id == 0 && server_id == 0)
// compare with MPP query generated by TiDB that version less than v6.6
bool left_old_version = isOldVersion(*this);
bool right_old_version = isOldVersion(mpp_query_id);
if (unlikely(left_old_version && right_old_version))
{
return start_ts < mpp_query_id.start_ts;
}
return (query_ts < mpp_query_id.query_ts) || (local_query_id < mpp_query_id.local_query_id) || (server_id < mpp_query_id.server_id);
if (unlikely(left_old_version))
{
return true;
}
if (unlikely(right_old_version))
{
return false;
}
// compare with MPP query generated by TiDB that version after v6.6
if (query_ts != mpp_query_id.query_ts)
{
return query_ts < mpp_query_id.query_ts;
}
if (server_id == mpp_query_id.server_id)
{
return local_query_id < mpp_query_id.local_query_id;
}
// now we can't compare reasonably, just choose one randomly by hash.
auto lhash = MPPQueryIdHash()(*this);
auto rhash = MPPQueryIdHash()(mpp_query_id);
if (lhash != rhash)
{
return lhash < rhash;
}
// hash values are same, just compare the rest fields.
if (local_query_id != mpp_query_id.local_query_id)
{
return local_query_id < mpp_query_id.local_query_id;
}
return server_id < mpp_query_id.server_id;
}
bool MPPQueryId::operator==(const MPPQueryId & rid) const
{
Expand All @@ -40,7 +79,7 @@ bool MPPQueryId::operator<=(const MPPQueryId & rid) const

size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept
{
if (mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0)
if (isOldVersion(mpp_query_id))
{
return std::hash<UInt64>()(mpp_query_id.start_ts);
}
Expand All @@ -49,7 +88,7 @@ size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcep

String MPPTaskId::toString() const
{
return isUnknown() ? "MPP<query_id:N/A,start_ts:N/A,task_id:N/A>" : fmt::format("MPP<query:{},start_ts:{},task_id:{}>", query_id.toDebugString(), start_ts, task_id);
return isUnknown() ? "MPP<query_id:N/A,task_id:N/A>" : fmt::format("MPP<query:{},task_id:{}>", query_id.toString(), task_id);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};
Expand Down
25 changes: 17 additions & 8 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <common/types.h>
#include <fmt/core.h>
#include <kvproto/mpp.pb.h>

namespace DB
{
Expand All @@ -32,11 +33,17 @@ struct MPPQueryId
, server_id(server_id)
, start_ts(start_ts)
{}
MPPQueryId(const mpp::TaskMeta & task_meta)
: query_ts(task_meta.query_ts())
, local_query_id(task_meta.local_query_id())
, server_id(task_meta.server_id())
, start_ts(task_meta.start_ts())
{}
bool operator<(const MPPQueryId & mpp_query_id) const;
bool operator==(const MPPQueryId & rid) const;
bool operator!=(const MPPQueryId & rid) const;
bool operator<=(const MPPQueryId & rid) const;
String toDebugString() const
String toString() const
{
return fmt::format("query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}", query_ts, local_query_id, server_id, start_ts);
}
Expand All @@ -51,17 +58,19 @@ struct MPPQueryIdHash
struct MPPTaskId
{
MPPTaskId()
: start_ts(0)
, task_id(unknown_task_id)
: task_id(unknown_task_id)
, query_id({0, 0, 0, 0}){};

MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id)
: start_ts(start_ts_)
, task_id(task_id_)
, query_id(query_ts, local_query_id, server_id, start_ts_)
MPPTaskId(UInt64 start_ts, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id)
: task_id(task_id_)
, query_id(query_ts, local_query_id, server_id, start_ts)
{}

MPPTaskId(const mpp::TaskMeta & task_meta)
: task_id(task_meta.task_id())
, query_id(task_meta)
{}

UInt64 start_ts;
Int64 task_id;
MPPQueryId query_id;

Expand Down
24 changes: 12 additions & 12 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void MPPTaskManager::removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_
std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq)
{
const auto & meta = request->sender_meta();
MPPTaskId id{meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()};
MPPTaskId id{meta};
Int64 sender_task_id = meta.task_id();
Int64 receiver_task_id = request->receiver_meta().task_id();

Expand All @@ -62,7 +62,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(const ::mpp::Est
if (query_it != mpp_query_map.end() && !query_it->second->isInNormalState())
{
/// if the query is aborted, return the error message
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString()));
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toString()));
/// meet error
return {nullptr, query_it->second->error_message};
}
Expand Down Expand Up @@ -100,7 +100,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(const ::mpp::Est
cv.notify_all();
}
}
return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id.toDebugString(), id.start_ts, id.task_id)};
return {nullptr, fmt::format("Can't find task [{},{}] within 10 s.", id.query_id.toString(), id.task_id)};
}
}
/// don't need to delete the alarm here because registerMPPTask will delete all the related alarm
Expand All @@ -112,7 +112,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(const ::mpp::Est
std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout)
{
const auto & meta = request->sender_meta();
MPPTaskId id{meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()};
MPPTaskId id{meta};
std::unordered_map<MPPTaskId, MPPTaskPtr>::iterator it;
bool cancelled = false;
String error_message;
Expand All @@ -127,7 +127,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(const ::mp
else if (!query_it->second->isInNormalState())
{
/// if the query is aborted, return true to stop waiting timeout.
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString()));
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toString()));
cancelled = true;
error_message = query_it->second->error_message;
return true;
Expand All @@ -149,7 +149,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(const ::mp

void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type)
{
LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toDebugString(), magic_enum::enum_name(abort_type), reason));
LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toString(), magic_enum::enum_name(abort_type), reason));
MPPQueryTaskSetPtr task_set;
{
/// abort task may take a long time, so first
Expand All @@ -159,12 +159,12 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r
auto it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end())
{
LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toDebugString()));
LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toString()));
return;
}
else if (!it->second->isInNormalState())
{
LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toDebugString()));
LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toString()));
return;
}
it->second->state = MPPQueryTaskSet::Aborting;
Expand All @@ -178,7 +178,7 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r
it->second->alarms.clear();
if (it->second->task_map.empty())
{
LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toDebugString()));
LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toString()));
removeMPPQueryTaskSet(query_id, true);
cv.notify_all();
return;
Expand All @@ -189,7 +189,7 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r
}

FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toDebugString());
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toString());
for (auto & it : task_set->task_map)
fmt_buf.fmtAppend("{} ", it.first.toString());
LOG_WARNING(log, fmt_buf.toString());
Expand All @@ -200,11 +200,11 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r
{
std::lock_guard lock(mu);
auto it = mpp_query_map.find(query_id);
RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toDebugString());
RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toString());
it->second->state = MPPQueryTaskSet::Aborted;
cv.notify_all();
}
LOG_WARNING(log, "Finish abort query: " + query_id.toDebugString());
LOG_WARNING(log, "Finish abort query: " + query_id.toString());
}

std::pair<bool, String> MPPTaskManager::registerTask(MPPTaskPtr task)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ void MPPTaskStatistics::logTracingJson()
R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})"
R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})"
R"(,"status":"{}","error_message":"{}","working_time":{},"memory_peak":{}}})",
id.start_ts,
id.query_id.start_ts,
id.task_id,
id.query_id.toDebugString(),
id.query_id.toString(),
is_root,
sender_executor_id,
executor_statistics_collector.resToJson(),
Expand Down
Loading

0 comments on commit ffd0afd

Please sign in to comment.