From d7badaa0e0ceafbccd4458f1d650bc795ba87ed2 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 18 Oct 2023 21:27:29 +0800 Subject: [PATCH] Fix check fail when cancel parallel prehandling (#8209) close pingcap/tiflash#8201 --- .../Decode/SSTFilesToBlockInputStream.cpp | 22 ++++-- .../Decode/SSTFilesToBlockInputStream.h | 5 +- .../KVStore/Decode/PartitionStreams.cpp | 1 - .../KVStore/MultiRaft/PrehandleSnapshot.cpp | 2 + .../KVStore/MultiRaft/RegionCFDataBase.cpp | 1 + .../KVStore/TiKVHelpers/TiKVRecordFormat.h | 1 - .../KVStore/tests/gtest_raftstore_v2.cpp | 77 +++++++++++++++++-- 7 files changed, 95 insertions(+), 14 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp index 4b06fe58914..a73398e5379 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp @@ -43,6 +43,7 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( // const TiFlashRaftProxyHelper * proxy_helper_, TMTContext & tmt_, std::optional && soft_limit_, + std::shared_ptr prehandle_task_, SSTFilesToBlockInputStreamOpts && opts_) : region(std::move(region_)) , snapshot_index(snapshot_index_) @@ -50,6 +51,7 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( // , proxy_helper(proxy_helper_) , tmt(tmt_) , soft_limit(std::move(soft_limit_)) + , prehandle_task(prehandle_task_) , opts(std::move(opts_)) { log = Logger::get(opts.log_prefix); @@ -139,23 +141,30 @@ SSTFilesToBlockInputStream::~SSTFilesToBlockInputStream() = default; void SSTFilesToBlockInputStream::readPrefix() {} -void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader) +void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf) { // There must be no data left when we write suffix if (!reader) return; if (!reader->remained()) return; - RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value()"); + if (prehandle_task->abort_flag.load()) + return; + + // now the stream must be stopped by `soft_limit`, let's check the keys in reader + RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value(), cf={}", magic_enum::enum_name(cf)); BaseBuffView cur = reader->keyView(); - RUNTIME_CHECK_MSG(buffToStrView(cur) > soft_limit.value().raw_end, "cur > raw_end"); + RUNTIME_CHECK_MSG( + buffToStrView(cur) > soft_limit.value().raw_end, + "cur > raw_end, cf={}", + magic_enum::enum_name(cf)); } void SSTFilesToBlockInputStream::readSuffix() { - checkFinishedState(write_cf_reader); - checkFinishedState(default_cf_reader); - checkFinishedState(lock_cf_reader); + checkFinishedState(write_cf_reader, ColumnFamilyType::Write); + checkFinishedState(default_cf_reader, ColumnFamilyType::Default); + checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock); // reset all SSTReaders and return without writting blocks any more. write_cf_reader.reset(); @@ -209,6 +218,7 @@ Block SSTFilesToBlockInputStream::read() loadCFDataFromSST(ColumnFamilyType::Lock, &rowkey); auto block = readCommitedBlock(); + if (block.rows() != 0) return block; // else continue to decode key-value from write CF. diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h index dab8223700f..1a4d7494191 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -116,6 +117,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream const TiFlashRaftProxyHelper * proxy_helper_, TMTContext & tmt_, std::optional && soft_limit_, + std::shared_ptr prehandle_task_, SSTFilesToBlockInputStreamOpts && opts_); ~SSTFilesToBlockInputStream() override; @@ -163,7 +165,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream // Emits data into block if the transaction to this key is committed. Block readCommitedBlock(); bool maybeStopBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader); - void checkFinishedState(SSTReaderPtr & reader); + void checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf); private: RegionPtr region; @@ -172,6 +174,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream const TiFlashRaftProxyHelper * proxy_helper{nullptr}; TMTContext & tmt; std::optional soft_limit; + std::shared_ptr prehandle_task; const SSTFilesToBlockInputStreamOpts opts; LoggerPtr log; diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index 75510c99231..7f99a71d76a 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -541,7 +541,6 @@ Block GenRegionBlockDataWithSchema( // No committed data, just return if (!data_list_read) return res_block; - { Stopwatch watch; { diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index 26452a07597..ef0228fa006 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -354,6 +354,7 @@ static void runInParallel( proxy_helper, tmt, std::move(part_limit), + prehandle_task, DM::SSTFilesToBlockInputStreamOpts(opt)); try { @@ -608,6 +609,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( proxy_helper, tmt, std::nullopt, + prehandle_task, DM::SSTFilesToBlockInputStreamOpts(opt)); // `split_keys` do not begin with 'z'. diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 46218489552..9bf07b060b9 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -47,6 +47,7 @@ RegionDataRes RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value if (!kv_pair) return 0; + return insert(std::move(*kv_pair), mode); } diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h b/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h index 66bcdf9d05f..9c7c3680262 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h @@ -401,7 +401,6 @@ inline DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value) auto write_type = RecordKVFormat::readUInt8(data, len); //write type bool can_ignore = write_type != CFModifyFlag::DelFlag && write_type != CFModifyFlag::PutFlag; - if (can_ignore) return std::nullopt; diff --git a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp index 1365d972d7e..73736ab4a6e 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp @@ -28,6 +28,73 @@ extern const char force_set_sst_to_dtfile_block_size[]; namespace tests { +// Test if active cancel from proxy. +TEST_F(RegionKVStoreTest, KVStoreSingleSnapCancel) +try +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + proxy_instance->cluster_ver = RaftstoreVer::V2; + ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr); + ASSERT_NE(proxy_helper->fn_get_config_json, nullptr); + UInt64 region_id = 1; + TableID table_id; + FailPointHelper::enableFailPoint(FailPoints::force_set_parallel_prehandle_threshold, static_cast(0)); + FailPointHelper::enableFailPoint(FailPoints::force_set_sst_to_dtfile_block_size, static_cast(1)); + SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_set_sst_to_dtfile_block_size"); }); + SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_set_parallel_prehandle_threshold"); }); + { + region_id = 2; + initStorages(); + KVStore & kvs = getKVS(); + HandleID table_limit = 40; + HandleID sst_limit = 40; + table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + auto start = RecordKVFormat::genKey(table_id, 0); + auto end = RecordKVFormat::genKey(table_id, table_limit); + proxy_instance->bootstrapWithRegion( + kvs, + ctx.getTMTContext(), + region_id, + std::make_pair(start.toString(), end.toString())); + auto r1 = proxy_instance->getRegion(region_id); + + auto [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999); + auto kkk = RecordKVFormat::decodeWriteCfValue(TiKVValue::copyFrom(value_write)); + { + MockSSTReader::getMockSSTData().clear(); + MockRaftStoreProxy::Cf default_cf{region_id, table_id, ColumnFamilyType::Default}; + for (HandleID h = 1; h < sst_limit; h++) + { + auto k = RecordKVFormat::genKey(table_id, h, 111); + default_cf.insert_raw(k, value_default); + } + default_cf.finish_file(SSTFormatKind::KIND_TABLET); + default_cf.freeze(); + MockRaftStoreProxy::Cf write_cf{region_id, table_id, ColumnFamilyType::Write}; + for (HandleID h = 1; h < sst_limit; h++) + { + auto k = RecordKVFormat::genKey(table_id, h, 111); + write_cf.insert_raw(k, value_write); + } + write_cf.finish_file(SSTFormatKind::KIND_TABLET); + write_cf.freeze(); + + auto sp = SyncPointCtl::enableInScope("before_SSTFilesToDTFilesOutputStream::handle_one"); + std::thread t([&]() { + auto [kvr1, res] + = proxy_instance + ->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf, write_cf}, 0, 0, std::nullopt); + }); + sp.waitAndPause(); + kvs.abortPreHandleSnapshot(region_id, ctx.getTMTContext()); + sp.next(); + sp.disable(); + t.join(); + } + } +} +CATCH + // Test several uncommitted keys with only one version. TEST_F(RegionKVStoreTest, KVStoreSingleSnap1) try @@ -74,7 +141,7 @@ try auto k = RecordKVFormat::genKey(table_id, h, 111); if (h == uncommitted) continue; - write_cf.insert_raw(k, value_default); + write_cf.insert_raw(k, value_write); } write_cf.finish_file(SSTFormatKind::KIND_TABLET); write_cf.freeze(); @@ -154,7 +221,7 @@ try { auto [value_write, value_default] = proxy_instance->generateTiKVKeyValue(tso, 999); auto k = RecordKVFormat::genKey(table_id, 10, tso); - write_cf.insert_raw(k, value_default); + write_cf.insert_raw(k, value_write); } write_cf.finish_file(SSTFormatKind::KIND_TABLET); write_cf.freeze(); @@ -216,7 +283,7 @@ try for (HandleID h = 1; h < sst_limit; h++) { auto k = RecordKVFormat::genKey(table_id, h, 111); - write_cf.insert_raw(k, value_default); + write_cf.insert_raw(k, value_write); } write_cf.finish_file(SSTFormatKind::KIND_TABLET); write_cf.freeze(); @@ -275,7 +342,7 @@ try for (HandleID h = 1; h < sst_limit; h++) { auto k = RecordKVFormat::genKey(table_id, h, 111); - write_cf.insert_raw(k, value_default); + write_cf.insert_raw(k, value_write); } write_cf.finish_file(SSTFormatKind::KIND_TABLET); write_cf.freeze(); @@ -348,7 +415,7 @@ try for (HandleID h = table_limit_start + 10; h < table_limit_end - 10; h++) { auto k = RecordKVFormat::genKey(table_id, h, 111); - write_cf.insert_raw(k, value_default); + write_cf.insert_raw(k, value_write); } write_cf.finish_file(SSTFormatKind::KIND_TABLET); write_cf.freeze();