diff --git a/dbms/src/Common/UniformRandomIntGenerator.h b/dbms/src/Common/UniformRandomIntGenerator.h
new file mode 100644
index 00000000000..85badce66be
--- /dev/null
+++ b/dbms/src/Common/UniformRandomIntGenerator.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <random>
+
+namespace DB
+{
+template <typename IntType>
+class UniformRandomIntGenerator
+{
+public:
+    /// [min, max]
+    UniformRandomIntGenerator(IntType min, IntType max)
+        : dis(std::uniform_int_distribution<IntType>(min, max))
+        , gen(std::default_random_engine(std::random_device{}()))
+    {}
+
+    IntType rand()
+    {
+        return dis(gen);
+    }
+
+private:
+    std::uniform_int_distribution<IntType> dis;
+    std::default_random_engine gen;
+};
+} // namespace DB
\ No newline at end of file
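
The new header wraps a per-instance `std::default_random_engine`, seeded once from `std::random_device`, behind a `std::uniform_int_distribution`. A minimal usage sketch (the `main`, the `<int>` instantiation, and the `[1, 6]` range are illustrative, not part of the patch):

```cpp
#include <Common/UniformRandomIntGenerator.h>

#include <iostream>

int main()
{
    // Each generator owns its own engine, so callers can keep a
    // thread_local instance instead of sharing ::rand()'s global state,
    // and the distribution avoids the modulo bias of `rand() % n`.
    DB::UniformRandomIntGenerator<int> dice(1, 6); // inclusive range [1, 6]
    for (int i = 0; i < 5; ++i)
        std::cout << dice.rand() << '\n';
    return 0;
}
```
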
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 6de5ba0e44c..eca7a93a56f 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -13,7 +13,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -244,16 +243,22 @@ void DAGQueryBlockInterpreter::handleTableScan(const tipb::TableScan & ts, DAGPipeline & pipeline)
     if (ts.next_read_engine() != tipb::EngineType::Local)
         throw TiFlashException("Unsupported remote query.", Errors::Coprocessor::BadRequest);
 
+    DAGStorageInterpreter storage_interpreter(context, query_block.source_name, ts, max_streams);
     // construct pushed down filter conditions.
     std::vector<const tipb::Expr *> conditions;
     if (query_block.selection)
     {
         for (const auto & condition : query_block.selection->selection().conditions())
             conditions.push_back(&condition);
-    }
 
-    DAGStorageInterpreter storage_interpreter(context, query_block, ts, conditions, max_streams);
-    storage_interpreter.execute(pipeline);
+        assert(!conditions.empty());
+        assert(!query_block.selection_name.empty());
+        storage_interpreter.execute(pipeline, query_block.selection_name, conditions);
+    }
+    else
+    {
+        storage_interpreter.execute(pipeline);
+    }
 
     analyzer = std::move(storage_interpreter.analyzer);
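
`DAGStorageInterpreter` now takes only the table-scan executor id instead of the whole `DAGQueryBlock`, and the pushed-down selection travels through `execute()` parameters. A condensed sketch of the resulting call contract (not a standalone program; the executor ids and the `collectConditions` helper are made up for illustration):

```cpp
// With a pushed-down selection: name and conditions must both be non-empty.
DAGStorageInterpreter interpreter(context, /*table_scan_executor_id_=*/"table_scan_0", ts, max_streams);
std::vector<const tipb::Expr *> conditions = collectConditions(); // hypothetical helper
interpreter.execute(pipeline, /*selection_name=*/"selection_1", conditions);

// Without one: the overload delegates with an empty name and empty conditions,
// satisfying execute()'s `selection_name.empty() == conditions.empty()` assert.
DAGStorageInterpreter bare_interpreter(context, "table_scan_0", ts, max_streams);
bare_interpreter.execute(pipeline);
```
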
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
index 83be0f1f534..e3f8b3930dd 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -1,6 +1,7 @@
 #include
 #include
 #include
+#include <Common/UniformRandomIntGenerator.h>
 #include
 #include
 #include
@@ -54,12 +55,12 @@ MakeRegionQueryInfos(
     mvcc_info.regions_query_info.clear();
     RegionRetryList region_need_retry;
     RegionException::RegionReadStatus status_res = RegionException::RegionReadStatus::OK;
-    for (auto & [id, r] : dag_region_infos)
+    for (const auto & [id, r] : dag_region_infos)
     {
         if (r.key_ranges.empty())
         {
             throw TiFlashException(
-                "Income key ranges is empty for region: " + std::to_string(r.region_id),
+                fmt::format("Income key ranges is empty for region: {}", r.region_id),
                 Errors::Coprocessor::BadRequest);
         }
         if (region_force_retry.count(id))
@@ -92,14 +93,16 @@ MakeRegionQueryInfos(
             if (!computeMappedTableID(*p.first, table_id_in_range) || table_id_in_range != table_id)
             {
                 throw TiFlashException(
-                    "Income key ranges is illegal for region: " + std::to_string(r.region_id)
-                        + ", table id in key range is " + std::to_string(table_id_in_range) + ", table id in region is "
-                        + std::to_string(table_id),
+                    fmt::format(
+                        "Income key ranges is illegal for region: {}, table id in key range is {}, table id in region is {}",
+                        r.region_id,
+                        table_id_in_range,
+                        table_id),
                     Errors::Coprocessor::BadRequest);
             }
             if (p.first->compare(*info.range_in_table.first) < 0 || p.second->compare(*info.range_in_table.second) > 0)
                 throw TiFlashException(
-                    "Income key ranges is illegal for region: " + std::to_string(r.region_id),
+                    fmt::format("Income key ranges is illegal for region: {}", r.region_id),
                     Errors::Coprocessor::BadRequest);
         }
         info.required_handle_ranges = r.key_ranges;
@@ -119,49 +122,50 @@ MakeRegionQueryInfos(
 DAGStorageInterpreter::DAGStorageInterpreter(
     Context & context_,
-    const DAGQueryBlock & query_block_,
-    const tipb::TableScan & ts,
-    const std::vector<const tipb::Expr *> & conditions_,
+    const String & table_scan_executor_id_,
+    const tipb::TableScan & table_scan_,
     size_t max_streams_)
     : context(context_)
-    , query_block(query_block_)
-    , table_scan(ts)
-    , conditions(conditions_)
+    , table_scan_executor_id(table_scan_executor_id_)
+    , table_scan(table_scan_)
     , max_streams(max_streams_)
     , log(getMPPTaskLog(*context.getDAGContext(), "DAGStorageInterpreter"))
-    , table_id(ts.table_id())
+    , table_id(table_scan_.table_id())
     , settings(context.getSettingsRef())
     , tmt(context.getTMTContext())
     , mvcc_query_info(new MvccQueryInfo(true, settings.read_tso))
 {
 }
 
-void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
+void DAGStorageInterpreter::execute(DAGPipeline & pipeline, const String & selection_name, const std::vector<const tipb::Expr *> & conditions)
 {
+    assert(selection_name.empty() == conditions.empty());
+
     const DAGContext & dag_context = *context.getDAGContext();
 
-    if (dag_context.isBatchCop() || dag_context.isMPPTask())
-        learner_read_snapshot = doBatchCopLearnerRead();
-    else
-        learner_read_snapshot = doCopLearnerRead();
+    LearnerReadSnapshot learner_read_snapshot = (dag_context.isBatchCop() || dag_context.isMPPTask()) ? doBatchCopLearnerRead() : doCopLearnerRead();
 
+    ManageableStoragePtr storage;
     std::tie(storage, table_structure_lock) = getAndLockStorage(settings.schema_version);
-    std::tie(required_columns, source_columns, is_need_add_cast_column, handle_column_name) = getColumnsForTableScan(settings.max_columns_to_read);
+    Names required_columns;
+    NamesAndTypes source_columns;
+    String handle_column_name;
+    std::tie(required_columns, source_columns, is_need_add_cast_column, handle_column_name) = getColumnsForTableScan(settings.max_columns_to_read, storage);
 
     analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
 
     FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read);
 
     if (!mvcc_query_info->regions_query_info.empty())
-        doLocalRead(pipeline, settings.max_block_size);
+        doLocalRead(pipeline, settings.max_block_size, storage, required_columns, learner_read_snapshot, conditions);
 
-    for (auto & region_info : dag_context.getRegionsForRemoteRead())
+    for (const auto & region_info : dag_context.getRegionsForRemoteRead())
         region_retry.emplace_back(region_info);
 
     null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns));
 
     // Should build these vars under protect of `table_structure_lock`.
-    std::tie(dag_request, dag_schema) = buildRemoteTS();
+    std::tie(dag_request, dag_schema) = buildRemoteTS(storage, handle_column_name, selection_name, conditions);
 }
 
 LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
@@ -227,13 +231,19 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
     }
     catch (DB::Exception & e)
     {
-        e.addMessage("(while doing learner read for table, table_id: " + DB::toString(table_id) + ")");
+        e.addMessage(fmt::format("(while doing learner read for table, table_id: {})", table_id));
         throw;
     }
 }
 
-void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
+void DAGStorageInterpreter::doLocalRead(
+    DAGPipeline & pipeline,
+    size_t max_block_size,
+    const ManageableStoragePtr & storage,
+    const Names & required_columns,
+    const LearnerReadSnapshot & learner_read_snapshot,
+    const std::vector<const tipb::Expr *> & conditions)
 {
     const DAGContext & dag_context = *context.getDAGContext();
     SelectQueryInfo query_info;
@@ -263,11 +273,12 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
     // Inject failpoint to throw RegionException
     fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, {
+        thread_local UniformRandomIntGenerator<int> random(0, 99);
         const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
         RegionException::UnavailableRegions region_ids;
         for (const auto & info : regions_info)
         {
-            if (rand() % 100 > 50)
+            if (random.rand() > 50)
                 region_ids.insert(info.region_id);
         }
         throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
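
The failpoint previously drew from `::rand()`, whose hidden global state is shared across query threads and whose `% 100` carries a small modulo bias. The `thread_local` generator gives each stream thread its own seeded engine. A self-contained sketch of the pattern (the function name is illustrative):

```cpp
#include <Common/UniformRandomIntGenerator.h>

bool shouldMarkRegionUnavailable()
{
    // One engine per thread: seeded once on first use, no cross-thread
    // contention, and exactly uniform over the inclusive range [0, 99].
    thread_local DB::UniformRandomIntGenerator<int> random(0, 99);
    return random.rand() > 50; // ~49% of regions, mirroring the failpoint
}
```
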
@@ -344,16 +355,14 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
         else
         {
             // Throw an exception for TiDB / TiSpark to retry
-            e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
-                         + "`, table_id: " + DB::toString(table_id) + ")");
+            e.addMessage(fmt::format("(while creating InputStreams from storage `{}`.`{}`, table_id: {})", storage->getDatabaseName(), storage->getTableName(), table_id));
             throw;
         }
     }
     catch (DB::Exception & e)
     {
         /// Other unknown exceptions
-        e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
-                     + "`, table_id: " + DB::toString(table_id) + ")");
+        e.addMessage(fmt::format("(while creating InputStreams from storage `{}`.`{}`, table_id: {})", storage->getDatabaseName(), storage->getTableName(), table_id));
         throw;
     }
 }
 
@@ -364,12 +373,12 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter::getAndLockStorage(Int64 query_schema_version)
     /// Get current schema version in schema syncer for a chance to shortcut.
     if (unlikely(query_schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION))
    {
-        auto storage_ = tmt.getStorages().get(table_id);
-        if (!storage_)
+        auto storage = tmt.getStorages().get(table_id);
+        if (!storage)
         {
-            throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
+            throw TiFlashException(fmt::format("Table {} doesn't exist.", table_id), Errors::Table::NotExists);
         }
-        return {storage_, storage_->lockStructureForShare(context.getCurrentQueryId())};
+        return {storage, storage->lockStructureForShare(context.getCurrentQueryId())};
     }
 
     auto global_schema_version = tmt.getSchemaSyncer()->getCurrentVersion();
@@ -379,23 +388,23 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter::getAndLockStorage(Int64 query_schema_version)
     auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<ManageableStoragePtr, TableStructureLockHolder, Int64, bool> {
         /// Get storage in case it's dropped then re-created.
         // If schema synced, call getTable without try, leading to exception on table not existing.
-        auto storage_ = tmt.getStorages().get(table_id);
-        if (!storage_)
+        auto storage = tmt.getStorages().get(table_id);
+        if (!storage)
         {
             if (schema_synced)
-                throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
+                throw TiFlashException(fmt::format("Table {} doesn't exist.", table_id), Errors::Table::NotExists);
             else
                 return {nullptr, TableStructureLockHolder{}, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false};
         }
 
-        if (storage_->engineType() != ::TiDB::StorageEngine::TMT && storage_->engineType() != ::TiDB::StorageEngine::DT)
+        if (storage->engineType() != ::TiDB::StorageEngine::TMT && storage->engineType() != ::TiDB::StorageEngine::DT)
         {
-            throw TiFlashException("Specifying schema_version for non-managed storage: " + storage_->getName()
-                                       + ", table: " + storage_->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
-                                   Errors::Coprocessor::Internal);
+            throw TiFlashException(
+                fmt::format("Specifying schema_version for non-managed storage: {}, table: {}, id: {} is not allowed", storage->getName(), storage->getTableName(), table_id),
+                Errors::Coprocessor::Internal);
         }
 
-        auto lock = storage_->lockStructureForShare(context.getCurrentQueryId());
+        auto lock = storage->lockStructureForShare(context.getCurrentQueryId());
 
         /// Check schema version, requiring TiDB/TiSpark and TiFlash both use exactly the same schema.
         // We have three schema versions, two in TiFlash:
@@ -403,22 +412,22 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter::getAndLockStorage(Int64 query_schema_version)
         // 2. Global: the version that TiFlash global schema is at.
         // And one from TiDB/TiSpark:
         // 3. Query: the version that TiDB/TiSpark used for this query.
-        auto storage_schema_version = storage_->getTableInfo().schema_version;
+        auto storage_schema_version = storage->getTableInfo().schema_version;
         // Not allow storage > query in any case, one example is time travel queries.
         if (storage_schema_version > query_schema_version)
-            throw TiFlashException("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
-                                       + " newer than query schema version " + std::to_string(query_schema_version),
-                                   Errors::Table::SchemaVersionError);
+            throw TiFlashException(
+                fmt::format("Table {} schema version {} newer than query schema version {}", table_id, storage_schema_version, query_schema_version),
+                Errors::Table::SchemaVersionError);
         // From now on we have storage <= query.
         // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
         if (schema_synced)
-            return {storage_, lock, storage_schema_version, true};
+            return {storage, lock, storage_schema_version, true};
         // From now on the schema was not synced.
         // 1. storage == query, TiDB/TiSpark is using exactly the same schema that altered this table, we are just OK to serve.
         // 2. global >= query, TiDB/TiSpark is using a schema older than TiFlash global, but as mentioned above we have storage <= query,
         // meaning that the query schema is still newer than the time when this table was last altered, so we still OK to serve.
         if (storage_schema_version == query_schema_version || global_schema_version >= query_schema_version)
-            return {storage_, lock, storage_schema_version, true};
+            return {storage, lock, storage_schema_version, true};
         // From now on we have global < query.
         // Return false for outer to sync and retry.
         return {nullptr, TableStructureLockHolder{}, storage_schema_version, false};
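
The version rules in the comments above reduce to a small pure function; a minimal model for reference (illustrative, not the actual member — the real lambda also threads the storage pointer and lock through its tuple):

```cpp
#include <cstdint>

enum class Outcome
{
    Serve,        // versions are compatible, answer the query
    SyncAndRetry, // global < query: sync the schema, then try again
    Reject        // storage newer than query, e.g. a time-travel read
};

Outcome checkSchemaVersion(int64_t storage_ver, int64_t global_ver, int64_t query_ver, bool schema_synced)
{
    if (storage_ver > query_ver)
        return Outcome::Reject;
    // storage <= query from here on; serving is safe once the schema was
    // synced, the versions match exactly, or global has caught up to query.
    if (schema_synced || storage_ver == query_ver || global_ver >= query_ver)
        return Outcome::Serve;
    return Outcome::SyncAndRetry;
}
```
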
@@ -469,24 +478,25 @@ std::tuple<ManageableStoragePtr, TableStructureLockHolder> DAGStorageInterpreter::getAndLockStorage(Int64 query_schema_version)
     }
 }
 
-std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGStorageInterpreter::getColumnsForTableScan(Int64 max_columns_to_read)
+std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGStorageInterpreter::getColumnsForTableScan(
+    Int64 max_columns_to_read,
+    const ManageableStoragePtr & storage)
 {
     // todo handle alias column
     if (max_columns_to_read && table_scan.columns().size() > max_columns_to_read)
     {
-        throw TiFlashException("Limit for number of columns to read exceeded. "
-                               "Requested: "
-                                   + toString(table_scan.columns().size()) + ", maximum: " + toString(max_columns_to_read),
-                               Errors::BroadcastJoin::TooManyColumns);
+        throw TiFlashException(
+            fmt::format("Limit for number of columns to read exceeded. Requested: {}, maximum: {}", table_scan.columns().size(), max_columns_to_read),
+            Errors::BroadcastJoin::TooManyColumns);
     }
 
-    Names required_columns_;
-    NamesAndTypes source_columns_;
+    Names required_columns;
+    NamesAndTypes source_columns;
     std::vector<ExtraCastAfterTSMode> need_cast_column;
     need_cast_column.reserve(table_scan.columns_size());
-    String handle_column_name_ = MutableSupport::tidb_pk_column_name;
+    String handle_column_name = MutableSupport::tidb_pk_column_name;
     if (auto pk_handle_col = storage->getTableInfo().getPKHandleColumn())
-        handle_column_name_ = pk_handle_col->get().name;
+        handle_column_name = pk_handle_col->get().name;
 
     for (Int32 i = 0; i < table_scan.columns_size(); i++)
     {
@@ -494,10 +504,10 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGStorageInterpreter::getColumnsForTableScan(
         ColumnID cid = ci.column_id();
 
         // Column ID -1 return the handle column
-        String name = cid == -1 ? handle_column_name_ : storage->getTableInfo().getColumnName(cid);
+        String name = cid == -1 ? handle_column_name : storage->getTableInfo().getColumnName(cid);
         auto pair = storage->getColumns().getPhysical(name);
-        required_columns_.emplace_back(std::move(name));
-        source_columns_.emplace_back(std::move(pair));
+        required_columns.emplace_back(std::move(name));
+        source_columns.emplace_back(std::move(pair));
         if (cid != -1 && ci.tp() == TiDB::TypeTimestamp)
             need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
         else if (cid != -1 && ci.tp() == TiDB::TypeTime)
@@ -506,10 +516,14 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> DAGStorageInterpreter::getColumnsForTableScan(
             need_cast_column.push_back(ExtraCastAfterTSMode::None);
     }
 
-    return {required_columns_, source_columns_, need_cast_column, handle_column_name_};
+    return {required_columns, source_columns, need_cast_column, handle_column_name};
 }
 
-std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS()
+std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS(
+    const ManageableStoragePtr & storage,
+    const String & handle_column_name,
+    const String & selection_name,
+    const std::vector<const tipb::Expr *> & conditions)
 {
     const DAGContext & dag_context = *context.getDAGContext();
     if (region_retry.empty())
@@ -536,13 +550,13 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS()
     DAGSchema schema;
     tipb::DAGRequest dag_req;
     auto * executor = dag_req.mutable_root_executor();
-    if (query_block.selection != nullptr)
+    if (!conditions.empty())
     {
         executor->set_tp(tipb::ExecType::TypeSelection);
-        executor->set_executor_id(query_block.selection->executor_id());
+        executor->set_executor_id(selection_name);
         auto * selection = executor->mutable_selection();
-        for (auto & condition : query_block.selection->selection().conditions())
-            *selection->add_conditions() = condition;
+        for (const auto & condition : conditions)
+            *selection->add_conditions() = *condition;
         executor = selection->mutable_child();
     }
@@ -550,7 +564,7 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS()
     const auto & table_info = storage->getTableInfo();
     tipb::Executor * ts_exec = executor;
     ts_exec->set_tp(tipb::ExecType::TypeTableScan);
-    ts_exec->set_executor_id(query_block.source->executor_id());
+    ts_exec->set_executor_id(table_scan_executor_id);
     *(ts_exec->mutable_tbl_scan()) = table_scan;
 
     for (int i = 0; i < table_scan.columns().size(); ++i)
@@ -568,7 +582,7 @@ std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> DAGStorageInterpreter::buildRemoteTS()
         }
         else
         {
-            auto & col_info = table_info.getColumnInfo(col_id);
+            const auto & col_info = table_info.getColumnInfo(col_id);
             schema.emplace_back(std::make_pair(col_info.name, col_info));
         }
         dag_req.add_output_offsets(i);
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
index 41d9874ecaa..d30fff618a4 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -33,15 +33,21 @@ class DAGStorageInterpreter
 public:
     DAGStorageInterpreter(
         Context & context_,
-        const DAGQueryBlock & query_block_,
-        const tipb::TableScan & ts,
-        const std::vector<const tipb::Expr *> & conditions_,
+        const String & table_scan_executor_id_,
+        const tipb::TableScan & table_scan_,
         size_t max_streams_);
 
     DAGStorageInterpreter(DAGStorageInterpreter &&) = delete;
     DAGStorageInterpreter & operator=(DAGStorageInterpreter &&) = delete;
 
-    void execute(DAGPipeline & pipeline);
+    void execute(DAGPipeline & pipeline, const String & selection_name, const std::vector<const tipb::Expr *> & conditions);
+
+    void execute(DAGPipeline & pipeline)
+    {
+        std::string empty_selection_name;
+        std::vector<const tipb::Expr *> empty_conditions;
+        execute(pipeline, empty_selection_name, empty_conditions);
+    }
 
     /// Members will be transfered to DAGQueryBlockInterpreter after execute
 
@@ -63,39 +69,41 @@ class DAGStorageInterpreter
     LearnerReadSnapshot doBatchCopLearnerRead();
 
-    void doLocalRead(DAGPipeline & pipeline, size_t max_block_size);
+    void doLocalRead(
+        DAGPipeline & pipeline,
+        size_t max_block_size,
+        const ManageableStoragePtr & storage,
+        const Names & required_columns,
+        const LearnerReadSnapshot & learner_read_snapshot,
+        const std::vector<const tipb::Expr *> & conditions);
 
     std::tuple<ManageableStoragePtr, TableStructureLockHolder> getAndLockStorage(Int64 query_schema_version);
 
-    std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> getColumnsForTableScan(Int64 max_columns_to_read);
+    std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>, String> getColumnsForTableScan(
+        Int64 max_columns_to_read,
+        const ManageableStoragePtr & storage);
 
-    std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> buildRemoteTS();
+    std::tuple<std::optional<tipb::DAGRequest>, std::optional<DAGSchema>> buildRemoteTS(
+        const ManageableStoragePtr & storage,
+        const String & handle_column_name,
+        const String & selection_name,
+        const std::vector<const tipb::Expr *> & conditions);
 
     /// passed from caller, doesn't change during DAGStorageInterpreter's lifetime
     Context & context;
-    const DAGQueryBlock & query_block;
+    String table_scan_executor_id;
     const tipb::TableScan & table_scan;
-    const std::vector<const tipb::Expr *> & conditions;
     size_t max_streams;
     LogWithPrefixPtr log;
 
     /// derived from other members, doesn't change during DAGStorageInterpreter's lifetime
     TableID table_id;
     const Settings & settings;
     TMTContext & tmt;
 
     /// Intermediate variables shared by multiple member functions
     std::unique_ptr<MvccQueryInfo> mvcc_query_info;
-    // We need to validate regions snapshot after getting streams from storage.
-    LearnerReadSnapshot learner_read_snapshot;
-    /// Table from where to read data, if not subquery.
-    ManageableStoragePtr storage;
-    Names required_columns;
-    NamesAndTypes source_columns;
-    String handle_column_name;
 };
 
 } // namespace DB
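
Taken together, the retry path in `buildRemoteTS` reproduces the pushed-down plan fragment as a `Selection -> TableScan` chain, or a bare `TableScan` when nothing was pushed down. A condensed, illustrative sketch of that shape using the same tipb calls as the diff (the free function, its name, and the include path are assumptions, not part of the patch):

```cpp
#include <tipb/select.pb.h> // assumed include for the tipb protobufs

// Hypothetical helper mirroring the executor-tree construction above.
// Reusing the original executor ids lets the remote response be matched
// back to this query block's executors.
tipb::DAGRequest buildRetryRequest(
    const tipb::TableScan & table_scan,
    const std::string & table_scan_executor_id,
    const std::string & selection_name,
    const std::vector<const tipb::Expr *> & conditions)
{
    tipb::DAGRequest dag_req;
    tipb::Executor * executor = dag_req.mutable_root_executor();
    if (!conditions.empty())
    {
        // Wrap the scan in a Selection carrying the pushed-down filters.
        executor->set_tp(tipb::ExecType::TypeSelection);
        executor->set_executor_id(selection_name);
        auto * selection = executor->mutable_selection();
        for (const auto * condition : conditions)
            *selection->add_conditions() = *condition;
        executor = selection->mutable_child();
    }
    executor->set_tp(tipb::ExecType::TypeTableScan);
    executor->set_executor_id(table_scan_executor_id);
    *executor->mutable_tbl_scan() = table_scan;
    return dag_req;
}
```
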