diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp
index 6da54e74e69..8e8b6117def 100644
--- a/dbms/src/Common/FailPoint.cpp
+++ b/dbms/src/Common/FailPoint.cpp
@@ -83,7 +83,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
     M(force_slow_page_storage_snapshot_release)

 #define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
-    M(pause_after_learner_read)                   \
+    M(pause_with_alter_locks_acquired)            \
     M(hang_in_execution)                          \
     M(pause_before_dt_background_delta_merge)     \
     M(pause_until_dt_background_delta_merge)      \
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
index f514293e7d6..879a8435e0f 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -14,16 +14,20 @@
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -31,13 +35,20 @@
 #include
 #include

+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+#include
+#include
+#pragma GCC diagnostic pop
+
+
 namespace DB
 {
 namespace FailPoints
 {
 extern const char region_exception_after_read_from_storage_some_error[];
 extern const char region_exception_after_read_from_storage_all_error[];
-extern const char pause_after_learner_read[];
+extern const char pause_with_alter_locks_acquired[];
 extern const char force_remote_read_for_batch_cop[];
 extern const char pause_after_copr_streams_acquired[];
 } // namespace FailPoints
@@ -223,6 +234,9 @@ DAGStorageInterpreter::DAGStorageInterpreter(
     }
 }

+// Apply learner read to ensure we get strong consistency with the TiKV Region
+// leaders. If the local Regions do not match the requested Regions, then build
+// requests to retry fetching data from other nodes.
 void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
 {
     prepare();
@@ -233,23 +247,29 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
 void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
 {
     if (!mvcc_query_info->regions_query_info.empty())
-        doLocalRead(pipeline, settings.max_block_size);
+        buildLocalStreams(pipeline, settings.max_block_size);

-    null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));
+    // Should build `remote_requests` and `null_stream` under the protection of `table_structure_lock`.
+    auto null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));

-    // Should build these vars under protect of `table_structure_lock`.
-    buildRemoteRequests();
+    auto remote_requests = buildRemoteRequests();

-    releaseAlterLocks();
+    // A failpoint to test pausing before the alter locks are released
+    FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
+    // Release the alter locks.
+    // The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
+    // from those streams even if DDL operations are applied. Release the alter lock so that reading does not
+    // block DDL operations, and keep the drop lock so that the storage will not be dropped during reading.
+    const TableLockHolders drop_locks = releaseAlterLocks();

     // It is impossible to have no joined stream.
     assert(pipeline.streams_with_non_joined_data.empty());
-    // after executeRemoteQuery, remote read stream will be appended in pipeline.streams.
+    // After buildRemoteStreams, remote read streams will be appended to pipeline.streams.
     size_t remote_read_streams_start_index = pipeline.streams.size();

     // For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop / mpp mode.
     if (!remote_requests.empty())
-        executeRemoteQuery(pipeline);
+        buildRemoteStreams(std::move(remote_requests), pipeline);

     /// record local and remote io input stream
     auto & table_scan_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[table_scan.getTableScanExecutorID()];
@@ -257,7 +277,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

     if (pipeline.streams.empty())
     {
-        pipeline.streams.emplace_back(null_stream_if_empty);
+        pipeline.streams.emplace_back(std::move(null_stream_if_empty));
         // reset remote_read_streams_start_index for null_stream_if_empty.
         remote_read_streams_start_index = 1;
     }
@@ -268,7 +288,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
     pipeline.transform([&](auto & stream) {
         // todo do not need to hold all locks in each stream, if the stream is reading from table a
        // it only needs to hold the lock of table a
-        for (auto & lock : drop_locks)
+        for (const auto & lock : drop_locks)
             stream->addTableLock(lock);
     });

@@ -290,12 +310,29 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

 void DAGStorageInterpreter::prepare()
 {
+    // About why we do learner read before acquiring the structure lock on the storage(s).
+    // Assume that:
+    // 1. Read threads do learner read and wait for the Raft applied index while holding a read lock
+    //    on the "alter lock" of an IStorage X
+    // 2. Raft threads try to decode data for a Region in the same IStorage X, and find they need to
+    //    apply DDL operations, which acquire a write lock on the "alter lock"
+    // Under this situation, all Raft threads are blocked by the read threads, while the read threads
+    // wait for the Raft threads to push forward the applied index. Deadlock!
+    // So we must do learner read without holding the structure lock on IStorage. After learner read,
+    // acquire the structure lock of the IStorage(s) (to avoid concurrency issues between read threads
+    // and DDL operations) and build the requested input streams. Once the input streams are built,
+    // release the alter lock to avoid blocking DDL operations.
+    // TODO: If we can acquire a read-only view on the IStorage structure (both `ITableDeclaration`
+    // and `TiDB::TableInfo`) we may simplify this process. (tiflash/issues/1853)
+
+    // Do learner read
     const DAGContext & dag_context = *context.getDAGContext();
     if (dag_context.isBatchCop() || dag_context.isMPPTask())
         learner_read_snapshot = doBatchCopLearnerRead();
     else
         learner_read_snapshot = doCopLearnerRead();

+    // Acquire a read lock on the `alter lock` and build the requested input streams
     storages_with_structure_lock = getAndLockStorages(settings.schema_version);
     assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
     storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;
@@ -303,8 +340,6 @@ void DAGStorageInterpreter::prepare()
     std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(settings.max_columns_to_read);

     analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
-
-    FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read);
 }

 void DAGStorageInterpreter::executePushedDownFilter(
@@ -392,7 +427,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan(
     }
 }

-void DAGStorageInterpreter::executeRemoteQuery(DAGPipeline & pipeline)
+void DAGStorageInterpreter::buildRemoteStreams(std::vector<RemoteRequest> && remote_requests, DAGPipeline & pipeline)
 {
     assert(!remote_requests.empty());
     DAGSchema & schema = remote_requests[0].schema;
@@ -464,8 +499,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead()
 {
     if (table_scan.isPartitionTableScan())
     {
-        throw Exception("Cop request does not support partition table scan");
+        throw TiFlashException("Cop request does not support partition table scan", DB::Errors::Coprocessor::BadRequest);
     }
+
     TablesRegionInfoMap regions_for_local_read;
     for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
     {
@@ -481,7 +517,7 @@
     if (info_retry)
         throw RegionException({info_retry->begin()->get().region_id}, status);

-    return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, false, context, log);
+    return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/false, context, log);
 }

 /// Will assign region_retry_from_local_region
@@ -517,7 +553,7 @@
         }
         if (mvcc_query_info->regions_query_info.empty())
             return {};
-        return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, true, context, log);
+        return doLearnerRead(logical_table_id, *mvcc_query_info, max_streams, /*for_batch_cop=*/true, context, log);
     }
     catch (const LockException & e)
     {
@@ -584,18 +620,18 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
     return ret;
 }

-void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block_size)
+void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size)
 {
     const DAGContext & dag_context = *context.getDAGContext();
     size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
     if (total_local_region_num == 0)
         return;
-    auto table_query_infos = generateSelectQueryInfos();
-    for (auto & table_query_info : table_query_infos)
+    const auto table_query_infos = generateSelectQueryInfos();
+    for (const auto & table_query_info : table_query_infos)
     {
         DAGPipeline current_pipeline;
-        TableID table_id = table_query_info.first;
-        SelectQueryInfo & query_info = table_query_info.second;
+        const TableID table_id = table_query_info.first;
+        const SelectQueryInfo & query_info = table_query_info.second;
         size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
         if (region_num == 0)
             continue;
@@ -613,11 +649,11 @@
         {
             current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, current_max_streams);

-            // After getting streams from storage, we need to validate whether regions have changed or not after learner read.
-            // In case the versions of regions have changed, those `streams` may contain different data other than expected.
-            // Like after region merge/split.
+            // After getting streams from storage, we need to validate whether the Regions have changed since the learner read
+            // (by calling `validateQueryInfo`). In case the key ranges of the Regions have changed (Region merge/split),
+            // those `streams` may contain data other than expected.

-            // Inject failpoint to throw RegionException
+            // Inject failpoint to throw RegionException for testing
             fiu_do_on(FailPoints::region_exception_after_read_from_storage_some_error, {
                 const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
                 RegionException::UnavailableRegions region_ids;
@@ -781,7 +817,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
             return {{}, {}, {}, false};
         }

-        if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT)
+        if (unlikely(table_store->engineType() != ::TiDB::StorageEngine::DT))
         {
             throw TiFlashException(
                 fmt::format(
@@ -954,8 +990,10 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
     return {required_columns_tmp, source_columns_tmp, need_cast_column};
 }

-void DAGStorageInterpreter::buildRemoteRequests()
+// Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions`
+std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests()
 {
+    std::vector<RemoteRequest> remote_requests;
     std::unordered_map<RegionID, TableID> region_id_to_table_id_map;
     std::unordered_map<TableID, RegionRetryList> retry_regions_map;
     for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
@@ -978,6 +1016,8 @@ void DAGStorageInterpreter::buildRemoteRequests()
         if (retry_regions.empty())
             continue;

+        // Append the retry Regions to the DAGContext so that they are returned to the upper layer;
+        // the upper layer should refresh its cache for these Regions.
         for (const auto & r : retry_regions)
             context.getDAGContext()->retry_regions.push_back(r.get());

@@ -989,17 +1029,17 @@
             push_down_filter,
             log));
     }
+    return remote_requests;
 }

-void DAGStorageInterpreter::releaseAlterLocks()
+TableLockHolders DAGStorageInterpreter::releaseAlterLocks()
 {
-    // The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
-    // from those streams even if DDL operations are applied. Release the alter lock so that reading does not
-    // block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
+    TableLockHolders drop_locks;
     for (auto storage_with_lock : storages_with_structure_lock)
     {
         drop_locks.emplace_back(std::get<1>(std::move(storage_with_lock.second.lock).release()));
     }
+    return drop_locks;
 }

 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
index a1d88083468..56de51385b2 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -14,13 +14,11 @@
 #pragma once

-#include
 #include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -30,12 +28,6 @@
 #include
 #include

-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wunused-parameter"
-#include
-#include
-#pragma GCC diagnostic pop
-
 #include

 namespace DB
@@ -43,7 +35,7 @@ namespace DB
 using TablesRegionInfoMap = std::unordered_map>;
 /// DAGStorageInterpreter encapsulates operations around storage during interprete stage.
 /// It's only intended to be used by DAGQueryBlockInterpreter.
-/// After DAGStorageInterpreter::execute some of its members will be transfered to DAGQueryBlockInterpreter.
+/// After DAGStorageInterpreter::execute some of its members will be transferred to DAGQueryBlockInterpreter.
 class DAGStorageInterpreter
 {
 public:
@@ -58,7 +50,7 @@ class DAGStorageInterpreter

     void execute(DAGPipeline & pipeline);

-    /// Members will be transfered to DAGQueryBlockInterpreter after execute
+    /// Members will be transferred to DAGQueryBlockInterpreter after execute

     std::unique_ptr<DAGExpressionAnalyzer> analyzer;

@@ -72,15 +64,15 @@ class DAGStorageInterpreter

     LearnerReadSnapshot doBatchCopLearnerRead();

-    void doLocalRead(DAGPipeline & pipeline, size_t max_block_size);
+    void buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size);

     std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

     std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan(Int64 max_columns_to_read);

-    void buildRemoteRequests();
+    std::vector<RemoteRequest> buildRemoteRequests();

-    void releaseAlterLocks();
+    TableLockHolders releaseAlterLocks();

     std::unordered_map<TableID, SelectQueryInfo> generateSelectQueryInfos();

@@ -88,7 +80,7 @@ class DAGStorageInterpreter

     void recordProfileStreams(DAGPipeline & pipeline, const String & key);

-    void executeRemoteQuery(DAGPipeline & pipeline);
+    void buildRemoteStreams(std::vector<RemoteRequest> && remote_requests, DAGPipeline & pipeline);

     void executeCastAfterTableScan(
         size_t remote_read_streams_start_index,
@@ -106,9 +98,6 @@ class DAGStorageInterpreter
     std::vector<ExtraCastAfterTSMode> is_need_add_cast_column;

     /// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag.
     RegionRetryList region_retry_from_local_region;
-    TableLockHolders drop_locks;
-    std::vector<RemoteRequest> remote_requests;
-    BlockInputStreamPtr null_stream_if_empty;

     /// passed from caller, doesn't change during DAGStorageInterpreter's lifetime
diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.h b/dbms/src/Flash/Coprocessor/TiDBTableScan.h
index 3c7703de7bf..934ee2c7769 100644
--- a/dbms/src/Flash/Coprocessor/TiDBTableScan.h
+++ b/dbms/src/Flash/Coprocessor/TiDBTableScan.h
@@ -49,7 +49,7 @@ class TiDBTableScan
     {
         return physical_table_ids;
     }
-    String getTableScanExecutorID() const
+    const String & getTableScanExecutorID() const
     {
         return executor_id;
     }
diff --git a/dbms/src/Functions/GeoUtils.h b/dbms/src/Functions/GeoUtils.h
index 6bf1f52fbf0..764e7aa5427 100644
--- a/dbms/src/Functions/GeoUtils.h
+++ b/dbms/src/Functions/GeoUtils.h
@@ -30,6 +30,7 @@
 #endif

 #pragma GCC diagnostic ignored "-Wpragmas"
+#pragma GCC diagnostic ignored "-Wunknown-warning-option"
 #pragma GCC diagnostic ignored "-Wunused-but-set-variable"
 #pragma GCC diagnostic ignored "-Wunused-parameter"
 #pragma GCC diagnostic ignored "-Wunused-variable"
diff --git a/dbms/src/Storages/Transaction/LearnerRead.h b/dbms/src/Storages/Transaction/LearnerRead.h
index 91d027c6599..ab7da31935c 100644
--- a/dbms/src/Storages/Transaction/LearnerRead.h
+++ b/dbms/src/Storages/Transaction/LearnerRead.h
@@ -30,7 +30,7 @@ struct RegionLearnerReadSnapshot : RegionPtr
     UInt64 snapshot_event_flag{0};

     RegionLearnerReadSnapshot() = default;
-    RegionLearnerReadSnapshot(const RegionPtr & region)
+    explicit RegionLearnerReadSnapshot(const RegionPtr & region)
         : RegionPtr(region)
         , snapshot_event_flag(region->getSnapshotEventFlag())
     {}
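The deadlock note added to DAGStorageInterpreter::prepare() is the heart of this change. The same ordering can be reduced to a minimal, self-contained sketch built on std::shared_mutex; StorageModel, waitForRaftAppliedIndex, buildInputStreams and consumeStreams below are illustrative stand-ins, not TiFlash's real IStorage locking API:

```cpp
#include <mutex>
#include <shared_mutex>

// Stand-ins for the real read path; they do nothing here.
inline void waitForRaftAppliedIndex() {} // stands in for doCopLearnerRead / doBatchCopLearnerRead
inline void buildInputStreams() {}       // stands in for buildLocalStreams / buildRemoteRequests
inline void consumeStreams() {}          // stands in for actually reading the streams

struct StructureLock
{
    std::shared_lock<std::shared_mutex> alter_lock; // blocks DDL while held
    std::shared_lock<std::shared_mutex> drop_lock;  // only blocks DROP while held
};

class StorageModel
{
public:
    StructureLock lockStructureForRead()
    {
        return {std::shared_lock(alter_mutex), std::shared_lock(drop_mutex)};
    }

private:
    std::shared_mutex alter_mutex; // DDL acquires this exclusively
    std::shared_mutex drop_mutex;  // DROP acquires this exclusively
};

void readPath(StorageModel & storage)
{
    // 1. Learner read first, with no structure lock held. If the alter lock
    //    were held here, a Raft thread waiting to apply a DDL change could
    //    never advance the applied index we are waiting for: deadlock.
    waitForRaftAppliedIndex();

    // 2. Take the structure lock, then build the input streams under it.
    StructureLock lock = storage.lockStructureForRead();
    buildInputStreams();

    // 3. Streams are built: release the alter lock so DDL can proceed, but
    //    keep the drop lock until reading finishes (like releaseAlterLocks()).
    lock.alter_lock.unlock();
    consumeStreams();
} // drop_lock is released here
```

Step 3 mirrors what releaseAlterLocks() now returns: once the streams exist, only DROP TABLE has to stay blocked, so the more contended alter lock can be dropped early.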
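pause_with_alter_locks_acquired is registered under APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL, i.e. it is a "pause" failpoint: FAIL_POINT_PAUSE blocks the executing thread until the failpoint is disabled. A rough sketch of that wait-until-disabled semantics, using only the standard library (a simplified stand-in, not the real implementation in dbms/src/Common/FailPoint.*):

```cpp
#include <condition_variable>
#include <mutex>

// A toy "pause" failpoint: threads that hit it while it is enabled block
// until a test thread disables it again.
class PauseFailPoint
{
public:
    void enable()
    {
        std::lock_guard<std::mutex> lk(m);
        enabled = true;
    }

    void disable()
    {
        {
            std::lock_guard<std::mutex> lk(m);
            enabled = false;
        }
        cv.notify_all(); // wake every thread parked on this failpoint
    }

    // Placed at the instrumented code path, like FAIL_POINT_PAUSE(...).
    void pauseIfEnabled()
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return !enabled; });
    }

private:
    std::mutex m;
    std::condition_variable cv;
    bool enabled = false;
};
```

With the failpoint moved from after the learner read to just before releaseAlterLocks(), a test can enable it, run a query in a background thread so it parks while still holding the alter locks, make assertions about concurrent DDL, and then disable the failpoint to let the query finish.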
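Finally, the LearnerRead.h change marks the single-argument constructor explicit. Without explicit, any RegionPtr converts silently to a RegionLearnerReadSnapshot, hiding the getSnapshotEventFlag() call behind an implicit conversion. A tiny illustration with hypothetical stand-in types:

```cpp
#include <memory>

struct Region
{
    int getSnapshotEventFlag() const { return 42; }
};
using RegionPtr = std::shared_ptr<Region>;

struct Snapshot // stand-in for RegionLearnerReadSnapshot
{
    explicit Snapshot(const RegionPtr & region)
        : flag(region->getSnapshotEventFlag())
    {}
    int flag;
};

inline void take(const Snapshot &) {}

void caller(const RegionPtr & r)
{
    // take(r);        // no longer compiles: the conversion must be spelled out
    take(Snapshot{r}); // the construction (and its cost) is visible at the call site
}
```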