Skip to content

Commit

Permalink
KVStore: Fix exception in FAP worker (#8989)
Browse files Browse the repository at this point in the history
close #8988
  • Loading branch information
CalvinNeo authored Apr 25, 2024
1 parent a239275 commit b77839b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 5 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ namespace DB
M(force_agg_on_partial_block) \
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(force_fap_worker_throw) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge)
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace DB

namespace FailPoints
{
extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
} // namespace FailPoints

Expand Down Expand Up @@ -199,6 +200,8 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
auto current_store_id = tmt.getKVStore()->clonedStoreMeta().id();
std::vector<StoreID> candidate_store_ids = getCandidateStoreIDsForRegion(tmt, region_id, current_store_id);

fiu_do_on(FailPoints::force_fap_worker_throw, { throw Exception(ErrorCodes::LOGICAL_ERROR, "mocked throw"); });

if (candidate_store_ids.empty())
{
LOG_DEBUG(log, "No suitable candidate peer for region_id={}", region_id);
Expand Down Expand Up @@ -445,6 +448,8 @@ FastAddPeerRes FastAddPeerImpl(
new_peer_id,
e.message()));
GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment();
// The task could stuck in AsyncTasks as Finished till fetched by resolveFapSnapshotState,
// since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy.
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
}
catch (...)
Expand All @@ -456,6 +461,8 @@ FastAddPeerRes FastAddPeerImpl(
region_id,
new_peer_id));
GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment();
// The task could stuck in AsyncTasks as Finished till fetched by resolveFapSnapshotState.
// since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy.
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,35 @@ void FastAddPeerContext::resolveFapSnapshotState(
/// Cancel in `FastAddPeer` is blocking, so a regular snapshot won't meet a canceling snapshot.
/// Can't be Finished because:
/// - A finished task must be fetched by proxy on the next `FastAddPeer`.
/// Unless
/// - The task in worker throws without actually write anything, result in Finished state.
/// -- The destroy region case ---
/// When FAP goes on, it blocks all MsgAppend messages to this region peer, so the destroy won't happen.
/// If the region is destroyed now and sent to this store later, it must be with another peer_id.
RUNTIME_CHECK_MSG(
prev_state == FAPAsyncTasks::TaskState::NotScheduled,
prev_state != FAPAsyncTasks::TaskState::Running && prev_state != FAPAsyncTasks::TaskState::InQueue,
"FastAddPeer: find scheduled fap task, region_id={} fap_state={} is_regular_snapshot={}",
region_id,
magic_enum::enum_name(prev_state),
is_regular_snapshot);
if (prev_state == FAPAsyncTasks::TaskState::Finished)
{
bool is_exception = false;
try
{
tasks_trace->fetchResult(region_id);
}
catch (...)
{
is_exception = true;
}
LOG_INFO(
log,
"FastAddPeer: clean finished result region_id={} is_regular_snapshot={} is_exception={}",
region_id,
is_regular_snapshot,
is_exception);
}
// 1. There leaves some non-ingested data on disk after restart.
// 2. There has been no fap at all.
// 3. FAP is enabled before, but disabled for now.
Expand Down
64 changes: 60 additions & 4 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ namespace DB
{
namespace FailPoints
{
extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
} // namespace FailPoints

namespace FailPoints
{
extern const char force_not_clean_fap_on_destroy[];
} // namespace FailPoints

Expand Down Expand Up @@ -870,5 +867,64 @@ try
}
CATCH

TEST_F(RegionKVStoreTestFAP, FAPWorkerException)
try
{
CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{});
KVStore & kvs = getKVS();
RegionPtr kv_region = std::get<1>(mock_data);

auto & global_context = TiFlashTestEnv::getGlobalContext();
auto fap_context = global_context.getSharedContextDisagg()->fap_context;
uint64_t region_id = 1;

EngineStoreServerWrap server = {
.tmt = &global_context.getTMTContext(),
.proxy_helper = proxy_helper.get(),
};

kvstore->getStore().store_id.store(1, std::memory_order_release);
kvstore->debugMutStoreMeta().set_id(1);
ASSERT_EQ(1, kvstore->getStoreID());
ASSERT_EQ(1, kvstore->clonedStoreMeta().id());
FailPointHelper::enableFailPoint(FailPoints::force_fap_worker_throw);
FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id);
// The FAP will fail because it doesn't contain the new peer in region meta.
FastAddPeer(&server, region_id, 2333);
eventuallyPredicate(
[&]() { return fap_context->tasks_trace->queryState(region_id) == FAPAsyncTasks::TaskState::Finished; });
eventuallyPredicate([&]() {
return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333);
});
ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id).has_value());
// Now we try to apply regular snapshot.
// Note that if an fap snapshot is in stage 1, no regular snapshot could happen,
// because no MsgAppend is handled, such that no following MsgSnapshot could be sent.
{
MockSSTReader::getMockSSTData().clear();
MockSSTGenerator default_cf{901, 800, ColumnFamilyType::Default};
default_cf.finish_file();
default_cf.freeze();
kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface();
proxy_instance->snapshot(
kvs,
global_context.getTMTContext(),
region_id,
{default_cf},
kv_region->cloneMetaRegion(),
2,
0,
0,
std::nullopt,
false);
}
ASSERT_EQ(fap_context->tasks_trace->queryState(region_id), FAPAsyncTasks::TaskState::NotScheduled);

FailPointHelper::disableFailPoint(FailPoints::force_fap_worker_throw);
FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id);
}
CATCH


} // namespace tests
} // namespace DB

0 comments on commit b77839b

Please sign in to comment.