Skip to content

Commit

Permalink
Merge branch 'master' into create_view
Browse files Browse the repository at this point in the history
  • Loading branch information
feiniaofeiafei authored Apr 1, 2024
2 parents 2e40f0f + 9be7e30 commit 9bb9a7e
Show file tree
Hide file tree
Showing 414 changed files with 9,014 additions and 3,067 deletions.
1 change: 1 addition & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,7 @@ void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskReq
const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req;
CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids,
&succ_tablet_ids);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();

TFinishTaskRequest finish_task_request;
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteReques
RuntimeProfile* profile, const UniqueId& load_id)
: BaseDeltaWriter(req, profile, load_id), _engine(engine) {
_rowset_builder = std::make_unique<CloudRowsetBuilder>(engine, req, profile);
_query_thread_context.init();
}

CloudDeltaWriter::~CloudDeltaWriter() = default;
Expand All @@ -46,12 +47,13 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
}

tasks.emplace_back([writer] {
ThreadLocalHandle::create_thread_local_if_not_exits();
SCOPED_ATTACH_TASK(writer->query_thread_context());
std::lock_guard<bthread::Mutex> lock(writer->_mtx);
if (writer->_is_init || writer->_is_cancelled) {
return Status::OK();
}
return writer->init();
Status st = writer->init(); // included in SCOPED_ATTACH_TASK
return st;
});
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ class CloudDeltaWriter final : public BaseDeltaWriter {

Status set_txn_related_delete_bitmap();

QueryThreadContext query_thread_context() { return _query_thread_context; }

private:
// Convert `_rowset_builder` from `BaseRowsetBuilder` to `CloudRowsetBuilder`
CloudRowsetBuilder* rowset_builder();

bthread::Mutex _mtx;
CloudStorageEngine& _engine;
QueryThreadContext _query_thread_context;
};

} // namespace doris
15 changes: 13 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "cloud/cloud_engine_calc_delete_bitmap_task.h"

#include <fmt/format.h>

#include <memory>

#include "cloud/cloud_meta_mgr.h"
Expand All @@ -29,6 +31,7 @@
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {

Expand All @@ -38,7 +41,10 @@ CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
: _engine(engine),
_cal_delete_bitmap_req(cal_delete_bitmap_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {}
_succ_tablet_ids(succ_tablet_ids) {
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
"CloudEngineCalcDeleteBitmapTask");
}

void CloudEngineCalcDeleteBitmapTask::add_error_tablet_id(int64_t tablet_id, const Status& err) {
std::lock_guard<std::mutex> lck(_mutex);
Expand Down Expand Up @@ -126,9 +132,14 @@ CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
_engine_calc_delete_bitmap_task(engine_task),
_tablet(tablet),
_transaction_id(transaction_id),
_version(version) {}
_version(version) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id));
}

void CloudTabletCalcDeleteBitmapTask::handle() const {
SCOPED_ATTACH_TASK(_mem_tracker);
RowsetSharedPtr rowset;
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
namespace doris {

class CloudEngineCalcDeleteBitmapTask;
class MemTrackerLimiter;

class CloudTabletCalcDeleteBitmapTask {
public:
Expand All @@ -46,6 +47,7 @@ class CloudTabletCalcDeleteBitmapTask {
std::shared_ptr<CloudTablet> _tablet;
int64_t _transaction_id;
int64_t _version;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

class CloudEngineCalcDeleteBitmapTask : public EngineTask {
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaciton(
}
location_map.clear();

// 2. calc delete bimap for incremental data
// 2. calc delete bitmap for incremental data
RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
*this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
Expand Down
83 changes: 62 additions & 21 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
if (!tnode.intermediate_output_tuple_id_list.empty()) {
DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
// common subexpression elimination
DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
tnode.intermediate_projections_list.size());
_intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size());
for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
_intermediate_output_row_descriptor.push_back(
RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true}));
}
}

_query_statistics = std::make_shared<QueryStatistics>();
}

Expand All @@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
}

if (!tnode.intermediate_projections_list.empty()) {
DCHECK(tnode.__isset.projections) << "no final projections";
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
for (const auto& tnode_projections : tnode.intermediate_projections_list) {
vectorized::VExprContextSPtrs projections;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
_intermediate_projections.push_back(projections);
}
}
return Status::OK();
}

Expand Down Expand Up @@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
Expand All @@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
return Status::OK();
}
Expand Down Expand Up @@ -514,6 +542,22 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
const size_t rows = origin_block->rows();
if (rows == 0) {
return Status::OK();
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
for (auto& projections : _intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
Expand All @@ -535,29 +579,26 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();

if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
auto& mutable_columns = mutable_block.mutable_columns();

if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}
if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}

for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id));
auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));

return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,26 @@ class ExecNode {
return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
}
virtual const RowDescriptor& intermediate_row_desc() const { return _row_descriptor; }

// input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr
// prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor

[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
if (idx == 0) {
return intermediate_row_desc();
}
DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
return _intermediate_output_row_descriptor[idx - 1];
}

[[nodiscard]] const RowDescriptor& projections_row_desc() const {
if (_intermediate_output_row_descriptor.empty()) {
return intermediate_row_desc();
} else {
return _intermediate_output_row_descriptor.back();
}
}

int64_t rows_returned() const { return _num_rows_returned; }
int64_t limit() const { return _limit; }
bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
Expand Down Expand Up @@ -270,6 +290,10 @@ class ExecNode {
std::unique_ptr<RowDescriptor> _output_row_descriptor;
vectorized::VExprContextSPtrs _projections;

std::vector<RowDescriptor> _intermediate_output_row_descriptor;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;

Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
#include "util/string_util.h"
#include "vec/columns/column.h"
// NOLINTNEXTLINE(unused-includes)
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_context.h" // IWYU pragma: keep
#include "vec/exprs/vliteral.h"
#include "vec/runtime/vdatetime_value.h"

Expand Down Expand Up @@ -75,6 +75,8 @@ bool VOlapTablePartKeyComparator::operator()(const BlockRowWithIndicator& lhs,
bool l_use_new = std::get<2>(lhs);
bool r_use_new = std::get<2>(rhs);

VLOG_TRACE << '\n' << l_block->dump_data() << '\n' << r_block->dump_data();

if (l_row == -1) {
return false;
} else if (r_row == -1) {
Expand All @@ -93,7 +95,6 @@ bool VOlapTablePartKeyComparator::operator()(const BlockRowWithIndicator& lhs,
DCHECK(_slot_locs.size() == _param_locs.size())
<< _slot_locs.size() << ' ' << _param_locs.size();

//TODO: use template to accelerate this for older compiler.
const std::vector<uint16_t>* l_index = l_use_new ? &_param_locs : &_slot_locs;
const std::vector<uint16_t>* r_index = r_use_new ? &_param_locs : &_slot_locs;

Expand Down Expand Up @@ -330,6 +331,7 @@ VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSche
slot->get_data_type_ptr(), slot->col_name()});
}
}
VLOG_TRACE << _partition_block.dump_structure();
} else {
// we insert all. but not all will be used. it will controlled by _partition_slot_locs
for (auto* slot : _slots) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
}

if (!http_req->header(HTTP_CLOUD_CLUSTER).empty()) {
request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
}

#ifndef BE_TEST
// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
static const std::string HTTP_WAL_ID_KY = "wal_id";
static const std::string HTTP_AUTH_CODE = "auth_code";
static const std::string HTTP_GROUP_COMMIT = "group_commit";
static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";

} // namespace doris
1 change: 1 addition & 0 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Status FSFileCacheStorage::rebuild_data_structure() const {
auto rebuild_dir = [&](std::filesystem::directory_iterator& upgrade_key_it) -> Status {
for (; upgrade_key_it != std::filesystem::directory_iterator(); ++upgrade_key_it) {
if (upgrade_key_it->path().filename().native().find('_') == std::string::npos) {
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
RETURN_IF_ERROR(
fs->rename(upgrade_key_it->path(), upgrade_key_it->path().native() + "_0"));
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ bool CompactionMixin::handle_ordered_data_compaction() {
_tablet->enable_unique_key_merge_on_write()) {
return false;
}

if (_tablet->tablet_meta()->tablet_schema()->skip_write_index_on_load()) {
// Expected to create index through normal compaction
return false;
}

// check delete version: if compaction type is base compaction and
// has a delete version, use original compaction
if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
Expand Down Expand Up @@ -346,6 +352,11 @@ bool CompactionMixin::handle_ordered_data_compaction() {
// most rowset of current compaction is nonoverlapping
// just handle nonoverlappint rowsets
auto st = do_compact_ordered_rowsets();
if (!st.ok()) {
LOG(WARNING) << "failed to compact ordered rowsets: " << st;
_pending_rs_guard.drop();
}

return st.ok();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class CompactionMixin : public Compaction {

void construct_skip_inverted_index(RowsetWriterContext& ctx);

// Return true if do ordered data compaction successfully
bool handle_ordered_data_compaction();

Status do_compact_ordered_rowsets();
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace doris {

class FlushToken;
class MemTable;
class MemTracker;
class StorageEngine;
class TupleDescriptor;
class SlotDescriptor;
Expand Down
Loading

0 comments on commit 9bb9a7e

Please sign in to comment.