diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 1f24d3030f4..5f6f3758883 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -110,6 +110,7 @@ namespace DB M(force_agg_on_partial_block) \ M(force_set_fap_candidate_store_id) \ M(force_not_clean_fap_on_destroy) \ + M(force_fap_worker_throw) \ M(delta_tree_create_node_fail) \ M(disable_flush_cache) \ M(force_agg_two_level_hash_table_before_merge) diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index 21a3b9b94fc..11d1513d6c9 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -51,6 +51,7 @@ namespace DB namespace FailPoints { +extern const char force_fap_worker_throw[]; extern const char force_set_fap_candidate_store_id[]; } // namespace FailPoints @@ -199,6 +200,8 @@ std::variant FastAddPeerImplSelect( auto current_store_id = tmt.getKVStore()->clonedStoreMeta().id(); std::vector candidate_store_ids = getCandidateStoreIDsForRegion(tmt, region_id, current_store_id); + fiu_do_on(FailPoints::force_fap_worker_throw, { throw Exception(ErrorCodes::LOGICAL_ERROR, "mocked throw"); }); + if (candidate_store_ids.empty()) { LOG_DEBUG(log, "No suitable candidate peer for region_id={}", region_id); @@ -445,6 +448,8 @@ FastAddPeerRes FastAddPeerImpl( new_peer_id, e.message())); GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment(); + // The task could be stuck in AsyncTasks as Finished till fetched by resolveFapSnapshotState, + // since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy. return genFastAddPeerRes(FastAddPeerStatus::BadData, "", ""); } catch (...) 
@@ -456,6 +461,8 @@ FastAddPeerRes FastAddPeerImpl( region_id, new_peer_id)); GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment(); + // The task could be stuck in AsyncTasks as Finished till fetched by resolveFapSnapshotState, + // since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy. return genFastAddPeerRes(FastAddPeerStatus::BadData, "", ""); } } diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp index 62e9bcb7962..051c369bd0b 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp @@ -239,15 +239,35 @@ void FastAddPeerContext::resolveFapSnapshotState( /// Cancel in `FastAddPeer` is blocking, so a regular snapshot won't meet a canceling snapshot. /// Can't be Finished because: /// - A finished task must be fetched by proxy on the next `FastAddPeer`. + /// Unless + /// - The task in the worker throws without actually writing anything, resulting in a Finished state. /// -- The destroy region case --- /// When FAP goes on, it blocks all MsgAppend messages to this region peer, so the destroy won't happen. /// If the region is destroyed now and sent to this store later, it must be with another peer_id. RUNTIME_CHECK_MSG( - prev_state == FAPAsyncTasks::TaskState::NotScheduled, + prev_state != FAPAsyncTasks::TaskState::Running && prev_state != FAPAsyncTasks::TaskState::InQueue, "FastAddPeer: find scheduled fap task, region_id={} fap_state={} is_regular_snapshot={}", region_id, magic_enum::enum_name(prev_state), is_regular_snapshot); + if (prev_state == FAPAsyncTasks::TaskState::Finished) + { + bool is_exception = false; + try + { + tasks_trace->fetchResult(region_id); + } + catch (...) 
+ { + is_exception = true; + } + LOG_INFO( + log, + "FastAddPeer: clean finished result region_id={} is_regular_snapshot={} is_exception={}", + region_id, + is_regular_snapshot, + is_exception); + } // 1. There leaves some non-ingested data on disk after restart. // 2. There has been no fap at all. // 3. FAP is enabled before, but disabled for now. diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 01c08bb3414..3717bea6ee3 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -41,11 +41,8 @@ namespace DB { namespace FailPoints { +extern const char force_fap_worker_throw[]; extern const char force_set_fap_candidate_store_id[]; -} // namespace FailPoints - -namespace FailPoints -{ extern const char force_not_clean_fap_on_destroy[]; } // namespace FailPoints @@ -870,5 +867,64 @@ try } CATCH +TEST_F(RegionKVStoreTestFAP, FAPWorkerException) +try +{ + CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{}); + KVStore & kvs = getKVS(); + RegionPtr kv_region = std::get<1>(mock_data); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + auto fap_context = global_context.getSharedContextDisagg()->fap_context; + uint64_t region_id = 1; + + EngineStoreServerWrap server = { + .tmt = &global_context.getTMTContext(), + .proxy_helper = proxy_helper.get(), + }; + + kvstore->getStore().store_id.store(1, std::memory_order_release); + kvstore->debugMutStoreMeta().set_id(1); + ASSERT_EQ(1, kvstore->getStoreID()); + ASSERT_EQ(1, kvstore->clonedStoreMeta().id()); + FailPointHelper::enableFailPoint(FailPoints::force_fap_worker_throw); + FailPointHelper::enableFailPoint(FailPoints::force_set_fap_candidate_store_id); + // The FAP will fail because it doesn't contain the new peer in region meta. 
+ FastAddPeer(&server, region_id, 2333); + eventuallyPredicate( + [&]() { return fap_context->tasks_trace->queryState(region_id) == FAPAsyncTasks::TaskState::Finished; }); + eventuallyPredicate([&]() { + return !CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + }); + ASSERT_TRUE(!fap_context->tryGetCheckpointIngestInfo(region_id).has_value()); + // Now we try to apply a regular snapshot. + // Note that if a FAP snapshot is in stage 1, no regular snapshot could happen, + // because no MsgAppend is handled, such that no following MsgSnapshot could be sent. + { + MockSSTReader::getMockSSTData().clear(); + MockSSTGenerator default_cf{901, 800, ColumnFamilyType::Default}; + default_cf.finish_file(); + default_cf.freeze(); + kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); + proxy_instance->snapshot( + kvs, + global_context.getTMTContext(), + region_id, + {default_cf}, + kv_region->cloneMetaRegion(), + 2, + 0, + 0, + std::nullopt, + false); + } + ASSERT_EQ(fap_context->tasks_trace->queryState(region_id), FAPAsyncTasks::TaskState::NotScheduled); + + FailPointHelper::disableFailPoint(FailPoints::force_fap_worker_throw); + FailPointHelper::disableFailPoint(FailPoints::force_set_fap_candidate_store_id); +} +CATCH + + } // namespace tests } // namespace DB