Skip to content

Commit

Permalink
pick selectdb-cloud-dev-semi pr 851, 926 (apache#933)
Browse files Browse the repository at this point in the history
* [Feature-WIP](inverted) add inverted index compaction implementation (apache#851)

* [Feature-WIP](inverted) add inverted index compaction implementation

* [Feature-WIP](inverted) set condition for rowid vec trans

* [Fix](inverted) remove useless function

* [Fix](inverted) separate compound directory fs and cfs, for cloud mode and other situations

Co-authored-by: airborne12 <[email protected]>
  • Loading branch information
Tanya-W and airborne12 authored Oct 26, 2022
1 parent 767fb17 commit b778b43
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 52 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,8 @@ CONF_String(inverted_index_searcher_cache_limit, "10%");
// inverted index
CONF_Int32(query_bkd_inverted_index_limit_percent, "10"); // 10%

CONF_Bool(enable_index_compaction, "false");

} // namespace config

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ add_library(Olap STATIC
rowset/segment_v2/inverted_index_desc.cpp
rowset/segment_v2/inverted_index_compound_directory.cpp
rowset/segment_v2/inverted_index_compound_reader.cpp
rowset/segment_v2/inverted_index_compaction.cpp
task/engine_batch_load_task.cpp
task/engine_checksum_task.cpp
task/engine_clone_task.cpp
Expand Down
77 changes: 75 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "gutil/strings/substitute.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/segment_v2/inverted_index_compaction.h"
#include "olap/tablet.h"
#include "util/time.h"
#include "util/trace.h"
Expand Down Expand Up @@ -168,6 +169,16 @@ Status Compaction::do_compaction_impl(int64_t permits) {
#ifdef CLOUD_MODE
context.fs = cloud::latest_fs();
#endif
if (use_vectorized_compaction && config::enable_index_compaction &&
((_tablet->keys_type() == KeysType::UNIQUE_KEYS ||
_tablet->keys_type() == KeysType::DUP_KEYS))) {
auto columns = cur_tablet_schema->get_inverted_index_column();
for (auto column_id : columns) {
if (field_is_slice_type(cur_tablet_schema->column(column_id).type())) {
context.skip_inverted_index.insert(column_id);
}
}
}
RETURN_IF_ERROR(_tablet->create_rowset_writer(&context, &_output_rs_writer));
#ifdef CLOUD_MODE
RETURN_IF_ERROR(cloud::meta_mgr()->prepare_rowset(_output_rs_writer->rowset_meta(), true));
Expand All @@ -180,8 +191,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Merger::Statistics stats;
Status res;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
if ( context.skip_inverted_index.size() > 0 ||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
stats.rowid_conversion = &_rowid_conversion;
}

Expand Down Expand Up @@ -220,6 +232,67 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// 3. check correctness
RETURN_NOT_OK(check_correctness(stats));
TRACE("check correctness finished");
if (_input_row_num > 0 && stats.rowid_conversion && use_vectorized_compaction &&
config::enable_index_compaction) {
OlapStopWatch inverted_watch;
// translation vec
// <<dest_idx_num, desc_docId>>
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec =
stats.rowid_conversion->get_rowid_conversion_map();

// source rowset,segment -> index_id
std::map<std::pair<RowsetId, uint32_t>, uint32_t> src_seg_to_id_map =
stats.rowid_conversion->get_src_segment_to_id_map();
// dest rowset id
RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id();
// dest segment id -> num rows
std::vector<uint32_t> dest_segment_num_rows;
RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows));

auto src_segment_num = src_seg_to_id_map.size();
auto dest_segment_num = dest_segment_num_rows.size();

// src index files
// format: rowsetId_segmentId
std::vector<std::string> src_index_files(src_segment_num);
for (auto m : src_seg_to_id_map) {
std::pair<RowsetId, uint32_t> p = m.first;
src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second);
}

// dest index files
// format: rowsetId_segmentId
std::vector<std::string> dest_index_files(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i);
dest_index_files[i] = prefix;
}

// create index_writer to compaction indexes
auto fs = _output_rowset->rowset_meta()->fs();
auto tablet_path = _output_rowset->tablet_path();

DCHECK(dest_index_files.size() > 0);
// we choose the first destination segment name as the temporary index writer path
// Used to distinguish between different index compaction
auto index_writer_path = tablet_path + "/" + dest_index_files[0];
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";
std::for_each(context.skip_inverted_index.cbegin(), context.skip_inverted_index.cend(),
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows](int32_t column_id) {
compact_column(column_id, src_segment_num, dest_segment_num, src_index_files,
dest_index_files, fs, index_writer_path, tablet_path, trans_vec,
dest_segment_num_rows);
});

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->full_name() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
}

// 4. update and persistent tablet meta
RETURN_NOT_OK(update_tablet_meta());
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class RowIdConversion {

// set dst rowset id
void set_dst_rowset_id(const RowsetId& dst_rowset_id) { _dst_rowst_id = dst_rowset_id; }

// Returns a copy of the destination rowset id previously stored through
// set_dst_rowset_id(). The method only reads _dst_rowst_id, so it is
// const-qualified and can be called on a const RowIdConversion. The value is
// returned as a plain RowsetId: a top-level const on a by-value return gives
// the caller nothing.
RowsetId get_dst_rowset_id() const { return _dst_rowst_id; }
// add row id to the map
void add(const std::vector<RowLocation>& rss_row_ids,
const std::vector<uint32_t>& dst_segments_num_row) {
Expand Down Expand Up @@ -92,6 +93,10 @@ class RowIdConversion {
return _segments_rowid_map;
}

const std::map<std::pair<RowsetId, uint32_t>, uint32_t>& get_src_segment_to_id_map() {
return _segment_to_id_map;
}

std::pair<RowsetId, uint32_t> get_segment_by_id(uint32_t id) const {
DCHECK_GT(_id_to_segment_map.size(), id);
return _id_to_segment_map.at(id);
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct RowsetWriterContext {
int64_t oldest_write_timestamp = -1;
int64_t newest_write_timestamp = -1;
bool enable_unique_key_merge_on_write = false;
std::set<int32_t> skip_inverted_index;

// for tracing local schema change record
std::shared_ptr<vectorized::object_util::LocalSchemaChangeRecorder> schema_change_recorder = nullptr;
Expand Down
68 changes: 68 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include "inverted_index_compaction.h"

#include <CLucene.h>

#include "inverted_index_compound_directory.h"
#include "inverted_index_compound_reader.h"

namespace doris {
namespace segment_v2 {
void compact_column(int32_t column_id, int src_segment_num, int dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, io::FileSystem* fs,
std::string index_writer_path, std::string tablet_path,
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
std::vector<uint32_t> dest_segment_num_rows) {
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false);
lucene::index::IndexWriter* index_writer = _CLNEW lucene::index::IndexWriter(
dir, nullptr, true /* create */, true /* closeDirOnShutdown */);
// NOTE: need to ref_cnt-- for dir,
// when index_writer is destroyed, if closeDir is set, dir will be close
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed.
_CLLDECDELETE(dir)

// get compound directory src_index_dirs
std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
for (int i = 0; i < src_segment_num; ++i) {
// format: rowsetId_segmentId_columnId.idx
std::string src_idx_full_name =
src_index_files[i] + "_" + std::to_string(column_id) + ".idx";
DorisCompoundReader* reader = new DorisCompoundReader(
DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str(), true),
src_idx_full_name.c_str());
src_index_dirs[i] = reader;
}

// get dest idx file paths
std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(column_id);
dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs, path.c_str(), true);
}

index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec,
dest_segment_num_rows);

index_writer->close();
_CLDELETE(index_writer);
index_writer = nullptr;
for (auto d : src_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}
for (auto d : dest_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}

// delete temporary index_writer_path
fs->delete_directory(index_writer_path.c_str());
}
} // namespace segment_v2
} // namespace doris
16 changes: 16 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_compaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <vector>
#include "io/fs/file_system.h"

namespace doris {

namespace segment_v2 {
// Compacts the inverted index of one column from the source segments into the
// destination segments, using a temporary CLucene IndexWriter.
//
//   column_id             - column whose index is compacted; appended as
//                           "_<column_id>" to every index name.
//   src_segment_num       - number of entries of src_index_files that are opened.
//   dest_segment_num      - number of entries of dest_index_files that are opened.
//   src_index_files       - source index names, format: rowsetId_segmentId.
//   dest_index_files      - destination index names, format: rowsetId_segmentId.
//   fs                    - file system used to open every directory.
//   index_writer_path     - temporary directory for the IndexWriter; deleted
//                           again before the function returns.
//   tablet_path           - directory that holds the source ".idx" files and
//                           the destination index directories.
//   trans_vec             - forwarded to IndexWriter::indexCompaction.
//   dest_segment_num_rows - forwarded to IndexWriter::indexCompaction.
//
// All containers and strings are taken by value, so each call copies them.
void compact_column(int32_t column_id, int src_segment_num, int dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, io::FileSystem* fs,
std::string index_writer_path, std::string tablet_path,
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
std::vector<uint32_t> dest_segment_num_rows);
} // namespace segment_v2
} // namespace doris
64 changes: 47 additions & 17 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ void DorisCompoundFileWriter::writeCompoundFile() {
int32_t file_count = sorted_files.size();

// get index file name
io::Path path(((DorisCompoundDirectory*)directory)->getDirName());
auto idx_path = path.parent_path();
std::string idx_name = std::string(path.stem().c_str()) + COMPOUND_FILE_EXTENSION;
//io::Path path(((DorisCompoundDirectory*)directory)->getDirName());
//auto idx_path = path.parent_path();
//std::string idx_name = std::string(path.stem().c_str()) + COMPOUND_FILE_EXTENSION;

io::Path cfs_path(((DorisCompoundDirectory*)directory)->getCfsDirName());
auto idx_path = cfs_path.parent_path();
std::string idx_name = std::string(cfs_path.stem().c_str()) + COMPOUND_FILE_EXTENSION;
// write file entries to ram directory to get header length
lucene::store::RAMDirectory ram_dir;
std::unique_ptr<lucene::store::IndexOutput> ram_output(ram_dir.createOutput(idx_name.c_str()));
Expand Down Expand Up @@ -178,8 +181,9 @@ void DorisCompoundFileWriter::writeCompoundFile() {
// write real file
// rowset data file home
// lock problem?
DorisCompoundDirectory* out_dir = DorisCompoundDirectory::getDirectory(
((DorisCompoundDirectory*)directory)->getFileSystem(), idx_path.c_str(), false);
auto compound_fs = ((DorisCompoundDirectory*)directory)->getCompoundFileSystem();
auto out_dir = DorisCompoundDirectory::getDirectory(compound_fs, idx_path.c_str(), false);

std::unique_ptr<lucene::store::IndexOutput> output(out_dir->createOutput(idx_name.c_str()));
output->writeVInt(file_count);
// write file entries
Expand Down Expand Up @@ -612,9 +616,21 @@ DorisCompoundDirectory::DorisCompoundDirectory()
}

void DorisCompoundDirectory::init(io::FileSystem* _fs, const char* _path,
lucene::store::LockFactory* lockFactory) {
lucene::store::LockFactory* lockFactory,
io::FileSystem* cfs, const char* cfs_path) {
fs = _fs;
directory = _path;

if (cfs == nullptr) {
compound_fs = fs;
} else {
compound_fs = cfs;
}
if (cfs_path != nullptr) {
cfs_directory = cfs_path;
} else {
cfs_directory = _path;
}
bool doClearLockID = false;

if (lockFactory == NULL) {
Expand Down Expand Up @@ -725,6 +741,11 @@ const char* DorisCompoundDirectory::getDirName() const {
return directory.c_str();
}


// Returns the compound-file directory path held in cfs_directory as a C string.
// init() fills cfs_directory with the cfs_path it is given and falls back to the
// regular directory path when cfs_path is null. This path is also the key under
// which the directory is registered in and removed from DIRECTORIES.
// NOTE(review): the pointer refers to storage owned by cfs_directory, so it is
// presumably only valid while this object is alive and the member is unchanged.
const char* DorisCompoundDirectory::getCfsDirName() const {
return cfs_directory.c_str();
}

DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
io::FileSystem* fileSystem, const char* file, bool create,
lucene::store::LockFactory* lockFactory) {
Expand All @@ -740,18 +761,26 @@ DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
}

DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(io::FileSystem* fs, const char* file,
bool useCompoundFileWriter) {
DorisCompoundDirectory* dir = getDirectory(fs, file, (lucene::store::LockFactory*)NULL);
bool useCompoundFileWriter,
io::FileSystem* cfs_fs, const char* cfs_file) {
DorisCompoundDirectory* dir = getDirectory(fs, file, (lucene::store::LockFactory*)nullptr, cfs_fs, cfs_file);
dir->useCompoundFileWriter = useCompoundFileWriter;
return dir;
}

//static
DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
io::FileSystem* fs, const char* _file, lucene::store::LockFactory* lockFactory) {
DorisCompoundDirectory* dir = NULL;
io::FileSystem* fs, const char* _file, lucene::store::LockFactory* lockFactory,
io::FileSystem* cfs, const char* _cfs_file) {
const char* cfs_file = _cfs_file;
if (cfs_file == nullptr) {
cfs_file = _file;
}
DorisCompoundDirectory* dir = nullptr;
{
if (!_file || !*_file) _CLTHROWA(CL_ERR_IO, "Invalid directory");
if (!_file || !*_file) {
_CLTHROWA(CL_ERR_IO, "Invalid directory");
}

// char buf[CL_MAX_PATH];
// char* file = _realpath(
Expand Down Expand Up @@ -786,11 +815,12 @@ DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
}

SCOPED_LOCK_MUTEX(DIRECTORIES_LOCK)
dir = DIRECTORIES.get(file);
// NOTE: we use cfs_file for cache key here, because *file* is temporary file path in cloud mode.
dir = DIRECTORIES.get(cfs_file);
if (dir == NULL) {
dir = _CLNEW DorisCompoundDirectory();
dir->init(fs, file, lockFactory);
DIRECTORIES.put(dir->directory.c_str(), dir);
dir->init(fs, file, lockFactory, cfs, cfs_file);
DIRECTORIES.put(dir->cfs_directory.c_str(), dir);
} else {
if (lockFactory != NULL && lockFactory != dir->getLockFactory()) {
_CLTHROWA(CL_ERR_IO,
Expand Down Expand Up @@ -877,7 +907,7 @@ void DorisCompoundDirectory::close() {
CND_PRECONDITION(directory[0] != 0, "directory is not open");

if (--refCount <= 0) { //refcount starts at 1
Directory* dir = DIRECTORIES.get(getDirName());
Directory* dir = DIRECTORIES.get(getCfsDirName());
if (dir) {
if (useCompoundFileWriter) {
DorisCompoundFileWriter* cfsWriter = _CLNEW DorisCompoundFileWriter(dir);
Expand All @@ -888,7 +918,7 @@ void DorisCompoundDirectory::close() {
_CLDELETE(cfsWriter);
}

DIRECTORIES.remove(getDirName()); //this will be removed in ~DorisCompoundDirectory
DIRECTORIES.remove(getCfsDirName()); //this will be removed in ~DorisCompoundDirectory
_CLDECDELETE(dir);
//NOTE: Don't unlock the mutex, since it has been destroyed now...
return;
Expand Down Expand Up @@ -1021,4 +1051,4 @@ std::string DorisCompoundDirectory::toString() const {
}

} // namespace segment_v2
} // namespace doris
} // namespace doris
Loading

0 comments on commit b778b43

Please sign in to comment.