From fce88d7dcd18c9a18208c3b0bbc890fe52e8ccf2 Mon Sep 17 00:00:00 2001 From: amory Date: Thu, 28 Mar 2024 10:33:09 +0800 Subject: [PATCH] [Improve](inverted_index) update clucene and improve array inverted index writer (#32436) --- be/src/clucene | 2 +- .../segment_v2/inverted_index_writer.cpp | 92 ++++--- be/src/runtime/exec_env.h | 3 + .../segment_v2/inverted_index_array_test.cpp | 225 ++++++++++++++++++ 4 files changed, 289 insertions(+), 33 deletions(-) create mode 100644 be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp diff --git a/be/src/clucene b/be/src/clucene index ef95e67ae31234..ff2cd82f9e545a 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit ef95e67ae3123409f006072194f742a079603159 +Subproject commit ff2cd82f9e545a24318f1256eba312b4d0562a82 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 078668cb0cf230..87827464f155b2 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -80,8 +80,11 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { explicit InvertedIndexColumnWriterImpl(const std::string& field_name, InvertedIndexFileWriter* index_file_writer, - const TabletIndex* index_meta) - : _index_meta(index_meta), _index_file_writer(index_file_writer) { + const TabletIndex* index_meta, + const bool single_field = true) + : _single_field(single_field), + _index_meta(index_meta), + _index_file_writer(index_file_writer) { _parser_type = get_inverted_index_parser_type_from_string( get_parser_string_from_properties(_index_meta->properties())); _value_key_coder = get_key_coder(field_type); @@ -237,9 +240,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { RETURN_IF_ERROR(create_char_string_reader(_char_string_reader)); RETURN_IF_ERROR(create_analyzer(_analyzer)); RETURN_IF_ERROR(create_index_writer(_index_writer)); - RETURN_IF_ERROR(create_field(&_field)); _doc = std::make_unique(); - _doc->add(*_field); + if (_single_field) { + RETURN_IF_ERROR(create_field(&_field)); + _doc->add(*_field); + } else { + // array's inverted index do need create field first + _doc->setNeedResetFieldData(true); + } return Status::OK(); } @@ -282,11 +290,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { return Status::OK(); } - void new_fulltext_field(const char* field_value_data, size_t field_value_size) { - if (_parser_type == InvertedIndexParserType::PARSER_ENGLISH || - _parser_type == InvertedIndexParserType::PARSER_CHINESE || - _parser_type == InvertedIndexParserType::PARSER_UNICODE || - _parser_type == InvertedIndexParserType::PARSER_STANDARD) { + void new_inverted_index_field(const char* field_value_data, size_t field_value_size) { + if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN && + _parser_type != InvertedIndexParserType::PARSER_NONE) { new_char_token_stream(field_value_data, field_value_size, _field); } else { new_field_char_value(field_value_data, field_value_size, _field); @@ -328,7 +334,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { (_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) { RETURN_IF_ERROR(add_null_document()); } else { - new_fulltext_field(v->get_data(), v->get_size()); + new_inverted_index_field(v->get_data(), v->get_size()); RETURN_IF_ERROR(add_document()); } ++v; @@ -348,39 +354,58 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { } const auto* offsets = reinterpret_cast(offsets_ptr); if constexpr (field_is_slice_type(field_type)) { - if (_field == nullptr || _index_writer == nullptr) { - LOG(ERROR) << "field or index writer is null in inverted index writer."; - return Status::InternalError( - "field or index writer is null in inverted index writer"); + if (_index_writer == nullptr) { + LOG(ERROR) << "index writer is null in inverted index writer."; + return Status::InternalError("index writer is null in inverted index writer"); } auto ignore_above_value = get_parser_ignore_above_value_from_properties(_index_meta->properties()); auto ignore_above = std::stoi(ignore_above_value); for (int i = 0; i < count; ++i) { // offsets[i+1] is now row element count - std::vector strings; // [0, 3, 6] // [10,20,30] [20,30,40], [30,40,50] auto start_off = offsets[i]; auto end_off = offsets[i + 1]; + // TODO(Amory).later we use object pool to avoid field creation + lucene::document::Field* new_field = nullptr; + CL_NS(analysis)::TokenStream* ts = nullptr; for (auto j = start_off; j < end_off; ++j) { if (null_map[j] == 1) { continue; } + // now we temp create field . later make a pool + if (Status st = create_field(&new_field); st != Status::OK()) { + LOG(ERROR) + << "create field " << string(_field_name.begin(), _field_name.end()) + << " error:" << st; + return st; + } auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size); - strings.emplace_back(v->get_data(), v->get_size()); - } - - auto value = join(strings, " "); - // only ignore_above UNTOKENIZED strings and empty strings not tokenized - if ((_parser_type == InvertedIndexParserType::PARSER_NONE && - value.length() > ignore_above) || - (_parser_type != InvertedIndexParserType::PARSER_NONE && value.empty())) { - RETURN_IF_ERROR(add_null_document()); - } else { - new_fulltext_field(value.c_str(), value.length()); - RETURN_IF_ERROR(add_document()); + if ((_parser_type == InvertedIndexParserType::PARSER_NONE && + v->get_size() > ignore_above) || + (_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) { + // is here a null value? + // TODO. Maybe here has performance problem for large size string. + continue; + } else { + if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN && + _parser_type != InvertedIndexParserType::PARSER_NONE) { + // in this case stream need to delete after add_document, because the + // stream can not reuse for different field + _char_string_reader->init(v->get_data(), v->get_size(), false); + ts = _analyzer->tokenStream(new_field->name(), + _char_string_reader.get()); + new_field->setValue(ts); + } else { + new_field_char_value(v->get_data(), v->get_size(), new_field); + } + _doc->add(*new_field); + } } + RETURN_IF_ERROR(add_document()); + _doc->clear(); + _CLDELETE(ts); _rid++; } } else if constexpr (field_is_numeric_type(field_type)) { @@ -426,7 +451,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { item_data_ptr = (uint8_t*)item_data_ptr + field_size; } auto value = join(strings, " "); - new_fulltext_field(value.c_str(), value.length()); + new_inverted_index_field(value.c_str(), value.length()); _rid++; RETURN_IF_ERROR(add_document()); values++; @@ -579,6 +604,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { std::unique_ptr _doc = nullptr; lucene::document::Field* _field = nullptr; + bool _single_field = true; std::unique_ptr _index_writer = nullptr; std::unique_ptr _analyzer = nullptr; std::unique_ptr _char_string_reader = nullptr; @@ -598,11 +624,13 @@ Status InvertedIndexColumnWriter::create(const Field* field, const auto* typeinfo = field->type_info(); FieldType type = typeinfo->type(); std::string field_name = field->name(); + bool single_field = true; if (type == FieldType::OLAP_FIELD_TYPE_ARRAY) { const auto* array_typeinfo = dynamic_cast(typeinfo); if (array_typeinfo != nullptr) { typeinfo = array_typeinfo->item_type_info(); type = typeinfo->type(); + single_field = false; } else { return Status::NotSupported("unsupported array type for inverted index: " + std::to_string(int(type))); @@ -610,10 +638,10 @@ Status InvertedIndexColumnWriter::create(const Field* field, } switch (type) { -#define M(TYPE) \ - case TYPE: \ - *res = std::make_unique>( \ - field_name, index_file_writer, index_meta); \ +#define M(TYPE) \ + case TYPE: \ + *res = std::make_unique>( \ + field_name, index_file_writer, index_meta, single_field); \ break; M(FieldType::OLAP_FIELD_TYPE_TINYINT) M(FieldType::OLAP_FIELD_TYPE_SMALLINT) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 8a71ca5ba44648..f3be4d8033363e 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -214,6 +214,9 @@ class ExecEnv { MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } WalManager* wal_mgr() { return _wal_manager.get(); } #ifdef BE_TEST + void set_tmp_file_dir(std::unique_ptr tmp_file_dirs) { + this->_tmp_file_dirs = std::move(tmp_file_dirs); + } void set_ready() { this->_s_ready = true; } void set_not_ready() { this->_s_ready = false; } void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) { diff --git a/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp new file mode 100644 index 00000000000000..74e9827db25199 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "gtest/gtest_pred_impl.h" +#include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" +#include "olap/rowset/segment_v2/inverted_index_compound_reader.h" +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_fs_directory.h" +#include "olap/rowset/segment_v2/inverted_index_writer.h" +#include "olap/rowset/segment_v2/zone_map_index.h" +#include "olap/tablet_schema.h" +#include "olap/tablet_schema_helper.h" +#include "runtime/exec_env.h" +#include "util/slice.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/common/pod_array_fwd.h" +#include "vec/core/field.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/olap/olap_data_convertor.h" + +using namespace lucene::index; +using doris::segment_v2::InvertedIndexFileWriter; + +namespace doris { +namespace segment_v2 { + +class InvertedIndexArrayTest : public testing::Test { +public: + const std::string kTestDir = "./ut_dir/inverted_index_array_test"; + + void check_terms_stats(string dir_str, string file_str) { + auto fs = io::global_local_filesystem(); + std::unique_ptr reader = std::make_unique( + DorisFSDirectoryFactory::getDirectory(fs, dir_str.c_str()), file_str.c_str(), 4096); + std::cout << "Term statistics for " << file_str << std::endl; + std::cout << "==================================" << std::endl; + lucene::store::Directory* dir = reader.get(); + + IndexReader* r = IndexReader::open(dir); + + printf("Max Docs: %d\n", r->maxDoc()); + printf("Num Docs: %d\n", r->numDocs()); + + TermEnum* te = r->terms(); + int32_t nterms; + for (nterms = 0; te->next(); nterms++) { + /* empty */ + std::string token = + lucene_wcstoutf8string(te->term(false)->text(), te->term(false)->textLength()); + + printf("Term: %s ", token.c_str()); + printf("Freq: %d\n", te->docFreq()); + } + printf("Term count: %d\n\n", nterms); + te->close(); + _CLLDELETE(te); + + r->close(); + _CLLDELETE(r); + reader->close(); + } + + void SetUp() override { + auto st = io::global_local_filesystem()->delete_directory(kTestDir); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(kTestDir); + ASSERT_TRUE(st.ok()) << st; + config::enable_write_index_searcher_cache = false; + std::vector paths; + paths.emplace_back(kTestDir, 1024); + auto tmp_file_dirs = std::make_unique(paths); + st = tmp_file_dirs->init(); + if (!st.OK()) { + std::cout << "init tmp file dirs error:" << st.to_string() << std::endl; + return; + } + ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs)); + } + void TearDown() override { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kTestDir).ok()); + } + + void test_string(std::string testname, Field* field) { + EXPECT_TRUE(field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY); + std::string filename = kTestDir + "/" + testname; + auto fs = io::global_local_filesystem(); + + io::FileWriterPtr file_writer; + EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); + auto index_meta_pb = std::make_unique(); + index_meta_pb->set_index_type(IndexType::INVERTED); + index_meta_pb->set_index_id(26033); + index_meta_pb->set_index_name("index_inverted_arr1"); + index_meta_pb->clear_col_unique_id(); + index_meta_pb->add_col_unique_id(0); + + TabletIndex idx_meta; + idx_meta.index_type(); + idx_meta.init_from_pb(*index_meta_pb.get()); + auto index_file_writer = std::make_unique( + fs, file_writer->path().parent_path(), file_writer->path().filename(), + InvertedIndexStorageFormatPB::V1); + std::unique_ptr _inverted_index_builder = nullptr; + EXPECT_EQ(InvertedIndexColumnWriter::create(field, &_inverted_index_builder, + index_file_writer.get(), &idx_meta), + Status::OK()); + vectorized::PaddedPODArray _slice; + _slice.resize(5); + + vectorized::Array a1, a2; + a1.push_back("amory"); + a1.push_back("doris"); + a2.push_back(vectorized::Null()); + a2.push_back("amory"); + a2.push_back("commiter"); + + vectorized::DataTypePtr s1 = std::make_shared( + std::make_shared()); + vectorized::DataTypePtr au = std::make_shared(s1); + vectorized::MutableColumnPtr col = au->create_column(); + col->insert(a1); + col->insert(a2); + vectorized::ColumnPtr column_array = std::move(col); + vectorized::ColumnWithTypeAndName type_and_name(column_array, au, "arr1"); + + vectorized::PaddedPODArray _offsets; + _offsets.reserve(3); + _offsets.emplace_back(0); + _offsets.emplace_back(2); + _offsets.emplace_back(5); + const uint8_t* offsets_ptr = (const uint8_t*)(_offsets.data()); + + auto* col_arr = assert_cast(column_array.get()); + const vectorized::UInt8* nested_null_map = + assert_cast(col_arr->get_data_ptr().get()) + ->get_null_map_data() + .data(); + auto* col_arr_str = assert_cast( + assert_cast(col_arr->get_data_ptr().get()) + ->get_nested_column_ptr() + .get()); + const char* char_data = (const char*)(col_arr_str->get_chars().data()); + const vectorized::ColumnString::Offset* offset_cur = col_arr_str->get_offsets().data(); + const vectorized::ColumnString::Offset* offset_end = offset_cur + 5; + + Slice* slice = _slice.data(); + size_t string_offset = *(offset_cur - 1); + const vectorized::UInt8* nullmap_cur = nested_null_map; + while (offset_cur != offset_end) { + if (!*nullmap_cur) { + slice->data = const_cast(char_data + string_offset); + slice->size = *offset_cur - string_offset; + } else { + slice->data = nullptr; + slice->size = 0; + } + string_offset = *offset_cur; + ++nullmap_cur; + ++slice; + ++offset_cur; + } + + auto field_size = field->get_sub_field(0)->size(); + Status st = _inverted_index_builder->add_array_values( + field_size, reinterpret_cast(_slice.data()), + reinterpret_cast(nested_null_map), offsets_ptr, 2); + EXPECT_EQ(st, Status::OK()); + EXPECT_EQ(_inverted_index_builder->finish(), Status::OK()); + EXPECT_EQ(index_file_writer->close(), Status::OK()); + + { + std::cout << "dir: " << file_writer->path().parent_path().string() << std::endl; + string idx_file_name = file_writer->path().filename().string() + "_26033.idx"; + std::cout << "file: " << file_writer->path().filename().string() << std::endl; + check_terms_stats(file_writer->path().parent_path().string(), idx_file_name); + } + } +}; + +TEST_F(InvertedIndexArrayTest, ArrayString) { + TabletColumn arrayTabletColumn; + arrayTabletColumn.set_unique_id(0); + arrayTabletColumn.set_name("arr1"); + arrayTabletColumn.set_type(FieldType::OLAP_FIELD_TYPE_ARRAY); + TabletColumn arraySubColumn; + arraySubColumn.set_unique_id(1); + arraySubColumn.set_name("arr_sub_string"); + arraySubColumn.set_type(FieldType::OLAP_FIELD_TYPE_STRING); + arrayTabletColumn.add_sub_column(arraySubColumn); + Field* field = FieldFactory::create(arrayTabletColumn); + test_string("InvertedIndexArray", field); + delete field; +} + +} // namespace segment_v2 +} // namespace doris