From 75733080bb195580883f579f9ed926032bb4df74 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 13 Aug 2024 17:21:02 +0800 Subject: [PATCH] Disagg: Fix the issue that lifetime of read-snapshots in WN may be longer than expected (#9297) (#9317) close pingcap/tiflash#9298 1. In WN, release snapshots of SegmentReadTasks that do not have ColumnFileTiny and ColumnFileInMemory. 2. In CN, no matter all ColumnFileTinys and ColumnFileInMemorys are cached or not, call FetchDisaggPages to release snapshot in WN. Signed-off-by: ti-chi-bot Co-authored-by: jinhelin --- .../WNEstablishDisaggTaskHandler.cpp | 7 ++++++ .../WNFetchPagesStreamWriter.cpp | 3 --- .../DeltaMerge/Remote/DisaggSnapshot.cpp | 25 +++++++++++++++++++ .../DeltaMerge/Remote/DisaggSnapshot.h | 4 +++ .../DeltaMerge/Remote/RNWorkerFetchPages.cpp | 15 ++++++++--- .../DeltaMerge/SegmentReadTaskPool.cpp | 11 ++++++++ .../Storages/DeltaMerge/SegmentReadTaskPool.h | 3 +++ 7 files changed, 61 insertions(+), 7 deletions(-) diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index a68c1bc3685..ada89517ec4 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -103,6 +103,13 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) { response->add_tables(Serializer::serializeTo(snap, task_id, mem_tracker_wrapper).SerializeAsString()); }); + + // Release SegmentReadTasks that do not have ColumnFileTiny and ColumnFileInMemory + // because these tasks will never call FetchDisaggPages. + auto to_release_tasks = snap->releaseNoNeedFetchTasks(); + if (!to_release_tasks.empty()) + LOG_INFO(log, "Release no need fetch tasks: count={} segments={}", to_release_tasks.size(), to_release_tasks); + snaps->unregisterSnapshotIfEmpty(task_id); } } // namespace DB diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp index e21390ba6b3..4710d975f06 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp +++ b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -27,8 +26,6 @@ #include #include #include -#include -#include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp index 80d57f74548..6f34f5302b8 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp @@ -72,6 +72,15 @@ bool DisaggReadSnapshot::empty() const return true; } +SegmentReadTasks DisaggReadSnapshot::releaseNoNeedFetchTasks() +{ + SegmentReadTasks to_release_tasks; + std::unique_lock lock(mtx); + for (auto & [table_id, table_snap] : table_snapshots) + table_snap->releaseNoNeedFetchTasks(to_release_tasks); + return to_release_tasks; +} + DisaggPhysicalTableReadSnapshot::DisaggPhysicalTableReadSnapshot( KeyspaceTableID ks_table_id_, SegmentReadTasks && tasks_) @@ -95,4 +104,20 @@ SegmentReadTaskPtr DisaggPhysicalTableReadSnapshot::popTask(const UInt64 segment return nullptr; } +void DisaggPhysicalTableReadSnapshot::releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks) +{ + std::unique_lock lock(mtx); + for (auto itr = tasks.begin(); itr != tasks.end();) + { + if (itr->second->hasColumnFileToFetch()) + { + ++itr; + } + else + { + to_release_tasks.push_back(itr->second); + itr = tasks.erase(itr); + } + } +} } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 539bc986497..86ac2a7e62f 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -81,6 +81,8 @@ class DisaggReadSnapshot bool empty() const; + SegmentReadTasks releaseNoNeedFetchTasks(); + DISALLOW_COPY(DisaggReadSnapshot); private: @@ -104,6 +106,8 @@ class DisaggPhysicalTableReadSnapshot return tasks.empty(); } + void releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks); + DISALLOW_COPY(DisaggPhysicalTableReadSnapshot); public: diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp index ee3641e23af..b70f4347a84 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp @@ -94,6 +94,12 @@ disaggregated::FetchDisaggPagesRequest buildFetchPagesRequest( RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task) { + if (seg_task->meta.delta_tinycf_page_ids.empty()) + { + // No page need to be fetched or guarded. + return seg_task; + } + MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; SCOPE_EXIT({ @@ -170,10 +176,7 @@ void RNWorkerFetchPages::doFetchPages( const RNReadSegmentTaskPtr & seg_task, const disaggregated::FetchDisaggPagesRequest & request) { - // No page need to be fetched. - if (request.page_ids_size() == 0) - return; - + // No matter all delta data is cached or not, call FetchDisaggPages to release snapshot in WN. Stopwatch sw_total; Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; bool rpc_is_observed = false; @@ -195,6 +198,10 @@ void RNWorkerFetchPages::doFetchPages( stream_resp->Finish(); }); + // All delta data is cached. + if (request.page_ids_size() == 0) + return; + // Used to verify all pages are fetched. std::set remaining_pages_to_fetch; for (auto p : request.page_ids()) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 457b7750d45..492c9f65f69 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -105,6 +105,17 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas return result_tasks; } +bool SegmentReadTask::hasColumnFileToFetch() const +{ + auto need_to_fetch = [](const ColumnFilePtr & cf) { + // In v7.5, Only ColumnFileTiny need too fetch. + return cf->isTinyFile(); + }; + const auto & mem_cfs = read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles(); + const auto & persisted_cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles(); + return std::any_of(mem_cfs.cbegin(), mem_cfs.cend(), need_to_fetch) + || std::any_of(persisted_cfs.cbegin(), persisted_cfs.cend(), need_to_fetch); +} SegmentReadTasksWrapper::SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_) : enable_read_thread(enable_read_thread_) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index fb37b7ba502..55695abb5bd 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -56,6 +56,9 @@ struct SegmentReadTask void mergeRanges() { ranges = DM::tryMergeRanges(std::move(ranges), 1); } static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size); + + // WN calls hasColumnFileToFetch to check whether a SegmentReadTask need to fetch column files from it + bool hasColumnFileToFetch() const; }; class BlockStat