Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiFlash supports stale read #6459

Merged
merged 8 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ namespace DB
F(type_exchange_partition, {"type", "exchange_partition"})) \
M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \
F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
hehechen marked this conversation as resolved.
Show resolved Hide resolved
M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \
F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \
Expand Down Expand Up @@ -232,7 +233,7 @@ namespace DB
F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \
F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \
M(tiflash_task_scheduler, "Min-tso task scheduler", Gauge, \
F(type_min_tso, {"type", "min_tso"}), \
F(type_min_query_ts, {"type", "min_query_ts"}), \
hehechen marked this conversation as resolved.
Show resolved Hide resolved
F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \
F(type_active_queries_count, {"type", "active_queries_count"}), \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/DAGProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct DAGProperties
bool use_broadcast_join = false;
Int32 mpp_partition_num = 1;
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
UInt64 query_ts = 0;
UInt64 server_id = 1;
UInt64 local_query_id = 1;
Int64 task_id = 1;

Int32 mpp_timeout = 10;
};
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<Fl
server_map[partition_id] = std::move(server);
}

void MockComputeServerManager::cancelQuery(size_t start_ts)
void MockComputeServerManager::cancelQuery(String query_id)
{
mpp::CancelTaskRequest req;
auto * meta = req.mutable_meta();
meta->set_start_ts(start_ts);
auto [query_ts, local_query_id_, server_id] = MPPTaskId::decodeQueryID(query_id);
meta->set_query_ts(query_ts);
meta->set_local_query_id(local_query_id_);
meta->set_server_id(server_id);
mpp::CancelTaskResponse response;
for (const auto & server : server_map)
server.second->flashService()->cancelMPPTaskForTest(&req, &response);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB::tests
{

/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
*/
Expand All @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>

void resetMockMPPServerInfo(size_t partition_num);

void cancelQuery(size_t start_ts);
void cancelQuery(String query_id);

static String queryInfo();

Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockExecutor/AstToPB.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ using MPPCtxPtr = std::shared_ptr<MPPCtx>;
struct MPPInfo
{
Timestamp start_ts;
UInt64 query_ts;
UInt64 server_id;
UInt64 local_query_id;
Int64 partition_id;
Int64 task_id;
const std::vector<Int64> sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;

MPPInfo(
Timestamp start_ts_,
UInt64 query_ts_,
UInt64 server_id_,
UInt64 local_query_id_,
Int64 partition_id_,
Int64 task_id_,
const std::vector<Int64> & sender_target_task_ids_,
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
: start_ts(start_ts_)
, query_ts(query_ts_)
, server_id(server_id_)
, local_query_id(local_query_id_)
, partition_id(partition_id_)
, task_id(task_id_)
, sender_target_task_ids(sender_target_task_ids_)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(it->second[i]);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(task_id);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/dbgQueryCompiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ struct QueryFragment
{
MPPInfo mpp_info(
properties.start_ts,
properties.query_ts,
properties.server_id,
properties.local_query_id,
partition_id,
task_ids[partition_id],
sender_target_task_ids,
Expand All @@ -141,7 +144,7 @@ struct QueryFragment
}
else
{
MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
ret.push_back(toQueryTask(properties, mpp_info, context));
}
return ret;
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc

mpp::TaskMeta root_tm;
root_tm.set_start_ts(properties.start_ts);
root_tm.set_query_ts(properties.query_ts);
root_tm.set_local_query_id(properties.local_query_id);
root_tm.set_server_id(properties.server_id);
root_tm.set_address(root_addr);
root_tm.set_task_id(-1);
root_tm.set_partition_id(-1);
Expand Down Expand Up @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(Debug::LOCAL_HOST);
tm.set_task_id(root_task_id);
tm.set_partition_id(-1);
Expand All @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(addr);
tm.set_task_id(task_id);
tm.set_partition_id(-1);
Expand All @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchT
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand All @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class DAGContext
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id())
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id(), mpp_task_meta.server_id(), mpp_task_meta.query_ts(), mpp_task_meta.local_query_id())
hehechen marked this conversation as resolved.
Show resolved Hide resolved
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down Expand Up @@ -383,7 +383,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[];

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: meta(meta_)
, id(meta.start_ts(), meta.task_id())
, id(meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id())
hehechen marked this conversation as resolved.
Show resolved Hide resolved
hehechen marked this conversation as resolved.
Show resolved Hide resolved
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
, schedule_entry(manager, id)
Expand Down Expand Up @@ -139,7 +139,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id(), task_meta.server_id(), task_meta.query_ts(), task_meta.local_query_id()}, tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
Expand Down Expand Up @@ -204,7 +204,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {nullptr, err_msg};
}

MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()};
MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id(), request->receiver_meta().server_id(), request->receiver_meta().query_ts(), request->receiver_meta().local_query_id()};
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id);
if (tunnel_ptr == nullptr)
Expand Down Expand Up @@ -449,7 +449,7 @@ void MPPTask::runImpl()
void MPPTask::handleError(const String & error_msg)
{
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR);
if (!registered)
// if the task is not registered, need to cancel it explicitly
abort(error_msg, AbortType::ONERROR);
Expand Down
26 changes: 24 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,41 @@
// limitations under the License.

#include <Flash/Mpp/MPPTaskId.h>
#include <IO/WriteHelpers.h>
#include <fmt/core.h>

namespace DB
{
String MPPTaskId::toString() const
{
return isUnknown() ? "MPP<query:N/A,task:N/A>" : fmt::format("MPP<query:{},task:{}>", start_ts, task_id);
return isUnknown() ? "MPP<query_id:N/A,start_ts:N/A,query_ts:N/A,task_id:N/A,server_id:N/A>" : fmt::format("MPP<query:{},start_ts:{},query_ts:{},task_id:{},server_id:{},local_query_id:{}>", query_id, start_ts, query_ts, task_id, server_id, local_query_id);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const String MPPTaskId::Max_Query_Id = generateQueryID(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

// Builds the textual query id out of three fixed-width hex fields produced by intToHex.
//
// - local_query_id_ != 0: the fields are query_ts_, local_query_id_, server_id_ (in that order).
// - local_query_id_ == 0: the first two fields are zero and the third one carries start_ts_,
//   i.e. query_ts_ and server_id_ are ignored.
//   NOTE(review): presumably this branch serves requests that only carry start_ts — confirm.
String MPPTaskId::generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_)
{
    const bool has_local_query_id = (local_query_id_ != 0);
    if (has_local_query_id)
        return intToHex(query_ts_) + intToHex(local_query_id_) + intToHex(server_id_);

    // No local query id: fall back to an id derived from start_ts_ only.
    return intToHex(0) + intToHex(0) + intToHex(start_ts_);
}

// Splits a query id string into its three consecutive fixed-width hex fields and returns them
// as {query_ts, local_query_id, server_id}.
//
// Each field is sizeof(UInt64) * 2 hex characters wide. Parsing is done with std::stoull, so a
// string that is too short or contains non-hex text makes std::string::substr / std::stoull throw.
//
// NOTE(review): an id built by generateQueryID with local_query_id_ == 0 stores start_ts in the
// third field, so for such ids the value returned here as server_id is really start_ts — confirm
// that callers never pass ids of that form.
std::tuple<UInt64, UInt64, UInt64> MPPTaskId::decodeQueryID(String query_id_str)
{
    // Number of hex characters used to print one 64-bit integer.
    constexpr size_t hex_field_width = sizeof(UInt64) * 2;

    // Parses the `field_index`-th fixed-width hex field of the id.
    const auto parse_hex_field = [&](size_t field_index) -> UInt64 {
        return std::stoull(query_id_str.substr(field_index * hex_field_width, hex_field_width), nullptr, 16);
    };

    const UInt64 parsed_query_ts = parse_hex_field(0);
    const UInt64 parsed_local_query_id = parse_hex_field(1);
    const UInt64 parsed_server_id = parse_hex_field(2);

    LOG_DEBUG(Logger::get(__FUNCTION__), "query_ts_={}, local_query_id_={}, server_id_={}", parsed_query_ts, parsed_local_query_id, parsed_server_id);
    return {parsed_query_ts, parsed_local_query_id, parsed_server_id};
}

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
return lid.start_ts == rid.start_ts && lid.task_id == rid.task_id;
return lid.query_id == rid.query_id && lid.task_id == rid.task_id;
}
} // namespace DB
30 changes: 26 additions & 4 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,45 @@ struct MPPTaskId
{
// Builds the "unknown" task id: task_id is set to unknown_task_id, every other numeric
// component is zero, and query_id stays a default-constructed (empty) String.
//
// local_query_id is initialized explicitly: this constructor is user-provided, so members left
// out of the initializer list are not zeroed, and copying the resulting object (e.g. the static
// unknown_mpp_task_id) would otherwise read an indeterminate value.
MPPTaskId()
    : start_ts(0)
    , server_id(0)
    , query_ts(0)
    , task_id(unknown_task_id)
    , local_query_id(0)
{}

MPPTaskId(UInt64 start_ts_, Int64 task_id_)
MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id_, UInt64 query_ts_, UInt64 local_query_id_)
hehechen marked this conversation as resolved.
Show resolved Hide resolved
: start_ts(start_ts_)
, task_id(task_id_){};
, server_id(server_id_)
, query_ts(query_ts_)
, task_id(task_id_)
, local_query_id(local_query_id_)
{
query_id = generateQueryID(query_ts, local_query_id, server_id, start_ts_);
}

UInt64 start_ts;
hehechen marked this conversation as resolved.
Show resolved Hide resolved
UInt64 server_id;
UInt64 query_ts;
Int64 task_id;
UInt64 local_query_id;
String query_id;

bool isUnknown() const { return task_id == unknown_task_id; }

String toString() const;

static const MPPTaskId unknown_mpp_task_id;
static const String Max_Query_Id;
static String generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_);
static std::tuple<UInt64, UInt64, UInt64> decodeQueryID(String query_id_str);

private:
static constexpr Int64 unknown_task_id = -1;

// Formats `i` as hexadecimal, left-padded with '0' to exactly sizeof(UInt64) * 2 characters.
// The fixed width is what allows decodeQueryID to cut a query id at fixed offsets.
static String intToHex(UInt64 i)
{
    std::ostringstream hex_out;
    hex_out.fill('0');
    hex_out.setf(std::ios_base::hex, std::ios_base::basefield);
    // width() applies only to the next formatted insertion, which is the single `<<` below.
    hex_out.width(static_cast<std::streamsize>(sizeof(UInt64) * 2));
    hex_out << i;
    return hex_out.str();
}
};

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid);
Expand All @@ -53,7 +75,7 @@ class hash<DB::MPPTaskId>
public:
size_t operator()(const DB::MPPTaskId & id) const
{
return hash<UInt64>()(id.start_ts) ^ hash<Int64>()(id.task_id);
return hash<String>()(id.query_id);
}
};
} // namespace std
Loading