Skip to content

Commit

Permalink
Disagg: Fix hang when canceling non-owner (#8070)
Browse files Browse the repository at this point in the history
close #8069
  • Loading branch information
JaySon-Huang authored Sep 8, 2023
1 parent 9dff564 commit 630d107
Show file tree
Hide file tree
Showing 9 changed files with 486 additions and 315 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/RaftLogManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ bool RaftLogEagerGcTasks::updateHint(
UInt64 applied_index,
UInt64 threshold)
{
if (applied_index < eager_truncated_index || applied_index - eager_truncated_index < threshold)
if (threshold == 0 //
|| applied_index < eager_truncated_index || applied_index - eager_truncated_index < threshold)
return false;

// Try to register a task for eager remove RaftLog to reduce the memory overhead of UniPS
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RaftLogManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ class RaftLogEagerGcTasks
// RegionID -> truncated index
using RaftLogGcTasksRes = std::unordered_map<RegionID, UInt64>;

RaftLogGcTasksRes executeRaftLogGcTasks(Context & global_ctx, RaftLogEagerGcTasks::Hints && hints);
[[nodiscard]] RaftLogGcTasksRes executeRaftLogGcTasks(Context & global_ctx, RaftLogEagerGcTasks::Hints && hints);

} // namespace DB
87 changes: 87 additions & 0 deletions dbms/src/Storages/Transaction/tests/gtest_raft_log_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Transaction/RaftLogManager.h>
#include <TestUtils/TiFlashTestBasic.h>

namespace DB::tests
{

TEST(RaftLogEagerGCTasksTest, Basic)
try
{
RaftLogEagerGcTasks tasks;

RegionID region_id = 1000;
// threshold == 0 always return false
ASSERT_FALSE(tasks.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000, /*threshold=*/0));
ASSERT_FALSE(tasks.updateHint(region_id, /*eager_truncated_index=*/10000, /*applied_index=*/10, /*threshold=*/0));

ASSERT_FALSE(tasks.updateHint(region_id, /*eager_truncated_index=*/10000, /*applied_index=*/10, /*threshold=*/512));

{
// create new hints
ASSERT_TRUE(
tasks.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000, /*threshold=*/512));
// the applied index advance, but not merged into the hints
ASSERT_FALSE(
tasks.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000 + 10, /*threshold=*/512));
auto hints = tasks.getAndClearHints();
ASSERT_EQ(hints.size(), 1);
ASSERT_TRUE(hints.contains(region_id));
ASSERT_EQ(hints[region_id].applied_index, 10000);
ASSERT_EQ(hints[region_id].eager_truncate_index, 10);
}
{
auto hints = tasks.getAndClearHints();
ASSERT_TRUE(hints.empty());
}

{
// create new hints
ASSERT_TRUE(
tasks.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000, /*threshold=*/512));
// the applied index advance, and merged into the hints
ASSERT_TRUE(
tasks
.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000 + 523, /*threshold=*/512));
// applied index rollback, just ignore
ASSERT_FALSE(
tasks
.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000 + 500, /*threshold=*/512));
auto hints = tasks.getAndClearHints();
ASSERT_EQ(hints.size(), 1);
ASSERT_TRUE(hints.contains(region_id));
ASSERT_EQ(hints[region_id].applied_index, 10000 + 523);
ASSERT_EQ(hints[region_id].eager_truncate_index, 10);
}

{
// create new hints
ASSERT_TRUE(
tasks.updateHint(region_id, /*eager_truncated_index=*/10, /*applied_index=*/10000, /*threshold=*/512));
// the applied index and truncated index advance, and merged into the hints
ASSERT_TRUE(
tasks
.updateHint(region_id, /*eager_truncated_index=*/30, /*applied_index=*/10000 + 523, /*threshold=*/512));
auto hints = tasks.getAndClearHints();
ASSERT_EQ(hints.size(), 1);
ASSERT_TRUE(hints.contains(region_id));
ASSERT_EQ(hints[region_id].applied_index, 10000 + 523);
ASSERT_EQ(hints[region_id].eager_truncate_index, 10);
}
}
CATCH

} // namespace DB::tests
4 changes: 2 additions & 2 deletions dbms/src/TiDB/Etcd/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ grpc::Status Client::leaseRevoke(LeaseID lease_id)
}

std::tuple<v3electionpb::LeaderKey, grpc::Status> Client::campaign(
grpc::ClientContext * grpc_context,
const String & name,
const String & value,
LeaseID lease_id)
Expand All @@ -195,12 +196,11 @@ std::tuple<v3electionpb::LeaderKey, grpc::Status> Client::campaign(
req.set_value(value);
req.set_lease(lease_id);

grpc::ClientContext context;
// usually use `campaign` blocks until become leader or error happens,
// don't set timeout.

v3electionpb::CampaignResponse resp;
auto status = leaderClient()->election_stub->Campaign(&context, req, &resp);
auto status = leaderClient()->election_stub->Campaign(grpc_context, req, &resp);
return {resp.leader(), status};
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/TiDB/Etcd/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Client
grpc::Status leaseRevoke(LeaseID lease_id);

std::tuple<v3electionpb::LeaderKey, grpc::Status> campaign(
grpc::ClientContext * grpc_context,
const String & name,
const String & value,
LeaseID lease_id);
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/TiDB/OwnerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ void EtcdOwnerManager::cancelImpl()
}
if (th_camaign.joinable())
{
{
std::unique_lock lock(mtx_camaign);
if (campaing_ctx)
campaing_ctx->TryCancel();
}
th_camaign.join();
}
if (th_watch_owner.joinable())
Expand Down Expand Up @@ -248,7 +253,11 @@ void EtcdOwnerManager::camaignLoop(Etcd::SessionPtr session)
const auto lease_id = session->leaseID();
LOG_DEBUG(log, "new campaign loop with lease_id={:x}", lease_id);
// Let this thread blocks until becone owner or error occurs
auto && [new_leader, status] = client->campaign(campaign_name, id, lease_id);
{
std::unique_lock lock(mtx_camaign);
campaing_ctx = std::make_unique<grpc::ClientContext>();
}
auto && [new_leader, status] = client->campaign(campaing_ctx.get(), campaign_name, id, lease_id);
if (!status.ok())
{
// if error, continue next campaign
Expand Down
1 change: 1 addition & 0 deletions dbms/src/TiDB/OwnerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class EtcdOwnerManager : public OwnerManager
std::mutex mtx_camaign;
State state = State::Init;
std::condition_variable cv_camaign;
std::unique_ptr<grpc::ClientContext> campaing_ctx;

// A thread for running camaign logic
std::thread th_camaign;
Expand Down
76 changes: 74 additions & 2 deletions dbms/src/TiDB/tests/gtest_owner_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <chrono>
#include <condition_variable>
#include <ext/scope_guard.h>
#include <future>
#include <magic_enum.hpp>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -370,7 +371,6 @@ try
}
CATCH


TEST_F(OwnerManagerTest, CreateEtcdSessionFail)
try
{
Expand All @@ -397,7 +397,7 @@ try
auto owner0
= std::static_pointer_cast<EtcdOwnerManager>(OwnerManager::createS3GCOwner(*ctx, id, etcd_client, test_ttl));
auto owner_info = owner0->getOwnerID();
EXPECT_EQ(owner_info.status, OwnerType::NotOwner) << magic_enum::enum_name(owner_info.status);
EXPECT_EQ(owner_info.status, OwnerType::NoLeader) << magic_enum::enum_name(owner_info.status);

FailPointHelper::enableFailPoint(FailPoints::force_fail_to_create_etcd_session);

Expand All @@ -408,4 +408,76 @@ try
}
CATCH

TEST_F(OwnerManagerTest, CancelNonOwner)
try
{
auto etcd_endpoint = Poco::Environment::get("ETCD_ENDPOINT", "");
if (etcd_endpoint.empty())
{
const auto * t = ::testing::UnitTest::GetInstance()->current_test_info();
LOG_INFO(
log,
"{}.{} is skipped because env ETCD_ENDPOINT not set. "
"Run it with an etcd cluster using `ETCD_ENDPOINT=127.0.0.1:2379 ./dbms/gtests_dbms ...`",
t->test_case_name(),
t->name());
return;
}

using namespace std::chrono_literals;

auto ctx = TiFlashTestEnv::getContext();
pingcap::ClusterConfig config;
pingcap::pd::ClientPtr pd_client = std::make_shared<pingcap::pd::Client>(Strings{etcd_endpoint}, config);
auto etcd_client = DB::Etcd::Client::create(pd_client, config);

std::atomic<bool> owner0_elected = false;
std::shared_ptr<EtcdOwnerManager> owner0;
std::shared_ptr<EtcdOwnerManager> owner1;
auto th_owner = std::async([&]() {
const String id = "owner_0";
owner0 = std::static_pointer_cast<EtcdOwnerManager>(
OwnerManager::createS3GCOwner(*ctx, id, etcd_client, test_ttl));
auto owner_info = owner0->getOwnerID();
EXPECT_EQ(owner_info.status, OwnerType::NoLeader) << magic_enum::enum_name(owner_info.status);

owner0->setBeOwnerHook([&] { owner0_elected = true; });
owner0->campaignOwner();

while (!owner0_elected)
;

owner_info = owner0->getOwnerID();
EXPECT_EQ(owner_info.status, OwnerType::IsOwner) << magic_enum::enum_name(owner_info.status);
});

auto th_non_owner = std::async([&] {
const String id = "owner_1";

LOG_INFO(log, "waiting for owner0 elected");
while (!owner0_elected)
;

owner1 = std::static_pointer_cast<EtcdOwnerManager>(
OwnerManager::createS3GCOwner(*ctx, id, etcd_client, test_ttl));
owner1->campaignOwner(); // this will block
});

auto th_cancel_non_owner = std::async([&] {
while (!owner0_elected)
;

LOG_INFO(log, "waiting for owner1 start campaign");
std::this_thread::sleep_for(3s);
LOG_INFO(log, "cancel owner1 start");
owner1->cancel(); // cancel should finished th_non_owner
LOG_INFO(log, "cancel owner1 done");
});

th_cancel_non_owner.wait();
th_non_owner.wait();
th_owner.wait();
}
CATCH

} // namespace DB::tests
Loading

0 comments on commit 630d107

Please sign in to comment.