diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 7d29442957d..7496107d889 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -543,6 +543,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "Raft learner read failure reason counter", \ Counter, \ F(type_request_error, {{"type", "request_error"}}), \ + F(type_request_error_legacy, {{"type", "request_error_legacy"}}), \ F(type_read_index_timeout, {{"type", "read_index_timeout"}}), \ F(type_not_found_tiflash, {{"type", "not_found_tiflash"}}), \ F(type_epoch_not_match, {{"type", "epoch_not_match"}}), \ diff --git a/dbms/src/Storages/KVStore/Read/Proxy.cpp b/dbms/src/Storages/KVStore/Read/Proxy.cpp index d8aaf2e3d39..6a416cb8953 100644 --- a/dbms/src/Storages/KVStore/Read/Proxy.cpp +++ b/dbms/src/Storages/KVStore/Read/Proxy.cpp @@ -23,6 +23,8 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex( return batchReadIndex_v2(req, timeout_ms); } +// Learner Read will use `ReadIndexDataNode` which will send read index requests in batch, +// so this function is not actually used after the optimization. BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex_v2( const std::vector & req, uint64_t timeout_ms) const @@ -38,10 +40,10 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex_v2( { if (auto task = makeReadIndexTask(r); !task) { - // The read index request is not sent successfully. - GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error).Increment(); kvrpcpb::ReadIndexResponse res; res.mutable_region_error(); + // The read index request is not sent successfully. + GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error_legacy).Increment(); resps.emplace_back(std::move(res), r.context().region_id()); } else diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexDataNode.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexDataNode.cpp index 2e36c041b40..ddc942a0403 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexDataNode.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexDataNode.cpp @@ -102,6 +102,7 @@ void ReadIndexDataNode::runOneRound(const TiFlashRaftProxyHelper & helper, const else { TEST_LOG_FMT("failed to make ReadIndexTask for region_id={} ts {}", region_id, max_ts); + GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error).Increment(); run_it = running_tasks.try_emplace(max_ts, region_id, max_ts).first; run_it->second.resp.mutable_region_error(); } @@ -182,8 +183,8 @@ void ReadIndexDataNode::ReadIndexElement::doPoll( TEST_LOG_FMT("poll ReadIndexElement timeout for region_id={}", region_id); clean_task = true; - resp.mutable_region_error() - ->mutable_server_is_busy(); // set region_error `server_is_busy` for task timeout + // set region_error `server_is_busy` for task timeout + resp.mutable_region_error()->mutable_server_is_busy(); } else { diff --git a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp index 1f57eed56cb..1160137161f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp @@ -28,16 +28,18 @@ TEST(AsyncTasksTest, AsyncTasksNormal) auto log = DB::Logger::get(); LOG_INFO(log, "Cancel and addTask"); // Cancel and addTask + // 3 -> 1 -> 4 -> 2 { auto async_tasks = std::make_unique(1, 1, 2); auto m = std::make_shared(); - auto m2 = std::make_shared(); int flag = 0; std::unique_lock cl(*m); - async_tasks->addTask(1, [m, &flag, &async_tasks, &m2]() { + std::atomic_bool finished_flag = false; + async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() { auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1); - std::scoped_lock rl2(*m2); - std::scoped_lock rl(*m); + std::scoped_lock rl(*m); // 2 + SCOPE_EXIT({ finished_flag.store(true); }); + // Run after `cl` is released. if (cancel_handle->isCanceled()) { return; @@ -47,8 +49,15 @@ TEST(AsyncTasksTest, AsyncTasksNormal) async_tasks->asyncCancelTask(1); ASSERT_FALSE(async_tasks->isScheduled(1)); async_tasks->addTask(1, [&flag]() { flag = 2; }); - cl.unlock(); - std::scoped_lock rl2(*m2); + cl.unlock(); // Now can task 1 run. + int count = 0; + using namespace std::chrono_literals; + while (!finished_flag.load()) + { + count += 1; + ASSERT(count < 6); + std::this_thread::sleep_for(200ms); + } ASSERT_NO_THROW(async_tasks->fetchResult(1)); ASSERT_EQ(flag, 2); }