diff --git a/dbms/src/Debug/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockRaftStoreProxy.cpp index 36f7fefcd2e..c0df7514cee 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockRaftStoreProxy.cpp @@ -300,7 +300,7 @@ MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequ { auto _ = genLockGuard(); - wake(); + wakeNotifier(); auto region = doGetRegion(req.context().region_id()); if (region) @@ -309,7 +309,7 @@ MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequ r->data = std::make_shared(); r->data->req = std::move(req); r->data->region = region; - tasks.push_back(r->data); + read_index_tasks.push_back(r->data); return r; } return nullptr; @@ -330,7 +330,7 @@ size_t MockRaftStoreProxy::size() const return regions.size(); } -void MockRaftStoreProxy::wake() +void MockRaftStoreProxy::wakeNotifier() { notifier.wake(); } @@ -347,17 +347,18 @@ void MockRaftStoreProxy::testRunNormal(const std::atomic_bool & over) void MockRaftStoreProxy::runOneRound() { auto _ = genLockGuard(); - while (!tasks.empty()) + while (!read_index_tasks.empty()) { - auto & t = *tasks.front(); - if (!region_id_to_drop.count(t.req.context().region_id())) + auto & t = *read_index_tasks.front(); + auto region_id = t.req.context().region_id(); + if (!region_id_to_drop.contains(region_id)) { - if (region_id_to_error.count(t.req.context().region_id())) + if (region_id_to_error.contains(region_id)) t.update(false, true); else t.update(false, false); } - tasks.pop_front(); + read_index_tasks.pop_front(); } } @@ -367,24 +368,40 @@ void MockRaftStoreProxy::unsafeInvokeForTest(std::function> maybe_range) +{ + { + auto _ = genLockGuard(); + RUNTIME_CHECK_MSG(regions.empty(), "Mock Proxy regions are not cleared"); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + RUNTIME_CHECK_MSG(lock.regions.empty(), "KVStore regions are not cleared"); + } + auto start = RecordKVFormat::genKey(table_id, 0); + auto end = RecordKVFormat::genKey(table_id + 1, 0); + debugAddRegions(kvs, tmt, {region_id}, {maybe_range.value_or(std::make_pair(start.toString(), end.toString()))}); +} + +void MockRaftStoreProxy::debugAddRegions( + KVStore & kvs, + TMTContext & tmt, + std::vector region_ids, + std::vector> && ranges) { UNUSED(tmt); + int n = ranges.size(); auto _ = genLockGuard(); - regions.emplace(region_id, std::make_shared(region_id)); - auto task_lock = kvs.genTaskLock(); auto lock = kvs.genRegionWriteLock(task_lock); + for (int i = 0; i < n; ++i) { - auto start = RecordKVFormat::genKey(table_id, 0); - auto end = RecordKVFormat::genKey(table_id + 1, 0); - auto range = maybe_range.value_or(std::make_pair(start.toString(), end.toString())); - auto region = tests::makeRegion(region_id, range.first, range.second); - lock.regions.emplace(region_id, region); + regions.emplace(region_ids[i], std::make_shared(region_ids[i])); + auto region = tests::makeRegion(region_ids[i], ranges[i].first, ranges[i].second, kvs.getProxyHelper()); + lock.regions.emplace(region_ids[i], region); lock.index.add(region); } } @@ -464,7 +481,7 @@ std::tuple MockRaftStoreProxy::rawWrite( } -std::tuple MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index) +std::tuple MockRaftStoreProxy::adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest && request, raft_cmdpb::AdminResponse && response) { uint64_t index = 0; uint64_t term = 0; @@ -477,16 +494,6 @@ std::tuple MockRaftStoreProxy::compactLog(UInt64 region_id, // The new entry is committed on Proxy's side. region->updateCommitIndex(index); // We record them, as persisted raft log, for potential recovery. - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - request.mutable_compact_log(); - request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog); - request.mutable_compact_log()->set_compact_index(compact_index); - // Find compact term, otherwise log must have been compacted. - if (region->commands.count(compact_index)) - { - request.mutable_compact_log()->set_compact_term(region->commands[index].term); - } region->commands[index] = { term, MockProxyRegion::AdminCommand{ @@ -497,6 +504,100 @@ std::tuple MockRaftStoreProxy::compactLog(UInt64 region_id, return std::make_tuple(index, term); } +std::tuple MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index) +{ + auto region = getRegion(region_id); + assert(region != nullptr); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog); + request.mutable_compact_log()->set_compact_index(compact_index); + // Find compact term, otherwise log must have been compacted. + if (region->commands.contains(compact_index)) + { + request.mutable_compact_log()->set_compact_term(region->commands[compact_index].term); + } + return adminCommand(region_id, std::move(request), std::move(response)); +} + +std::tuple MockRaftStoreProxy::composeChangePeer(metapb::Region && meta, std::vector peer_ids, bool is_v2) +{ + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + if (is_v2) + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); + } + else + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer); + } + meta.mutable_peers()->Clear(); + for (auto i : peer_ids) + { + meta.add_peers()->set_id(i); + } + *response.mutable_change_peer()->mutable_region() = meta; + return std::make_tuple(request, response); +} + +std::tuple MockRaftStoreProxy::composePrepareMerge(metapb::Region && target, UInt64 min_index) +{ + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + auto * prepare_merge = request.mutable_prepare_merge(); + prepare_merge->set_min_index(min_index); + *prepare_merge->mutable_target() = target; + return std::make_tuple(request, response); +} + +std::tuple MockRaftStoreProxy::composeCommitMerge(metapb::Region && source, UInt64 commit) +{ + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + commit_merge->set_commit(commit); + *commit_merge->mutable_source() = source; + return std::make_tuple(request, response); +} + +std::tuple MockRaftStoreProxy::composeRollbackMerge(UInt64 commit) +{ + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + auto * rollback_merge = request.mutable_rollback_merge(); + rollback_merge->set_commit(commit); + return std::make_tuple(request, response); +} + +std::tuple MockRaftStoreProxy::composeBatchSplit(std::vector && region_ids, std::vector> && ranges, metapb::RegionEpoch old_epoch) +{ + RUNTIME_CHECK_MSG(region_ids.size() == ranges.size(), "error composeBatchSplit input"); + auto n = region_ids.size(); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit); + metapb::RegionEpoch new_epoch; + new_epoch.set_version(old_epoch.version() + 1); + new_epoch.set_conf_ver(old_epoch.conf_ver()); + { + raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits(); + for (size_t i = 0; i < n; ++i) + { + auto * region = splits->add_regions(); + region->set_id(region_ids[i]); + region->set_start_key(ranges[i].first); + region->set_end_key(ranges[i].second); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + } + return std::make_tuple(request, response); +} + void MockRaftStoreProxy::doApply( KVStore & kvs, TMTContext & tmt, @@ -668,7 +769,7 @@ void MockRaftStoreProxy::snapshot( term = region->getLatestCommitTerm(); } - auto new_kv_region = kvs.genRegionPtr(old_kv_region->getMetaRegion(), old_kv_region->mutMeta().peerId(), index, term); + auto new_kv_region = kvs.genRegionPtr(old_kv_region->cloneMetaRegion(), old_kv_region->mutMeta().peerId(), index, term); // The new entry is committed on Proxy's side. region->updateCommitIndex(index); @@ -695,16 +796,21 @@ void MockRaftStoreProxy::snapshot( new_kv_region->setApplied(index, term); } -TableID MockRaftStoreProxy::bootstrap_table( +TableID MockRaftStoreProxy::bootstrapTable( Context & ctx, KVStore & kvs, - TMTContext & tmt) + TMTContext & tmt, + bool drop_at_first) { UNUSED(kvs); ColumnsDescription columns; auto & data_type_factory = DataTypeFactory::instance(); columns.ordinary = NamesAndTypesList({NameAndTypePair{"a", data_type_factory.get("Int64")}}); auto tso = tmt.getPDClient()->getTS(); + if (drop_at_first) + { + MockTiDB::instance().dropDB(ctx, "d", true); + } MockTiDB::instance().newDataBase("d"); // Make sure there is a table with smaller id. MockTiDB::instance().newTable("d", "prevt" + toString(random()), columns, tso, "", "dt"); @@ -716,20 +822,6 @@ TableID MockRaftStoreProxy::bootstrap_table( return table_id; } -void MockRaftStoreProxy::clear_tables( - Context & ctx, - KVStore & kvs, - TMTContext & tmt) -{ - UNUSED(kvs); - UNUSED(tmt); - if (this->table_id != 1) - { - MockTiDB::instance().dropTable(ctx, "d", "t", false); - } - this->table_id = 1; -} - void GCMonitor::add(RawObjType type, int64_t diff) { auto _ = genLockGuard(); diff --git a/dbms/src/Debug/MockRaftStoreProxy.h b/dbms/src/Debug/MockRaftStoreProxy.h index 5e5d9ef3ebc..6cbb070ceb2 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.h +++ b/dbms/src/Debug/MockRaftStoreProxy.h @@ -161,10 +161,11 @@ struct MockRaftStoreProxy : MutexLockWrap size_t size() const; - void wake(); + void wakeNotifier(); void testRunNormal(const std::atomic_bool & over); + /// Handle one read index task. void runOneRound(); void unsafeInvokeForTest(std::function && cb); @@ -183,27 +184,33 @@ struct MockRaftStoreProxy : MutexLockWrap Type type = NORMAL; }; - /// boostrap a region. - void bootstrap( + /// Boostrap with a given region. + /// Similar to TiKV's `bootstrap_region`. + void bootstrapWithRegion( KVStore & kvs, TMTContext & tmt, UInt64 region_id, std::optional> maybe_range); - /// boostrap a table, since applying snapshot needs table schema. - TableID bootstrap_table( + /// Boostrap a table. + /// Must be called if: + /// 1. Applying snapshot which needs table schema + /// 2. Doing row2col. + TableID bootstrapTable( Context & ctx, KVStore & kvs, - TMTContext & tmt); + TMTContext & tmt, + bool drop_at_first = true); - /// clear tables. - void clear_tables( - Context & ctx, + /// Manually add a region. + void debugAddRegions( KVStore & kvs, - TMTContext & tmt); + TMTContext & tmt, + std::vector region_ids, + std::vector> && ranges); /// We assume that we generate one command, and immediately commit. - /// normal write to a region. + /// Normal write to a region. std::tuple normalWrite( UInt64 region_id, std::vector && keys, @@ -221,6 +228,14 @@ struct MockRaftStoreProxy : MutexLockWrap /// Create a compactLog admin command, returns (index, term) of the admin command itself. std::tuple compactLog(UInt64 region_id, UInt64 compact_index); + std::tuple adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest &&, raft_cmdpb::AdminResponse &&); + + static std::tuple composeChangePeer(metapb::Region && meta, std::vector peer_ids, bool is_v2 = true); + static std::tuple composePrepareMerge(metapb::Region && target, UInt64 min_index); + static std::tuple composeCommitMerge(metapb::Region && source, UInt64 commit); + static std::tuple composeRollbackMerge(UInt64 commit); + static std::tuple composeBatchSplit(std::vector && region_ids, std::vector> && ranges, metapb::RegionEpoch old_epoch); + struct Cf { Cf(UInt64 region_id_, TableID table_id_, ColumnFamilyType type_); @@ -273,16 +288,24 @@ struct MockRaftStoreProxy : MutexLockWrap uint64_t region_id, uint64_t to); + void clear() + { + auto _ = genLockGuard(); + regions.clear(); + } + MockRaftStoreProxy() { log = Logger::get("MockRaftStoreProxy"); table_id = 1; } + // Mock Proxy will drop read index requests to these regions std::unordered_set region_id_to_drop; + // Mock Proxy will return error read index response to these regions std::unordered_set region_id_to_error; std::map regions; - std::list> tasks; + std::list> read_index_tasks; AsyncWaker::Notifier notifier; TableID table_id; LoggerPtr log; diff --git a/dbms/src/Debug/dbgFuncMockRaftCommand.cpp b/dbms/src/Debug/dbgFuncMockRaftCommand.cpp index 3626041f428..23e26ccbacc 100644 --- a/dbms/src/Debug/dbgFuncMockRaftCommand.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftCommand.cpp @@ -171,7 +171,7 @@ void MockRaftCommand::dbgFuncPrepareMerge(Context & context, const ASTs & args, prepare_merge->set_min_index(min_index); metapb::Region * target = prepare_merge->mutable_target(); - *target = target_region->getMetaRegion(); + *target = target_region->cloneMetaRegion(); } } @@ -207,7 +207,7 @@ void MockRaftCommand::dbgFuncCommitMerge(Context & context, const ASTs & args, D auto * commit_merge = request.mutable_commit_merge(); { commit_merge->set_commit(source_region->appliedIndex()); - *commit_merge->mutable_source() = source_region->getMetaRegion(); + *commit_merge->mutable_source() = source_region->cloneMetaRegion(); } } @@ -241,7 +241,7 @@ void MockRaftCommand::dbgFuncRollbackMerge(Context & context, const ASTs & args, auto * rollback_merge = request.mutable_rollback_merge(); { - auto merge_state = region->getMergeState(); + auto merge_state = region->cloneMergeState(); rollback_merge->set_commit(merge_state.commit()); } } diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 08d7fd5fe0d..b0e18374979 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -546,7 +546,7 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi // Create a tmp region to store uncommitted data RegionPtr tmp_region; { - auto meta_region = region->getMetaRegion(); + auto meta_region = region->cloneMetaRegion(); auto meta_snap = region->dumpRegionMetaSnapshot(); auto peer_id = meta_snap.peer.id(); tmp_region = genRegionPtr(std::move(meta_region), peer_id, index, term); diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 0be67d40e7c..6432b65c1e1 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -560,6 +560,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ const auto handle_batch_split = [&](Regions & split_regions) { { + // `split_regions` doesn't include the derived region. auto manage_lock = genRegionWriteLock(task_lock); for (auto & new_region : split_regions) diff --git a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp index 075e871f364..680771fdc44 100644 --- a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp +++ b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp @@ -23,35 +23,14 @@ namespace DB { -//#define ADD_TEST_DEBUG_LOG_FMT +// #define ADD_TEST_DEBUG_LOG_FMT #ifdef ADD_TEST_DEBUG_LOG_FMT static Poco::Logger * global_logger_for_test = nullptr; static std::mutex global_logger_mutex; -#define STDOUT_TEST_LOG_FMT(...) \ - do \ - { \ - std::string formatted_message = LogFmtDetails::numArgs(__VA_ARGS__) > 1 \ - ? LogFmtDetails::toCheckedFmtStr( \ - FMT_STRING(LOG_GET_FIRST_ARG(__VA_ARGS__)), \ - __VA_ARGS__) \ - : LogFmtDetails::firstArg(__VA_ARGS__); \ - { \ - auto _ = std::lock_guard(global_logger_mutex); \ - std::cout << fmt::format( \ - "[{}][{}:{}][{}]", \ - Clock::now(), \ - &__FILE__[LogFmtDetails::getFileNameOffset(__FILE__)], \ - __LINE__, \ - formatted_message) \ - << std::endl; \ - } \ - } while (false) - -//#define TEST_LOG_FMT(...) LOG_ERROR(global_logger_for_test, __VA_ARGS__) - -#define TEST_LOG_FMT(...) STDOUT_TEST_LOG_FMT(__VA_ARGS__) + +#define TEST_LOG_FMT(...) LOG_ERROR(global_logger_for_test, __VA_ARGS__) void F_TEST_LOG_FMT(const std::string & s) { @@ -451,11 +430,10 @@ void ReadIndexDataNode::ReadIndexElement::doPoll(const TiFlashRaftProxyHelper & else { TEST_LOG_FMT( - "poll ReadIndexElement failed for region {}, time cost {}, timeout {}, start time {}", + "poll ReadIndexElement failed for region {}, time cost {}, timeout {}", region_id, - SteadyClock::now() - start_time, - timeout, - start_time); + std::chrono::duration(SteadyClock::now() - start_time).count(), + timeout); } if (clean_task) @@ -756,7 +734,7 @@ ReadIndexWorker::ReadIndexWorker( bool ReadIndexWorker::lastRunTimeout(SteadyClock::duration timeout) const { - TEST_LOG_FMT("worker {}, last run time {}, timeout {}", getID(), last_run_time.load(std::memory_order_relaxed), timeout); + TEST_LOG_FMT("worker {}, last run time {}, timeout {}", getID(), last_run_time.load(std::memory_order_relaxed).time_since_epoch().count(), std::chrono::duration_cast(timeout).count()); return last_run_time.load(std::memory_order_relaxed) + timeout < SteadyClock::now(); } diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 9265085e32b..0c7bea70b12 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -804,11 +804,19 @@ UInt64 RegionRaftCommandDelegate::appliedIndex() { return meta.makeRaftCommandDelegate().applyState().applied_index(); } -metapb::Region Region::getMetaRegion() const +metapb::Region Region::cloneMetaRegion() const +{ + return meta.cloneMetaRegion(); +} +const metapb::Region & Region::getMetaRegion() const { return meta.getMetaRegion(); } -raft_serverpb::MergeState Region::getMergeState() const +raft_serverpb::MergeState Region::cloneMergeState() const +{ + return meta.cloneMergeState(); +} +const raft_serverpb::MergeState & Region::getMergeState() const { return meta.getMergeState(); } diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 94778157c57..ff31dfd83bb 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -189,8 +189,10 @@ class Region : public std::enable_shared_from_this void tryCompactionFilter(const Timestamp safe_point); RegionRaftCommandDelegate & makeRaftCommandDelegate(const KVStoreTaskLock &); - metapb::Region getMetaRegion() const; - raft_serverpb::MergeState getMergeState() const; + metapb::Region cloneMetaRegion() const; + const metapb::Region & getMetaRegion() const; + raft_serverpb::MergeState cloneMergeState() const; + const raft_serverpb::MergeState & getMergeState() const; TableID getMappedTableID() const; KeyspaceID getKeyspaceID() const; diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp index 73477526ea6..089bc0af2dc 100644 --- a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp +++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp @@ -290,156 +290,4 @@ typename RegionCFDataBase::Data & RegionCFDataBase::getDataMut() template struct RegionCFDataBase; template struct RegionCFDataBase; template struct RegionCFDataBase; - - -namespace RecordKVFormat -{ -// https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs -inline void decodeLockCfValue(DecodedLockCFValue & res) -{ - const TiKVValue & value = *res.val; - const char * data = value.data(); - size_t len = value.dataSize(); - - kvrpcpb::Op lock_type = kvrpcpb::Op_MIN; - switch (readUInt8(data, len)) - { - case LockType::Put: - lock_type = kvrpcpb::Op::Put; - break; - case LockType::Delete: - lock_type = kvrpcpb::Op::Del; - break; - case LockType::Lock: - lock_type = kvrpcpb::Op::Lock; - break; - case LockType::Pessimistic: - lock_type = kvrpcpb::Op::PessimisticLock; - break; - } - res.lock_type = lock_type; - res.primary_lock = readVarString(data, len); - res.lock_version = readVarUInt(data, len); - - if (len > 0) - { - res.lock_ttl = readVarUInt(data, len); - while (len > 0) - { - char flag = readUInt8(data, len); - switch (flag) - { - case SHORT_VALUE_PREFIX: - { - size_t str_len = readUInt8(data, len); - if (len < str_len) - throw Exception("content len shorter than short value len", ErrorCodes::LOGICAL_ERROR); - // no need short value - readRawString(data, len, str_len); - break; - }; - case MIN_COMMIT_TS_PREFIX: - { - res.min_commit_ts = readUInt64(data, len); - break; - } - case FOR_UPDATE_TS_PREFIX: - { - res.lock_for_update_ts = readUInt64(data, len); - break; - } - case TXN_SIZE_PREFIX: - { - res.txn_size = readUInt64(data, len); - break; - } - case ASYNC_COMMIT_PREFIX: - { - res.use_async_commit = true; - const auto * start = data; - UInt64 cnt = readVarUInt(data, len); - for (UInt64 i = 0; i < cnt; ++i) - { - readVarString(data, len); - } - const auto * end = data; - res.secondaries = {start, static_cast(end - start)}; - break; - } - case ROLLBACK_TS_PREFIX: - { - UInt64 cnt = readVarUInt(data, len); - for (UInt64 i = 0; i < cnt; ++i) - { - readUInt64(data, len); - } - break; - } - case LAST_CHANGE_PREFIX: - { - // Used to accelerate TiKV MVCC scan, useless for TiFlash. - UInt64 last_change_ts = readUInt64(data, len); - UInt64 versions_to_last_change = readVarUInt(data, len); - UNUSED(last_change_ts); - UNUSED(versions_to_last_change); - break; - } - case TXN_SOURCE_PREFIX_FOR_LOCK: - { - // Used for CDC, useless for TiFlash. - UInt64 txn_source_prefic = readVarUInt(data, len); - UNUSED(txn_source_prefic); - break; - } - default: - { - std::string msg = std::string("invalid flag ") + flag + " in lock value " + value.toDebugString(); - throw Exception(msg, ErrorCodes::LOGICAL_ERROR); - } - } - } - } - if (len != 0) - throw Exception("invalid lock value " + value.toDebugString(), ErrorCodes::LOGICAL_ERROR); -} - -DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_) - : key(std::move(key_)) - , val(std::move(val_)) -{ - decodeLockCfValue(*this); -} - -void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const -{ - res.set_lock_type(lock_type); - res.set_primary_lock(primary_lock.data(), primary_lock.size()); - res.set_lock_version(lock_version); - res.set_lock_ttl(lock_ttl); - res.set_min_commit_ts(min_commit_ts); - res.set_lock_for_update_ts(lock_for_update_ts); - res.set_txn_size(txn_size); - res.set_use_async_commit(use_async_commit); - res.set_key(decodeTiKVKey(*key)); - - if (use_async_commit) - { - const auto * data = secondaries.data(); - auto len = secondaries.size(); - UInt64 cnt = readVarUInt(data, len); - for (UInt64 i = 0; i < cnt; ++i) - { - res.add_secondaries(readVarString(data, len)); - } - } -} - -std::unique_ptr DecodedLockCFValue::intoLockInfo() const -{ - auto res = std::make_unique(); - intoLockInfo(*res); - return res; -} - -} // namespace RecordKVFormat } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index de9c0d042de..8911b349193 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -448,16 +448,27 @@ RegionMeta::RegionMeta(metapb::Peer peer_, metapb::Region region, raft_serverpb: region_state.setRegion(std::move(region)); } -metapb::Region RegionMeta::getMetaRegion() const +metapb::Region RegionMeta::cloneMetaRegion() const { std::lock_guard lock(mutex); return region_state.getRegion(); } -raft_serverpb::MergeState RegionMeta::getMergeState() const +const metapb::Region & RegionMeta::getMetaRegion() const +{ + std::lock_guard lock(mutex); + return region_state.getRegion(); +} + +raft_serverpb::MergeState RegionMeta::cloneMergeState() const { std::lock_guard lock(mutex); return region_state.getMergeState(); } +const raft_serverpb::MergeState & RegionMeta::getMergeState() const +{ + std::lock_guard lock(mutex); + return region_state.getMergeState(); +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index 08fed625c2a..e0b993df280 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -109,8 +109,10 @@ class RegionMeta RegionMetaSnapshot dumpRegionMetaSnapshot() const; MetaRaftCommandDelegate & makeRaftCommandDelegate(); - metapb::Region getMetaRegion() const; - raft_serverpb::MergeState getMergeState() const; + const metapb::Region & getMetaRegion() const; + metapb::Region cloneMetaRegion() const; + const raft_serverpb::MergeState & getMergeState() const; + raft_serverpb::MergeState cloneMergeState() const; RegionMeta() = delete; diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.cpp b/dbms/src/Storages/Transaction/TiKVRecordFormat.cpp new file mode 100644 index 00000000000..ef87924a7ff --- /dev/null +++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.cpp @@ -0,0 +1,171 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +namespace RecordKVFormat +{ +// https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs +inline void decodeLockCfValue(DecodedLockCFValue & res) +{ + const TiKVValue & value = *res.val; + const char * data = value.data(); + size_t len = value.dataSize(); + + kvrpcpb::Op lock_type = kvrpcpb::Op_MIN; + switch (readUInt8(data, len)) + { + case LockType::Put: + lock_type = kvrpcpb::Op::Put; + break; + case LockType::Delete: + lock_type = kvrpcpb::Op::Del; + break; + case LockType::Lock: + lock_type = kvrpcpb::Op::Lock; + break; + case LockType::Pessimistic: + lock_type = kvrpcpb::Op::PessimisticLock; + break; + } + res.lock_type = lock_type; + res.primary_lock = readVarString(data, len); + res.lock_version = readVarUInt(data, len); + + if (len > 0) + { + res.lock_ttl = readVarUInt(data, len); + while (len > 0) + { + char flag = readUInt8(data, len); + switch (flag) + { + case SHORT_VALUE_PREFIX: + { + size_t str_len = readUInt8(data, len); + if (len < str_len) + throw Exception("content len shorter than short value len", ErrorCodes::LOGICAL_ERROR); + // no need short value + readRawString(data, len, str_len); + break; + }; + case MIN_COMMIT_TS_PREFIX: + { + res.min_commit_ts = readUInt64(data, len); + break; + } + case FOR_UPDATE_TS_PREFIX: + { + res.lock_for_update_ts = readUInt64(data, len); + break; + } + case TXN_SIZE_PREFIX: + { + res.txn_size = readUInt64(data, len); + break; + } + case ASYNC_COMMIT_PREFIX: + { + res.use_async_commit = true; + const auto * start = data; + UInt64 cnt = readVarUInt(data, len); + for (UInt64 i = 0; i < cnt; ++i) + { + readVarString(data, len); + } + const auto * end = data; + res.secondaries = {start, static_cast(end - start)}; + break; + } + case ROLLBACK_TS_PREFIX: + { + UInt64 cnt = readVarUInt(data, len); + for (UInt64 i = 0; i < cnt; ++i) + { + readUInt64(data, len); + } + break; + } + case LAST_CHANGE_PREFIX: + { + // Used to accelerate TiKV MVCC scan, useless for TiFlash. + UInt64 last_change_ts = readUInt64(data, len); + UInt64 versions_to_last_change = readVarUInt(data, len); + UNUSED(last_change_ts); + UNUSED(versions_to_last_change); + break; + } + case TXN_SOURCE_PREFIX_FOR_LOCK: + { + // Used for CDC, useless for TiFlash. + UInt64 txn_source_prefic = readVarUInt(data, len); + UNUSED(txn_source_prefic); + break; + } + default: + { + std::string msg = std::string("invalid flag ") + flag + " in lock value " + value.toDebugString(); + throw Exception(msg, ErrorCodes::LOGICAL_ERROR); + } + } + } + } + if (len != 0) + throw Exception("invalid lock value " + value.toDebugString(), ErrorCodes::LOGICAL_ERROR); +} + +DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr key_, std::shared_ptr val_) + : key(std::move(key_)) + , val(std::move(val_)) +{ + decodeLockCfValue(*this); +} + +void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const +{ + res.set_lock_type(lock_type); + res.set_primary_lock(primary_lock.data(), primary_lock.size()); + res.set_lock_version(lock_version); + res.set_lock_ttl(lock_ttl); + res.set_min_commit_ts(min_commit_ts); + res.set_lock_for_update_ts(lock_for_update_ts); + res.set_txn_size(txn_size); + res.set_use_async_commit(use_async_commit); + res.set_key(decodeTiKVKey(*key)); + + if (use_async_commit) + { + const auto * data = secondaries.data(); + auto len = secondaries.size(); + UInt64 cnt = readVarUInt(data, len); + for (UInt64 i = 0; i < cnt; ++i) + { + res.add_secondaries(readVarString(data, len)); + } + } +} + +std::unique_ptr DecodedLockCFValue::intoLockInfo() const +{ + auto res = std::make_unique(); + intoLockInfo(*res); + return res; +} + +} // namespace RecordKVFormat +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 4f91ef788bf..ca1557038db 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -18,21 +18,14 @@ namespace DB { namespace tests { -TEST_F(RegionKVStoreTest, NewProxy) +TEST_F(RegionKVStoreTest, PersistenceV1) +try { - createDefaultRegions(); auto ctx = TiFlashTestEnv::getGlobalContext(); - KVStore & kvs = getKVS(); { ASSERT_EQ(kvs.getRegion(0), nullptr); - auto task_lock = kvs.genTaskLock(); - auto lock = kvs.genRegionWriteLock(task_lock); - { - auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); - lock.regions.emplace(1, region); - lock.index.add(region); - } + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), 1, std::nullopt); } { kvs.tryPersistRegion(1); @@ -55,6 +48,7 @@ TEST_F(RegionKVStoreTest, NewProxy) ASSERT_EQ(kvs.tryFlushRegionData(1, false, false, ctx.getTMTContext(), 0, 0), false); } } +CATCH TEST_F(RegionKVStoreTest, ReadIndex) { @@ -71,25 +65,11 @@ TEST_F(RegionKVStoreTest, ReadIndex) { ASSERT_EQ(kvs.getRegion(0), nullptr); - auto task_lock = kvs.genTaskLock(); - auto lock = kvs.genRegionWriteLock(task_lock); - { - auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10), kvs.getProxyHelper()); - lock.regions.emplace(1, region); - lock.index.add(region); - } - { - auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20), kvs.getProxyHelper()); - lock.regions.emplace(2, region); - lock.index.add(region); - } - { - auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40), kvs.getProxyHelper()); - lock.regions.emplace(3, region); - lock.index.add(region); - } + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {1, 2, 3}, {{{RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)}, {RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)}, {RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)}}}); } { + // `read_index_worker_manager` is not set, fallback to v1. + // We don't support batch read index version 1 now ASSERT_EQ(kvs.read_index_worker_manager, nullptr); { auto region = kvs.getRegion(1); @@ -110,22 +90,17 @@ TEST_F(RegionKVStoreTest, ReadIndex) }, 1); ASSERT_NE(kvs.read_index_worker_manager, nullptr); - + } + { { + // Normal async notifier kvs.asyncRunReadIndexWorkers(); SCOPE_EXIT({ kvs.stopReadIndexWorkers(); }); - auto tar_region_id = 9; - { - auto task_lock = kvs.genTaskLock(); - auto lock = kvs.genRegionWriteLock(task_lock); - - auto region = makeRegion(tar_region_id, RecordKVFormat::genKey(2, 0), RecordKVFormat::genKey(2, 10)); - lock.regions.emplace(region->id(), region); - lock.index.add(region); - } + UInt64 tar_region_id = 9; + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {tar_region_id}, {{{RecordKVFormat::genKey(2, 0), RecordKVFormat::genKey(2, 10)}}}); { ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 5); proxy_instance->regions.at(tar_region_id)->updateCommitIndex(66); @@ -149,6 +124,7 @@ TEST_F(RegionKVStoreTest, ReadIndex) EngineStoreApplyRes::None); } { + // Async notifier error kvs.asyncRunReadIndexWorkers(); SCOPE_EXIT({ kvs.stopReadIndexWorkers(); @@ -175,15 +151,16 @@ TEST_F(RegionKVStoreTest, ReadIndex) ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); } + // Test read index + // Note `batchReadIndex` always returns latest committed index in our mock class. kvs.asyncRunReadIndexWorkers(); SCOPE_EXIT({ kvs.stopReadIndexWorkers(); }); { - // test read index auto region = kvs.getRegion(1); - auto req = GenRegionReadIndexReq(*region, 8); + auto req = GenRegionReadIndexReq(*region, 8); // start_ts = 8 auto resp = kvs.batchReadIndex({req}, 100); ASSERT_EQ(resp[0].first.read_index(), 5); { @@ -200,10 +177,11 @@ TEST_F(RegionKVStoreTest, ReadIndex) r.second->updateCommitIndex(667); } { + // Found in `history_success_tasks` auto region = kvs.getRegion(1); auto req = GenRegionReadIndexReq(*region, 8); auto resp = kvs.batchReadIndex({req}, 100); - ASSERT_EQ(resp[0].first.read_index(), 5); // history + ASSERT_EQ(resp[0].first.read_index(), 5); } { auto region = kvs.getRegion(1); @@ -221,6 +199,7 @@ TEST_F(RegionKVStoreTest, ReadIndex) ASSERT_EQ(std::get<0>(r), WaitIndexResult::Timeout); } { + // Wait for a new index 667 + 1 to be applied AsyncWaker::Notifier notifier; std::thread t([&]() { notifier.wake(); @@ -239,7 +218,7 @@ TEST_F(RegionKVStoreTest, ReadIndex) kvs.stopReadIndexWorkers(); kvs.releaseReadIndexWorkers(); over = true; - proxy_instance->wake(); + proxy_instance->wakeNotifier(); proxy_runner.join(); ASSERT(GCMonitor::instance().checkClean()); ASSERT(!GCMonitor::instance().empty()); @@ -252,19 +231,8 @@ void RegionKVStoreTest::testRaftMergeRollback(KVStore & kvs, TMTContext & tmt) auto source_region = kvs.getRegion(region_id); auto target_region = kvs.getRegion(1); - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); - auto * prepare_merge = request.mutable_prepare_merge(); - { - auto min_index = source_region->appliedIndex(); - prepare_merge->set_min_index(min_index); - metapb::Region * target = prepare_merge->mutable_target(); - *target = target_region->getMetaRegion(); - } - } + auto && [request, response] = MockRaftStoreProxy::composePrepareMerge(target_region->cloneMetaRegion(), source_region->appliedIndex()); kvs.handleAdminRaftCmd(std::move(request), std::move(response), region_id, @@ -275,18 +243,7 @@ void RegionKVStoreTest::testRaftMergeRollback(KVStore & kvs, TMTContext & tmt) } { auto region = kvs.getRegion(region_id); - - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); - - auto * rollback_merge = request.mutable_rollback_merge(); - { - auto merge_state = region->getMergeState(); - rollback_merge->set_commit(merge_state.commit()); - } - } + auto && [request, response] = MockRaftStoreProxy::composeRollbackMerge(region->getMergeState().commit()); region->setStateApplying(); try @@ -327,18 +284,7 @@ void RegionKVStoreTest::testRaftMergeRollback(KVStore & kvs, TMTContext & tmt) } { auto region = kvs.getRegion(region_id); - - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); - - auto * rollback_merge = request.mutable_rollback_merge(); - { - auto merge_state = region->getMergeState(); - rollback_merge->set_commit(merge_state.commit()); - } - } + auto && [request, response] = MockRaftStoreProxy::composeRollbackMerge(region->getMergeState().commit()); kvs.handleAdminRaftCmd(std::move(request), std::move(response), region_id, @@ -357,50 +303,20 @@ void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); region->insert("write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); } - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - { - // split region - auto region_id = 1; - RegionID region_id2 = 7; - auto source_region = kvs.getRegion(region_id); - metapb::RegionEpoch new_epoch; - new_epoch.set_version(source_region->version() + 1); - new_epoch.set_conf_ver(source_region->confVer()); - TiKVKey start_key1, start_key2, end_key1, end_key2; - { - start_key1 = RecordKVFormat::genKey(1, 5); - start_key2 = RecordKVFormat::genKey(1, 0); - end_key1 = RecordKVFormat::genKey(1, 10); - end_key2 = RecordKVFormat::genKey(1, 5); - } - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit); - raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits(); - { - auto * region = splits->add_regions(); - region->set_id(region_id); - region->set_start_key(start_key1); - region->set_end_key(end_key1); - region->add_peers(); - *region->mutable_region_epoch() = new_epoch; - } - { - auto * region = splits->add_regions(); - region->set_id(region_id2); - region->set_start_key(start_key2); - region->set_end_key(end_key2); - region->add_peers(); - *region->mutable_region_epoch() = new_epoch; - } - } - } + + // Split region + RegionID region_id = 1; + RegionID region_id2 = 7; + auto source_region = kvs.getRegion(region_id); + auto old_epoch = source_region->mutMeta().getMetaRegion().region_epoch(); + auto && [request, response] = MockRaftStoreProxy::composeBatchSplit({region_id, region_id2}, {{RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10)}, {RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5)}}, old_epoch); kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); { auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); @@ -413,10 +329,11 @@ void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) ASSERT_EQ(mmp.size(), 1); } { + // We don't force migration of committed data from derived region to dm store. ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); ASSERT_EQ(kvs.getRegion(7)->dataInfo(), "[lock 1 ]"); } - // rollback 1 to before split + // Rollback 1 to before split // 7 is persisted { kvs.handleDestroy(1, tmt); @@ -439,12 +356,13 @@ void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); } { + // Region 1 and 7 overlaps. auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); ASSERT_TRUE(mmp.count(7) != 0); ASSERT_TRUE(mmp.count(1) != 0); ASSERT_EQ(mmp.size(), 2); } - // split again + // Split again kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); { auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); @@ -462,46 +380,12 @@ void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) } } -void RegionKVStoreTest::testRaftChangePeer(KVStore & kvs, TMTContext & tmt) -{ - { - auto task_lock = kvs.genTaskLock(); - auto lock = kvs.genRegionWriteLock(task_lock); - auto region = makeRegion(88, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 100)); - lock.regions.emplace(88, region); - lock.index.add(region); - } - { - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer); - auto meta = kvs.getRegion(88)->getMetaRegion(); - meta.mutable_peers()->Clear(); - meta.add_peers()->set_id(2); - meta.add_peers()->set_id(4); - *response.mutable_change_peer()->mutable_region() = meta; - kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 6, 5, tmt); - ASSERT_NE(kvs.getRegion(88), nullptr); - } - { - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); - auto meta = kvs.getRegion(88)->getMetaRegion(); - meta.mutable_peers()->Clear(); - meta.add_peers()->set_id(3); - meta.add_peers()->set_id(4); - *response.mutable_change_peer()->mutable_region() = meta; - kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 7, 5, tmt); - ASSERT_EQ(kvs.getRegion(88), nullptr); - } -} - void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) { { + auto region_id = 7; kvs.getRegion(1)->clearAllData(); - kvs.getRegion(7)->clearAllData(); + kvs.getRegion(region_id)->clearAllData(); { auto region = kvs.getRegion(1); @@ -512,7 +396,7 @@ void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); } { - auto region = kvs.getRegion(7); + auto region = kvs.getRegion(region_id); auto table_id = 1; region->insert("lock", RecordKVFormat::genKey(table_id, 2), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); region->insert("default", RecordKVFormat::genKey(table_id, 2, 5), TiKVValue("value1")); @@ -525,23 +409,7 @@ void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) auto region_id = 7; auto source_region = kvs.getRegion(region_id); auto target_region = kvs.getRegion(1); - - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); - - auto * prepare_merge = request.mutable_prepare_merge(); - { - auto min_index = source_region->appliedIndex(); - prepare_merge->set_min_index(min_index); - - metapb::Region * target = prepare_merge->mutable_target(); - *target = target_region->getMetaRegion(); - } - } - + auto && [request, response] = MockRaftStoreProxy::composePrepareMerge(target_region->cloneMetaRegion(), source_region->appliedIndex()); kvs.handleAdminRaftCmd(std::move(request), std::move(response), source_region->id(), @@ -554,15 +422,8 @@ void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) { auto source_id = 7, target_id = 1; auto source_region = kvs.getRegion(source_id); - raft_cmdpb::AdminRequest request; - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); - auto * commit_merge = request.mutable_commit_merge(); - { - commit_merge->set_commit(source_region->appliedIndex()); - *commit_merge->mutable_source() = source_region->getMetaRegion(); - } - } + + auto && [request, response] = MockRaftStoreProxy::composeCommitMerge(source_region->cloneMetaRegion(), source_region->appliedIndex()); source_region->setStateApplying(); source_region->makeRaftCommandDelegate(kvs.genTaskLock()); const auto & source_region_meta_delegate = source_region->meta.makeRaftCommandDelegate(); @@ -593,17 +454,7 @@ void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) { auto source_id = 7, target_id = 1; auto source_region = kvs.getRegion(source_id); - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - - { - request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); - auto * commit_merge = request.mutable_commit_merge(); - { - commit_merge->set_commit(source_region->appliedIndex()); - *commit_merge->mutable_source() = source_region->getMetaRegion(); - } - } + auto && [request, response] = MockRaftStoreProxy::composeCommitMerge(source_region->cloneMetaRegion(), source_region->appliedIndex()); { auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); ASSERT_TRUE(mmp.count(target_id) != 0); @@ -652,19 +503,22 @@ void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) } } -TEST_F(RegionKVStoreTest, Region) +TEST_F(RegionKVStoreTest, RegionReadWrite) { - createDefaultRegions(); + auto ctx = TiFlashTestEnv::getGlobalContext(); TableID table_id = 100; + KVStore & kvs = getKVS(); + UInt64 region_id = 1; + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::make_optional(std::make_pair(RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 1000)))); + auto region = kvs.getRegion(region_id); { + // Test create RegionMeta. auto meta = RegionMeta(createPeer(2, true), createRegionInfo(666, RecordKVFormat::genKey(0, 0), RecordKVFormat::genKey(0, 1000)), initialApplyState()); ASSERT_EQ(meta.peerId(), 2); } - auto region = makeRegion(1, RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 1000)); { + // Test GenRegionReadIndexReq. ASSERT_TRUE(region->checkIndex(5)); - } - { auto start_ts = 199; auto req = GenRegionReadIndexReq(*region, start_ts); ASSERT_EQ(req.ranges().size(), 1); @@ -675,12 +529,14 @@ TEST_F(RegionKVStoreTest, Region) ASSERT_EQ(region->getRange()->comparableKeys().second.key, req.ranges()[0].end_key()); } { + // Test read committed and lock with CommittedScanner. region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); ASSERT_EQ(1, region->writeCFCount()); ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); { + // There is a lock. auto iter = region->createCommittedScanner(); auto lock = iter.getLockInfo({100, nullptr}); ASSERT_NE(lock, nullptr); @@ -688,6 +544,7 @@ TEST_F(RegionKVStoreTest, Region) ASSERT_EQ(k->lock_version(), 3); } { + // The record is committed since there is a write record. std::optional data_list_read = ReadRegionCommitCache(region, true); ASSERT_TRUE(data_list_read); ASSERT_EQ(1, data_list_read->size()); @@ -703,6 +560,7 @@ TEST_F(RegionKVStoreTest, Region) region->clearAllData(); } { + // Test duplicate and tryCompactionFilter region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); ASSERT_EQ(region->dataInfo(), "[write 1 ]"); @@ -719,10 +577,11 @@ TEST_F(RegionKVStoreTest, Region) } ASSERT_EQ(ori_size, region->dataSize()); - region->tryCompactionFilter(100); + region->tryCompactionFilter(101); ASSERT_EQ(region->dataInfo(), "[]"); } { + // Test read and delete committed Del record. region->insert("write", RecordKVFormat::genKey(table_id, 4, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::DelFlag, 5)); ASSERT_EQ(1, region->writeCFCount()); region->remove("write", RecordKVFormat::genKey(table_id, 4, 8)); @@ -737,20 +596,17 @@ TEST_F(RegionKVStoreTest, Region) } { ASSERT_EQ(0, region->dataSize()); - region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); ASSERT_LT(0, region->dataSize()); - region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); ASSERT_EQ(0, region->dataSize()); - // remove duplicate records region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); ASSERT_EQ(0, region->dataSize()); } } -TEST_F(RegionKVStoreTest, KVStore) +TEST_F(RegionKVStoreTest, Writes) { createDefaultRegions(); auto ctx = TiFlashTestEnv::getGlobalContext(); @@ -758,7 +614,6 @@ TEST_F(RegionKVStoreTest, KVStore) KVStore & kvs = getKVS(); { // Run without read-index workers - kvs.initReadIndexWorkers( []() { return std::chrono::milliseconds(10); @@ -770,46 +625,32 @@ TEST_F(RegionKVStoreTest, KVStore) kvs.releaseReadIndexWorkers(); } { + // Test set set id auto store = metapb::Store{}; - store.set_id(1234); + store.set_id(2345); kvs.setStore(store); ASSERT_EQ(kvs.getStoreID(), store.id()); } { ASSERT_EQ(kvs.getRegion(0), nullptr); - auto task_lock = kvs.genTaskLock(); - auto lock = kvs.genRegionWriteLock(task_lock); - { - auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); - lock.regions.emplace(1, region); - lock.index.add(region); - } - { - auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); - lock.regions.emplace(2, region); - lock.index.add(region); - } - { - auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); - lock.regions.emplace(3, region); - lock.index.add(region); - } + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {1, 2, 3}, {{{RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)}, {RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)}, {RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)}}}); } { + // Test gc region persister kvs.tryPersistRegion(1); kvs.gcRegionPersistedCache(Seconds{0}); } { + // Check region range ASSERT_EQ(kvs.regionSize(), 3); auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); ASSERT_EQ(mmp.size(), 2); kvs.handleDestroy(3, ctx.getTMTContext()); kvs.handleDestroy(3, ctx.getTMTContext()); - } - { - RegionMap mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); - ASSERT_EQ(mmp.size(), 1); - ASSERT_EQ(mmp.at(2)->id(), 2); + + RegionMap mmp2 = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp2.size(), 1); + ASSERT_EQ(mmp2.at(2)->id(), 2); } { { @@ -926,223 +767,59 @@ TEST_F(RegionKVStoreTest, KVStore) kvs.handleDestroy(2, ctx.getTMTContext()); ASSERT_EQ(kvs.regionSize(), 1); } +} + + +TEST_F(RegionKVStoreTest, AdminSplit) +{ + createDefaultRegions(); + auto ctx = TiFlashTestEnv::getGlobalContext(); + KVStore & kvs = getKVS(); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {1}, {{RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)}}); { testRaftSplit(kvs, ctx.getTMTContext()); ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); } - { - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); - } - { - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - - request.mutable_compact_log(); - request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); - raft_cmdpb::AdminResponse first_response = response; - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); - - raft_cmdpb::AdminResponse second_response = response; - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); - - request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); - raft_cmdpb::AdminResponse third_response = response; - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(third_response), 7, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); +} - request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); - raft_cmdpb::AdminResponse fourth_response = response; - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fourth_response), 7, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); +TEST_F(RegionKVStoreTest, AdminMerge) +{ + createDefaultRegions(); + auto ctx = TiFlashTestEnv::getGlobalContext(); + KVStore & kvs = getKVS(); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {1, 7}, {{RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5)}, {RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10)}}); - raft_cmdpb::AdminResponse fifth_response = response; - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fifth_response), 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); - { - kvs.setRegionCompactLogConfig(0, 0, 0); - request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); - ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 7, 26, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); - } - } { testRaftMergeRollback(kvs, ctx.getTMTContext()); testRaftMerge(kvs, ctx.getTMTContext()); } - { - testRaftChangePeer(kvs, ctx.getTMTContext()); - } - { - auto region_id = 19; - auto region = makeRegion(region_id, RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)); - auto region_id_str = std::to_string(region_id); - auto & mmp = MockSSTReader::getMockSSTData(); - MockSSTReader::getMockSSTData().clear(); - MockSSTReader::Data default_kv_list; - { - default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); - default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); - } - mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); - std::vector sst_views; - sst_views.push_back(SSTView{ - ColumnFamilyType::Default, - BaseBuffView{region_id_str.data(), region_id_str.length()}, - }); - { - RegionMockTest mock_test(kvstore.get(), region); - - kvs.handleApplySnapshot( - region->getMetaRegion(), - 2, - SSTViewVec{sst_views.data(), sst_views.size()}, - 8, - 5, - ctx.getTMTContext()); - ASSERT_EQ(kvs.getRegion(region_id)->checkIndex(8), true); - try - { - kvs.handleApplySnapshot( - region->getMetaRegion(), - 2, - {}, // empty - 6, // smaller index - 5, - ctx.getTMTContext()); - ASSERT_TRUE(false); - } - catch (Exception & e) - { - ASSERT_EQ(e.message(), fmt::format("[region {}] already has newer apply-index 8 than 6, should not happen", region_id)); - } - } - - { - { - auto region = makeRegion(22, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); - auto ingest_ids = kvs.preHandleSnapshotToFiles( - region, - {}, - 9, - 5, - ctx.getTMTContext()); - kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); - } - try - { - auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); - auto ingest_ids = kvs.preHandleSnapshotToFiles( - region, - {}, - 9, - 5, - ctx.getTMTContext()); - kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone - ASSERT_TRUE(false); - } - catch (Exception & e) - { - ASSERT_EQ(e.message(), "range of region 20 is overlapped with 22, state: region { id: 22 }"); - } - - { - const auto * ori_ptr = proxy_helper->proxy_ptr.inner; - proxy_helper->proxy_ptr.inner = nullptr; - SCOPE_EXIT({ - proxy_helper->proxy_ptr.inner = ori_ptr; - }); - - try - { - auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); - auto ingest_ids = kvs.preHandleSnapshotToFiles( - region, - {}, - 10, - 5, - ctx.getTMTContext()); - kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); - ASSERT_TRUE(false); - } - catch (Exception & e) - { - ASSERT_EQ(e.message(), "getRegionLocalState meet internal error: RaftStoreProxyPtr is none"); - } - } - - { - proxy_instance->getRegion(22)->setSate(({ - raft_serverpb::RegionLocalState s; - s.set_state(::raft_serverpb::PeerState::Tombstone); - s; - })); - auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); - auto ingest_ids = kvs.preHandleSnapshotToFiles( - region, - {}, - 10, - 5, - ctx.getTMTContext()); - kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one +} - auto state = proxy_helper->getRegionLocalState(8192); - ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone); - } - kvs.handleDestroy(20, ctx.getTMTContext()); - } +TEST_F(RegionKVStoreTest, AdminChangePeer) +{ + UInt64 region_id = 88; + auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & kvs = getKVS(); + { + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::make_optional(std::make_pair(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 100)))); } - { - auto region_id = 19; - auto region_id_str = std::to_string(region_id); - auto & mmp = MockSSTReader::getMockSSTData(); - MockSSTReader::getMockSSTData().clear(); - MockSSTReader::Data default_kv_list; - { - default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); - default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); - } - mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); - - // Mock SST data for handle [star, end) - auto region = kvs.getRegion(region_id); - - RegionMockTest mock_test(kvstore.get(), region); - - { - // Mocking ingest a SST for column family "Write" - std::vector sst_views; - sst_views.push_back(SSTView{ - ColumnFamilyType::Default, - BaseBuffView{region_id_str.data(), region_id_str.length()}, - }); - kvs.handleIngestSST( - region_id, - SSTViewVec{sst_views.data(), sst_views.size()}, - 100, - 1, - ctx.getTMTContext()); - ASSERT_EQ(kvs.getRegion(region_id)->checkIndex(100), true); - } + auto meta = kvs.getRegion(region_id)->cloneMetaRegion(); + auto && [request, response] = MockRaftStoreProxy::composeChangePeer(std::move(meta), {2, 4}, false); + kvs.handleAdminRaftCmd(std::move(request), std::move(response), region_id, 6, 5, ctx.getTMTContext()); + ASSERT_NE(kvs.getRegion(region_id), nullptr); } - { - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - - request.mutable_compact_log(); - request.set_cmd_type(::raft_cmdpb::AdminCmdType::InvalidAdmin); - - try - { - kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 110, 6, ctx.getTMTContext()); - ASSERT_TRUE(false); - } - catch (Exception & e) - { - ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); - } + auto meta = kvs.getRegion(region_id)->cloneMetaRegion(); + auto && [request, response] = MockRaftStoreProxy::composeChangePeer(std::move(meta), {3, 4}); + kvs.handleAdminRaftCmd(std::move(request), std::move(response), region_id, 7, 5, ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(region_id), nullptr); } } - +// TODO Use test utils in new KVStore test for snapshot test. +// Otherwise data will not actually be inserted. class ApplySnapshotTest : public RegionKVStoreTest , public testing::WithParamInterface @@ -1223,7 +900,7 @@ try RegionMockTest mock_test(kvstore.get(), region); kvs.handleApplySnapshot( - region->getMetaRegion(), + region->cloneMetaRegion(), 2, SSTViewVec{sst_views.data(), sst_views.size()}, 8, @@ -1259,7 +936,7 @@ try RegionMockTest mock_test(kvstore.get(), region); kvs.handleApplySnapshot( - region->getMetaRegion(), + region->cloneMetaRegion(), 2, SSTViewVec{sst_views.data(), sst_views.size()}, 9, @@ -1276,14 +953,9 @@ try } // Finally, the region is migrated out { - raft_cmdpb::AdminRequest request; - raft_cmdpb::AdminResponse response; - request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); - auto meta = kvs.getRegion(region_id)->getMetaRegion(); - meta.mutable_peers()->Clear(); - meta.add_peers()->set_id(3); - *response.mutable_change_peer()->mutable_region() = meta; - kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), region_id, 10, 6, ctx.getTMTContext()); + auto meta = kvs.getRegion(region_id)->cloneMetaRegion(); + auto && [request, response] = MockRaftStoreProxy::composeChangePeer(std::move(meta), {3}); + kvs.handleAdminRaftCmd(std::move(request), std::move(response), region_id, 10, 6, ctx.getTMTContext()); ASSERT_EQ(kvs.getRegion(region_id), nullptr); } { @@ -1305,93 +977,201 @@ try } CATCH -TEST_F(RegionKVStoreTest, KVStoreRestore) +TEST_F(RegionKVStoreTest, Ingests) +try { + createDefaultRegions(); + auto ctx = TiFlashTestEnv::getGlobalContext(); + KVStore & kvs = getKVS(); + // In this test we only deal with meta, + ASSERT_EQ(proxy_helper->sst_reader_interfaces.fn_key, nullptr); { - KVStore & kvs = getKVS(); + auto region_id = 19; + auto region = makeRegion(region_id, RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)); + auto region_id_str = std::to_string(region_id); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; { - ASSERT_EQ(kvs.getRegion(0), nullptr); - auto task_lock = kvs.genTaskLock(); - auto lock = kvs.genRegionWriteLock(task_lock); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + // Will reject a snapshot with snaller index. + { + // Pre-handle snapshot to DTFiles is ignored because the table is dropped. + kvs.handleApplySnapshot( + region->cloneMetaRegion(), + 2, + SSTViewVec{sst_views.data(), sst_views.size()}, + 8, + 5, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(region_id)->checkIndex(8), true); + try { - auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); - lock.regions.emplace(1, region); - lock.index.add(region); + kvs.handleApplySnapshot( + region->cloneMetaRegion(), + 2, + {}, // empty snap files + 6, // smaller index + 5, + ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), fmt::format("[region {}] already has newer apply-index 8 than 6, should not happen", region_id)); } + } + + { + // Snapshot will be rejected if region overlaps. { - auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); - lock.regions.emplace(2, region); - lock.index.add(region); + auto region = makeRegion(22, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); } + try { - auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); - lock.regions.emplace(3, region); - lock.index.add(region); + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "range of region 20 is overlapped with 22, state: region { id: 22 }"); } } - kvs.tryPersistRegion(1); - kvs.tryPersistRegion(2); - kvs.tryPersistRegion(3); - } - { - KVStore & kvs = reloadKVSFromDisk(); - kvs.getRegion(1); - kvs.getRegion(2); - kvs.getRegion(3); - } -} - -void test_mergeresult() -{ - ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", ""), createRegionInfo(1000, "", "x")).source_at_left, false); - ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "", "x"), createRegionInfo(1000, "x", "")).source_at_left, true); - ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", "y"), createRegionInfo(1000, "y", "z")).source_at_left, true); - ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "y", "z"), createRegionInfo(1000, "x", "y")).source_at_left, false); - - { - RegionState region_state; - bool source_at_left; - RegionState source_region_state; - - region_state.setStartKey(RecordKVFormat::genKey(1, 0)); - region_state.setEndKey(RecordKVFormat::genKey(1, 10)); + { + { + // Applying snapshot will throw if proxy is not inited. + const auto * ori_ptr = proxy_helper->proxy_ptr.inner; + proxy_helper->proxy_ptr.inner = nullptr; + SCOPE_EXIT({ + proxy_helper->proxy_ptr.inner = ori_ptr; + }); - source_region_state.setStartKey(RecordKVFormat::genKey(1, 10)); - source_region_state.setEndKey(RecordKVFormat::genKey(1, 20)); + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "getRegionLocalState meet internal error: RaftStoreProxyPtr is none"); + } + } - source_at_left = false; + { + // A snapshot can set region to Tombstone. + proxy_instance->getRegion(22)->setSate(({ + raft_serverpb::RegionLocalState s; + s.set_state(::raft_serverpb::PeerState::Tombstone); + s; + })); + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one - ChangeRegionStateRange(region_state, source_at_left, source_region_state); + auto state = proxy_helper->getRegionLocalState(8192); + ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone); + } - ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(1, 0)); - ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(1, 20)); + kvs.handleDestroy(20, ctx.getTMTContext()); + } } - { - RegionState region_state; - bool source_at_left; - RegionState source_region_state; - - region_state.setStartKey(RecordKVFormat::genKey(2, 5)); - region_state.setEndKey(RecordKVFormat::genKey(2, 10)); - source_region_state.setStartKey(RecordKVFormat::genKey(2, 0)); - source_region_state.setEndKey(RecordKVFormat::genKey(2, 5)); + { + auto region_id = 19; + auto region_id_str = std::to_string(region_id); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); - source_at_left = true; + // Mock SST data for handle [start, end) + auto region = kvs.getRegion(region_id); - ChangeRegionStateRange(region_state, source_at_left, source_region_state); + { + // Mocking ingest a SST for column family "Write" + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + kvs.handleIngestSST( + region_id, + SSTViewVec{sst_views.data(), sst_views.size()}, + 100, + 1, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(region_id)->checkIndex(100), true); + } + } +} +CATCH - ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(2, 0)); - ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(2, 10)); +TEST_F(RegionKVStoreTest, Restore) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + { + KVStore & kvs = getKVS(); + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {1, 2, 3}, {{{RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)}, {RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)}, {RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)}}}); + } + kvs.tryPersistRegion(1); + kvs.tryPersistRegion(2); + kvs.tryPersistRegion(3); + } + { + KVStore & kvs = reloadKVSFromDisk(); + kvs.getRegion(1); + kvs.getRegion(2); + kvs.getRegion(3); } } -TEST_F(RegionKVStoreTest, Basic) +TEST_F(RegionKVStoreTest, RegionRange) { { + // Test findByRangeOverlap. RegionsRangeIndex region_index; const auto & root_map = region_index.getRoot(); - ASSERT_EQ(root_map.size(), 2); + ASSERT_EQ(root_map.size(), 2); // start and end all equals empty region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10))); @@ -1405,8 +1185,10 @@ TEST_F(RegionKVStoreTest, Basic) region_index.add(makeRegion(4, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4))); + // -inf,0,1,3,4,10,inf ASSERT_EQ(root_map.size(), 7); + // 1,2,3,4 res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); ASSERT_EQ(res.size(), 4); @@ -1440,6 +1222,7 @@ TEST_F(RegionKVStoreTest, Basic) } { + // Test add and remove. RegionsRangeIndex region_index; const auto & root_map = region_index.getRoot(); try @@ -1529,8 +1312,51 @@ TEST_F(RegionKVStoreTest, Basic) region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2)), 2); ASSERT_EQ(root_map.size(), 2); } + // Test region range with merge. { - test_mergeresult(); + { + // Compute `source_at_left` by region range. + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", ""), createRegionInfo(1000, "", "x")).source_at_left, false); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "", "x"), createRegionInfo(1000, "x", "")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", "y"), createRegionInfo(1000, "y", "z")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "y", "z"), createRegionInfo(1000, "x", "y")).source_at_left, false); + } + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(1, 0)); + region_state.setEndKey(RecordKVFormat::genKey(1, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(1, 10)); + source_region_state.setEndKey(RecordKVFormat::genKey(1, 20)); + + source_at_left = false; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(1, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(1, 20)); + } + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(2, 5)); + region_state.setEndKey(RecordKVFormat::genKey(2, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(2, 0)); + source_region_state.setEndKey(RecordKVFormat::genKey(2, 5)); + + source_at_left = true; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(2, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(2, 10)); + } } } diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp index 3abcb1989aa..b81ff68b808 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore_fast_add_peer.cpp @@ -189,7 +189,7 @@ try KVStore & kvs = getKVS(); auto page_storage = global_context.getWriteNodePageStorage(); - proxy_instance->bootstrap(kvs, global_context.getTMTContext(), region_id, std::nullopt); + proxy_instance->bootstrapWithRegion(kvs, global_context.getTMTContext(), region_id, std::nullopt); auto region = proxy_instance->getRegion(region_id); auto store_id = kvs.getStore().store_id.load(); region->addPeer(store_id, peer_id, metapb::PeerRole::Learner); diff --git a/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp index 9b8676609be..17745af4a21 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp @@ -22,14 +22,14 @@ TEST_F(RegionKVStoreTest, KVStoreFailRecovery) try { auto ctx = TiFlashTestEnv::getGlobalContext(); + KVStore & kvs = getKVS(); { auto applied_index = 0; auto region_id = 1; { - KVStore & kvs = getKVS(); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); MockRaftStoreProxy::FailCond cond; + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {1}, {{{RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)}}}); auto kvr1 = kvs.getRegion(region_id); auto r1 = proxy_instance->getRegion(region_id); ASSERT_NE(r1, nullptr); @@ -56,8 +56,7 @@ try auto applied_index = 0; auto region_id = 2; { - KVStore & kvs = getKVS(); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {2}, {{{RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)}}}); MockRaftStoreProxy::FailCond cond; cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_WRITE; @@ -89,8 +88,7 @@ try auto applied_index = 0; auto region_id = 3; { - KVStore & kvs = getKVS(); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {3}, {{{RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)}}}); MockRaftStoreProxy::FailCond cond; cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_ADVANCE; @@ -120,8 +118,7 @@ try auto applied_index = 0; auto region_id = 4; { - KVStore & kvs = getKVS(); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {4}, {{{RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)}}}); MockRaftStoreProxy::FailCond cond; cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE; @@ -165,8 +162,8 @@ try { initStorages(); KVStore & kvs = getKVS(); - proxy_instance->bootstrap_table(ctx, kvs, ctx.getTMTContext()); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::nullopt); MockRaftStoreProxy::FailCond cond; @@ -197,12 +194,13 @@ TEST_F(RegionKVStoreTest, KVStoreAdminCommands) try { auto ctx = TiFlashTestEnv::getGlobalContext(); + // CompactLog and passive persistence { - auto applied_index = 0; - auto region_id = 1; + KVStore & kvs = getKVS(); + UInt64 region_id = 1; { - KVStore & kvs = getKVS(); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + auto applied_index = 0; + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::nullopt); MockRaftStoreProxy::FailCond cond; auto kvr1 = kvs.getRegion(region_id); @@ -226,27 +224,79 @@ try ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 2); ASSERT_EQ(kvr1->appliedIndex(), applied_index + 2); } + { + proxy_instance->normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(region_id, ctx.getTMTContext()), true); + // If flush fails, and we don't insist a success. + FailPointHelper::enableFailPoint(FailPoints::force_fail_in_flush_region_data); + ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0), false); + FailPointHelper::disableFailPoint(FailPoints::force_fail_in_flush_region_data); + // Force flush until succeed only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, true, ctx.getTMTContext(), 0, 0), true); + // Non existing region. + // Flush and CompactLog will not panic. + ASSERT_EQ(kvs.tryFlushRegionData(1999, false, true, ctx.getTMTContext(), 0, 0), true); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } } { KVStore & kvs = getKVS(); - auto region_id = 1; - proxy_instance->normalWrite(region_id, {34}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); - // There shall be data to flush. - ASSERT_EQ(kvs.needFlushRegionData(region_id, ctx.getTMTContext()), true); - // If flush fails, and we don't insist a success. - FailPointHelper::enableFailPoint(FailPoints::force_fail_in_flush_region_data); - ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, false, ctx.getTMTContext(), 0, 0), false); - FailPointHelper::disableFailPoint(FailPoints::force_fail_in_flush_region_data); - // Force flush until succeed only for testing. - ASSERT_EQ(kvs.tryFlushRegionData(region_id, false, true, ctx.getTMTContext(), 0, 0), true); - // Non existing region. - // Flush and CompactLog will not panic. - ASSERT_EQ(kvs.tryFlushRegionData(1999, false, true, ctx.getTMTContext(), 0, 0), true); + UInt64 region_id = 2; + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{{RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)}}}); + + // InvalidAdmin raft_cmdpb::AdminRequest request; raft_cmdpb::AdminResponse response; + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::InvalidAdmin); + try + { + kvs.handleAdminRaftCmd(std::move(request), std::move(response), region_id, 110, 6, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); + } + } + { + // All "useless" commands. + KVStore & kvs = getKVS(); + UInt64 region_id = 3; + proxy_instance->debugAddRegions(kvs, ctx.getTMTContext(), {region_id}, {{{RecordKVFormat::genKey(1, 20), RecordKVFormat::genKey(1, 30)}}}); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response2; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); - ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + response = response2; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), region_id, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + response = response2; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), region_id, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + response = response2; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); + response = response2; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), region_id, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); + response = response2; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), region_id, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + { + kvs.setRegionCompactLogConfig(0, 0, 0); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response2), region_id, 26, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + } } } CATCH @@ -283,7 +333,7 @@ static void validate(KVStore & kvs, std::unique_ptr & proxy_ ASSERT_EQ(counter, key_count); } -TEST_F(RegionKVStoreTest, KVStoreSnapshot) +TEST_F(RegionKVStoreTest, KVStoreSnapshotV1) try { auto ctx = TiFlashTestEnv::getGlobalContext(); @@ -293,10 +343,12 @@ try { initStorages(); KVStore & kvs = getKVS(); - table_id = proxy_instance->bootstrap_table(ctx, kvs, ctx.getTMTContext()); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + LOG_INFO(&Poco::Logger::get("Test"), "generated table_id {}", table_id); + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::nullopt); auto kvr1 = kvs.getRegion(region_id); auto r1 = proxy_instance->getRegion(region_id); + ctx.getTMTContext().getRegionTable().updateRegion(*kvr1); { // Only one file MockSSTReader::getMockSSTData().clear(); @@ -455,7 +507,7 @@ try CATCH -TEST_F(RegionKVStoreTest, KVStoreSnapshot2) +TEST_F(RegionKVStoreTest, KVStoreSnapshotV2) try { auto ctx = TiFlashTestEnv::getGlobalContext(); @@ -465,8 +517,8 @@ try region_id = 2; initStorages(); KVStore & kvs = getKVS(); - table_id = proxy_instance->bootstrap_table(ctx, kvs, ctx.getTMTContext()); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); + table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + proxy_instance->bootstrapWithRegion(kvs, ctx.getTMTContext(), region_id, std::nullopt); auto kvr1 = kvs.getRegion(region_id); auto r1 = proxy_instance->getRegion(region_id); { @@ -521,42 +573,6 @@ try } } } - return; - - { - // This is the case that kvs not belong to this region has been written. - // TODO Why should we allow this happen? Is it because we may be slow to update region range before some later valid keys are added? Or just because it is expensive to do this check? - region_id = 3; - initStorages(); - KVStore & kvs = getKVS(); - table_id = proxy_instance->bootstrap_table(ctx, kvs, ctx.getTMTContext()); - proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id, std::nullopt); - auto kvr1 = kvs.getRegion(region_id); - auto r1 = proxy_instance->getRegion(region_id); - { - // Shall filter out of range kvs. - auto klo = RecordKVFormat::genKey(table_id - 1, 1); - auto kro = RecordKVFormat::genKey(table_id + 1, 0); - MockSSTReader::getMockSSTData().clear(); - MockRaftStoreProxy::Cf default_cf{region_id, table_id, ColumnFamilyType::Default}; - default_cf.insert_raw(klo, "v1"); - default_cf.insert_raw(kro, "v1"); - default_cf.finish_file(SSTFormatKind::KIND_SST); - default_cf.freeze(); - MockRaftStoreProxy::Cf write_cf{region_id, table_id, ColumnFamilyType::Write}; - default_cf.insert_raw(klo, "v1"); - default_cf.insert_raw(kro, "v1"); - write_cf.finish_file(SSTFormatKind::KIND_SST); - write_cf.freeze(); - - proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf, write_cf}, 0, 0); - MockRaftStoreProxy::FailCond cond; - { - auto [index, term] = proxy_instance->rawWrite(region_id, {klo}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); - EXPECT_THROW(proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index), Exception); - } - } - } } CATCH diff --git a/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp b/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp index 5c5ac03dd8e..95e98fe9eaa 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp @@ -82,10 +82,10 @@ void ReadIndexTest::testError() auto resp = future->poll(); ASSERT(!resp); } - ASSERT_EQ(proxy_instance.tasks.size(), 1); + ASSERT_EQ(proxy_instance.read_index_tasks.size(), 1); // force response region error `data_is_not_ready` - proxy_instance.tasks.front()->update(false, true); + proxy_instance.read_index_tasks.front()->update(false, true); for (auto & future : futures) { @@ -129,10 +129,10 @@ void ReadIndexTest::testError() auto resp = future->poll(); ASSERT(!resp); } - ASSERT_EQ(proxy_instance.tasks.size(), 1); + ASSERT_EQ(proxy_instance.read_index_tasks.size(), 1); // force response to have lock - proxy_instance.tasks.front()->update(true, false); + proxy_instance.read_index_tasks.front()->update(true, false); proxy_instance.runOneRound(); for (auto & future : futures) @@ -445,7 +445,7 @@ void ReadIndexTest::testNormal() } } over = true; - proxy_instance.wake(); + proxy_instance.wakeNotifier(); proxy_runner.join(); manager.reset(); ASSERT(GCMonitor::instance().checkClean()); @@ -588,7 +588,7 @@ void ReadIndexTest::testBatch() } manager->runOneRoundAll(); { - auto & t = proxy_instance.tasks.back(); + auto & t = proxy_instance.read_index_tasks.back(); ASSERT_EQ(t->req.start_ts(), 10); t->update(); // only response ts `10` } diff --git a/dbms/src/Storages/Transaction/tests/kvstore_helper.h b/dbms/src/Storages/Transaction/tests/kvstore_helper.h index 8b191d462c5..eec095f68de 100644 --- a/dbms/src/Storages/Transaction/tests/kvstore_helper.h +++ b/dbms/src/Storages/Transaction/tests/kvstore_helper.h @@ -93,7 +93,10 @@ class RegionKVStoreTest : public ::testing::Test } } - void TearDown() override {} + void TearDown() override + { + proxy_instance->clear(); + } protected: KVStore & getKVS() { return *kvstore; } @@ -135,12 +138,12 @@ class RegionKVStoreTest : public ::testing::Test protected: static void testRaftSplit(KVStore & kvs, TMTContext & tmt); static void testRaftMerge(KVStore & kvs, TMTContext & tmt); - static void testRaftChangePeer(KVStore & kvs, TMTContext & tmt); static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt); static std::unique_ptr createCleanPathPool(const String & path) { // Drop files on disk + LOG_INFO(Logger::get("Test"), "Clean path {} for bootstrap", path); Poco::File file(path); if (file.exists()) file.remove(true); diff --git a/dbms/src/Storages/Transaction/tests/region_helper.h b/dbms/src/Storages/Transaction/tests/region_helper.h index 39bae2669ab..8e1beba0abd 100644 --- a/dbms/src/Storages/Transaction/tests/region_helper.h +++ b/dbms/src/Storages/Transaction/tests/region_helper.h @@ -58,16 +58,34 @@ inline metapb::Peer createPeer(UInt64 id, bool) return peer; } -inline metapb::Region createRegionInfo(UInt64 id, const std::string start_key, const std::string end_key) +inline metapb::Region createRegionInfo(UInt64 id, const std::string start_key, const std::string end_key, std::optional maybe_epoch = std::nullopt, std::optional> maybe_peers = std::nullopt) { metapb::Region region_info; region_info.set_id(id); region_info.set_start_key(start_key); region_info.set_end_key(end_key); - region_info.mutable_region_epoch()->set_version(5); - region_info.mutable_region_epoch()->set_version(6); - *(region_info.mutable_peers()->Add()) = createPeer(1, true); - *(region_info.mutable_peers()->Add()) = createPeer(2, false); + if (maybe_epoch) + { + *region_info.mutable_region_epoch() = (maybe_epoch.value()); + } + else + { + region_info.mutable_region_epoch()->set_version(5); + region_info.mutable_region_epoch()->set_version(6); + } + if (maybe_peers) + { + auto & peers = maybe_peers.value(); + for (auto it = peers.begin(); it != peers.end(); it++) + { + *(region_info.mutable_peers()->Add()) = *it; + } + } + else + { + *(region_info.mutable_peers()->Add()) = createPeer(1, true); + *(region_info.mutable_peers()->Add()) = createPeer(2, false); + } return region_info; }