[fix](index compaction) Fix MOW index compaction core (apache#32085)
In a MOW table with an inverted index, when the table has deleted docs,
the doc_id in rowid_conversion_map will be INT32_MAX.
index_compact in CLucene does not handle this correctly:
it loses docs for specific terms and produces incorrect postings,
so the index compaction process fails.

Here are the changes (a sketch of change 1 follows the list):
1. Remove INT32_MAX entries from destPostingsQueues in CLucene so that deleted docs are handled correctly.
2. Add debug code and a config switch for index compaction in Doris.
3. Add a debug_index_compaction operation to index_tool.
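
As a concrete illustration of change 1, here is a minimal, hypothetical C++ sketch of the intended deleted-doc handling (RowIdConversion and remap_doc are invented names for illustration; the actual fix lives in CLucene's postings-merge code around destPostingsQueues):

#include <cstdint>
#include <limits>
#include <utility>
#include <vector>

// One entry per source segment; each source row maps to (dest_segment, dest_row).
using RowIdConversion = std::vector<std::vector<std::pair<uint32_t, uint32_t>>>;

// Remap a source doc through the rowid conversion map. Returns false for a
// deleted doc (marked with INT32_MAX), which the merge must drop instead of
// enqueueing it into the destination postings queues.
bool remap_doc(const RowIdConversion& trans_vec, uint32_t src_seg,
               uint32_t src_doc, std::pair<uint32_t, uint32_t>* dest) {
    const auto& mapped = trans_vec[src_seg][src_doc];
    if (mapped.second == std::numeric_limits<int32_t>::max()) {
        return false; // deleted doc: emit no posting for it
    }
    *dest = mapped;
    return true;
}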
qidaye authored Mar 12, 2024
1 parent c6d9f5f commit 96493eb
Showing 5 changed files with 170 additions and 4 deletions.
2 changes: 1 addition & 1 deletion be/src/clucene
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1008,6 +1008,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
// index by RAM directory
DEFINE_mBool(inverted_index_ram_dir_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1042,6 +1042,8 @@ DECLARE_mInt32(inverted_index_max_buffered_docs);
DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_mBool(inverted_index_compaction_enable);
// Only for debug, do not use in production
DECLARE_mBool(debug_inverted_index_compaction);
// index by RAM directory
DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
126 changes: 123 additions & 3 deletions be/src/index-tools/index_tool.cpp
@@ -20,7 +20,9 @@
#include <gflags/gflags.h>

#include <filesystem>
#include <fstream>
#include <iostream>
#include <nlohmann/json.hpp>
#include <sstream>
#include <string>
#include <vector>
@@ -37,7 +39,8 @@ using namespace lucene::index;
using namespace lucene::util;
using namespace lucene::search;

-DEFINE_string(operation, "", "valid operation: show_nested_files,check_terms,term_query");
+DEFINE_string(operation, "",
+              "valid operation: show_nested_files,check_terms,term_query,debug_index_compaction");

DEFINE_string(directory, "./", "inverted index file directory");
DEFINE_string(idx_file_name, "", "inverted index file name");
@@ -46,17 +49,31 @@ DEFINE_string(term, "", "inverted index term to query");
DEFINE_string(column_name, "", "inverted index column_name to query");
DEFINE_string(pred_type, "", "inverted index term query predicate, eq/lt/gt/le/ge/match etc.");
DEFINE_bool(print_row_id, false, "print row id when query terms");
DEFINE_bool(print_doc_id, false, "print doc id when check terms stats");
// only for debug index compaction
DEFINE_int32(idx_id, -1, "inverted index id");
DEFINE_string(src_idx_dirs_file, "", "source segment index files");
DEFINE_string(dest_idx_dirs_file, "", "destination segment index files");
DEFINE_string(dest_seg_num_rows_file, "", "destination segment number of rows");
DEFINE_string(tablet_path, "", "tablet path");
DEFINE_string(trans_vec_file, "", "rowid conversion map file");

std::string get_usage(const std::string& progname) {
std::stringstream ss;
ss << progname << " is the Doris inverted index file tool.\n";
ss << "Stop BE first before use this tool.\n";
ss << "Usage:\n";
ss << "./index_tool --operation=show_nested_files --idx_file_path=path/to/file\n";
ss << "./index_tool --operation=check_terms_stats --idx_file_path=path/to/file\n";
ss << "./index_tool --operation=check_terms_stats --idx_file_path=path/to/file "
"--print_doc_id\n";
ss << "./index_tool --operation=term_query --directory=directory "
"--idx_file_name=file --print_row_id --term=term --column_name=column_name "
"--pred_type=eq/lt/gt/le/ge/match etc\n";
ss << "*** debug_index_compaction operation is only for offline debug index compaction, do not "
"use in production ***\n";
ss << "./index_tool --operation=debug_index_compaction --idx_id=index_id "
"--src_idx_dirs_file=path/to/file --dest_idx_dirs_file=path/to/file "
"--dest_seg_num_rows_file=path/to/file --tablet_path=path/to/tablet "
"--trans_vec_file=path/to/file\n";
return ss.str();
}

@@ -126,6 +143,14 @@ void check_terms_stats(lucene::store::Directory* dir) {

printf("Term: %s ", token.c_str());
printf("Freq: %d\n", te->docFreq());
if (FLAGS_print_doc_id) {
TermDocs* td = r->termDocs(te->term());
while (td->next()) {
printf("DocID: %d ", td->doc());
printf("TermFreq: %d\n", td->freq());
}
_CLLDELETE(td);
}
}
printf("Term count: %d\n\n", nterms);
_CLLDELETE(te);
@@ -232,6 +257,101 @@ int main(int argc, char** argv) {
} catch (CLuceneError& err) {
std::cerr << "error occurred when check_terms_stats: " << err.what() << std::endl;
}
} else if (FLAGS_operation == "debug_index_compaction") {
// only for debug index compaction, do not use in production
if (FLAGS_idx_id <= 0 || FLAGS_src_idx_dirs_file == "" || FLAGS_dest_idx_dirs_file == "" ||
FLAGS_dest_seg_num_rows_file == "" || FLAGS_tablet_path == "" ||
FLAGS_trans_vec_file == "") {
std::cout << "invalid params for debug_index_compaction " << std::endl;
return -1;
}

auto fs = doris::io::global_local_filesystem();

auto read_file_to_json = [&](const std::string& file, std::string& output) {
if (!fs->read_file_to_string(file, &output).ok()) {
std::cout << "read file " << file << " failed" << std::endl;
return false;
}
return true;
};

int32_t index_id = FLAGS_idx_id;
std::string tablet_path = FLAGS_tablet_path;
std::string src_index_dirs_string;
std::string dest_index_dirs_string;
std::string dest_segment_num_rows_string;
std::string trans_vec_string;

if (!read_file_to_json(FLAGS_src_idx_dirs_file, src_index_dirs_string) ||
!read_file_to_json(FLAGS_dest_idx_dirs_file, dest_index_dirs_string) ||
!read_file_to_json(FLAGS_dest_seg_num_rows_file, dest_segment_num_rows_string) ||
!read_file_to_json(FLAGS_trans_vec_file, trans_vec_string)) {
return -1;
}
std::vector<std::string> src_index_files = nlohmann::json::parse(src_index_dirs_string);
std::vector<std::string> dest_index_files = nlohmann::json::parse(dest_index_dirs_string);
std::vector<uint32_t> dest_segment_num_rows =
nlohmann::json::parse(dest_segment_num_rows_string);
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec =
nlohmann::json::parse(trans_vec_string);
int src_segment_num = src_index_files.size();
int dest_segment_num = dest_index_files.size();

std::string index_writer_path = tablet_path + "/tmp_index_writer";
lucene::store::Directory* dir =
DorisCompoundDirectoryFactory::getDirectory(fs, index_writer_path.c_str(), false);
lucene::analysis::SimpleAnalyzer<char> analyzer;
auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */,
true /* closeDirOnShutdown */);
std::ostream* infoStream = &std::cout;
index_writer->setInfoStream(infoStream);
// get compound directory src_index_dirs
std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
for (int i = 0; i < src_segment_num; ++i) {
// format: rowsetId_segmentId_indexId.idx
std::string src_idx_full_name =
src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
DorisCompoundReader* reader = new DorisCompoundReader(
DorisCompoundDirectoryFactory::getDirectory(fs, tablet_path.c_str()),
src_idx_full_name.c_str());
src_index_dirs[i] = reader;
}

// get dest idx file paths
std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_indexId
auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id);
dest_index_dirs[i] =
DorisCompoundDirectoryFactory::getDirectory(fs, path.c_str(), true);
}

index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec,
dest_segment_num_rows);

index_writer->close();
_CLDELETE(index_writer);
// NOTE: we need to decrement the ref count of dir.
// When index_writer is destroyed, dir is closed if closeDirOnShutdown is set;
// _CLDECDELETE(dir) decrements the ref count, and when it drops to 1, dir is destroyed.
_CLDECDELETE(dir)
for (auto d : src_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}
for (auto d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close the dest dirs here, because they will be closed when the dest index writer finalizes.
//d->close();
_CLDELETE(d);
}
}

// delete temporary index_writer_path
fs->delete_directory(index_writer_path.c_str());
} else {
std::cout << "invalid operation: " << FLAGS_operation << "\n" << usage << std::endl;
return -1;
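
For reference, a small self-contained sketch of the JSON shapes that the debug_index_compaction inputs above are parsed into (the concrete values below are invented; only the shapes follow the parsing code):

#include <cstdint>
#include <iostream>
#include <nlohmann/json.hpp>
#include <string>
#include <utility>
#include <vector>

int main() {
    // src_idx_dirs_file / dest_idx_dirs_file: JSON array of
    // "rowsetId_segmentId" prefixes (example value is made up).
    auto src = nlohmann::json::parse(R"(["02000000000000001_0"])")
                       .get<std::vector<std::string>>();
    // dest_seg_num_rows_file: JSON array of per-destination-segment row counts.
    auto rows = nlohmann::json::parse(R"([8192])").get<std::vector<uint32_t>>();
    // trans_vec_file: per source segment, per source row, [dest_seg, dest_row];
    // a deleted row carries INT32_MAX (2147483647) as its destination row id.
    auto trans =
            nlohmann::json::parse(R"([[[0,0],[0,1],[0,2147483647]]])")
                    .get<std::vector<std::vector<std::pair<uint32_t, uint32_t>>>>();
    std::cout << src.size() << " segments in, " << rows.size()
              << " segments out, " << trans[0].size() << " rows mapped\n";
    return 0;
}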
42 changes: 42 additions & 0 deletions be/src/olap/compaction.cpp
@@ -27,6 +27,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <nlohmann/json.hpp>
#include <numeric>
#include <ostream>
#include <set>
@@ -36,6 +37,7 @@
#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/remote_file_system.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
@@ -472,6 +474,46 @@ Status Compaction::do_compaction_impl(int64_t permits) {
dest_index_files[i] = prefix;
}

// Only write info files when debug index compaction is enabled.
// The files are used to debug index compaction and work with index_tool.
if (config::debug_inverted_index_compaction) {
auto write_json_to_file = [&](const nlohmann::json& json_obj,
const std::string& file_name) {
io::FileWriterPtr file_writer;
std::string file_path =
fmt::format("{}/{}.json", config::sys_log_dir, file_name);
RETURN_IF_ERROR(
io::global_local_filesystem()->create_file(file_path, &file_writer));
RETURN_IF_ERROR(file_writer->append(json_obj.dump()));
RETURN_IF_ERROR(file_writer->append("\n"));
return file_writer->close();
};

// Convert trans_vec to JSON and write it to a file
nlohmann::json trans_vec_json = trans_vec;
auto output_version = _output_version.to_string().substr(
1, _output_version.to_string().size() - 2);
RETURN_IF_ERROR(write_json_to_file(
trans_vec_json,
fmt::format("trans_vec_{}_{}", _tablet->tablet_id(), output_version)));

nlohmann::json src_index_files_json = src_index_files;
RETURN_IF_ERROR(write_json_to_file(
src_index_files_json,
fmt::format("src_idx_dirs_{}_{}", _tablet->tablet_id(), output_version)));

nlohmann::json dest_index_files_json = dest_index_files;
RETURN_IF_ERROR(write_json_to_file(
dest_index_files_json,
fmt::format("dest_idx_dirs_{}_{}", _tablet->tablet_id(), output_version)));

nlohmann::json dest_segment_num_rows_json = dest_segment_num_rows;
RETURN_IF_ERROR(
write_json_to_file(dest_segment_num_rows_json,
fmt::format("dest_seg_num_rows_{}_{}",
_tablet->tablet_id(), output_version)));
}

// create index_writer to compaction indexes
auto& fs = _output_rowset->rowset_meta()->fs();
auto& tablet_path = _tablet->tablet_path();
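
Putting the two halves together: with debug_inverted_index_compaction enabled, the compaction dumps the four JSON files into config::sys_log_dir, named after the tablet id and the output version with the brackets stripped (as in the substr above). A sketch of the resulting names, with made-up tablet id and version:

#include <fmt/core.h>
#include <string>

int main() {
    // Example values; the real ones come from the compaction at runtime.
    int64_t tablet_id = 10086;
    std::string output_version = "2-10"; // "[2-10]" with the brackets stripped
    for (const char* name :
         {"trans_vec", "src_idx_dirs", "dest_idx_dirs", "dest_seg_num_rows"}) {
        // e.g. <sys_log_dir>/trans_vec_10086_2-10.json
        fmt::print("{}/{}_{}_{}.json\n", "/path/to/sys_log_dir", name,
                   tablet_id, output_version);
    }
    return 0;
}

These files can then be passed to index_tool's debug_index_compaction operation via --trans_vec_file, --src_idx_dirs_file, --dest_idx_dirs_file, and --dest_seg_num_rows_file.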
