Skip to content

Commit

Permalink
Disagg: Fix the issue that lifetime of read-snapshots in WN may be lo…
Browse files Browse the repository at this point in the history
…nger than expected (pingcap#268)

Co-authored-by: Ti Chi Robot <[email protected]>
Co-authored-by: JaySon <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2024
1 parent fd5d988 commit 0b1f21e
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 7 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes
snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) {
response->add_tables(Serializer::serializeTo(snap, task_id, mem_tracker_wrapper).SerializeAsString());
});

// Release SegmentReadTasks that do not have ColumnFileTiny and ColumnFileInMemory
// because these tasks will never call FetchDisaggPages.
auto to_release_tasks = snap->releaseNoNeedFetchTasks();
if (!to_release_tasks.empty())
LOG_INFO(log, "Release no need fetch tasks: count={} segments={}", to_release_tasks.size(), to_release_tasks);
snaps->unregisterSnapshotIfEmpty(task_id);
}

} // namespace DB
3 changes: 0 additions & 3 deletions dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Disaggregated/WNFetchPagesStreamWriter.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Interpreters/SharedContexts/Disagg.h>
Expand All @@ -27,8 +26,6 @@
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/Page/PageUtil.h>
#include <common/logger_useful.h>
#include <kvproto/mpp.pb.h>
#include <tipb/expression.pb.h>

#include <memory>

Expand Down
25 changes: 25 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ bool DisaggReadSnapshot::empty() const
return true;
}

// Asks every physical table snapshot held by this read snapshot to release the
// tasks it no longer needs to keep, and returns all released tasks in one list.
// `mtx` is held for the whole traversal of `table_snapshots`.
SegmentReadTasks DisaggReadSnapshot::releaseNoNeedFetchTasks()
{
    SegmentReadTasks released;
    std::unique_lock guard(mtx);
    for (auto & entry : table_snapshots)
    {
        // Each table snapshot appends the tasks it drops to `released`.
        entry.second->releaseNoNeedFetchTasks(released);
    }
    return released;
}

DisaggPhysicalTableReadSnapshot::DisaggPhysicalTableReadSnapshot(
KeyspaceTableID ks_table_id_,
ColumnID pk_col_id_,
Expand All @@ -97,4 +106,20 @@ SegmentReadTaskPtr DisaggPhysicalTableReadSnapshot::popTask(const UInt64 segment
return nullptr;
}

// Removes from `tasks` every segment read task that reports no column file to fetch,
// appending each removed task to `to_release_tasks`. Tasks that still have something
// to fetch are left in place. The whole scan runs while holding `mtx`.
void DisaggPhysicalTableReadSnapshot::releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks)
{
    std::unique_lock lock(mtx);
    auto task_it = tasks.begin();
    while (task_it != tasks.end())
    {
        const bool keep_task = task_it->second->hasColumnFileToFetch();
        if (keep_task)
        {
            ++task_it;
            continue;
        }
        // Hand the task over to the caller first, then drop it from the map;
        // erase() returns the iterator following the removed element.
        to_release_tasks.push_back(task_it->second);
        task_it = tasks.erase(task_it);
    }
}
} // namespace DB::DM::Remote
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class DisaggReadSnapshot

bool empty() const;

SegmentReadTasks releaseNoNeedFetchTasks();

DISALLOW_COPY(DisaggReadSnapshot);

private:
Expand All @@ -104,6 +106,8 @@ class DisaggPhysicalTableReadSnapshot
return tasks.empty();
}

void releaseNoNeedFetchTasks(SegmentReadTasks & to_release_tasks);

DISALLOW_COPY(DisaggPhysicalTableReadSnapshot);

public:
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,7 @@ void RNWorkerFetchPages::doFetchPages(
const RNReadSegmentTaskPtr & seg_task,
const disaggregated::FetchDisaggPagesRequest & request)
{
// No page need to be fetched.
if (request.page_ids_size() == 0)
return;

// Whether or not all delta data is cached, always call FetchDisaggPages so that the snapshot in WN is released.
Stopwatch sw_total;
Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE};
bool rpc_is_observed = false;
Expand All @@ -210,6 +207,10 @@ void RNWorkerFetchPages::doFetchPages(
}
});

// All delta data is cached.
if (request.page_ids_size() == 0)
return;

// Used to verify all pages are fetched.
std::set<UInt64> remaining_pages_to_fetch;
for (auto p : request.page_ids())
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas
return result_tasks;
}

// Returns true when the delta part of this task's read snapshot contains at least
// one column file that has to be fetched, looking at both the mem-table set
// snapshot and the persisted file set snapshot.
bool SegmentReadTask::hasColumnFileToFetch() const
{
    // In v7.5, only ColumnFileTiny needs to be fetched.
    const auto is_fetchable = [](const ColumnFilePtr & column_file) {
        return column_file->isTinyFile();
    };
    // Shared scan applied to each of the two column file lists.
    const auto contains_fetchable = [&is_fetchable](const auto & column_files) {
        return std::any_of(column_files.cbegin(), column_files.cend(), is_fetchable);
    };

    const auto & mem_files = read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles();
    const auto & persisted_files = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles();
    if (contains_fetchable(mem_files))
        return true;
    return contains_fetchable(persisted_files);
}

SegmentReadTasksWrapper::SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_)
: enable_read_thread(enable_read_thread_)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct SegmentReadTask
void mergeRanges() { ranges = DM::tryMergeRanges(std::move(ranges), 1); }

static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size);

// WN calls hasColumnFileToFetch to check whether a SegmentReadTask needs to fetch column files from it.
bool hasColumnFileToFetch() const;
};

class BlockStat
Expand Down

0 comments on commit 0b1f21e

Please sign in to comment.