From 5e48cc80660f1728955ed75cc9fd44edd1a6b8b5 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sun, 10 Dec 2023 00:54:30 +0800 Subject: [PATCH 1/5] add wal function --- be/src/olap/wal_manager.h | 1 + be/src/runtime/group_commit_mgr.cpp | 43 ++++++++++++++++++--- be/src/runtime/group_commit_mgr.h | 20 ++++++++-- be/src/vec/sink/group_commit_block_sink.cpp | 3 +- be/src/vec/sink/writer/vwal_writer.cpp | 39 ++++++++++--------- be/src/vec/sink/writer/vwal_writer.h | 21 +++++----- 6 files changed, 90 insertions(+), 37 deletions(-) diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index 4634916c60882b..d0a547a8d6f933 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#pragma once #include #include diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 7af04600cf42af..47a21653bbe4ab 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -85,6 +85,8 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo _block_queue.pop_front(); _all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed); _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); + //append block + RETURN_IF_ERROR(append_block(0, 0, block, nullptr, nullptr)); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { CHECK_EQ(_single_block_queue_bytes->load(), 0); @@ -130,8 +132,8 @@ void LoadBlockQueue::cancel(const Status& st) { Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, - std::shared_ptr block, - std::shared_ptr& load_block_queue) { + std::shared_ptr block, std::shared_ptr& load_block_queue, + int be_exe_version) { DCHECK(table_id == _table_id); { std::unique_lock l(_lock); @@ -155,7 +157,8 @@ Status GroupCommitTable::get_first_block_load_queue( if (!_need_plan_fragment) { _need_plan_fragment = true; RETURN_IF_ERROR(_thread_pool->submit_func([&] { - [[maybe_unused]] auto st = _create_group_commit_load(load_block_queue); + [[maybe_unused]] auto st = + _create_group_commit_load(load_block_queue, be_exe_version); })); } _cv.wait_for(l, std::chrono::seconds(4)); @@ -175,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue( } Status GroupCommitTable::_create_group_commit_load( - std::shared_ptr& load_block_queue) { + std::shared_ptr& load_block_queue, int be_exe_version) { Status st = Status::OK(); std::unique_ptr> finish_plan_func((int*)0x01, [&](int*) { if (!st.ok()) { @@ -251,6 +254,10 @@ Status GroupCommitTable::_create_group_commit_load( if (_exec_env->wal_mgr()->is_running()) { _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, WalManager::WAL_STATUS::PREPARE); + //create wal + RETURN_IF_ERROR( + load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), + params.desc_tbl.slotDescriptors, be_exe_version)); st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, pipeline_params); } else { @@ -310,6 +317,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ auto it = _load_block_queues.find(instance_id); if (it != _load_block_queues.end()) { auto& load_block_queue = it->second; + //close wal + RETURN_IF_ERROR(load_block_queue->close_wal()); if (prepare_failed || !status.ok()) { load_block_queue->cancel(status); } @@ -418,7 +427,8 @@ void GroupCommitMgr::stop() { Status GroupCommitMgr::get_first_block_load_queue( int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr block, - std::shared_ptr& load_block_queue) { + std::shared_ptr& load_block_queue, + int be_exe_version) { std::shared_ptr group_commit_table; { std::lock_guard wlock(_lock); @@ -430,7 +440,7 @@ Status GroupCommitMgr::get_first_block_load_queue( group_commit_table = _table_map[table_id]; } return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id, - block, load_block_queue); + block, load_block_queue, be_exe_version); } Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, @@ -447,4 +457,25 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i } return group_commit_table->get_load_block_queue(instance_id, load_block_queue); } +Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, + const std::string& import_label, WalManager* wal_manager, + std::vector& slot_desc, int be_exe_version) { + _v_wal_writer = std::make_shared( + db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version); + RETURN_IF_ERROR(_v_wal_writer->init()); + return Status::OK(); +} +Status LoadBlockQueue::append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, + vectorized::OlapTableBlockConvertor* block_convertor, + vectorized::OlapTabletFinder* tablet_finder) { + RETURN_IF_ERROR(_v_wal_writer->append_block(num_rows, filter_rows, block, block_convertor, + tablet_finder)); + return Status::OK(); +} +Status LoadBlockQueue::close_wal() { + if (_v_wal_writer != nullptr) { + RETURN_IF_ERROR(_v_wal_writer->close()); + } + return Status::OK(); +} } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index df35f6c83ebf8e..962d07bcb68cb6 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -25,6 +25,9 @@ #include "common/status.h" #include "util/threadpool.h" #include "vec/core/block.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" +#include "vec/sink/writer/vwal_writer.h" namespace doris { class ExecEnv; @@ -53,6 +56,13 @@ class LoadBlockQueue { Status add_load_id(const UniqueId& load_id); void remove_load_id(const UniqueId& load_id); void cancel(const Status& st); + Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, + WalManager* wal_manager, std::vector& slot_desc, + int be_exe_version); + Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, + vectorized::OlapTableBlockConvertor* block_convertor, + vectorized::OlapTabletFinder* tablet_finder); + Status close_wal(); static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; UniqueId load_instance_id; @@ -80,6 +90,7 @@ class LoadBlockQueue { std::shared_ptr _single_block_queue_bytes; // group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' int64_t _group_commit_interval_ms; + std::shared_ptr _v_wal_writer; }; class GroupCommitTable { @@ -94,12 +105,14 @@ class GroupCommitTable { Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr block, - std::shared_ptr& load_block_queue); + std::shared_ptr& load_block_queue, + int be_exe_version); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr& load_block_queue); private: - Status _create_group_commit_load(std::shared_ptr& load_block_queue); + Status _create_group_commit_load(std::shared_ptr& load_block_queue, + int be_exe_version); Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, bool is_pipeline, const TExecPlanFragmentParams& params, @@ -134,7 +147,8 @@ class GroupCommitMgr { Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr block, - std::shared_ptr& load_block_queue); + std::shared_ptr& load_block_queue, + int be_exe_version); private: ExecEnv* _exec_env = nullptr; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 1fc4205d9c6269..be51bb5c18aaa6 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -154,7 +154,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, load_id.__set_lo(_load_id.lo); if (_load_block_queue == nullptr) { RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue)); + _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue, + state->be_exec_version())); state->set_import_label(_load_block_queue->label); state->set_wal_id(_load_block_queue->txn_id); } diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index c21a5db14371e4..7586b90b73fe08 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -27,7 +27,6 @@ #include "common/compiler_util.h" #include "common/status.h" -#include "olap/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -43,38 +42,43 @@ namespace doris { namespace vectorized { -VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, - TupleDescriptor* output_tuple_desc) +VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, + const std::string& import_label, WalManager* wal_manager, + std::vector& slot_desc, int be_exe_version) : _db_id(db_id), _tb_id(tb_id), _wal_id(wal_id), - _state(state), - _output_tuple_desc(output_tuple_desc) {} + _label(import_label), + _wal_manager(wal_manager), + _slot_descs(slot_desc), + _be_exe_version(be_exe_version) {} VWalWriter::~VWalWriter() {} Status VWalWriter::init() { - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id, - _state->import_label())); - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); - _state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id, - WalManager::WAL_STATUS::CREATE); + RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label)); + RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); + _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE); std::stringstream ss; - for (auto slot_desc : _output_tuple_desc->slots()) { - ss << std::to_string(slot_desc->col_unique_id()) << ","; + for (auto slot_desc : _slot_descs) { + if (slot_desc.col_unique_id < 0) { + continue; + } + ss << std::to_string(slot_desc.col_unique_id) << ","; } std::string col_ids = ss.str().substr(0, ss.str().size() - 1); + LOG(INFO) << "col_ids:" << col_ids; RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids)); return Status::OK(); } Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, vectorized::Block* block, - RuntimeState* state, int64_t num_rows, int64_t filtered_rows) { + int64_t num_rows, int64_t filtered_rows) { PBlock pblock; size_t uncompressed_bytes = 0, compressed_bytes = 0; if (filtered_rows == 0) { - RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, + RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); } else { @@ -89,7 +93,7 @@ Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, } res_block.add_row(block, i); } - RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock, + RETURN_IF_ERROR(res_block.to_block().serialize(_be_exe_version, &pblock, &uncompressed_bytes, &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); @@ -97,11 +101,10 @@ Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, return Status::OK(); } -Status VWalWriter::append_block(vectorized::Block* input_block, int64_t num_rows, - int64_t filter_rows, vectorized::Block* block, +Status VWalWriter::append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder) { - return write_wal(block_convertor, tablet_finder, block, _state, num_rows, filter_rows); + return write_wal(block_convertor, tablet_finder, block, num_rows, filter_rows); } Status VWalWriter::close() { diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index d33f3f015a74ce..3e5686257ebb0a 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -56,6 +56,7 @@ #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" +#include "olap/wal_manager.h" #include "olap/wal_writer.h" #include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" @@ -82,16 +83,15 @@ namespace vectorized { class VWalWriter { public: - VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, - TupleDescriptor* output_tuple_desc); + VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, + WalManager* wal_manager, std::vector& slot_desc, + int be_exe_version); ~VWalWriter(); Status init(); Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, - vectorized::Block* block, RuntimeState* state, int64_t num_rows, - int64_t filtered_rows); - Status append_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows, - vectorized::Block* block, OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder); + vectorized::Block* block, int64_t num_rows, int64_t filtered_rows); + Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, + OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder); Status close(); private: @@ -100,8 +100,11 @@ class VWalWriter { int64_t _wal_id; uint32_t _version = 0; std::string _label; - RuntimeState* _state = nullptr; - TupleDescriptor* _output_tuple_desc = nullptr; +// RuntimeState* _state = nullptr; +// TupleDescriptor* _output_tuple_desc = nullptr; + WalManager* _wal_manager; + std::vector& _slot_descs; + int _be_exe_version; std::shared_ptr _wal_writer; }; } // namespace vectorized From f5d3dbf7867c26241f95eec7b1526f606775d9c2 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sun, 10 Dec 2023 01:19:36 +0800 Subject: [PATCH 2/5] edit --- be/src/vec/sink/writer/vwal_writer.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index 3e5686257ebb0a..6c0ca864f9f072 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -100,8 +100,6 @@ class VWalWriter { int64_t _wal_id; uint32_t _version = 0; std::string _label; -// RuntimeState* _state = nullptr; -// TupleDescriptor* _output_tuple_desc = nullptr; WalManager* _wal_manager; std::vector& _slot_descs; int _be_exe_version; From 8676a7f8e476c739365dbb23043f0303984ef602 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sun, 10 Dec 2023 14:37:54 +0800 Subject: [PATCH 3/5] add filter --- be/src/runtime/group_commit_mgr.cpp | 9 ++---- be/src/runtime/group_commit_mgr.h | 6 +--- be/src/vec/sink/group_commit_block_sink.cpp | 12 ++++++++ be/src/vec/sink/writer/vwal_writer.cpp | 34 ++++----------------- be/src/vec/sink/writer/vwal_writer.h | 6 ++-- 5 files changed, 24 insertions(+), 43 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 47a21653bbe4ab..16ef0e7b550579 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -86,7 +86,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo _all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed); _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); //append block - RETURN_IF_ERROR(append_block(0, 0, block, nullptr, nullptr)); + RETURN_IF_ERROR(append_block(block)); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { CHECK_EQ(_single_block_queue_bytes->load(), 0); @@ -465,11 +465,8 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, RETURN_IF_ERROR(_v_wal_writer->init()); return Status::OK(); } -Status LoadBlockQueue::append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, - vectorized::OlapTableBlockConvertor* block_convertor, - vectorized::OlapTabletFinder* tablet_finder) { - RETURN_IF_ERROR(_v_wal_writer->append_block(num_rows, filter_rows, block, block_convertor, - tablet_finder)); +Status LoadBlockQueue::append_block(vectorized::Block* block) { + RETURN_IF_ERROR(_v_wal_writer->append_block(block)); return Status::OK(); } Status LoadBlockQueue::close_wal() { diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 962d07bcb68cb6..d91c69240a8dba 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -25,8 +25,6 @@ #include "common/status.h" #include "util/threadpool.h" #include "vec/core/block.h" -#include "vec/sink/vtablet_block_convertor.h" -#include "vec/sink/vtablet_finder.h" #include "vec/sink/writer/vwal_writer.h" namespace doris { @@ -59,9 +57,7 @@ class LoadBlockQueue { Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); - Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, - vectorized::OlapTableBlockConvertor* block_convertor, - vectorized::OlapTabletFinder* tablet_finder); + Status append_block(vectorized::Block* block); Status close_wal(); static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index be51bb5c18aaa6..53e0e6b2846f63 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -122,6 +122,18 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ bool has_filtered_rows = false; RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows)); + if (_block_convertor->num_filtered_rows() > 0) { + auto cloneBlock = block->clone_without_columns(); + auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + for (int i = 0; i < rows; ++i) { + if (_block_convertor->filter_map()[i]) { + LOG(INFO) << "skip " << i; + continue; + } + res_block.add_row(block.get(), i); + } + block->swap(res_block.to_block()); + } // add block into block queue return _add_block(state, block); } diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 7586b90b73fe08..2f23918be21b27 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -72,39 +72,17 @@ Status VWalWriter::init() { return Status::OK(); } -Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder, vectorized::Block* block, - int64_t num_rows, int64_t filtered_rows) { +Status VWalWriter::write_wal(vectorized::Block* block) { PBlock pblock; size_t uncompressed_bytes = 0, compressed_bytes = 0; - if (filtered_rows == 0) { - RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, - &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); - RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); - } else { - auto cloneBlock = block->clone_without_columns(); - auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - for (int i = 0; i < num_rows; ++i) { - if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) { - continue; - } - if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { - continue; - } - res_block.add_row(block, i); - } - RETURN_IF_ERROR(res_block.to_block().serialize(_be_exe_version, &pblock, - &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::SNAPPY)); - RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); - } + RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, + &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); + RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); return Status::OK(); } -Status VWalWriter::append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, - OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder) { - return write_wal(block_convertor, tablet_finder, block, num_rows, filter_rows); +Status VWalWriter::append_block(vectorized::Block* block) { + return write_wal(block); } Status VWalWriter::close() { diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index 6c0ca864f9f072..2d663d8f5ed54a 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -88,10 +88,8 @@ class VWalWriter { int be_exe_version); ~VWalWriter(); Status init(); - Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, - vectorized::Block* block, int64_t num_rows, int64_t filtered_rows); - Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block, - OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder); + Status write_wal(vectorized::Block* block); + Status append_block(vectorized::Block* block); Status close(); private: From d11374f1021a39b6c79cc51b49420fcccc3724eb Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sun, 10 Dec 2023 17:13:40 +0800 Subject: [PATCH 4/5] edit --- be/src/runtime/group_commit_mgr.cpp | 22 +++++++-------------- be/src/runtime/group_commit_mgr.h | 1 - be/src/vec/sink/group_commit_block_sink.cpp | 14 ++++++++----- be/src/vec/sink/writer/vwal_writer.cpp | 5 ----- be/src/vec/sink/writer/vwal_writer.h | 3 +-- 5 files changed, 17 insertions(+), 28 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 16ef0e7b550579..0807d6c13206ec 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -42,6 +42,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) { } if (block->rows() > 0) { _block_queue.push_back(block); + //write wal + RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get())); _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } @@ -85,8 +87,6 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo _block_queue.pop_front(); _all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed); _single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed); - //append block - RETURN_IF_ERROR(append_block(block)); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { CHECK_EQ(_single_block_queue_bytes->load(), 0); @@ -249,20 +249,16 @@ Status GroupCommitTable::_create_group_commit_load( std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); _need_plan_fragment = false; - _cv.notify_all(); - } - if (_exec_env->wal_mgr()->is_running()) { _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, WalManager::WAL_STATUS::PREPARE); //create wal RETURN_IF_ERROR( load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), params.desc_tbl.slotDescriptors, be_exe_version)); - st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, - pipeline_params); - } else { - st = Status::InternalError("be is stopping"); + _cv.notify_all(); } + st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, + pipeline_params); if (!st.ok()) { static_cast(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, st, true, nullptr)); @@ -462,13 +458,9 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, std::vector& slot_desc, int be_exe_version) { _v_wal_writer = std::make_shared( db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version); - RETURN_IF_ERROR(_v_wal_writer->init()); - return Status::OK(); -} -Status LoadBlockQueue::append_block(vectorized::Block* block) { - RETURN_IF_ERROR(_v_wal_writer->append_block(block)); - return Status::OK(); + return _v_wal_writer->init(); } + Status LoadBlockQueue::close_wal() { if (_v_wal_writer != nullptr) { RETURN_IF_ERROR(_v_wal_writer->close()); diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index d91c69240a8dba..be129d545731b0 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -57,7 +57,6 @@ class LoadBlockQueue { Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); - Status append_block(vectorized::Block* block); Status close_wal(); static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 53e0e6b2846f63..f2adf0c0b38954 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -165,11 +165,15 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, load_id.__set_hi(_load_id.hi); load_id.__set_lo(_load_id.lo); if (_load_block_queue == nullptr) { - RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue, - state->be_exec_version())); - state->set_import_label(_load_block_queue->label); - state->set_wal_id(_load_block_queue->txn_id); + if (state->exec_env()->wal_mgr()->is_running()) { + RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( + _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue, + state->be_exec_version())); + state->set_import_label(_load_block_queue->label); + state->set_wal_id(_load_block_queue->txn_id); + } else { + return Status::InternalError("be is stopping"); + } } RETURN_IF_ERROR(_load_block_queue->add_block(output_block)); return Status::OK(); diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 2f23918be21b27..d929207e9a9b3d 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -67,7 +67,6 @@ Status VWalWriter::init() { ss << std::to_string(slot_desc.col_unique_id) << ","; } std::string col_ids = ss.str().substr(0, ss.str().size() - 1); - LOG(INFO) << "col_ids:" << col_ids; RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids)); return Status::OK(); } @@ -81,10 +80,6 @@ Status VWalWriter::write_wal(vectorized::Block* block) { return Status::OK(); } -Status VWalWriter::append_block(vectorized::Block* block) { - return write_wal(block); -} - Status VWalWriter::close() { if (_wal_writer != nullptr) { RETURN_IF_ERROR(_wal_writer->finalize()); diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index 2d663d8f5ed54a..17c9dc979a1c47 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -89,7 +89,6 @@ class VWalWriter { ~VWalWriter(); Status init(); Status write_wal(vectorized::Block* block); - Status append_block(vectorized::Block* block); Status close(); private: @@ -100,7 +99,7 @@ class VWalWriter { std::string _label; WalManager* _wal_manager; std::vector& _slot_descs; - int _be_exe_version; + int _be_exe_version = 0; std::shared_ptr _wal_writer; }; } // namespace vectorized From a1c60b772d608e661190382d3bc72cf1ec06ac0a Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sun, 10 Dec 2023 19:08:59 +0800 Subject: [PATCH 5/5] edit --- be/src/vec/sink/group_commit_block_sink.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index f2adf0c0b38954..010423502495c0 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -127,7 +127,6 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); for (int i = 0; i < rows; ++i) { if (_block_convertor->filter_map()[i]) { - LOG(INFO) << "skip " << i; continue; } res_block.add_row(block.get(), i);