-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add wal function #6
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -42,6 +42,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) { | |||||
} | ||||||
if (block->rows() > 0) { | ||||||
_block_queue.push_back(block); | ||||||
//write wal | ||||||
RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get())); | ||||||
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); | ||||||
_single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); | ||||||
} | ||||||
|
@@ -130,8 +132,8 @@ void LoadBlockQueue::cancel(const Status& st) { | |||||
|
||||||
Status GroupCommitTable::get_first_block_load_queue( | ||||||
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, | ||||||
std::shared_ptr<vectorized::Block> block, | ||||||
std::shared_ptr<LoadBlockQueue>& load_block_queue) { | ||||||
std::shared_ptr<vectorized::Block> block, std::shared_ptr<LoadBlockQueue>& load_block_queue, | ||||||
int be_exe_version) { | ||||||
DCHECK(table_id == _table_id); | ||||||
{ | ||||||
std::unique_lock l(_lock); | ||||||
|
@@ -155,7 +157,8 @@ Status GroupCommitTable::get_first_block_load_queue( | |||||
if (!_need_plan_fragment) { | ||||||
_need_plan_fragment = true; | ||||||
RETURN_IF_ERROR(_thread_pool->submit_func([&] { | ||||||
[[maybe_unused]] auto st = _create_group_commit_load(load_block_queue); | ||||||
[[maybe_unused]] auto st = | ||||||
_create_group_commit_load(load_block_queue, be_exe_version); | ||||||
})); | ||||||
} | ||||||
_cv.wait_for(l, std::chrono::seconds(4)); | ||||||
|
@@ -175,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue( | |||||
} | ||||||
|
||||||
Status GroupCommitTable::_create_group_commit_load( | ||||||
std::shared_ptr<LoadBlockQueue>& load_block_queue) { | ||||||
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) { | ||||||
Status st = Status::OK(); | ||||||
std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) { | ||||||
if (!st.ok()) { | ||||||
|
@@ -246,16 +249,16 @@ Status GroupCommitTable::_create_group_commit_load( | |||||
std::unique_lock l(_lock); | ||||||
_load_block_queues.emplace(instance_id, load_block_queue); | ||||||
_need_plan_fragment = false; | ||||||
_cv.notify_all(); | ||||||
} | ||||||
if (_exec_env->wal_mgr()->is_running()) { | ||||||
_exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, | ||||||
WalManager::WAL_STATUS::PREPARE); | ||||||
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, | ||||||
pipeline_params); | ||||||
} else { | ||||||
st = Status::InternalError("be is stopping"); | ||||||
//create wal | ||||||
RETURN_IF_ERROR( | ||||||
load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), | ||||||
params.desc_tbl.slotDescriptors, be_exe_version)); | ||||||
_cv.notify_all(); | ||||||
} | ||||||
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, | ||||||
pipeline_params); | ||||||
if (!st.ok()) { | ||||||
static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, | ||||||
st, true, nullptr)); | ||||||
|
@@ -310,6 +313,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ | |||||
auto it = _load_block_queues.find(instance_id); | ||||||
if (it != _load_block_queues.end()) { | ||||||
auto& load_block_queue = it->second; | ||||||
//close wal | ||||||
RETURN_IF_ERROR(load_block_queue->close_wal()); | ||||||
if (prepare_failed || !status.ok()) { | ||||||
load_block_queue->cancel(status); | ||||||
} | ||||||
|
@@ -418,7 +423,8 @@ void GroupCommitMgr::stop() { | |||||
Status GroupCommitMgr::get_first_block_load_queue( | ||||||
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, | ||||||
std::shared_ptr<vectorized::Block> block, | ||||||
std::shared_ptr<LoadBlockQueue>& load_block_queue) { | ||||||
std::shared_ptr<LoadBlockQueue>& load_block_queue, | ||||||
int be_exe_version) { | ||||||
std::shared_ptr<GroupCommitTable> group_commit_table; | ||||||
{ | ||||||
std::lock_guard wlock(_lock); | ||||||
|
@@ -430,7 +436,7 @@ Status GroupCommitMgr::get_first_block_load_queue( | |||||
group_commit_table = _table_map[table_id]; | ||||||
} | ||||||
return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id, | ||||||
block, load_block_queue); | ||||||
block, load_block_queue, be_exe_version); | ||||||
} | ||||||
|
||||||
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, | ||||||
|
@@ -447,4 +453,18 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i | |||||
} | ||||||
return group_commit_table->get_load_block_queue(instance_id, load_block_queue); | ||||||
} | ||||||
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'create_wal' can be made static [readability-convert-member-functions-to-static]
Suggested change
|
||||||
const std::string& import_label, WalManager* wal_manager, | ||||||
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) { | ||||||
_v_wal_writer = std::make_shared<vectorized::VWalWriter>( | ||||||
db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version); | ||||||
return _v_wal_writer->init(); | ||||||
} | ||||||
|
||||||
Status LoadBlockQueue::close_wal() { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'close_wal' can be made static [readability-convert-member-functions-to-static] be/src/runtime/group_commit_mgr.h:64: - Status close_wal();
+ static Status close_wal(); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'close_wal' can be made static [readability-convert-member-functions-to-static] be/src/runtime/group_commit_mgr.h:60: - Status close_wal();
+ static Status close_wal(); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'close_wal' can be made static [readability-convert-member-functions-to-static] be/src/runtime/group_commit_mgr.h:59: - Status close_wal();
+ static Status close_wal(); |
||||||
if (_v_wal_writer != nullptr) { | ||||||
RETURN_IF_ERROR(_v_wal_writer->close()); | ||||||
} | ||||||
return Status::OK(); | ||||||
} | ||||||
} // namespace doris |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,6 @@ | |
|
||
#include "common/compiler_util.h" | ||
#include "common/status.h" | ||
#include "olap/wal_manager.h" | ||
#include "runtime/client_cache.h" | ||
#include "runtime/descriptors.h" | ||
#include "runtime/runtime_state.h" | ||
|
@@ -43,67 +42,44 @@ | |
namespace doris { | ||
namespace vectorized { | ||
|
||
VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, | ||
TupleDescriptor* output_tuple_desc) | ||
VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, | ||
const std::string& import_label, WalManager* wal_manager, | ||
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) | ||
: _db_id(db_id), | ||
_tb_id(tb_id), | ||
_wal_id(wal_id), | ||
_state(state), | ||
_output_tuple_desc(output_tuple_desc) {} | ||
_label(import_label), | ||
_wal_manager(wal_manager), | ||
_slot_descs(slot_desc), | ||
_be_exe_version(be_exe_version) {} | ||
|
||
VWalWriter::~VWalWriter() {} | ||
|
||
Status VWalWriter::init() { | ||
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id, | ||
_state->import_label())); | ||
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); | ||
_state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id, | ||
WalManager::WAL_STATUS::CREATE); | ||
RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label)); | ||
RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); | ||
_wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE); | ||
std::stringstream ss; | ||
for (auto slot_desc : _output_tuple_desc->slots()) { | ||
ss << std::to_string(slot_desc->col_unique_id()) << ","; | ||
for (auto slot_desc : _slot_descs) { | ||
if (slot_desc.col_unique_id < 0) { | ||
continue; | ||
} | ||
ss << std::to_string(slot_desc.col_unique_id) << ","; | ||
} | ||
std::string col_ids = ss.str().substr(0, ss.str().size() - 1); | ||
RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids)); | ||
return Status::OK(); | ||
} | ||
|
||
Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, | ||
OlapTabletFinder* tablet_finder, vectorized::Block* block, | ||
RuntimeState* state, int64_t num_rows, int64_t filtered_rows) { | ||
Status VWalWriter::write_wal(vectorized::Block* block) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'write_wal' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/writer/vwal_writer.h:90: - Status write_wal(vectorized::Block* block);
+ static Status write_wal(vectorized::Block* block); |
||
PBlock pblock; | ||
size_t uncompressed_bytes = 0, compressed_bytes = 0; | ||
if (filtered_rows == 0) { | ||
RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, | ||
&compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); | ||
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock})); | ||
} else { | ||
auto cloneBlock = block->clone_without_columns(); | ||
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); | ||
for (int i = 0; i < num_rows; ++i) { | ||
if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) { | ||
continue; | ||
} | ||
if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { | ||
continue; | ||
} | ||
res_block.add_row(block, i); | ||
} | ||
RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock, | ||
&uncompressed_bytes, &compressed_bytes, | ||
segment_v2::CompressionTypePB::SNAPPY)); | ||
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock})); | ||
} | ||
RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, | ||
&compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); | ||
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock})); | ||
return Status::OK(); | ||
} | ||
|
||
Status VWalWriter::append_block(vectorized::Block* input_block, int64_t num_rows, | ||
int64_t filter_rows, vectorized::Block* block, | ||
OlapTableBlockConvertor* block_convertor, | ||
OlapTabletFinder* tablet_finder) { | ||
return write_wal(block_convertor, tablet_finder, block, _state, num_rows, filter_rows); | ||
} | ||
|
||
Status VWalWriter::close() { | ||
if (_wal_writer != nullptr) { | ||
RETURN_IF_ERROR(_wal_writer->finalize()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: 'gen_cpp/PaloInternalService_types.h' file not found [clang-diagnostic-error]