Skip to content

Commit

Permalink
add wal function
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb authored Dec 10, 2023
1 parent b448361 commit 87f6046
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 73 deletions.
1 change: 1 addition & 0 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#pragma once
#include <gen_cpp/PaloInternalService_types.h>

#include <condition_variable>
Expand Down
46 changes: 33 additions & 13 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) {
}
if (block->rows() > 0) {
_block_queue.push_back(block);
//write wal
RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
_single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
}
Expand Down Expand Up @@ -130,8 +132,8 @@ void LoadBlockQueue::cancel(const Status& st) {

Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<vectorized::Block> block, std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version) {
DCHECK(table_id == _table_id);
{
std::unique_lock l(_lock);
Expand All @@ -155,7 +157,8 @@ Status GroupCommitTable::get_first_block_load_queue(
if (!_need_plan_fragment) {
_need_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
[[maybe_unused]] auto st = _create_group_commit_load(load_block_queue);
[[maybe_unused]] auto st =
_create_group_commit_load(load_block_queue, be_exe_version);
}));
}
_cv.wait_for(l, std::chrono::seconds(4));
Expand All @@ -175,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue(
}

Status GroupCommitTable::_create_group_commit_load(
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
Status st = Status::OK();
std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) {
if (!st.ok()) {
Expand Down Expand Up @@ -246,16 +249,16 @@ Status GroupCommitTable::_create_group_commit_load(
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
_cv.notify_all();
}
if (_exec_env->wal_mgr()->is_running()) {
_exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
WalManager::WAL_STATUS::PREPARE);
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
pipeline_params);
} else {
st = Status::InternalError("be is stopping");
//create wal
RETURN_IF_ERROR(
load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
params.desc_tbl.slotDescriptors, be_exe_version));
_cv.notify_all();
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
pipeline_params);
if (!st.ok()) {
static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id,
st, true, nullptr));
Expand Down Expand Up @@ -310,6 +313,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
auto& load_block_queue = it->second;
//close wal
RETURN_IF_ERROR(load_block_queue->close_wal());
if (prepare_failed || !status.ok()) {
load_block_queue->cancel(status);
}
Expand Down Expand Up @@ -418,7 +423,8 @@ void GroupCommitMgr::stop() {
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -430,7 +436,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
group_commit_table = _table_map[table_id];
}
return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
block, load_block_queue);
block, load_block_queue, be_exe_version);
}

Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
Expand All @@ -447,4 +453,18 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
}
return group_commit_table->get_load_block_queue(instance_id, load_block_queue);
}
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version);
return _v_wal_writer->init();
}

Status LoadBlockQueue::close_wal() {
if (_v_wal_writer != nullptr) {
RETURN_IF_ERROR(_v_wal_writer->close());
}
return Status::OK();
}
} // namespace doris
15 changes: 12 additions & 3 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/status.h"
#include "util/threadpool.h"
#include "vec/core/block.h"
#include "vec/sink/writer/vwal_writer.h"

namespace doris {
class ExecEnv;
Expand Down Expand Up @@ -53,6 +54,10 @@ class LoadBlockQueue {
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
int be_exe_version);
Status close_wal();

static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
UniqueId load_instance_id;
Expand Down Expand Up @@ -80,6 +85,7 @@ class LoadBlockQueue {
std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
// group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
};

class GroupCommitTable {
Expand All @@ -94,12 +100,14 @@ class GroupCommitTable {
Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);

private:
Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
int64_t txn_id, bool is_pipeline,
const TExecPlanFragmentParams& params,
Expand Down Expand Up @@ -134,7 +142,8 @@ class GroupCommitMgr {
Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);

private:
ExecEnv* _exec_env = nullptr;
Expand Down
24 changes: 20 additions & 4 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_
bool has_filtered_rows = false;
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows));
if (_block_convertor->num_filtered_rows() > 0) {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < rows; ++i) {
if (_block_convertor->filter_map()[i]) {
continue;
}
res_block.add_row(block.get(), i);
}
block->swap(res_block.to_block());
}
// add block into block queue
return _add_block(state, block);
}
Expand Down Expand Up @@ -153,10 +164,15 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
load_id.__set_hi(_load_id.hi);
load_id.__set_lo(_load_id.lo);
if (_load_block_queue == nullptr) {
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue));
state->set_import_label(_load_block_queue->label);
state->set_wal_id(_load_block_queue->txn_id);
if (state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue,
state->be_exec_version()));
state->set_import_label(_load_block_queue->label);
state->set_wal_id(_load_block_queue->txn_id);
} else {
return Status::InternalError("be is stopping");
}
}
RETURN_IF_ERROR(_load_block_queue->add_block(output_block));
return Status::OK();
Expand Down
62 changes: 19 additions & 43 deletions be/src/vec/sink/writer/vwal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "common/compiler_util.h"
#include "common/status.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
Expand All @@ -43,67 +42,44 @@
namespace doris {
namespace vectorized {

VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state,
TupleDescriptor* output_tuple_desc)
VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version)
: _db_id(db_id),
_tb_id(tb_id),
_wal_id(wal_id),
_state(state),
_output_tuple_desc(output_tuple_desc) {}
_label(import_label),
_wal_manager(wal_manager),
_slot_descs(slot_desc),
_be_exe_version(be_exe_version) {}

VWalWriter::~VWalWriter() {}

Status VWalWriter::init() {
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id,
_state->import_label()));
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer));
_state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id,
WalManager::WAL_STATUS::CREATE);
RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label));
RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
_wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE);
std::stringstream ss;
for (auto slot_desc : _output_tuple_desc->slots()) {
ss << std::to_string(slot_desc->col_unique_id()) << ",";
for (auto slot_desc : _slot_descs) {
if (slot_desc.col_unique_id < 0) {
continue;
}
ss << std::to_string(slot_desc.col_unique_id) << ",";
}
std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
return Status::OK();
}

Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder, vectorized::Block* block,
RuntimeState* state, int64_t num_rows, int64_t filtered_rows) {
Status VWalWriter::write_wal(vectorized::Block* block) {
PBlock pblock;
size_t uncompressed_bytes = 0, compressed_bytes = 0;
if (filtered_rows == 0) {
RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes,
&compressed_bytes, segment_v2::CompressionTypePB::SNAPPY));
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock}));
} else {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < num_rows; ++i) {
if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) {
continue;
}
if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) {
continue;
}
res_block.add_row(block, i);
}
RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock,
&uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::SNAPPY));
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock}));
}
RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes,
&compressed_bytes, segment_v2::CompressionTypePB::SNAPPY));
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock}));
return Status::OK();
}

Status VWalWriter::append_block(vectorized::Block* input_block, int64_t num_rows,
int64_t filter_rows, vectorized::Block* block,
OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder) {
return write_wal(block_convertor, tablet_finder, block, _state, num_rows, filter_rows);
}

Status VWalWriter::close() {
if (_wal_writer != nullptr) {
RETURN_IF_ERROR(_wal_writer->finalize());
Expand Down
18 changes: 8 additions & 10 deletions be/src/vec/sink/writer/vwal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "olap/wal_manager.h"
#include "olap/wal_writer.h"
#include "runtime/decimalv2_value.h"
#include "runtime/exec_env.h"
Expand All @@ -82,16 +83,12 @@ namespace vectorized {

class VWalWriter {
public:
VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state,
TupleDescriptor* output_tuple_desc);
VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
int be_exe_version);
~VWalWriter();
Status init();
Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder,
vectorized::Block* block, RuntimeState* state, int64_t num_rows,
int64_t filtered_rows);
Status append_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows,
vectorized::Block* block, OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder);
Status write_wal(vectorized::Block* block);
Status close();

private:
Expand All @@ -100,8 +97,9 @@ class VWalWriter {
int64_t _wal_id;
uint32_t _version = 0;
std::string _label;
RuntimeState* _state = nullptr;
TupleDescriptor* _output_tuple_desc = nullptr;
WalManager* _wal_manager;
std::vector<TSlotDescriptor>& _slot_descs;
int _be_exe_version = 0;
std::shared_ptr<WalWriter> _wal_writer;
};
} // namespace vectorized
Expand Down

0 comments on commit 87f6046

Please sign in to comment.