Skip to content

Commit

Permalink
[refactor](group commit) remove future block
Browse files (browse the repository at this point in the history)
  • Loading branch information
mymeiyi committed Dec 8, 2023
1 parent 16230b5 commit b448361
Show file tree
Hide file tree
Showing 35 changed files with 75 additions and 352 deletions.
24 changes: 6 additions & 18 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,17 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_ERROR_IF_NON_VEC;
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
case TDataSinkType::OLAP_TABLE_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs));
}
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, true));
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
Expand Down Expand Up @@ -298,13 +292,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_ERROR_IF_NON_VEC;
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
case TDataSinkType::OLAP_TABLE_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs));
}
break;
}
Expand All @@ -316,13 +311,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
sink->reset(new vectorized::MultiCastDataStreamSink(multi_cast_data_streamer));
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, true));
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& in
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
RETURN_IF_ERROR(_writer->init_properties(p._pool));
return Status::OK();
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/olap_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ class OlapTableSinkOperatorX final : public DataSinkOperatorX<OlapTableSinkLocal
public:
using Base = DataSinkOperatorX<OlapTableSinkLocalState>;
OlapTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr, bool group_commit)
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_group_commit(group_commit),
_pool(pool) {};

Status init(const TDataSink& thrift_sink) override {
Expand Down Expand Up @@ -107,7 +106,6 @@ class OlapTableSinkOperatorX final : public DataSinkOperatorX<OlapTableSinkLocal
const RowDescriptor& _row_desc;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
const std::vector<TExpr>& _t_output_expr;
const bool _group_commit;
ObjectPool* _pool = nullptr;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Status OlapTableSinkV2LocalState::init(RuntimeState* state, LocalSinkStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
RETURN_IF_ERROR(_writer->init_properties(p._pool));
return Status::OK();
}

Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/olap_table_sink_v2_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2L
public:
using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;
OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr, bool group_commit)
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_group_commit(group_commit),
_pool(pool) {};

Status init(const TDataSink& thrift_sink) override {
Expand Down Expand Up @@ -109,7 +108,6 @@ class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2L
const RowDescriptor& _row_desc;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
const std::vector<TExpr>& _t_output_expr;
const bool _group_commit;
ObjectPool* _pool = nullptr;
};

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb, bool group_commit)
const report_status_callback& report_status_cb)
: _query_id(query_id),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
Expand All @@ -126,7 +126,6 @@ PipelineFragmentContext::PipelineFragmentContext(
_call_back(call_back),
_is_report_on_cancel(true),
_report_status_cb(report_status_cb),
_group_commit(group_commit),
_create_time(MonotonicNanos()) {
if (_query_ctx->get_task_group()) {
_task_group_entity = _query_ctx->get_task_group()->task_entity();
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
const int fragment_id, int backend_num,
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb,
bool group_commit = false);
const report_status_callback& report_status_cb);

virtual ~PipelineFragmentContext();

Expand Down Expand Up @@ -133,8 +132,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
return _task_group_entity;
}
void trigger_report_if_necessary();

bool is_group_commit() { return _group_commit; }
virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(1);
ins_ids[0] = _fragment_instance_id;
Expand Down Expand Up @@ -235,7 +232,6 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
return nullptr;
}
std::vector<std::unique_ptr<PipelineTask>> _tasks;
bool _group_commit;

uint64_t _create_time;

Expand Down
21 changes: 2 additions & 19 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "task_queue.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "vec/core/future_block.h"

namespace doris {
class RuntimeState;
Expand Down Expand Up @@ -167,8 +166,7 @@ Status PipelineTask::prepare(RuntimeState* state) {
fmt::format_to(operator_ids_str, "]");
_task_profile->add_info_string("OperatorIds(source2root)", fmt::to_string(operator_ids_str));

_block = _fragment_context->is_group_commit() ? doris::vectorized::FutureBlock::create_unique()
: doris::vectorized::Block::create_unique();
_block = doris::vectorized::Block::create_unique();

// We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
set_state(PipelineTaskState::RUNNABLE);
Expand Down Expand Up @@ -257,16 +255,6 @@ Status PipelineTask::execute(bool* eos) {
}

auto status = Status::OK();
auto handle_group_commit = [&]() {
if (UNLIKELY(_fragment_context->is_group_commit() && !status.ok() && _block != nullptr)) {
auto* future_block = dynamic_cast<vectorized::FutureBlock*>(_block.get());
std::unique_lock<std::mutex> l(*(future_block->lock));
if (!future_block->is_handled()) {
future_block->set_result(status, 0, 0);
future_block->cv->notify_all();
}
}
};

this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
Expand All @@ -291,11 +279,7 @@ Status PipelineTask::execute(bool* eos) {
{
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
status = _root->get_block(_state, block, _data_state);
if (UNLIKELY(!status.ok())) {
handle_group_commit();
return status;
}
RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
}
*eos = _data_state == SourceState::FINISHED;

Expand All @@ -306,7 +290,6 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_collect_query_statistics());
}
status = _sink->sink(_state, block, _data_state);
handle_group_commit();
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ namespace doris::pipeline {
PipelineXFragmentContext::PipelineXFragmentContext(
const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx,
ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb, bool group_commit)
const report_status_callback& report_status_cb)
: PipelineFragmentContext(query_id, TUniqueId(), fragment_id, -1, query_ctx, exec_env,
call_back, report_status_cb, group_commit) {}
call_back, report_status_cb) {}

PipelineXFragmentContext::~PipelineXFragmentContext() {
auto st = _query_ctx->exec_status();
Expand Down Expand Up @@ -340,10 +340,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
_sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc,
output_exprs, false));
output_exprs));
} else {
_sink.reset(new OlapTableSinkOperatorX(pool, next_sink_operator_id(), row_desc,
output_exprs, false));
output_exprs));
}
break;
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
PipelineXFragmentContext(const TUniqueId& query_id, const int fragment_id,
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb,
bool group_commit = false);
const report_status_callback& report_status_cb);

~PipelineXFragmentContext() override;

Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb,
std::bind<Status>(
std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this,
std::placeholders::_1, std::placeholders::_2),
params.group_commit);
std::placeholders::_1, std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params);
Expand Down
25 changes: 11 additions & 14 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@

namespace doris {

Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block) {
DCHECK(block->get_schema_version() == schema_version);
Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) {
std::unique_lock l(mutex);
RETURN_IF_ERROR(_status);
while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
Expand Down Expand Up @@ -80,9 +79,8 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
}
if (!_block_queue.empty()) {
auto& future_block = _block_queue.front();
auto* fblock = static_cast<vectorized::FutureBlock*>(block);
fblock->swap_future_block(future_block);
auto fblock = _block_queue.front();
block->swap(*fblock.get());
*find_block = true;
_block_queue.pop_front();
_all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed);
Expand Down Expand Up @@ -123,29 +121,26 @@ void LoadBlockQueue::cancel(const Status& st) {
while (!_block_queue.empty()) {
{
auto& future_block = _block_queue.front();
std::unique_lock<std::mutex> l0(*(future_block->lock));
future_block->set_result(st, future_block->rows(), 0);
_all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
_single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
future_block->cv->notify_all();
}
_block_queue.pop_front();
}
}

Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
DCHECK(table_id == _table_id);
auto base_schema_version = block->get_schema_version();
{
std::unique_lock l(_lock);
for (int i = 0; i < 3; i++) {
bool is_schema_version_match = true;
for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
if (!it->second->need_commit) {
if (base_schema_version == it->second->schema_version) {
if (it->second->add_load_id(block->get_load_id()).ok()) {
if (it->second->add_load_id(load_id).ok()) {
load_block_queue = it->second;
return Status::OK();
}
Expand All @@ -166,7 +161,7 @@ Status GroupCommitTable::get_first_block_load_queue(
_cv.wait_for(l, std::chrono::seconds(4));
if (load_block_queue != nullptr) {
if (load_block_queue->schema_version == base_schema_version) {
if (load_block_queue->add_load_id(block->get_load_id()).ok()) {
if (load_block_queue->add_load_id(load_id).ok()) {
return Status::OK();
}
} else if (base_schema_version < load_block_queue->schema_version) {
Expand Down Expand Up @@ -421,7 +416,8 @@ void GroupCommitMgr::stop() {
}

Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
Expand All @@ -433,7 +429,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
group_commit_table = _table_map[table_id];
}
return group_commit_table->get_first_block_load_queue(table_id, block, load_block_queue);
return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
block, load_block_queue);
}

Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
Expand Down
15 changes: 8 additions & 7 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "common/status.h"
#include "util/threadpool.h"
#include "vec/core/block.h"
#include "vec/core/future_block.h"

namespace doris {
class ExecEnv;
Expand All @@ -49,7 +48,7 @@ class LoadBlockQueue {
_single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
};

Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
Status add_block(std::shared_ptr<vectorized::Block> block);
Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
Expand All @@ -72,7 +71,7 @@ class LoadBlockQueue {
std::condition_variable _get_cond;
// the set of load ids of all blocks in this queue
std::set<UniqueId> _load_ids;
std::list<std::shared_ptr<vectorized::FutureBlock>> _block_queue;
std::list<std::shared_ptr<vectorized::Block>> _block_queue;

Status _status = Status::OK();
// memory consumption of all tables' load block queues, used for back pressure.
Expand All @@ -92,8 +91,9 @@ class GroupCommitTable {
_db_id(db_id),
_table_id(table_id),
_all_block_queues_bytes(all_block_queue_bytes) {};
Status get_first_block_load_queue(int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
Expand Down Expand Up @@ -131,8 +131,9 @@ class GroupCommitMgr {
// used when init group_commit_scan_node
Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
const UniqueId& load_id,
std::shared_ptr<vectorized::Block> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);

private:
Expand Down
Loading

0 comments on commit b448361

Please sign in to comment.