Skip to content

Commit

Permalink
[Improve](inverted_index) update clucene and improve array inverted i…
Browse files Browse the repository at this point in the history
…ndex writer (#32436)
  • Loading branch information
amorynan authored and Doris-Extras committed Apr 10, 2024
1 parent 8e6ed80 commit 28e2d89
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 33 deletions.
92 changes: 60 additions & 32 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {

explicit InvertedIndexColumnWriterImpl(const std::string& field_name,
InvertedIndexFileWriter* index_file_writer,
const TabletIndex* index_meta)
: _index_meta(index_meta), _index_file_writer(index_file_writer) {
const TabletIndex* index_meta,
const bool single_field = true)
: _single_field(single_field),
_index_meta(index_meta),
_index_file_writer(index_file_writer) {
_parser_type = get_inverted_index_parser_type_from_string(
get_parser_string_from_properties(_index_meta->properties()));
_value_key_coder = get_key_coder(field_type);
Expand Down Expand Up @@ -235,9 +238,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
RETURN_IF_ERROR(create_char_string_reader(_char_string_reader));
RETURN_IF_ERROR(create_analyzer(_analyzer));
RETURN_IF_ERROR(create_index_writer(_index_writer));
RETURN_IF_ERROR(create_field(&_field));
_doc = std::make_unique<lucene::document::Document>();
_doc->add(*_field);
if (_single_field) {
RETURN_IF_ERROR(create_field(&_field));
_doc->add(*_field);
} else {
// array's inverted index do need create field first
_doc->setNeedResetFieldData(true);
}
return Status::OK();
}

Expand Down Expand Up @@ -280,11 +288,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
return Status::OK();
}

void new_fulltext_field(const char* field_value_data, size_t field_value_size) {
if (_parser_type == InvertedIndexParserType::PARSER_ENGLISH ||
_parser_type == InvertedIndexParserType::PARSER_CHINESE ||
_parser_type == InvertedIndexParserType::PARSER_UNICODE ||
_parser_type == InvertedIndexParserType::PARSER_STANDARD) {
void new_inverted_index_field(const char* field_value_data, size_t field_value_size) {
if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN &&
_parser_type != InvertedIndexParserType::PARSER_NONE) {
new_char_token_stream(field_value_data, field_value_size, _field);
} else {
new_field_char_value(field_value_data, field_value_size, _field);
Expand Down Expand Up @@ -326,7 +332,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
(_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) {
RETURN_IF_ERROR(add_null_document());
} else {
new_fulltext_field(v->get_data(), v->get_size());
new_inverted_index_field(v->get_data(), v->get_size());
RETURN_IF_ERROR(add_document());
}
++v;
Expand All @@ -346,39 +352,58 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
}
const auto* offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
if constexpr (field_is_slice_type(field_type)) {
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index writer.";
return Status::InternalError(
"field or index writer is null in inverted index writer");
if (_index_writer == nullptr) {
LOG(ERROR) << "index writer is null in inverted index writer.";
return Status::InternalError("index writer is null in inverted index writer");
}
auto ignore_above_value =
get_parser_ignore_above_value_from_properties(_index_meta->properties());
auto ignore_above = std::stoi(ignore_above_value);
for (int i = 0; i < count; ++i) {
// offsets[i+1] is now row element count
std::vector<std::string> strings;
// [0, 3, 6]
// [10,20,30] [20,30,40], [30,40,50]
auto start_off = offsets[i];
auto end_off = offsets[i + 1];
// TODO(Amory).later we use object pool to avoid field creation
lucene::document::Field* new_field = nullptr;
CL_NS(analysis)::TokenStream* ts = nullptr;
for (auto j = start_off; j < end_off; ++j) {
if (null_map[j] == 1) {
continue;
}
// now we temp create field . later make a pool
if (Status st = create_field(&new_field); st != Status::OK()) {
LOG(ERROR)
<< "create field " << string(_field_name.begin(), _field_name.end())
<< " error:" << st;
return st;
}
auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size);
strings.emplace_back(v->get_data(), v->get_size());
}

auto value = join(strings, " ");
// only ignore_above UNTOKENIZED strings and empty strings not tokenized
if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
value.length() > ignore_above) ||
(_parser_type != InvertedIndexParserType::PARSER_NONE && value.empty())) {
RETURN_IF_ERROR(add_null_document());
} else {
new_fulltext_field(value.c_str(), value.length());
RETURN_IF_ERROR(add_document());
if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
v->get_size() > ignore_above) ||
(_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) {
// is here a null value?
// TODO. Maybe here has performance problem for large size string.
continue;
} else {
if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN &&
_parser_type != InvertedIndexParserType::PARSER_NONE) {
// in this case stream need to delete after add_document, because the
// stream can not reuse for different field
_char_string_reader->init(v->get_data(), v->get_size(), false);
ts = _analyzer->tokenStream(new_field->name(),
_char_string_reader.get());
new_field->setValue(ts);
} else {
new_field_char_value(v->get_data(), v->get_size(), new_field);
}
_doc->add(*new_field);
}
}
RETURN_IF_ERROR(add_document());
_doc->clear();
_CLDELETE(ts);
_rid++;
}
} else if constexpr (field_is_numeric_type(field_type)) {
Expand Down Expand Up @@ -424,7 +449,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
item_data_ptr = (uint8_t*)item_data_ptr + field_size;
}
auto value = join(strings, " ");
new_fulltext_field(value.c_str(), value.length());
new_inverted_index_field(value.c_str(), value.length());
_rid++;
RETURN_IF_ERROR(add_document());
values++;
Expand Down Expand Up @@ -577,6 +602,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {

std::unique_ptr<lucene::document::Document> _doc = nullptr;
lucene::document::Field* _field = nullptr;
bool _single_field = true;
std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
Expand All @@ -596,22 +622,24 @@ Status InvertedIndexColumnWriter::create(const Field* field,
const auto* typeinfo = field->type_info();
FieldType type = typeinfo->type();
std::string field_name = field->name();
bool single_field = true;
if (type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
const auto* array_typeinfo = dynamic_cast<const ArrayTypeInfo*>(typeinfo);
if (array_typeinfo != nullptr) {
typeinfo = array_typeinfo->item_type_info();
type = typeinfo->type();
single_field = false;
} else {
return Status::NotSupported("unsupported array type for inverted index: " +
std::to_string(int(type)));
}
}

switch (type) {
#define M(TYPE) \
case TYPE: \
*res = std::make_unique<InvertedIndexColumnWriterImpl<TYPE>>( \
field_name, index_file_writer, index_meta); \
#define M(TYPE) \
case TYPE: \
*res = std::make_unique<InvertedIndexColumnWriterImpl<TYPE>>( \
field_name, index_file_writer, index_meta, single_field); \
break;
M(FieldType::OLAP_FIELD_TYPE_TINYINT)
M(FieldType::OLAP_FIELD_TYPE_SMALLINT)
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ class ExecEnv {
DNSCache* dns_cache() { return _dns_cache; }

#ifdef BE_TEST
void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs> tmp_file_dirs) {
this->_tmp_file_dirs = std::move(tmp_file_dirs);
}
void set_ready() { this->_s_ready = true; }
void set_not_ready() { this->_s_ready = false; }
void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
Expand Down
Loading

0 comments on commit 28e2d89

Please sign in to comment.