Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add related bin log size into search stream reduce #33010

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/design_docs/segcore/segment_interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

1. `get_row_count`: Get the number of entities in the segment
2. `get_schema`: Get the corresponding collection schema in the segment
3. `GetMemoryUsageInBytes`: Get memory usage of a segment
3. `GetResourceUsage`: Get memory and disk usage of a segment.
4. `Search(plan, placeholderGroup, timestamp) -> QueryResult`: Perform search operations according to the plan containing search parameters and predicate conditions, and return search results. Ensure that the time of all search results is before the specified timestamp(MVCC)
5. `FillTargetEntry(plan, &queryResult)`: Fill the missing column data for search results based on target columns in the plan

Expand Down
47 changes: 41 additions & 6 deletions internal/core/src/common/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,61 @@
static File
Open(const std::string_view filepath, int flags) {
int fd = open(filepath.data(), flags, S_IRUSR | S_IWUSR);
AssertInfo(fd != -1,
"failed to create mmap file {}: {}",
filepath,
strerror(errno));
if (fd < 0) {
throw SegcoreError(ErrorCode::UnistdError,
fmt::format("failed to open file at {}: {}",

Check warning on line 40 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L40

Added line #L40 was not covered by tests
filepath,
strerror(errno)));

Check warning on line 42 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L42

Added line #L42 was not covered by tests
}
return File(fd);
}

int
Descriptor() const {
if (fd_ < 0) {
throw SegcoreError(ErrorCode::UnistdError,
"file descriptor is invalid");

Check warning on line 51 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L51

Added line #L51 was not covered by tests
}
return fd_;
}

ssize_t
Write(const void* buf, size_t size) {
return write(fd_, buf, size);
if (fd_ < 0) {
throw SegcoreError(ErrorCode::UnistdError,
"file descriptor is invalid when writing file");

Check warning on line 60 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L60

Added line #L60 was not covered by tests
}

ssize_t cnt = write(fd_, buf, size);
if (cnt < 0) {
throw SegcoreError(ErrorCode::UnistdError,
fmt::format("failed to write file: stderror: {}",
strerror(errno)));

Check warning on line 67 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L66-L67

Added lines #L66 - L67 were not covered by tests
}
if (cnt != size) {
throw SegcoreError(
ErrorCode::UnistdError,
fmt::format("short write to file: written: {}, expected: {}",

Check warning on line 72 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L72

Added line #L72 was not covered by tests
cnt,
size));

Check warning on line 74 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L74

Added line #L74 was not covered by tests
}
return cnt;
}

offset_t
Seek(offset_t offset, int whence) {
return lseek(fd_, offset, whence);
if (fd_ < 0) {
throw SegcoreError(ErrorCode::UnistdError,
"file descriptor is invalid when seek file");

Check warning on line 83 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L83

Added line #L83 was not covered by tests
}

offset_t position = lseek(fd_, offset, whence);
if (position < 0) {
throw SegcoreError(ErrorCode::UnistdError,
fmt::format("failed to seek file: stderror: {}",
strerror(errno)));

Check warning on line 90 in internal/core/src/common/File.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/common/File.h#L89-L90

Added lines #L89 - L90 were not covered by tests
}
return position;
}

void
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,11 @@ struct TypeTraits<DataType::VECTOR_FLOAT> {
static constexpr const char* Name = "VECTOR_FLOAT";
};

/// Resource footprint reported by an index or a segment.
///
/// Both fields default to zero, so a default-initialized value
/// (`ResourceUsage usage;`) means "nothing accounted yet" instead of holding
/// indeterminate values. The type stays an aggregate, so
/// `ResourceUsage{mem, disk}` and `ResourceUsage{}` keep working.
struct ResourceUsage {
    // Memory usage; presumably in bytes — confirm against callers.
    size_t mem_size{0};
    // Disk usage; presumably in bytes — confirm against callers.
    size_t disk_size{0};
};

} // namespace milvus
template <>
struct fmt::formatter<milvus::DataType> : formatter<string_view> {
Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#ifdef __cplusplus
Expand Down Expand Up @@ -63,6 +64,12 @@ typedef struct CStatus {
const char* error_msg;
} CStatus;

// CSegmentResourceUsage represents the resource usage of a segment and is
// the plain-C form consumed by CGO. It carries the same two fields as the
// C++ milvus::ResourceUsage struct.
typedef struct CSegmentResourceUsage {
// Memory usage of the segment; presumably in bytes — TODO confirm.
size_t mem_size;
// Disk usage of the segment; presumably in bytes — TODO confirm.
size_t disk_size;
} CSegmentResourceUsage;

typedef struct CProto {
const void* proto_blob;
int64_t proto_size;
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class IndexBase {
return index_type_;
}

virtual ResourceUsage
GetResourceUsage() const = 0;

protected:
explicit IndexBase(IndexType index_type)
: index_type_(std::move(index_type)) {
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/index/InvertedIndexTantivy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,14 @@ InvertedIndexTantivy<T>::BuildWithRawData(size_t n,
}
}

template <typename T>
ResourceUsage
InvertedIndexTantivy<T>::GetResourceUsage() const {
    // Only the on-disk footprint is reported: the local file size known to
    // the disk file manager.
    // TODO: we can't get the memory usage of inverted index now, so the
    // memory part is reported as 0.
    ResourceUsage usage{0, disk_file_manager_->GetLocalFileSize()};
    return usage;
}

template class InvertedIndexTantivy<bool>;
template class InvertedIndexTantivy<int8_t>;
template class InvertedIndexTantivy<int16_t>;
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/InvertedIndexTantivy.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
const TargetBitmap
RegexQuery(const std::string& pattern) override;

ResourceUsage
GetResourceUsage() const override;

private:
void
finish();
Expand Down
9 changes: 9 additions & 0 deletions internal/core/src/index/ScalarIndexSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,15 @@ ScalarIndexSort<T>::ShouldSkip(const T lower_value,
return true;
}

template <typename T>
ResourceUsage
ScalarIndexSort<T>::GetResourceUsage() const {
    // Memory is estimated from the reserved capacity (not the size) of the
    // two backing containers.
    const auto data_bytes = data_.capacity() * sizeof(IndexStructure<T>);
    const auto offsets_bytes = idx_to_offsets_.capacity() * sizeof(int32_t);
    // TODO: after enabling mmap, move the mem_size as disk_size.
    return ResourceUsage{data_bytes + offsets_bytes, 0};
}

template class ScalarIndexSort<bool>;
template class ScalarIndexSort<int8_t>;
template class ScalarIndexSort<int16_t>;
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/ScalarIndexSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class ScalarIndexSort : public ScalarIndex<T> {
return true;
}

ResourceUsage
GetResourceUsage() const override;

private:
bool
ShouldSkip(const T lower_value, const T upper_value, const OpType op);
Expand Down
20 changes: 15 additions & 5 deletions internal/core/src/index/StringIndexMarisa.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "index/Index.h"
#include "storage/Util.h"
#include "storage/space.h"
#include "storage/LocalChunkManagerSingleton.h"

namespace milvus::index {

Expand Down Expand Up @@ -256,18 +257,21 @@
auto len = index->size;

auto file = File::Open(file_name, O_RDWR | O_CREAT | O_EXCL);
auto written = file.Write(index->data.get(), len);
if (written != len) {
try {
auto written = file.Write(index->data.get(), len);
} catch (const SegcoreError& e) {

Check warning on line 262 in internal/core/src/index/StringIndexMarisa.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/StringIndexMarisa.cpp#L262

Added line #L262 was not covered by tests
file.Close();
remove(file_name.c_str());
throw SegcoreError(
ErrorCode::UnistdError,
fmt::format("write index to fd error: {}", strerror(errno)));
throw;

Check warning on line 265 in internal/core/src/index/StringIndexMarisa.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/StringIndexMarisa.cpp#L265

Added line #L265 was not covered by tests
}

file.Seek(0, SEEK_SET);
if (config.contains(kEnableMmap)) {
trie_.mmap(file_name.c_str());
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
resource_usage_.disk_size += local_chunk_manager->Size(file_name);
} else {
trie_.read(file.Descriptor());
}
Expand Down Expand Up @@ -563,4 +567,10 @@
return std::string(agent.key().ptr(), agent.key().length());
}

// Returns the resource usage accumulated in resource_usage_.
// In the visible load path, disk_size is increased by the size of the local
// index file when the trie is mmap-ed; nothing visible here updates mem_size.
ResourceUsage
StringIndexMarisa::GetResourceUsage() const {
// TODO: we cannot estimate the memory usage of marisa trie.
return resource_usage_;
}

} // namespace milvus::index
4 changes: 4 additions & 0 deletions internal/core/src/index/StringIndexMarisa.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class StringIndexMarisa : public StringIndex {
return true;
}

ResourceUsage
GetResourceUsage() const override;

private:
void
fill_str_ids(size_t n, const std::string* values);
Expand All @@ -123,6 +126,7 @@ class StringIndexMarisa : public StringIndex {
bool built_ = false;
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
std::shared_ptr<milvus_storage::Space> space_;
ResourceUsage resource_usage_{};
};

using StringIndexMarisaPtr = std::unique_ptr<StringIndexMarisa>;
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,14 @@
return load_config;
}

template <typename T>
ResourceUsage
VectorDiskAnnIndex<T>::GetResourceUsage() const {

Check warning on line 515 in internal/core/src/index/VectorDiskIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorDiskIndex.cpp#L515

Added line #L515 was not covered by tests
// TODO: we can't get the memory usage of vector index now.
auto disk_size = file_manager_->GetLocalFileSize();
return ResourceUsage{0, disk_size};

Check warning on line 518 in internal/core/src/index/VectorDiskIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorDiskIndex.cpp#L517-L518

Added lines #L517 - L518 were not covered by tests
}

template class VectorDiskAnnIndex<float>;
template class VectorDiskAnnIndex<float16>;
template class VectorDiskAnnIndex<bfloat16>;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/index/VectorDiskIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <vector>

#include "common/Types.h"
#include "index/VectorIndex.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/space.h"
Expand Down Expand Up @@ -112,6 +113,9 @@ class VectorDiskAnnIndex : public VectorIndex {
const knowhere::Json& json,
const BitsetView& bitset) const override;

ResourceUsage
GetResourceUsage() const override;

private:
knowhere::Json
update_load_json(const Config& config);
Expand Down
21 changes: 15 additions & 6 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "log/Log.h"
#include "storage/DataCodec.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/ThreadPools.h"
#include "storage/space.h"
#include "storage/Util.h"
Expand Down Expand Up @@ -760,14 +761,9 @@
"lost index slice data");
auto data = batch_data[file_name];
auto start_write_file = std::chrono::system_clock::now();
auto written = file.Write(data->Data(), data->Size());
file.Write(data->Data(), data->Size());

Check warning on line 764 in internal/core/src/index/VectorMemIndex.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/index/VectorMemIndex.cpp#L764

Added line #L764 was not covered by tests
write_disk_duration_sum +=
(std::chrono::system_clock::now() - start_write_file);
AssertInfo(
written == data->Size(),
fmt::format("failed to write index data to disk {}: {}",
filepath->data(),
strerror(errno)));
}
for (auto& file : batch) {
pending_index_files.erase(file);
Expand Down Expand Up @@ -831,6 +827,11 @@
auto dim = index_.Dim();
this->SetDim(index_.Dim());

auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager();
resource_usage_.disk_size += local_chunk_manager->Size(filepath.value());

auto ok = unlink(filepath->data());
AssertInfo(ok == 0,
"failed to unlink mmap index file {}: {}",
Expand Down Expand Up @@ -953,6 +954,14 @@
strerror(errno));
LOG_INFO("load vector index done");
}

// Returns the resource usage accumulated in resource_usage_.
// In the visible load path, disk_size is increased by the size of the local
// index file just before that file is unlinked; nothing visible here updates
// mem_size.
template <typename T>
ResourceUsage
VectorMemIndex<T>::GetResourceUsage() const {
// TODO: we can't get the memory usage of vector index now.
return resource_usage_;
}

template class VectorMemIndex<float>;
template class VectorMemIndex<uint8_t>;
template class VectorMemIndex<float16>;
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,13 @@ class VectorMemIndex : public VectorIndex {
void
LoadFromFileV2(const Config& config);

ResourceUsage
GetResourceUsage() const override;

protected:
Config config_;

ResourceUsage resource_usage_{};
knowhere::Index<knowhere::IndexNode> index_;
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
std::shared_ptr<milvus_storage::Space> space_;
Expand Down
Loading
Loading