diff --git a/contrib/tiflash-proxy-cmake/CMakeLists.txt b/contrib/tiflash-proxy-cmake/CMakeLists.txt index d48ef1601d2..b9bd9585e54 100644 --- a/contrib/tiflash-proxy-cmake/CMakeLists.txt +++ b/contrib/tiflash-proxy-cmake/CMakeLists.txt @@ -14,10 +14,39 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG" OR SAN_DEBUG) set(_TIFLASH_PROXY_BUILD_PROFILE "debug") +<<<<<<< HEAD set(_TIFLASH_PROXY_MAKE_COMMAND make debug) else() set(_TIFLASH_PROXY_BUILD_PROFILE "release") set(_TIFLASH_PROXY_MAKE_COMMAND make release) +======= + if (ENABLE_JEMALLOC) + if (APPLE) + message(STATUS "proxy's jemalloc is disabled (AppleOS)") + set(_TIFLASH_PROXY_MAKE_COMMAND make debug) + else() + message(STATUS "proxy's jemalloc is enabled") + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make debug) + endif() + else() + message(STATUS "proxy's jemalloc is disabled") + set(_TIFLASH_PROXY_MAKE_COMMAND make debug) + endif() +else() + set(_TIFLASH_PROXY_BUILD_PROFILE "release") + if (ENABLE_JEMALLOC) + if (APPLE) + message(STATUS "proxy's jemalloc is disabled (AppleOS)") + set(_TIFLASH_PROXY_MAKE_COMMAND make release) + else() + message(STATUS "proxy's jemalloc is enabled") + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make release) + endif() + else() + message(STATUS "proxy's jemalloc is disabled") + set(_TIFLASH_PROXY_MAKE_COMMAND make release) + endif() +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) endif() set(_TIFLASH_PROXY_SOURCE_DIR "${TiFlash_SOURCE_DIR}/contrib/tiflash-proxy") diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 0d4f0269460..56e73641f68 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -465,6 +465,22 @@ namespace DB "Bucketed snapshot total size", \ Histogram, \ F(type_approx_raft_snapshot, {{"type", "approx_raft_snapshot"}}, ExpBuckets{1024, 2, 24})) /* 16G */ \ +<<<<<<< HEAD +======= + M(tiflash_raft_learner_read_failures_count, \ + "Raft 
learner read failure reason counter", \ + Counter, \ + F(type_not_found_tiflash, {{"type", "not_found_tiflash"}}), \ + F(type_epoch_not_match, {{"type", "epoch_not_match"}}), \ + F(type_not_leader, {{"type", "not_leader"}}), \ + F(type_not_found_tikv, {{"type", "not_found_tikv"}}), \ + F(type_bucket_epoch_not_match, {{"type", "bucket_epoch_not_match"}}), \ + F(type_flashback, {{"type", "flashback"}}), \ + F(type_key_not_in_region, {{"type", "key_not_in_region"}}), \ + F(type_tikv_server_issue, {{"type", "tikv_server_issue"}}), \ + F(type_tikv_lock, {{"type", "tikv_lock"}}), \ + F(type_other, {{"type", "other"}})) \ +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) /* required by DBaaS */ \ M(tiflash_server_info, \ "Indicate the tiflash server info, and the value is the start timestamp (s).", \ diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 2db9476804b..a1201a4b44c 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -26,6 +26,12 @@ #include #include #include +<<<<<<< HEAD +======= +#include +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) #include #include @@ -108,6 +114,10 @@ DBGInvoker::DBGInvoker() MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles); regSchemalessFunc("region_snapshot_apply_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles); regSchemalessFunc("region_ingest_sst", MockRaftCommand::dbgFuncIngestSST); + // Test whether a PK exists in KVStore. + regSchemalessFunc("find_key_kvstore", dbgFuncFindKey); + // Test whether a PK exists in DT. 
+ regSchemafulFunc("find_key_dt", dbgFuncFindKeyDt); regSchemalessFunc("init_fail_point", DbgFailPointFunc::dbgInitFailPoint); regSchemalessFunc("enable_fail_point", DbgFailPointFunc::dbgEnableFailPoint); diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 8c5f0710b9d..c50691aac28 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -769,7 +769,7 @@ TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const St auto it = tables_by_name.find(qualified_name); if (it == tables_by_name.end()) { - throw Exception("Mock TiDB table " + qualified_name + " does not exists", ErrorCodes::UNKNOWN_TABLE); + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Mock TiDB table {} does not exists", qualified_name); } return it->second; diff --git a/dbms/src/Debug/dbgFuncInvestigator.cpp b/dbms/src/Debug/dbgFuncInvestigator.cpp new file mode 100644 index 00000000000..88dd9e828d5 --- /dev/null +++ b/dbms/src/Debug/dbgFuncInvestigator.cpp @@ -0,0 +1,308 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ + +struct MatchResult +{ + String mapped_database_name; + String table_name; + std::vector> in_default; + std::vector> in_write; + std::vector in_lock; + std::unordered_map regions; + + String toString() const + { + FmtBuffer fmt_buf; + fmt_buf.fmtAppend("default_cf "); + fmt_buf.joinStr( + in_default.begin(), + in_default.end(), + [](const auto & a, const auto &) { return fmt::format("{}-{}", a.first, a.second); }, + ":"); + fmt_buf.fmtAppend("; write_cf "); + fmt_buf.joinStr( + in_write.begin(), + in_write.end(), + [](const auto & a, const auto &) { return fmt::format("{}-{}", a.first, a.second); }, + ":"); + fmt_buf.fmtAppend("; lock_cf "); + fmt_buf.joinStr(in_lock.begin(), in_lock.end(), ":"); + for (const auto & [region_id, region] : regions) + { + fmt_buf.fmtAppend( + "; region {} {}, tikv_range: {}; ", + region_id, + region->getDebugString(), + region->getRange()->toDebugString()); + } + return fmt_buf.toString(); + } +}; + +/// 1. If the arg is [start1, end1], find all key-value pairs in this range; +/// 2. If the arg is [start1], make sure it is not a common handle, and we will only check the key-value pair by start1; +/// 3. If the arg is [start1, end1, start2, end2, ...], it must be a common handle, return all key-value pairs within the range. 
+void dbgFuncFindKey(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.size() < 3) + throw Exception( + "Args not matched, should be: database-name, table-name, start1 [, start2, ..., end1, end2, ...]", + ErrorCodes::BAD_ARGUMENTS); + + auto & tmt = context.getTMTContext(); + auto & kvstore = *tmt.getKVStore(); + MatchResult result; + const String & database_name_raw = typeid_cast(*args[0]).name; + const String & table_name = typeid_cast(*args[1]).name; + + auto maybe_database_name = mappedDatabaseWithOptional(context, database_name_raw); + if (maybe_database_name == std::nullopt) + { + output(fmt::format("Database {} not found.", database_name_raw)); + return; + } + result.mapped_database_name = maybe_database_name.value(); + auto & mapped_database_name = result.mapped_database_name; + result.table_name = table_name; + + auto schema_syncer = tmt.getSchemaSyncerManager(); + auto storage = tmt.getStorages().getByName(mapped_database_name, table_name, false); + if (storage == nullptr) + { + output(fmt::format("can't find table {} {}", mapped_database_name, table_name)); + return; + } + + auto table_info = storage->getTableInfo(); + schema_syncer->syncTableSchema(context, table_info.keyspace_id, table_info.id); + if (table_info.partition.num > 0) + { + for (const auto & def : table_info.partition.definitions) + { + schema_syncer->syncTableSchema(context, table_info.keyspace_id, def.id); + } + } + + auto table_id = table_info.id; + + // tablePrefix_rowPrefix_tableID_rowID + TiKVKey start_key, end_key; + HandleID start_handle, end_handle; + + constexpr static size_t OFFSET = 2; + if (table_info.is_common_handle) + { + size_t arg_size = args.size() - OFFSET; + // The `start` and `end` argments should be provided in pair. Therefore, the number of arguments must be even. 
+ if ((arg_size & 1) != 0) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Args not matched for common handle table, arg_size={}, should be: database-name, table-name, " + "start_col, [, start_col2, ..., end_col1, end_col2, ...]", + arg_size); + } + size_t handle_column_size = table_info.is_common_handle ? table_info.getPrimaryIndexInfo().idx_cols.size() : 1; + auto key_size = arg_size / 2; + std::vector start_field; + start_field.reserve(key_size); + std::vector end_field; + end_field.reserve(key_size); + + for (size_t i = 0; i < handle_column_size; i++) + { + auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset]; + auto start_datum = TiDB::DatumBumpy( + RegionBench::convertField(column_info, typeid_cast(*args[OFFSET + i]).value), + column_info.tp); + start_field.emplace_back(start_datum.field()); + auto end_datum = TiDB::DatumBumpy( + RegionBench::convertField( + column_info, + typeid_cast(*args[OFFSET + key_size + i]).value), + column_info.tp); + end_field.emplace_back(end_datum.field()); + } + + start_key = RecordKVFormat::genKey(table_info, start_field); + end_key = RecordKVFormat::genKey(table_info, end_field); + } + else + { + start_handle = static_cast(safeGet(typeid_cast(*args[OFFSET]).value)); + start_key = RecordKVFormat::genKey(table_id, start_handle); + if (args.size() == 3) + { + end_handle = start_handle + 1; + } + else + { + end_handle + = static_cast(safeGet(typeid_cast(*args[OFFSET + 1]).value)); + } + end_key = RecordKVFormat::genKey(table_id, end_handle); + } + + auto range = RegionRangeKeys(TiKVKey::copyFrom(start_key), std::move(end_key)); + auto regions = kvstore.getRegionsByRangeOverlap(range.comparableKeys()); + + for (const auto & [region_id, region] : regions) + { + auto r = RegionBench::DebugRegion(region); + const auto & data = r.debugData(); + + for (const auto & [k, v] : data.defaultCF().getData()) + { + if (k.first == start_handle) + { + result.in_default.emplace_back(region_id, 
k.second); + } + } + for (const auto & [k, v] : data.writeCF().getData()) + { + if (k.first == start_handle) + { + result.in_write.emplace_back(region_id, k.second); + } + } + + auto lock_key = std::make_shared(TiKVKey::copyFrom(start_key)); + if (data.lockCF().getData().contains( + RegionLockCFDataTrait::Key{lock_key, std::string_view(lock_key->data(), lock_key->dataSize())})) + { + result.in_lock.emplace_back(region_id); + } + } + result.regions = regions; + output(fmt::format("find key result {}", result.toString())); +} + +BlockInputStreamPtr dbgFuncFindKeyDt(Context & context, const ASTs & args) +{ + if (args.size() < 4) + throw Exception( + "Args not matched, should be: database-name, table-name, key, value, [key2, value2, ...]", + ErrorCodes::BAD_ARGUMENTS); + + auto & tmt = context.getTMTContext(); + const String & database_name_raw = typeid_cast(*args[0]).name; + const String & table_name_raw = typeid_cast(*args[1]).name; + + auto maybe_database_name = mappedDatabaseWithOptional(context, database_name_raw); + if (maybe_database_name == std::nullopt) + { + LOG_INFO(DB::Logger::get(), "Can't find database {}", database_name_raw); + return std::make_shared("Error"); + } + auto mapped_database_name = maybe_database_name.value(); + auto mapped_qualified_table_name = mappedTable(context, database_name_raw, table_name_raw); + auto mapped_table_name = mapped_qualified_table_name.second; + + auto schema_syncer = tmt.getSchemaSyncerManager(); + auto storage = tmt.getStorages().getByName(mapped_database_name, table_name_raw, false); + if (storage == nullptr) + { + LOG_INFO(DB::Logger::get(), "Can't find database and table {}.{}", mapped_database_name, mapped_table_name); + return std::make_shared("Error"); + } + + auto table_info = storage->getTableInfo(); + schema_syncer->syncTableSchema(context, table_info.keyspace_id, table_info.id); + if (table_info.partition.num > 0) + { + for (const auto & def : table_info.partition.definitions) + { + 
schema_syncer->syncTableSchema(context, table_info.keyspace_id, def.id); + } + } + + auto key = safeGet(typeid_cast(*args[2]).value); + auto value = safeGet(typeid_cast(*args[3]).value); + + constexpr static size_t OFFSET = 4; + FmtBuffer fmt_buf; + auto key_size = args.size() - OFFSET; + if (key_size & 1) + { + LOG_INFO(DB::Logger::get(), "Key-values should be in pair {}", database_name_raw, table_name_raw); + return std::make_shared("Error"); + } + for (size_t i = 0; i != key_size / 2; i++) + { + auto k = safeGet(typeid_cast(*args[OFFSET + 2 * i]).value); + auto v = safeGet(typeid_cast(*args[OFFSET + 2 * i + 1]).value); + fmt_buf.fmtAppend(" and {} = {}", k, v); + } + String query; + if (table_info.is_common_handle && !table_info.pk_is_handle) + { + query = fmt::format( + "selraw *,_INTERNAL_VERSION,_INTERNAL_DELMARK,_tidb_rowid from {}.{} where {} = {}{}", + mapped_database_name, + mapped_table_name, + key, + value, + fmt_buf.toString()); + } + else + { + query = fmt::format( + "selraw *,_INTERNAL_VERSION,_INTERNAL_DELMARK from {}.{} where {} = {}{}", + mapped_database_name, + mapped_table_name, + key, + value, + fmt_buf.toString()); + } + LOG_INFO(DB::Logger::get(), "The query is {}", query); + ParserSelectQuery parser; + ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "dbgFuncFindKeyDt", 0); + + InterpreterSelectQuery interpreter(ast, context); + auto res = interpreter.execute(); + return res.in; +} + + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Debug/dbgFuncInvestigator.h b/dbms/src/Debug/dbgFuncInvestigator.h new file mode 100644 index 00000000000..12671186cb7 --- /dev/null +++ b/dbms/src/Debug/dbgFuncInvestigator.h @@ -0,0 +1,26 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ + +void dbgFuncFindKey(Context & context, const ASTs & args, DBGInvoker::Printer output); +BlockInputStreamPtr dbgFuncFindKeyDt(Context & context, const ASTs & args); + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Debug/dbgFuncMockRaftCommand.cpp b/dbms/src/Debug/dbgFuncMockRaftCommand.cpp index d221b36a418..0a85bf28108 100644 --- a/dbms/src/Debug/dbgFuncMockRaftCommand.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftCommand.cpp @@ -14,9 +14,15 @@ #include #include +#include #include +<<<<<<< HEAD:dbms/src/Debug/dbgFuncMockRaftCommand.cpp #include #include +======= +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)):dbms/src/Debug/dbgKVStore/dbgFuncMockRaftCommand.cpp #include #include #include diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index d2062d92bd1..e8c135bb632 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -16,11 +16,19 @@ #include #include #include +<<<<<<< HEAD:dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp #include #include #include #include #include +======= +#include +#include +#include +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)):dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp #include #include #include diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 8042e0055a4..9bb24013b0a 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ 
b/dbms/src/Debug/dbgFuncRegion.cpp @@ -14,9 +14,15 @@ #include #include +#include #include +<<<<<<< HEAD:dbms/src/Debug/dbgFuncRegion.cpp #include #include +======= +#include +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)):dbms/src/Debug/dbgKVStore/dbgFuncRegion.cpp #include #include #include diff --git a/dbms/src/Debug/dbgNaturalDag.cpp b/dbms/src/Debug/dbgNaturalDag.cpp index 64efbc6469c..5b0dffec826 100644 --- a/dbms/src/Debug/dbgNaturalDag.cpp +++ b/dbms/src/Debug/dbgNaturalDag.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include -#include #include #include #include diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 4efbdb0dcbc..f0e917d3cd4 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -13,8 +13,13 @@ // limitations under the License. #include +#include #include +<<<<<<< HEAD #include +======= +#include +>>>>>>> ce8ae39fb9 (Debug: Add find key debug invoker (#8853)) #include #include #include diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp new file mode 100644 index 00000000000..c32d09a7b4e --- /dev/null +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -0,0 +1,470 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +void UnavailableRegions::tryThrowRegionException() +{ + // For batch-cop request (not handled by disagg write node), all unavailable regions, include the ones with lock exception, should be collected and retry next round. + // For normal cop request, which only contains one region, LockException should be thrown directly and let upper layer (like client-c, tidb, tispark) handle it. + // For batch-cop request (handled by disagg write node), LockException should be thrown directly and let upper layer (disagg read node) handle it. + + if (is_wn_disagg_read && !region_locks.empty()) + throw LockException(std::move(region_locks)); + + if (!batch_cop && !region_locks.empty()) + throw LockException(std::move(region_locks)); + + if (!ids.empty()) + throw RegionException(std::move(ids), status, extra_msg.c_str()); +} + +void UnavailableRegions::addRegionWaitIndexTimeout( + const RegionID region_id, + UInt64 index_to_wait, + UInt64 current_applied_index) +{ + if (!batch_cop) + { + // If server is being terminated / time-out, add the region_id into `unavailable_regions` to other store. + addStatus(region_id, RegionException::RegionReadStatus::NOT_FOUND, ""); + return; + } + + // When wait index timeout happens, we return a `TiFlashException` instead of `RegionException` to break + // the read request from retrying. 
+ // TODO: later maybe we can return SERVER_IS_BUSY to the client + throw TiFlashException( + Errors::Coprocessor::RegionError, + "Region unavailable, region_id={} wait_index={} applied_index={}", + region_id, + index_to_wait, + current_applied_index); +} + +/// LearnerReadWorker /// + +LearnerReadWorker::LearnerReadWorker( + MvccQueryInfo & mvcc_query_info_, + TMTContext & tmt_, + bool for_batch_cop, + bool is_wn_disagg_read, + const LoggerPtr & log_) + : mvcc_query_info(mvcc_query_info_) + , tmt(tmt_) + , kvstore(tmt.getKVStore()) + , log(log_) + , unavailable_regions(for_batch_cop, is_wn_disagg_read) +{ + assert(log != nullptr); + stats.num_regions = mvcc_query_info.regions_query_info.size(); +} + +LearnerReadSnapshot LearnerReadWorker::buildRegionsSnapshot() +{ + LearnerReadSnapshot regions_snapshot; + // check region is not null and store region map. + const auto & regions_info = mvcc_query_info.regions_query_info; + for (const auto & info : regions_info) + { + auto region = kvstore->getRegion(info.region_id); + if (region == nullptr) + { + LOG_WARNING(log, "region not found in KVStore, region_id={}", info.region_id); + throw RegionException({info.region_id}, RegionException::RegionReadStatus::NOT_FOUND, nullptr); + } + regions_snapshot.emplace(info.region_id, std::move(region)); + } + // make sure regions are not duplicated. 
+ if (unlikely(regions_snapshot.size() != regions_info.size())) + throw TiFlashException( + Errors::Coprocessor::BadRequest, + "Duplicate region id, n_request={} n_actual={}", + regions_info.size(), + regions_snapshot.size()); + return regions_snapshot; +} + +std::vector LearnerReadWorker::buildBatchReadIndexReq( + const RegionTable & region_table, + const LearnerReadSnapshot & regions_snapshot, + RegionsReadIndexResult & batch_read_index_result) +{ + const auto & regions_info = mvcc_query_info.regions_query_info; + std::vector batch_read_index_req; + batch_read_index_req.reserve(regions_info.size()); + + // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. + auto read_index_tso + = mvcc_query_info.read_tso == std::numeric_limits::max() ? 0 : mvcc_query_info.read_tso; + for (const auto & region_to_query : regions_info) + { + const RegionID region_id = region_to_query.region_id; + // don't stale read in test scenarios. 
+ bool can_stale_read = mvcc_query_info.read_tso != std::numeric_limits::max() + && read_index_tso <= region_table.getSelfSafeTS(region_id); + if (can_stale_read) + { + batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); + ++stats.num_stale_read; + continue; + } + + if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index) + { + // the read index result from cache + auto resp = kvrpcpb::ReadIndexResponse(); + resp.set_read_index(ori_read_index); + batch_read_index_result.emplace(region_id, std::move(resp)); + ++stats.num_cached_read_index; + } + else + { + // generate request for read index + const auto & region = regions_snapshot.find(region_id)->second; + batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + ++stats.num_read_index_request; + } + } + assert(stats.num_regions == stats.num_stale_read + stats.num_cached_read_index + stats.num_read_index_request); + return batch_read_index_req; +} + +void LearnerReadWorker::doBatchReadIndex( + const std::vector & batch_read_index_req, + const UInt64 timeout_ms, + RegionsReadIndexResult & batch_read_index_result) +{ + const auto & make_default_batch_read_index_result = [&](bool with_region_error) { + for (const auto & req : batch_read_index_req) + { + auto resp = kvrpcpb::ReadIndexResponse(); + if (with_region_error) + resp.mutable_region_error()->mutable_region_not_found(); + batch_read_index_result.emplace(req.context().region_id(), std::move(resp)); + } + }; + kvstore->addReadIndexEvent(1); + SCOPE_EXIT({ kvstore->addReadIndexEvent(-1); }); + if (!tmt.checkRunning()) + { + make_default_batch_read_index_result(true); + return; + } + + if (!kvstore->getProxyHelper()) + { + // Only in mock test, `proxy_helper` will be `nullptr`. Set `read_index` to 0 and skip waiting. + make_default_batch_read_index_result(false); + return; + } + + /// Blocking learner read. 
Note that learner read must be performed ahead of data read, + /// otherwise the desired index will be blocked by the lock of data read. + auto res = kvstore->batchReadIndex(batch_read_index_req, timeout_ms); + for (auto && [resp, region_id] : res) + { + batch_read_index_result.emplace(region_id, std::move(resp)); + } +} + +void LearnerReadWorker::recordReadIndexError( + const LearnerReadSnapshot & regions_snapshot, + RegionsReadIndexResult & read_index_result) +{ + // if size of batch_read_index_result is not equal with batch_read_index_req, there must be region_error/lock, find and return directly. + for (auto & [region_id, resp] : read_index_result) + { + std::string extra_msg; + if (resp.has_region_error()) + { + const auto & region_error = resp.region_error(); + auto region_status = RegionException::RegionReadStatus::OTHER; + if (region_error.has_epoch_not_match()) + { + auto snapshot_region_iter = regions_snapshot.find(region_id); + if (snapshot_region_iter != regions_snapshot.end()) + { + extra_msg = fmt::format( + "read_index_resp error, region_id={} version={} conf_version={}", + region_id, + snapshot_region_iter->second.create_time_version, + snapshot_region_iter->second.create_time_conf_ver); + } + else + { + extra_msg = fmt::format("read_index_resp error, region_id={} not found in snapshot", region_id); + } + GET_METRIC(tiflash_raft_learner_read_failures_count, type_epoch_not_match).Increment(); + region_status = RegionException::RegionReadStatus::EPOCH_NOT_MATCH; + } + else if (region_error.has_not_leader()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_not_leader).Increment(); + region_status = RegionException::RegionReadStatus::NOT_LEADER; + } + else if (region_error.has_region_not_found()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_not_found_tikv).Increment(); + region_status = RegionException::RegionReadStatus::NOT_FOUND_TIKV; + } + // Below errors seldomly happens in raftstore-v1, however, we are not sure if 
they will happen in v2. + else if (region_error.has_flashbackinprogress() || region_error.has_flashbacknotprepared()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_flashback).Increment(); + region_status = RegionException::RegionReadStatus::FLASHBACK; + } + else if (region_error.has_bucket_version_not_match()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_bucket_epoch_not_match).Increment(); + LOG_DEBUG( + log, + "meet abnormal region error {}, [region_id={}]", + resp.region_error().DebugString(), + region_id); + region_status = RegionException::RegionReadStatus::BUCKET_EPOCH_NOT_MATCH; + } + else if (region_error.has_key_not_in_region()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_key_not_in_region).Increment(); + LOG_DEBUG( + log, + "meet abnormal region error {}, [region_id={}]", + resp.region_error().DebugString(), + region_id); + region_status = RegionException::RegionReadStatus::KEY_NOT_IN_REGION; + } + else if ( + region_error.has_server_is_busy() || region_error.has_raft_entry_too_large() + || region_error.has_region_not_initialized() || region_error.has_disk_full() + || region_error.has_read_index_not_ready() || region_error.has_proposal_in_merging_mode()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_tikv_server_issue).Increment(); + LOG_DEBUG(log, "meet abnormal region error {}, [region_id={}]", resp.ShortDebugString(), region_id); + region_status = RegionException::RegionReadStatus::TIKV_SERVER_ISSUE; + } + else + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_other).Increment(); + LOG_DEBUG(log, "meet abnormal region error {}, [region_id={}]", resp.ShortDebugString(), region_id); + } + unavailable_regions.addStatus(region_id, region_status, std::move(extra_msg)); + } + else if (resp.has_locked()) + { + GET_METRIC(tiflash_raft_learner_read_failures_count, type_tikv_lock).Increment(); + unavailable_regions.addRegionLock(region_id, LockInfoPtr(resp.release_locked())); 
+ } + else + { + // cache read-index to avoid useless overhead about retry. + // resp.read_index() is 0 when stale read, skip it to avoid overwriting read_index res from the last retry. + if (resp.read_index() != 0) + { + mvcc_query_info.addReadIndexResToCache(region_id, resp.read_index()); + } + } + } +} + +RegionsReadIndexResult LearnerReadWorker::readIndex( + const LearnerReadSnapshot & regions_snapshot, + UInt64 timeout_ms, + Stopwatch & watch) +{ + RegionsReadIndexResult batch_read_index_result; + const auto batch_read_index_req + = buildBatchReadIndexReq(tmt.getRegionTable(), regions_snapshot, batch_read_index_result); + + GET_METRIC(tiflash_stale_read_count).Increment(stats.num_stale_read); + GET_METRIC(tiflash_raft_read_index_count).Increment(batch_read_index_req.size()); + + doBatchReadIndex(batch_read_index_req, timeout_ms, batch_read_index_result); + + stats.read_index_elapsed_ms = watch.elapsedMilliseconds(); + GET_METRIC(tiflash_raft_read_index_duration_seconds).Observe(stats.read_index_elapsed_ms / 1000.0); + recordReadIndexError(regions_snapshot, batch_read_index_result); + + const auto log_lvl = unavailable_regions.empty() ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION; + LOG_IMPL( + log, + log_lvl, + "[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} " + "num_unavailable={} " + "cost={}ms", + stats.num_regions, + stats.num_read_index_request, + stats.num_stale_read, + stats.num_cached_read_index, + unavailable_regions.size(), + stats.read_index_elapsed_ms); + + return batch_read_index_result; +} + +void LearnerReadWorker::waitIndex( + const LearnerReadSnapshot & regions_snapshot, + RegionsReadIndexResult & batch_read_index_result, + const UInt64 timeout_ms, + Stopwatch & watch) +{ + const auto & regions_info = mvcc_query_info.regions_query_info; + for (const auto & region_to_query : regions_info) + { + // if region is unavailable, skip wait index. 
+ if (unavailable_regions.contains(region_to_query.region_id)) + continue; + + const auto & region = regions_snapshot.find(region_to_query.region_id)->second; + + const auto total_wait_index_elapsed_ms = watch.elapsedMilliseconds(); + const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index(); + if (timeout_ms != 0 && total_wait_index_elapsed_ms > timeout_ms) + { + // Wait index timeout is enabled && timeout happens, simply check all Regions' applied index + // instead of wait index for Regions one by one. + if (!region->checkIndex(index_to_wait)) + { + auto current = region->appliedIndex(); + unavailable_regions.addRegionWaitIndexTimeout(region_to_query.region_id, index_to_wait, current); + } + continue; // timeout happens, check next region quickly + } + + // Wait index timeout is disabled; or timeout is enabled but not happen yet, wait index for + // a specify Region. + const auto [wait_res, time_cost] = region->waitIndex( + index_to_wait, + timeout_ms, + [this]() { return tmt.checkRunning(); }, + log); + if (wait_res != WaitIndexStatus::Finished) + { + auto current = region->appliedIndex(); + unavailable_regions.addRegionWaitIndexTimeout(region_to_query.region_id, index_to_wait, current); + continue; // error or timeout happens, check next region quickly + } + + if (time_cost > 0) + { + // Only record information if wait-index does happen + GET_METRIC(tiflash_raft_wait_index_duration_seconds).Observe(time_cost); + } + + if (unlikely(!mvcc_query_info.resolve_locks)) + { + continue; + } + + // Try to resolve locks and flush data into storage layer + const auto & physical_table_id = region_to_query.physical_table_id; + auto res = RegionTable::resolveLocksAndWriteRegion( + tmt, + physical_table_id, + region, + mvcc_query_info.read_tso, + region_to_query.bypass_lock_ts, + region_to_query.version, + region_to_query.conf_version, + log); + + std::visit( + variant_op::overloaded{ + [&](LockInfoPtr & lock) { 
unavailable_regions.addRegionLock(region->id(), std::move(lock)); },
+                [&](RegionException::RegionReadStatus & status) {
+                    if (status != RegionException::RegionReadStatus::OK)
+                    {
+                        LOG_WARNING(
+                            log,
+                            "Check memory cache, region_id={} version={} handle_range={} status={}",
+                            region_to_query.region_id,
+                            region_to_query.version,
+                            RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region_to_query.range_in_table),
+                            magic_enum::enum_name(status));
+                        unavailable_regions.addStatus(region->id(), status, "resolveLock");
+                    }
+                },
+            },
+            res);
+    } // wait index for next region
+
+    stats.wait_index_elapsed_ms = watch.elapsedMilliseconds();
+    const auto log_lvl = unavailable_regions.empty() ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION;
+    LOG_IMPL(
+        log,
+        log_lvl,
+        "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}",
+        stats.wait_index_elapsed_ms,
+        stats.num_regions,
+        unavailable_regions.size());
+}
+
+std::tuple<Clock::time_point, Clock::time_point> //
+LearnerReadWorker::waitUntilDataAvailable(
+    const LearnerReadSnapshot & regions_snapshot,
+    UInt64 read_index_timeout_ms,
+    UInt64 wait_index_timeout_ms)
+{
+    const auto start_time = Clock::now();
+
+    Stopwatch watch;
+    RegionsReadIndexResult batch_read_index_result = readIndex(regions_snapshot, read_index_timeout_ms, watch);
+    watch.restart(); // restart to count the elapsed of wait index
+    waitIndex(regions_snapshot, batch_read_index_result, wait_index_timeout_ms, watch);
+
+    const auto end_time = Clock::now();
+    const auto time_elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
+    GET_METRIC(tiflash_syncing_data_freshness).Observe(time_elapsed_ms / 1000.0); // For DBaaS SLI
+
+    // TODO should we try throw immediately after readIndex?
+ // Throw Region exception if there are any unavailable regions, the exception will be handled in the + // following methods + // - `CoprocessorHandler::execute` + // - `FlashService::EstablishDisaggTask` + // - `DAGDriver::execute` + // - `DAGStorageInterpreter::doBatchCopLearnerRead` + // - `DAGStorageInterpreter::buildLocalStreamsForPhysicalTable` + // - `DAGStorageInterpreter::buildLocalExecForPhysicalTable` + unavailable_regions.tryThrowRegionException(); + + // Use info level if read wait index run slow or any unavailable region exists + const auto log_lvl = (time_elapsed_ms > 1000 || !unavailable_regions.empty()) // + ? Poco::Message::PRIO_INFORMATION + : Poco::Message::PRIO_DEBUG; + LOG_IMPL( + log, + log_lvl, + "[Learner Read] batch read index | wait index" + " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}", + time_elapsed_ms, + stats.read_index_elapsed_ms, + stats.wait_index_elapsed_ms, + stats.num_regions, + stats.num_stale_read, + unavailable_regions.size()); + return {start_time, end_time}; +} + +} // namespace DB diff --git a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp new file mode 100644 index 00000000000..1f57eed56cb --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp @@ -0,0 +1,319 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include <Common/SyncPoint/Ctl.h>
+#include <Storages/KVStore/Utils/AsyncTasks.h>
+#include <common/logger_useful.h>
+
+namespace DB
+{
+namespace tests
+{
+TEST(AsyncTasksTest, AsyncTasksNormal)
+{
+    using namespace std::chrono_literals;
+    using TestAsyncTasks = AsyncTasks<uint64_t, std::function<void()>, void>;
+
+    auto log = DB::Logger::get();
+    LOG_INFO(log, "Cancel and addTask");
+    // Cancel and addTask
+    {
+        auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
+        auto m = std::make_shared<std::mutex>();
+        auto m2 = std::make_shared<std::mutex>();
+        int flag = 0;
+        std::unique_lock cl(*m);
+        async_tasks->addTask(1, [m, &flag, &async_tasks, &m2]() {
+            auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1);
+            std::scoped_lock rl2(*m2);
+            std::scoped_lock rl(*m);
+            if (cancel_handle->isCanceled())
+            {
+                return;
+            }
+            flag = 1;
+        });
+        async_tasks->asyncCancelTask(1);
+        ASSERT_FALSE(async_tasks->isScheduled(1));
+        async_tasks->addTask(1, [&flag]() { flag = 2; });
+        cl.unlock();
+        std::scoped_lock rl2(*m2);
+        ASSERT_NO_THROW(async_tasks->fetchResult(1));
+        ASSERT_EQ(flag, 2);
+    }
+
+    // Lifetime of tasks
+    LOG_INFO(log, "Lifetime of tasks");
+    {
+        auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 1);
+        auto sp_after_sched = SyncPointCtl::enableInScope("after_AsyncTasks::addTask_scheduled");
+        auto sp_before_quit = SyncPointCtl::enableInScope("before_AsyncTasks::addTask_quit");
+        std::thread t1([&]() {
+            sp_after_sched.waitAndPause();
+            ASSERT_EQ(async_tasks->unsafeQueryState(1), TestAsyncTasks::TaskState::NotScheduled);
+            sp_after_sched.next();
+            sp_after_sched.disable();
+        });
+        std::thread t2([&]() {
+            sp_before_quit.waitAndPause();
+            ASSERT_EQ(async_tasks->unsafeQueryState(1), TestAsyncTasks::TaskState::InQueue);
+            sp_before_quit.next();
+            sp_before_quit.disable();
+            std::this_thread::sleep_for(50ms);
+            ASSERT_TRUE(async_tasks->isReady(1));
+        });
+        auto res = async_tasks->addTask(1, []() {});
+        ASSERT_TRUE(res);
+        t1.join();
+        t2.join();
+    }
+
+    // Cancel in queue
+    LOG_INFO(log, "Cancel in queue");
+    {
+        auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
+        bool finished = false;
+        bool canceled = false;
+        
std::mutex mtx;
+        std::unique_lock cl(mtx);
+
+        auto res1 = async_tasks->addTask(1, [&]() {
+            std::scoped_lock rl(mtx);
+            UNUSED(rl);
+        });
+        ASSERT_TRUE(res1);
+
+        auto res2 = async_tasks->addTaskWithCancel(
+            2,
+            [&]() { finished = true; },
+            [&]() { canceled = true; });
+        ASSERT_TRUE(res2);
+
+        async_tasks->asyncCancelTask(2);
+        cl.unlock();
+
+        int elapsed = 0;
+        while (true)
+        {
+            if (canceled)
+            {
+                break;
+            }
+            ++elapsed;
+            std::this_thread::sleep_for(50ms);
+        }
+        ASSERT_TRUE(elapsed < 10);
+        ASSERT_FALSE(finished);
+    }
+
+    // Block cancel
+    LOG_INFO(log, "Block cancel");
+    {
+        auto async_tasks = std::make_unique<TestAsyncTasks>(2, 2, 10);
+        int total = 9;
+        int finished = 0;
+        std::vector<bool> f(total, false);
+        for (int i = 0; i < total; i++)
+        {
+            auto res = async_tasks->addTask(i, [i, &async_tasks, &finished, log]() {
+                auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i);
+                while (true)
+                {
+                    std::this_thread::sleep_for(100ms);
+                    if (cancel_handle->isCanceled())
+                    {
+                        break;
+                    }
+                }
+                finished += 1;
+            });
+            // Ensure thread 1 is the first
+            if (i == 0)
+                std::this_thread::sleep_for(10ms);
+            ASSERT_TRUE(res);
+        }
+
+        while (finished < total)
+        {
+            std::this_thread::sleep_for(100ms);
+            for (int i = 0; i < total; i++)
+            {
+                if (f[i])
+                    continue;
+                if (async_tasks->blockedCancelRunningTask(i) == AsyncTaskHelper::TaskState::InQueue)
+                {
+                    // Cancel in queue, should manually add `finished`.
+                    finished += 1;
+                }
+                f[i] = true;
+                break;
+            }
+        }
+
+        for (int i = 0; i < total; i++)
+        {
+            ASSERT_TRUE(f[i]);
+        }
+        ASSERT_EQ(async_tasks->count(), 0);
+    }
+
+    // Cancel tasks in queue
+    LOG_INFO(log, "Cancel tasks in queue");
+    {
+        auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 100);
+
+        int total = 7;
+        std::atomic_int finished = 0;
+        for (int i = 0; i < total; i++)
+        {
+            auto res = async_tasks->addTaskWithCancel(
+                i,
+                [i, &async_tasks, &finished]() {
+                    while (true)
+                    {
+                        auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i);
+                        // Busy loop to take over cpu
+                        if (cancel_handle->isCanceled())
+                        {
+                            break;
+                        }
+                    }
+                    finished.fetch_add(1);
+                },
+                [&]() { finished.fetch_add(1); });
+            // Ensure task 1 is the first to handle
+            if (i == 0)
+                std::this_thread::sleep_for(10ms);
+            ASSERT_TRUE(res);
+        }
+
+        for (int i = 0; i < total; i++)
+        {
+            std::this_thread::sleep_for(100ms);
+            async_tasks->asyncCancelTask(i);
+            // Throw on double cancel
+            EXPECT_THROW(async_tasks->asyncCancelTask(i), Exception);
+        }
+
+        int elapsed = 0;
+        while (true)
+        {
+            if (finished >= total)
+            {
+                break;
+            }
+            ++elapsed;
+            std::this_thread::sleep_for(100ms);
+        }
+        ASSERT_TRUE(elapsed < 50);
+        ASSERT_EQ(async_tasks->count(), 0);
+    }
+}
+
+TEST(AsyncTasksTest, AsyncTasksCommon)
+try
+{
+    using namespace std::chrono_literals;
+
+    using TestAsyncTasks = AsyncTasks<uint64_t, std::function<int()>, int>;
+    auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
+
+    int total = 5;
+    int max_steps = 10;
+    int current_step = 0;
+    std::vector<bool> f(total, false);
+    std::vector<bool> s(total, false);
+    bool initial_loop = true;
+    while (true)
+    {
+        ASSERT(current_step < max_steps);
+        auto count = std::accumulate(f.begin(), f.end(), 0, [&](int a, bool b) -> int { return a + int(b); });
+        if (count >= total)
+        {
+            break;
+        }
+
+        auto to_be_canceled = total - 1;
+
+        if (s[to_be_canceled] && !f[to_be_canceled])
+        {
+            auto state = async_tasks->queryState(to_be_canceled);
+            RUNTIME_CHECK(state == TestAsyncTasks::TaskState::InQueue || state == 
TestAsyncTasks::TaskState::Running); + async_tasks->asyncCancelTask( + to_be_canceled, + []() {}, + true); + f[to_be_canceled] = true; + ASSERT_EQ(async_tasks->queryState(to_be_canceled), TestAsyncTasks::TaskState::NotScheduled); + ASSERT_EQ(f[to_be_canceled], true); + ASSERT_EQ(s[to_be_canceled], true); + } + + // Add tasks + for (int i = 0; i < total; ++i) + { + if (!s[i]) + { + auto res = async_tasks->addTask(i, [i, &async_tasks, to_be_canceled, &f]() { + if (i == to_be_canceled) + { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + while (true) + { + if (cancel_handle->blockedWaitFor(100ms)) + { + f[to_be_canceled] = true; + break; + } + } + } + else + { + std::this_thread::sleep_for(100ms); + } + return 1; + }); + if (res) + { + s[i] = true; + } + // In the first loop, only the first task can run. + if (initial_loop) + ASSERT_EQ(res, i <= 1); + } + } + + // Fetch result + for (int i = 0; i < total; ++i) + { + if (!f[i]) + { + if (async_tasks->isReady(i)) + { + [[maybe_unused]] auto r = async_tasks->fetchResult(i); + f[i] = true; + } + } + } + initial_loop = false; + std::this_thread::sleep_for(100ms); + current_step++; + } + + ASSERT_EQ(async_tasks->count(), 0); +} +CATCH + +} // namespace tests +} // namespace DB \ No newline at end of file