diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 087949d2b89..663aa8b233f 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -197,20 +197,87 @@ void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline) } // add timezone cast for timestamp type, this is used to support session level timezone -bool addExtraCastsAfterTs( +// Returns {has_cast, extra_cast, project_for_cop_read}; both actions are nullptr when has_cast is false. +std::tuple addExtraCastsAfterTs( DAGExpressionAnalyzer & analyzer, const std::vector & need_cast_column, - ExpressionActionsChain & chain, const TiDBTableScan & table_scan) { bool has_need_cast_column = false; for (auto b : need_cast_column) - { has_need_cast_column |= (b != ExtraCastAfterTSMode::None); - } if (!has_need_cast_column) - return false; - return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan); + return {false, nullptr, nullptr}; + + auto original_source_columns = analyzer.getCurrentInputColumns(); + ExpressionActionsChain chain; + analyzer.initChain(chain, original_source_columns); + // execute timezone cast or duration cast if needed for local table scan + if (analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan)) + { + ExpressionActionsPtr extra_cast = chain.getLastActions(); + assert(extra_cast); + chain.finalize(); + chain.clear(); + + // After `analyzer.appendExtraCastsAfterTS`, analyzer.getCurrentInputColumns() has been modified. + // For remote read, `timezone cast and duration cast` have already been pushed down, so the cast expressions don't need to be executed. + // To keep the schema of local read streams and remote read streams the same, apply a project action to the remote read streams. 
+ NamesWithAliases project_for_remote_read; + const auto & after_cast_source_columns = analyzer.getCurrentInputColumns(); + for (size_t i = 0; i < after_cast_source_columns.size(); ++i) + project_for_remote_read.emplace_back(original_source_columns[i].name, after_cast_source_columns[i].name); + assert(!project_for_remote_read.empty()); + ExpressionActionsPtr project_for_cop_read = std::make_shared(original_source_columns, analyzer.getContext().getSettingsRef()); + project_for_cop_read->add(ExpressionAction::project(project_for_remote_read)); + + return {true, extra_cast, project_for_cop_read}; + } + else + { + return {false, nullptr, nullptr}; + } +} + +void injectFailPointForLocalRead(const SelectQueryInfo & query_info) +{ + // Inject failpoint to throw RegionException for testing + fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, { + const auto & regions_info = query_info.mvcc_query_info->regions_query_info; + RegionException::UnavailableRegions region_ids; + for (const auto & info : regions_info) + { + if (random() % 100 > 50) + region_ids.insert(info.region_id); + } + throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); + }); + fiu_do_on(FailPoints::region_exception_after_read_from_storage_all_error, { + const auto & regions_info = query_info.mvcc_query_info->regions_query_info; + RegionException::UnavailableRegions region_ids; + for (const auto & info : regions_info) + region_ids.insert(info.region_id); + throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); + }); +} + +String genErrMsgForLocalRead( + const ManageableStoragePtr & storage, + const TableID & table_id, + const TableID & logical_table_id) +{ + return table_id == logical_table_id + ? 
fmt::format( + "(while creating InputStreams from storage `{}`.`{}`, table_id: {})", + storage->getDatabaseName(), + storage->getTableName(), + table_id) + : fmt::format( + "(while creating InputStreams from storage `{}`.`{}`, table_id: {}, logical_table_id: {})", + storage->getDatabaseName(), + storage->getTableName(), + table_id, + logical_table_id); } } // namespace @@ -272,7 +339,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) // For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop / mpp mode. if (!remote_requests.empty()) - buildRemoteStreams(std::move(remote_requests), pipeline); + buildRemoteStreams(remote_requests, pipeline); /// record local and remote io input stream auto & table_scan_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[table_scan.getTableScanExecutorID()]; @@ -346,10 +413,10 @@ void DAGStorageInterpreter::prepare() analyzer = std::make_unique(std::move(source_columns), context); } -void DAGStorageInterpreter::executePushedDownFilter( - size_t remote_read_streams_start_index, - DAGPipeline & pipeline) +std::tuple DAGStorageInterpreter::buildPushDownFilter() { + assert(push_down_filter.hasValue()); + ExpressionActionsChain chain; analyzer->initChain(chain, analyzer->getCurrentInputColumns()); String filter_column_name = analyzer->appendWhere(chain, push_down_filter.conditions); @@ -368,6 +435,15 @@ void DAGStorageInterpreter::executePushedDownFilter( chain.finalize(); chain.clear(); + return {before_where, filter_column_name, project_after_where}; +} + +void DAGStorageInterpreter::executePushedDownFilter( + size_t remote_read_streams_start_index, + DAGPipeline & pipeline) +{ + auto [before_where, filter_column_name, project_after_where] = buildPushDownFilter(); + assert(pipeline.streams_with_non_joined_data.empty()); assert(remote_read_streams_start_index <= pipeline.streams.size()); // for remote read, 
filter had been pushed down, don't need to execute again. @@ -386,28 +462,10 @@ void DAGStorageInterpreter::executeCastAfterTableScan( size_t remote_read_streams_start_index, DAGPipeline & pipeline) { - auto original_source_columns = analyzer->getCurrentInputColumns(); - - ExpressionActionsChain chain; - analyzer->initChain(chain, original_source_columns); - // execute timezone cast or duration cast if needed for local table scan - if (addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, chain, table_scan)) + auto [has_cast, extra_cast, project_for_cop_read] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan); + if (has_cast) { - ExpressionActionsPtr extra_cast = chain.getLastActions(); - chain.finalize(); - chain.clear(); - - // After `addExtraCastsAfterTs`, analyzer->getCurrentInputColumns() has been modified. - // For remote read, `timezone cast and duration cast` had been pushed down, don't need to execute cast expressions. - // To keep the schema of local read streams and remote read streams the same, do project action for remote read streams. 
- NamesWithAliases project_for_remote_read; - const auto & after_cast_source_columns = analyzer->getCurrentInputColumns(); - for (size_t i = 0; i < after_cast_source_columns.size(); ++i) - { - project_for_remote_read.emplace_back(original_source_columns[i].name, after_cast_source_columns[i].name); - } - assert(!project_for_remote_read.empty()); assert(pipeline.streams_with_non_joined_data.empty()); assert(remote_read_streams_start_index <= pipeline.streams.size()); size_t i = 0; @@ -419,27 +477,20 @@ void DAGStorageInterpreter::executeCastAfterTableScan( stream->setExtraInfo("cast after local tableScan"); } // remote streams - if (i < pipeline.streams.size()) + while (i < pipeline.streams.size()) { - ExpressionActionsPtr project_for_cop_read = generateProjectExpressionActions( - pipeline.streams[i], - context, - project_for_remote_read); - while (i < pipeline.streams.size()) - { - auto & stream = pipeline.streams[i++]; - stream = std::make_shared(stream, project_for_cop_read, log->identifier()); - stream->setExtraInfo("cast after remote tableScan"); - } + auto & stream = pipeline.streams[i++]; + stream = std::make_shared(stream, project_for_cop_read, log->identifier()); + stream->setExtraInfo("cast after remote tableScan"); } } } -void DAGStorageInterpreter::buildRemoteStreams(std::vector && remote_requests, DAGPipeline & pipeline) +std::vector DAGStorageInterpreter::buildCopTasks(const std::vector & remote_requests) { assert(!remote_requests.empty()); - DAGSchema & schema = remote_requests[0].schema; #ifndef NDEBUG + const DAGSchema & schema = remote_requests[0].schema; auto schema_match = [&schema](const DAGSchema & other) { if (schema.size() != other.size()) return false; @@ -456,7 +507,6 @@ void DAGStorageInterpreter::buildRemoteStreams(std::vector && rem throw Exception("Schema mismatch between different partitions for partition table"); } #endif - bool has_enforce_encode_type = remote_requests[0].dag_request.has_force_encode_type() && 
remote_requests[0].dag_request.force_encode_type(); pingcap::kv::Cluster * cluster = tmt.getKVCluster(); std::vector all_tasks; for (const auto & remote_request : remote_requests) @@ -472,7 +522,16 @@ void DAGStorageInterpreter::buildRemoteStreams(std::vector && rem auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, &Poco::Logger::get("pingcap/coprocessor")); all_tasks.insert(all_tasks.end(), tasks.begin(), tasks.end()); } + return all_tasks; +} +void DAGStorageInterpreter::buildRemoteStreams(const std::vector & remote_requests, DAGPipeline & pipeline) +{ + std::vector all_tasks = buildCopTasks(remote_requests); + + const DAGSchema & schema = remote_requests[0].schema; + pingcap::kv::Cluster * cluster = tmt.getKVCluster(); + bool has_enforce_encode_type = remote_requests[0].dag_request.has_force_encode_type() && remote_requests[0].dag_request.force_encode_type(); size_t concurrent_num = std::min(context.getSettingsRef().max_threads, all_tasks.size()); size_t task_per_thread = all_tasks.size() / concurrent_num; size_t rest_task = all_tasks.size() % concurrent_num; @@ -631,6 +690,123 @@ std::unordered_map DAGStorageInterpreter::generateSele return ret; } +bool DAGStorageInterpreter::checkRetriableForBatchCopOrMPP( + const TableID & table_id, + const SelectQueryInfo & query_info, + const RegionException & e, + int num_allow_retry) +{ + const DAGContext & dag_context = *context.getDAGContext(); + assert((dag_context.isBatchCop() || dag_context.isMPPTask())); + const auto & dag_regions = dag_context.getTableRegionsInfoByTableID(table_id).local_regions; + FmtBuffer buffer; + // Normally only a few regions need to be retried when super batch is enabled, so retry reading + // from local first. However, too many retries in different places may make the total time of the + // whole process uncontrollable, so the number of retries is limited to 1 for now. 
+ if (likely(num_allow_retry > 0)) + { + auto & regions_query_info = query_info.mvcc_query_info->regions_query_info; + for (auto iter = regions_query_info.begin(); iter != regions_query_info.end(); /**/) + { + if (e.unavailable_region.find(iter->region_id) != e.unavailable_region.end()) + { + // move the error regions info from `query_info.mvcc_query_info->regions_query_info` to `region_retry_from_local_region` + if (auto region_iter = dag_regions.find(iter->region_id); likely(region_iter != dag_regions.end())) + { + region_retry_from_local_region.emplace_back(region_iter->second); + buffer.fmtAppend("{},", region_iter->first); + } + iter = regions_query_info.erase(iter); + } + else + { + ++iter; + } + } + LOG_FMT_WARNING( + log, + "RegionException after read from storage, regions [{}], message: {}{}", + buffer.toString(), + e.message(), + (regions_query_info.empty() ? "" : ", retry to read from local")); + // no available region in local, break retry loop + // otherwise continue to retry read from local storage + return !regions_query_info.empty(); + } + else + { + // push all regions to `region_retry_from_local_region` to retry from other tiflash nodes + for (const auto & region : query_info.mvcc_query_info->regions_query_info) + { + auto iter = dag_regions.find(region.region_id); + if (likely(iter != dag_regions.end())) + { + region_retry_from_local_region.emplace_back(iter->second); + buffer.fmtAppend("{},", iter->first); + } + } + LOG_FMT_WARNING(log, "RegionException after read from storage, regions [{}], message: {}", buffer.toString(), e.message()); + return false; // break retry loop + } +} + +void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable( + const TableID & table_id, + const SelectQueryInfo & query_info, + DAGPipeline & pipeline, + size_t max_block_size) +{ + size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); + if (region_num == 0) + return; + + assert(storages_with_structure_lock.find(table_id) != 
storages_with_structure_lock.end()); + auto & storage = storages_with_structure_lock[table_id].storage; + + const DAGContext & dag_context = *context.getDAGContext(); + for (int num_allow_retry = 1; num_allow_retry >= 0; --num_allow_retry) + { + try + { + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); + + injectFailPointForLocalRead(query_info); + + // After getting streams from storage, validate (by calling `validateQueryInfo`) whether Regions have changed + // since learner read. If the key ranges of Regions have changed (Region merge/split), those `streams` + // may contain data different from what is expected. + validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); + break; + } + catch (RegionException & e) + { + /// Recover from region exception for batchCop/MPP + if (dag_context.isBatchCop() || dag_context.isMPPTask()) + { + // clear all local streams because we cannot be sure that they are correct + pipeline.streams.clear(); + if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry))) + continue; + else + break; + } + else + { + // Rethrow the exception (with storage/table info attached) so that TiDB / TiSpark can retry + e.addMessage(genErrMsgForLocalRead(storage, table_id, logical_table_id)); + throw; + } + } + catch (DB::Exception & e) + { + /// Other exceptions: attach storage/table info to the message and rethrow + e.addMessage(genErrMsgForLocalRead(storage, table_id, logical_table_id)); + throw; + } + } +} + void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size) { const DAGContext & dag_context = *context.getDAGContext(); @@ -646,142 +822,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max DAGPipeline current_pipeline; const TableID table_id = table_query_info.first; const SelectQueryInfo & query_info = table_query_info.second; - size_t 
region_num = query_info.mvcc_query_info->regions_query_info.size(); - if (region_num == 0) - continue; - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; - assert(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end()); - auto & storage = storages_with_structure_lock[table_id].storage; - - int num_allow_retry = 1; - while (true) - { - try - { - current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); - - // After getting streams from storage, we need to validate whether Regions have changed or not after learner read. - // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams` - // may contain different data other than expected. - - // Inject failpoint to throw RegionException for testing - fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, { - const auto & regions_info = query_info.mvcc_query_info->regions_query_info; - RegionException::UnavailableRegions region_ids; - for (const auto & info : regions_info) - { - if (random() % 100 > 50) - region_ids.insert(info.region_id); - } - throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); - }); - fiu_do_on(FailPoints::region_exception_after_read_from_storage_all_error, { - const auto & regions_info = query_info.mvcc_query_info->regions_query_info; - RegionException::UnavailableRegions region_ids; - for (const auto & info : regions_info) - region_ids.insert(info.region_id); - throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND); - }); - validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log); - break; - } - catch (RegionException & e) - { - /// Recover from region exception when super batch is enable - if (dag_context.isBatchCop() || dag_context.isMPPTask()) - { - // clean all streams from local because we are not sure 
the correctness of those streams - current_pipeline.streams.clear(); - const auto & dag_regions = dag_context.getTableRegionsInfoByTableID(table_id).local_regions; - FmtBuffer buffer; - // Normally there is only few regions need to retry when super batch is enabled. Retry to read - // from local first. However, too many retry in different places may make the whole process - // time out of control. We limit the number of retries to 1 now. - if (likely(num_allow_retry > 0)) - { - --num_allow_retry; - auto & regions_query_info = query_info.mvcc_query_info->regions_query_info; - for (auto iter = regions_query_info.begin(); iter != regions_query_info.end(); /**/) - { - if (e.unavailable_region.find(iter->region_id) != e.unavailable_region.end()) - { - // move the error regions info from `query_info.mvcc_query_info->regions_query_info` to `region_retry_from_local_region` - if (auto region_iter = dag_regions.find(iter->region_id); likely(region_iter != dag_regions.end())) - { - region_retry_from_local_region.emplace_back(region_iter->second); - buffer.fmtAppend("{},", region_iter->first); - } - iter = regions_query_info.erase(iter); - } - else - { - ++iter; - } - } - LOG_FMT_WARNING( - log, - "RegionException after read from storage, regions [{}], message: {}{}", - buffer.toString(), - e.message(), - (regions_query_info.empty() ? 
"" : ", retry to read from local")); - if (unlikely(regions_query_info.empty())) - break; // no available region in local, break retry loop - continue; // continue to retry read from local storage - } - else - { - // push all regions to `region_retry_from_local_region` to retry from other tiflash nodes - for (const auto & region : query_info.mvcc_query_info->regions_query_info) - { - auto iter = dag_regions.find(region.region_id); - if (likely(iter != dag_regions.end())) - { - region_retry_from_local_region.emplace_back(iter->second); - buffer.fmtAppend("{},", iter->first); - } - } - LOG_FMT_WARNING(log, "RegionException after read from storage, regions [{}], message: {}", buffer.toString(), e.message()); - break; // break retry loop - } - } - else - { - // Throw an exception for TiDB / TiSpark to retry - if (table_id == logical_table_id) - e.addMessage(fmt::format( - "(while creating InputStreams from storage `{}`.`{}`, table_id: {})", - storage->getDatabaseName(), - storage->getTableName(), - table_id)); - else - e.addMessage(fmt::format( - "(while creating InputStreams from storage `{}`.`{}`, table_id: {}, logical_table_id: {})", - storage->getDatabaseName(), - storage->getTableName(), - table_id, - logical_table_id)); - throw; - } - } - catch (DB::Exception & e) - { - /// Other unknown exceptions - if (table_id == logical_table_id) - e.addMessage(fmt::format( - "(while creating InputStreams from storage `{}`.`{}`, table_id: {})", - storage->getDatabaseName(), - storage->getTableName(), - table_id)); - else - e.addMessage(fmt::format( - "(while creating InputStreams from storage `{}`.`{}`, table_id: {}, logical_table_id: {})", - storage->getDatabaseName(), - storage->getTableName(), - table_id, - logical_table_id)); - throw; - } - } + buildLocalStreamsForPhysicalTable(table_id, query_info, current_pipeline, max_block_size); if (has_multiple_partitions) stream_pool->addPartitionStreams(current_pipeline.streams); else diff --git 
a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index 0425abe04db..efe78c25918 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -64,6 +65,16 @@ class DAGStorageInterpreter LearnerReadSnapshot doBatchCopLearnerRead(); + bool checkRetriableForBatchCopOrMPP( + const TableID & table_id, + const SelectQueryInfo & query_info, + const RegionException & e, + int num_allow_retry); + void buildLocalStreamsForPhysicalTable( + const TableID & table_id, + const SelectQueryInfo & query_info, + DAGPipeline & pipeline, + size_t max_block_size); void buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size); std::unordered_map getAndLockStorages(Int64 query_schema_version); @@ -80,12 +91,15 @@ class DAGStorageInterpreter void recordProfileStreams(DAGPipeline & pipeline, const String & key); - void buildRemoteStreams(std::vector && remote_requests, DAGPipeline & pipeline); + std::vector buildCopTasks(const std::vector & remote_requests); + void buildRemoteStreams(const std::vector & remote_requests, DAGPipeline & pipeline); void executeCastAfterTableScan( size_t remote_read_streams_start_index, DAGPipeline & pipeline); + // Returns {before_where, filter_column_name, project_after_where} + std::tuple buildPushDownFilter(); void executePushedDownFilter( size_t remote_read_streams_start_index, DAGPipeline & pipeline); diff --git a/dbms/src/Interpreters/ExpressionActions.h b/dbms/src/Interpreters/ExpressionActions.h index e8fb48f4e3f..09344173b59 100644 --- a/dbms/src/Interpreters/ExpressionActions.h +++ b/dbms/src/Interpreters/ExpressionActions.h @@ -145,6 +145,14 @@ class ExpressionActions sample_block.insert(ColumnWithTypeAndName(nullptr, input_elem.type, input_elem.name)); } + ExpressionActions(const NamesAndTypes & input_columns_, const Settings & settings_) + : 
input_columns(input_columns_.cbegin(), input_columns_.cend()) + , settings(settings_) + { + for (const auto & input_elem : input_columns) + sample_block.insert(ColumnWithTypeAndName(nullptr, input_elem.type, input_elem.name)); + } + /// For constant columns the columns themselves can be contained in `input_columns_`. ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Settings & settings_) : settings(settings_)