Merge branch 'selectdb-cloud-release-2.0' (20221201 986a3d5e7) into selectdb-cloud-dev (20221130 23a144c) (apache#1199)

* [feature](selectdb-cloud) Fix file cache metrics nullptr error (apache#1060)

* [feature](selectdb-cloud) Fix abort copy when -235 (apache#1039)

* [feature](selectdb-cloud) Replace libfdb_c.so to make it compatible with different OS (apache#925)

* [feature](selectdb-cloud) Optimize RPC retry in cloud_meta_mgr (apache#1027)

* Optimize RETRY_RPC in cloud_meta_mgr
* Add random sleep for RETRY_RPC
* Add a simple backoff strategy for rpc retry (sketched below)
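
A minimal sketch of the backoff strategy described above, assuming a generic
`Status`-returning RPC callable; the names are illustrative, not the actual
cloud_meta_mgr code:

```cpp
#include <algorithm>
#include <chrono>
#include <random>
#include <thread>

// Retry an RPC with random sleep and a simple capped exponential backoff
// (illustrative sketch; `rpc_call` stands in for the real brpc stub call).
template <typename Fn>
auto retry_rpc(Fn rpc_call, int max_retries = 3) {
    std::mt19937 gen(std::random_device{}());
    int64_t backoff_ms = 100; // initial backoff
    for (int attempt = 0;; ++attempt) {
        auto status = rpc_call();
        if (status.ok() || attempt >= max_retries) {
            return status;
        }
        // Sleep a random duration in [backoff_ms / 2, backoff_ms] so many
        // concurrent callers do not hammer the meta-service in lockstep.
        std::uniform_int_distribution<int64_t> jitter(backoff_ms / 2, backoff_ms);
        std::this_thread::sleep_for(std::chrono::milliseconds(jitter(gen)));
        backoff_ms = std::min<int64_t>(backoff_ms * 2, 2000); // simple cap
    }
}
```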

* [feature](selectdb-cloud) Copy into support select by column name (apache#1055)

* Copy into support select by column name
* Fix broker load core dump due to mismatch of the number of columns between remote and schema

* [feature](selectdb-cloud) Fix test_dup_mv_schema_change case (apache#1022)

* [feature](selectdb-cloud) Make the broker execute on the specified cluster (apache#1043)

* Make the broker execute on the specified cluster
* Pass the cluster parameter

* [feature](selectdb-cloud) Support concurrent BaseCompaction and CumuCompaction on a tablet (apache#1059)

* [feature](selectdb-cloud) Reduce meta-service log (apache#1067)

* Quote string in the tagged log
* Add template to enable customized log for RPC requests

* [feature](selectdb-cloud) Use read-only txn + read-write txn for `commit_txn` (apache#1065)
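
The split presumably lets the bulk of the metadata reads run in a conflict-free
read-only transaction, leaving only a short read-write transaction to apply the
commit. A purely hypothetical sketch; every helper name here is invented for
illustration and is not the real meta-service API:

```cpp
// Hypothetical shape of commit_txn split into a read-only txn plus a short
// read-write txn. create_txn/read_txn_info/check_txn_unchanged/write_commit
// are invented names for this sketch.
Status commit_txn(MetaService* ms, int64_t txn_id) {
    TxnInfo info;
    {
        // Phase 1: a read-only txn gathers txn/tablet metadata; it cannot conflict.
        auto ro_txn = ms->create_txn(/*read_only=*/true);
        RETURN_IF_ERROR(read_txn_info(ro_txn.get(), txn_id, &info));
    }
    // Phase 2: a small read-write txn re-validates what was read and writes the
    // commit, shrinking both the conflict window and the transaction size.
    auto rw_txn = ms->create_txn(/*read_only=*/false);
    RETURN_IF_ERROR(check_txn_unchanged(rw_txn.get(), info));
    RETURN_IF_ERROR(write_commit(rw_txn.get(), info));
    return rw_txn->commit();
}
```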

* [feature](selectdb-cloud) Pick "[fix](load) fix that load channel failed to be released in time (apache#14119)"

commit 3690c4d
Author: Xin Liao <[email protected]>
Date:   Wed Nov 9 22:38:08 2022 +0800
    [fix](load) fix that load channel failed to be released in time (apache#14119)

* [feature](selectdb-cloud) Add compaction profile log (apache#1072)

* [feature](selectdb-cloud) Fix abort txn fail when copy job `getAllFileStatus` exception (apache#1066)

* Revert "[feature](selectdb-cloud) Copy into support select by column name (apache#1055)"

This reverts commit f1a543e.

* [feature](selectdb-cloud) Pick"[fix](metric) fix the bug of not updating the query latency metric apache#14172 (apache#1076)"

* [feature](selectdb-cloud) Distinguish KV_TXN_COMMIT_ERR or KV_TXN_CONFLICT while commit failed (apache#1082)
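
Since the meta-service KV store is FoundationDB (see the libfdb_c commit above),
the distinction presumably maps the fdb commit error onto the two codes. A
hedged sketch — `MetaServiceCode` and the exact mapping are assumptions:

```cpp
// Classify a failed FoundationDB commit (sketch). fdb error 1020
// ("not_committed") means the transaction conflicted with another one and may
// be retried; anything else is reported as a generic commit error.
MetaServiceCode classify_commit_error(int fdb_err /* fdb_error_t */) {
    constexpr int FDB_NOT_COMMITTED = 1020; // transaction conflict
    return fdb_err == FDB_NOT_COMMITTED ? MetaServiceCode::KV_TXN_CONFLICT
                                        : MetaServiceCode::KV_TXN_COMMIT_ERR;
}
```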

* [feature](selectdb-cloud) Support configuring base compaction concurrency (apache#1080)

* [feature](selectdb-cloud) Enhance start.sh/stop.sh for selectdb_cloud (apache#1079)

* [feature](selectdb-cloud) Add smoke testing (apache#1056)

Add smoke tests: 1. upload and query via the HTTP data API; 2. internal and external stages; 3. select and insert.

* [feature](selectdb-cloud) Disable admin stmt in cloud mode (apache#1064)

Disable the following statements:

* AdminRebalanceDiskStmt/AdminCancelRebalanceDiskStmt
* AdminRepairTableStmt/AdminCancelRepairTableStmt
* AdminCheckTabletsStmt
* AdminCleanTrashStmt
* AdminCompactTableStmt
* AdminCopyTabletStmt
* AdminDiagnoseTabletStmt
* AdminSetConfigStmt
* AdminSetReplicaStatusStmt
* AdminShowConfigStmt
* AdminShowReplicaDistributionStmt
* AdminShowReplicaStatusStmt
* AdminShowTabletStorageFormatStmt

Leaving a backdoor for the user root:

* AdminSetConfigStmt
* AdminShowConfigStmt
* AdminShowReplicaDistributionStmt
* AdminShowReplicaStatusStmt
* AdminDiagnoseTabletStmt

* [feature](selectdb-cloud) Update copy into doc (apache#1063)

* [feature](selectdb-cloud) Fix AdminSetConfigStmt cannot work with root (apache#1085)

* [feature](selectdb-cloud) Fix null userid leading to checkpoint error (apache#1083)

* [feature](selectdb-cloud) Support controlling the space used for upload (apache#1091)

* [feature](selectdb-cloud) Pick "[fix](sequence) fix that update table core dump with sequence column (apache#13847)" (apache#1092)

* [Fix](memory-leak) Fix boost::stacktrace memory leak (apache#1097)

* [Fix](selectdb-cloud) Several picks to fix memtracker (apache#1087)

* [enhancement](memtracker)  Add independent and unique scanner mem tracker for each query (apache#13262)

* [enhancement](memory) Print memory usage log when memory allocation fails (apache#13301)

* [enhancement](memtracker) Print query memory usage log every second when `memory_verbose_track` is enabled (apache#13302)

* [fix](memory) Fix USE_JEMALLOC=true UBSAN compilation error apache#13398

* [enhancement](memtracker) Fix bthread local consume mem tracker (apache#13368)

    Previously, bthread_getspecific was called every time bthread local was used. In the test at apache#10823, it was found
    that frequent calls to bthread_getspecific had performance problems.

    So a cache was implemented in pthread-local storage based on the btls key, but the btls key cannot correctly detect bthread switching.

    So the cache is instead keyed by the bthread id obtained from bthread_self (see the sketch after this commit's picks).

* [enhancement](memtracker) Fix brpc causing query mem tracker to be inaccurate apache#13401

* [fix](memtracker) Fix transmit_tracker null pointer because phamp is not thread safe apache#13528

* [enhancement](memtracker) Fix brpc mem count and refactor thread context macro (apache#13469)

* [fix](memtracker) Fix the usage of bthread mem tracker (apache#13708)

    bthread context init has a performance cost, so it is temporarily removed; it will be completely refactored in apache#13585.

* [enhancement](memtracker) Refactor load channel + memtable mem tracker (apache#13795)

* [fix](load) Fix load channel mgr lock (apache#13960)

    hot fix load channel mgr lock

* [fix](memtracker) Fix DCHECK !std::count(_consumer_tracker_stack.begin(), _consumer_tracker_stack.end(), tracker)

* [tempfix][memtracker] wait pick 0b945fe

Co-authored-by: Xinyi Zou <[email protected]>
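
A minimal sketch of the bthread-local caching idea from apache#13368 above,
with names simplified from the real Doris code; `ThreadContext` and the key
handling are placeholders:

```cpp
#include <bthread/bthread.h>

// Cache the bthread-local ThreadContext in plain pthread-local slots, keyed by
// the current bthread id, so bthread_getspecific() is only called again after
// a bthread switch is detected (illustrative sketch).
inline ThreadContext* bthread_local_ctx(bthread_key_t btls_key) {
    static thread_local bthread_t cached_bid = 0;
    static thread_local ThreadContext* cached_ctx = nullptr;
    bthread_t bid = bthread_self();
    if (bid == cached_bid && cached_ctx != nullptr) {
        return cached_ctx; // fast path: same bthread is still running here
    }
    // Slow path: a bthread switch happened; refresh from bthread-local storage.
    cached_ctx = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
    cached_bid = bid;
    return cached_ctx;
}
```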

* [feature](selectdb-cloud) Add more recycler case (apache#1094)

* [feature](selectdb-cloud) Pick "[improvement](load) some simple optimization for reduce load memory policy (apache#14215)" (apache#1096)

* [feature](selectdb-cloud) Reduce unnecessary get rowset rpc when prepare compaction (apache#1099)

* [feature](selectdb-cloud) Pick "[improvement](load) reduce memory in batch for small load channels (apache#14214)" (apache#1100)

* [feature](selectdb-cloud) Pick "[improvement](load) release load channel actively when error occurs (apache#14218)" (apache#1102)

* [feature](selectdb-cloud) Print build info of ms/recycler to stdout when launch (apache#1105)

* [feature](selectdb-cloud) copy into support select by column name and load with partial columns (apache#1104)

e.g.
```
COPY INTO test_table FROM (SELECT col1, col2, col3 FROM @ext_stage('1.parquet'))

COPY INTO test_table (id, name) FROM (SELECT col1, col2 FROM @ext_stage('1.parquet'))
```

* [fix](selectdb-cloud) Pick "[Fix](array-type) bugfix for array column with delete condition (apache#13361)" (apache#1109)

Fix for SQL with an array column:
delete from tbl where c_array is null;

For more information, see apache#13360.

Co-authored-by: camby <[email protected]>
Co-authored-by: cambyzju <[email protected]>

* [feature](selectdb-cloud) Copy into support force (apache#1081)

* [feature](selectdb-cloud) Add abort txn, abort tablet job http api (apache#1101)

Abort load txn by txn_id:
```
curl "{meta_sevice_ip}:{brpc_port}/MetaService/http/abort_txn?token=greedisgood9999" -d '{
"cloud_unique_id": string,
"txn_id": int64
}'
```

Abort load txn by db_id and label:
```
curl "{meta_sevice_ip}:{brpc_port}/MetaService/http/abort_txn?token=greedisgood9999" -d '{
"cloud_unique_id": string,
"db_id": int64,
"label": string
}'
```

Abort tablet job (only compaction jobs are supported currently):
```
curl "{meta_sevice_ip}:{brpc_port}/MetaService/http/abort_tablet_job?token=greedisgood9999" -d '{
"cloud_unique_id": string,
"job" : {
  "idx": {"tablet_id": int64},
  "compaction": [{"id": string}]
}
}'
```

* [feature](selectdb-cloud) Fix external stage data for smoke test and retry to create stage (apache#1119)

* [feature](selectdb-cloud) Fix data leaks when truncating table (apache#1114)

* Drop cloud partition when truncating table
* Add retry strategy for dropCloudMaterializedIndex

* [feature](selectdb-cloud) Fix missing library when compiling unit test (apache#1128)

* [feature](selectdb-cloud) Validate the object storage when creating a stage (apache#1115)

* [feature](selectdb-cloud) Fix incorrectly setting cumulative point when committing base compaction (apache#1127)

* [feature](selectdb-cloud) Fix missing lease when preparing cumulative compaction (apache#1131)

* [feature](selectdb-cloud) Fix unbalanced tablet distribution (apache#1121)

* Fix the bug of unbalanced tablet distribution
* Use a hash of the replica index to map replicas to BEs

* [feature](selectdb-cloud) Fix core dump when getting tablets info via the BE web page (apache#1113)

* [feature](selectdb-cloud) Fix start_fe.sh --version (apache#1106)

* [feature](selectdb-cloud) Print tablet stats before and after compaction (apache#1132)

* Log num rowsets before and after compaction
* Print tablet stats after committing compaction

* [feature](selectdb-cloud) Allow root user execute AlterSystemStmt (apache#1143)

* [feature](selectdb-cloud) Fix BE UT (apache#1141)

* [feature](selectdb-cloud) Select BE for the first bucket of every partition randomly (apache#1136)

* [feature](selectdb-cloud) Fix query_limit int -> int64 (apache#1154)

* [feature](selectdb-cloud) Add more cloud recycler case (apache#1116)

* add more cloud recycler case
* Modify the cloud recycler case dataset from sf0.1 to sf1

* [feature](selectdb-cloud) Fix misuse of aws transfer which may delete tmp file prematurely (apache#1160)

* [feature](selectdb-cloud) Add test for copy into http data api and userId (apache#1044)

* Add test for copy into http data api and userId
* Add a regression case for cross-use of external and internal stages.

* [feature](selectdb-cloud) Pass the cloud compaction regression test (apache#1173)

* [feature](selectdb-cloud) Modify max_bytes_per_broker_scanner default value to 150G (apache#1184)

* [feature](selectdb-cloud) Fix missing lock when calling Tablet::delete_predicates (apache#1182)

* [improvement](config) Change default remote_fragment_exec_timeout_ms to 30 seconds

* [improvement](config) change default value of broker_load_default_timeout_second to 12 hours

* [feature](selectdb-cloud) Fix replay copy into (apache#1167)

* Add stage ddl regression
* fix replay copy into
* remove unused log
* fix user name

* [feature](selectdb-cloud) Fix FE --version option not working after FE started (apache#1161)

* [feature](selectdb-cloud) BE accesses object store using HTTP (apache#1111)

* [feature](selectdb-cloud) Refactor recycle copy jobs (apache#1062)

* [fix](FE) Pick fix from doris master (apache#1177) (apache#1178)

Commit: 53e5f39
Author: starocean999 <[email protected]>
Committer: GitHub <[email protected]>
Date: Mon Oct 31 2022 10:19:32 GMT+0800 (China Standard Time)
fix result exprs should be substituted in the same way as agg exprs (apache#13744)

Commit: a4a9912
Author: starocean999 <[email protected]>
Committer: GitHub <[email protected]>
Date: Thu Nov 03 2022 10:26:59 GMT+0800 (China Standard Time)
fix group by constant value bug (apache#13827)

Commit: 84b969a
Author: starocean999 <[email protected]>
Committer: GitHub <[email protected]>
Date: Thu Nov 10 2022 11:10:42 GMT+0800 (China Standard Time)
fix the grouping expr should check col name from base table first, then alias (apache#14077)

Commit: ae4f4b9
Author: starocean999 <[email protected]>
Committer: GitHub <[email protected]>
Date: Thu Nov 24 2022 10:31:58 GMT+0800 (China Standard Time)
fix having clause should use column name first then alias (apache#14408)

* [feature](selectdb-cloud) Deal with getNextTransactionId rpc exception (apache#1181)

Before this fix, getNextTransactionId returned -1 on an RPC exception,
which could cause a schema change and the previous load task to execute in parallel unexpectedly.

* [feature](selectdb-cloud) Throw exception for unsupported operations in CloudGlobalTransactionMgr (apache#1180)

* [improvement](load) Add more log on RPC error (apache#1183)

* [feature](selectdb-cloud) Add copy_into case(json, parquet, orc) and tpch_sf1 to smoke test (apache#1140)

* [feature](selectdb-cloud) Recycle dropped stage (apache#1071)

* log s3 response code
* add log in S3Accessor::delete_objects_by_prefix
* Fix show copy
* remove empty line

* [feature](selectdb-cloud) Support bthread for new scanner (apache#1117)

* Support bthread for new scanner
* Keep the number of remote threads the same as local threads

* [feature](selectdb-cloud) Implement self-explained cloud unique id for instance id searching (apache#1089)

1. Implement self-explained cloud unique id for instance id searching
2. Fix a core dump at register when meta-service fails to start
3. Fix drop_instance not setting mtime
4. Add HTTP API to get instance info

```
curl "127.0.0.1:5008/MetaService/http/get_instance?token=greedisgood9999&cloud_unique_id=regression-cloud-unique-id-fe-1"

curl "127.0.0.1:5008/MetaService/http/get_instance?token=greedisgood9999&cloud_unique_id=1:regression_instance0:regression-cloud-unique-id-fe-1"

curl "127.0.0.1:5008/MetaService/http/get_instance?token=greedisgood9999&instance_id=regression_instance0"
```
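
Judging from the second example above, the self-explained format is
`<version>:<instance_id>:<unique_id>`. A minimal parsing sketch (assumed shape,
not the actual meta-service code):

```cpp
#include <string>

// Parse a self-explained cloud unique id of the form "1:<instance_id>:<unique_id>".
// Returns true and fills `instance_id` on success; a legacy unique id without
// colons returns false so the caller can fall back to searching by unique id.
bool parse_instance_id(const std::string& cloud_unique_id, std::string* instance_id) {
    size_t first = cloud_unique_id.find(':');
    if (first == std::string::npos) return false; // legacy format
    size_t second = cloud_unique_id.find(':', first + 1);
    if (second == std::string::npos) return false;
    if (cloud_unique_id.substr(0, first) != "1") return false; // unknown version
    *instance_id = cloud_unique_id.substr(first + 1, second - first - 1);
    return !instance_id->empty();
}
```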

* [improvement](memory) simplify memory config related to tcmalloc  and add gc (apache#1191)

* [improvement](memory) simplify memory config related to tcmalloc

There are several configs related to tcmalloc, and users do not know how to configure them. Users really just want two modes: performance and compact. In performance mode, doris runs queries and loads quickly; in compact mode, doris runs with less memory usage.

To configure tcmalloc individually, we can use the environment variables supported by tcmalloc.

* [improvement](tcmalloc) add moderate mode and avoid OOM with a lot of cache (apache#14374)

Call ReleaseToSystem aggressively when there is little free memory.
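
A hedged sketch of that GC idea using the public gperftools API; the thresholds
and the exact trigger used by the commit are assumptions:

```cpp
#include <gperftools/malloc_extension.h>

// When the process is close to its memory limit and tcmalloc is sitting on a
// sizable page-heap free list, return those pages to the OS (sketch only).
void maybe_release_tcmalloc_cache(int64_t proc_mem_bytes, int64_t mem_limit_bytes) {
    if (proc_mem_bytes < mem_limit_bytes * 9 / 10) return; // enough headroom
    size_t free_bytes = 0;
    MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes",
                                                    &free_bytes);
    if (free_bytes > (64L << 20)) { // only bother for a sizable cache
        // Internally drives ReleaseToSystem over all free pages.
        MallocExtension::instance()->ReleaseFreeMemory();
    }
}
```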

* [feature](selectdb-cloud) Pick "[fix](hashjoin) fix coredump of hash join in ubsan build apache#13479" (apache#1190)

commit b5cd167
Author: TengJianPing <[email protected]>
Date:   Thu Oct 20 10:16:19 2022 +0800
    [fix](hashjoin) fix coredump of hash join in ubsan build (apache#13479)

* [feature](selectdb-cloud) Support close FileWriter without forcing sync data to storage medium (apache#1134)

* Trace accumulated time
* Support close FileWriter without forcing sync data to storage medium
* Avoid trace overhead when trace is disabled (see the usage sketch below)
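
A usage sketch of the resulting `close(bool sync)` API (see the file_writer.h
diff further down); the wrapper function and the `durable` flag are invented
for illustration:

```cpp
// Staging writers (e.g. a tmp file that an upload task syncs later) can skip
// the final sync; durability-critical paths keep the default sync=true.
Status write_and_close(doris::io::FileWriter* writer, const doris::Slice& data,
                       bool durable) {
    RETURN_IF_ERROR(writer->append(data)); // assumed FileWriter::append(Slice)
    RETURN_IF_ERROR(writer->finalize());
    // sync=false returns without waiting for the data to persist; the upload
    // or cache layer takes over responsibility for durability.
    return writer->close(/*sync=*/durable);
}
```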

* [feature](selectdb-cloud) Pick "[BugFix](function) fix reverse function dynamic buffer overflow due to illegal character apache#13671" (apache#1146)

* pick [opt](exec) Replace get_utf8_byte_length function by array (apache#13664)
* pick [BugFix](function) fix reverse function dynamic buffer overflow due to illegal character apache#13671
Co-authored-by: HappenLee <[email protected]>

* [feature](selectdb-cloud) Pick "[fix](fe) Inconsistent behavior for string comparison in FE and BE (apache#13604)" (apache#1150)

Co-authored-by: xueweizhang <[email protected]>

* [feature](selectdb-cloud) Copy into support delete_on condition (apache#1148)

* [feature](selectdb-cloud) Pick "[fix](agg)fix group by constant value bug (apache#13827)" (apache#1152)

* [fix](agg)fix group by constant value bug

* keep only one const grouping exprs if no agg exprs

Co-authored-by: starocean999 <[email protected]>

* [feature](selectdb-cloud) Pick "[fix](join)the build and probe expr should be calculated before converting input block to nullable (apache#13436)" (apache#1155)

* [fix](join)the build and probe expr should be calculated before converting input block to nullable

* remove_nullable can be called on const column

Co-authored-by: starocean999 <[email protected]>

* [feature](selectdb-cloud) Pick "[Bug](predicate) fix core dump on bool type runtime filter (apache#13417)" (apache#1156)

fix core dump on bool type runtime filter

Co-authored-by: Pxl <[email protected]>

* [feature](selectdb-cloud) Pick "[Fix](agg) fix bitmap agg core dump when phmap pointer assert alignment (apache#13381)" (apache#1157)

Co-authored-by: zhangstar333 <[email protected]>

* [feature](selectdb-cloud) Pick "[Bug](function) fix core dump on case when have 1000 condition apache#13315" (apache#1158)

Co-authored-by: Pxl <[email protected]>

* [feature](selectdb-cloud) Pick "[fix](sort)the sort expr nullable info is wrong in some case (apache#12003)"

* [feature](selectdb-cloud) Pick "[Improvement](decimal) print decimal according to the real precision and scale (apache#13437)"

* [feature](selectdb-cloud) Pick "[bugfix](VecDateTimeValue) eat the value of microsecond in function from_date_format_str (apache#13446)"

* [bugfix](VecDateTimeValue) eat the value of microsecond in function from_date_format_str

* add sql based regression test

Co-authored-by: xiaojunjie <[email protected]>

Co-authored-by: Lightman <[email protected]>
Co-authored-by: meiyi <[email protected]>
Co-authored-by: Xiaocc <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: Xin Liao <[email protected]>
Co-authored-by: Luwei <[email protected]>
Co-authored-by: plat1ko <[email protected]>
Co-authored-by: deardeng <[email protected]>
Co-authored-by: Kidd <[email protected]>
Co-authored-by: Xinyi Zou <[email protected]>
Co-authored-by: zhannngchen <[email protected]>
Co-authored-by: camby <[email protected]>
Co-authored-by: cambyzju <[email protected]>
Co-authored-by: Yongqiang YANG <[email protected]>
Co-authored-by: starocean999 <[email protected]>
Co-authored-by: Gabriel <[email protected]>
Co-authored-by: AlexYue <[email protected]>
Co-authored-by: xueweizhang <[email protected]>
Co-authored-by: Pxl <[email protected]>
Co-authored-by: zhangstar333 <[email protected]>
Co-authored-by: xiaojunjie <[email protected]>
Co-authored-by: xiaojunjie <[email protected]>
1 parent a021bb4 commit 96c230b
Showing 246 changed files with 111,918 additions and 1,061 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ Status CloudBaseCompaction::prepare_compact() {

RETURN_IF_ERROR(pick_rowsets_to_compact());
TRACE("rowsets picked");
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());

for (auto& rs : _input_rowsets) {
_input_rows += rs->num_rows();
Expand Down Expand Up @@ -183,6 +182,7 @@ Status CloudBaseCompaction::update_tablet_meta() {
int64_t base_compaction_cnt = _tablet->base_compaction_cnt();
selectdb::TabletStatsPB stats;
RETURN_IF_ERROR(cloud::meta_mgr()->commit_tablet_job(job, &stats));
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();

{
std::lock_guard wrlock(_tablet->get_header_lock());
Expand Down
be/src/cloud/cloud_cumulative_compaction.cpp (16 changes: 10 additions & 6 deletions)

```diff
@@ -80,7 +80,6 @@ Status CloudCumulativeCompaction::prepare_compact() {
         return st;
     }
     TRACE("rowsets picked");
-    TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
 
     for (auto& rs : _input_rowsets) {
         _input_rows += rs->num_rows();
@@ -95,7 +94,9 @@ Status CloudCumulativeCompaction::prepare_compact() {
             .tag("input_segments", _input_segments)
             .tag("input_data_size", _input_data_size)
             .tag("tablet_max_version", _tablet->local_max_version())
-            .tag("cumulative_point", _tablet->cumulative_layer_point());
+            .tag("cumulative_point", _tablet->cumulative_layer_point())
+            .tag("num_rowsets", _tablet->fetch_add_approximate_num_rowsets(0))
+            .tag("cumu_num_rowsets", _tablet->fetch_add_approximate_cumu_num_rowsets(0));
 
     // prepare compaction job
     selectdb::TabletJobInfoPB job;
@@ -112,9 +113,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
     compaction_job->set_base_compaction_cnt(base_compaction_cnt);
     compaction_job->set_cumulative_compaction_cnt(cumulative_compaction_cnt);
     using namespace std::chrono;
-    _expiration = duration_cast<seconds>(system_clock::now().time_since_epoch()).count() +
-                  config::compaction_timeout_seconds;
+    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    _expiration = now + config::compaction_timeout_seconds;
     compaction_job->set_expiration(_expiration);
+    compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
     st = cloud::meta_mgr()->prepare_tablet_job(job);
     if (st.precise_code() == STALE_TABLET_CACHE) {
         // set last_sync_time to 0 to force sync tablet next time
@@ -147,7 +149,9 @@ Status CloudCumulativeCompaction::execute_compact_impl() {
             .tag("output_segments", _output_rowset->num_segments())
             .tag("output_data_size", _output_rowset->data_disk_size())
             .tag("tablet_max_version", _tablet->local_max_version())
-            .tag("cumulative_point", _tablet->cumulative_layer_point());
+            .tag("cumulative_point", _tablet->cumulative_layer_point())
+            .tag("num_rowsets", _tablet->fetch_add_approximate_num_rowsets(0))
+            .tag("cumu_num_rowsets", _tablet->fetch_add_approximate_cumu_num_rowsets(0));
     TRACE("compaction finished");
 
     _state = CompactionState::SUCCESS;
@@ -195,7 +199,7 @@ Status CloudCumulativeCompaction::update_tablet_meta() {
     int64_t cumulative_compaction_cnt = _tablet->cumulative_compaction_cnt();
     selectdb::TabletStatsPB stats;
     RETURN_IF_ERROR(cloud::meta_mgr()->commit_tablet_job(job, &stats));
-
+    LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
     {
         std::lock_guard wrlock(_tablet->get_header_lock());
         if (_tablet->cumulative_compaction_cnt() > cumulative_compaction_cnt) {
```
be/src/common/config.h (13 changes: 11 additions & 2 deletions)

```diff
@@ -828,9 +828,9 @@ CONF_Bool(enable_time_lut, "true");
 CONF_Bool(enable_simdjson_reader, "false");
 
 // number of s3 scanner thread pool size
-CONF_Int32(doris_remote_scanner_thread_pool_thread_num, "16");
+CONF_Int32(doris_remote_scanner_thread_pool_thread_num, "48");
 // number of s3 scanner thread pool queue size
-CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "10240");
+CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
 
 // If set to true, the new scan node framework will be used.
 // This config should be removed when the new scan node is ready.
@@ -924,6 +924,15 @@ CONF_Int32(max_depth_in_bkd_tree, "32");
 
 CONF_Bool(enable_index_compaction, "false");
 
+// http scheme in S3Client to use. E.g. http or https
+CONF_String(s3_client_http_scheme, "http");
+CONF_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
+    return config == "http" || config == "https";
+});
+
+// temporary config for regression-test
+CONF_Bool(always_promote_cumulative_point, "false");
+
 } // namespace config
 
 } // namespace doris
```
be/src/exec/tablet_sink.cpp (10 changes: 7 additions & 3 deletions)

```diff
@@ -155,7 +155,7 @@ void NodeChannel::open() {
 }
 
 void NodeChannel::_cancel_with_msg(const std::string& msg) {
-    LOG(WARNING) << channel_info() << ", " << msg;
+    LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg;
     {
         std::lock_guard<SpinLock> l(_cancel_msg_lock);
         if (_cancel_msg == "") {
@@ -207,7 +207,10 @@ Status NodeChannel::open_wait() {
     }
     // If rpc failed, mark all tablets on this node channel as failed
     _index_channel->mark_as_failed(this->node_id(), this->host(),
-                                   _add_batch_closure->cntl.ErrorText(), -1);
+                                   fmt::format("rpc failed, error coed:{}, error text:{}",
+                                               _add_batch_closure->cntl.ErrorCode(),
+                                               _add_batch_closure->cntl.ErrorText()),
+                                   -1);
     Status st = _index_channel->check_intolerable_failure();
     if (!st.ok()) {
         _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
@@ -231,7 +234,8 @@ Status NodeChannel::open_wait() {
         if (status.ok()) {
             // if has error tablet, handle them first
             for (auto& error : result.tablet_errors()) {
-                _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(),
+                _index_channel->mark_as_failed(this->node_id(), this->host(),
+                                               "tablet error: " + error.msg(),
                                                error.tablet_id());
             }
 
```
be/src/exprs/runtime_filter.cpp (2 changes: 1 addition & 1 deletion)

```diff
@@ -1423,7 +1423,7 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
 
     switch (column_type) {
     case TYPE_BOOLEAN: {
-        batch_copy<int32_t>(filter, it, [](PColumnValue* column, const int32_t* value) {
+        batch_copy<bool>(filter, it, [](PColumnValue* column, const bool* value) {
             column->set_boolval(*value);
         });
         return;
```
be/src/exprs/string_functions.cpp (10 changes: 5 additions & 5 deletions)

```diff
@@ -37,7 +37,7 @@ void StringFunctions::init() {}
 size_t get_char_len(const StringVal& str, std::vector<size_t>* str_index) {
     size_t char_len = 0;
     for (size_t i = 0, char_size = 0; i < str.len; i += char_size) {
-        char_size = get_utf8_byte_length((unsigned)(str.ptr)[i]);
+        char_size = UTF8_BYTE_LENGTH[(unsigned char)(str.ptr)[i]];
         str_index->push_back(i);
         ++char_len;
     }
@@ -65,7 +65,7 @@ StringVal StringFunctions::substring(FunctionContext* context, const StringVal&
     size_t byte_pos = 0;
     std::vector<size_t> index;
     for (size_t i = 0, char_size = 0; i < str.len; i += char_size) {
-        char_size = get_utf8_byte_length((unsigned)(str.ptr)[i]);
+        char_size = UTF8_BYTE_LENGTH[(unsigned char)(str.ptr)[i]];
         index.push_back(i);
         if (pos.val > 0 && index.size() > pos.val + len.val) {
             break;
@@ -320,7 +320,7 @@ IntVal StringFunctions::char_utf8_length(FunctionContext* context, const StringV
     }
     size_t char_len = 0;
     for (size_t i = 0, char_size = 0; i < str.len; i += char_size) {
-        char_size = get_utf8_byte_length((unsigned)(str.ptr)[i]);
+        char_size = UTF8_BYTE_LENGTH[(unsigned char)(str.ptr)[i]];
         ++char_len;
     }
     return IntVal(char_len);
@@ -400,7 +400,7 @@ IntVal StringFunctions::instr(FunctionContext* context, const StringVal& str,
     if (loc > 0) {
         size_t char_len = 0;
         for (size_t i = 0, char_size = 0; i < loc; i += char_size) {
-            char_size = get_utf8_byte_length((unsigned)(str.ptr)[i]);
+            char_size = UTF8_BYTE_LENGTH[(unsigned char)(str.ptr)[i]];
            ++char_len;
         }
         loc = char_len;
@@ -448,7 +448,7 @@ IntVal StringFunctions::locate_pos(FunctionContext* context, const StringVal& su
     // Hive returns the position in the original string starting from 1.
     size_t char_len = 0;
     for (size_t i = 0, char_size = 0; i < match_pos; i += char_size) {
-        char_size = get_utf8_byte_length((unsigned)(adjusted_str.ptr)[i]);
+        char_size = UTF8_BYTE_LENGTH[(unsigned char)(adjusted_str.ptr)[i]];
         ++char_len;
     }
     match_pos = char_len;
```
be/src/http/action/tablets_info_action.cpp (12 changes: 12 additions & 0 deletions)

```diff
@@ -19,6 +19,7 @@
 
 #include <string>
 
+#include "cloud/utils.h"
 #include "http/http_channel.h"
 #include "http/http_headers.h"
 #include "http/http_request.h"
@@ -58,8 +59,19 @@ EasyJson TabletsInfoAction::get_tablets_info(string tablet_num_to_return) {
         msg = "Parameter Error";
     }
     std::vector<TabletInfo> tablets_info;
+#ifdef CLOUD_MODE
+    auto weak_tablets = cloud::tablet_mgr()->get_weak_tablets();
+    int64_t count = 0;
+    for (auto& tablet : weak_tablets) {
+        if (++count > number) break;
+        if (auto t = tablet.lock()) {
+            tablets_info.push_back(t->get_tablet_info());
+        }
+    }
+#else
     TabletManager* tablet_manager = StorageEngine::instance()->tablet_manager();
     tablet_manager->obtain_specific_quantity_tablets(tablets_info, number);
+#endif
 
     EasyJson tablets_info_ej;
     tablets_info_ej["msg"] = msg;
```
be/src/io/cloud/cloud_file_segment.cpp (3 changes: 1 addition & 2 deletions)

```diff
@@ -189,8 +189,7 @@ Status FileSegment::set_downloaded(std::lock_guard<std::mutex>& /* segment_lock
     }
 
     if (_cache_writer) {
-        RETURN_IF_ERROR(_cache_writer->finalize());
-        RETURN_IF_ERROR(_cache_writer->close());
+        RETURN_IF_ERROR(_cache_writer->close(false));
         _cache_writer.reset();
     }
 
```
be/src/io/cloud/tmp_file_mgr.cpp (2 changes: 1 addition & 1 deletion)

```diff
@@ -111,7 +111,7 @@ bool TmpFileMgr::check_if_has_enough_space_to_async_upload(const Path& path,
     do {
         cur_upload_bytes = tmp_file_dir.cur_upload_bytes;
         new_cur_upload_bytes = cur_upload_bytes + upload_file_size;
-        if (!(new_cur_upload_bytes < tmp_file_dir.max_upload_bytes)) {
+        if (new_cur_upload_bytes > tmp_file_dir.max_upload_bytes) {
            return false;
        }
    } while (!tmp_file_dir.cur_upload_bytes.compare_exchange_strong(cur_upload_bytes,
```
be/src/io/fs/file_writer.h (4 changes: 2 additions & 2 deletions)

```diff
@@ -37,8 +37,8 @@ class FileWriter {
 
     virtual Status open() = 0;
 
-    // Normal close. Wait for all data to persist before returning.
-    virtual Status close() = 0;
+    // Normal close. If `sync` is true, wait for all data to persist before returning.
+    virtual Status close(bool sync = true) = 0;
 
     // Abnormal close and remove this file.
     virtual Status abort() = 0;
```
be/src/io/fs/local_file_reader.cpp (1 change: 0 additions & 1 deletion)

```diff
@@ -76,7 +76,6 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_
     char* to = result.data;
     bytes_req = std::min(bytes_req, _file_size - offset);
     *bytes_read = 0;
-
     while (bytes_req != 0) {
         auto res = ::pread(_fd, to, bytes_req, offset);
         if (UNLIKELY(-1 == res && errno != EINTR)) {
```
be/src/io/fs/local_file_system.cpp (8 changes: 4 additions & 4 deletions)

```diff
@@ -47,7 +47,7 @@ Status LocalFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
         return open_file_impl(path, reader);
     }
     Status s;
-    auto task = [&] {s = open_file_impl(path, reader);};
+    auto task = [&] { s = open_file_impl(path, reader); };
     AsyncIO::run_task(task, io::FileSystemType::LOCAL);
     return s;
 }
@@ -128,14 +128,14 @@ Status LocalFileSystem::file_size(const Path& path, size_t* file_size) const {
     if (bthread_self() == 0) {
         return file_size_impl(path, file_size);
     }
 
     Status s;
-    auto task = [&] {s = file_size_impl(path, file_size);};
+    auto task = [&] { s = file_size_impl(path, file_size); };
     AsyncIO::run_task(task, io::FileSystemType::LOCAL);
     return s;
 }
 
-Status LocalFileSystem::file_size_impl(const Path &path, size_t *file_size) const {
+Status LocalFileSystem::file_size_impl(const Path& path, size_t* file_size) const {
     auto fs_path = absolute_path(path);
     std::error_code ec;
     *file_size = std::filesystem::file_size(fs_path, ec);
```
be/src/io/fs/local_file_writer.cpp (8 changes: 2 additions & 6 deletions)

```diff
@@ -77,12 +77,8 @@ Status LocalFileWriter::open() {
     return Status::OK();
 }
 
-Status LocalFileWriter::close() {
-    return _close(true);
-}
-
 Status LocalFileWriter::abort() {
-    auto st = _close(false);
+    auto st = close(false);
     io::global_local_filesystem()->delete_file(_path);
     return st;
 }
@@ -160,7 +156,7 @@ Status LocalFileWriter::finalize() {
     return Status::OK();
 }
 
-Status LocalFileWriter::_close(bool sync) {
+Status LocalFileWriter::close(bool sync) {
     if (_closed) {
         return Status::OK();
     }
```
be/src/io/fs/local_file_writer.h (5 changes: 1 addition & 4 deletions)

```diff
@@ -33,7 +33,7 @@ class LocalFileWriter final : public FileWriter {
 
     Status open() override;
 
-    Status close() override;
+    Status close(bool sync = true) override;
 
     Status abort() override;
 
@@ -49,9 +49,6 @@
 
     FileSystem* fs() const override { return _fs; }
 
-private:
-    Status _close(bool sync);
-
 private:
     int _fd = -1; // owned
     FileSystem* _fs;
```
be/src/io/fs/s3_file_reader.cpp (3 changes: 3 additions & 0 deletions)

```diff
@@ -26,6 +26,7 @@
 #include "io/fs/s3_file_writer.h"
 #include "util/async_io.h"
 #include "util/doris_metrics.h"
+#include "util/trace.h"
 
 namespace doris {
 namespace io {
@@ -96,7 +97,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
+    TRACE_START("read s3");
     auto outcome = client->GetObject(request);
+    TRACE_FINISH("read s3");
     if (!outcome.IsSuccess()) {
         return Status::IOError("failed to read from {}: {}", _path.native(),
                                outcome.GetError().GetMessage());
```
be/src/io/fs/s3_file_writer.cpp (7 changes: 4 additions & 3 deletions)

```diff
@@ -66,15 +66,15 @@ Status S3FileWriter::abort() {
     return Status::OK();
 }
 
-Status S3FileWriter::close() {
+Status S3FileWriter::close(bool sync) {
     if (_closed) {
         return Status::OK();
     }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
     DCHECK(_handle);
     _closed = true;
 
-    {
+    if (sync) {
         SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
         _handle->WaitUntilFinished();
         if (_handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) {
@@ -111,7 +111,7 @@ Status S3FileWriter::finalize() {
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
-    RETURN_IF_ERROR(_tmp_file_writer->close());
+    RETURN_IF_ERROR(_tmp_file_writer->close(false));
     {
         SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
         auto tmp_file_mgr = TmpFileMgr::instance();
@@ -126,6 +126,7 @@ Status S3FileWriter::finalize() {
                     upload_start = upload_start, upload_speed = _upload_speed_bytes_s](
                            const Aws::Transfer::TransferManager*,
                            const std::shared_ptr<const Aws::Transfer::TransferHandle>& handle) {
+        handle->WaitUntilFinished();
         if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED ||
             !tmp_file_mgr->insert_tmp_file(path, size)) {
             global_local_filesystem()->delete_file(path);
```
be/src/io/fs/s3_file_writer.h (4 changes: 3 additions & 1 deletion)

```diff
@@ -45,7 +45,7 @@
 
     Status open() override;
 
-    Status close() override;
+    Status close(bool sync = true) override;
 
     Status abort() override;
 
@@ -59,7 +59,9 @@
 
     size_t bytes_appended() const override { return _tmp_file_writer->bytes_appended(); }
 
+
     FileSystem* fs() const override { return _fs; }
 
+    int64_t upload_speed_bytes_s() const { return *_upload_speed_bytes_s; }
 
 private:
```