Skip to content

Commit

Permalink
TiFlash supports stale read
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <[email protected]>
  • Loading branch information
hehechen committed Dec 14, 2022
1 parent 165e23e commit ce91962
Show file tree
Hide file tree
Showing 31 changed files with 437 additions and 199 deletions.
5 changes: 3 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ namespace DB
F(type_exchange_partition, {"type", "exchange_partition"})) \
M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \
F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \
F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \
Expand Down Expand Up @@ -232,7 +233,7 @@ namespace DB
F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \
F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \
M(tiflash_task_scheduler, "Min-tso task scheduler", Gauge, \
F(type_min_tso, {"type", "min_tso"}), \
F(type_min_query_ts, {"type", "min_query_ts"}), \
F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \
F(type_active_queries_count, {"type", "active_queries_count"}), \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/DAGProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct DAGProperties
bool use_broadcast_join = false;
Int32 mpp_partition_num = 1;
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
UInt64 query_ts = 0;
UInt64 server_id = 1;
UInt64 local_query_id = 1;
Int64 task_id = 1;

Int32 mpp_timeout = 10;
};
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<Fl
server_map[partition_id] = std::move(server);
}

void MockComputeServerManager::cancelQuery(size_t start_ts)
void MockComputeServerManager::cancelQuery(String query_id)
{
mpp::CancelTaskRequest req;
auto * meta = req.mutable_meta();
meta->set_start_ts(start_ts);
auto [query_ts, local_query_id_, server_id] = MPPTaskId::decodeQueryID(query_id);
meta->set_query_ts(query_ts);
meta->set_local_query_id(local_query_id_);
meta->set_server_id(server_id);
mpp::CancelTaskResponse response;
for (const auto & server : server_map)
server.second->flashService()->cancelMPPTaskForTest(&req, &response);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB::tests
{

/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
*/
Expand All @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>

void resetMockMPPServerInfo(size_t partition_num);

void cancelQuery(size_t start_ts);
void cancelQuery(String query_id);

static String queryInfo();

Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockExecutor/AstToPB.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ using MPPCtxPtr = std::shared_ptr<MPPCtx>;
struct MPPInfo
{
Timestamp start_ts;
UInt64 query_ts;
UInt64 server_id;
UInt64 local_query_id;
Int64 partition_id;
Int64 task_id;
const std::vector<Int64> sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;

MPPInfo(
Timestamp start_ts_,
UInt64 query_ts_,
UInt64 server_id_,
UInt64 local_query_id_,
Int64 partition_id_,
Int64 task_id_,
const std::vector<Int64> & sender_target_task_ids_,
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
: start_ts(start_ts_)
, query_ts(query_ts_)
, server_id(server_id_)
, local_query_id(local_query_id_)
, partition_id(partition_id_)
, task_id(task_id_)
, sender_target_task_ids(sender_target_task_ids_)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(it->second[i]);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(task_id);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/dbgQueryCompiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ struct QueryFragment
{
MPPInfo mpp_info(
properties.start_ts,
properties.query_ts,
properties.server_id,
properties.local_query_id,
partition_id,
task_ids[partition_id],
sender_target_task_ids,
Expand All @@ -141,7 +144,7 @@ struct QueryFragment
}
else
{
MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
ret.push_back(toQueryTask(properties, mpp_info, context));
}
return ret;
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc

mpp::TaskMeta root_tm;
root_tm.set_start_ts(properties.start_ts);
root_tm.set_query_ts(properties.query_ts);
root_tm.set_local_query_id(properties.local_query_id);
root_tm.set_server_id(properties.server_id);
root_tm.set_address(root_addr);
root_tm.set_task_id(-1);
root_tm.set_partition_id(-1);
Expand Down Expand Up @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(Debug::LOCAL_HOST);
tm.set_task_id(root_task_id);
tm.set_partition_id(-1);
Expand All @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(addr);
tm.set_task_id(task_id);
tm.set_partition_id(-1);
Expand All @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchT
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand All @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class DAGContext
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id())
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id(), mpp_task_meta.server_id(), mpp_task_meta.query_ts(), mpp_task_meta.local_query_id())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down Expand Up @@ -383,7 +383,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[];

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: meta(meta_)
, id(meta.start_ts(), meta.task_id())
, id(meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id())
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
, schedule_entry(manager, id)
Expand Down Expand Up @@ -139,7 +139,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id(), task_meta.server_id(), task_meta.query_ts(), task_meta.local_query_id()}, tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
Expand Down Expand Up @@ -204,7 +204,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {nullptr, err_msg};
}

MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()};
MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id(), request->receiver_meta().server_id(), request->receiver_meta().query_ts(), request->receiver_meta().local_query_id()};
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id);
if (tunnel_ptr == nullptr)
Expand Down Expand Up @@ -449,7 +449,7 @@ void MPPTask::runImpl()
void MPPTask::handleError(const String & error_msg)
{
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR);
if (!registered)
// if the task is not registered, need to cancel it explicitly
abort(error_msg, AbortType::ONERROR);
Expand Down
26 changes: 24 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,41 @@
// limitations under the License.

#include <Flash/Mpp/MPPTaskId.h>
#include <IO/WriteHelpers.h>
#include <fmt/core.h>

namespace DB
{
String MPPTaskId::toString() const
{
return isUnknown() ? "MPP<query:N/A,task:N/A>" : fmt::format("MPP<query:{},task:{}>", start_ts, task_id);
return isUnknown() ? "MPP<query_id:N/A,start_ts:N/A,query_ts:N/A,task_id:N/A,server_id:N/A>" : fmt::format("MPP<query:{},start_ts:{},query_ts:{},task_id:{},server_id:{},local_query_id:{}>", query_id, start_ts, query_ts, task_id, server_id, local_query_id);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const String MPPTaskId::Max_Query_Id = generateQueryID(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

String MPPTaskId::generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_)
{
if (local_query_id_ == 0)
{
return intToHex(0) + intToHex(0) + intToHex(start_ts_);
}
return intToHex(query_ts_) + intToHex(local_query_id_) + intToHex(server_id_);
}

std::tuple<UInt64, UInt64, UInt64> MPPTaskId::decodeQueryID(String query_id_str)
{
UInt64 decode_query_ts = std::stoull(query_id_str.substr(0, sizeof(Int64) * 2), 0, 16);
UInt64 decode_local_query_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2, sizeof(Int64) * 2), 0, 16);
UInt64 decode_server_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2 * 2, sizeof(UInt64) * 2), 0, 16);
LOG_DEBUG(Logger::get(__FUNCTION__), "query_ts_={}, local_query_id_={}, server_id_={}", decode_query_ts, decode_local_query_id, decode_server_id);
return {decode_query_ts, decode_local_query_id, decode_server_id};
}

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
return lid.start_ts == rid.start_ts && lid.task_id == rid.task_id;
return lid.query_id == rid.query_id && lid.task_id == rid.task_id;
}
} // namespace DB
30 changes: 26 additions & 4 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,45 @@ struct MPPTaskId
{
MPPTaskId()
: start_ts(0)
, server_id(0)
, query_ts(0)
, task_id(unknown_task_id){};

MPPTaskId(UInt64 start_ts_, Int64 task_id_)
MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id_, UInt64 query_ts_, UInt64 local_query_id_)
: start_ts(start_ts_)
, task_id(task_id_){};
, server_id(server_id_)
, query_ts(query_ts_)
, task_id(task_id_)
, local_query_id(local_query_id_)
{
query_id = generateQueryID(query_ts, local_query_id, server_id, start_ts_);
}

UInt64 start_ts;
UInt64 server_id;
UInt64 query_ts;
Int64 task_id;
UInt64 local_query_id;
String query_id;

bool isUnknown() const { return task_id == unknown_task_id; }

String toString() const;

static const MPPTaskId unknown_mpp_task_id;
static const String Max_Query_Id;
static String generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_);
static std::tuple<UInt64, UInt64, UInt64> decodeQueryID(String query_id_str);

private:
static constexpr Int64 unknown_task_id = -1;

static String intToHex(UInt64 i)
{
std::stringstream stream;
stream << std::setfill('0') << std::setw(sizeof(UInt64) * 2)
<< std::hex << i;
return stream.str();
}
};

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid);
Expand All @@ -53,7 +75,7 @@ class hash<DB::MPPTaskId>
public:
size_t operator()(const DB::MPPTaskId & id) const
{
return hash<UInt64>()(id.start_ts) ^ hash<Int64>()(id.task_id);
return hash<String>()(id.query_id);
}
};
} // namespace std
Loading

0 comments on commit ce91962

Please sign in to comment.