Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add wal function #6

Merged
merged 5 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#pragma once
#include <gen_cpp/PaloInternalService_types.h>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gen_cpp/PaloInternalService_types.h' file not found [clang-diagnostic-error]

#include <gen_cpp/PaloInternalService_types.h>
         ^


#include <condition_variable>
Expand Down
43 changes: 37 additions & 6 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
_block_queue.pop_front();
_all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed);
_single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
// Append the block just taken from the queue to the WAL.
RETURN_IF_ERROR(append_block(0, 0, block, nullptr, nullptr));
}
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
CHECK_EQ(_single_block_queue_bytes->load(), 0);
Expand Down Expand Up @@ -130,8 +132,8 @@ void LoadBlockQueue::cancel(const Status& st) {

Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<vectorized::Block> block, std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version) {
DCHECK(table_id == _table_id);
{
std::unique_lock l(_lock);
Expand All @@ -155,7 +157,8 @@ Status GroupCommitTable::get_first_block_load_queue(
if (!_need_plan_fragment) {
_need_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
[[maybe_unused]] auto st = _create_group_commit_load(load_block_queue);
[[maybe_unused]] auto st =
_create_group_commit_load(load_block_queue, be_exe_version);
}));
}
_cv.wait_for(l, std::chrono::seconds(4));
Expand All @@ -175,7 +178,7 @@ Status GroupCommitTable::get_first_block_load_queue(
}

Status GroupCommitTable::_create_group_commit_load(
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
Status st = Status::OK();
std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) {
if (!st.ok()) {
Expand Down Expand Up @@ -251,6 +254,10 @@ Status GroupCommitTable::_create_group_commit_load(
if (_exec_env->wal_mgr()->is_running()) {
_exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
WalManager::WAL_STATUS::PREPARE);
// Create and initialize the WAL writer for this transaction before executing the plan fragment.
RETURN_IF_ERROR(
load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
params.desc_tbl.slotDescriptors, be_exe_version));
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
pipeline_params);
} else {
Expand Down Expand Up @@ -310,6 +317,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
auto it = _load_block_queues.find(instance_id);
if (it != _load_block_queues.end()) {
auto& load_block_queue = it->second;
// Close the WAL writer, if one was created.
RETURN_IF_ERROR(load_block_queue->close_wal());
if (prepare_failed || !status.ok()) {
load_block_queue->cancel(status);
}
Expand Down Expand Up @@ -418,7 +427,8 @@ void GroupCommitMgr::stop() {
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -430,7 +440,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
group_commit_table = _table_map[table_id];
}
return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
block, load_block_queue);
block, load_block_queue, be_exe_version);
}

Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
Expand All @@ -447,4 +457,25 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
}
return group_commit_table->get_load_block_queue(instance_id, load_block_queue);
}
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'create_wal' can be made static [readability-convert-member-functions-to-static]

Suggested change
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
static Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,

const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version);
RETURN_IF_ERROR(_v_wal_writer->init());
return Status::OK();
hust-hhb marked this conversation as resolved.
Show resolved Hide resolved
}
Status LoadBlockQueue::append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'append_block' can be made static [readability-convert-member-functions-to-static]

be/src/runtime/group_commit_mgr.h:61:

-     Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block,
+     static Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block,

vectorized::OlapTableBlockConvertor* block_convertor,
vectorized::OlapTabletFinder* tablet_finder) {
RETURN_IF_ERROR(_v_wal_writer->append_block(num_rows, filter_rows, block, block_convertor,
tablet_finder));
return Status::OK();
}
Status LoadBlockQueue::close_wal() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'close_wal' can be made static [readability-convert-member-functions-to-static]

be/src/runtime/group_commit_mgr.h:64:

-     Status close_wal();
+     static Status close_wal();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'close_wal' can be made static [readability-convert-member-functions-to-static]

be/src/runtime/group_commit_mgr.h:60:

-     Status close_wal();
+     static Status close_wal();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'close_wal' can be made static [readability-convert-member-functions-to-static]

be/src/runtime/group_commit_mgr.h:59:

-     Status close_wal();
+     static Status close_wal();

if (_v_wal_writer != nullptr) {
RETURN_IF_ERROR(_v_wal_writer->close());
}
return Status::OK();
}
} // namespace doris
20 changes: 17 additions & 3 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include "common/status.h"
#include "util/threadpool.h"
#include "vec/core/block.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
#include "vec/sink/writer/vwal_writer.h"

namespace doris {
class ExecEnv;
Expand Down Expand Up @@ -53,6 +56,13 @@ class LoadBlockQueue {
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
int be_exe_version);
Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block,
vectorized::OlapTableBlockConvertor* block_convertor,
vectorized::OlapTabletFinder* tablet_finder);
Status close_wal();

static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
UniqueId load_instance_id;
Expand Down Expand Up @@ -80,6 +90,7 @@ class LoadBlockQueue {
std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
// group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
};

class GroupCommitTable {
Expand All @@ -94,12 +105,14 @@ class GroupCommitTable {
Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);

private:
Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
int64_t txn_id, bool is_pipeline,
const TExecPlanFragmentParams& params,
Expand Down Expand Up @@ -134,7 +147,8 @@ class GroupCommitMgr {
Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version);

private:
ExecEnv* _exec_env = nullptr;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
load_id.__set_lo(_load_id.lo);
if (_load_block_queue == nullptr) {
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue));
_db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue,
state->be_exec_version()));
state->set_import_label(_load_block_queue->label);
state->set_wal_id(_load_block_queue->txn_id);
}
Expand Down
39 changes: 21 additions & 18 deletions be/src/vec/sink/writer/vwal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "common/compiler_util.h"
#include "common/status.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
Expand All @@ -43,38 +42,43 @@
namespace doris {
namespace vectorized {

VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state,
TupleDescriptor* output_tuple_desc)
VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version)
: _db_id(db_id),
_tb_id(tb_id),
_wal_id(wal_id),
_state(state),
_output_tuple_desc(output_tuple_desc) {}
_label(import_label),
_wal_manager(wal_manager),
_slot_descs(slot_desc),
_be_exe_version(be_exe_version) {}

VWalWriter::~VWalWriter() {}

Status VWalWriter::init() {
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id,
_state->import_label()));
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer));
_state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id,
WalManager::WAL_STATUS::CREATE);
RETURN_IF_ERROR(_wal_manager->add_wal_path(_db_id, _tb_id, _wal_id, _label));
RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
_wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE);
std::stringstream ss;
for (auto slot_desc : _output_tuple_desc->slots()) {
ss << std::to_string(slot_desc->col_unique_id()) << ",";
for (auto slot_desc : _slot_descs) {
if (slot_desc.col_unique_id < 0) {
continue;
}
ss << std::to_string(slot_desc.col_unique_id) << ",";
}
std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
LOG(INFO) << "col_ids:" << col_ids;
hust-hhb marked this conversation as resolved.
Show resolved Hide resolved
RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
return Status::OK();
}

Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder, vectorized::Block* block,
RuntimeState* state, int64_t num_rows, int64_t filtered_rows) {
int64_t num_rows, int64_t filtered_rows) {
PBlock pblock;
size_t uncompressed_bytes = 0, compressed_bytes = 0;
if (filtered_rows == 0) {
RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes,
RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes,
&compressed_bytes, segment_v2::CompressionTypePB::SNAPPY));
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock}));
} else {
Expand All @@ -89,19 +93,18 @@ Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor,
}
res_block.add_row(block, i);
}
RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock,
RETURN_IF_ERROR(res_block.to_block().serialize(_be_exe_version, &pblock,
&uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::SNAPPY));
RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock}));
}
return Status::OK();
}

Status VWalWriter::append_block(vectorized::Block* input_block, int64_t num_rows,
int64_t filter_rows, vectorized::Block* block,
Status VWalWriter::append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block,
OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder) {
return write_wal(block_convertor, tablet_finder, block, _state, num_rows, filter_rows);
return write_wal(block_convertor, tablet_finder, block, num_rows, filter_rows);
}

Status VWalWriter::close() {
Expand Down
19 changes: 10 additions & 9 deletions be/src/vec/sink/writer/vwal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "olap/wal_manager.h"
#include "olap/wal_writer.h"
#include "runtime/decimalv2_value.h"
#include "runtime/exec_env.h"
Expand All @@ -82,16 +83,15 @@ namespace vectorized {

class VWalWriter {
public:
VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state,
TupleDescriptor* output_tuple_desc);
VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
int be_exe_version);
~VWalWriter();
Status init();
Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder,
vectorized::Block* block, RuntimeState* state, int64_t num_rows,
int64_t filtered_rows);
Status append_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows,
vectorized::Block* block, OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder);
vectorized::Block* block, int64_t num_rows, int64_t filtered_rows);
Status append_block(int64_t num_rows, int64_t filter_rows, vectorized::Block* block,
OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder);
Status close();

private:
Expand All @@ -100,8 +100,9 @@ class VWalWriter {
int64_t _wal_id;
uint32_t _version = 0;
std::string _label;
RuntimeState* _state = nullptr;
TupleDescriptor* _output_tuple_desc = nullptr;
WalManager* _wal_manager;
std::vector<TSlotDescriptor>& _slot_descs;
int _be_exe_version;
std::shared_ptr<WalWriter> _wal_writer;
};
} // namespace vectorized
Expand Down
Loading