Skip to content

Commit

Permalink
enhance: support sparse vector mmap in growing segment type (#36566)
Browse files Browse the repository at this point in the history
issue: #32984
related pr: #36565

Signed-off-by: cqy123456 <[email protected]>
  • Loading branch information
cqy123456 authored Oct 15, 2024
1 parent bb3ef53 commit aa904be
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 56 deletions.
3 changes: 2 additions & 1 deletion internal/core/src/common/VectorTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ constexpr bool IsVariableType =
template <typename T>
constexpr bool IsVariableTypeSupportInChunk =
std::is_same_v<T, std::string> || std::is_same_v<T, Array> ||
std::is_same_v<T, Json>;
std::is_same_v<T, Json> ||
std::is_same_v<T, knowhere::sparse::SparseRow<float>>;

template <typename T>
using ChunkViewType = std::conditional_t<
Expand Down
71 changes: 38 additions & 33 deletions internal/core/src/mmap/ChunkData.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,10 @@ struct FixedLengthChunk {
size() {
return size_;
};
Type
get(const int i) const {
return data_[i];
}
const Type&
view(const int i) const {
return data_[i];
}
};

private:
int64_t size_ = 0;
Expand All @@ -73,19 +69,10 @@ struct VariableLengthChunk {
throw std::runtime_error(
"set should be a template specialization function");
}
inline Type
get(const int i) const {
throw std::runtime_error(
"get should be a template specialization function");
}
const ChunkViewType<Type>&
view(const int i) const {
return data_[i];
}
const ChunkViewType<Type>&
operator[](const int i) const {
return view(i);
}
void*
data() {
return data_.data();
Expand All @@ -100,6 +87,8 @@ struct VariableLengthChunk {
FixedVector<ChunkViewType<Type>> data_;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
};

// Template specialization for string
template <>
inline void
VariableLengthChunk<std::string>::set(const std::string* src,
Expand Down Expand Up @@ -128,12 +117,40 @@ VariableLengthChunk<std::string>::set(const std::string* src,
offset += data_size;
}
}

// Template specialization for sparse vector
template <>
inline std::string
VariableLengthChunk<std::string>::get(const int i) const {
// copy to a string
return std::string(data_[i]);
inline void
VariableLengthChunk<knowhere::sparse::SparseRow<float>>::set(
const knowhere::sparse::SparseRow<float>* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);

size_t total_size = 0;
for (auto i = 0; i < length; i++) {
total_size += src[i].data_byte_size();
}
auto buf = (uint8_t*)mcm->Allocate(mmap_descriptor_, total_size);
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
for (auto i = 0, offset = 0; i < length; i++) {
auto data_size = src[i].data_byte_size();
uint8_t* data_ptr = buf + offset;
std::memcpy(data_ptr, (uint8_t*)src[i].data(), data_size);
data_[i + begin] =
knowhere::sparse::SparseRow<float>(src[i].size(), data_ptr, false);
offset += data_size;
}
}

// Template specialization for json
template <>
inline void
VariableLengthChunk<Json>::set(const Json* src,
Expand Down Expand Up @@ -162,11 +179,8 @@ VariableLengthChunk<Json>::set(const Json* src,
offset += data_size;
}
}
template <>
inline Json
VariableLengthChunk<Json>::get(const int i) const {
return std::move(Json(simdjson::padded_string(data_[i].data())));
}

// Template specialization for array
template <>
inline void
VariableLengthChunk<Array>::set(const Array* src,
Expand Down Expand Up @@ -198,14 +212,5 @@ VariableLengthChunk<Array>::set(const Array* src,
offset += data_size;
}
}
template <>
inline Array
VariableLengthChunk<Array>::get(const int i) const {
auto array_view_i = data_[i];
char* data = static_cast<char*>(const_cast<void*>(array_view_i.data()));
return Array(data,
array_view_i.byte_size(),
array_view_i.get_element_type(),
array_view_i.get_offsets_in_copy());
}

} // namespace milvus
20 changes: 0 additions & 20 deletions internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class ChunkVectorBase {
get_chunk_data(int64_t index) = 0;
virtual int64_t
get_chunk_size(int64_t index) = 0;
virtual Type
get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
virtual int64_t
get_element_size() = 0;
virtual int64_t
Expand Down Expand Up @@ -106,23 +104,6 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
}
}

Type
get_element(int64_t chunk_id, int64_t chunk_offset) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
auto chunk = vec_[chunk_id];
AssertInfo(
chunk_id < this->counter_ && chunk_offset < chunk.size(),
fmt::format("index out of range, index={}, chunk_offset={}, cap={}",
chunk_id,
chunk_offset,
chunk.size()));
if constexpr (IsMmap) {
return chunk.get(chunk_offset);
} else {
return chunk[chunk_offset];
}
}

ChunkViewType<Type>
view_element(int64_t chunk_id, int64_t chunk_offset) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
Expand Down Expand Up @@ -229,7 +210,6 @@ SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) {
return std::make_unique<ThreadSafeChunkVector<Type>>();
}
} else {
// todo: sparse float vector support mmap
return std::make_unique<ThreadSafeChunkVector<Type>>();
}
}
Expand Down
120 changes: 118 additions & 2 deletions internal/core/unittest/test_chunk_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
using namespace milvus::segcore;
using namespace milvus;
namespace pb = milvus::proto;

class ChunkVectorTest : public testing::Test {
class ChunkVectorTest : public ::testing::TestWithParam<bool> {
public:
void
SetUp() override {
Expand Down Expand Up @@ -172,9 +171,126 @@ TEST_F(ChunkVectorTest, FillDataWithMmap) {
num_inserted);
EXPECT_EQ(float_array_result->scalars().array_data().data_size(),
num_inserted);
// checking dense/sparse vector
auto fp32_vec_res =
fp32_vec_result.get()->mutable_vectors()->float_vector().data();
auto fp16_vec_res = (float16*)fp16_vec_result.get()
->mutable_vectors()
->float16_vector()
.data();
auto bf16_vec_res = (bfloat16*)bf16_vec_result.get()
->mutable_vectors()
->bfloat16_vector()
.data();
auto sparse_vec_res = SparseBytesToRows(
sparse_vec_result->vectors().sparse_float_vector().contents());
EXPECT_TRUE(fp32_vec_res.size() == num_inserted * dim);
auto fp32_vec_gt = dataset.get_col<float>(fp32_vec);
auto fp16_vec_gt = dataset.get_col<float16>(fp16_vec);
auto bf16_vec_gt = dataset.get_col<bfloat16>(bf16_vec);
auto sparse_vec_gt =
dataset.get_col<knowhere::sparse::SparseRow<float>>(sparse_vec);

for (size_t i = 0; i < num_inserted; ++i) {
auto id = ids_ds->GetIds()[i];
// check dense vector
for (size_t j = 0; j < 128; ++j) {
EXPECT_TRUE(fp32_vec_res[i * dim + j] ==
fp32_vec_gt[(id % per_batch) * dim + j]);
EXPECT_TRUE(fp16_vec_res[i * dim + j] ==
fp16_vec_gt[(id % per_batch) * dim + j]);
EXPECT_TRUE(bf16_vec_res[i * dim + j] ==
bf16_vec_gt[(id % per_batch) * dim + j]);
}
//check sparse vector
auto actual_row = sparse_vec_res[i];
auto expected_row = sparse_vec_gt[(id % per_batch)];
EXPECT_TRUE(actual_row.size() == expected_row.size());
for (size_t j = 0; j < actual_row.size(); ++j) {
EXPECT_TRUE(actual_row[j].id == expected_row[j].id);
EXPECT_TRUE(actual_row[j].val == expected_row[j].val);
}
}
}
}

INSTANTIATE_TEST_SUITE_P(IsSparse, ChunkVectorTest, ::testing::Bool());
TEST_P(ChunkVectorTest, SearchWithMmap) {
auto is_sparse = GetParam();
auto data_type =
is_sparse ? DataType::VECTOR_SPARSE_FLOAT : DataType::VECTOR_FLOAT;
auto schema = std::make_shared<Schema>();
auto pk = schema->AddDebugField("pk", DataType::INT64);
auto random = schema->AddDebugField("random", DataType::DOUBLE);
auto vec = schema->AddDebugField("embeddings", data_type, 128, metric_type);
schema->set_primary_field_id(pk);

auto segment = CreateGrowingSegment(schema, empty_index_meta, 11, config);
auto segmentImplPtr = dynamic_cast<SegmentGrowingImpl*>(segment.get());

milvus::proto::plan::PlanNode plan_node;
auto vector_anns = plan_node.mutable_vector_anns();
if (is_sparse) {
vector_anns->set_vector_type(
milvus::proto::plan::VectorType::SparseFloatVector);
} else {
vector_anns->set_vector_type(
milvus::proto::plan::VectorType::FloatVector);
}
vector_anns->set_placeholder_tag("$0");
vector_anns->set_field_id(102);
auto query_info = vector_anns->mutable_query_info();
query_info->set_topk(5);
query_info->set_round_decimal(3);
query_info->set_metric_type(metric_type);
query_info->set_search_params(R"({"nprobe": 16})");
auto plan_str = plan_node.SerializeAsString();

int64_t per_batch = 10000;
int64_t n_batch = 3;
int64_t top_k = 5;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
auto offset = segment->PreInsert(per_batch);
auto pks = dataset.get_col<int64_t>(pk);
segment->Insert(offset,
per_batch,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
const VectorBase* field_data = nullptr;
if (is_sparse) {
field_data = segmentImplPtr->get_insert_record()
.get_data<milvus::SparseFloatVector>(vec);
} else {
field_data = segmentImplPtr->get_insert_record()
.get_data<milvus::FloatVector>(vec);
}
auto inserted = (i + 1) * per_batch;

auto num_queries = 5;
auto ph_group_raw =
is_sparse ? CreateSparseFloatPlaceholderGroup(num_queries)
: CreatePlaceholderGroup(num_queries, 128, 1024);

auto plan = milvus::query::CreateSearchPlanByExpr(
*schema, plan_str.data(), plan_str.size());
auto ph_group =
ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
Timestamp timestamp = 1000000;
auto sr = segment->Search(plan.get(), ph_group.get(), timestamp);
EXPECT_EQ(sr->total_nq_, num_queries);
EXPECT_EQ(sr->unity_topK_, top_k);
EXPECT_EQ(sr->distances_.size(), num_queries * top_k);
EXPECT_EQ(sr->seg_offsets_.size(), num_queries * top_k);
for (auto i = 0; i < num_queries; i++) {
for (auto k = 0; k < top_k; k++) {
EXPECT_NE(sr->seg_offsets_.data()[i * top_k + k], -1);
EXPECT_FALSE(std::isnan(sr->distances_.data()[i * top_k + k]));
}
}
}
}
TEST_F(ChunkVectorTest, QueryWithMmap) {
auto schema = std::make_shared<Schema>();
schema->AddDebugField(
Expand Down

0 comments on commit aa904be

Please sign in to comment.