Skip to content

Commit

Permalink
Merge branch 'master' into show_proc_node_priv
Browse files Browse the repository at this point in the history
  • Loading branch information
zddr authored Mar 25, 2024
2 parents c76bf4d + 69821c0 commit 831734e
Show file tree
Hide file tree
Showing 241 changed files with 7,517 additions and 1,536 deletions.
4 changes: 2 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed");
TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWorkerPool.{}", name))
auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name))
.set_min_threads(worker_count)
.set_max_threads(worker_count)
.build(&_thread_pool);
Expand Down Expand Up @@ -512,7 +512,7 @@ PriorTaskWorkerPool::PriorTaskWorkerPool(
std::string_view name, int normal_worker_count, int high_prior_worker_conut,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWorkerPool.{}", name))
auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
.set_min_threads(normal_worker_count)
.set_max_threads(normal_worker_count)
.build(&_normal_pool);
Expand Down
47 changes: 15 additions & 32 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -314,7 +315,6 @@ DEFINE_mInt32(trash_file_expire_time_sec, "259200");
// modify them upon necessity
DEFINE_Int32(min_file_descriptor_number, "60000");
DEFINE_mBool(disable_segment_cache, "false");
DEFINE_Int64(index_stream_cache_capacity, "10737418240");
DEFINE_String(row_cache_mem_limit, "20%");

// Cache for storage page size
Expand Down Expand Up @@ -471,8 +471,6 @@ DEFINE_String(ssl_private_key_path, "");
DEFINE_Bool(enable_all_http_auth, "false");
// Number of webserver workers
DEFINE_Int32(webserver_num_workers, "48");
// Period to update rate counters and sampling counters in ms.
DEFINE_mInt32(periodic_counter_update_period_ms, "500");

DEFINE_Bool(enable_single_replica_load, "true");
// Number of download workers for single replica load
Expand Down Expand Up @@ -540,10 +538,6 @@ DEFINE_Int32(num_threads_per_disk, "0");
DEFINE_Int32(read_size, "8388608"); // 8 * 1024 * 1024, Read Size (in bytes)
DEFINE_Int32(min_buffer_size, "1024"); // 1024, The minimum read buffer size (in bytes)

// For each io buffer size, the maximum number of buffers the IoMgr will hold onto
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
DEFINE_Int32(max_free_io_buffers, "128");

// for pprof
DEFINE_String(pprof_profile_dir, "${DORIS_HOME}/log");
// for jeprofile in jemalloc
Expand All @@ -564,12 +558,6 @@ DEFINE_Int32(num_cores, "0");
// Otherwise, we will ignore the broken disk,
DEFINE_Bool(ignore_broken_disk, "false");

// linux transparent huge page
DEFINE_Bool(madvise_huge_pages, "false");

// whether use mmap to allocate memory
DEFINE_Bool(mmap_buffers, "false");

// Sleep time in milliseconds between memory maintenance iterations
DEFINE_mInt32(memory_maintenance_sleep_time_ms, "100");

Expand All @@ -586,9 +574,6 @@ DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
// percent of (active memtables size / all memtables size) when reach soft limit
DEFINE_mInt32(memtable_soft_limit_active_percent, "50");

// Alignment
DEFINE_Int32(memory_max_alignment, "16");

// memtable insert memory tracker will multiply input block size with this ratio
DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
// max write buffer size before flush, default 200MB
Expand Down Expand Up @@ -814,9 +799,6 @@ DEFINE_mInt32(jdbc_connection_pool_cache_clear_time_sec, "28800");
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800");

// s3 config
DEFINE_mInt32(max_remote_storage_count, "10");

// reference https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility
// If the dependent kafka broker version older than 0.10.0.0,
// the value of kafka_api_version_request should be false, and the
Expand Down Expand Up @@ -902,14 +884,6 @@ DEFINE_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h
DEFINE_mInt32(confirm_unused_remote_files_interval_sec, "60");
DEFINE_Int32(cold_data_compaction_thread_num, "2");
DEFINE_mInt32(cold_data_compaction_interval_sec, "1800");
DEFINE_Int32(concurrency_per_dir, "2");
// file_cache_type is used to set the type of file cache for remote files.
// "": no cache, "sub_file_cache": split sub files from remote file.
// "whole_file_cache": the whole file.
DEFINE_mString(file_cache_type, "file_block_cache");
DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
return config.empty() || config == "file_block_cache";
});

DEFINE_String(tmp_file_dir, "tmp");

Expand Down Expand Up @@ -1637,15 +1611,24 @@ void update_config(const std::string& field, const std::string& value) {

Status set_fuzzy_configs() {
std::unordered_map<std::string, std::string> fuzzy_field_and_value;
std::shared_ptr<std::mt19937_64> generator(new std::mt19937_64());
generator->seed(std::random_device()());
std::uniform_int_distribution<int64_t> distribution(0, 100);

// if have set enable_fuzzy_mode=true in be.conf, will fuzzy those field and values
fuzzy_field_and_value["disable_storage_page_cache"] = ((rand() % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_system_metrics"] = ((rand() % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["disable_storage_page_cache"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_system_metrics"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_set_in_bitmap_value"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";
fuzzy_field_and_value["enable_shrink_memory"] =
((distribution(*generator) % 2) == 0) ? "true" : "false";

fmt::memory_buffer buf;
for (auto it = fuzzy_field_and_value.begin(); it != fuzzy_field_and_value.end(); it++) {
const auto& field = it->first;
const auto& value = it->second;
for (auto& it : fuzzy_field_and_value) {
const auto& field = it.first;
const auto& value = it.second;
RETURN_IF_ERROR(set_config(field, value, false, true));
fmt::format_to(buf, "{}={}, ", field, value);
}
Expand Down
24 changes: 0 additions & 24 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ DECLARE_mInt32(trash_file_expire_time_sec);
// modify them upon necessity
DECLARE_Int32(min_file_descriptor_number);
DECLARE_mBool(disable_segment_cache);
DECLARE_Int64(index_stream_cache_capacity);
DECLARE_String(row_cache_mem_limit);

// Cache for storage page size
Expand Down Expand Up @@ -514,8 +513,6 @@ DECLARE_String(ssl_private_key_path);
DECLARE_Bool(enable_all_http_auth);
// Number of webserver workers
DECLARE_Int32(webserver_num_workers);
// Period to update rate counters and sampling counters in ms.
DECLARE_mInt32(periodic_counter_update_period_ms);

DECLARE_Bool(enable_single_replica_load);
// Number of download workers for single replica load
Expand Down Expand Up @@ -590,10 +587,6 @@ DECLARE_Int32(num_threads_per_disk);
DECLARE_Int32(read_size); // 8 * 1024 * 1024, Read Size (in bytes)
DECLARE_Int32(min_buffer_size); // 1024, The minimum read buffer size (in bytes)

// For each io buffer size, the maximum number of buffers the IoMgr will hold onto
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
DECLARE_Int32(max_free_io_buffers);

// for pprof
DECLARE_String(pprof_profile_dir);
// for jeprofile in jemalloc
Expand All @@ -615,12 +608,6 @@ DECLARE_Int32(num_cores);
// Otherwise, we will ignore the broken disk,
DECLARE_Bool(ignore_broken_disk);

// linux transparent huge page
DECLARE_Bool(madvise_huge_pages);

// whether use mmap to allocate memory
DECLARE_Bool(mmap_buffers);

// Sleep time in milliseconds between memory maintenance iterations
DECLARE_mInt32(memory_maintenance_sleep_time_ms);

Expand All @@ -637,9 +624,6 @@ DECLARE_mInt32(memtable_hard_limit_active_percent);
// percent of (active memtables size / all memtables size) when reach soft limit
DECLARE_mInt32(memtable_soft_limit_active_percent);

// Alignment
DECLARE_Int32(memory_max_alignment);

// memtable insert memory tracker will multiply input block size with this ratio
DECLARE_mDouble(memtable_insert_memory_ratio);
// max write buffer size before flush, default 200MB
Expand Down Expand Up @@ -871,9 +855,6 @@ DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec);
// A common object cache depends on an Sharded LRU Cache.
DECLARE_mInt32(common_obj_lru_cache_stale_sweep_time_sec);

// s3 config
DECLARE_mInt32(max_remote_storage_count);

// reference https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility
// If the dependent kafka broker version older than 0.10.0.0,
// the value of kafka_api_version_request should be false, and the
Expand Down Expand Up @@ -956,11 +937,6 @@ DECLARE_mInt32(remove_unused_remote_files_interval_sec); // 6h
DECLARE_mInt32(confirm_unused_remote_files_interval_sec);
DECLARE_Int32(cold_data_compaction_thread_num);
DECLARE_mInt32(cold_data_compaction_interval_sec);
DECLARE_Int32(concurrency_per_dir);
// file_cache_type is used to set the type of file cache for remote files.
// "": no cache, "sub_file_cache": split sub files from remote file.
// "whole_file_cache": the whole file.
DECLARE_mString(file_cache_type);

DECLARE_Int32(s3_transfer_executor_pool_size);

Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "exec/schema_scanner/schema_files_scanner.h"
#include "exec/schema_scanner/schema_metadata_name_ids_scanner.h"
#include "exec/schema_scanner/schema_partitions_scanner.h"
#include "exec/schema_scanner/schema_processlist_scanner.h"
#include "exec/schema_scanner/schema_profiling_scanner.h"
#include "exec/schema_scanner/schema_rowsets_scanner.h"
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
Expand Down Expand Up @@ -158,6 +159,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaActiveQueriesScanner::create_unique();
case TSchemaTableType::SCH_WORKLOAD_GROUPS:
return SchemaWorkloadGroupsScanner::create_unique();
case TSchemaTableType::SCH_PROCESSLIST:
return SchemaProcessListScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
11 changes: 11 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class TListPrivilegesResult;
class TListTableStatusResult;
class TShowVariableRequest;
class TShowVariableResult;
class TShowProcessListRequest;
class TShowProcessListResult;

Status SchemaHelper::get_db_names(const std::string& ip, const int32_t port,
const TGetDbsParams& request, TGetDbsResult* result) {
Expand Down Expand Up @@ -123,4 +125,13 @@ std::string SchemaHelper::extract_db_name(const std::string& full_name) {
return std::string(full_name.c_str() + found, full_name.size() - found);
}

Status SchemaHelper::show_process_list(const std::string& ip, const int32_t port,
const TShowProcessListRequest& request,
TShowProcessListResult* result) {
return ThriftRpcHelper::rpc<FrontendServiceClient>(
ip, port, [&request, &result](FrontendServiceConnection& client) {
client->showProcessList(*result, request);
});
}

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class TListTableStatusResult;
class TListTableMetadataNameIdsResult;
class TShowVariableRequest;
class TShowVariableResult;
class TShowProcessListRequest;
class TShowProcessListResult;

// this class is a helper for getting schema info from FE
class SchemaHelper {
Expand Down Expand Up @@ -76,6 +78,10 @@ class SchemaHelper {
TListPrivilegesResult* privileges_result);

static std::string extract_db_name(const std::string& full_name);

static Status show_process_list(const std::string& ip, const int32_t port,
const TShowProcessListRequest& request,
TShowProcessListResult* result);
};

} // namespace doris
Loading

0 comments on commit 831734e

Please sign in to comment.