From c6f8a73bb2328be5db5be95d0ac33330ed29bcdf Mon Sep 17 00:00:00 2001 From: zhagnlu <1542303831@qq.com> Date: Tue, 4 Jun 2024 14:09:47 +0800 Subject: [PATCH] enhance: optimize some cache to reduce memory usage (#33534) #33533 Signed-off-by: luzhang Co-authored-by: luzhang --- internal/core/src/mmap/Column.h | 7 +- internal/core/src/segcore/InsertRecord.h | 13 +--- .../core/src/segcore/SegmentGrowingImpl.cpp | 9 +-- internal/core/src/segcore/SegmentInterface.h | 1 + .../core/src/segcore/SegmentSealedImpl.cpp | 75 ++++--------------- 5 files changed, 24 insertions(+), 81 deletions(-) diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index bda4ca16a9edd..916bb07b0e1c8 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -459,9 +459,7 @@ class VariableColumn : public ColumnBase { std::string_view RawAt(const int i) const { - size_t len = (i == indices_.size() - 1) ? size_ - indices_.back() - : indices_[i + 1] - indices_[i]; - return std::string_view(data_ + indices_[i], len); + return std::string_view(views_[i]); } void @@ -502,6 +500,9 @@ class VariableColumn : public ColumnBase { } ConstructViews(); + + // Not need indices_ after + indices_.clear(); } protected: diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 7da03c1828b36..13a92d22e760a 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -212,7 +212,8 @@ class OffsetOrderedArray : public OffsetMap { PanicInfo(Unsupported, "OffsetOrderedArray could not insert after seal"); } - array_.push_back(std::make_pair(std::get(pk), offset)); + array_.push_back( + std::make_pair(std::get(pk), static_cast(offset))); } void @@ -285,13 +286,13 @@ class OffsetOrderedArray : public OffsetMap { private: bool is_sealed = false; - std::vector> array_; + std::vector> array_; }; template struct InsertRecord { InsertRecord(const Schema& schema, int64_t size_per_chunk) - : row_ids_(size_per_chunk), timestamps_(size_per_chunk) { + : timestamps_(size_per_chunk) { std::optional pk_field_id = schema.get_primary_field_id(); for (auto& field : schema) { @@ -590,10 +591,8 @@ struct InsertRecord { void clear() { timestamps_.clear(); - row_ids_.clear(); reserved = 0; ack_responder_.clear(); - timestamp_index_ = TimestampIndex(); pk2offset_->clear(); fields_data_.clear(); } @@ -605,15 +604,11 @@ struct InsertRecord { public: ConcurrentVector timestamps_; - ConcurrentVector row_ids_; // used for preInsert of growing segment std::atomic reserved = 0; AckResponder ack_responder_; - // used for timestamps index of sealed segment - TimestampIndex timestamp_index_; - // pks to row offset std::unique_ptr pk2offset_; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 3d1f277c43d89..d8cd057f28be7 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -110,7 +110,6 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, // step 3: fill into Segment.ConcurrentVector insert_record_.timestamps_.set_data_raw( reserved_offset, timestamps_raw, num_rows); - insert_record_.row_ids_.set_data_raw(reserved_offset, row_ids, num_rows); // update the mem size of timestamps and row IDs stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t)); @@ -224,7 +223,6 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) { } if (field_id == RowFieldID) { - insert_record_.row_ids_.set_data_raw(reserved_offset, field_data); continue; } @@ -313,7 +311,6 @@ SegmentGrowingImpl::LoadFieldDataV2(const LoadFieldDataInfo& infos) { } if (field_id == RowFieldID) { - insert_record_.row_ids_.set_data_raw(reserved_offset, field_data); continue; } @@ -766,10 +763,8 @@ SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type, static_cast(output)); break; case SystemFieldType::RowId: - bulk_subscript_impl(&this->insert_record_.row_ids_, - seg_offsets, - count, - static_cast(output)); + PanicInfo(ErrorCode::Unsupported, + "RowId retrieve is not supported"); break; default: PanicInfo(DataTypeInvalid, "unknown subscript fields"); diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 6a2dbf1485bfd..663cfa20819be 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -235,6 +235,7 @@ class SegmentInternalInterface : public SegmentInterface { virtual int64_t num_chunk_data(FieldId field_id) const = 0; + // bitset 1 means not hit. 0 means hit. virtual void mask_with_timestamps(BitsetType& bitset_chunk, Timestamp timestamp) const = 0; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 36e7a6aebb184..4c06d6a3ffb5b 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "Utils.h" #include "Types.h" @@ -348,35 +349,15 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { offset += row_count; } - TimestampIndex index; - auto min_slice_length = num_rows < 4096 ? 1 : 4096; - auto meta = GenerateFakeSlices( - timestamps.data(), num_rows, min_slice_length); - index.set_length_meta(std::move(meta)); - // todo ::opt to avoid copy timestamps from field data - index.build_with(timestamps.data(), num_rows); - - // use special index std::unique_lock lck(mutex_); AssertInfo(insert_record_.timestamps_.empty(), "already exists"); insert_record_.timestamps_.fill_chunk_data(field_data); - insert_record_.timestamp_index_ = std::move(index); AssertInfo(insert_record_.timestamps_.num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); stats_.mem_size += sizeof(Timestamp) * data.row_count; } else { AssertInfo(system_field_type == SystemFieldType::RowId, "System field type of id column is not RowId"); - - auto field_data = storage::CollectFieldDataChannel(data.channel); - - // write data under lock - std::unique_lock lck(mutex_); - AssertInfo(insert_record_.row_ids_.empty(), "already exists"); - insert_record_.row_ids_.fill_chunk_data(field_data); - AssertInfo(insert_record_.row_ids_.num_chunk() == 1, - "num chunk not equal to 1 for sealed segment"); - stats_.mem_size += sizeof(idx_t) * data.row_count; } ++system_ready_count_; } else { @@ -925,9 +906,7 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) { std::unique_lock lck(mutex_); --system_ready_count_; - if (system_field_type == SystemFieldType::RowId) { - insert_record_.row_ids_.clear(); - } else if (system_field_type == SystemFieldType::Timestamp) { + if (system_field_type == SystemFieldType::Timestamp) { insert_record_.timestamps_.clear(); } lck.unlock(); @@ -1042,13 +1021,7 @@ SegmentSealedImpl::bulk_subscript(SystemFieldType system_type, static_cast(output)); break; case SystemFieldType::RowId: - AssertInfo(insert_record_.row_ids_.num_chunk() == 1, - "num chunk of rowID not equal to 1 for sealed segment"); - bulk_subscript_impl( - this->insert_record_.row_ids_.get_chunk_data(0), - seg_offsets, - count, - static_cast(output)); + PanicInfo(ErrorCode::Unsupported, "RowId retrieve not supported"); break; default: PanicInfo(DataTypeInvalid, @@ -1512,12 +1485,6 @@ SegmentSealedImpl::debug() const { void SegmentSealedImpl::LoadSegmentMeta( const proto::segcore::LoadSegmentMeta& segment_meta) { - std::unique_lock lck(mutex_); - std::vector slice_lengths; - for (auto& info : segment_meta.metas()) { - slice_lengths.push_back(info.row_count()); - } - insert_record_.timestamp_index_.set_length_meta(std::move(slice_lengths)); PanicInfo(NotImplemented, "unimplemented"); } @@ -1529,33 +1496,17 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const { void SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk, - Timestamp timestamp) const { - // TODO change the - AssertInfo(insert_record_.timestamps_.num_chunk() == 1, - "num chunk not equal to 1 for sealed segment"); - const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0); - AssertInfo(timestamps_data.size() == get_row_count(), - fmt::format("Timestamp size not equal to row count: {}, {}", - timestamps_data.size(), - get_row_count())); - auto range = insert_record_.timestamp_index_.get_active_range(timestamp); - - // range == (size_, size_) and size_ is this->timestamps_.size(). - // it means these data are all useful, we don't need to update bitset_chunk. - // It can be thought of as an OR operation with another bitmask that is all 0s, but it is not necessary to do so. - if (range.first == range.second && range.first == timestamps_data.size()) { - // just skip - return; - } - // range == (0, 0). it means these data can not be used, directly set bitset_chunk to all 1s. - // It can be thought of as an OR operation with another bitmask that is all 1s. - if (range.first == range.second && range.first == 0) { - bitset_chunk.set(); - return; + Timestamp ts) const { + auto row_count = this->get_row_count(); + auto& ts_vec = this->insert_record_.timestamps_; + auto iter = std::upper_bound( + boost::make_counting_iterator(static_cast(0)), + boost::make_counting_iterator(row_count), + ts, + [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; }); + for (size_t i = *iter; i < row_count; ++i) { + bitset_chunk.set(i); } - auto mask = TimestampIndex::GenerateBitset( - timestamp, range, timestamps_data.data(), timestamps_data.size()); - bitset_chunk |= mask; } bool