diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 8739c5bfca0..c05b0308ecd 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -29,6 +29,10 @@ endif () find_package (Threads) +if (ENABLE_TESTS) + add_definitions(-DFIU_ENABLE) +endif() + include_directories (src) add_subdirectory (src) @@ -78,6 +82,7 @@ add_headers_only(dbms src/Server) list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD}) list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON}) +list (APPEND clickhouse_common_io_headers ${fiu_include_dirs}) list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp) list (APPEND dbms_headers src/Functions/IFunction.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h) @@ -157,6 +162,7 @@ target_link_libraries (clickhouse_common_io ${EXECINFO_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${CMAKE_DL_LIBS} + fiu prometheus-cpp::core prometheus-cpp::push prometheus-cpp::pull @@ -254,9 +260,6 @@ if (NOT USE_INTERNAL_ZSTD_LIBRARY) target_include_directories (dbms BEFORE PRIVATE ${ZSTD_INCLUDE_DIR}) endif () -#use libfiu -target_include_directories(dbms PUBLIC ${fiu_include_dirs}) - target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR}) target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR}) target_include_directories (clickhouse_common_io PUBLIC ${PCG_RANDOM_INCLUDE_DIR}) @@ -271,8 +274,6 @@ add_subdirectory (tests) if (ENABLE_TESTS) include (${ClickHouse_SOURCE_DIR}/cmake/find_gtest.cmake) - add_definitions(-DFIU_ENABLE) - if (USE_INTERNAL_GTEST_LIBRARY) # Google Test from sources add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp new file mode 100644 index 00000000000..c49538e58d4 --- /dev/null +++ b/dbms/src/Common/FailPoint.cpp @@ -0,0 +1,105 @@ +#include + +#include +#include +#include + +namespace DB +{ +std::unordered_map> FailPointHelper::fail_point_wait_channels; + +#define APPLY_FOR_FAILPOINTS(M) \ + M(exception_between_drop_meta_and_data) \ + M(exception_between_alter_data_and_meta) \ + M(exception_drop_table_during_remove_meta) \ + M(exception_between_rename_table_data_and_metadata); \ + M(exception_between_create_database_meta_and_directory); \ + M(exception_before_rename_table_old_meta_removed); \ + M(region_exception_after_read_from_storage_some_error) \ + M(region_exception_after_read_from_storage_all_error) + +#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read) + +namespace FailPoints +{ +#define M(NAME) extern const char NAME[] = #NAME ""; +APPLY_FOR_FAILPOINTS(M) +APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) +#undef M +} // namespace FailPoints + +#ifdef FIU_ENABLE +class FailPointChannel : private boost::noncopyable +{ +public: + // wake up all waiting threads when destroy + ~FailPointChannel() { cv.notify_all(); } + + void wait() + { + std::unique_lock lock(m); + cv.wait(lock); + } + +private: + std::mutex m; + std::condition_variable cv; +}; + +void FailPointHelper::enableFailPoint(const String & fail_point_name) +{ +#define M(NAME) \ + if (fail_point_name == FailPoints::NAME) \ + { \ + /* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \ + fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \ + return; \ + } + + APPLY_FOR_FAILPOINTS(M) +#undef M + +#define M(NAME) \ + if (fail_point_name == FailPoints::NAME) \ + { \ + /* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \ + fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \ + fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared()); \ + return; \ + } + + APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) +#undef M + throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR); +} + +void FailPointHelper::disableFailPoint(const String & fail_point_name) +{ + if (auto iter = fail_point_wait_channels.find(fail_point_name); iter != fail_point_wait_channels.end()) + fail_point_wait_channels.erase(iter); + fiu_disable(fail_point_name.c_str()); +} + +void FailPointHelper::wait(const String & fail_point_name) +{ + if (auto iter = fail_point_wait_channels.find(fail_point_name); iter == fail_point_wait_channels.end()) + throw Exception("Can not find channel for fail point" + fail_point_name); + else + { + auto ptr = iter->second; + ptr->wait(); + } +} +#else +class FailPointChannel +{ +}; + +void FailPointHelper::enableFailPoint(const String & fail_point_name) {} + +void FailPointHelper::disableFailPoint(const String & fail_point_name) {} + +void FailPointHelper::wait(const String & fail_point_name) {} +#endif + +} // namespace DB diff --git a/dbms/src/Common/FailPoint.h b/dbms/src/Common/FailPoint.h index 6b39b7a9515..516575f15d8 100644 --- a/dbms/src/Common/FailPoint.h +++ b/dbms/src/Common/FailPoint.h @@ -5,6 +5,8 @@ #include #include +#include + namespace DB { @@ -13,36 +15,25 @@ namespace ErrorCodes extern const int FAIL_POINT_ERROR; }; -#define FAIL_POINT_REGISTER(name) static constexpr char name[] = #name ""; - -#define FAIL_POINT_ENABLE(trigger, name) \ - else if (trigger == name) { fiu_enable(name, 1, nullptr, FIU_ONETIME); } - -FAIL_POINT_REGISTER(exception_between_drop_meta_and_data) -FAIL_POINT_REGISTER(exception_between_alter_data_and_meta) -FAIL_POINT_REGISTER(exception_drop_table_during_remove_meta) -FAIL_POINT_REGISTER(exception_between_rename_table_data_and_metadata); -FAIL_POINT_REGISTER(exception_between_create_database_meta_and_directory); -FAIL_POINT_REGISTER(exception_before_rename_table_old_meta_removed); - +/// Macros to set failpoints. +// When `fail_point` is enabled, throw an exception #define FAIL_POINT_TRIGGER_EXCEPTION(fail_point) \ fiu_do_on(fail_point, throw Exception("Fail point " #fail_point " is triggered.", ErrorCodes::FAIL_POINT_ERROR);) +// When `fail_point` is enabled, wait till it is disabled +#define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);) + +class FailPointChannel; class FailPointHelper { public: - static void enableFailPoint(const String & fail_point_name) - { - if (false) {} - FAIL_POINT_ENABLE(fail_point_name, exception_between_alter_data_and_meta) - FAIL_POINT_ENABLE(fail_point_name, exception_between_drop_meta_and_data) - FAIL_POINT_ENABLE(fail_point_name, exception_drop_table_during_remove_meta) - FAIL_POINT_ENABLE(fail_point_name, exception_between_rename_table_data_and_metadata) - FAIL_POINT_ENABLE(fail_point_name, exception_between_create_database_meta_and_directory) - FAIL_POINT_ENABLE(fail_point_name, exception_before_rename_table_old_meta_removed) - else throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR); - } - - static void disableFailPoint(const String & fail_point_name) { fiu_disable(fail_point_name.c_str()); } + static void enableFailPoint(const String & fail_point_name); + + static void disableFailPoint(const String & fail_point_name); + + static void wait(const String & fail_point_name); + +private: + static std::unordered_map> fail_point_wait_channels; }; } // namespace DB diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index 9a1f1e32df6..bb3220cfd9e 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -34,6 +34,12 @@ namespace ErrorCodes extern const int SYNTAX_ERROR; } +namespace FailPoints +{ +extern const char exception_drop_table_during_remove_meta[]; +extern const char exception_between_rename_table_data_and_metadata[]; +} + static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256; static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5; static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; @@ -205,7 +211,7 @@ void DatabaseOrdinary::removeTable( // If tiflash crash before remove metadata, next time it restart, will // full apply schema from TiDB. And the old table's metadata and data // will be removed. - FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_drop_table_during_remove_meta); Poco::File(table_metadata_path).remove(); } catch (...) @@ -249,7 +255,7 @@ void DatabaseOrdinary::renameTable( } // TODO: Atomic rename table is not fixed. - FAIL_POINT_TRIGGER_EXCEPTION(exception_between_rename_table_data_and_metadata); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_rename_table_data_and_metadata); ASTPtr ast = DatabaseLoading::getQueryFromMetadata(context, detail::getTableMetadataPath(metadata_path, table_name)); if (!ast) diff --git a/dbms/src/Databases/DatabaseTiFlash.cpp b/dbms/src/Databases/DatabaseTiFlash.cpp index d15ffa46eee..94ad211f2a3 100644 --- a/dbms/src/Databases/DatabaseTiFlash.cpp +++ b/dbms/src/Databases/DatabaseTiFlash.cpp @@ -32,6 +32,12 @@ extern const int CANNOT_GET_CREATE_TABLE_QUERY; extern const int SYNTAX_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char exception_drop_table_during_remove_meta[]; +extern const char exception_before_rename_table_old_meta_removed[]; +} + static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; DatabaseTiFlash::DatabaseTiFlash( @@ -178,8 +184,8 @@ void DatabaseTiFlash::createTable(const Context & context, const String & table_ /// If it was ATTACH query and file with table metadata already exist /// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one. - context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), - table_metadata_path, EncryptionPath(table_metadata_path, "")); + context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path, + EncryptionPath(table_metadata_path, "")); } catch (...) { @@ -198,7 +204,7 @@ void DatabaseTiFlash::removeTable(const Context & context, const String & table_ // full apply schema from TiDB. And the old table's metadata and data // will be removed. String table_metadata_path = getTableMetadataPath(table_name); - FAIL_POINT_TRIGGER_EXCEPTION(exception_drop_table_during_remove_meta); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_drop_table_during_remove_meta); context.getFileProvider()->deleteRegularFile(table_metadata_path, EncryptionPath(table_metadata_path, "")); } catch (...) @@ -273,10 +279,11 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_ // then we cannot rename the encryption info and the file in an atomic operation. bool use_target_encrypt_info = context.getFileProvider()->isFileEncrypted(EncryptionPath(new_tbl_meta_file, "")); { - EncryptionPath encryption_path = use_target_encrypt_info ? EncryptionPath(new_tbl_meta_file, "") : EncryptionPath(new_tbl_meta_file_tmp, ""); + EncryptionPath encryption_path + = use_target_encrypt_info ? EncryptionPath(new_tbl_meta_file, "") : EncryptionPath(new_tbl_meta_file_tmp, ""); bool create_new_encryption_info = !use_target_encrypt_info && statement.size(); - WriteBufferFromFileProvider out(context.getFileProvider(), new_tbl_meta_file_tmp, encryption_path, - create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL); + WriteBufferFromFileProvider out( + context.getFileProvider(), new_tbl_meta_file_tmp, encryption_path, create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL); writeString(statement, out); out.next(); if (context.getSettingsRef().fsync_metadata) @@ -293,8 +300,8 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_ } else { - context.getFileProvider()->renameFile(new_tbl_meta_file_tmp, EncryptionPath(new_tbl_meta_file_tmp, ""), - new_tbl_meta_file, EncryptionPath(new_tbl_meta_file, "")); + context.getFileProvider()->renameFile(new_tbl_meta_file_tmp, EncryptionPath(new_tbl_meta_file_tmp, ""), new_tbl_meta_file, + EncryptionPath(new_tbl_meta_file, "")); } } catch (...) @@ -306,7 +313,7 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_ throw; } - FAIL_POINT_TRIGGER_EXCEPTION(exception_before_rename_table_old_meta_removed); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_rename_table_old_meta_removed); // If only display name updated, don't remove `old_tbl_meta_file`. if (!isSamePath(old_tbl_meta_file, new_tbl_meta_file)) @@ -314,7 +321,8 @@ void DatabaseTiFlash::renameTable(const Context & context, const String & table_ // If process crash before removing old table meta file, we will continue or rollback this // rename command next time `loadTables` is called. See `loadTables` and // `DatabaseLoading::startupTables` for more details. - context.getFileProvider()->deleteRegularFile(old_tbl_meta_file, EncryptionPath(old_tbl_meta_file, ""));// Then remove old meta file + context.getFileProvider()->deleteRegularFile( + old_tbl_meta_file, EncryptionPath(old_tbl_meta_file, "")); // Then remove old meta file } } @@ -365,10 +373,11 @@ void DatabaseTiFlash::alterTable( // refer to the comment in `renameTable` bool use_target_encrypt_info = context.getFileProvider()->isFileEncrypted(EncryptionPath(table_metadata_path, "")); { - EncryptionPath encryption_path = use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, ""); + EncryptionPath encryption_path + = use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, ""); bool create_new_encryption_info = !use_target_encrypt_info && statement.size(); - WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path, - create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL); + WriteBufferFromFileProvider out( + context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info, O_WRONLY | O_CREAT | O_EXCL); writeString(statement, out); out.next(); if (context.getSettingsRef().fsync_metadata) @@ -385,16 +394,15 @@ void DatabaseTiFlash::alterTable( } else { - context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), - table_metadata_path, EncryptionPath(table_metadata_path, "")); + context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path, + EncryptionPath(table_metadata_path, "")); } } catch (...) { if (!use_target_encrypt_info) { - context.getFileProvider()->deleteRegularFile(table_metadata_tmp_path, - EncryptionPath(table_metadata_tmp_path, "")); + context.getFileProvider()->deleteRegularFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, "")); } throw; } @@ -482,8 +490,8 @@ void DatabaseTiFlash::drop(const Context & context) // Remove meta file for this database if (auto meta_file = Poco::File(getDatabaseMetadataPath(getMetadataPath())); meta_file.exists()) { - context.getFileProvider()->deleteRegularFile(getDatabaseMetadataPath(getMetadataPath()), - EncryptionPath(getDatabaseMetadataPath(getMetadataPath()), "")); + context.getFileProvider()->deleteRegularFile( + getDatabaseMetadataPath(getMetadataPath()), EncryptionPath(getDatabaseMetadataPath(getMetadataPath()), "")); } } diff --git a/dbms/src/Databases/test/gtest_database.cpp b/dbms/src/Databases/test/gtest_database.cpp index 71b99358513..397715684fb 100644 --- a/dbms/src/Databases/test/gtest_database.cpp +++ b/dbms/src/Databases/test/gtest_database.cpp @@ -25,6 +25,11 @@ namespace ErrorCodes extern const int SYNTAX_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char exception_before_rename_table_old_meta_removed[]; +} + namespace tests { @@ -469,7 +474,7 @@ try const String to_tbl_name = "t_112"; // Rename table to another database, and mock crash by failed point - FailPointHelper::enableFailPoint("exception_before_rename_table_old_meta_removed"); + FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed); ASSERT_THROW( typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name), DB::Exception); diff --git a/dbms/src/Debug/dbgFuncFailPoint.h b/dbms/src/Debug/dbgFuncFailPoint.h index fb07c611147..52e5b7c1009 100644 --- a/dbms/src/Debug/dbgFuncFailPoint.h +++ b/dbms/src/Debug/dbgFuncFailPoint.h @@ -8,10 +8,14 @@ namespace DB struct DbgFailPointFunc { - static void dbgEnableFailPoint(Context & context, const ASTs & args, DBGInvoker::Printer output); - + // Init fail point. must be called if you want to enable / disable failpoints + // DBGInvoke init_fail_point() static void dbgInitFailPoint(Context & context, const ASTs & args, DBGInvoker::Printer output); + // Enable fail point. + // DBGInvoke enable_fail_point(name) + static void dbgEnableFailPoint(Context & context, const ASTs & args, DBGInvoker::Printer output); + // Disable fail point. // Usage: // ./stoage-client.sh "DBGInvoke disable_fail_point(name)" diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 6e5b6c0af43..8ef71db22b3 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -52,6 +53,13 @@ extern const int COP_BAD_DAG_REQUEST; extern const int NO_COMMON_TYPE; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char region_exception_after_read_from_storage_some_error[]; +extern const char region_exception_after_read_from_storage_all_error[]; +extern const char pause_after_learner_read[]; +} // namespace FailPoints + DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(Context & context_, const std::vector & input_streams_vec_, const DAGQueryBlock & query_block_, bool keep_session_timezone_info_, const tipb::DAGRequest & rqst_, ASTPtr dummy_query_, const DAGQuerySource & dag_, std::vector & subqueriesForSets_) @@ -455,7 +463,6 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & } size_t max_block_size = settings.max_block_size; - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; if (query_block.selection) { @@ -471,28 +478,14 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & query_info.dag_query = std::make_unique(conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns()); query_info.mvcc_query_info = std::move(mvcc_query_info); + FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read); bool need_local_read = !query_info.mvcc_query_info->regions_query_info.empty(); if (need_local_read) { - // TODO: Note that if storage is (Txn)MergeTree, and any region exception thrown, we won't do retry here. - // Now we only support DeltaTree in production environment and don't do any extra check for storage type here. - try - { - pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); - // After getting streams from storage, we need to validate whether regions have changed or not after learner read. - // In case the versions of regions have changed, those `streams` may contain different data other than expected. - // Like after region merge/split. - validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); - } - catch (DB::Exception & e) - { - e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() - + "`, table_id: " + DB::toString(table_id) + ")"); - throw; - } + readFromLocalStorage(table_id, required_columns, query_info, max_block_size, learner_read_snapshot, pipeline, region_retry); } - // For those regions which are not presented in this tiflash node, we will try to fetch streams from other tiflash nodes, only happens in batch cop mode. + // For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop mode. if (!region_retry.empty()) { LOG_DEBUG(log, ({ @@ -605,6 +598,127 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & } } +void DAGQueryBlockInterpreter::readFromLocalStorage( // + const TableID table_id, const Names & required_columns, SelectQueryInfo & query_info, const size_t max_block_size, + const LearnerReadSnapshot & learner_read_snapshot, // + Pipeline & pipeline, std::unordered_map & region_retry) +{ + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + auto & tmt = context.getTMTContext(); + // TODO: Note that if storage is (Txn)MergeTree, and any region exception thrown, we won't do retry here. + // Now we only support DeltaTree in production environment and don't do any extra check for storage type here. + + int num_allow_retry = 1; + while (true) + { + try + { + pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); + + // After getting streams from storage, we need to validate whether regions have changed or not after learner read. + // In case the versions of regions have changed, those `streams` may contain different data other than expected. + // Like after region merge/split. + + // Inject failpoint to throw RegionException + fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, { + const auto & regions_info = query_info.mvcc_query_info->regions_query_info; + std::vector region_ids; + for (const auto & info : regions_info) + { + if (rand() % 100 > 50) + region_ids.push_back(info.region_id); + } + throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); + }); + fiu_do_on(FailPoints::region_exception_after_read_from_storage_all_error, { + const auto & regions_info = query_info.mvcc_query_info->regions_query_info; + std::vector region_ids; + for (const auto & info : regions_info) + region_ids.push_back(info.region_id); + throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); + }); + validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); + break; + } + catch (DB::RegionException & e) + { + /// Recover from region exception when super batch is enable + if (dag.isBatchCop()) + { + // clean all streams from local because we are not sure the correctness of those streams + pipeline.streams.clear(); + const auto & dag_regions = dag.getRegions(); + std::stringstream ss; + // Normally there is only few regions need to retry when super batch is enabled. Retry to read + // from local first. However, too many retry in different places may make the whole process + // time out of control. We limit the number of retries to 1 now. + if (likely(num_allow_retry > 0)) + { + --num_allow_retry; + // sort e.region_ids so that we can use lower_bound to find the region happens to error or not + std::sort(e.region_ids.begin(), e.region_ids.end()); + auto & regions_query_info = query_info.mvcc_query_info->regions_query_info; + for (auto iter = regions_query_info.begin(); iter != regions_query_info.end(); /**/) + { + auto error_id_iter = std::lower_bound(e.region_ids.begin(), e.region_ids.end(), iter->region_id); + if (error_id_iter != e.region_ids.end() && *error_id_iter == iter->region_id) + { + // move the error regions info from `query_info.mvcc_query_info->regions_query_info` to `region_retry` + auto region_iter = dag_regions.find(iter->region_id); + if (likely(region_iter != dag_regions.end())) + { + region_retry.emplace(region_iter->first, region_iter->second); + ss << region_iter->first << ","; + } + iter = regions_query_info.erase(iter); + } + else + { + ++iter; + } + } + LOG_WARNING(log, + "RegionException after read from storage, regions [" + << ss.str() << "], message: " << e.message() + << (regions_query_info.empty() ? "" : ", retry to read from local")); + if (unlikely(regions_query_info.empty())) + break; // no available region in local, break retry loop + continue; // continue to retry read from local storage + } + else + { + // push all regions to `region_retry` to retry from other tiflash nodes + for (const auto & region : query_info.mvcc_query_info->regions_query_info) + { + auto iter = dag_regions.find(region.region_id); + if (likely(iter != dag_regions.end())) + { + region_retry.emplace(iter->first, iter->second); + ss << iter->first << ","; + } + } + LOG_WARNING(log, "RegionException after read from storage, regions [" << ss.str() << "], message: " << e.message()); + break; // break retry loop + } + } + else + { + // Throw an exception for TiDB / TiSpark to retry + e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() + + "`, table_id: " + DB::toString(table_id) + ")"); + throw; + } + } + catch (DB::Exception & e) + { + /// Other unknown exceptions + e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() + + "`, table_id: " + DB::toString(table_id) + ")"); + throw; + } + } +} + void DAGQueryBlockInterpreter::prepareJoinKeys(const google::protobuf::RepeatedPtrField & keys, const DataTypes & key_types, Pipeline & pipeline, Names & key_names, bool left, bool is_right_out_join) { diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 93e482b149a..07401bdfad2 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -7,6 +7,7 @@ #pragma GCC diagnostic pop #include +#include #include #include #include @@ -15,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -27,6 +27,9 @@ namespace DB class Context; class Region; using RegionPtr = std::shared_ptr; +struct RegionLearnerReadSnapshot; +using LearnerReadSnapshot = std::unordered_map; +struct SelectQueryInfo; struct Pipeline { @@ -100,6 +103,11 @@ class DAGQueryBlockInterpreter void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, Names & aggregation_keys, TiDB::TiDBCollators & collators, AggregateDescriptions & aggregate_descriptions); void executeFinalProject(Pipeline & pipeline); + + void readFromLocalStorage( // + const TableID table_id, const Names & required_columns, SelectQueryInfo & query_info, const size_t max_block_size, + const LearnerReadSnapshot & learner_read_snapshot, // + Pipeline & pipeline, std::unordered_map & region_retry); void getAndLockStorageWithSchemaVersion(TableID table_id, Int64 schema_version); SortDescription getSortDescription(std::vector & order_columns); AnalysisResult analyzeExpressions(); diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAGHelper.hpp b/dbms/src/Flash/Coprocessor/InterpreterDAGHelper.hpp index a6c9d9e8a0b..c80fd11bc23 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAGHelper.hpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAGHelper.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 8b866275369..371b46da7f1 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -57,6 +57,11 @@ namespace ErrorCodes extern const int READONLY; } +namespace FailPoints +{ +extern const char exception_between_create_database_meta_and_directory[]; +} + InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) @@ -141,7 +146,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) context.getFileProvider()->renameFile(metadata_file_tmp_path, EncryptionPath(metadata_file_tmp_path, ""), metadata_file_path, EncryptionPath(metadata_file_path, "")); - FAIL_POINT_TRIGGER_EXCEPTION(exception_between_create_database_meta_and_directory); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_create_database_meta_and_directory); // meta file (not temporary) of database exists means create database success, // we need to create meta directory for it if not exists. if (auto db_meta_path = Poco::File(database->getMetadataPath()); !db_meta_path.exists()) diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index 199961d962f..805da66ba74 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -25,6 +25,10 @@ extern const int READONLY; extern const int FAIL_POINT_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char exception_between_drop_meta_and_data[]; +} InterpreterDropQuery::InterpreterDropQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) {} @@ -145,7 +149,7 @@ BlockIO InterpreterDropQuery::execute() storage->removeFromTMTContext(); }); - FAIL_POINT_TRIGGER_EXCEPTION(exception_between_drop_meta_and_data); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_drop_meta_and_data); /// Delete table data table.first->drop(); @@ -156,7 +160,8 @@ BlockIO InterpreterDropQuery::execute() const String database_data_path = database->getDataPath(); if (!database_data_path.empty()) { - String table_data_path = database_data_path + (endsWith(database_data_path, "/") ? "" : "/") + escapeForFileName(current_table_name); + String table_data_path + = database_data_path + (endsWith(database_data_path, "/") ? "" : "/") + escapeForFileName(current_table_name); if (Poco::File(table_data_path).exists()) { diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 6f57f9cd7ad..7fc3f545a95 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -47,6 +47,11 @@ namespace ErrorCodes extern const int FAIL_POINT_ERROR; } +namespace FailPoints +{ +extern const char exception_between_alter_data_and_meta[]; +} + StorageMergeTree::StorageMergeTree(const String & path_, const String & db_engine_, @@ -470,7 +475,7 @@ void StorageMergeTree::alterInternal( // The process of data change and meta change is not atomic, so we must make sure change data firstly // and change meta secondly. If server crashes during or after changing data, we must fix the schema after restart. - FAIL_POINT_TRIGGER_EXCEPTION(exception_between_alter_data_and_meta); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_alter_data_and_meta); context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier); setColumns(std::move(new_columns)); diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index 72cfca881d1..527e9ac8181 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -188,12 +188,14 @@ LearnerReadSnapshot doLearnerRead(const TiDB::TableID table_id, // return regions_snapshot; } +/// Ensure regions' info after read. void validateQueryInfo( const MvccQueryInfo & mvcc_query_info, const LearnerReadSnapshot & regions_snapshot, TMTContext & tmt, Poco::Logger * log) { - const auto & regions_query_info = mvcc_query_info.regions_query_info; - /// Ensure regions' info after read. - for (const auto & region_query_info : regions_query_info) + std::vector fail_region_ids; + RegionException::RegionReadStatus fail_status = RegionException::RegionReadStatus::OK; + + for (const auto & region_query_info : mvcc_query_info.regions_query_info) { RegionException::RegionReadStatus status = RegionException::RegionReadStatus::OK; auto region = tmt.getKVStore()->getRegion(region_query_info.region_id); @@ -211,15 +213,20 @@ void validateQueryInfo( if (status != RegionException::RegionReadStatus::OK) { + fail_region_ids.emplace_back(region_query_info.region_id); + fail_status = status; LOG_WARNING(log, "Check after read from Storage, region " << region_query_info.region_id << ", version " << region_query_info.version // << ", handle range [" << region_query_info.range_in_table.first.toString() << ", " << region_query_info.range_in_table.second.toString() << "), status " << RegionException::RegionReadStatusString(status)); - // throw region exception and let TiDB retry - throwRetryRegion(regions_query_info, status); } } + + if (!fail_region_ids.empty()) + { + throw RegionException(std::move(fail_region_ids), fail_status); + } } } // namespace DB diff --git a/tests/fullstack-test/fault-inject/exception_after_read_from_storage.test b/tests/fullstack-test/fault-inject/exception_after_read_from_storage.test new file mode 100644 index 00000000000..d7873333cfc --- /dev/null +++ b/tests/fullstack-test/fault-inject/exception_after_read_from_storage.test @@ -0,0 +1,51 @@ +# Tests for Region meta changed after read from storage +mysql> drop table if exists test.t + +>> DBGInvoke __init_fail_point() + +# Ensure that we can create table for that database +mysql> create table test.t(a int not null, b int not null) +mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc' + +func> wait_table test t + +mysql> insert into test.t values (1, 1),(1, 2) + +# This should retry to read normal Regions in local, and read error Regions from remote +>> DBGInvoke __enable_fail_point(region_exception_after_read_from_storage_some_error) + +mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t; ++---+---+ +| a | b | ++---+---+ +| 1 | 1 | +| 1 | 2 | ++---+---+ + +# This should retry to read error Regions from remote +>> DBGInvoke __enable_fail_point(region_exception_after_read_from_storage_all_error) + +mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t; ++---+---+ +| a | b | ++---+---+ +| 1 | 1 | +| 1 | 2 | ++---+---+ + + +# This should first retry in local, then retry from remote because of "all error" in the retry +>> DBGInvoke __enable_fail_point(region_exception_after_read_from_storage_some_error) +>> DBGInvoke __enable_fail_point(region_exception_after_read_from_storage_all_error) + +mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t; ++---+---+ +| a | b | ++---+---+ +| 1 | 1 | +| 1 | 2 | ++---+---+ + +mysql> drop table if exists test.t +>> DBGInvoke __disable_fail_point(region_exception_after_read_from_storage_some_error) +>> DBGInvoke __disable_fail_point(region_exception_after_read_from_storage_all_error)