From 72da32852b6a77e68ad04eefe72ee73511cf131a Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 5 Jul 2023 16:15:43 +0800 Subject: [PATCH] limit the queued task number and queued duration of coprocess task. (#6394) (#7739) ref pingcap/tiflash#6438, close pingcap/tiflash#7747 --- contrib/client-c | 2 +- dbms/src/Flash/FlashService.cpp | 24 ++++++++++++++++++++++++ dbms/src/Interpreters/Settings.h | 2 ++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/contrib/client-c b/contrib/client-c index 9563b2d37f4..425148c0392 160000 --- a/contrib/client-c +++ b/contrib/client-c @@ -1 +1 @@ -Subproject commit 9563b2d37f410f356ecc3f9af7ad666a79839c34 +Subproject commit 425148c03929f4c960e50ed2c877f6695b105278 diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 4fb64a8802b..c1e647bc160 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -138,7 +138,31 @@ grpc::Status FlashService::Coprocessor( context->setMockStorage(mock_storage); + const auto & settings = context->getSettingsRef(); + auto handle_limit = settings.cop_pool_handle_limit != 0 ? settings.cop_pool_handle_limit.get() : 10 * cop_pool->size(); + auto max_queued_duration_seconds = std::min(settings.cop_pool_max_queued_seconds, 20); + + if (handle_limit > 0) + { + // We use this atomic variable metrics from the prometheus-cpp library to mark the number of queued queries. + // TODO: Use grpc asynchronous server and a more fully-featured thread pool. + if (auto current = GET_METRIC(tiflash_coprocessor_handling_request_count, type_cop).Value(); current > handle_limit) + { + response->mutable_region_error()->mutable_server_is_busy()->set_reason(fmt::format("tiflash cop pool queued too much, current = {}, limit = {}", current, handle_limit)); + return grpc::Status::OK; + } + } + + grpc::Status ret = executeInThreadPool(*cop_pool, [&] { + if (max_queued_duration_seconds > 0) + { + if (auto current = watch.elapsedSeconds(); current > max_queued_duration_seconds) + { + response->mutable_region_error()->mutable_server_is_busy()->set_reason(fmt::format("this task queued in tiflash cop pool too long, current = {}, limit = {}", current, max_queued_duration_seconds)); + return grpc::Status::OK; + } + } auto [db_context, status] = createDBContext(grpc_context); if (!status.ok()) { diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index bdb552fc998..2e93d4f2495 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -65,6 +65,8 @@ struct Settings M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \ M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.") \ M(SettingUInt64, cop_pool_size, 0, "The number of threads to handle cop requests. By default, it is determined automatically.") \ + M(SettingInt64, cop_pool_handle_limit, 0, "The maximum number of requests can be handled by cop pool, include executing and queuing tasks. More cop requests will get error \"TiFlash Server is Busy\". -1 means unlimited, 0 means determined automatically (10 times of cop-pool-size).") \ + M(SettingInt64, cop_pool_max_queued_seconds, 15, "The maximum queuing duration of coprocessor task, unit is second. When task starts to run, it checks whether queued more than this config, if so, it will directly return error \"TiFlash Server is Busy\". <=0 means unlimited, default is 15. The upper limit of this config is 20.") \ M(SettingUInt64, batch_cop_pool_size, 0, "The number of threads to handle batch cop requests. By default, it is determined automatically.") \ M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \ M(SettingUInt64, max_distributed_connections, DEFAULT_MAX_DISTRIBUTED_CONNECTIONS, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \