From 73e427fe1384cf94088d046d3089af4c4d061b43 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Wed, 29 Nov 2023 18:50:48 +0800 Subject: [PATCH] Disagg: Fix the problem of missing remote page id in MemTableSet. (#8437) close pingcap/tiflash#8436 --- .../Storages/DeltaMerge/SegmentReadTask.cpp | 48 ++++++++++++------ .../tests/gtest_segment_read_task.cpp | 50 +++++++++++++++++++ 2 files changed, 84 insertions(+), 14 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index f950b426d71..e72725cec52 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -110,19 +110,31 @@ SegmentReadTask::SegmentReadTask( ranges.push_back(RowKeyRange::deserialize(rb)); } - const auto & cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles(); std::vector remote_page_ids; std::vector remote_page_sizes; - remote_page_ids.reserve(cfs.size()); - remote_page_sizes.reserve(cfs.size()); - for (const auto & cf : cfs) { - if (auto * tiny = cf->tryToTinyFile(); tiny) + // The number of ColumnFileTiny of MemTableSet is unknown, but there is a very high probability that it is zero. + // So ignoring the number of ColumnFileTiny of MemTableSet is better than always adding all the number of ColumnFile of MemTableSet when reserving. + const auto & cfs = read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles(); + remote_page_ids.reserve(cfs.size()); + remote_page_sizes.reserve(cfs.size()); + } + auto extract_remote_pages = [&remote_page_ids, &remote_page_sizes](const ColumnFiles & cfs) { + UInt64 count = 0; + for (const auto & cf : cfs) { - remote_page_ids.emplace_back(tiny->getDataPageId()); - remote_page_sizes.emplace_back(tiny->getDataPageSize()); + if (auto * tiny = cf->tryToTinyFile(); tiny) + { + remote_page_ids.emplace_back(tiny->getDataPageId()); + remote_page_sizes.emplace_back(tiny->getDataPageSize()); + ++count; + } } - } + return count; + }; + auto memory_page_count = extract_remote_pages(read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles()); + auto persisted_page_count + = extract_remote_pages(read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFiles()); extra_remote_info.emplace(ExtraRemoteSegmentInfo{ .store_address = store_address, @@ -133,9 +145,12 @@ SegmentReadTask::SegmentReadTask( LOG_DEBUG( read_snapshot->log, - "memtable_cfs_count={} persisted_cfs_count={} remote_page_ids={} delta_index={} store_address={}", + "memory_cfs_count={} memory_page_count={} persisted_cfs_count={} persisted_page_count={} remote_page_ids={} " + "delta_index={} store_address={}", read_snapshot->delta->getMemTableSetSnapshot()->getColumnFileCount(), - cfs.size(), + memory_page_count, + read_snapshot->delta->getPersistedFileSetSnapshot()->getColumnFileCount(), + persisted_page_count, extra_remote_info->remote_page_ids, read_snapshot->delta->getSharedDeltaIndex()->toString(), store_address); @@ -207,16 +222,21 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas void SegmentReadTask::initColumnFileDataProvider(const Remote::RNLocalPageCacheGuardPtr & pages_guard) { - auto & data_provider = read_snapshot->delta->getPersistedFileSetSnapshot()->data_provider; - RUNTIME_CHECK(std::dynamic_pointer_cast(data_provider)); - RUNTIME_CHECK(extra_remote_info.has_value()); auto page_cache = dm_context->global_context.getSharedContextDisagg()->rn_page_cache; - data_provider = std::make_shared( + auto page_data_provider = std::make_shared( page_cache, pages_guard, store_id, KeyspaceTableID{dm_context->keyspace_id, dm_context->physical_table_id}); + + auto & persisted_cf_set_data_provider = read_snapshot->delta->getPersistedFileSetSnapshot()->data_provider; + RUNTIME_CHECK(std::dynamic_pointer_cast(persisted_cf_set_data_provider)); + persisted_cf_set_data_provider = page_data_provider; + + auto & memory_cf_set_data_provider = read_snapshot->delta->getMemTableSetSnapshot()->data_provider; + RUNTIME_CHECK(std::dynamic_pointer_cast(memory_cf_set_data_provider)); + memory_cf_set_data_provider = page_data_provider; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp index 2106fc7d02e..a229ee107a2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp @@ -373,4 +373,54 @@ try } CATCH +TEST_F(DeltaMergeStoreTest, MemTableSetWithCFTiny) +try +{ + auto table_column_defines = DMTestEnv::getDefaultColumns(); + store = reload(table_column_defines); + + { + auto block = DMTestEnv::prepareSimpleWriteBlock( + 0, + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows, + false); + store->write(*db_context, db_context->getSettingsRef(), block); + } + + auto scan_context = std::make_shared(); + auto snap = store->writeNodeBuildRemoteReadSnapshot( + *db_context, + db_context->getSettingsRef(), + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + 1, + "req_id", + {}, + scan_context); + + snap->column_defines = std::make_shared(store->getTableColumns()); + + MemTrackerWrapper mem_tracker_wrapper(nullptr); + auto remote_table_pb = Remote::Serializer::serializePhysicalTable(snap, /*task_id*/ {}, mem_tracker_wrapper); + ASSERT_EQ(remote_table_pb.segments_size(), 1); + + db_context->getSharedContextDisagg()->remote_data_store + = std::make_shared(db_context->getFileProvider()); + const auto & remote_seg = remote_table_pb.segments(0); + auto seg_task = std::make_shared( + Logger::get(), + *db_context, + scan_context, + remote_seg, + DisaggTaskId{}, + /*store_id*/ 1, + /*store_address*/ "127.0.0.1", + store->keyspace_id, + store->physical_table_id); + const auto & cfs = seg_task->read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles(); + ASSERT_EQ(cfs.size(), 1); + const auto & cf = cfs.front(); + ASSERT_NE(cf->tryToTinyFile(), nullptr); + ASSERT_EQ(seg_task->extra_remote_info->remote_page_ids.size(), 1); +} +CATCH } // namespace DB::DM::tests