diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index a68c1bc3685..ada89517ec4 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -103,6 +103,13 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) { response->add_tables(Serializer::serializeTo(snap, task_id, mem_tracker_wrapper).SerializeAsString()); }); + + // Release SegmentReadTasks that do not have any ColumnFileTiny to fetch + // because these tasks will never call FetchDisaggPages. + auto to_release_tasks = snap->releaseNoNeedFetchTasks(); + if (!to_release_tasks.empty()) + LOG_INFO(log, "Release no need fetch tasks: count={} segments={}", to_release_tasks.size(), to_release_tasks); + snaps->unregisterSnapshotIfEmpty(task_id); } } // namespace DB diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp index e21390ba6b3..4710d975f06 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp +++ b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -27,8 +26,6 @@ #include #include #include -#include -#include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp index 80d57f74548..6f34f5302b8 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp @@ -72,6 +72,15 @@ bool DisaggReadSnapshot::empty() const return true; } +SegmentReadTasks DisaggReadSnapshot::releaseNoNeedFetchTasks() +{ + SegmentReadTasks to_release_tasks; + std::unique_lock lock(mtx); + for (auto & 
[table_id, table_snap] : table_snapshots) + table_snap->releaseNoNeedFetchTasks(to_release_tasks); + return to_release_tasks; +} + DisaggPhysicalTableReadSnapshot::DisaggPhysicalTableReadSnapshot( KeyspaceTableID ks_table_id_, SegmentReadTasks && tasks_) @@ -95,4 +104,20 @@ SegmentReadTaskPtr DisaggPhysicalTableReadSnapshot::popTask(const UInt64 segment return nullptr; } +void DisaggPhysicalTableReadSnapshot::releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks) +{ + std::unique_lock lock(mtx); + for (auto itr = tasks.begin(); itr != tasks.end();) + { + if (itr->second->hasColumnFileToFetch()) + { + ++itr; + } + else + { + to_release_tasks.push_back(itr->second); + itr = tasks.erase(itr); + } + } +} } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 539bc986497..86ac2a7e62f 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -81,6 +81,8 @@ class DisaggReadSnapshot bool empty() const; + SegmentReadTasks releaseNoNeedFetchTasks(); + DISALLOW_COPY(DisaggReadSnapshot); private: @@ -104,6 +106,8 @@ class DisaggPhysicalTableReadSnapshot return tasks.empty(); } + void releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks); + DISALLOW_COPY(DisaggPhysicalTableReadSnapshot); public: diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp index ee3641e23af..b70f4347a84 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp @@ -94,6 +94,12 @@ disaggregated::FetchDisaggPagesRequest buildFetchPagesRequest( RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task) { + if (seg_task->meta.delta_tinycf_page_ids.empty()) + { + // No page needs to be fetched or guarded. 
+ return seg_task; + } + MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; SCOPE_EXIT({ @@ -170,10 +176,7 @@ void RNWorkerFetchPages::doFetchPages( const RNReadSegmentTaskPtr & seg_task, const disaggregated::FetchDisaggPagesRequest & request) { - // No page need to be fetched. - if (request.page_ids_size() == 0) - return; - + // Whether or not all delta data is cached, call FetchDisaggPages so that the WN releases the snapshot. Stopwatch sw_total; Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; bool rpc_is_observed = false; @@ -195,6 +198,10 @@ void RNWorkerFetchPages::doFetchPages( stream_resp->Finish(); }); + // All delta data is cached. + if (request.page_ids_size() == 0) + return; + // Used to verify all pages are fetched. std::set remaining_pages_to_fetch; for (auto p : request.page_ids()) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 457b7750d45..492c9f65f69 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -105,6 +105,17 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas return result_tasks; } +bool SegmentReadTask::hasColumnFileToFetch() const +{ + auto need_to_fetch = [](const ColumnFilePtr & cf) { + // In v7.5, only ColumnFileTiny needs to be fetched. 
+ return cf->isTinyFile(); + }; + const auto & mem_cfs = read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles(); + const auto & persisted_cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles(); + return std::any_of(mem_cfs.cbegin(), mem_cfs.cend(), need_to_fetch) + || std::any_of(persisted_cfs.cbegin(), persisted_cfs.cend(), need_to_fetch); +} SegmentReadTasksWrapper::SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_) : enable_read_thread(enable_read_thread_) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index fb37b7ba502..55695abb5bd 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -56,6 +56,9 @@ struct SegmentReadTask void mergeRanges() { ranges = DM::tryMergeRanges(std::move(ranges), 1); } static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size); + + // WN calls hasColumnFileToFetch to check whether a SegmentReadTask needs to fetch column files from it + bool hasColumnFileToFetch() const; }; class BlockStat