Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mow master 0922 tmp 4 rebase #3

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,8 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
output_rowsets.push_back(_output_rowset);

if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
_tablet->enable_unique_key_merge_on_write() &&
_tablet->tablet_schema()->cluster_key_idxes().empty()) {
Version version = _tablet->max_version();
DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
Expand Down
59 changes: 40 additions & 19 deletions be/src/olap/delete_bitmap_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,47 +92,54 @@ bool MergeIndexDeleteBitmapCalculatorContext::Comparator::operator()(
Slice key1, key2;
RETURN_IF_ERROR(lhs->get_current_key(key1));
RETURN_IF_ERROR(rhs->get_current_key(key2));
if (_sequence_length == 0) {
if (_sequence_length == 0 && _rowid_length == 0) {
auto cmp_result = key1.compare(key2);
// when key1 is the same as key2,
// we want the one with greater segment id to be popped first
return cmp_result ? (cmp_result > 0) : (lhs->segment_id() < rhs->segment_id());
}
// smaller key popped first
auto key1_without_seq = Slice(key1.get_data(), key1.get_size() - _sequence_length);
auto key2_without_seq = Slice(key2.get_data(), key2.get_size() - _sequence_length);
auto key1_without_seq =
Slice(key1.get_data(), key1.get_size() - _sequence_length - _rowid_length);
auto key2_without_seq =
Slice(key2.get_data(), key2.get_size() - _sequence_length - _rowid_length);
auto cmp_result = key1_without_seq.compare(key2_without_seq);
if (cmp_result != 0) {
return cmp_result > 0;
}
// greater sequence value popped first
auto key1_sequence_val =
Slice(key1.get_data() + key1.get_size() - _sequence_length, _sequence_length);
auto key2_sequence_val =
Slice(key2.get_data() + key2.get_size() - _sequence_length, _sequence_length);
cmp_result = key1_sequence_val.compare(key2_sequence_val);
if (cmp_result != 0) {
return cmp_result < 0;
if (_sequence_length > 0) {
// greater sequence value popped first
auto key1_sequence_val =
Slice(key1.get_data() + key1_without_seq.get_size() + 1, _sequence_length - 1);
auto key2_sequence_val =
Slice(key2.get_data() + key2_without_seq.get_size() + 1, _sequence_length - 1);
cmp_result = key1_sequence_val.compare(key2_sequence_val);
if (cmp_result != 0) {
return cmp_result < 0;
}
}
// greater segment id popped first
return lhs->segment_id() < rhs->segment_id();
}

bool MergeIndexDeleteBitmapCalculatorContext::Comparator::is_key_same(Slice const& lhs,
Slice const& rhs) const {
DCHECK(lhs.get_size() >= _sequence_length);
DCHECK(rhs.get_size() >= _sequence_length);
auto lhs_without_seq = Slice(lhs.get_data(), lhs.get_size() - _sequence_length);
auto rhs_without_seq = Slice(rhs.get_data(), rhs.get_size() - _sequence_length);
DCHECK(lhs.get_size() >= _sequence_length + _rowid_length);
DCHECK(rhs.get_size() >= _sequence_length + _rowid_length);
auto lhs_without_seq = Slice(lhs.get_data(), lhs.get_size() - _sequence_length - _rowid_length);
auto rhs_without_seq = Slice(rhs.get_data(), rhs.get_size() - _sequence_length - _rowid_length);
return lhs_without_seq.compare(rhs_without_seq) == 0;
}

Status MergeIndexDeleteBitmapCalculator::init(RowsetId rowset_id,
std::vector<SegmentSharedPtr> const& segments,
size_t seq_col_length, size_t max_batch_size) {
size_t seq_col_length, size_t rowdid_length,
size_t max_batch_size) {
_rowset_id = rowset_id;
_seq_col_length = seq_col_length;
_comparator = MergeIndexDeleteBitmapCalculatorContext::Comparator(seq_col_length);
_rowid_length = rowdid_length;
_comparator =
MergeIndexDeleteBitmapCalculatorContext::Comparator(seq_col_length, _rowid_length);
_contexts.reserve(segments.size());
_heap = std::make_unique<Heap>(_comparator);

Expand All @@ -146,6 +153,10 @@ Status MergeIndexDeleteBitmapCalculator::init(RowsetId rowset_id,
_contexts.emplace_back(std::move(index), index_type, segment->id(), pk_idx->num_rows());
_heap->push(&_contexts.back());
}
if (_rowid_length > 0) {
_rowid_coder = get_key_coder(
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>()->type());
}
return Status::OK();
}

Expand All @@ -159,6 +170,15 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) {
if (!_last_key.empty() && _comparator.is_key_same(cur_key, _last_key)) {
loc.segment_id = cur_ctx->segment_id();
loc.row_id = cur_ctx->row_id();
if (_rowid_length > 0) {
Slice key_without_seq = Slice(cur_key.get_data(),
cur_key.get_size() - _seq_col_length - _rowid_length);
Slice rowid_slice =
Slice(cur_key.get_data() + key_without_seq.get_size() + _seq_col_length + 1,
_rowid_length - 1);
RETURN_IF_ERROR(_rowid_coder->decode_ascending(&rowid_slice, _rowid_length,
(uint8_t*)&loc.row_id));
}
auto st = cur_ctx->advance();
if (st.ok()) {
_heap->push(cur_ctx);
Expand All @@ -176,8 +196,9 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) {
RETURN_IF_ERROR(nxt_ctx->get_current_key(nxt_key));
Status st = _comparator.is_key_same(cur_key, nxt_key)
? cur_ctx->advance()
: cur_ctx->seek_at_or_after(Slice(
nxt_key.get_data(), nxt_key.get_size() - _seq_col_length));
: cur_ctx->seek_at_or_after(
Slice(nxt_key.get_data(),
nxt_key.get_size() - _seq_col_length - _rowid_length));
if (st.is<ErrorCode::END_OF_FILE>()) {
continue;
}
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/delete_bitmap_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/base_tablet.h"
#include "olap/binlog_config.h"
#include "olap/data_dir.h"
#include "olap/key_coder.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
Expand All @@ -47,13 +48,15 @@ class MergeIndexDeleteBitmapCalculatorContext {
public:
class Comparator {
public:
Comparator(size_t sequence_length) : _sequence_length(sequence_length) {}
Comparator(size_t sequence_length, size_t rowid_length)
: _sequence_length(sequence_length), _rowid_length(rowid_length) {}
bool operator()(MergeIndexDeleteBitmapCalculatorContext* lhs,
MergeIndexDeleteBitmapCalculatorContext* rhs) const;
bool is_key_same(Slice const& lhs, Slice const& rhs) const;

private:
size_t _sequence_length;
size_t _rowid_length;
};

MergeIndexDeleteBitmapCalculatorContext(std::unique_ptr<segment_v2::IndexedColumnIterator> iter,
Expand Down Expand Up @@ -90,7 +93,7 @@ class MergeIndexDeleteBitmapCalculator {
MergeIndexDeleteBitmapCalculator() = default;

Status init(RowsetId rowset_id, std::vector<SegmentSharedPtr> const& segments,
size_t seq_col_length = 0, size_t max_batch_size = 1024);
size_t seq_col_length = 0, size_t rowid_length = 0, size_t max_batch_size = 1024);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 1024 is a magic number; consider replacing it with a named constant [readability-magic-numbers]

                size_t seq_col_length = 0, size_t rowid_length = 0, size_t max_batch_size = 1024);
                                                                                            ^


Status calculate_one(RowLocation& loc);

Expand All @@ -101,11 +104,13 @@ class MergeIndexDeleteBitmapCalculator {
std::vector<MergeIndexDeleteBitmapCalculatorContext*>,
MergeIndexDeleteBitmapCalculatorContext::Comparator>;
std::vector<MergeIndexDeleteBitmapCalculatorContext> _contexts;
MergeIndexDeleteBitmapCalculatorContext::Comparator _comparator {0};
MergeIndexDeleteBitmapCalculatorContext::Comparator _comparator {0, 0};
RowsetId _rowset_id;
std::unique_ptr<Heap> _heap;
std::string _last_key;
size_t _seq_col_length;
size_t _rowid_length;
const KeyCoder* _rowid_coder = nullptr;
};

} // namespace doris
54 changes: 54 additions & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,56 @@ size_t MemTable::_sort() {
return same_keys_num;
}

void MemTable::_sort_by_cluster_keys() {
SCOPED_RAW_TIMER(&_stat.sort_ns);
_stat.sort_times++;
// sort all rows
vectorized::Block in_block = _output_mutable_block.to_block();
vectorized::MutableBlock mutable_block =
vectorized::MutableBlock::build_mutable_block(&in_block);
auto clone_block = in_block.clone_without_columns();
_output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block);

std::vector<RowInBlock*> row_in_blocks;
std::unique_ptr<int, std::function<void(int*)>> row_in_blocks_deleter((int*)0x01, [&](int*) {
std::for_each(row_in_blocks.begin(), row_in_blocks.end(),
std::default_delete<RowInBlock>());
});
row_in_blocks.reserve(mutable_block.rows());
for (size_t i = 0; i < mutable_block.rows(); i++) {
row_in_blocks.emplace_back(new RowInBlock {i});
}
Tie tie = Tie(0, mutable_block.rows());

for (auto i : _tablet_schema->cluster_key_idxes()) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
};
_sort_one_column(row_in_blocks, tie, cmp);
}

// sort extra round by _row_pos to make the sort stable
auto iter = tie.iter();
while (iter.next()) {
pdqsort(std::next(row_in_blocks.begin(), iter.left()),
std::next(row_in_blocks.begin(), iter.right()),
[](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
return lhs->_row_pos < rhs->_row_pos;
});
}

in_block = mutable_block.to_block();
SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
std::vector<int> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < row_in_blocks.size(); i++) {
row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
}
_output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
row_pos_vec.data() + in_block.rows());
}

void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp) {
auto iter = tie.iter();
Expand Down Expand Up @@ -448,6 +498,10 @@ std::unique_ptr<vectorized::Block> MemTable::to_block() {
} else {
_aggregate<true>();
}
if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow &&
!_tablet_schema->cluster_key_idxes().empty()) {
_sort_by_cluster_keys();
}
return vectorized::Block::create_unique(_output_mutable_block.to_block());
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class MemTable {

//return number of same keys
size_t _sort();
void _sort_by_cluster_keys();
void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
template <bool is_final>
Expand Down
40 changes: 36 additions & 4 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
}
reader_params.tablet_schema = merge_tablet_schema;
if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
}

if (stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
Expand Down Expand Up @@ -164,16 +167,23 @@ void Merger::vertical_split_columns(TabletSchemaSPtr tablet_schema,
if (delete_sign_idx != -1) {
key_columns.emplace_back(delete_sign_idx);
}
if (!tablet_schema->cluster_key_idxes().empty()) {
for (const auto& cid : tablet_schema->cluster_key_idxes()) {
if (cid >= num_key_cols) {
key_columns.emplace_back(cid);
}
}
}
}
VLOG_NOTICE << "sequence_col_idx=" << sequence_col_idx
<< ", delete_sign_idx=" << delete_sign_idx;
// for duplicate no keys
if (!key_columns.empty()) {
column_groups->emplace_back(std::move(key_columns));
}
std::vector<uint32_t> value_columns;
for (auto i = num_key_cols; i < total_cols; ++i) {
if (i == sequence_col_idx || i == delete_sign_idx) {
if (i == sequence_col_idx || i == delete_sign_idx ||
key_columns.end() != std::find(key_columns.begin(), key_columns.end(), i)) {
continue;
}
if ((i - num_key_cols) % config::vertical_compaction_num_columns_per_group == 0) {
Expand All @@ -187,12 +197,14 @@ Status Merger::vertical_compact_one_group(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key,
const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output) {
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
TabletReader::ReaderParams reader_params;
reader_params.is_key_column_group = is_key;
reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;

Expand All @@ -214,6 +226,9 @@ Status Merger::vertical_compact_one_group(
}

reader_params.tablet_schema = merge_tablet_schema;
if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
}

if (is_key && stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
Expand Down Expand Up @@ -341,6 +356,22 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);

std::vector<uint32_t> key_group_cluster_key_idxes;
if (column_groups.size() > 0) {
if (!tablet_schema->cluster_key_idxes().empty()) {
auto& key_column_group = column_groups[0];
for (const auto& index_in_tablet_schema : tablet_schema->cluster_key_idxes()) {
for (auto j = 0; j < key_column_group.size(); ++j) {
auto cid = key_column_group[j];
if (cid == index_in_tablet_schema) {
key_group_cluster_key_idxes.emplace_back(j);
break;
}
}
}
}
}

vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
reader_type);
// compact group one by one
Expand All @@ -349,7 +380,8 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_
bool is_key = (i == 0);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output));
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
key_group_cluster_key_idxes));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class Merger {
bool is_key, const std::vector<uint32_t>& column_group,
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output);
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes);

// for segcompaction
static Status vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Status PrimaryKeyIndexBuilder::init() {

Status PrimaryKeyIndexBuilder::add_item(const Slice& key) {
RETURN_IF_ERROR(_primary_key_index_builder->add(&key));
Slice key_without_seq = Slice(key.get_data(), key.get_size() - _seq_col_length);
Slice key_without_seq = Slice(key.get_data(), key.get_size() - _seq_col_length - _rowid_length);
_bloom_filter_index_builder->add_values(&key_without_seq, 1);
// the key is already sorted, so the first key is min_key, and
// the last key is max_key.
Expand Down
14 changes: 10 additions & 4 deletions be/src/olap/primary_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ class PrimaryKeyIndexMetaPB;
// NOTE: for now, it's only used when unique key merge-on-write property enabled.
class PrimaryKeyIndexBuilder {
public:
PrimaryKeyIndexBuilder(io::FileWriter* file_writer, size_t seq_col_length)
PrimaryKeyIndexBuilder(io::FileWriter* file_writer, size_t seq_col_length, size_t rowid_length)
: _file_writer(file_writer),
_num_rows(0),
_size(0),
_disk_size(0),
_seq_col_length(seq_col_length) {}
_seq_col_length(seq_col_length),
_rowid_length(rowid_length) {}

Status init();

Expand All @@ -70,8 +71,12 @@ class PrimaryKeyIndexBuilder {
// used for be ut
uint32_t data_page_num() const { return _primary_key_index_builder->data_page_num(); }

Slice min_key() { return Slice(_min_key.data(), _min_key.size() - _seq_col_length); }
Slice max_key() { return Slice(_max_key.data(), _max_key.size() - _seq_col_length); }
Slice min_key() {
return Slice(_min_key.data(), _min_key.size() - _seq_col_length - _rowid_length);
}
Slice max_key() {
return Slice(_max_key.data(), _max_key.size() - _seq_col_length - _rowid_length);
}

Status finalize(segment_v2::PrimaryKeyIndexMetaPB* meta);

Expand All @@ -81,6 +86,7 @@ class PrimaryKeyIndexBuilder {
uint64_t _size;
uint64_t _disk_size;
size_t _seq_col_length;
size_t _rowid_length;

faststring _min_key;
faststring _max_key;
Expand Down
Loading
Loading