Skip to content

Commit

Permalink
Merge branch 'master' into file_scanner_null_ptr
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Sep 4, 2024
2 parents 46cb996 + ae78c52 commit 32efeca
Show file tree
Hide file tree
Showing 345 changed files with 6,039 additions and 6,068 deletions.
3 changes: 0 additions & 3 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down Expand Up @@ -87,7 +86,6 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
Expand All @@ -109,7 +107,6 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ jobs:
git checkout 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7
popd &>/dev/null
- name: Install Python dependencies
uses: actions/setup-python@v5
with:
python-version: '3.10' # Adjust if needed

- name: "Format it!"
if: ${{ steps.filter.outputs.changes == 'true' }}
uses: ./.github/actions/clang-format-lint-action
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
sh_checker_comment: true
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples

preparation:
name: "Clang Tidy Preparation"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/scope-label.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
github.event_name == 'pull_request_target' &&
(github.event.action == 'opened' ||
github.event.action == 'synchronize')
uses: actions/labeler@v5.5.0
uses: actions/labeler@2.2.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
configuration-path: .github/workflows/labeler/scope-label-conf.yml
Expand Down
21 changes: 20 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,10 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();

{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st);
}
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -741,6 +744,10 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
}
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down Expand Up @@ -841,6 +848,10 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label << ", is_2pc: " << is_2pc;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st);
}
CommitTxnRequest req;
CommitTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -860,6 +871,10 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st);
}
AbortTxnRequest req;
AbortTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -879,6 +894,10 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st);
}
PrecommitTxnRequest req;
PrecommitTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
std::stringstream ss;
ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label;
std::string op_info = ss.str();
LOG(INFO) << "rollback stream laod txn " << op_info;
LOG(INFO) << "rollback stream load txn " << op_info;
TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
: !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
: TxnOpParamType::ILLEGAL;
Expand Down
63 changes: 55 additions & 8 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ void register_suites() {
*arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
});
});
// curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn'
suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) {
LOG(INFO) << "injection CloudMetaMgr::commit_txn";
auto* arg0 = try_any_cast_ret<Status>(args);
arg0->first = Status::InternalError<false>(
"test_file_segment_cache_corruption injection error");
arg0->second = true;
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand All @@ -139,16 +150,18 @@ void set_sleep(const std::string& point, HttpRequest* req) {
}
}
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [duration](auto&& args) {
sp->set_call_back(point, [point, duration](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
});
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void set_return(const std::string& point, HttpRequest* req) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [](auto&& args) {
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return void";
auto pred = try_any_cast<bool*>(args.back());
*pred = true;
} catch (const std::bad_any_cast&) {
Expand All @@ -160,8 +173,9 @@ void set_return(const std::string& point, HttpRequest* req) {

void set_return_ok(const std::string& point, HttpRequest* req) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [](auto&& args) {
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return ok";
auto* pair = try_any_cast_ret<Status>(args);
pair->first = Status::OK();
pair->second = true;
Expand All @@ -188,8 +202,9 @@ void set_return_error(const std::string& point, HttpRequest* req) {
}

auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [code](auto&& args) {
sp->set_call_back(point, [code, point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code;
auto* pair = try_any_cast_ret<Status>(args);
pair->first = Status::Error<false>(code, "injected error");
pair->second = true;
Expand Down Expand Up @@ -243,7 +258,7 @@ void handle_clear(HttpRequest* req) {
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void handle_suite(HttpRequest* req) {
void handle_apply_suite(HttpRequest* req) {
auto& suite = req->param("name");
if (suite.empty()) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name");
Expand All @@ -253,10 +268,11 @@ void handle_suite(HttpRequest* req) {
std::call_once(register_suites_once, register_suites);
if (auto it = suite_map.find(suite); it != suite_map.end()) {
it->second(); // set injection callbacks
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n");
return;
}
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, "unknown suite: " + suite);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
"unknown suite: " + suite + "\n");
}

void handle_enable(HttpRequest* req) {
Expand All @@ -273,6 +289,37 @@ void handle_disable(HttpRequest* req) {

InjectionPointAction::InjectionPointAction() = default;

//
// enable/disable injection point
// ```
// curl "be_ip:http_port/api/injection_point/enable"
// curl "be_ip:http_port/api/injection_point/disable"
// ```
//
// clear all injection points
// ```
// curl "be_ip:http_port/api/injection_point/clear"
// ```
//
// apply/activate specific suite with registered action, see `register_suites()` for more details
// ```
// curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}"
// ```
//
// set predefined action for specific injection point, supported actions are:
// * sleep: for injection point with callback, accepted param is `duration` in milliseconds
// * return: for injection point without return value (return void)
// * return_ok: for injection point with return value, always return Status::OK
// * return_error: for injection point with return value, accepted param is `code`,
// which is an int, valid values can be found in status.h, e.g. -235 or -230,
// if `code` is not present return Status::InternalError
// ```
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x milliseconds
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235
// ```
void InjectionPointAction::handle(HttpRequest* req) {
LOG(INFO) << "handle InjectionPointAction " << req->debug_string();
auto& op = req->param("op");
Expand All @@ -283,7 +330,7 @@ void InjectionPointAction::handle(HttpRequest* req) {
handle_clear(req);
return;
} else if (op == "apply_suite") {
handle_suite(req);
handle_apply_suite(req);
return;
} else if (op == "enable") {
handle_enable(req);
Expand Down
6 changes: 1 addition & 5 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ DEFINE_mInt32(default_num_rows_per_column_file_block, "1024");
// pending data policy
DEFINE_mInt32(pending_data_expire_time_sec, "1800");
// inc_rowset snapshot rs sweep time interval
DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "300");
DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "600");
// tablet stale rowset sweep by threshold size
DEFINE_Bool(tablet_rowset_stale_sweep_by_size, "false");
DEFINE_mInt32(tablet_rowset_stale_sweep_threshold_size, "100");
Expand Down Expand Up @@ -966,10 +966,6 @@ DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");

DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
Expand Down
4 changes: 0 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,6 @@ DECLARE_Bool(enable_debug_points);

DECLARE_Int32(pipeline_executor_size);

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
DECLARE_Bool(enable_index_apply_preds_except_leafnode_of_andnode);

// block file cache
DECLARE_Bool(enable_file_cache);
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
Expand Down
10 changes: 4 additions & 6 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
}

std::size_t decompressed_large_block_len = 0;
do {
while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
Expand Down Expand Up @@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);
};

if (*more_input_bytes != 0) {
// Need more input buffer
Expand Down Expand Up @@ -586,7 +585,7 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
}

std::size_t decompressed_large_block_len = 0;
do {
while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
Expand Down Expand Up @@ -630,8 +629,7 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);
};

if (*more_input_bytes != 0) {
// Need more input buffer
Expand Down
Loading

0 comments on commit 32efeca

Please sign in to comment.