Skip to content

Commit

Permalink
Disagg: Fix the problem of missing remote page id in MemTableSet. (#8437)
Browse files Browse the repository at this point in the history

close #8436
  • Loading branch information
JinheLin authored Nov 29, 2023
1 parent 79a71b3 commit 73e427f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 14 deletions.
48 changes: 34 additions & 14 deletions dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,31 @@ SegmentReadTask::SegmentReadTask(
ranges.push_back(RowKeyRange::deserialize(rb));
}

const auto & cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles();
std::vector<UInt64> remote_page_ids;
std::vector<size_t> remote_page_sizes;
remote_page_ids.reserve(cfs.size());
remote_page_sizes.reserve(cfs.size());
for (const auto & cf : cfs)
{
if (auto * tiny = cf->tryToTinyFile(); tiny)
// The number of ColumnFileTiny in the MemTableSet is unknown, but it is very likely to be zero.
// So, when reserving, ignoring the MemTableSet's ColumnFileTiny count is better than always adding the MemTableSet's total ColumnFile count.
const auto & cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles();
remote_page_ids.reserve(cfs.size());
remote_page_sizes.reserve(cfs.size());
}
auto extract_remote_pages = [&remote_page_ids, &remote_page_sizes](const ColumnFiles & cfs) {
UInt64 count = 0;
for (const auto & cf : cfs)
{
remote_page_ids.emplace_back(tiny->getDataPageId());
remote_page_sizes.emplace_back(tiny->getDataPageSize());
if (auto * tiny = cf->tryToTinyFile(); tiny)
{
remote_page_ids.emplace_back(tiny->getDataPageId());
remote_page_sizes.emplace_back(tiny->getDataPageSize());
++count;
}
}
}
return count;
};
auto memory_page_count = extract_remote_pages(read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles());
auto persisted_page_count
= extract_remote_pages(read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles());

extra_remote_info.emplace(ExtraRemoteSegmentInfo{
.store_address = store_address,
Expand All @@ -133,9 +145,12 @@ SegmentReadTask::SegmentReadTask(

LOG_DEBUG(
read_snapshot->log,
"memtable_cfs_count={} persisted_cfs_count={} remote_page_ids={} delta_index={} store_address={}",
"memory_cfs_count={} memory_page_count={} persisted_cfs_count={} persisted_page_count={} remote_page_ids={} "
"delta_index={} store_address={}",
read_snapshot->delta->getMemTableSetSnapshot()->getColumnFileCount(),
cfs.size(),
memory_page_count,
read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFileCount(),
persisted_page_count,
extra_remote_info->remote_page_ids,
read_snapshot->delta->getSharedDeltaIndex()->toString(),
store_address);
Expand Down Expand Up @@ -207,16 +222,21 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas

void SegmentReadTask::initColumnFileDataProvider(const Remote::RNLocalPageCacheGuardPtr & pages_guard)
{
auto & data_provider = read_snapshot->delta->getPersistedFileSetSnapshot()->data_provider;
RUNTIME_CHECK(std::dynamic_pointer_cast<ColumnFileDataProviderNop>(data_provider));

RUNTIME_CHECK(extra_remote_info.has_value());
auto page_cache = dm_context->global_context.getSharedContextDisagg()->rn_page_cache;
data_provider = std::make_shared<Remote::ColumnFileDataProviderRNLocalPageCache>(
auto page_data_provider = std::make_shared<Remote::ColumnFileDataProviderRNLocalPageCache>(
page_cache,
pages_guard,
store_id,
KeyspaceTableID{dm_context->keyspace_id, dm_context->physical_table_id});

auto & persisted_cf_set_data_provider = read_snapshot->delta->getPersistedFileSetSnapshot()->data_provider;
RUNTIME_CHECK(std::dynamic_pointer_cast<ColumnFileDataProviderNop>(persisted_cf_set_data_provider));
persisted_cf_set_data_provider = page_data_provider;

auto & memory_cf_set_data_provider = read_snapshot->delta->getMemTableSetSnapshot()->data_provider;
RUNTIME_CHECK(std::dynamic_pointer_cast<ColumnFileDataProviderNop>(memory_cf_set_data_provider));
memory_cf_set_data_provider = page_data_provider;
}


Expand Down
50 changes: 50 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,54 @@ try
}
CATCH

// Checks that a ColumnFileTiny held by the MemTableSet snapshot of a segment is
// reflected in the remote page ids recorded by the SegmentReadTask that is built
// from the serialized remote segment.
TEST_F(DeltaMergeStoreTest, MemTableSetWithCFTiny)
try
{
    const auto default_columns = DMTestEnv::getDefaultColumns();
    store = reload(default_columns);

    // Fetch the settings only after reload() so the reference belongs to the context in use.
    const auto & settings = db_context->getSettingsRef();

    {
        // Write one block sized by dt_segment_delta_cache_limit_rows; the assertions
        // at the end expect this to leave exactly one ColumnFileTiny in the MemTableSet.
        auto rows_block = DMTestEnv::prepareSimpleWriteBlock(0, settings.dt_segment_delta_cache_limit_rows, false);
        store->write(*db_context, settings, rows_block);
    }

    // Build a remote-read snapshot over the whole key range.
    const auto scan_ctx = std::make_shared<ScanContext>();
    auto read_snap = store->writeNodeBuildRemoteReadSnapshot(
        *db_context,
        settings,
        {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
        1,
        "req_id",
        {},
        scan_ctx);
    read_snap->column_defines = std::make_shared<ColumnDefines>(store->getTableColumns());

    // Serialize the snapshot; exactly one segment is expected.
    MemTrackerWrapper tracker(nullptr);
    const auto table_pb = Remote::Serializer::serializePhysicalTable(read_snap, /*task_id*/ {}, tracker);
    ASSERT_EQ(table_pb.segments_size(), 1);

    // Install a mock remote data store before constructing the read task from the serialized segment.
    db_context->getSharedContextDisagg()->remote_data_store
        = std::make_shared<DM::Remote::DataStoreMock>(db_context->getFileProvider());
    const auto read_task = std::make_shared<SegmentReadTask>(
        Logger::get(),
        *db_context,
        scan_ctx,
        table_pb.segments(0),
        DisaggTaskId{},
        /*store_id*/ 1,
        /*store_address*/ "127.0.0.1",
        store->keyspace_id,
        store->physical_table_id);

    // The MemTableSet must contain a single ColumnFileTiny, and its page must be listed as a remote page.
    const auto & mem_cfs = read_task->read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles();
    ASSERT_EQ(mem_cfs.size(), 1);
    ASSERT_NE(mem_cfs.front()->tryToTinyFile(), nullptr);
    ASSERT_EQ(read_task->extra_remote_info->remote_page_ids.size(), 1);
}
CATCH
} // namespace DB::DM::tests

0 comments on commit 73e427f

Please sign in to comment.