Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor initialize of background pool #5190

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,16 @@
#include <fmt/core.h>

#include <boost/functional/hash/hash.hpp>
#include <map>
#include <pcg_random.hpp>
#include <set>
#include <unordered_map>


namespace ProfileEvents
{
extern const Event ContextLock;
}

#include <set>

namespace CurrentMetrics
{
extern const Metric GlobalStorageRunMode;
Expand Down Expand Up @@ -1440,20 +1438,32 @@ void Context::dropCaches() const
shared->mark_cache->reset();
}

BackgroundProcessingPool & Context::getBackgroundPool()
BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size)
{
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
return *shared->background_pool;
}

BackgroundProcessingPool & Context::getBlockableBackgroundPool()
BackgroundProcessingPool & Context::getBackgroundPool()
{
auto lock = getLock();
return *shared->background_pool;
}

BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size)
{
// TODO: choose a better thread pool size and maybe a better name for the pool
auto lock = getLock();
if (!shared->blockable_background_pool)
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
return *shared->blockable_background_pool;
}

BackgroundProcessingPool & Context::getBlockableBackgroundPool()
{
// TODO: maybe a better name for the pool
auto lock = getLock();
return *shared->blockable_background_pool;
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ class Context
void setUseL0Opt(bool use_l0_opt);
bool useL0Opt() const;

BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBlockableBackgroundPool();

void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config);
Expand Down Expand Up @@ -505,7 +507,7 @@ class DDLGuard
class SessionCleaner
{
public:
SessionCleaner(Context & context_)
explicit SessionCleaner(Context & context_)
: context{context_}
{}
~SessionCleaner();
Expand Down
26 changes: 12 additions & 14 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,15 @@
#include <Poco/Net/NetException.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/MetricsPrometheus.h>
#include <Server/MetricsTransmitter.h>
#include <Server/RaftConfigParser.h>
#include <Server/Server.h>
#include <Server/ServerInfo.h>
#include <Server/StatusFile.h>
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
Expand All @@ -81,12 +86,6 @@
#include <limits>
#include <memory>

#include "HTTPHandlerFactory.h"
#include "MetricsPrometheus.h"
#include "MetricsTransmitter.h"
#include "StatusFile.h"
#include "TCPHandlerFactory.h"

#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h>
Expand Down Expand Up @@ -1128,6 +1127,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getPathCapacity(),
global_context->getFileProvider());

/// Initialize the background & blockable background thread pool.
Settings & settings = global_context->getSettingsRef();
LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);

global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page);
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));
Expand Down Expand Up @@ -1244,13 +1249,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
global_context->setDefaultProfiles(config());
Settings & settings = global_context->getSettingsRef();

/// Initialize the background thread pool.
/// It internally depends on settings.background_pool_size,
/// so must be called after settings has been load.
auto & bg_pool = global_context->getBackgroundPool();
auto & blockable_bg_pool = global_context->getBlockableBackgroundPool();

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);
Expand Down Expand Up @@ -1402,7 +1400,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
size = server_info.cpu_info.logical_cores;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class BackgroundProcessingPool
using TaskHandle = std::shared_ptr<TaskInfo>;


BackgroundProcessingPool(int size_);
explicit BackgroundProcessingPool(int size_);

size_t getNumberOfThreads() const { return size; }

Expand All @@ -96,7 +96,7 @@ class BackgroundProcessingPool
/// 2. thread B also get the same task
/// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task
/// 4. thread B find the task is not occupied and execute the task again almost immediately
TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0);
TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

~BackgroundProcessingPool();
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/TestUtils/TiFlashTestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR
KeyManagerPtr key_manager = std::make_shared<MockKeyManager>(false);
global_context->initializeFileProvider(key_manager, false);

// initialize background & blockable background thread pool
Settings & settings = global_context->getSettingsRef();
global_context->initializeBackgroundPool(settings.background_pool_size);
global_context->initializeBlockableBackgroundPool(settings.background_pool_size);

// Theses global variables should be initialized by the following order
// 1. capacity
// 2. path pool
Expand Down