
Commit

Merge branch 'master' into fix_agg_auto_spill_bug
ti-chi-bot[bot] authored Apr 8, 2024
2 parents 7c7f3b3 + 472d601 commit 2a2393b
Showing 4 changed files with 23 additions and 10 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
@@ -543,6 +543,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Raft learner read failure reason counter", \
Counter, \
F(type_request_error, {{"type", "request_error"}}), \
F(type_request_error_legacy, {{"type", "request_error_legacy"}}), \
F(type_read_index_timeout, {{"type", "read_index_timeout"}}), \
F(type_not_found_tiflash, {{"type", "not_found_tiflash"}}), \
F(type_epoch_not_match, {{"type", "epoch_not_match"}}), \
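
Note: each F(...) entry in TiFlashMetrics.h declares one labeled series inside the tiflash_raft_learner_read_failures_count counter family, so the new request_error_legacy label can be incremented independently of request_error. A rough standalone sketch of the same idea using prometheus-cpp (illustrative only, not TiFlash's actual macro expansion):

#include <memory>

#include <prometheus/counter.h>
#include <prometheus/family.h>
#include <prometheus/registry.h>

int main()
{
    auto registry = std::make_shared<prometheus::Registry>();

    // One counter family, multiple labeled series -- analogous to the
    // F(type_request_error, ...) / F(type_request_error_legacy, ...) entries.
    auto & failures = prometheus::BuildCounter()
                          .Name("tiflash_raft_learner_read_failures_count")
                          .Help("Raft learner read failure reason counter")
                          .Register(*registry);

    auto & request_error = failures.Add({{"type", "request_error"}});
    auto & request_error_legacy = failures.Add({{"type", "request_error_legacy"}});

    request_error.Increment();        // active ReadIndexDataNode path
    request_error_legacy.Increment(); // legacy batchReadIndex_v2 path
    return 0;
}
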
6 changes: 4 additions & 2 deletions dbms/src/Storages/KVStore/Read/Proxy.cpp
@@ -23,6 +23,8 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(
return batchReadIndex_v2(req, timeout_ms);
}

// Learner Read will use `ReadIndexDataNode` which will send read index requests in batch,
// so this function is not actually used after the optimization.
BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex_v2(
const std::vector<kvrpcpb::ReadIndexRequest> & req,
uint64_t timeout_ms) const
@@ -38,10 +40,10 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex_v2(
{
if (auto task = makeReadIndexTask(r); !task)
{
// The read index request is not sent successfully.
GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error).Increment();
kvrpcpb::ReadIndexResponse res;
res.mutable_region_error();
// The read index request is not sent successfully.
GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error_legacy).Increment();
resps.emplace_back(std::move(res), r.context().region_id());
}
else
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/Read/ReadIndexDataNode.cpp
@@ -102,6 +102,7 @@ void ReadIndexDataNode::runOneRound(const TiFlashRaftProxyHelper & helper, const
else
{
TEST_LOG_FMT("failed to make ReadIndexTask for region_id={} ts {}", region_id, max_ts);
GET_METRIC(tiflash_raft_learner_read_failures_count, type_request_error).Increment();
run_it = running_tasks.try_emplace(max_ts, region_id, max_ts).first;
run_it->second.resp.mutable_region_error();
}
@@ -182,8 +183,8 @@ void ReadIndexDataNode::ReadIndexElement::doPoll(
TEST_LOG_FMT("poll ReadIndexElement timeout for region_id={}", region_id);

clean_task = true;
resp.mutable_region_error()
->mutable_server_is_busy(); // set region_error `server_is_busy` for task timeout
// set region_error `server_is_busy` for task timeout
resp.mutable_region_error()->mutable_server_is_busy();
}
else
{
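
Note on the doPoll change above: in protobuf-generated code, merely calling mutable_<field>() default-constructs that nested message, which is why resp.mutable_region_error()->mutable_server_is_busy() is enough to mark the response as a server_is_busy region error without assigning any further field. A minimal illustration, assuming the kvproto-generated kvrpcpb::ReadIndexResponse (include path may differ depending on how kvproto is vendored):

#include <cassert>

#include <kvproto/kvrpcpb.pb.h>

int main()
{
    kvrpcpb::ReadIndexResponse resp;
    assert(!resp.has_region_error());

    // Calling mutable_*() lazily creates the nested messages, so the
    // presence checks flip to true even though nothing else is assigned.
    resp.mutable_region_error()->mutable_server_is_busy();

    assert(resp.has_region_error());
    assert(resp.region_error().has_server_is_busy());
    return 0;
}
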
21 changes: 15 additions & 6 deletions dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
@@ -28,16 +28,18 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
auto log = DB::Logger::get();
LOG_INFO(log, "Cancel and addTask");
// Cancel and addTask
// 3 -> 1 -> 4 -> 2
{
auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
auto m = std::make_shared<std::mutex>();
auto m2 = std::make_shared<std::mutex>();
int flag = 0;
std::unique_lock cl(*m);
async_tasks->addTask(1, [m, &flag, &async_tasks, &m2]() {
std::atomic_bool finished_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() {
auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1);
std::scoped_lock rl2(*m2);
std::scoped_lock rl(*m);
std::scoped_lock rl(*m); // 2
SCOPE_EXIT({ finished_flag.store(true); });
// Run after `cl` is released.
if (cancel_handle->isCanceled())
{
return;
@@ -47,8 +49,15 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
async_tasks->asyncCancelTask(1);
ASSERT_FALSE(async_tasks->isScheduled(1));
async_tasks->addTask(1, [&flag]() { flag = 2; });
cl.unlock();
std::scoped_lock rl2(*m2);
cl.unlock(); // Now task 1 can run.
int count = 0;
using namespace std::chrono_literals;
while (!finished_flag.load())
{
count += 1;
ASSERT(count < 6);
std::this_thread::sleep_for(200ms);
}
ASSERT_NO_THROW(async_tasks->fetchResult(1));
ASSERT_EQ(flag, 2);
}
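
Note: the test change above drops the second mutex (m2) in favor of an atomic flag that the task flips via SCOPE_EXIT, and the main thread then waits by polling that flag with a bounded number of sleeps. A self-contained sketch of the same wait pattern (names are illustrative, not from the TiFlash test harness):

#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

int main()
{
    std::atomic_bool finished_flag{false};

    // Background work signals completion by flipping the flag when it exits.
    std::thread worker([&finished_flag] {
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
        finished_flag.store(true);
    });

    // Bounded polling loop, mirroring the test's `count < 6` guard:
    // give up after roughly 1.2s instead of hanging forever on a lost wakeup.
    using namespace std::chrono_literals;
    int count = 0;
    while (!finished_flag.load())
    {
        count += 1;
        assert(count < 6);
        std::this_thread::sleep_for(200ms);
    }

    worker.join();
    return 0;
}
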
