From 0b1f21ea62996461c581faf2ffadb4899a6325e4 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Mon, 19 Aug 2024 15:53:30 +0800 Subject: [PATCH] Disagg: Fix the issue that lifetime of read-snapshots in WN may be longer than expected (#268) Co-authored-by: Ti Chi Robot Co-authored-by: JaySon --- .../WNEstablishDisaggTaskHandler.cpp | 7 ++++++ .../WNFetchPagesStreamWriter.cpp | 3 --- .../DeltaMerge/Remote/DisaggSnapshot.cpp | 25 +++++++++++++++++++ .../DeltaMerge/Remote/DisaggSnapshot.h | 4 +++ .../DeltaMerge/Remote/RNWorkerFetchPages.cpp | 9 ++++--- .../DeltaMerge/SegmentReadTaskPool.cpp | 11 ++++++++ .../Storages/DeltaMerge/SegmentReadTaskPool.h | 3 +++ 7 files changed, 55 insertions(+), 7 deletions(-) diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index 2dc861f9d14..fd1ecab104a 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -112,6 +112,13 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) { response->add_tables(Serializer::serializeTo(snap, task_id, mem_tracker_wrapper).SerializeAsString()); }); + + // Release SegmentReadTasks that do not have ColumnFileTiny and ColumnFileInMemory + // because these tasks will never call FetchDisaggPages. + auto to_release_tasks = snap->releaseNoNeedFetchTasks(); + if (!to_release_tasks.empty()) + LOG_INFO(log, "Release no need fetch tasks: count={} segments={}", to_release_tasks.size(), to_release_tasks); + snaps->unregisterSnapshotIfEmpty(task_id); } } // namespace DB diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp index e21390ba6b3..4710d975f06 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp +++ b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -27,8 +26,6 @@ #include #include #include -#include -#include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp index 3a66aeb3d73..849f9f3ca18 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp @@ -72,6 +72,15 @@ bool DisaggReadSnapshot::empty() const return true; } +SegmentReadTasks DisaggReadSnapshot::releaseNoNeedFetchTasks() +{ + SegmentReadTasks to_release_tasks; + std::unique_lock lock(mtx); + for (auto & [table_id, table_snap] : table_snapshots) + table_snap->releaseNoNeedFetchTasks(to_release_tasks); + return to_release_tasks; +} + DisaggPhysicalTableReadSnapshot::DisaggPhysicalTableReadSnapshot( KeyspaceTableID ks_table_id_, ColumnID pk_col_id_, @@ -97,4 +106,20 @@ SegmentReadTaskPtr DisaggPhysicalTableReadSnapshot::popTask(const UInt64 segment return nullptr; } +void DisaggPhysicalTableReadSnapshot::releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks) +{ + std::unique_lock lock(mtx); + for (auto itr = tasks.begin(); itr != tasks.end();) + { + if (itr->second->hasColumnFileToFetch()) + { + ++itr; + } + else + { + to_release_tasks.push_back(itr->second); + itr = tasks.erase(itr); + } + } +} } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 7f471afd5fa..1ff0a68935b 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -81,6 +81,8 @@ class DisaggReadSnapshot bool empty() const; + SegmentReadTasks releaseNoNeedFetchTasks(); + DISALLOW_COPY(DisaggReadSnapshot); private: @@ -104,6 +106,8 @@ class DisaggPhysicalTableReadSnapshot return tasks.empty(); } + void releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks); + DISALLOW_COPY(DisaggPhysicalTableReadSnapshot); public: diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp index 69bd9dd09b8..62b4083fc36 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp @@ -181,10 +181,7 @@ void RNWorkerFetchPages::doFetchPages( const RNReadSegmentTaskPtr & seg_task, const disaggregated::FetchDisaggPagesRequest & request) { - // No page need to be fetched. - if (request.page_ids_size() == 0) - return; - + // No matter all delta data is cached or not, call FetchDisaggPages to release snapshot in WN. Stopwatch sw_total; Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; bool rpc_is_observed = false; @@ -210,6 +207,10 @@ void RNWorkerFetchPages::doFetchPages( } }); + // All delta data is cached. + if (request.page_ids_size() == 0) + return; + // Used to verify all pages are fetched. std::set remaining_pages_to_fetch; for (auto p : request.page_ids()) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 457b7750d45..492c9f65f69 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -105,6 +105,17 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas return result_tasks; } +bool SegmentReadTask::hasColumnFileToFetch() const +{ + auto need_to_fetch = [](const ColumnFilePtr & cf) { + // In v7.5, Only ColumnFileTiny need too fetch. + return cf->isTinyFile(); + }; + const auto & mem_cfs = read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles(); + const auto & persisted_cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles(); + return std::any_of(mem_cfs.cbegin(), mem_cfs.cend(), need_to_fetch) + || std::any_of(persisted_cfs.cbegin(), persisted_cfs.cend(), need_to_fetch); +} SegmentReadTasksWrapper::SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_) : enable_read_thread(enable_read_thread_) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 0ec2b7dbff2..1eceac49dcd 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -56,6 +56,9 @@ struct SegmentReadTask void mergeRanges() { ranges = DM::tryMergeRanges(std::move(ranges), 1); } static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size); + + // WN calls hasColumnFileToFetch to check whether a SegmentReadTask need to fetch column files from it + bool hasColumnFileToFetch() const; }; class BlockStat