KVStore: Fix exception in FAP worker #8989

Merged · 6 commits · Apr 25, 2024
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
@@ -110,6 +110,7 @@ namespace DB
M(force_agg_on_partial_block) \
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(force_fap_worker_throw) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge)
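For context, registering the name here is only half of the wiring: the matching fiu_do_on injection site is added in FastAddPeerImplSelect further down, and the new test arms the failpoint around a FAP attempt. A minimal sketch, assembled from calls that appear verbatim in later hunks of this PR:

// Arm the failpoint, trigger a FAP attempt that will hit the injection site
// in FastAddPeerImplSelect and throw, then disarm it again.
FailPointHelper::enableFailPoint(FailPoints::force_fap_worker_throw);
FastAddPeer(&server, region_id, 2333);
FailPointHelper::disableFailPoint(FailPoints::force_fap_worker_throw);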
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
@@ -51,6 +51,7 @@ namespace DB

namespace FailPoints
{
extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
} // namespace FailPoints

@@ -199,6 +200,8 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
auto current_store_id = tmt.getKVStore()->clonedStoreMeta().id();
std::vector<StoreID> candidate_store_ids = getCandidateStoreIDsForRegion(tmt, region_id, current_store_id);

fiu_do_on(FailPoints::force_fap_worker_throw, { throw Exception(ErrorCodes::LOGICAL_ERROR, "mocked throw"); });

if (candidate_store_ids.empty())
{
LOG_DEBUG(log, "No suitable candidate peer for region_id={}", region_id);
@@ -445,6 +448,8 @@ FastAddPeerRes FastAddPeerImpl(
new_peer_id,
e.message()));
GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment();
// The task could be stuck in AsyncTasks as Finished until it is fetched by resolveFapSnapshotState,
// since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy.
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
}
catch (...)
@@ -456,6 +461,8 @@
region_id,
new_peer_id));
GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment();
// The task could be stuck in AsyncTasks as Finished until it is fetched by resolveFapSnapshotState,
// since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy.
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
}
}
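The catch blocks above convert any error in the worker into a status value instead of letting the exception escape. As a generic illustration of that design choice (plain standard C++, not TiFlash code):

#include <iostream>
#include <stdexcept>

enum class TaskStatus { Success, BadData };

// Generic version of the pattern: run the risky work and, on any exception,
// hand the caller a failure status it can act on (fall back to a slower path)
// rather than rethrowing across the worker boundary.
TaskStatus runTask(bool inject_failure)
{
    try
    {
        if (inject_failure)
            throw std::runtime_error("mocked throw");
        return TaskStatus::Success;
    }
    catch (...)
    {
        return TaskStatus::BadData;
    }
}

int main()
{
    std::cout << (runTask(true) == TaskStatus::BadData ? "fallback" : "fast path") << '\n';
    return 0;
}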
@@ -239,15 +239,35 @@ void FastAddPeerContext::resolveFapSnapshotState(
/// Canceling in `FastAddPeer` is blocking, so a regular snapshot won't meet a canceling snapshot.
/// It can't be Finished because:
/// - A finished task must be fetched by proxy on the next `FastAddPeer`.
/// Unless:
/// - The task in the worker throws without actually writing anything, resulting in a Finished state.
/// --- The destroy region case ---
/// While FAP is ongoing, it blocks all MsgAppend messages to this region peer, so the destroy won't happen.
/// If the region is destroyed now and sent to this store later, it must be with another peer_id.
RUNTIME_CHECK_MSG(
prev_state == FAPAsyncTasks::TaskState::NotScheduled,
prev_state != FAPAsyncTasks::TaskState::Running && prev_state != FAPAsyncTasks::TaskState::InQueue,
"FastAddPeer: find scheduled fap task, region_id={} fap_state={} is_regular_snapshot={}",
region_id,
magic_enum::enum_name(prev_state),
is_regular_snapshot);
if (prev_state == FAPAsyncTasks::TaskState::Finished)
{
bool is_exception = false;
try
{
tasks_trace->fetchResult(region_id);
}
catch (...)
{
is_exception = true;
}
LOG_INFO(
log,
"FastAddPeer: clean finished result region_id={} is_regular_snapshot={} is_exception={}",
region_id,
is_regular_snapshot,
is_exception);
}
// 1. Some non-ingested data is left on disk after restart.
// 2. There has been no FAP at all.
// 3. FAP was enabled before, but is disabled now.
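For readers unfamiliar with FAPAsyncTasks, the state names used above suggest the following task lifecycle; this is inferred from how the PR calls queryState and fetchResult, so treat it as an assumption rather than a documented contract:

// NotScheduled -> InQueue -> Running -> Finished
//   - queryState(region_id) reports the current state without consuming it.
//   - fetchResult(region_id) consumes a Finished task, returning its result or
//     rethrowing the exception the worker threw (hence the try/catch above).
// Before this PR the check required NotScheduled here; a worker that throws
// before writing anything leaves the task Finished, so the check now only
// rejects InQueue/Running and drains any Finished task on the spot.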
64 changes: 60 additions & 4 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
@@ -41,11 +41,8 @@ namespace DB
{
namespace FailPoints
{
extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
} // namespace FailPoints

namespace FailPoints
{
extern const char force_not_clean_fap_on_destroy[];
} // namespace FailPoints

@@ -870,5 +867,64 @@
}
CATCH

TEST_F(RegionKVStoreTestFAP, FAPWorkerException)
try
{
CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{});
KVStore & kvs = getKVS();
RegionPtr kv_region = std::get<1>(mock_data);

auto & global_context = TiFlashTestEnv::getGlobalContext();
auto fap_context = global_context.getSharedContextDisagg()->fap_context;
uint64_t region_id = 1;

EngineStoreServerWrap server = {
.tmt = &global_context.getTMTContext(),
.proxy_helper = proxy_helper.get(),
};

kvstore->getStore().store_id.store(1, std::memory_order_release);
kvstore->debugMutStoreMeta().set_id(1);
ASSERT_EQ(1, kvstore->getStoreID());
ASSERT_EQ(1, kvstore->clonedStoreMeta().id());
FailPointHelper::enableFailPoint(FailPoints::force_fap_worker_throw);
FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id);
// The FAP task will fail because the region meta doesn't contain the new peer.
FastAddPeer(&server, region_id, 2333);
eventuallyPredicate(
[&]() { return fap_context->tasks_trace->queryState(region_id) == FAPAsyncTasks::TaskState::Finished; });
eventuallyPredicate([&]() {
return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333);
});
ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id).has_value());
// Now we try to apply a regular snapshot.
// Note that if a FAP snapshot is in stage 1, no regular snapshot can happen,
// because no MsgAppend is handled, so no subsequent MsgSnapshot can be sent.
{
MockSSTReader::getMockSSTData().clear();
MockSSTGenerator default_cf{901, 800, ColumnFamilyType::Default};
default_cf.finish_file();
default_cf.freeze();
kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface();
proxy_instance->snapshot(
kvs,
global_context.getTMTContext(),
region_id,
{default_cf},
kv_region->cloneMetaRegion(),
2,
0,
0,
std::nullopt,
false);
}
ASSERT_EQ(fap_context->tasks_trace->queryState(region_id), FAPAsyncTasks::TaskState::NotScheduled);

FailPointHelper::disableFailPoint(FailPoints::force_fap_worker_throw);
FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id);
}
CATCH


} // namespace tests
} // namespace DB