From 19cecc13621471b2aeabc80d0f94289c0c05726f Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Thu, 19 Sep 2024 18:26:34 +0800 Subject: [PATCH] storage: Support adding vector index in background (#9411) ref pingcap/tiflash#9032 storage: Support adding vector index in background Signed-off-by: Lloyd-Pottiger --- dbms/src/Common/CurrentMetrics.cpp | 1 + dbms/src/Interpreters/Context.cpp | 23 + dbms/src/Interpreters/Context.h | 4 + dbms/src/Server/Server.cpp | 33 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 17 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 69 ++- .../DeltaMerge/DeltaMergeStore_InternalBg.cpp | 2 +- .../DeltaMergeStore_InternalSegment.cpp | 340 ++++++++++- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 17 +- dbms/src/Storages/DeltaMerge/File/DMFile.h | 8 +- .../DeltaMerge/File/DMFileBlockInputStream.h | 2 +- .../DeltaMerge/File/DMFileIndexWriter.cpp | 233 +++++++ .../DeltaMerge/File/DMFileIndexWriter.h | 77 +++ .../File/DMFileV3IncrementWriter.cpp | 13 +- .../DeltaMerge/File/DMFileV3IncrementWriter.h | 3 - .../Storages/DeltaMerge/File/DMFileWriter.cpp | 42 +- .../Storages/DeltaMerge/File/DMFileWriter.h | 8 +- .../DeltaMerge/Index/VectorIndexCache.h | 2 +- .../DeltaMerge/LocalIndexerScheduler.h | 5 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 87 ++- dbms/src/Storages/DeltaMerge/Segment.h | 4 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 7 +- .../tests/gtest_dm_delta_merge_store.cpp | 2 + ...est_dm_delta_merge_store_fast_add_peer.cpp | 1 + .../gtest_dm_delta_merge_store_test_basic.h | 2 + ...test_dm_delta_merge_store_vector_index.cpp | 569 ++++++++++++++++++ .../tests/gtest_dm_meta_version.cpp | 6 +- .../tests/gtest_dm_minmax_index.cpp | 3 +- .../tests/gtest_dm_simple_pk_test_basic.cpp | 1 + .../tests/gtest_dm_vector_index.cpp | 147 +++-- .../tests/gtest_dm_vector_index_utils.h | 94 +++ .../gtest_segment_replace_stable_data.cpp | 39 -- .../tests/gtest_segment_test_basic.cpp | 31 + 
.../tests/gtest_segment_test_basic.h | 5 + .../DeltaMerge/tests/gtest_segment_util.h | 2 + .../DeltaMerge/workload/DTWorkload.cpp | 1 + dbms/src/Storages/FormatVersion.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 37 ++ .../System/StorageSystemDTLocalIndexes.cpp | 3 + .../System/StorageSystemDTSegments.cpp | 2 + .../Storages/System/StorageSystemDTTables.cpp | 2 + dbms/src/TestUtils/TiFlashTestEnv.cpp | 2 + dbms/src/TiDB/Schema/TiDB.cpp | 7 + dbms/src/TiDB/Schema/TiDB.h | 2 + 44 files changed, 1720 insertions(+), 237 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 23b3c83215c..d2786a934c8 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -57,6 +57,7 @@ M(DT_SnapshotOfReadRaw) \ M(DT_SnapshotOfSegmentSplit) \ M(DT_SnapshotOfSegmentMerge) \ + M(DT_SnapshotOfSegmentIngestIndex) \ M(DT_SnapshotOfSegmentIngest) \ M(DT_SnapshotOfDeltaMerge) \ M(DT_SnapshotOfDeltaCompact) \ diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 618e0cbe7ce..95edc01b2bd 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -171,6 +172,7 @@ struct ContextShared PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3; DM::GlobalPageIdAllocatorPtr global_page_id_allocator; DM::GlobalStoragePoolPtr global_storage_pool; + DM::LocalIndexerSchedulerPtr global_local_indexer_scheduler; /// The PS instance available on Write Node. 
UniversalPageStorageServicePtr ps_write; @@ -1750,6 +1752,27 @@ DM::GlobalPageIdAllocatorPtr Context::getGlobalPageIdAllocator() const return shared->global_page_id_allocator; } +bool Context::initializeGlobalLocalIndexerScheduler(size_t pool_size, size_t memory_limit) +{ + auto lock = getLock(); + if (!shared->global_local_indexer_scheduler) + { + shared->global_local_indexer_scheduler + = std::make_shared(DM::LocalIndexerScheduler::Options{ + .pool_size = pool_size, + .memory_limit = memory_limit, + .auto_start = true, + }); + } + return true; +} + +DM::LocalIndexerSchedulerPtr Context::getGlobalLocalIndexerScheduler() const +{ + auto lock = getLock(); + return shared->global_local_indexer_scheduler; +} + bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool) { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 1bc9a753c68..257c92ed0b8 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -456,6 +457,9 @@ class Context bool initializeGlobalPageIdAllocator(); DM::GlobalPageIdAllocatorPtr getGlobalPageIdAllocator() const; + bool initializeGlobalLocalIndexerScheduler(size_t pool_size, size_t memory_limit); + DM::LocalIndexerSchedulerPtr getGlobalLocalIndexerScheduler() const; + bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool); DM::GlobalStoragePoolPtr getGlobalStoragePool() const; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index c42bf1de1be..fbcbac0c33e 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -987,12 +987,14 @@ int Server::main(const std::vector & /*args*/) if (storage_config.format_version != 0) { - if (storage_config.s3_config.isS3Enabled() && storage_config.format_version != STORAGE_FORMAT_V100.identifier) + if (storage_config.s3_config.isS3Enabled() && storage_config.format_version != 
STORAGE_FORMAT_V100.identifier + && storage_config.format_version != STORAGE_FORMAT_V101.identifier + && storage_config.format_version != STORAGE_FORMAT_V102.identifier) { - LOG_WARNING(log, "'storage.format_version' must be set to 100 when S3 is enabled!"); + LOG_WARNING(log, "'storage.format_version' must be set to 100/101/102 when S3 is enabled!"); throw Exception( ErrorCodes::INVALID_CONFIG_PARAMETER, - "'storage.format_version' must be set to 100 when S3 is enabled!"); + "'storage.format_version' must be set to 100/101/102 when S3 is enabled!"); } setStorageFormat(storage_config.format_version); LOG_INFO(log, "Using format_version={} (explicit storage format detected).", storage_config.format_version); @@ -1003,8 +1005,8 @@ int Server::main(const std::vector & /*args*/) { // If the user does not explicitly set format_version in the config file but // enables S3, then we set up a proper format version to support S3. - setStorageFormat(STORAGE_FORMAT_V100.identifier); - LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_V100.identifier); + setStorageFormat(STORAGE_FORMAT_V102.identifier); + LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_V102.identifier); } else { @@ -1325,6 +1327,27 @@ int Server::main(const std::vector & /*args*/) settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity), settings.bytes_that_rss_larger_than_limit); + if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) + { + // No need to have local index scheduler. + } + else if (global_context->getSharedContextDisagg()->isDisaggregatedStorageMode()) + { + // There is no compute task in write node. + // Set the pool size to 80% of logical cores and 60% of memory + // to take full advantage of the resources and avoid blocking other tasks like writes and compactions. 
+ global_context->initializeGlobalLocalIndexerScheduler( + std::max(1, server_info.cpu_info.logical_cores * 8 / 10), // at least 1 thread + std::max(256 * 1024 * 1024ULL, server_info.memory_info.capacity * 6 / 10)); // at least 256MB + } + else + { + // There could be compute tasks, reserve more memory for computes. + global_context->initializeGlobalLocalIndexerScheduler( + std::max(1, server_info.cpu_info.logical_cores * 4 / 10), // at least 1 thread + std::max(256 * 1024 * 1024ULL, server_info.memory_info.capacity * 4 / 10)); // at least 256MB + } + /// PageStorage run mode has been determined above global_context->initializeGlobalPageIdAllocator(); if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 4431c6797f6..3966121b6f5 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -62,6 +63,7 @@ #include #include + namespace ProfileEvents { extern const Event DMWriteBlock; @@ -216,6 +218,7 @@ DeltaMergeStore::DeltaMergeStore( const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, + IndexInfosPtr local_index_infos_, const Settings & settings_, ThreadPool * thread_pool) : global_context(db_context.getGlobalContext()) @@ -230,6 +233,7 @@ DeltaMergeStore::DeltaMergeStore( , background_pool(db_context.getBackgroundPool()) , blockable_background_pool(db_context.getBlockableBackgroundPool()) , next_gc_check_key(is_common_handle ? 
RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY) + , local_index_infos(std::move(local_index_infos_)) , log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_))) { { @@ -332,6 +336,7 @@ DeltaMergeStorePtr DeltaMergeStore::create( const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, + IndexInfosPtr local_index_infos_, const Settings & settings_, ThreadPool * thread_pool) { @@ -347,9 +352,11 @@ DeltaMergeStorePtr DeltaMergeStore::create( handle, is_common_handle_, rowkey_column_size_, + local_index_infos_, settings_, thread_pool); std::shared_ptr store_shared_ptr(store); + store_shared_ptr->checkAllSegmentsLocalIndex(); return store_shared_ptr; } @@ -365,6 +372,7 @@ std::unique_ptr DeltaMergeStore::createUnique( const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, + IndexInfosPtr local_index_infos_, const Settings & settings_, ThreadPool * thread_pool) { @@ -380,9 +388,11 @@ std::unique_ptr DeltaMergeStore::createUnique( handle, is_common_handle_, rowkey_column_size_, + local_index_infos_, settings_, thread_pool); std::unique_ptr store_unique_ptr(store); + store_unique_ptr->checkAllSegmentsLocalIndex(); return store_unique_ptr; } @@ -504,6 +514,11 @@ void DeltaMergeStore::shutdown() return; LOG_TRACE(log, "Shutdown DeltaMerge start"); + + auto indexer_scheulder = global_context.getGlobalLocalIndexerScheduler(); + RUNTIME_CHECK(indexer_scheulder != nullptr); + indexer_scheulder->dropTasks(keyspace_id, physical_table_id); + // Must shutdown storage path pool to make sure the DMFile remove callbacks // won't remove dmfiles unexpectly. 
path_pool->shutdown(); @@ -2032,10 +2047,10 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info) original_table_columns.swap(new_original_table_columns); store_columns.swap(new_store_columns); + // TODO(local index): There could be some local indexes added/dropped after DDL std::atomic_store(&original_table_header, std::make_shared(toEmptyBlock(original_table_columns))); } - SortDescription DeltaMergeStore::getPrimarySortDescription() const { std::shared_lock lock(read_write_mutex); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 30f5a55a54a..8af9fdb525e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -187,10 +187,13 @@ struct LocalIndexStats }; using LocalIndexesStats = std::vector; + class DeltaMergeStore; using DeltaMergeStorePtr = std::shared_ptr; -class DeltaMergeStore : private boost::noncopyable +class DeltaMergeStore + : private boost::noncopyable + , public std::enable_shared_from_this { public: friend class ::DB::DM::tests::DeltaMergeStoreTest; @@ -292,6 +295,7 @@ class DeltaMergeStore : private boost::noncopyable const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, + IndexInfosPtr local_index_infos_, const Settings & settings_ = EMPTY_SETTINGS, ThreadPool * thread_pool = nullptr); @@ -308,6 +312,7 @@ class DeltaMergeStore : private boost::noncopyable const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, + IndexInfosPtr local_index_infos_, const Settings & settings_ = EMPTY_SETTINGS, ThreadPool * thread_pool = nullptr); @@ -323,6 +328,7 @@ class DeltaMergeStore : private boost::noncopyable const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, + IndexInfosPtr local_index_infos_, const Settings & settings_ = EMPTY_SETTINGS, ThreadPool * thread_pool = nullptr); @@ -726,6 +732,12 @@ class DeltaMergeStore : private 
boost::noncopyable MergeDeltaReason reason, SegmentSnapshotPtr segment_snap = nullptr); + void segmentEnsureStableIndex( + DMContext & dm_context, + const IndexInfosPtr & index_info, + const DMFiles & dm_files, + const String & source_segment_info); + /** * Ingest a DMFile into the segment, optionally causing a new segment being created. * @@ -854,11 +866,45 @@ class DeltaMergeStore : private boost::noncopyable const SegmentPtr & segment, ThreadType thread_type, InputType input_type); + + /** + * Segment update meta with new DMFiles. A lock must be provided, so that it is + * possible to update the meta for multiple segments all at once. + */ + SegmentPtr segmentUpdateMeta( + std::unique_lock & read_write_lock, + DMContext & dm_context, + const SegmentPtr & segment, + const DMFiles & new_dm_files); + + /** + * Check whether there are new local indexes should be built for all segments. + */ + void checkAllSegmentsLocalIndex(); + + /** + * Ensure the segment has stable index. + * If the segment has no stable index, it will be built in background. + * Note: This function can not be called in constructor, since shared_from_this() is not available. + * + * @returns true if index is missing and a build task is added in background. + */ + bool segmentEnsureStableIndexAsync(const SegmentPtr & segment); + #ifndef DBMS_PUBLIC_GTEST private: #else public: #endif + /** + * Wait until the segment has stable index. + * If the index is ready or no need to build, it will return immediately. + * Only used for testing. + * + * @returns false if index is still missing after wait timed out. + */ + bool segmentWaitStableIndexReady(const SegmentPtr & segment) const; + void dropAllSegments(bool keep_first_segment); String getLogTracingId(const DMContext & dm_ctx); // Returns segment that contains start_key and whether 'segments' is empty. @@ -916,13 +962,30 @@ class DeltaMergeStore : private boost::noncopyable // of resources to build, so they will be built in separated background pool. 
IndexInfosPtr local_index_infos; + struct DMFileIDToSegmentIDs + { + public: + using Key = PageIdU64; // dmfile_id + using Value = std::unordered_set; // segment_ids + + void remove(const SegmentPtr & segment); + + void add(const SegmentPtr & segment); + + const Value & get(PageIdU64 dmfile_id) const; + + private: + std::unordered_map u_map; + }; + // dmfile_id -> segment_ids + // This map is not protected by lock, should be accessed under read_write_mutex. + DMFileIDToSegmentIDs dmfile_id_to_segment_ids; + // Synchronize between write threads and read threads. mutable std::shared_mutex read_write_mutex; LoggerPtr log; }; -using DeltaMergeStorePtr = std::shared_ptr; - } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 18c7b4df6ca..6e46b409826 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -390,7 +390,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) // Foreground task don't get GC safe point from remote, but we better make it as up to date as possible. if (updateGCSafePoint()) { - /// Note that `task.dm_context->db_context` will be free after query is finish. We should not use that in background task. + /// Note that `task.dm_context->global_context` will be free after query is finish. We should not use that in background task. 
task.dm_context->min_version = latest_gc_safe_point.load(std::memory_order_relaxed); LOG_DEBUG(log, "Task {} GC safe point: {}", magic_enum::enum_name(task.type), task.dm_context->min_version); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 8ddb6108fdc..00d3f012b8a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -14,13 +14,17 @@ #include #include +#include #include #include +#include +#include #include #include #include + namespace CurrentMetrics { extern const Metric DT_DeltaMerge; @@ -32,15 +36,49 @@ extern const Metric DT_SnapshotOfSegmentSplit; extern const Metric DT_SnapshotOfSegmentMerge; extern const Metric DT_SnapshotOfDeltaMerge; extern const Metric DT_SnapshotOfSegmentIngest; +extern const Metric DT_SnapshotOfSegmentIngestIndex; } // namespace CurrentMetrics namespace DB::DM { +void DeltaMergeStore::DMFileIDToSegmentIDs::remove(const SegmentPtr & segment) +{ + RUNTIME_CHECK(segment != nullptr); + for (const auto & dmfile : segment->getStable()->getDMFiles()) + { + if (auto it = u_map.find(dmfile->fileId()); it != u_map.end()) + { + it->second.erase(segment->segmentId()); + } + } +} + +void DeltaMergeStore::DMFileIDToSegmentIDs::add(const SegmentPtr & segment) +{ + RUNTIME_CHECK(segment != nullptr); + for (const auto & dmfile : segment->getStable()->getDMFiles()) + { + u_map[dmfile->fileId()].insert(segment->segmentId()); + } +} + +const DeltaMergeStore::DMFileIDToSegmentIDs::Value & DeltaMergeStore::DMFileIDToSegmentIDs::get( + PageIdU64 dmfile_id) const +{ + static const Value empty; + if (auto it = u_map.find(dmfile_id); it != u_map.end()) + { + return it->second; + } + return empty; +} + void DeltaMergeStore::removeSegment(std::unique_lock &, const SegmentPtr & segment) { segments.erase(segment->getRowKeyRange().getEnd()); 
id_to_segment.erase(segment->segmentId()); + dmfile_id_to_segment_ids.remove(segment); } void DeltaMergeStore::addSegment(std::unique_lock &, const SegmentPtr & segment) @@ -52,6 +90,7 @@ void DeltaMergeStore::addSegment(std::unique_lock &, const Se segment->simpleInfo()); segments[segment->getRowKeyRange().getEnd()] = segment; id_to_segment[segment->segmentId()] = segment; + dmfile_id_to_segment_ids.add(segment); } void DeltaMergeStore::replaceSegment( @@ -64,9 +103,11 @@ void DeltaMergeStore::replaceSegment( old_segment->segmentId(), new_segment->segmentId()); segments.erase(old_segment->getRowKeyRange().getEnd()); + dmfile_id_to_segment_ids.remove(old_segment); segments[new_segment->getRowKeyRange().getEnd()] = new_segment; id_to_segment[new_segment->segmentId()] = new_segment; + dmfile_id_to_segment_ids.add(new_segment); } SegmentPair DeltaMergeStore::segmentSplit( @@ -211,6 +252,7 @@ SegmentPair DeltaMergeStore::segmentSplit( wbs.writeMeta(); segment->abandon(dm_context); + removeSegment(lock, segment); addSegment(lock, new_left); addSegment(lock, new_right); @@ -251,6 +293,16 @@ SegmentPair DeltaMergeStore::segmentSplit( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); + // For logical split, no new DMFile is created, new_left and new_right share the same DMFile with the old segment. + // Even if the index build process of the old segment is not finished, after it is finished, + // it will also trigger the new_left and new_right to bump the meta version. + // So there is no need to check the local index update for logical split. 
+    if (!split_info.is_logical)
+    {
+        segmentEnsureStableIndexAsync(new_left);
+        segmentEnsureStableIndexAsync(new_right);
+    }
+
     return {new_left, new_right};
 }
 
@@ -389,9 +441,306 @@ SegmentPtr DeltaMergeStore::segmentMerge(
     if constexpr (DM_RUN_CHECK)
         check(dm_context.global_context);
 
+    segmentEnsureStableIndexAsync(merged);
     return merged;
 }
 
+// Bring every segment to the newest meta version of the DMFile(s) it references, then
+// schedule a background index build for each segment that still misses a local index.
+// Does nothing when this store has no local index defined.
+void DeltaMergeStore::checkAllSegmentsLocalIndex()
+{
+    if (!local_index_infos || local_index_infos->empty())
+        return;
+
+    LOG_INFO(log, "CheckAllSegmentsLocalIndex - Begin");
+
+    size_t segments_updated_meta = 0;
+    auto dm_context = newDMContext(global_context, global_context.getSettingsRef(), "checkAllSegmentsLocalIndex");
+
+    // 1. Make all segments referencing latest meta version.
+    {
+        Stopwatch watch;
+        std::unique_lock lock(read_write_mutex);
+
+        std::map<PageIdU64, DMFilePtr> latest_dmf_by_id;
+        for (const auto & [end, segment] : segments)
+        {
+            UNUSED(end);
+            for (const auto & dm_file : segment->getStable()->getDMFiles())
+            {
+                auto & latest_dmf = latest_dmf_by_id[dm_file->fileId()];
+                if (!latest_dmf || dm_file->metaVersion() > latest_dmf->metaVersion())
+                    // Note: pageId could be different. It is fine.
+                    latest_dmf = dm_file;
+            }
+        }
+        // segmentUpdateMeta() ends up in replaceSegment(), which erases and re-inserts
+        // entries of `segments`. Updating while range-iterating over `segments` would
+        // invalidate the loop iterator and the `segment` reference bound to the erased
+        // map node, so iterate over a snapshot of the segment pointers instead.
+        std::vector<SegmentPtr> segments_snapshot;
+        segments_snapshot.reserve(segments.size());
+        for (const auto & [end, segment] : segments)
+        {
+            UNUSED(end);
+            segments_snapshot.emplace_back(segment);
+        }
+        for (const auto & segment : segments_snapshot)
+        {
+            for (const auto & dm_file : segment->getStable()->getDMFiles())
+            {
+                auto & latest_dmf = latest_dmf_by_id.at(dm_file->fileId());
+                if (dm_file->metaVersion() < latest_dmf->metaVersion())
+                {
+                    // Note: pageId could be different. It is fine, replaceStableMetaVersion will fix it.
+                    auto update_result = segmentUpdateMeta(lock, *dm_context, segment, {latest_dmf});
+                    RUNTIME_CHECK(update_result != nullptr, segment->simpleInfo());
+                    ++segments_updated_meta;
+                }
+            }
+        }
+        LOG_INFO(
+            log,
+            "CheckAllSegmentsLocalIndex - Finish, updated_meta={}, elapsed={:.3f}s",
+            segments_updated_meta,
+            watch.elapsedSeconds());
+    }
+
+    size_t segments_missing_indexes = 0;
+
+    // 2. Trigger ensureStableIndex for all segments.
+    // There could be new segments between 1 and 2, which is fine. New segments
+    // will invoke ensureStableIndex at creation time.
+    {
+        // There must be a lock, because segments[] may be mutated.
+        // And one lock for all is fine, because segmentEnsureStableIndexAsync is non-blocking, it
+        // simply put tasks in the background.
+        std::shared_lock lock(read_write_mutex);
+        for (const auto & [end, segment] : segments)
+        {
+            UNUSED(end);
+            if (segmentEnsureStableIndexAsync(segment))
+                ++segments_missing_indexes;
+        }
+    }
+
+    LOG_INFO(
+        log,
+        "CheckAllSegmentsLocalIndex - Finish, segments_[updated_meta/missing_index]={}/{}",
+        segments_updated_meta,
+        segments_missing_indexes);
+}
+
+bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
+{
+    RUNTIME_CHECK(segment != nullptr);
+
+    // TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
+    if (!local_index_infos || local_index_infos->empty())
+        return false;
+
+    // No lock is needed, stable meta is immutable.
+    auto dm_files = segment->getStable()->getDMFiles();
+    auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files);
+    if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
+        return false;
+
+    auto store_weak_ptr = weak_from_this();
+    auto tracing_id = fmt::format("segmentEnsureStableIndex<{}>", log->identifier());
+    auto workload = [store_weak_ptr, build_info, dm_files, segment, tracing_id]() -> void {
+        auto store = store_weak_ptr.lock();
+        if (store == nullptr) // Store is destroyed before the task is executed.
+ return; + auto dm_context = store->newDMContext( // + store->global_context, + store->global_context.getSettingsRef(), + tracing_id); + const auto source_segment_info = segment->simpleInfo(); + store->segmentEnsureStableIndex(*dm_context, build_info.indexes_to_build, dm_files, source_segment_info); + }; + + auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler(); + RUNTIME_CHECK(indexer_scheduler != nullptr); + indexer_scheduler->pushTask(LocalIndexerScheduler::Task{ + .keyspace_id = keyspace_id, + .table_id = physical_table_id, + .file_ids = build_info.file_ids, + .request_memory = build_info.estimated_memory_bytes, + .workload = workload, + }); + return true; +} + +bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) const +{ + RUNTIME_CHECK(segment != nullptr); + + // TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL + if (!local_index_infos || local_index_infos->empty()) + return true; + + // No lock is needed, stable meta is immutable. + auto segment_id = segment->segmentId(); + auto dm_files = segment->getStable()->getDMFiles(); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files); + if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) + return true; + + static constexpr size_t MAX_CHECK_TIME_SECONDS = 60; // 60s + Stopwatch watch; + while (watch.elapsedSeconds() < MAX_CHECK_TIME_SECONDS) + { + DMFilePtr dmfile; + { + std::shared_lock lock(read_write_mutex); + auto seg = id_to_segment.at(segment_id); + assert(!seg->getStable()->getDMFiles().empty()); + dmfile = seg->getStable()->getDMFiles()[0]; + } + if (!dmfile) + return false; // DMFile is not exist, return false + bool all_indexes_built = true; + for (const auto & index : *build_info.indexes_to_build) + { + auto col_id = index.column_id; + // The dmfile may be built before col_id is added. 
Skip build indexes for it + if (!dmfile->isColumnExist(col_id)) + continue; + + all_indexes_built = all_indexes_built && dmfile->getColumnStat(col_id).index_bytes > 0; + } + if (all_indexes_built) + return true; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 0.1s + } + + return false; +} + +SegmentPtr DeltaMergeStore::segmentUpdateMeta( + std::unique_lock & read_write_lock, + DMContext & dm_context, + const SegmentPtr & segment, + const DMFiles & new_dm_files) +{ + if (!isSegmentValid(read_write_lock, segment)) + { + LOG_WARNING(log, "SegmentUpdateMeta - Give up because segment not valid, segment={}", segment->simpleInfo()); + return {}; + } + + auto lock = segment->mustGetUpdateLock(); + auto new_segment = segment->replaceStableMetaVersion(lock, dm_context, new_dm_files); + if (new_segment == nullptr) + { + LOG_WARNING( + log, + "SegmentUpdateMeta - Failed due to replace stableMeta failed, segment={}", + segment->simpleInfo()); + return {}; + } + + replaceSegment(read_write_lock, segment, new_segment); + + // Must not abandon old segment, because they share the same delta. + // segment->abandon(dm_context); + + if constexpr (DM_RUN_CHECK) + { + new_segment->check(dm_context, "After SegmentUpdateMeta"); + } + + LOG_INFO( + log, + "SegmentUpdateMeta - Finish, old_segment={} new_segment={}", + segment->simpleInfo(), + new_segment->simpleInfo()); + return new_segment; +} + +void DeltaMergeStore::segmentEnsureStableIndex( + DMContext & dm_context, + const IndexInfosPtr & index_info, + const DMFiles & dm_files, + const String & source_segment_info) +{ + // 1. Acquire a snapshot for PageStorage, and keep the snapshot until index is built. + // This helps keep DMFile valid during the index build process. + // We don't acquire a snapshot from the source_segment, because the source_segment + // may be abandoned at this moment. 
+ // + // Note that we cannot simply skip the index building when seg is not valid any more, + // because segL and segR may still be referencing its DMFile, consider this case: + // 1. seg=PhysicalSplit + // 2. Add CreateStableIndex(seg) to ThreadPool + // 3. segL, segR=LogicalSplit(seg) + // 4. CreateStableIndex(seg) + + auto storage_snapshot = std::make_shared( + *dm_context.storage_pool, + dm_context.getReadLimiter(), + dm_context.tracing_id, + /*snapshot_read*/ true); + + RUNTIME_CHECK(dm_files.size() == 1); // size > 1 is currently not supported. + const auto & dm_file = dm_files[0]; + + // 2. Check whether the DMFile has been referenced by any valid segment. + { + std::shared_lock lock(read_write_mutex); + auto segment_ids = dmfile_id_to_segment_ids.get(dm_file->fileId()); + if (segment_ids.empty()) + { + LOG_DEBUG( + log, + "EnsureStableIndex - Give up because no segment to update, source_segment={}", + source_segment_info); + return; + } + } + + LOG_INFO( + log, + "EnsureStableIndex - Begin building index, dm_files={} source_segment={}", + DMFile::info(dm_files), + source_segment_info); + + // 3. Build the index. + DMFileIndexWriter iw(DMFileIndexWriter::Options{ + .path_pool = path_pool, + .index_infos = index_info, + .dm_files = dm_files, + .dm_context = dm_context, + }); + auto new_dmfiles = iw.build(); + RUNTIME_CHECK(!new_dmfiles.empty()); + + LOG_INFO( + log, + "EnsureStableIndex - Finish building index, dm_files={} source_segment={}", + DMFile::info(dm_files), + source_segment_info); + + // 4. Update the meta version of the segments to the latest one. + // A logical split may happen between step 3 and 4, so get the latest segments to update again. + // If TiFlash crashes during updating the meta version, some segments' meta are updated and some are not. + // So after TiFlash restarts, we will update meta versions to latest versions again. + { + // We must acquire a single lock when updating multiple segments. + // Otherwise we may miss new segments. 
+ std::unique_lock lock(read_write_mutex); + auto segment_ids = dmfile_id_to_segment_ids.get(dm_file->fileId()); + for (const auto & seg_id : segment_ids) + { + auto segment = id_to_segment[seg_id]; + auto new_segment = segmentUpdateMeta(lock, dm_context, segment, new_dmfiles); + // Expect update meta always success, because the segment must be valid and bump meta should succeed. + RUNTIME_CHECK_MSG(new_segment != nullptr, "Update meta failed for segment {}", segment->simpleInfo()); + } + } +} + SegmentPtr DeltaMergeStore::segmentMergeDelta( DMContext & dm_context, const SegmentPtr & segment, @@ -516,7 +852,6 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( wbs.writeMeta(); - // The instance of PKRange::End is closely linked to instance of PKRange. So we cannot reuse it. // Replace must be done by erase + insert. replaceSegment(lock, segment, new_segment); @@ -543,6 +878,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); + segmentEnsureStableIndexAsync(new_segment); return new_segment; } @@ -652,6 +988,7 @@ SegmentPtr DeltaMergeStore::segmentIngestData( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); + segmentEnsureStableIndexAsync(new_segment); return new_segment; } @@ -711,6 +1048,7 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceDataFromCheckpoint( if constexpr (DM_RUN_CHECK) check(dm_context.global_context); + segmentEnsureStableIndexAsync(new_segment); return new_segment; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 75d898a0666..529c12d952b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -58,6 +58,19 @@ String DMFile::ngcPath() const return getNGCPath(parentPath(), fileId(), getStatus()); } +String DMFile::info(const DMFiles & files) +{ + FmtBuffer buffer; + buffer.append("["); + buffer.joinStr( + files.cbegin(), + files.cend(), + [](const auto & 
file, FmtBuffer & fb) { fb.fmtAppend("dmf_{}(v={})", file->fileId(), file->metaVersion()); }, + ", "); + buffer.append("]"); + return buffer.toString(); +} + DMFilePtr DMFile::create( UInt64 file_id, const String & parent_path, @@ -103,7 +116,6 @@ DMFilePtr DMFile::create( // since the NGC file is a file under the folder. // FIXME : this should not use PageUtils. PageUtil::touchFile(new_dmfile->ngcPath()); - return new_dmfile; } @@ -170,6 +182,7 @@ DMFilePtr DMFile::restore( ); dmfile->meta->read(file_provider, read_meta_mode); } + return dmfile; } @@ -468,7 +481,7 @@ std::vector DMFile::listFilesForUpload() const return fnames; } -void DMFile::switchToRemote(const S3::DMFileOID & oid) +void DMFile::switchToRemote(const S3::DMFileOID & oid) const { RUNTIME_CHECK(useMetaV2()); RUNTIME_CHECK(getStatus() == DMFileStatus::READABLE); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index a5cb69ee876..c59a69ac140 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -65,6 +65,8 @@ class DMFile : private boost::noncopyable UInt64 meta_version = 0, KeyspaceID keyspace_id = NullspaceID); + static String info(const DMFiles & dm_files); + struct ListOptions { // Only return the DTFiles id list that can be GC @@ -176,10 +178,12 @@ class DMFile : private boost::noncopyable } bool useMetaV2() const { return meta->format_version == DMFileFormat::V3; } + std::vector listFilesForUpload() const; - void switchToRemote(const S3::DMFileOID & oid); + void switchToRemote(const S3::DMFileOID & oid) const; UInt32 metaVersion() const { return meta->metaVersion(); } + UInt32 bumpMetaVersion() const { return meta->bumpMetaVersion(); } private: DMFile( @@ -298,7 +302,7 @@ class DMFile : private boost::noncopyable friend class DMFileV3IncrementWriter; friend class DMFileWriter; - friend class DMFileWriterRemote; + friend class DMFileIndexWriter; friend class DMFileReader; friend class 
MarkLoader; friend class ColumnReadStream; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index be1bbce8d4a..efe98004ca0 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -82,7 +82,7 @@ class DMFileBlockInputStreamBuilder // - current settings from this context // - current read limiter form this context // - current file provider from this context - explicit DMFileBlockInputStreamBuilder(const Context & dm_context); + explicit DMFileBlockInputStreamBuilder(const Context & context); // Build the final stream ptr. // Empty `rowkey_ranges` means not filter by rowkey diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp new file mode 100644 index 00000000000..aa879d4b206 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp @@ -0,0 +1,233 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM +{ + +DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( + const IndexInfosPtr & index_infos, + const DMFiles & dm_files) +{ + assert(index_infos != nullptr); + static constexpr double VECTOR_INDEX_SIZE_FACTOR = 1.2; + + LocalIndexBuildInfo build; + build.indexes_to_build = std::make_shared(); + build.file_ids.reserve(dm_files.size()); + for (const auto & dmfile : dm_files) + { + bool any_new_index_build = false; + for (const auto & index : *index_infos) + { + auto col_id = index.column_id; + // The dmfile may be built before col_id is added. Skip build indexes for it + if (!dmfile->isColumnExist(col_id)) + continue; + + if (dmfile->getColumnStat(col_id).index_bytes > 0) + continue; + + any_new_index_build = true; + + auto col_stat = dmfile->getColumnStat(col_id); + build.indexes_to_build->emplace_back(index); + build.estimated_memory_bytes += col_stat.serialized_bytes * VECTOR_INDEX_SIZE_FACTOR; + } + + if (any_new_index_build) + build.file_ids.emplace_back(LocalIndexerScheduler::DMFileID(dmfile->fileId())); + } + + build.file_ids.shrink_to_fit(); + return build; +} + +size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable) const +{ + const auto column_defines = dm_file_mutable->getColumnDefines(); + const auto del_cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) { + return cd.id == TAG_COLUMN_ID; + }); + RUNTIME_CHECK_MSG( + del_cd_iter != column_defines.cend(), + "Cannot find del_mark column, file={}", + dm_file_mutable->path()); + + // read_columns are: DEL_MARK, COL_A, COL_B, ... + // index_builders are: COL_A, COL_B, ... 
+ + ColumnDefines read_columns{*del_cd_iter}; + read_columns.reserve(options.index_infos->size() + 1); + + std::vector index_builders; + index_builders.reserve(options.index_infos->size()); + + // The caller should avoid building index for the same column multiple times. + for (const auto & index_info : *options.index_infos) + { + const auto cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [&](const auto & cd) { + return cd.id == index_info.column_id; + }); + RUNTIME_CHECK_MSG( + cd_iter != column_defines.cend(), + "Cannot find column_id={} in file={}", + index_info.column_id, + dm_file_mutable->path()); + + // Index already built. We don't allow. The caller should filter away. + RUNTIME_CHECK(dm_file_mutable->getColumnStat(index_info.column_id).index_bytes == 0, index_info.column_id); + + read_columns.push_back(*cd_iter); + index_builders.push_back(VectorIndexBuilder::create(index_info.index_definition)); + } + + // If no index to build. + if (index_builders.empty()) + return 0; + + DMFileV3IncrementWriter::Options iw_options{ + .dm_file = dm_file_mutable, + .file_provider = options.dm_context.global_context.getFileProvider(), + .write_limiter = options.dm_context.global_context.getWriteLimiter(), + .path_pool = options.path_pool, + .disagg_ctx = options.dm_context.global_context.getSharedContextDisagg(), + }; + auto iw = DMFileV3IncrementWriter::create(iw_options); + + DMFileBlockInputStreamBuilder read_stream_builder(options.dm_context.global_context); + auto scan_context = std::make_shared(); + + // Note: We use range::newAll to build index for all data in dmfile, because the index is file-level. 
+ auto read_stream = read_stream_builder.build( + dm_file_mutable, + read_columns, + {RowKeyRange::newAll(options.dm_context.is_common_handle, options.dm_context.rowkey_column_size)}, + scan_context); + + // Read all blocks and build index + while (true) + { + auto block = read_stream->read(); + if (!block) + break; + + RUNTIME_CHECK(block.columns() == read_columns.size()); + RUNTIME_CHECK(block.getByPosition(0).column_id == TAG_COLUMN_ID); + + auto del_mark_col = block.safeGetByPosition(0).column; + RUNTIME_CHECK(del_mark_col != nullptr); + const auto * del_mark = static_cast *>(del_mark_col.get()); + RUNTIME_CHECK(del_mark != nullptr); + + for (size_t col_idx = 0, col_idx_max = index_builders.size(); col_idx < col_idx_max; ++col_idx) + { + const auto & index_builder = index_builders[col_idx]; + const auto & col_with_type_and_name = block.safeGetByPosition(col_idx + 1); + RUNTIME_CHECK(col_with_type_and_name.column_id == read_columns[col_idx + 1].id); + const auto & col = col_with_type_and_name.column; + index_builder->addBlock(*col, del_mark); + } + } + + // Write down the index + size_t total_built_index_bytes = 0; + for (size_t col_idx = 0, col_idx_max = index_builders.size(); col_idx < col_idx_max; col_idx++) + { + const auto & index_builder = index_builders[col_idx]; + const auto & cd = read_columns[col_idx + 1]; + + // Save index and update column stats + auto callback = [&](const IDataType::SubstreamPath & substream_path) -> void { + if (IDataType::isNullMap(substream_path) || IDataType::isArraySizes(substream_path)) + return; + + const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path); + const auto index_file_name = colIndexFileName(stream_name); + const auto index_path = iw->localPath() + "/" + index_file_name; + index_builder->save(index_path); + + auto & col_stat = dm_file_mutable->meta->getColumnStats().at(cd.id); + col_stat.index_bytes = Poco::File(index_path).getSize(); + total_built_index_bytes += col_stat.index_bytes; + // Memorize 
what kind of vector index it is, so that we can correctly restore it when reading. + col_stat.vector_index.emplace(); + col_stat.vector_index->set_index_kind(tipb::VectorIndexKind_Name(index_builder->definition->kind)); + col_stat.vector_index->set_distance_metric( + tipb::VectorDistanceMetric_Name(index_builder->definition->distance_metric)); + col_stat.vector_index->set_dimensions(index_builder->definition->dimension); + + iw->include(index_file_name); + }; + cd.type->enumerateStreams(callback); + } + + dm_file_mutable->meta->bumpMetaVersion(); + iw->finalize(); // Note: There may be S3 uploads here. + return total_built_index_bytes; +} + +DMFiles DMFileIndexWriter::build() const +{ + RUNTIME_CHECK(!built); + // Create a clone of existing DMFile instances by using DMFile::restore, + // because later we will mutate some fields and persist these mutations. + DMFiles cloned_dm_files{}; + cloned_dm_files.reserve(options.dm_files.size()); + + auto delegate = options.path_pool->getStableDiskDelegator(); + for (const auto & dm_file : options.dm_files) + { + if (const auto disagg_ctx = options.dm_context.global_context.getSharedContextDisagg(); + !disagg_ctx || !disagg_ctx->remote_data_store) + RUNTIME_CHECK(dm_file->parentPath() == delegate.getDTFilePath(dm_file->fileId())); + + auto new_dmfile = DMFile::restore( + options.dm_context.global_context.getFileProvider(), + dm_file->fileId(), + dm_file->pageId(), + dm_file->parentPath(), + DMFileMeta::ReadMode::all(), + dm_file->metaVersion()); + cloned_dm_files.push_back(new_dmfile); + } + + for (const auto & cloned_dmfile : cloned_dm_files) + { + auto index_bytes = buildIndexForFile(cloned_dmfile); + if (auto data_store = options.dm_context.global_context.getSharedContextDisagg()->remote_data_store; + !data_store) + { + // After building index, add the index size to the file size. 
+ auto res = options.path_pool->getStableDiskDelegator().updateDTFileSize( + cloned_dmfile->fileId(), + cloned_dmfile->getBytesOnDisk() + index_bytes); + RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", cloned_dmfile->path()); + } + } + + built = true; + return cloned_dm_files; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h new file mode 100644 index 00000000000..8604e682186 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h @@ -0,0 +1,77 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class StoragePathPool; +using StoragePathPoolPtr = std::shared_ptr; +} // namespace DB + +namespace DB::DM +{ +class DMFile; +using DMFilePtr = std::shared_ptr; +using DMFiles = std::vector; +} // namespace DB::DM + +namespace DB::DM +{ + +class DMFileIndexWriter +{ +public: + struct LocalIndexBuildInfo + { + std::vector file_ids; + size_t estimated_memory_bytes = 0; + IndexInfosPtr indexes_to_build; + }; + + static LocalIndexBuildInfo getLocalIndexBuildInfo(const IndexInfosPtr & index_infos, const DMFiles & dm_files); + + struct Options + { + const StoragePathPoolPtr path_pool; + const IndexInfosPtr index_infos; + const DMFiles dm_files; + const DMContext & dm_context; + }; + + explicit DMFileIndexWriter(const Options & options) + : logger(Logger::get()) + , options(options) + {} + + // Note: This method can only be called once. + DMFiles build() const; + +private: + size_t buildIndexForFile(const DMFilePtr & dm_file_mutable) const; + +private: + const LoggerPtr logger; + const Options options; + mutable bool built = false; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp index 65494efba92..9c666d9a3de 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp @@ -15,14 +15,12 @@ #include #include #include -#include #include #include #include -#include -#include #include + namespace DB::DM { @@ -148,15 +146,6 @@ void DMFileV3IncrementWriter::writeAndIncludeMetaFile() // to ensure file's integrity. auto meta_file_path_for_write = meta_file_path + ".tmp"; - // Just a protection. We don't allow overwriting meta file. 
- { - auto existing_file = Poco::File(meta_file_path); - RUNTIME_CHECK_MSG( // - !existing_file.exists(), - "Meta file already exists, file={}", - meta_file_path); - } - auto meta_file = WriteBufferFromWritableFileBuilder::buildPtr( options.file_provider, meta_file_path_for_write, // Must not use meta->metaPath(), because DMFile may be a S3 DMFile diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h index e159095d35a..c6244fb2ed3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -35,10 +34,8 @@ using StoragePathPoolPtr = std::shared_ptr; namespace DB::DM { - class DMFile; using DMFilePtr = std::shared_ptr; - } // namespace DB::DM namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 2b012cb27a3..4b82aaf4ce3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include @@ -60,14 +59,12 @@ DMFileWriter::DMFileWriter( for (auto & cd : write_columns) { - if (cd.vector_index) - RUNTIME_CHECK(VectorIndexBuilder::isSupportedType(*cd.type)); - // TODO: currently we only generate index for Integers, Date, DateTime types, and this should be configurable by user. 
/// for handle column always generate index auto type = removeNullable(cd.type); bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime(); - addStreams(cd.id, cd.type, do_index, cd.vector_index); + + addStreams(cd.id, cd.type, do_index); dmfile->meta->getColumnStats().emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0}); } } @@ -101,11 +98,7 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile() } } -void DMFileWriter::addStreams( - ColId col_id, - DataTypePtr type, - bool do_index, - TiDB::VectorIndexDefinitionPtr do_vector_index) +void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) { auto callback = [&](const IDataType::SubstreamPath & substream_path) { const auto stream_name = DMFile::getFileNameBase(col_id, substream_path); @@ -118,8 +111,7 @@ void DMFileWriter::addStreams( options.max_compress_block_size, file_provider, write_limiter, - do_index && substream_can_index, - (do_vector_index && substream_can_index) ? do_vector_index : nullptr); + do_index && substream_can_index); column_streams.emplace(stream_name, std::move(stream)); }; @@ -209,9 +201,6 @@ void DMFileWriter::writeColumn( (col_id == EXTRA_HANDLE_COLUMN_ID || col_id == TAG_COLUMN_ID) ? nullptr : del_mark); } - if (stream->vector_index) - stream->vector_index->addBlock(column, del_mark); - /// There could already be enough data to compress into the new block. if (stream->compressed_buf->offset() >= options.min_compress_block_size) stream->compressed_buf->next(); @@ -336,24 +325,6 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) buffer->next(); } - if (stream->vector_index && !is_empty_file) - { - // Vector index files are always not written into the merged file - // because we want to allow to be mmaped by the usearch. 
- - const auto index_name = dmfile->colIndexPath(stream_name); - stream->vector_index->save(index_name); - col_stat.index_bytes = Poco::File(index_name).getSize(); - - // Memorize what kind of vector index it is, so that we can correctly restore it when reading. - col_stat.vector_index.emplace(); - col_stat.vector_index->set_index_kind( - tipb::VectorIndexKind_Name(stream->vector_index->definition->kind)); - col_stat.vector_index->set_distance_metric( - tipb::VectorDistanceMetric_Name(stream->vector_index->definition->distance_metric)); - col_stat.vector_index->set_dimensions(stream->vector_index->definition->dimension); - } - // write mark into merged_file_writer if (!is_empty_file) { @@ -443,11 +414,6 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) } #endif } - - if (stream->vector_index) - { - RUNTIME_CHECK_MSG(false, "Vector index is not compatible with V1 and V2 format"); - } } }; type->enumerateStreams(callback, {}); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 2363790374e..d85185bc729 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -22,7 +22,6 @@ #include #include #include -#include namespace DB { @@ -53,8 +52,7 @@ class DMFileWriter size_t max_compress_block_size, FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter_, - bool do_index, - TiDB::VectorIndexDefinitionPtr do_vector_index) + bool do_index) : plain_file(ChecksumWriteBufferBuilder::build( dmfile->getConfiguration().has_value(), file_provider, @@ -72,7 +70,6 @@ class DMFileWriter compression_settings, !dmfile->getConfiguration().has_value())) , minmaxes(do_index ? std::make_shared(*type) : nullptr) - , vector_index(do_vector_index ? 
VectorIndexBuilder::create(do_vector_index) : nullptr) { if (!dmfile->useMetaV2()) { @@ -98,7 +95,6 @@ class DMFileWriter WriteBufferPtr compressed_buf; MinMaxIndexPtr minmaxes; - VectorIndexBuilderPtr vector_index; MarksInCompressedFilePtr marks; @@ -160,7 +156,7 @@ class DMFileWriter /// Add streams with specified column id. Since a single column may have more than one Stream, /// for example Nullable column has a NullMap column, we would track them with a mapping /// FileNameBase -> Stream. - void addStreams(ColId col_id, DataTypePtr type, bool do_index, TiDB::VectorIndexDefinitionPtr do_vector_index); + void addStreams(ColId col_id, DataTypePtr type, bool do_index); WriteBufferFromFileBasePtr createMetaFile(); void finalizeMeta(); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h index 013631ca1f0..1a82496b8bc 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexCache.h @@ -50,7 +50,7 @@ class VectorIndexCache std::mutex shutdown_mu; private: - friend class ::DB::DM::tests::VectorIndexTestUtils; + friend class tests::VectorIndexTestUtils; // Drop the in-memory Vector Index if the on-disk file is deleted. // mmaped file could be unmmaped so that disk space can be reclaimed. 
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h index d72666377de..0b7221deeb9 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h @@ -220,6 +220,9 @@ struct fmt::formatter template auto format(const DB::DM::LocalIndexerScheduler::FileID & id, FormatContext & ctx) const -> decltype(ctx.out()) { - return fmt::format_to(ctx.out(), "{}", std::visit([](const auto & id) { return id.id; }, id)); + if (std::holds_alternative(id)) + return fmt::format_to(ctx.out(), "DM_{}", std::get(id).id); + else + return fmt::format_to(ctx.out(), "CT_{}", std::get(id).id); } }; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 719fb00ad42..498b66635c6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -1393,50 +1394,16 @@ SegmentPtr Segment::replaceStableMetaVersion( DMContext & dm_context, const DMFiles & new_stable_files) { - auto current_stable_files_str = [&] { - FmtBuffer fmt_buf; - fmt_buf.append('['); - fmt_buf.joinStr( - stable->getDMFiles().begin(), - stable->getDMFiles().end(), - [](const DMFilePtr & file, FmtBuffer & fb) { - fb.fmtAppend("dmf_{}(v{})", file->fileId(), file->metaVersion()); - }, - ","); - fmt_buf.append(']'); - return fmt_buf.toString(); - }; - - auto new_stable_files_str = [&] { - FmtBuffer fmt_buf; - fmt_buf.append('['); - fmt_buf.joinStr( - new_stable_files.begin(), - new_stable_files.end(), - [](const DMFilePtr & file, FmtBuffer & fb) { - fb.fmtAppend("dmf_{}(v{})", file->fileId(), file->metaVersion()); - }, - ","); - fmt_buf.append(']'); - return fmt_buf.toString(); - }; - - LOG_DEBUG( - log, - "ReplaceStableMetaVersion - Begin, current_stable={} new_stable={}", - current_stable_files_str(), - new_stable_files_str()); - - // 
Ensure new stable files have the same DMFile ID as the old stable files. + // Ensure new stable files have the same DMFile ID and Page ID as the old stable files. // We only allow changing meta version when calling this function. if (new_stable_files.size() != stable->getDMFiles().size()) { LOG_WARNING( log, - "ReplaceStableMetaVersion - Fail, stable files count mismatch, current_stable={} new_stable={}", - current_stable_files_str(), - new_stable_files_str()); + "ReplaceStableMetaVersion - Failed due to stable mismatch, current_stable={} new_stable={}", + DMFile::info(stable->getDMFiles()), + DMFile::info(new_stable_files)); return {}; } for (size_t i = 0; i < new_stable_files.size(); i++) @@ -1445,17 +1412,45 @@ SegmentPtr Segment::replaceStableMetaVersion( { LOG_WARNING( log, - "ReplaceStableMetaVersion - Fail, stable files mismatch, current_stable={} new_stable={}", - current_stable_files_str(), - new_stable_files_str()); + "ReplaceStableMetaVersion - Failed due to stable mismatch, current_stable={} " + "new_stable={}", + DMFile::info(stable->getDMFiles()), + DMFile::info(new_stable_files)); return {}; } } WriteBatches wbs(*dm_context.storage_pool, dm_context.getWriteLimiter()); + DMFiles new_dm_files; + new_dm_files.reserve(new_stable_files.size()); + const auto & current_stable_files = stable->getDMFiles(); + for (size_t file_idx = 0; file_idx < new_stable_files.size(); ++file_idx) + { + const auto & new_file = new_stable_files[file_idx]; + const auto & current_file = current_stable_files[file_idx]; + RUNTIME_CHECK(new_file->fileId() == current_file->fileId()); + if (new_file->pageId() != current_file->pageId()) + { + // Allow pageId being different. We will restore using a correct pageId + // because this function is supposed to only update meta version. 
+ auto new_dmfile = DMFile::restore( + dm_context.global_context.getFileProvider(), + new_file->fileId(), + current_file->pageId(), + new_file->parentPath(), + DMFileMeta::ReadMode::all(), + new_file->metaVersion()); + new_dm_files.push_back(new_dmfile); + } + else + { + new_dm_files.push_back(new_file); + } + } + auto new_stable = std::make_shared(stable->getId()); - new_stable->setFiles(new_stable_files, rowkey_range, &dm_context); + new_stable->setFiles(new_dm_files, rowkey_range, &dm_context); new_stable->saveMeta(wbs.meta); auto new_me = std::make_shared( // @@ -1470,8 +1465,11 @@ SegmentPtr Segment::replaceStableMetaVersion( wbs.writeAll(); - LOG_DEBUG(log, "ReplaceStableMetaVersion - Finish, new_stable_files={}", new_stable_files_str()); - + LOG_DEBUG( + log, + "ReplaceStableMetaVersion - Finish, new_stable={} old_stable={}", + DMFile::info(new_stable_files), + DMFile::info(stable->getDMFiles())); return new_me; } @@ -2466,6 +2464,7 @@ String Segment::simpleInfo() const String Segment::info() const { + RUNTIME_CHECK(stable && delta); return fmt::format( " #include #include -#include namespace DB::DM { @@ -554,6 +553,8 @@ class Segment String logId() const; String simpleInfo() const; + // Detail information of segment. + // Do not use it in read path since the segment may not in local. 
String info() const; static String simpleInfo(const std::vector & segments); @@ -751,7 +752,6 @@ class Segment const ColumnDefines & read_columns, const StableValueSpacePtr & stable); - #ifndef DBMS_PUBLIC_GTEST private: #else diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index eaee30ff77e..71ebc988d60 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -279,12 +279,7 @@ size_t StableValueSpace::getDMFilesBytes() const String StableValueSpace::getDMFilesString() { - String s; - for (auto & file : files) - s += "dmf_" + DB::toString(file->fileId()) + ","; - if (!s.empty()) - s.erase(s.length() - 1); - return s; + return DMFile::info(files); } void StableValueSpace::enableDMFilesGC(DMContext & dm_context) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 4be99198746..eb203caefea 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -282,6 +282,7 @@ try handle_column_define, false, 1, + nullptr, DeltaMergeStore::Settings()); auto block = DMTestEnv::prepareSimpleWriteBlock(0, 100, false); new_store->write(*db_context, db_context->getSettingsRef(), block); @@ -3351,6 +3352,7 @@ class DeltaMergeStoreMergeDeltaBySegmentTest (*cols)[0], pk_type == DMTestEnv::PkType::CommonHandle, 1, + nullptr, DeltaMergeStore::Settings()); dm_context = store->newDMContext( *db_context, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 9cd713dbf98..00b135c1b61 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ 
b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -173,6 +173,7 @@ class DeltaMergeStoreTestFastAddPeer handle_column_define, is_common_handle, rowkey_column_size, + nullptr, DeltaMergeStore::Settings()); return s; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index d0e966fd646..1b51b208e5f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -79,6 +79,7 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic handle_column_define, is_common_handle, rowkey_column_size, + nullptr, DeltaMergeStore::Settings()); return s; } @@ -195,6 +196,7 @@ class DeltaMergeStoreRWTest handle_column_define, is_common_handle, rowkey_column_size, + nullptr, DeltaMergeStore::Settings()); return s; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp new file mode 100644 index 00000000000..5d49358444f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -0,0 +1,569 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include + + +namespace DB::DM::tests +{ + +class DeltaMergeStoreVectorTest + : public DB::base::TiFlashStorageTestBasic + , public VectorIndexTestUtils +{ +public: + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + store = reload(); + } + + DeltaMergeStorePtr reload() + { + TiFlashStorageTestBasic::reload(); + auto cols = DMTestEnv::getDefaultColumns(); + cols->push_back(cdVec()); + + ColumnDefine handle_column_define = (*cols)[0]; + + DeltaMergeStorePtr s = DeltaMergeStore::create( + *db_context, + false, + "test", + "t_100", + NullspaceID, + 100, + true, + *cols, + handle_column_define, + false, + 1, + indexInfo(), + DeltaMergeStore::Settings()); + return s; + } + + void write(size_t num_rows_write) + { + String sequence = fmt::format("[0, {})", num_rows_write); + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of vector for test + block.insert(colVecFloat32(sequence, vec_column_name, vec_column_id)); + } + store->write(*db_context, db_context->getSettingsRef(), block); + } + + void read(const RowKeyRange & range, const PushDownFilterPtr & filter, const ColumnWithTypeAndName & out) + { + auto in = store->read( + *db_context, + db_context->getSettingsRef(), + {cdVec()}, + {range}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + filter, + std::vector{}, + 0, + TRACING_NAME, + /*keep_order=*/false)[0]; + ASSERT_INPUTSTREAM_COLS_UR( + in, + Strings({vec_column_name}), + createColumns({ + out, + })); + } + + void triggerMergeDelta() + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + for (const auto & segment : all_segments) + ASSERT_TRUE( + store->segmentMergeDelta(*dm_context, segment, 
DeltaMergeStore::MergeDeltaReason::Manual) != nullptr); + } + + void waitStableIndexReady() + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + for (const auto & segment : all_segments) + ASSERT_TRUE(store->segmentWaitStableIndexReady(segment)); + } + + void triggerMergeAllSegments() + { + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + std::vector segments_to_merge; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + segments_to_merge.push_back(segment); + } + std::sort(segments_to_merge.begin(), segments_to_merge.end(), [](const auto & lhs, const auto & rhs) { + return lhs->getRowKeyRange().getEnd() < rhs->getRowKeyRange().getEnd(); + }); + auto new_segment = store->segmentMerge( + *dm_context, + segments_to_merge, + DeltaMergeStore::SegmentMergeReason::BackgroundGCThread); + ASSERT_TRUE(new_segment != nullptr); + } + +protected: + DeltaMergeStorePtr store; + + constexpr static const char * TRACING_NAME = "DeltaMergeStoreVectorTest"; +}; + +TEST_F(DeltaMergeStoreVectorTest, TestBasic) +try +{ + store = reload(); + + const size_t num_rows_write = 128; + + // write to store + write(num_rows_write); + + // trigger mergeDelta for all segments + triggerMergeDelta(); + + // check stable index has built for all segments + waitStableIndexReady(); + + const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + 
auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, TestLogicalSplitAndMerge) +try +{ + store = reload(); + + const size_t num_rows_write = 128; + + // write to store + write(num_rows_write); + + // trigger mergeDelta for all segments + triggerMergeDelta(); + + // logical split + RowKeyRange left_segment_range; + { + SegmentPtr segment; + { + std::shared_lock lock(store->read_write_mutex); + segment = store->segments.begin()->second; + } + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto breakpoint = RowKeyValue::fromHandle(num_rows_write / 2); + const auto [left, right] = store->segmentSplit( + *dm_context, + segment, + DeltaMergeStore::SegmentSplitReason::ForIngest, + breakpoint, + DeltaMergeStore::SegmentSplitMode::Logical); + ASSERT_TRUE(left->rowkey_range.end == breakpoint); + ASSERT_TRUE(right->rowkey_range.start == breakpoint); + left_segment_range = RowKeyRange( + left->rowkey_range.start, + left->rowkey_range.end, + store->is_common_handle, + store->rowkey_column_size); + } + + // check stable index has built for all segments + waitStableIndexReady(); + + // read from store + { + read( + left_segment_range, + EMPTY_FILTER, + colVecFloat32(fmt::format("[0, {})", num_rows_write / 2), vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + auto 
filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(left_segment_range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(left_segment_range, filter, createVecFloat32Column({})); // FIXME: should be {{63.0}}, the nearest vector in the left segment (see TestPhysicalSplitAndMerge) + } + + // merge segment + triggerMergeAllSegments(); + + // check stable index has been built for all segments + waitStableIndexReady(); + + auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{122.0}})); + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, TestPhysicalSplitAndMerge) +try +{ + // Physical split is slow, so if we trigger mergeDelta and then a physical split right away, + // the physical split is likely to fail since vector index building causes the segment to become invalid. 
+ + store = reload(); + + const size_t num_rows_write = 128; + + // write to store + write(num_rows_write); + + // trigger mergeDelta for all segments + triggerMergeDelta(); + + // physical split + auto physical_split = [&] { + SegmentPtr segment; + { + std::shared_lock lock(store->read_write_mutex); + segment = store->segments.begin()->second; + } + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto breakpoint = RowKeyValue::fromHandle(num_rows_write / 2); + return store->segmentSplit( + *dm_context, + segment, + DeltaMergeStore::SegmentSplitReason::ForIngest, + breakpoint, + DeltaMergeStore::SegmentSplitMode::Physical); + }; + + auto [left, right] = physical_split(); + if (left == nullptr && right == nullptr) + { + // check stable index has built for all segments first + waitStableIndexReady(); + // trigger physical split again + std::tie(left, right) = physical_split(); + } + + ASSERT_TRUE(left->rowkey_range.end == RowKeyValue::fromHandle(num_rows_write / 2)); + ASSERT_TRUE(right->rowkey_range.start == RowKeyValue::fromHandle(num_rows_write / 2)); + RowKeyRange left_segment_range = RowKeyRange( + left->rowkey_range.start, + left->rowkey_range.end, + store->is_common_handle, + store->rowkey_column_size); + + // check stable index has built for all segments + waitStableIndexReady(); + + // read from store + { + read( + left_segment_range, + EMPTY_FILTER, + colVecFloat32(fmt::format("[0, {})", num_rows_write / 2), vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(left_segment_range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + 
ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(left_segment_range, filter, createVecFloat32Column({{63.0}})); + } + + // merge segment + triggerMergeAllSegments(); + + // check stable index has built for all segments + waitStableIndexReady(); + + auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({122.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{122.0}})); + } +} +CATCH + +TEST_F(DeltaMergeStoreVectorTest, TestIngestData) +try +{ + store = reload(); + + const size_t num_rows_write = 128; + + // write to store + write(num_rows_write); + + // Prepare DMFile + auto [dmfile_parent_path, file_id] = store->preAllocateIngestFile(); + ASSERT_FALSE(dmfile_parent_path.empty()); + DMFilePtr dmfile = DMFile::create( + file_id, + dmfile_parent_path, + std::make_optional(), + 128 * 1024, + 16 * 1024 * 1024, + DMFileFormat::V3); + { + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of vector for test + block.insert(colVecFloat32(fmt::format("[0, {})", num_rows_write), vec_column_name, vec_column_id)); + ColumnDefinesPtr cols = DMTestEnv::getDefaultColumns(); + cols->push_back(cdVec()); + auto stream = std::make_shared(*db_context, dmfile, *cols); + stream->writePrefix(); + 
stream->write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + stream->writeSuffix(); + } + auto page_id = dmfile->pageId(); + auto file_provider = db_context->getFileProvider(); + dmfile = DMFile::restore( + file_provider, + file_id, + page_id, + dmfile_parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); + auto delegator = store->path_pool->getStableDiskDelegator(); + delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), dmfile_parent_path); + + // Ingest data + { + // Ingest data into the first segment + auto segment = store->segments.begin()->second; + auto range = segment->getRowKeyRange(); + + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto new_segment = store->segmentIngestData(*dm_context, segment, dmfile, true); + ASSERT_TRUE(new_segment != nullptr); + } + + // check stable index has built for all segments + waitStableIndexReady(); + + auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } +} +CATCH + + +TEST_F(DeltaMergeStoreVectorTest, TestStoreRestore) +try +{ + store = reload(); + + const size_t num_rows_write = 128; + + // write to store + 
write(num_rows_write); + + // trigger mergeDelta for all segments + triggerMergeDelta(); + + // shutdown store + store->shutdown(); + + // restore store + store = reload(); + + // check stable index has built for all segments + waitStableIndexReady(); + + const auto range = RowKeyRange::newAll(store->is_common_handle, store->rowkey_column_size); + + // read from store + { + read(range, EMPTY_FILTER, colVecFloat32("[0, 128)", vec_column_name, vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.0})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(encodeVectorFloat32({2.1})); + + auto filter = std::make_shared(wrapWithANNQueryInfo(nullptr, ann_query_info)); + + read(range, filter, createVecFloat32Column({{2.0}})); + } +} +CATCH + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp index 869b8ade963..7d11d41c8ca 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp @@ -283,7 +283,9 @@ try }); dm_file_2->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test_overwrite"; ASSERT_EQ(1, dm_file_2->meta->bumpMetaVersion()); - ASSERT_THROW({ iw->finalize(); }, DB::Exception); + ASSERT_NO_THROW({ + iw->finalize(); + }); // No exception should be thrown because it may be a file left by previous writes but segment failed to update meta version. // Read out meta v1 again. 
auto dm_file_for_read = DMFile::restore( @@ -294,7 +296,7 @@ try DMFileMeta::ReadMode::all(), /* meta_version= */ 1); ASSERT_STREQ( - "test", + "test_overwrite", dm_file_for_read->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index be5f5a76e36..bae9f31d5af 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -122,7 +122,8 @@ bool checkMatch( table_columns, getExtraHandleColumnDefine(is_common_handle), is_common_handle, - 1); + 1, + nullptr); store->write(context, context.getSettingsRef(), block); store->flushCache(context, all_range); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp index 67e1091f0bd..d356970e408 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp @@ -59,6 +59,7 @@ void SimplePKTestBasic::reload() (*cols)[0], is_common_handle, 1, + nullptr, DeltaMergeStore::Settings()); dm_context = store->newDMContext( *db_context, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index 8c0b68014d2..9dcbbfea0c1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -18,12 +18,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -36,9 +38,11 @@ #include #include +#include #include #include + namespace CurrentMetrics { extern const Metric DT_SnapshotOfRead; @@ -53,55 +57,6 @@ extern const char file_cache_fg_download_fail[]; namespace 
DB::DM::tests { -class VectorIndexTestUtils -{ -public: - const ColumnID vec_column_id = 100; - const String vec_column_name = "vec"; - - /// Create a column with values like [1], [2], [3], ... - /// Each value is a VectorFloat32 with exactly one dimension. - static ColumnWithTypeAndName colInt64(std::string_view sequence, const String & name = "", Int64 column_id = 0) - { - auto data = genSequence(sequence); - return createColumn(data, name, column_id); - } - - static ColumnWithTypeAndName colVecFloat32(std::string_view sequence, const String & name = "", Int64 column_id = 0) - { - auto data = genSequence(sequence); - std::vector data_in_array; - for (auto & v : data) - { - Array vec; - vec.push_back(static_cast(v)); - data_in_array.push_back(vec); - } - return createVecFloat32Column(data_in_array, name, column_id); - } - - static String encodeVectorFloat32(const std::vector & vec) - { - WriteBufferFromOwnString wb; - Array arr; - for (const auto & v : vec) - arr.push_back(static_cast(v)); - EncodeVectorFloat32(arr, wb); - return wb.str(); - } - - ColumnDefine cdVec() - { - // When used in read, no need to assign vector_index. 
- return ColumnDefine(vec_column_id, vec_column_name, tests::typeFromString("Array(Float32)")); - } - - static size_t cleanVectorCacheEntries(const std::shared_ptr & cache) - { - return cache->cleanOutdatedCacheEntries(); - } -}; - class VectorIndexDMFileTest : public VectorIndexTestUtils , public DB::base::TiFlashStorageTestBasic @@ -113,12 +68,14 @@ class VectorIndexDMFileTest TiFlashStorageTestBasic::SetUp(); parent_path = TiFlashStorageTestBasic::getTemporaryPath(); - path_pool = std::make_shared( - db_context->getPathPool().withTable("test", "VectorIndexDMFileTest", false)); + path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); storage_pool = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 100, *path_pool, "test.t1"); + auto delegator = path_pool->getStableDiskDelegator(); + auto paths = delegator.listPaths(); + RUNTIME_CHECK(paths.size() == 1); dm_file = DMFile::create( 1, - parent_path, + paths[0], std::make_optional(), 128 * 1024, 16 * 1024 * 1024, @@ -149,10 +106,31 @@ class VectorIndexDMFileTest DMFilePtr restoreDMFile() { - auto file_id = dm_file->fileId(); - auto page_id = dm_file->pageId(); - auto file_provider = dbContext().getFileProvider(); - return DMFile::restore(file_provider, file_id, page_id, parent_path, DMFileMeta::ReadMode::all()); + auto dmfile_parent_path = dm_file->parentPath(); + auto dmfile = DMFile::restore( + dbContext().getFileProvider(), + dm_file->fileId(), + dm_file->pageId(), + dmfile_parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); + auto delegator = path_pool->getStableDiskDelegator(); + delegator.addDTFile(dm_file->fileId(), dmfile->getBytesOnDisk(), dmfile_parent_path); + return dmfile; + } + + DMFilePtr buildIndex(TiDB::VectorIndexDefinition definition) + { + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(indexInfo(definition), {dm_file}); + DMFileIndexWriter iw(DMFileIndexWriter::Options{ + .path_pool = path_pool, + .index_infos = 
build_info.indexes_to_build, + .dm_files = {dm_file}, + .dm_context = *dm_context, + }); + auto new_dmfiles = iw.build(); + assert(new_dmfiles.size() == 1); + return new_dmfiles[0]; } Context & dbContext() { return *db_context; } @@ -230,6 +208,7 @@ try } dm_file = restoreDMFile(); + dm_file = buildIndex(*vec_cd.vector_index); // Read with exact match { @@ -540,6 +519,7 @@ try } dm_file = restoreDMFile(); + dm_file = buildIndex(*vec_cd.vector_index); { auto ann_query_info = std::make_shared(); @@ -606,6 +586,7 @@ try } dm_file = restoreDMFile(); + dm_file = buildIndex(*vec_cd.vector_index); // Pack #0 is filtered out according to VecIndex { @@ -748,6 +729,7 @@ try } dm_file = restoreDMFile(); + dm_file = buildIndex(*vec_cd.vector_index); // Pack Filter using RowKeyRange { @@ -1047,6 +1029,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 5, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); auto stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {100.0}); assertStreamOut(stream, "[4, 5)"); @@ -1071,6 +1054,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 5, /* at */ 0, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 10, /* at */ 20); @@ -1101,6 +1085,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 10, /* at */ 20, /* clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); // Delta: [12, 18), [50, 60) writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 6, /* at */ 12); @@ -1189,6 +1174,7 @@ try ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 5, /* at */ 0, /* 
clear */ false); flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 10, /* at */ 20); @@ -1259,11 +1245,7 @@ class VectorIndexSegmentOnS3Test auto cols = DMTestEnv::getDefaultColumns(); auto vec_cd = cdVec(); - vec_cd.vector_index = std::make_shared(TiDB::VectorIndexDefinition{ - .kind = tipb::VectorIndexKind::HNSW, - .dimension = 1, - .distance_metric = tipb::VectorDistanceMetric::L2, - }); + vec_cd.vector_index = std::make_shared(index_info); cols->emplace_back(vec_cd); setColumns(cols); @@ -1347,7 +1329,7 @@ class VectorIndexSegmentOnS3Test { auto * file_cache = FileCache::instance(); auto file_segments = file_cache->getAll(); - for (const auto & file_seg : file_cache->getAll()) + for (const auto & file_seg : file_segments) file_cache->remove(file_cache->toS3Key(file_seg->getLocalFileName()), true); RUNTIME_CHECK(file_cache->getAll().empty()); @@ -1360,11 +1342,42 @@ class VectorIndexSegmentOnS3Test block.insert(colVecFloat32("[0, 100)", vec_column_name, vec_column_id)); wn_segment->write(*dm_context, std::move(block), true); wn_segment = wn_segment->mergeDelta(*dm_context, tableColumns()); + wn_segment = buildIndex(dm_context, wn_segment); + RUNTIME_CHECK(wn_segment != nullptr); // Let's just make sure we are later indeed reading from S3 RUNTIME_CHECK(wn_segment->stable->getDMFiles()[0]->path().rfind("s3://") == 0); } + SegmentPtr buildIndex(DMContextPtr dm_context, SegmentPtr segment) + { + auto * file_cache = FileCache::instance(); + RUNTIME_CHECK(file_cache != nullptr); + RUNTIME_CHECK(file_cache->getAll().empty()); + + auto dm_files = segment->getStable()->getDMFiles(); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(indexInfo(index_info), dm_files); + + // Build index + DMFileIndexWriter iw(DMFileIndexWriter::Options{ + .path_pool = storage_path_pool, + .index_infos = 
build_info.indexes_to_build, + .dm_files = dm_files, + .dm_context = *dm_context, + }); + auto new_dmfiles = iw.build(); + + RUNTIME_CHECK(file_cache->getAll().size() == 2); + SegmentPtr new_segment; + { + auto lock = segment->mustGetUpdateLock(); + new_segment = segment->replaceStableMetaVersion(lock, *dm_context, new_dmfiles); + } + // remove all file cache to make sure we are reading from S3 + removeAllFileCache(); + return new_segment; + } + BlockInputStreamPtr computeNodeTableScan() { return createComputeNodeStream(wn_segment, {cdPK(), cdVec()}, nullptr); @@ -1427,6 +1440,12 @@ class VectorIndexSegmentOnS3Test // MemoryTrackerPtr memory_tracker; MemTrackerWrapper dummy_mem_tracker = MemTrackerWrapper(0, root_of_query_mem_trackers.get()); + + const TiDB::VectorIndexDefinition index_info = { + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }; }; TEST_F(VectorIndexSegmentOnS3Test, FileCacheNotEnabled) @@ -1635,8 +1654,7 @@ try removeAllFileCache(); } { - // Check whether on-disk file is successfully unlinked when there is a memory - // cache. + // Check whether on-disk file is successfully unlinked when there is a memory cache. auto * file_cache = FileCache::instance(); ASSERT_TRUE(std::filesystem::is_empty(file_cache->cache_dir)); } @@ -1913,5 +1931,4 @@ try } CATCH - } // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h new file mode 100644 index 00000000000..1174a991523 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h @@ -0,0 +1,94 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + + +namespace DB::DM::tests +{ + +class VectorIndexTestUtils +{ +public: + const ColumnID vec_column_id = 100; + const String vec_column_name = "vec"; + + /// Create a column from a sequence description such as "[0, 128)" (plain values, not wrapped into vectors). + /// For colVecFloat32 below, values look like [1], [2], [3], ...: each is a VectorFloat32 with exactly one dimension. + static ColumnWithTypeAndName colInt64(std::string_view sequence, const String & name = "", Int64 column_id = 0) + { + auto data = genSequence(sequence); + return ::DB::tests::createColumn(data, name, column_id); + } + + static ColumnWithTypeAndName colVecFloat32(std::string_view sequence, const String & name = "", Int64 column_id = 0) + { + auto data = genSequence(sequence); + std::vector data_in_array; + for (auto & v : data) + { + Array vec; + vec.push_back(static_cast(v)); + data_in_array.push_back(vec); + } + return ::DB::tests::createVecFloat32Column(data_in_array, name, column_id); + } + + static String encodeVectorFloat32(const std::vector & vec) + { + WriteBufferFromOwnString wb; + Array arr; + for (const auto & v : vec) + arr.push_back(static_cast(v)); + EncodeVectorFloat32(arr, wb); + return wb.str(); + } + + ColumnDefine cdVec() + { + // When used in read, no need to assign vector_index. 
+ return ColumnDefine(vec_column_id, vec_column_name, ::DB::tests::typeFromString("Array(Float32)")); + } + + static size_t cleanVectorCacheEntries(const std::shared_ptr & cache) + { + return cache->cleanOutdatedCacheEntries(); + } + + IndexInfosPtr indexInfo( + TiDB::VectorIndexDefinition definition = TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }) + { + const IndexInfos index_infos = IndexInfos{ + IndexInfo{ + .type = IndexType::Vector, + .column_id = vec_column_id, + .index_definition = std::make_shared(definition), + }, + }; + return std::make_shared(index_infos); + } +}; + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp index c06e90a1d88..b939f2028af 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp @@ -342,45 +342,6 @@ try } CATCH -TEST_P(SegmentReplaceStableData, UpdateMetaAfterLogicalSplit) -try -{ - writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); - flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); - mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); - - auto right_segment_id = splitSegmentAt( // - DELTA_MERGE_FIRST_SEGMENT_ID, - /* split_at= */ 50, - Segment::SplitMode::Logical); - ASSERT_TRUE(right_segment_id.has_value()); - - // The left and right segment shares the same stable. - // However we should be able to update their meta independently, - // as long as meta versions are different. 
- - ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); - ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); - ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); - ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); - - // Update left meta does not change right meta - replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); - ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); - ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); - ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); - ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); - - // Update right meta should fail, because right meta is still holding meta version 0 - // and will overwrite meta version 1. - ASSERT_THROW({ replaceSegmentStableWithNewMetaValue(*right_segment_id, "foo"); }, DB::Exception); - ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); - ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); - ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); - ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); -} -CATCH - TEST_P(SegmentReplaceStableData, RestoreSegment) try { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 63d9322b627..5f898376049 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -735,6 +736,36 @@ bool SegmentTestBasic::replaceSegmentStableData(PageIdU64 segment_id, const DMFi return success; } +bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, IndexInfosPtr local_index_infos) +{ + LOG_INFO(logger_op, 
"EnsureSegmentStableIndex, segment_id={}", segment_id); + + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + + bool success = false; + auto segment = segments[segment_id]; + auto dm_files = segment->getStable()->getDMFiles(); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files); + + // Build index + DMFileIndexWriter iw(DMFileIndexWriter::Options{ + .path_pool = storage_path_pool, + .index_infos = build_info.indexes_to_build, + .dm_files = dm_files, + .dm_context = *dm_context, + }); + auto new_dmfiles = iw.build(); + RUNTIME_CHECK(new_dmfiles.size() == 1); + + LOG_INFO(logger_op, "EnsureSegmentStableIndex, build index done, segment_id={}", segment_id); + + // Replace stable data + success = replaceSegmentStableData(segment_id, new_dmfiles[0]); + + operation_statistics["ensureStableIndex"]++; + return success; +} + bool SegmentTestBasic::areSegmentsSharingStable(const std::vector & segments_id) const { RUNTIME_CHECK(segments_id.size() >= 2); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 9a7ab721eec..180bcd57cb2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -101,6 +101,11 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic */ bool replaceSegmentStableData(PageIdU64 segment_id, const DMFilePtr & file); + /** + * Returns whether segment stable index is created. 
+ */ + bool ensureSegmentStableIndex(PageIdU64 segment_id, IndexInfosPtr local_index_infos); + Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange( PageIdU64 segment_id, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_util.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_util.h index ebe2db01a17..e7e31757222 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_util.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_util.h @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#pragma once + #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index c061b7136d8..57543880c15 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -71,6 +71,7 @@ DTWorkload::DTWorkload( table_info->handle, table_info->is_common_handle, table_info->rowkey_column_indexes.size(), + nullptr, DeltaMergeStore::Settings()); stat.init_ms = sw.elapsedMilliseconds(); LOG_INFO(log, "Init store {} ms", stat.init_ms); diff --git a/dbms/src/Storages/FormatVersion.h b/dbms/src/Storages/FormatVersion.h index 0cd4427bb22..c7b39cc31a5 100644 --- a/dbms/src/Storages/FormatVersion.h +++ b/dbms/src/Storages/FormatVersion.h @@ -181,7 +181,7 @@ inline static const StorageFormatVersion STORAGE_FORMAT_V102 = StorageFormatVers .identifier = 102, }; -inline StorageFormatVersion STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V5; +inline StorageFormatVersion STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V7; inline const StorageFormatVersion & toStorageFormat(UInt64 setting) { diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index ad575bebb6f..dd186653ecf 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp 
@@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -1785,6 +1786,40 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const return desc; } +IndexInfosPtr extractLocalIndexInfos(const TiDB::TableInfo & table_info) +{ + IndexInfosPtr index_infos = std::make_shared(); + index_infos->reserve(table_info.columns.size()); + for (const auto & col : table_info.columns) + { + // TODO: support more index type + if (col.vector_index) + { + // Vector Index requires a specific storage format to work. + if ((STORAGE_FORMAT_CURRENT.identifier > 0 && STORAGE_FORMAT_CURRENT.identifier < 6) + || STORAGE_FORMAT_CURRENT.identifier == 100) + { + LOG_ERROR( + Logger::get(), + "The current storage format is {}, which does not support building vector index. TiFlash will " + "write data without vector index.", + STORAGE_FORMAT_CURRENT.identifier); + return {}; + } + + index_infos->emplace_back(IndexInfo{ + .type = IndexType::Vector, + .column_id = col.id, + .column_name = col.name, + .index_definition = col.vector_index, + }); + } + } + + index_infos->shrink_to_fit(); + return index_infos; +} + DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) { if (storeInited()) @@ -1794,6 +1829,7 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread std::lock_guard lock(store_mutex); if (_store == nullptr) { + auto index_infos = extractLocalIndexInfos(tidb_table_info); _store = DeltaMergeStore::create( global_context, data_path_contains_database_name, @@ -1806,6 +1842,7 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread std::move(table_column_info->handle_column_define), is_common_handle, rowkey_column_size, + std::move(index_infos), DeltaMergeStore::Settings(), thread_pool); table_column_info.reset(nullptr); diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index 
1830db53527..bc73dec0f63 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -29,6 +29,7 @@ namespace DB { + StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & name_) : name(name_) { @@ -40,6 +41,7 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam {"tidb_table", std::make_shared()}, {"keyspace_id", std::make_shared(std::make_shared())}, {"table_id", std::make_shared()}, + {"belonging_table_id", std::make_shared()}, {"column_name", std::make_shared()}, {"column_id", std::make_shared()}, @@ -114,6 +116,7 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( else res_columns[j++]->insert(static_cast(keyspace_id)); res_columns[j++]->insert(table_id); + res_columns[j++]->insert(table_info.belonging_table_id); res_columns[j++]->insert(stat.column_name); res_columns[j++]->insert(stat.column_id); diff --git a/dbms/src/Storages/System/StorageSystemDTSegments.cpp b/dbms/src/Storages/System/StorageSystemDTSegments.cpp index 0092d984d49..0502501a891 100644 --- a/dbms/src/Storages/System/StorageSystemDTSegments.cpp +++ b/dbms/src/Storages/System/StorageSystemDTSegments.cpp @@ -40,6 +40,7 @@ StorageSystemDTSegments::StorageSystemDTSegments(const std::string & name_) {"tidb_table", std::make_shared()}, {"keyspace_id", std::make_shared(std::make_shared())}, {"table_id", std::make_shared()}, + {"belonging_table_id", std::make_shared()}, {"is_tombstone", std::make_shared()}, {"segment_id", std::make_shared()}, @@ -131,6 +132,7 @@ BlockInputStreams StorageSystemDTSegments::read( else res_columns[j++]->insert(static_cast(keyspace_id)); res_columns[j++]->insert(table_id); + res_columns[j++]->insert(table_info.belonging_table_id); res_columns[j++]->insert(dm_storage->getTombstone()); res_columns[j++]->insert(stat.segment_id); diff --git a/dbms/src/Storages/System/StorageSystemDTTables.cpp b/dbms/src/Storages/System/StorageSystemDTTables.cpp 
index 2a0d45957a1..33c8813128d 100644 --- a/dbms/src/Storages/System/StorageSystemDTTables.cpp +++ b/dbms/src/Storages/System/StorageSystemDTTables.cpp @@ -41,6 +41,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) {"tidb_table", std::make_shared()}, {"keyspace_id", std::make_shared(std::make_shared())}, {"table_id", std::make_shared()}, + {"belonging_table_id", std::make_shared()}, {"is_tombstone", std::make_shared()}, {"segment_count", std::make_shared()}, @@ -163,6 +164,7 @@ BlockInputStreams StorageSystemDTTables::read( else res_columns[j++]->insert(static_cast(keyspace_id)); res_columns[j++]->insert(table_id); + res_columns[j++]->insert(table_info.belonging_table_id); res_columns[j++]->insert(dm_storage->getTombstone()); res_columns[j++]->insert(stat.segment_count); diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 2e93b3c67d9..b8ba74d477e 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -118,6 +118,8 @@ void TiFlashTestEnv::addGlobalContext( KeyManagerPtr key_manager = std::make_shared(false); global_context->initializeFileProvider(key_manager, false); + global_context->initializeGlobalLocalIndexerScheduler(1, 0); + // initialize background & blockable background thread pool global_context->setSettings(settings_); Settings & settings = global_context->getSettingsRef(); diff --git a/dbms/src/TiDB/Schema/TiDB.cpp b/dbms/src/TiDB/Schema/TiDB.cpp index 0b5af2a955e..d74be73795a 100644 --- a/dbms/src/TiDB/Schema/TiDB.cpp +++ b/dbms/src/TiDB/Schema/TiDB.cpp @@ -226,6 +226,8 @@ Field ColumnInfo::defaultValueToField() const return getYearValue(value.convert()); case TypeSet: TRY_CATCH_DEFAULT_VALUE_TO_FIELD({ return getSetValue(value.convert()); }); + case TypeTiDBVectorFloat32: + return genVectorFloat32Empty(); default: throw Exception("Have not processed type: " + std::to_string(tp)); } @@ -1286,6 +1288,11 @@ String genJsonNull() return null; } 
+String genVectorFloat32Empty() +{ + return String(4, '\0'); // Length=0 vector +} + tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci) { tipb::FieldType ret; diff --git a/dbms/src/TiDB/Schema/TiDB.h b/dbms/src/TiDB/Schema/TiDB.h index 52acadb6936..7ef29e437bc 100644 --- a/dbms/src/TiDB/Schema/TiDB.h +++ b/dbms/src/TiDB/Schema/TiDB.h @@ -337,6 +337,8 @@ struct TableInfo String genJsonNull(); +String genVectorFloat32Empty(); + tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci); ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type); ColumnInfo toTiDBColumnInfo(const tipb::ColumnInfo & tipb_column_info);