diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index 4634916c60882b2..d0a547a8d6f9336 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#pragma once #include #include diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 7af04600cf42afa..0807d6c13206ec9 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -42,6 +42,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) { } if (block->rows() > 0) { _block_queue.push_back(block); + //write wal + RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get())); _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } @@ -130,8 +132,8 @@ void LoadBlockQueue::cancel(const Status& st) { Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, - std::shared_ptr block, - std::shared_ptr& load_block_queue) { + std::shared_ptr block, std::shared_ptr& load_block_queue, + int be_exe_version) { DCHECK(table_id == _table_id); { std::unique_lock l(_lock); @@ -155,7 +157,8 @@ Status GroupCommitTable::get_first_block_load_queue( if (!_need_plan_fragment) { _need_plan_fragment = true; RETURN_IF_ERROR(_thread_pool->submit_func([&] { - [[maybe_unused]] auto st = _create_group_commit_load(load_block_queue); + [[maybe_unused]] auto st = + _create_group_commit_load(load_block_queue, be_exe_version); })); } _cv.wait_for(l, std::chrono::seconds(4)); @@ -175,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue( } Status GroupCommitTable::_create_group_commit_load( - std::shared_ptr& load_block_queue) { + std::shared_ptr& load_block_queue, int be_exe_version) { Status st = Status::OK(); std::unique_ptr> finish_plan_func((int*)0x01, [&](int*) { if (!st.ok()) { @@ -246,16 +249,16 @@ Status GroupCommitTable::_create_group_commit_load( std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); _need_plan_fragment = false; - _cv.notify_all(); - } - if (_exec_env->wal_mgr()->is_running()) { _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, WalManager::WAL_STATUS::PREPARE); - st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, - pipeline_params); - } else { - st = Status::InternalError("be is stopping"); + //create wal + RETURN_IF_ERROR( + load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), + params.desc_tbl.slotDescriptors, be_exe_version)); + _cv.notify_all(); } + st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, + pipeline_params); if (!st.ok()) { static_cast(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, st, true, nullptr)); @@ -310,6 +313,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ auto it = _load_block_queues.find(instance_id); if (it != _load_block_queues.end()) { auto& load_block_queue = it->second; + //close wal + RETURN_IF_ERROR(load_block_queue->close_wal()); if (prepare_failed || !status.ok()) { load_block_queue->cancel(status); } @@ -418,7 +423,8 @@ void GroupCommitMgr::stop() { Status GroupCommitMgr::get_first_block_load_queue( int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr block, - std::shared_ptr& load_block_queue) { + std::shared_ptr& load_block_queue, + int be_exe_version) { std::shared_ptr group_commit_table; { std::lock_guard wlock(_lock); @@ -430,7 +436,7 @@ Status GroupCommitMgr::get_first_block_load_queue( group_commit_table = _table_map[table_id]; } return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id, - block, load_block_queue); + block, load_block_queue, be_exe_version); } Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, @@ -447,4 +453,18 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i } return group_commit_table->get_load_block_queue(instance_id, load_block_queue); } +Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, + const std::string& import_label, WalManager* wal_manager, + std::vector& slot_desc, int be_exe_version) { + _v_wal_writer = std::make_shared( + db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version); + return _v_wal_writer->init(); +} + +Status LoadBlockQueue::close_wal() { + if (_v_wal_writer != nullptr) { + RETURN_IF_ERROR(_v_wal_writer->close()); + } + return Status::OK(); +} } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index df35f6c83ebf8e5..be129d545731b09 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "util/threadpool.h" #include "vec/core/block.h" +#include "vec/sink/writer/vwal_writer.h" namespace doris { class ExecEnv; @@ -53,6 +54,10 @@ class LoadBlockQueue { Status add_load_id(const UniqueId& load_id); void remove_load_id(const UniqueId& load_id); void cancel(const Status& st); + Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, + WalManager* wal_manager, std::vector& slot_desc, + int be_exe_version); + Status close_wal(); static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; UniqueId load_instance_id; @@ -80,6 +85,7 @@ class LoadBlockQueue { std::shared_ptr _single_block_queue_bytes; // group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' int64_t _group_commit_interval_ms; + std::shared_ptr _v_wal_writer; }; class GroupCommitTable { @@ -94,12 +100,14 @@ class GroupCommitTable { Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr block, - std::shared_ptr& load_block_queue); + std::shared_ptr& load_block_queue, + int be_exe_version); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr& load_block_queue); private: - Status _create_group_commit_load(std::shared_ptr& load_block_queue); + Status _create_group_commit_load(std::shared_ptr& load_block_queue, + int be_exe_version); Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, bool is_pipeline, const TExecPlanFragmentParams& params, @@ -134,7 +142,8 @@ class GroupCommitMgr { Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, std::shared_ptr block, - std::shared_ptr& load_block_queue); + std::shared_ptr& load_block_queue, + int be_exe_version); private: ExecEnv* _exec_env = nullptr; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 1fc4205d9c62694..010423502495c04 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -122,6 +122,17 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ bool has_filtered_rows = false; RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows)); + if (_block_convertor->num_filtered_rows() > 0) { + auto cloneBlock = block->clone_without_columns(); + auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + for (int i = 0; i < rows; ++i) { + if (_block_convertor->filter_map()[i]) { + continue; + } + res_block.add_row(block.get(), i); + } + block->swap(res_block.to_block()); + } // add block into block queue return _add_block(state, block); } @@ -153,10 +164,15 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, load_id.__set_hi(_load_id.hi); load_id.__set_lo(_load_id.lo); if (_load_block_queue == nullptr) { - RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue)); - state->set_import_label(_load_block_queue->label); - state->set_wal_id(_load_block_queue->txn_id); + if (state->exec_env()->wal_mgr()->is_running()) { + RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( + _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue, + state->be_exec_version())); + state->set_import_label(_load_block_queue->label); + state->set_wal_id(_load_block_queue->txn_id); + } else { + return Status::InternalError("be is stopping"); + } } RETURN_IF_ERROR(_load_block_queue->add_block(output_block)); return Status::OK(); diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index c21a5db14371e4c..d929207e9a9b3d1 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -27,7 +27,6 @@ #include "common/compiler_util.h" #include "common/status.h" -#include "olap/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -43,67 +42,44 @@ namespace doris { namespace vectorized { -VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, - TupleDescriptor* output_tuple_desc) +VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, + const std::string& import_label, WalManager* wal_manager, + std::vector& slot_desc, int be_exe_version) : _db_id(db_id), _tb_id(tb_id), _wal_id(wal_id), - _state(state), - _output_tuple_desc(output_tuple_desc) {} + _label(import_label), + _wal_manager(wal_manager), + _slot_descs(slot_desc), + _be_exe_version(be_exe_version) {} VWalWriter::~VWalWriter() {} Status VWalWriter::init() { - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id, - _state->import_label())); - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); - _state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id, - WalManager::WAL_STATUS::CREATE); + RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label)); + RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); + _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE); std::stringstream ss; - for (auto slot_desc : _output_tuple_desc->slots()) { - ss << std::to_string(slot_desc->col_unique_id()) << ","; + for (auto slot_desc : _slot_descs) { + if (slot_desc.col_unique_id < 0) { + continue; + } + ss << std::to_string(slot_desc.col_unique_id) << ","; } std::string col_ids = ss.str().substr(0, ss.str().size() - 1); RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids)); return Status::OK(); } -Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder, vectorized::Block* block, - RuntimeState* state, int64_t num_rows, int64_t filtered_rows) { +Status VWalWriter::write_wal(vectorized::Block* block) { PBlock pblock; size_t uncompressed_bytes = 0, compressed_bytes = 0; - if (filtered_rows == 0) { - RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, - &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); - RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); - } else { - auto cloneBlock = block->clone_without_columns(); - auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - for (int i = 0; i < num_rows; ++i) { - if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) { - continue; - } - if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { - continue; - } - res_block.add_row(block, i); - } - RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock, - &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::SNAPPY)); - RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); - } + RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, + &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); + RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); return Status::OK(); } -Status VWalWriter::append_block(vectorized::Block* input_block, int64_t num_rows, - int64_t filter_rows, vectorized::Block* block, - OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder) { - return write_wal(block_convertor, tablet_finder, block, _state, num_rows, filter_rows); -} - Status VWalWriter::close() { if (_wal_writer != nullptr) { RETURN_IF_ERROR(_wal_writer->finalize()); diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index d33f3f015a74ceb..17c9dc979a1c47f 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -56,6 +56,7 @@ #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gutil/ref_counted.h" +#include "olap/wal_manager.h" #include "olap/wal_writer.h" #include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" @@ -82,16 +83,12 @@ namespace vectorized { class VWalWriter { public: - VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, - TupleDescriptor* output_tuple_desc); + VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, + WalManager* wal_manager, std::vector& slot_desc, + int be_exe_version); ~VWalWriter(); Status init(); - Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, - vectorized::Block* block, RuntimeState* state, int64_t num_rows, - int64_t filtered_rows); - Status append_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows, - vectorized::Block* block, OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder); + Status write_wal(vectorized::Block* block); Status close(); private: @@ -100,8 +97,9 @@ class VWalWriter { int64_t _wal_id; uint32_t _version = 0; std::string _label; - RuntimeState* _state = nullptr; - TupleDescriptor* _output_tuple_desc = nullptr; + WalManager* _wal_manager; + std::vector& _slot_descs; + int _be_exe_version = 0; std::shared_ptr _wal_writer; }; } // namespace vectorized