From 80999ba73bb85a90f85531fea16d021cc8d46dbe Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Tue, 21 Jun 2022 10:16:32 +0800 Subject: [PATCH 1/3] Revert "Revise default background threads size (#4723)" This reverts commit a79ad91e8b3b8fe8da6b447f4ab46206e94a3971. --- dbms/src/Core/Defines.h | 1 + dbms/src/Interpreters/Context.cpp | 24 ++++---------- dbms/src/Interpreters/Context.h | 4 +-- dbms/src/Interpreters/Settings.h | 6 ++-- dbms/src/Server/Server.cpp | 33 ++++++++----------- .../src/Storages/BackgroundProcessingPool.cpp | 3 -- dbms/src/Storages/BackgroundProcessingPool.h | 4 +-- dbms/src/TestUtils/TiFlashTestEnv.cpp | 6 ---- 8 files changed, 27 insertions(+), 54 deletions(-) diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 75f6f16bb25..33d116dae33 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -78,6 +78,7 @@ /// too short a period can cause errors to disappear immediately after creation. #define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC) #define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue. +#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16 #define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032 #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 2dbd495d2c4..44699a324f4 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -68,8 +68,10 @@ #include #include +#include #include -#include +#include + namespace ProfileEvents { @@ -1439,33 +1441,19 @@ void Context::dropCaches() const } BackgroundProcessingPool & Context::getBackgroundPool() -{ - // Note: shared->background_pool should be initialized first. - auto lock = getLock(); - return *shared->background_pool; -} - -BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size) { auto lock = getLock(); if (!shared->background_pool) - shared->background_pool = std::make_shared(pool_size); + shared->background_pool = std::make_shared(settings.background_pool_size); return *shared->background_pool; } BackgroundProcessingPool & Context::getBlockableBackgroundPool() { - // TODO: maybe a better name for the pool - // Note: shared->blockable_background_pool should be initialized first. - auto lock = getLock(); - return *shared->blockable_background_pool; -} - -BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size) -{ + // TODO: choose a better thread pool size and maybe a better name for the pool auto lock = getLock(); if (!shared->blockable_background_pool) - shared->blockable_background_pool = std::make_shared(pool_size); + shared->blockable_background_pool = std::make_shared(settings.background_pool_size); return *shared->blockable_background_pool; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 434179e1ab8..b6e759e364b 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -380,9 +380,7 @@ class Context bool useL0Opt() const; BackgroundProcessingPool & getBackgroundPool(); - BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size); BackgroundProcessingPool & getBlockableBackgroundPool(); - BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size); void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config); @@ -507,7 +505,7 @@ class DDLGuard class SessionCleaner { public: - explicit SessionCleaner(Context & context_) + SessionCleaner(Context & context_) : context{context_} {} ~SessionCleaner(); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index f2b3bbbd7fe..9361e0525d2 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -80,8 +80,8 @@ struct Settings M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \ M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \ M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \ - M(SettingUInt64, background_pool_size, 0, "Number of threads performing background work for tables (for example, merging in merge tree). Only effective at server startup. " \ - "0 means a quarter of the number of logical CPU cores of the machine.") \ + M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server " \ + "startup.") \ \ M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \ \ @@ -356,7 +356,7 @@ struct Settings M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \ M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \ M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ - M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means the number of logical CPU cores. Only effective at server startup")\ + M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\ M(SettingBool, enable_async_server, true, "Enable async rpc server.") \ M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 186ab0889d8..1bb35e51866 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -53,15 +53,10 @@ #include #include #include -#include -#include -#include #include #include #include -#include #include -#include #include #include #include @@ -86,6 +81,12 @@ #include #include +#include "HTTPHandlerFactory.h" +#include "MetricsPrometheus.h" +#include "MetricsTransmitter.h" +#include "StatusFile.h" +#include "TCPHandlerFactory.h" + #if Poco_NetSSL_FOUND #include #include @@ -1127,19 +1128,6 @@ int Server::main(const std::vector & /*args*/) global_context->getPathCapacity(), global_context->getFileProvider()); - /// if default value of background_pool_size is 0 - /// set it to the a quarter of the number of logical CPU cores of machine. - Settings & settings = global_context->getSettingsRef(); - if (settings.background_pool_size == 0) - { - global_context->setSetting("background_pool_size", std::to_string(server_info.cpu_info.logical_cores / 4)); - } - LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size); - - /// Initialize the background & blockable background thread pool. - auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size); - auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size); - global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast(global_context->getPageStorageRunMode())); @@ -1256,6 +1244,13 @@ int Server::main(const std::vector & /*args*/) /// Load global settings from default_profile and system_profile. /// It internally depends on UserConfig::parseSettings. global_context->setDefaultProfiles(config()); + Settings & settings = global_context->getSettingsRef(); + + /// Initialize the background thread pool. + /// It internally depends on settings.background_pool_size, + /// so must be called after settings has been load. + auto & bg_pool = global_context->getBackgroundPool(); + auto & blockable_bg_pool = global_context->getBlockableBackgroundPool(); /// Initialize RateLimiter. global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool); @@ -1407,7 +1402,7 @@ int Server::main(const std::vector & /*args*/) { auto size = settings.grpc_completion_queue_pool_size; if (size == 0) - size = server_info.cpu_info.logical_cores; + size = std::thread::hardware_concurrency(); GRPCCompletionQueuePool::global_instance = std::make_unique(size); } diff --git a/dbms/src/Storages/BackgroundProcessingPool.cpp b/dbms/src/Storages/BackgroundProcessingPool.cpp index 15740fa2875..45ba032bf53 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/BackgroundProcessingPool.cpp @@ -86,9 +86,6 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_) , thread_ids_counter(size_) { - if (size <= 0) - throw Exception("BackgroundProcessingPool size must be greater than 0", ErrorCodes::LOGICAL_ERROR); - LOG_FMT_INFO(&Poco::Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with {} threads", size); threads.resize(size); diff --git a/dbms/src/Storages/BackgroundProcessingPool.h b/dbms/src/Storages/BackgroundProcessingPool.h index 49a01b3a397..1ba6c4efcf8 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.h +++ b/dbms/src/Storages/BackgroundProcessingPool.h @@ -81,7 +81,7 @@ class BackgroundProcessingPool using TaskHandle = std::shared_ptr; - explicit BackgroundProcessingPool(int size_); + BackgroundProcessingPool(int size_); size_t getNumberOfThreads() const { return size; } @@ -96,7 +96,7 @@ class BackgroundProcessingPool /// 2. thread B also get the same task /// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task /// 4. thread B find the task is not occupied and execute the task again almost immediately - TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0); + TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0); void removeTask(const TaskHandle & task); ~BackgroundProcessingPool(); diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index a7bcfe43d7a..cbd42b57550 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -24,8 +24,6 @@ #include #include -#include - namespace DB::tests { std::unique_ptr TiFlashTestEnv::global_context = nullptr; @@ -41,10 +39,6 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR KeyManagerPtr key_manager = std::make_shared(false); global_context->initializeFileProvider(key_manager, false); - // initialize background & blockable background thread pool - global_context->initializeBackgroundPool(std::thread::hardware_concurrency() / 4); - global_context->initializeBlockableBackgroundPool(std::thread::hardware_concurrency() / 4); - // Theses global variables should be initialized by the following order // 1. capacity // 2. path pool From 036da0211efa474d4ae82655bf40857bd824db1a Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Wed, 22 Jun 2022 14:24:28 +0800 Subject: [PATCH 2/3] refactor the initialize of background pool Signed-off-by: Lloyd-Pottiger --- dbms/src/Interpreters/Context.cpp | 26 ++++++++++++++------ dbms/src/Interpreters/Context.h | 4 ++- dbms/src/Server/Server.cpp | 26 +++++++++----------- dbms/src/Storages/BackgroundProcessingPool.h | 4 +-- dbms/src/TestUtils/TiFlashTestEnv.cpp | 5 ++++ 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 44699a324f4..7cd0cb5ad53 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -68,9 +68,9 @@ #include #include -#include #include #include +#include namespace ProfileEvents @@ -78,8 +78,6 @@ namespace ProfileEvents extern const Event ContextLock; } -#include - namespace CurrentMetrics { extern const Metric GlobalStorageRunMode; @@ -1440,20 +1438,32 @@ void Context::dropCaches() const shared->mark_cache->reset(); } -BackgroundProcessingPool & Context::getBackgroundPool() +BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size) { auto lock = getLock(); if (!shared->background_pool) - shared->background_pool = std::make_shared(settings.background_pool_size); + shared->background_pool = std::make_shared(pool_size); return *shared->background_pool; } -BackgroundProcessingPool & Context::getBlockableBackgroundPool() +BackgroundProcessingPool & Context::getBackgroundPool() +{ + auto lock = getLock(); + return *shared->background_pool; +} + +BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size) { - // TODO: choose a better thread pool size and maybe a better name for the pool auto lock = getLock(); if (!shared->blockable_background_pool) - shared->blockable_background_pool = std::make_shared(settings.background_pool_size); + shared->blockable_background_pool = std::make_shared(pool_size); + return *shared->blockable_background_pool; +} + +BackgroundProcessingPool & Context::getBlockableBackgroundPool() +{ + // TODO: maybe a better name for the pool + auto lock = getLock(); return *shared->blockable_background_pool; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index b6e759e364b..7663b40f612 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -379,7 +379,9 @@ class Context void setUseL0Opt(bool use_l0_opt); bool useL0Opt() const; + BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size); BackgroundProcessingPool & getBackgroundPool(); + BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size); BackgroundProcessingPool & getBlockableBackgroundPool(); void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config); @@ -505,7 +507,7 @@ class DDLGuard class SessionCleaner { public: - SessionCleaner(Context & context_) + explicit SessionCleaner(Context & context_) : context{context_} {} ~SessionCleaner(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 1bb35e51866..2d12fb98fbf 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -53,10 +53,15 @@ #include #include #include +#include +#include +#include #include #include #include +#include #include +#include #include #include #include @@ -81,12 +86,6 @@ #include #include -#include "HTTPHandlerFactory.h" -#include "MetricsPrometheus.h" -#include "MetricsTransmitter.h" -#include "StatusFile.h" -#include "TCPHandlerFactory.h" - #if Poco_NetSSL_FOUND #include #include @@ -1128,6 +1127,12 @@ int Server::main(const std::vector & /*args*/) global_context->getPathCapacity(), global_context->getFileProvider()); + /// Initialize the background & blockable background thread pool. + Settings & settings = global_context->getSettingsRef(); + LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size); + auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size); + auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size); + global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast(global_context->getPageStorageRunMode())); @@ -1244,13 +1249,6 @@ int Server::main(const std::vector & /*args*/) /// Load global settings from default_profile and system_profile. /// It internally depends on UserConfig::parseSettings. global_context->setDefaultProfiles(config()); - Settings & settings = global_context->getSettingsRef(); - - /// Initialize the background thread pool. - /// It internally depends on settings.background_pool_size, - /// so must be called after settings has been load. - auto & bg_pool = global_context->getBackgroundPool(); - auto & blockable_bg_pool = global_context->getBlockableBackgroundPool(); /// Initialize RateLimiter. global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool); @@ -1402,7 +1400,7 @@ int Server::main(const std::vector & /*args*/) { auto size = settings.grpc_completion_queue_pool_size; if (size == 0) - size = std::thread::hardware_concurrency(); + size = server_info.cpu_info.logical_cores; GRPCCompletionQueuePool::global_instance = std::make_unique(size); } diff --git a/dbms/src/Storages/BackgroundProcessingPool.h b/dbms/src/Storages/BackgroundProcessingPool.h index 1ba6c4efcf8..49a01b3a397 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.h +++ b/dbms/src/Storages/BackgroundProcessingPool.h @@ -81,7 +81,7 @@ class BackgroundProcessingPool using TaskHandle = std::shared_ptr; - BackgroundProcessingPool(int size_); + explicit BackgroundProcessingPool(int size_); size_t getNumberOfThreads() const { return size; } @@ -96,7 +96,7 @@ class BackgroundProcessingPool /// 2. thread B also get the same task /// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task /// 4. thread B find the task is not occupied and execute the task again almost immediately - TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0); + TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0); void removeTask(const TaskHandle & task); ~BackgroundProcessingPool(); diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index cbd42b57550..f44298cbafd 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -39,6 +39,11 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR KeyManagerPtr key_manager = std::make_shared(false); global_context->initializeFileProvider(key_manager, false); + // initialize background & blockable background thread pool + Settings & settings = global_context->getSettingsRef(); + global_context->initializeBackgroundPool(settings.background_pool_size); + global_context->initializeBlockableBackgroundPool(settings.background_pool_size); + // Theses global variables should be initialized by the following order // 1. capacity // 2. path pool From 3224ef8753ef139ee6f4c3c956521c2230c22602 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Tue, 5 Jul 2022 13:27:53 +0800 Subject: [PATCH 3/3] Update dbms/src/Server/Server.cpp --- dbms/src/Server/Server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 2d12fb98fbf..901248c7f6d 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1400,7 +1400,7 @@ int Server::main(const std::vector & /*args*/) { auto size = settings.grpc_completion_queue_pool_size; if (size == 0) - size = server_info.cpu_info.logical_cores; + size = std::thread::hardware_concurrency(); GRPCCompletionQueuePool::global_instance = std::make_unique(size); }