Skip to content

Commit

Permalink
fix: readd timestamp index because segment timestamp not ordered (#33856
Browse files Browse the repository at this point in the history
)

#33533

Signed-off-by: luzhang <[email protected]>
Co-authored-by: luzhang <[email protected]>
  • Loading branch information
zhagnlu and luzhang authored Jun 16, 2024
1 parent f67b6dc commit e422168
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
4 changes: 4 additions & 0 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ struct InsertRecord {
timestamps_.clear();
reserved = 0;
ack_responder_.clear();
timestamp_index_ = TimestampIndex();
pk2offset_->clear();
fields_data_.clear();
}
Expand All @@ -610,6 +611,9 @@ struct InsertRecord {
std::atomic<int64_t> reserved = 0;
AckResponder ack_responder_;

// used for timestamps index of sealed segment
TimestampIndex timestamp_index_;

// pks to row offset
std::unique_ptr<OffsetMap> pk2offset_;

Expand Down
53 changes: 42 additions & 11 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <string_view>
#include <unordered_map>
#include <vector>
#include <boost/iterator/counting_iterator.hpp>

#include "Utils.h"
#include "Types.h"
Expand Down Expand Up @@ -349,9 +348,19 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
offset += row_count;
}

TimestampIndex index;
auto min_slice_length = num_rows < 4096 ? 1 : 4096;
auto meta = GenerateFakeSlices(
timestamps.data(), num_rows, min_slice_length);
index.set_length_meta(std::move(meta));
// todo ::opt to avoid copy timestamps from field data
index.build_with(timestamps.data(), num_rows);

// use special index
std::unique_lock lck(mutex_);
AssertInfo(insert_record_.timestamps_.empty(), "already exists");
insert_record_.timestamps_.fill_chunk_data(field_data);
insert_record_.timestamp_index_ = std::move(index);
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
stats_.mem_size += sizeof(Timestamp) * data.row_count;
Expand Down Expand Up @@ -1519,6 +1528,12 @@ SegmentSealedImpl::debug() const {
void
SegmentSealedImpl::LoadSegmentMeta(
const proto::segcore::LoadSegmentMeta& segment_meta) {
std::unique_lock lck(mutex_);
std::vector<int64_t> slice_lengths;
for (auto& info : segment_meta.metas()) {
slice_lengths.push_back(info.row_count());
}
insert_record_.timestamp_index_.set_length_meta(std::move(slice_lengths));
PanicInfo(NotImplemented, "unimplemented");
}

Expand All @@ -1530,17 +1545,33 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {

void
SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
Timestamp ts) const {
auto row_count = this->get_row_count();
auto& ts_vec = this->insert_record_.timestamps_;
auto iter = std::upper_bound(
boost::make_counting_iterator(static_cast<int64_t>(0)),
boost::make_counting_iterator(row_count),
ts,
[&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; });
for (size_t i = *iter; i < row_count; ++i) {
bitset_chunk.set(i);
Timestamp timestamp) const {
// TODO change the
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0);
AssertInfo(timestamps_data.size() == get_row_count(),
fmt::format("Timestamp size not equal to row count: {}, {}",
timestamps_data.size(),
get_row_count()));
auto range = insert_record_.timestamp_index_.get_active_range(timestamp);

// range == (size_, size_) and size_ is this->timestamps_.size().
// it means these data are all useful, we don't need to update bitset_chunk.
// It can be thought of as an OR operation with another bitmask that is all 0s, but it is not necessary to do so.
if (range.first == range.second && range.first == timestamps_data.size()) {
// just skip
return;
}
// range == (0, 0). it means these data can not be used, directly set bitset_chunk to all 1s.
// It can be thought of as an OR operation with another bitmask that is all 1s.
if (range.first == range.second && range.first == 0) {
bitset_chunk.set();
return;
}
auto mask = TimestampIndex::GenerateBitset(
timestamp, range, timestamps_data.data(), timestamps_data.size());
bitset_chunk |= mask;
}

bool
Expand Down

0 comments on commit e422168

Please sign in to comment.