diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp index ac29e8f69fe..4e6becfa937 100644 --- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp +++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp @@ -105,6 +105,13 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes response->add_tables( Serializer::serializePhysicalTable(snap, task_id, mem_tracker_wrapper, need_mem_data).SerializeAsString()); }); + + // Release SegmentReadTasks that do not have ColumnFileTiny and ColumnFileInMemory + // because these tasks will never call FetchDisaggPages. + auto to_release_tasks = snap->releaseNoNeedFetchTasks(); + if (!to_release_tasks.empty()) + LOG_INFO(log, "Release no need fetch tasks: count={} segments={}", to_release_tasks.size(), to_release_tasks); + snaps->unregisterSnapshotIfEmpty(task_id); } } // namespace DB diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp index 06b053c33f1..44800d2a53c 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp +++ b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -29,10 +28,7 @@ #include #include #include -#include -#include -#include #include #include diff --git a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h index 6a844625030..8a3f1c39f12 100644 --- a/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h +++ b/dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h @@ -34,8 +34,6 @@ namespace DB struct Settings; -using SyncPagePacketWriter = grpc::ServerWriter; - class WNFetchPagesStreamWriter; using WNFetchPagesStreamWriterPtr = std::unique_ptr; diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp index 80d57f74548..6f34f5302b8 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp @@ -72,6 +72,15 @@ bool DisaggReadSnapshot::empty() const return true; } +SegmentReadTasks DisaggReadSnapshot::releaseNoNeedFetchTasks() +{ + SegmentReadTasks to_release_tasks; + std::unique_lock lock(mtx); + for (auto & [table_id, table_snap] : table_snapshots) + table_snap->releaseNoNeedFetchTasks(to_release_tasks); + return to_release_tasks; +} + DisaggPhysicalTableReadSnapshot::DisaggPhysicalTableReadSnapshot( KeyspaceTableID ks_table_id_, SegmentReadTasks && tasks_) @@ -95,4 +104,20 @@ SegmentReadTaskPtr DisaggPhysicalTableReadSnapshot::popTask(const UInt64 segment return nullptr; } +void DisaggPhysicalTableReadSnapshot::releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks) +{ + std::unique_lock lock(mtx); + for (auto itr = tasks.begin(); itr != tasks.end();) + { + if (itr->second->hasColumnFileToFetch()) + { + ++itr; + } + else + { + to_release_tasks.push_back(itr->second); + itr = tasks.erase(itr); + } + } +} } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 539bc986497..86ac2a7e62f 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -81,6 +81,8 @@ class DisaggReadSnapshot bool empty() const; + SegmentReadTasks releaseNoNeedFetchTasks(); + DISALLOW_COPY(DisaggReadSnapshot); private: @@ -104,6 +106,8 @@ class DisaggPhysicalTableReadSnapshot return tasks.empty(); } + void releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks); + DISALLOW_COPY(DisaggPhysicalTableReadSnapshot); public: diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index a735648e8b4..274a83554d1 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -329,6 +329,7 @@ void SegmentReadTask::fetchPages() { return; } + // Don't need to fetch ColumnFileTiny and ColumnFileInMemory. if (extra_remote_info->remote_page_ids.empty() && !needFetchMemTableSet()) { LOG_DEBUG(read_snapshot->log, "Neither ColumnFileTiny or ColumnFileInMemory need to be fetched from WN."); @@ -337,7 +338,6 @@ void SegmentReadTask::fetchPages() Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; SCOPE_EXIT({ - // This metric is per-segment. GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page) .Observe(watch_work.elapsedSeconds()); }); @@ -516,6 +516,10 @@ void SegmentReadTask::checkMemTableSetReady() const bool SegmentReadTask::needFetchMemTableSet() const { // Check if any object of ColumnFileInMemory does not contain data. + // In v7.5, data of ColumnFileInMemory will be returned by EstablishDisaggTask, + // so cf_in_mem->getCache() will NOT be null after deserializeSegment. + // Since 8.1, data of ColumnFileInMemory need to fetch by calling `FetchDisaggPages`, + // cf_in_mem->getCache() will be null after deserializeSegment. for (const auto & cf : read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles()) { if (auto * cf_in_mem = cf->tryToInMemoryFile(); cf_in_mem) @@ -542,10 +546,7 @@ static void checkPageID( void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest & request) { - // No page and memtable need to be fetched. - if (request.page_ids_size() == 0 && !needFetchMemTableSet()) - return; - + // No matter all delta data is cached or not, call FetchDisaggPages to release snapshot in WN. const auto * cluster = dm_context->global_context.getTMTContext().getKVCluster(); pingcap::kv::RpcCall rpc( cluster->rpc_client, @@ -564,6 +565,13 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest } }); + // All delta data is cached. + if (request.page_ids_size() == 0 && !needFetchMemTableSet()) + { + finishPagesPacketStream(stream_resp); + return; + } + doFetchPagesImpl( [&stream_resp, this](disaggregated::PagesPacket & packet) { if (stream_resp->Read(&packet)) @@ -572,15 +580,7 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest } else { - auto status = stream_resp->Finish(); - stream_resp.reset(); // Reset to avoid calling `Finish()` repeatedly. - RUNTIME_CHECK_MSG( - status.ok(), - "Failed to fetch all pages for {}, status={}, message={}, wn_address={}", - *this, - static_cast(status.error_code()), - status.error_message(), - extra_remote_info->store_address); + finishPagesPacketStream(stream_resp); return false; } }, @@ -770,4 +770,33 @@ GlobalSegmentID SegmentReadTask::getGlobalSegmentID() const }; } +void SegmentReadTask::finishPagesPacketStream( + std::unique_ptr> & stream_resp) +{ + if unlikely (stream_resp == nullptr) + return; + + auto status = stream_resp->Finish(); + stream_resp.reset(); // Reset to avoid calling `Finish()` repeatedly. + RUNTIME_CHECK_MSG( + status.ok(), + "Failed to fetch all pages for {}, status={}, message={}, wn_address={}", + *this, + static_cast(status.error_code()), + status.error_message(), + extra_remote_info->store_address); +} + +bool SegmentReadTask::hasColumnFileToFetch() const +{ + auto need_to_fetch = [](const ColumnFilePtr & cf) { + // Only ColumnFileMemory and ColumnFileTiny need too fetch. + // ColumnFileDeleteRange and ColumnFileBig do not need to fetch. + return cf->isInMemoryFile() || cf->isTinyFile(); + }; + const auto & mem_cfs = read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles(); + const auto & persisted_cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles(); + return std::any_of(mem_cfs.cbegin(), mem_cfs.cend(), need_to_fetch) + || std::any_of(persisted_cfs.cbegin(), persisted_cfs.cend(), need_to_fetch); +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index 28621409d05..c24d52a782f 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -111,6 +111,9 @@ struct SegmentReadTask return input_stream; } + // WN calls hasColumnFileToFetch to check whether a SegmentReadTask need to fetch column files from it + bool hasColumnFileToFetch() const; + String toString() const; private: @@ -146,6 +149,8 @@ struct SegmentReadTask ReadMode read_mode, size_t expected_block_size); + void finishPagesPacketStream(std::unique_ptr> & stream); + BlockInputStreamPtr input_stream; friend tests::SegmentReadTaskTest; @@ -166,7 +171,7 @@ struct fmt::formatter auto format(const DB::DM::SegmentReadTask & t, FormatContext & ctx) const { return fmt::format_to(ctx.out(), "{}", t.toString()); - }; + } }; template <>