Merge branch 'master' into ugi_opt
wsjz authored Jul 11, 2024
2 parents 5e019d2 + 5367f7e commit f65435a
Showing 623 changed files with 16,666 additions and 11,125 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
@@ -40,6 +40,7 @@ header:
- "**/*.sql"
- "**/*.lock"
- "**/*.out"
- "**/*.parquet"
- "docs/.markdownlintignore"
- "fe/fe-core/src/test/resources/data/net_snmp_normal"
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4"
5 changes: 5 additions & 0 deletions be/CMakeLists.txt
@@ -118,6 +118,7 @@ set(BASE_DIR "${CMAKE_CURRENT_SOURCE_DIR}")
set(ENV{DORIS_HOME} "${BASE_DIR}/..")
set(BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}")
set(GENSRC_DIR "${BASE_DIR}/../gensrc/build/")
set(COMMON_SRC_DIR "${BASE_DIR}/../common")
set(SRC_DIR "${BASE_DIR}/src/")
set(TEST_DIR "${CMAKE_SOURCE_DIR}/test/")
set(OUTPUT_DIR "${BASE_DIR}/output")
@@ -436,6 +437,7 @@ include_directories(

include_directories(
SYSTEM
${COMMON_SRC_DIR}
${GENSRC_DIR}/
${THIRDPARTY_DIR}/include
${GPERFTOOLS_HOME}/include
@@ -500,6 +502,7 @@ set(DORIS_LINK_LIBS
Pipeline
Cloud
${WL_END_GROUP}
CommonCPP
)

set(absl_DIR ${THIRDPARTY_DIR}/lib/cmake/absl)
@@ -765,6 +768,8 @@ if (MAKE_TEST)
add_subdirectory(${TEST_DIR})
endif ()

add_subdirectory(${COMMON_SRC_DIR}/cpp ${BUILD_DIR}/src/common_cpp)

# Install be
install(DIRECTORY DESTINATION ${OUTPUT_DIR})
install(DIRECTORY DESTINATION ${OUTPUT_DIR}/bin)
1 change: 0 additions & 1 deletion be/cmake/thirdparty.cmake
@@ -81,7 +81,6 @@ if (USE_JEMALLOC)
else()
add_thirdparty(tcmalloc WHOLELIBPATH ${GPERFTOOLS_HOME}/lib/libtcmalloc.a NOTADD)
endif()
add_thirdparty(jemalloc_arrow LIBNAME "lib/libjemalloc_arrow.a")

if (WITH_MYSQL)
add_thirdparty(mysql LIBNAME "lib/libmysqlclient.a")
11 changes: 10 additions & 1 deletion be/src/cloud/cloud_base_compaction.cpp
@@ -22,7 +22,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "gen_cpp/cloud.pb.h"
#include "olap/compaction.h"
#include "olap/task/engine_checksum_task.h"
@@ -297,6 +297,15 @@ Status CloudBaseCompaction::modify_rowsets() {
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("initiator", initiator)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

11 changes: 10 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
@@ -22,7 +22,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "gen_cpp/cloud.pb.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_policy.h"
@@ -242,6 +242,15 @@ Status CloudCumulativeCompaction::modify_rowsets() {
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("initiator", initiator)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -25,7 +25,7 @@
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
12 changes: 11 additions & 1 deletion be/src/cloud/cloud_full_compaction.cpp
@@ -24,7 +24,7 @@
#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "gen_cpp/cloud.pb.h"
#include "olap/compaction.h"
#include "olap/rowset/beta_rowset.h"
@@ -327,6 +327,16 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
}
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*cloud_tablet(), -1, initiator,
delete_bitmap.get()));
LOG_INFO("update delete bitmap in CloudFullCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("initiator", initiator)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("update_bitmap_size", delete_bitmap->delete_bitmap.size());
_tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
return Status::OK();
}
9 changes: 7 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
@@ -41,7 +41,7 @@
#include "cloud/pb_convert.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/Types_types.h"
@@ -747,7 +747,12 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);

RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(true);
// The variant schema may be updated, so we need to update the schema as well.
// The rowset meta updated by `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap`
// would be lost in `update_tmp_rowset` if skip_schema were set. So, to keep the latest schema
// for variant types, we keep the schema in update_tmp_rowset.
bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0;
RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema);
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
Status st =
retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset);
7 changes: 5 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
@@ -427,8 +427,11 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
context.tablet_schema = std::make_shared<TabletSchema>();
context.tablet_schema->copy_from(*(rowset.tablet_schema()));
// During a partial update, the extracted columns of a variant should not be included in the tablet schema.
// This is because the partial update for a variant needs to ignore the extracted columns.
// Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
// the complete variant is constructed by reading all the sub-columns of the variant.
context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = table_id();
context.enable_segcompaction = false;
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -24,7 +24,7 @@
#include <shared_mutex>

#include "common/status.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
#include "olap/tablet_meta.h"

33 changes: 23 additions & 10 deletions be/src/cloud/injection_point_action.cpp
@@ -23,7 +23,7 @@
#include <mutex>

#include "common/status.h"
#include "common/sync_point.h"
#include "cpp/sync_point.h"
#include "http/http_channel.h"
#include "http/http_request.h"
#include "http/http_status.h"
@@ -213,17 +213,15 @@ void handle_set(HttpRequest* req) {
}

void handle_clear(HttpRequest* req) {
auto& point = req->param("name");
const auto& point = req->param("name");
auto* sp = SyncPoint::get_instance();
if (point.empty()) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty point name");
return;
}
auto sp = SyncPoint::get_instance();
if (point == "all") {
// If point name is empty, clear all
sp->clear_all_call_backs();
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
return;
}

sp->clear_call_back(point);
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}
@@ -244,12 +242,20 @@ void handle_suite(HttpRequest* req) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, "unknown suite: " + suite);
}

} // namespace

InjectionPointAction::InjectionPointAction() {
void handle_enable(HttpRequest* req) {
SyncPoint::get_instance()->enable_processing();
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void handle_disable(HttpRequest* req) {
SyncPoint::get_instance()->disable_processing();
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

} // namespace

InjectionPointAction::InjectionPointAction() = default;

void InjectionPointAction::handle(HttpRequest* req) {
LOG(INFO) << req->debug_string();
auto& op = req->param("op");
@@ -262,7 +268,14 @@ void InjectionPointAction::handle(HttpRequest* req) {
} else if (op == "apply_suite") {
handle_suite(req);
return;
} else if (op == "enable") {
handle_enable(req);
return;
} else if (op == "disable") {
handle_disable(req);
return;
}

HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "unknown op: " + op);
}
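For context: the "enable" and "disable" ops added above just toggle whether registered sync points are processed, and handle_clear now rejects an empty name and treats the literal name "all" as "clear everything". Below is a minimal illustrative sketch of that dispatch, using only the SyncPoint methods visible in this diff; the helper function itself is hypothetical and not part of the commit.

```cpp
#include <string>

#include "cpp/sync_point.h" // SyncPoint lives here after this change

namespace doris {

// Hypothetical helper mirroring the op handling in InjectionPointAction::handle.
// Returns false for an unknown op or an empty point name, so a caller could
// reply with BAD_REQUEST in those cases.
bool apply_injection_op(const std::string& op, const std::string& point) {
    auto* sp = SyncPoint::get_instance();
    if (op == "enable") {
        sp->enable_processing(); // start honoring registered injection points
    } else if (op == "disable") {
        sp->disable_processing(); // stop honoring them; callbacks stay registered
    } else if (op == "clear") {
        if (point.empty()) {
            return false;
        }
        if (point == "all") {
            sp->clear_all_call_backs();
        } else {
            sp->clear_call_back(point);
        }
    } else {
        return false;
    }
    return true;
}

} // namespace doris
```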

14 changes: 10 additions & 4 deletions be/src/common/config.cpp
@@ -67,6 +67,9 @@ DEFINE_mString(public_access_ip, "");
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
DEFINE_Int32(brpc_num_threads, "256");
// The time in seconds that the brpc server keeps idle connections open. Setting this value too small may cause RPCs between backends to fail.
// The default value is -1, which means idle connections are never closed.
DEFINE_Int32(brpc_idle_timeout_sec, "-1");

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
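The new brpc_idle_timeout_sec option above maps naturally onto brpc's own per-server setting. Here is a sketch of one plausible way it could be wired in when a brpc server is started, assuming brpc::ServerOptions::idle_timeout_sec (a standard brpc field) is the target; the helper below is illustrative, not code from this commit.

```cpp
#include <brpc/server.h>

#include "common/config.h"

// Illustrative only: propagate the config value into brpc server options.
// A value of -1 (the default above) keeps idle connections open indefinitely;
// small values risk closing connections that backends reuse between RPCs.
brpc::ServerOptions make_brpc_server_options() {
    brpc::ServerOptions options;
    options.idle_timeout_sec = doris::config::brpc_idle_timeout_sec;
    return options;
}
```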
@@ -118,6 +121,8 @@ DEFINE_mInt32(double_resize_threshold, "23");

DEFINE_Int64(max_sys_mem_available_low_water_mark_bytes, "6871947673");

DEFINE_Int64(memtable_limiter_reserved_memory_bytes, "838860800");

// The size of the memory that gc wants to release each time, as a percentage of the mem limit.
DEFINE_mString(process_minor_gc_size, "10%");
DEFINE_mString(process_full_gc_size, "20%");
@@ -1072,7 +1077,6 @@ DEFINE_mInt32(schema_cache_sweep_time_sec, "100");
DEFINE_mInt32(segment_cache_capacity, "-1");
DEFINE_mInt32(estimated_num_columns_per_segment, "200");
DEFINE_mInt32(estimated_mem_per_column_reader, "1024");
// The value is calculate by storage_page_cache_limit * index_page_cache_percentage
DEFINE_mInt32(segment_cache_memory_percentage, "2");

// enable feature binlog, default false
@@ -1165,6 +1169,9 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");

DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");

DEFINE_mBool(enable_workload_group_memory_gc, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
@@ -1262,9 +1269,6 @@ DEFINE_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
return config == "http" || config == "https";
});

// enable injection point in regression-test
DEFINE_mBool(enable_injection_point, "false");

DEFINE_mBool(ignore_schema_change_check, "false");

DEFINE_mInt64(string_overflow_size, "4294967295"); // std::numic_limits<uint32_t>::max()
@@ -1322,6 +1326,8 @@ DEFINE_mBool(enable_parquet_page_index, "true");

DEFINE_mBool(ignore_not_found_file_in_external_table, "true");

DEFINE_mBool(enable_hdfs_mem_limiter, "true");

// clang-format off
#ifdef BE_TEST
// test s3
17 changes: 11 additions & 6 deletions be/src/common/config.h
@@ -108,6 +108,7 @@ DECLARE_mString(public_access_ip);
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
DECLARE_Int32(brpc_num_threads);
DECLARE_Int32(brpc_idle_timeout_sec);

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
@@ -159,12 +160,15 @@ DECLARE_mInt32(max_fill_rate);
DECLARE_mInt32(double_resize_threshold);

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 6.4G,
// actual low water mark=min(6.4G, MemTotal * 10%), avoid wasting too much memory on machines
// with large memory larger than 64G.
// Turn up max. On machines with more than 64G memory, more memory buffers will be reserved for Full GC.
// actual low water mark=min(6.4G, MemTotal * 5%), avoid wasting too much memory on machines
// with large memory larger than 128G.
// Turn up max. On machines with more than 128G memory, more memory buffers will be reserved for Full GC.
// Turn down max. will use as much memory as possible.
DECLARE_Int64(max_sys_mem_available_low_water_mark_bytes);

// reserve a small amount of memory so we do not trigger MinorGC
DECLARE_Int64(memtable_limiter_reserved_memory_bytes);

// The size of the memory that gc wants to release each time, as a percentage of the mem limit.
DECLARE_mString(process_minor_gc_size);
DECLARE_mString(process_full_gc_size);
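As a quick worked example of the low-water-mark rule described above (min(6.4G, MemTotal * 5%)): a 64 GiB host gets roughly 3.2 GiB, while hosts with 128 GiB or more cap out at the 6.4 GiB default. The helper below is hypothetical and only restates that arithmetic; the constant matches the default defined in config.cpp in this diff.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical sketch: low_water_mark = min(6.4 GiB, MemTotal * 5%).
int64_t sys_mem_available_low_water_mark(int64_t mem_total_bytes) {
    constexpr int64_t kMaxLowWaterMark = 6871947673LL;             // ~6.4 GiB, the config default
    return std::min(kMaxLowWaterMark, mem_total_bytes / 20);       // 5% of MemTotal
}
```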
@@ -1248,6 +1252,8 @@ DECLARE_mBool(exit_on_exception);

// cgroup
DECLARE_mString(doris_cgroup_cpu_path);
DECLARE_mBool(enable_be_proc_monitor);
DECLARE_mInt32(be_proc_monitor_interval_ms);

DECLARE_mBool(enable_workload_group_memory_gc);

@@ -1348,9 +1354,6 @@ DECLARE_mInt32(thrift_client_open_num_tries);
// http scheme in S3Client to use. E.g. http or https
DECLARE_String(s3_client_http_scheme);

// enable injection point in regression-test
DECLARE_mBool(enable_injection_point);

DECLARE_mBool(ignore_schema_change_check);

/** Only use in fuzzy test **/
@@ -1413,6 +1416,8 @@ DECLARE_mBool(enable_parquet_page_index);
// Default is true, if set to false, the not found file will result in query failure.
DECLARE_mBool(ignore_not_found_file_in_external_table);

DECLARE_mBool(enable_hdfs_mem_limiter);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
14 changes: 14 additions & 0 deletions be/src/common/daemon.cpp
@@ -48,6 +48,7 @@
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/be_proc_monitor.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -399,6 +400,13 @@ void Daemon::wg_mem_used_refresh_thread() {
}
}

void Daemon::be_proc_monitor_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) {
LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info();
}
}

void Daemon::start() {
Status st;
st = Thread::create(
@@ -435,6 +443,12 @@ void Daemon::start() {
st = Thread::create(
"Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
&_threads.emplace_back());

if (config::enable_be_proc_monitor) {
st = Thread::create(
"Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
&_threads.emplace_back());
}
CHECK(st.ok()) << st;
}
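The new be_proc_monitor_thread follows the same interruptible-sleep pattern as the other daemon threads: wait on the stop latch for one interval, do a round of work on timeout, and exit as soon as shutdown is signaled. Below is a standalone illustration of that pattern with the standard library (not Doris code, which uses its own CountDownLatch and Thread classes).

```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>

// Standalone sketch of a periodic monitor loop that can be stopped promptly.
class PeriodicMonitor {
public:
    void run(std::chrono::milliseconds interval) {
        std::unique_lock<std::mutex> lock(_mu);
        // wait_for returns false on timeout (do one round of work) and true once
        // stop() has flipped the flag, which ends the loop without a full sleep.
        while (!_cv.wait_for(lock, interval, [this] { return _stopped; })) {
            std::cout << "collect and log per-thread stats here\n";
        }
    }

    void stop() {
        {
            std::lock_guard<std::mutex> guard(_mu);
            _stopped = true;
        }
        _cv.notify_all();
    }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    bool _stopped = false;
};
```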
