From 8923936c9a17bd5a5a15f195b93e62eff6d739a4 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 18 Oct 2024 17:03:25 +0800 Subject: [PATCH] enhance: Support memory mode chunk cache (#35347) (#35836) Chunk cache supports loading raw vectors into memory. issue: https://github.com/milvus-io/milvus/issues/35273 pr: https://github.com/milvus-io/milvus/pull/35347 --------- Signed-off-by: bigsheeper --- configs/milvus.yaml | 1 + internal/core/src/segcore/SegmentSealed.h | 2 +- .../core/src/segcore/SegmentSealedImpl.cpp | 13 +- internal/core/src/segcore/SegmentSealedImpl.h | 2 +- internal/core/src/segcore/segment_c.cpp | 6 +- internal/core/src/segcore/segment_c.h | 4 +- internal/core/src/storage/ChunkCache.cpp | 26 +++- internal/core/src/storage/ChunkCache.h | 9 +- internal/core/unittest/test_chunk_cache.cpp | 118 +++++++++++++++++- internal/core/unittest/test_sealed.cpp | 2 +- internal/querynodev2/segments/segment.go | 12 +- internal/querynodev2/server.go | 10 +- pkg/util/paramtable/component_param.go | 10 ++ pkg/util/paramtable/component_param_test.go | 2 + 14 files changed, 186 insertions(+), 31 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8e6f1c6e2aa63..3a589e06a8018 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -416,6 +416,7 @@ queryNode: vectorIndex: false # Enable mmap for loading vector index scalarField: false # Enable mmap for loading scalar data scalarIndex: false # Enable mmap for loading scalar index + chunkCache: true # Enable mmap for chunk cache (raw vector retrieving). # Enable memory mapping (mmap) to optimize the handling of growing raw data. # By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized. # However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments. diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index ad73665711c53..2df9833680364 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -41,7 +41,7 @@ class SegmentSealed : public SegmentInternalInterface { virtual void AddFieldDataInfoForSealed(const LoadFieldDataInfo& field_data_info) = 0; virtual void - WarmupChunkCache(const FieldId field_id) = 0; + WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0; SegmentType type() const override { diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 2a278cdc2aa5f..04e0d448a8330 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -129,7 +129,7 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) { } void -SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) { +SegmentSealedImpl::WarmupChunkCache(const FieldId field_id, bool mmap_enabled) { auto& field_meta = schema_->operator[](field_id); AssertInfo(field_meta.is_vector(), "vector field is not vector type"); @@ -155,7 +155,11 @@ SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) { auto cc = storage::MmapManager::GetInstance().GetChunkCache(); const bool mmap_rss_not_need = true; for (const auto& data_path : field_info.insert_files) { - auto column = cc->Read(data_path, mmap_descriptor_, mmap_rss_not_need); + auto column = cc->Read(data_path, + mmap_descriptor_, + field_meta, + mmap_enabled, + mmap_rss_not_need); } } @@ -909,7 +913,10 @@ std::tuple> static ReadFromChunkCache( const storage::ChunkCachePtr& cc, const std::string& data_path, const storage::MmapChunkDescriptorPtr& descriptor) { - auto column = cc->Read(data_path, descriptor); + // For mmap mode, field_meta is unused, so just construct a fake field meta. + auto fm = FieldMeta(FieldName(""), FieldId(0), milvus::DataType::NONE); + // TODO: add Load() interface for chunk cache when support retrieve_enable, make Read() raise error if cache miss + auto column = cc->Read(data_path, descriptor, fm, true); cc->Prefetch(data_path); return {data_path, column}; } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index acb5bc364676f..49ef7c15ffe41 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -292,7 +292,7 @@ class SegmentSealedImpl : public SegmentSealed { LoadScalarIndex(const LoadIndexInfo& info); void - WarmupChunkCache(const FieldId field_id) override; + WarmupChunkCache(const FieldId field_id, bool mmap_enabled) override; bool generate_interim_index(const FieldId field_id); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index e662c22181a9c..62297ec886ddf 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -486,14 +486,16 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment, } CStatus -WarmupChunkCache(CSegmentInterface c_segment, int64_t field_id) { +WarmupChunkCache(CSegmentInterface c_segment, + int64_t field_id, + bool mmap_enabled) { try { auto segment_interface = reinterpret_cast(c_segment); auto segment = dynamic_cast(segment_interface); AssertInfo(segment != nullptr, "segment conversion failed"); - segment->WarmupChunkCache(milvus::FieldId(field_id)); + segment->WarmupChunkCache(milvus::FieldId(field_id), mmap_enabled); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(milvus::UnexpectedError, e.what()); diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index ec25518348234..ef7bb1a91306a 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -137,7 +137,9 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment, CLoadFieldDataInfo c_load_field_data_info); CStatus -WarmupChunkCache(CSegmentInterface c_segment, int64_t field_id); +WarmupChunkCache(CSegmentInterface c_segment, + int64_t field_id, + bool mmap_enabled); ////////////////////////////// interfaces for SegmentInterface ////////////////////////////// CStatus diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 9167982d8b51f..f6586c52681f5 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -24,6 +24,8 @@ namespace milvus::storage { std::shared_ptr ChunkCache::Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled, const bool mmap_rss_not_need) { // use rlock to get future { @@ -62,8 +64,9 @@ ChunkCache::Read(const std::string& filepath, std::string err_msg = ""; try { field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath); - column = Mmap(field_data->GetFieldData(), descriptor); - if (mmap_rss_not_need) { + column = Mmap( + field_data->GetFieldData(), descriptor, field_meta, mmap_enabled); + if (mmap_enabled && mmap_rss_not_need) { auto ok = madvise(reinterpret_cast( const_cast(column->MmappedData())), column->DataByteSize(), @@ -136,7 +139,9 @@ ChunkCache::Prefetch(const std::string& filepath) { std::shared_ptr ChunkCache::Mmap(const FieldDataPtr& field_data, - const MmapChunkDescriptorPtr& descriptor) { + const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled) { auto data_type = field_data->get_data_type(); auto data_size = field_data->Size(); @@ -144,13 +149,22 @@ ChunkCache::Mmap(const FieldDataPtr& field_data, std::shared_ptr column{}; if (IsSparseFloatVectorDataType(data_type)) { - column = std::make_shared(mcm_, descriptor); + if (mmap_enabled) { + column = std::make_shared(mcm_, descriptor); + } else { + column = std::make_shared(field_meta); + } } else if (IsVariableDataType(data_type)) { AssertInfo( false, "TODO: unimplemented for variable data type: {}", data_type); } else { - column = - std::make_shared(data_size, data_type, mcm_, descriptor); + if (mmap_enabled) { + column = std::make_shared( + data_size, data_type, mcm_, descriptor); + } else { + column = std::make_shared(field_data->get_num_rows(), + field_meta); + } } column->AppendBatch(field_data); return column; diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index 91c37b13a1fa3..528c05f655147 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -47,6 +47,8 @@ class ChunkCache { std::shared_ptr Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled, const bool mmap_rss_not_need = false); void @@ -58,10 +60,9 @@ class ChunkCache { private: std::shared_ptr Mmap(const FieldDataPtr& field_data, - const MmapChunkDescriptorPtr& descriptor); - - std::string - CachePath(const std::string& filepath); + const MmapChunkDescriptorPtr& descriptor, + const FieldMeta& field_meta, + bool mmap_enabled); private: using ColumnTable = std::unordered_map< diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index ce0a74b595028..75b15554a4874 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -116,7 +116,8 @@ TEST_P(ChunkCacheTest, Read) { auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); // validate dense data - const auto& dense_column = cc->Read(dense_file_name, descriptor); + const auto& dense_column = + cc->Read(dense_file_name, descriptor, dense_field_meta, true); Assert(dense_column->DataByteSize() == dim * N * 4); auto actual_dense = (const float*)(dense_column->Data()); for (auto i = 0; i < N * dim; i++) { @@ -126,7 +127,114 @@ TEST_P(ChunkCacheTest, Read) { } // validate sparse data - const auto& sparse_column = cc->Read(sparse_file_name, descriptor); + const auto& sparse_column = + cc->Read(sparse_file_name, descriptor, sparse_field_meta, true); + auto expected_sparse_size = 0; + auto actual_sparse = + (const knowhere::sparse::SparseRow*)(sparse_column->Data()); + for (auto i = 0; i < N; i++) { + const auto& actual_sparse_row = actual_sparse[i]; + const auto& expect_sparse_row = sparse_data[i]; + AssertInfo( + actual_sparse_row.size() == expect_sparse_row.size(), + fmt::format("Incorrect size of sparse row: expect {}, actual {}", + expect_sparse_row.size(), + actual_sparse_row.size())); + auto bytes = actual_sparse_row.data_byte_size(); + AssertInfo( + memcmp(actual_sparse_row.data(), expect_sparse_row.data(), bytes) == + 0, + fmt::format("Incorrect data of sparse row: expect {}, actual {}", + expect_sparse_row.data(), + actual_sparse_row.data())); + expected_sparse_size += bytes; + } + + ASSERT_EQ(sparse_column->DataByteSize(), expected_sparse_size); + + cc->Remove(dense_file_name); + cc->Remove(sparse_file_name); + lcm->Remove(dense_file_name); + lcm->Remove(sparse_file_name); +} + +TEST_F(ChunkCacheTest, ReadByMemoryMode) { + auto N = 10000; + auto dim = 128; + auto dense_metric_type = knowhere::metric::L2; + auto sparse_metric_type = knowhere::metric::IP; + + auto schema = std::make_shared(); + auto fake_dense_vec_id = schema->AddDebugField( + "fakevec", milvus::DataType::VECTOR_FLOAT, dim, dense_metric_type); + auto i64_fid = schema->AddDebugField("counter", milvus::DataType::INT64); + auto fake_sparse_vec_id = + schema->AddDebugField("fakevec_sparse", + milvus::DataType::VECTOR_SPARSE_FLOAT, + dim, + sparse_metric_type); + schema->set_primary_field_id(i64_fid); + + auto dataset = milvus::segcore::DataGen(schema, N); + + auto dense_field_data_meta = + milvus::storage::FieldDataMeta{1, 2, 3, fake_dense_vec_id.get()}; + auto sparse_field_data_meta = + milvus::storage::FieldDataMeta{1, 2, 3, fake_sparse_vec_id.get()}; + auto dense_field_meta = milvus::FieldMeta(milvus::FieldName("fakevec"), + fake_dense_vec_id, + milvus::DataType::VECTOR_FLOAT, + dim, + dense_metric_type); + auto sparse_field_meta = + milvus::FieldMeta(milvus::FieldName("fakevec_sparse"), + fake_sparse_vec_id, + milvus::DataType::VECTOR_SPARSE_FLOAT, + dim, + sparse_metric_type); + + auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager(); + auto dense_data = dataset.get_col(fake_dense_vec_id); + auto sparse_data = + dataset.get_col>(fake_sparse_vec_id); + + auto data_slices = std::vector{dense_data.data()}; + auto slice_sizes = std::vector{static_cast(N)}; + auto slice_names = std::vector{dense_file_name}; + PutFieldData(lcm.get(), + data_slices, + slice_sizes, + slice_names, + dense_field_data_meta, + dense_field_meta); + + data_slices = std::vector{sparse_data.data()}; + slice_sizes = std::vector{static_cast(N)}; + slice_names = std::vector{sparse_file_name}; + PutFieldData(lcm.get(), + data_slices, + slice_sizes, + slice_names, + sparse_field_data_meta, + sparse_field_meta); + + auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); + + // validate dense data + const auto& dense_column = + cc->Read(dense_file_name, descriptor, dense_field_meta, false); + Assert(dense_column->DataByteSize() == dim * N * 4); + auto actual_dense = (const float*)(dense_column->Data()); + for (auto i = 0; i < N * dim; i++) { + AssertInfo(dense_data[i] == actual_dense[i], + fmt::format( + "expect {}, actual {}", dense_data[i], actual_dense[i])); + } + + // validate sparse data + const auto& sparse_column = + cc->Read(sparse_file_name, descriptor, sparse_field_meta, false); auto expected_sparse_size = 0; auto actual_sparse = (const knowhere::sparse::SparseRow*)(sparse_column->Data()); @@ -222,7 +330,8 @@ TEST_P(ChunkCacheTest, TestMultithreads) { constexpr int threads = 16; std::vector total_counts(threads); auto executor = [&](int thread_id) { - const auto& dense_column = cc->Read(dense_file_name, descriptor); + const auto& dense_column = + cc->Read(dense_file_name, descriptor, dense_field_meta, true); Assert(dense_column->DataByteSize() == dim * N * 4); auto actual_dense = (const float*)dense_column->Data(); @@ -233,7 +342,8 @@ TEST_P(ChunkCacheTest, TestMultithreads) { "expect {}, actual {}", dense_data[i], actual_dense[i])); } - const auto& sparse_column = cc->Read(sparse_file_name, descriptor); + const auto& sparse_column = + cc->Read(sparse_file_name, descriptor, sparse_field_meta, true); auto actual_sparse = (const knowhere::sparse::SparseRow*)sparse_column->Data(); for (auto i = 0; i < N; i++) { diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 4efbe7387d7ea..dbfc0bceb280f 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1654,7 +1654,7 @@ TEST(Sealed, WarmupChunkCache) { auto has = segment->HasRawData(vec_info.field_id); EXPECT_FALSE(has); - segment_sealed->WarmupChunkCache(FieldId(vec_info.field_id)); + segment_sealed->WarmupChunkCache(FieldId(vec_info.field_id), true); auto ids_ds = GenRandomIds(N); auto result = diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 85325114e26a9..f3da0169d8366 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1384,7 +1384,8 @@ func (s *LocalSegment) innerLoadIndex(ctx context.Context, } // 4. - s.WarmupChunkCache(ctx, indexInfo.GetFieldID()) + mmapChunkCache := paramtable.Get().QueryNodeCfg.MmapChunkCache.GetAsBool() + s.WarmupChunkCache(ctx, indexInfo.GetFieldID(), mmapChunkCache) warmupChunkCacheSpan := tr.RecordSpan() log.Info("Finish loading index", zap.Duration("newLoadIndexInfoSpan", newLoadIndexInfoSpan), @@ -1437,12 +1438,13 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F return nil } -func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { +func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmapEnabled bool) { log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), zap.Int64("fieldID", fieldID), + zap.Bool("mmapEnabled", mmapEnabled), ) if !s.ptrLock.RLockIf(state.IsNotReleased) { return @@ -1456,7 +1458,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { case "sync": GetWarmupPool().Submit(func() (any, error) { cFieldID := C.int64_t(fieldID) - status = C.WarmupChunkCache(s.ptr, cFieldID) + cMmapEnabled := C.bool(mmapEnabled) + status = C.WarmupChunkCache(s.ptr, cFieldID, cMmapEnabled) if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil { log.Warn("warming up chunk cache synchronously failed", zap.Error(err)) return nil, err @@ -1476,7 +1479,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) { defer s.ptrLock.RUnlock() cFieldID := C.int64_t(fieldID) - status = C.WarmupChunkCache(s.ptr, cFieldID) + cMmapEnabled := C.bool(mmapEnabled) + status = C.WarmupChunkCache(s.ptr, cFieldID, cMmapEnabled) if err := HandleCStatus(ctx, &status, ""); err != nil { log.Warn("warming up chunk cache asynchronously failed", zap.Error(err)) return nil, err diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 62c2c5389ea13..8a1f1eedcb9ad 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -381,8 +381,9 @@ func (node *QueryNode) Start() error { growingmmapEnable := paramtable.Get().QueryNodeCfg.GrowingMmapEnabled.GetAsBool() mmapVectorIndex := paramtable.Get().QueryNodeCfg.MmapVectorIndex.GetAsBool() mmapVectorField := paramtable.Get().QueryNodeCfg.MmapVectorField.GetAsBool() - mmapScarlarIndex := paramtable.Get().QueryNodeCfg.MmapScalarIndex.GetAsBool() - mmapScarlarField := paramtable.Get().QueryNodeCfg.MmapScalarField.GetAsBool() + mmapScalarIndex := paramtable.Get().QueryNodeCfg.MmapScalarIndex.GetAsBool() + mmapScalarField := paramtable.Get().QueryNodeCfg.MmapScalarField.GetAsBool() + mmapChunkCache := paramtable.Get().QueryNodeCfg.MmapChunkCache.GetAsBool() node.UpdateStateCode(commonpb.StateCode_Healthy) @@ -394,8 +395,9 @@ func (node *QueryNode) Start() error { zap.Bool("growingmmapEnable", growingmmapEnable), zap.Bool("mmapVectorIndex", mmapVectorIndex), zap.Bool("mmapVectorField", mmapVectorField), - zap.Bool("mmapScarlarIndex", mmapScarlarIndex), - zap.Bool("mmapScarlarField", mmapScarlarField), + zap.Bool("mmapScalarIndex", mmapScalarIndex), + zap.Bool("mmapScalarField", mmapScalarField), + zap.Bool("mmapChunkCache", mmapChunkCache), ) }) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 45651322eb792..5c3ac80a21da2 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2353,6 +2353,7 @@ type queryNodeConfig struct { MmapVectorIndex ParamItem `refreshable:"false"` MmapScalarField ParamItem `refreshable:"false"` MmapScalarIndex ParamItem `refreshable:"false"` + MmapChunkCache ParamItem `refreshable:"false"` GrowingMmapEnabled ParamItem `refreshable:"false"` FixedFileSizeForMmapManager ParamItem `refreshable:"false"` MaxMmapDiskPercentageForMmapManager ParamItem `refreshable:"false"` @@ -2673,6 +2674,15 @@ This defaults to true, indicating that Milvus creates temporary index for growin } p.MmapScalarIndex.Init(base.mgr) + p.MmapChunkCache = ParamItem{ + Key: "queryNode.mmap.chunkCache", + Version: "2.4.12", + DefaultValue: "true", + Doc: "Enable mmap for chunk cache (raw vector retrieving).", + Export: true, + } + p.MmapChunkCache.Init(base.mgr) + p.GrowingMmapEnabled = ParamItem{ Key: "queryNode.mmap.growingMmapEnabled", Version: "2.4.6", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 41604b33586ae..4657dd7d74de8 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -452,6 +452,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue()) + + assert.Equal(t, true, Params.MmapChunkCache.GetAsBool()) }) t.Run("test dataCoordConfig", func(t *testing.T) {