Skip to content

Commit

Permalink
[enhancement](oom) return error when bloom filter allocate memory fai…
Browse files Browse the repository at this point in the history
…led (apache#35790)

## Proposed changes


1. return error when bloom filter allocate memory failed
2. return error when deserialize a block,  it may need a lot of memory.

---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Jun 3, 2024
1 parent 3f7e7a0 commit 645147a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 12 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Status PrimaryKeyIndexBuilder::init() {
Status PrimaryKeyIndexBuilder::add_item(const Slice& key) {
RETURN_IF_ERROR(_primary_key_index_builder->add(&key));
Slice key_without_seq = Slice(key.get_data(), key.get_size() - _seq_col_length - _rowid_length);
_bloom_filter_index_builder->add_values(&key_without_seq, 1);
RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(&key_without_seq, 1));
// the key is already sorted, so the first key is min_key, and
// the last key is max_key.
if (UNLIKELY(_num_rows == 0)) {
Expand Down
13 changes: 8 additions & 5 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {

~BloomFilterIndexWriterImpl() override = default;

void add_values(const void* values, size_t count) override {
Status add_values(const void* values, size_t count) override {
const CppType* v = (const CppType*)values;
for (int i = 0; i < count; ++i) {
if (_values.find(*v) == _values.end()) {
if constexpr (_is_slice_type()) {
CppType new_value;
_type_info->deep_copy(&new_value, v, &_arena);
RETURN_IF_CATCH_EXCEPTION(_type_info->deep_copy(&new_value, v, &_arena));
_values.insert(new_value);
} else if constexpr (_is_int128()) {
int128_t new_value;
Expand All @@ -97,6 +97,7 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
}
++v;
}
return Status::OK();
}

void add_nulls(uint32_t count) override { _has_null = true; }
Expand Down Expand Up @@ -175,14 +176,15 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {

} // namespace

void PrimaryKeyBloomFilterIndexWriterImpl::add_values(const void* values, size_t count) {
Status PrimaryKeyBloomFilterIndexWriterImpl::add_values(const void* values, size_t count) {
const Slice* v = (const Slice*)values;
for (int i = 0; i < count; ++i) {
Slice new_value;
_type_info->deep_copy(&new_value, v, &_arena);
RETURN_IF_CATCH_EXCEPTION(_type_info->deep_copy(&new_value, v, &_arena));
_values.push_back(new_value);
++v;
}
return Status::OK();
}

Status PrimaryKeyBloomFilterIndexWriterImpl::flush() {
Expand Down Expand Up @@ -247,14 +249,15 @@ NGramBloomFilterIndexWriterImpl::NGramBloomFilterIndexWriterImpl(
static_cast<void>(BloomFilter::create(NGRAM_BLOOM_FILTER, &_bf, bf_size));
}

void NGramBloomFilterIndexWriterImpl::add_values(const void* values, size_t count) {
Status NGramBloomFilterIndexWriterImpl::add_values(const void* values, size_t count) {
const Slice* src = reinterpret_cast<const Slice*>(values);
for (int i = 0; i < count; ++i, ++src) {
if (src->size < _gram_size) {
continue;
}
_token_extractor.string_to_bloom_filter(src->data, src->size, *_bf);
}
return Status::OK();
}

Status NGramBloomFilterIndexWriterImpl::flush() {
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BloomFilterIndexWriter {
BloomFilterIndexWriter() = default;
virtual ~BloomFilterIndexWriter() = default;

virtual void add_values(const void* values, size_t count) = 0;
virtual Status add_values(const void* values, size_t count) = 0;

virtual void add_nulls(uint32_t count) = 0;

Expand Down Expand Up @@ -85,7 +85,9 @@ class PrimaryKeyBloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
}
};

void add_values(const void* values, size_t count) override;
// This method may allocate large memory for bf, will return error
// when memory is exhaused to prevent oom.
Status add_values(const void* values, size_t count) override;

void add_nulls(uint32_t count) override { _has_null = true; }

Expand Down Expand Up @@ -114,7 +116,7 @@ class NGramBloomFilterIndexWriterImpl : public BloomFilterIndexWriter {

NGramBloomFilterIndexWriterImpl(const BloomFilterOptions& bf_options, uint8_t gram_size,
uint16_t bf_size);
void add_values(const void* values, size_t count) override;
Status add_values(const void* values, size_t count) override;
void add_nulls(uint32_t) override {}
Status flush() override;
Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ Status ScalarColumnWriter::append_data_in_current_page(const uint8_t* data, size
_inverted_index_builder->add_values(get_field()->name(), data, *num_written));
}
if (_opts.need_bloom_filter) {
_bloom_filter_index_builder->add_values(data, *num_written);
RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written));
}

_next_rowid += *num_written;
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Status Block::deserialize(const PBlock& pblock) {
BlockCompressionCodec* codec;
RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
uncompressed_size = pblock.uncompressed_size();
// Should also use allocator to allocate memory here.
compression_scratch.resize(uncompressed_size);
Slice decompressed_slice(compression_scratch);
RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size),
Expand All @@ -123,7 +124,9 @@ Status Block::deserialize(const PBlock& pblock) {
for (const auto& pcol_meta : pblock.column_metas()) {
DataTypePtr type = DataTypeFactory::instance().create_data_type(pcol_meta);
MutableColumnPtr data_column = type->create_column();
buf = type->deserialize(buf, data_column.get(), pblock.be_exec_version());
// Here will try to allocate large memory, should return error if failed.
RETURN_IF_CATCH_EXCEPTION(
buf = type->deserialize(buf, data_column.get(), pblock.be_exec_version()));
data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
}
initialize_index_by_name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val
const CppType* vals = (const CppType*)values;
for (int i = 0; i < value_count;) {
size_t num = std::min(1024, (int)value_count - i);
bloom_filter_index_writer->add_values(vals + i, num);
static_cast<void>(bloom_filter_index_writer->add_values(vals + i, num));
if (i == 2048) {
// second page
bloom_filter_index_writer->add_nulls(null_count);
Expand Down

0 comments on commit 645147a

Please sign in to comment.