Skip to content

Commit

Permalink
Improve KVStore tests (#7537)
Browse files Browse the repository at this point in the history
ref #7256
  • Loading branch information
CalvinNeo authored May 25, 2023
1 parent 67b6b16 commit cb951a3
Show file tree
Hide file tree
Showing 18 changed files with 827 additions and 828 deletions.
180 changes: 136 additions & 44 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequ
{
auto _ = genLockGuard();

wake();
wakeNotifier();

auto region = doGetRegion(req.context().region_id());
if (region)
Expand All @@ -309,7 +309,7 @@ MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequ
r->data = std::make_shared<RawMockReadIndexTask>();
r->data->req = std::move(req);
r->data->region = region;
tasks.push_back(r->data);
read_index_tasks.push_back(r->data);
return r;
}
return nullptr;
Expand All @@ -330,7 +330,7 @@ size_t MockRaftStoreProxy::size() const
return regions.size();
}

void MockRaftStoreProxy::wake()
void MockRaftStoreProxy::wakeNotifier()
{
notifier.wake();
}
Expand All @@ -347,17 +347,18 @@ void MockRaftStoreProxy::testRunNormal(const std::atomic_bool & over)
void MockRaftStoreProxy::runOneRound()
{
auto _ = genLockGuard();
while (!tasks.empty())
while (!read_index_tasks.empty())
{
auto & t = *tasks.front();
if (!region_id_to_drop.count(t.req.context().region_id()))
auto & t = *read_index_tasks.front();
auto region_id = t.req.context().region_id();
if (!region_id_to_drop.contains(region_id))
{
if (region_id_to_error.count(t.req.context().region_id()))
if (region_id_to_error.contains(region_id))
t.update(false, true);
else
t.update(false, false);
}
tasks.pop_front();
read_index_tasks.pop_front();
}
}

Expand All @@ -367,24 +368,40 @@ void MockRaftStoreProxy::unsafeInvokeForTest(std::function<void(MockRaftStorePro
cb(*this);
}

void MockRaftStoreProxy::bootstrap(
void MockRaftStoreProxy::bootstrapWithRegion(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
std::optional<std::pair<std::string, std::string>> maybe_range)
{
{
auto _ = genLockGuard();
RUNTIME_CHECK_MSG(regions.empty(), "Mock Proxy regions are not cleared");
auto task_lock = kvs.genTaskLock();
auto lock = kvs.genRegionWriteLock(task_lock);
RUNTIME_CHECK_MSG(lock.regions.empty(), "KVStore regions are not cleared");
}
auto start = RecordKVFormat::genKey(table_id, 0);
auto end = RecordKVFormat::genKey(table_id + 1, 0);
debugAddRegions(kvs, tmt, {region_id}, {maybe_range.value_or(std::make_pair(start.toString(), end.toString()))});
}

void MockRaftStoreProxy::debugAddRegions(
KVStore & kvs,
TMTContext & tmt,
std::vector<UInt64> region_ids,
std::vector<std::pair<std::string, std::string>> && ranges)
{
UNUSED(tmt);
int n = ranges.size();
auto _ = genLockGuard();
regions.emplace(region_id, std::make_shared<MockProxyRegion>(region_id));

auto task_lock = kvs.genTaskLock();
auto lock = kvs.genRegionWriteLock(task_lock);
for (int i = 0; i < n; ++i)
{
auto start = RecordKVFormat::genKey(table_id, 0);
auto end = RecordKVFormat::genKey(table_id + 1, 0);
auto range = maybe_range.value_or(std::make_pair(start.toString(), end.toString()));
auto region = tests::makeRegion(region_id, range.first, range.second);
lock.regions.emplace(region_id, region);
regions.emplace(region_ids[i], std::make_shared<MockProxyRegion>(region_ids[i]));
auto region = tests::makeRegion(region_ids[i], ranges[i].first, ranges[i].second, kvs.getProxyHelper());
lock.regions.emplace(region_ids[i], region);
lock.index.add(region);
}
}
Expand Down Expand Up @@ -464,7 +481,7 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::rawWrite(
}


std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index)
std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest && request, raft_cmdpb::AdminResponse && response)
{
uint64_t index = 0;
uint64_t term = 0;
Expand All @@ -477,16 +494,6 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id,
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
// We record them, as persisted raft log, for potential recovery.
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.mutable_compact_log();
request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog);
request.mutable_compact_log()->set_compact_index(compact_index);
// Find compact term, otherwise log must have been compacted.
if (region->commands.count(compact_index))
{
request.mutable_compact_log()->set_compact_term(region->commands[index].term);
}
region->commands[index] = {
term,
MockProxyRegion::AdminCommand{
Expand All @@ -497,6 +504,100 @@ std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id,
return std::make_tuple(index, term);
}

std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index)
{
auto region = getRegion(region_id);
assert(region != nullptr);
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog);
request.mutable_compact_log()->set_compact_index(compact_index);
// Find compact term, otherwise log must have been compacted.
if (region->commands.contains(compact_index))
{
request.mutable_compact_log()->set_compact_term(region->commands[compact_index].term);
}
return adminCommand(region_id, std::move(request), std::move(response));
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeChangePeer(metapb::Region && meta, std::vector<UInt64> peer_ids, bool is_v2)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
if (is_v2)
{
request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2);
}
else
{
request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer);
}
meta.mutable_peers()->Clear();
for (auto i : peer_ids)
{
meta.add_peers()->set_id(i);
}
*response.mutable_change_peer()->mutable_region() = meta;
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composePrepareMerge(metapb::Region && target, UInt64 min_index)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge);
auto * prepare_merge = request.mutable_prepare_merge();
prepare_merge->set_min_index(min_index);
*prepare_merge->mutable_target() = target;
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeCommitMerge(metapb::Region && source, UInt64 commit)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge);
auto * commit_merge = request.mutable_commit_merge();
commit_merge->set_commit(commit);
*commit_merge->mutable_source() = source;
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeRollbackMerge(UInt64 commit)
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge);
auto * rollback_merge = request.mutable_rollback_merge();
rollback_merge->set_commit(commit);
return std::make_tuple(request, response);
}

std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> MockRaftStoreProxy::composeBatchSplit(std::vector<UInt64> && region_ids, std::vector<std::pair<std::string, std::string>> && ranges, metapb::RegionEpoch old_epoch)
{
RUNTIME_CHECK_MSG(region_ids.size() == ranges.size(), "error composeBatchSplit input");
auto n = region_ids.size();
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit);
metapb::RegionEpoch new_epoch;
new_epoch.set_version(old_epoch.version() + 1);
new_epoch.set_conf_ver(old_epoch.conf_ver());
{
raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits();
for (size_t i = 0; i < n; ++i)
{
auto * region = splits->add_regions();
region->set_id(region_ids[i]);
region->set_start_key(ranges[i].first);
region->set_end_key(ranges[i].second);
region->add_peers();
*region->mutable_region_epoch() = new_epoch;
}
}
return std::make_tuple(request, response);
}

void MockRaftStoreProxy::doApply(
KVStore & kvs,
TMTContext & tmt,
Expand Down Expand Up @@ -668,7 +769,7 @@ void MockRaftStoreProxy::snapshot(
term = region->getLatestCommitTerm();
}

auto new_kv_region = kvs.genRegionPtr(old_kv_region->getMetaRegion(), old_kv_region->mutMeta().peerId(), index, term);
auto new_kv_region = kvs.genRegionPtr(old_kv_region->cloneMetaRegion(), old_kv_region->mutMeta().peerId(), index, term);
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);

Expand All @@ -695,16 +796,21 @@ void MockRaftStoreProxy::snapshot(
new_kv_region->setApplied(index, term);
}

TableID MockRaftStoreProxy::bootstrap_table(
TableID MockRaftStoreProxy::bootstrapTable(
Context & ctx,
KVStore & kvs,
TMTContext & tmt)
TMTContext & tmt,
bool drop_at_first)
{
UNUSED(kvs);
ColumnsDescription columns;
auto & data_type_factory = DataTypeFactory::instance();
columns.ordinary = NamesAndTypesList({NameAndTypePair{"a", data_type_factory.get("Int64")}});
auto tso = tmt.getPDClient()->getTS();
if (drop_at_first)
{
MockTiDB::instance().dropDB(ctx, "d", true);
}
MockTiDB::instance().newDataBase("d");
// Make sure there is a table with smaller id.
MockTiDB::instance().newTable("d", "prevt" + toString(random()), columns, tso, "", "dt");
Expand All @@ -716,20 +822,6 @@ TableID MockRaftStoreProxy::bootstrap_table(
return table_id;
}

void MockRaftStoreProxy::clear_tables(
Context & ctx,
KVStore & kvs,
TMTContext & tmt)
{
UNUSED(kvs);
UNUSED(tmt);
if (this->table_id != 1)
{
MockTiDB::instance().dropTable(ctx, "d", "t", false);
}
this->table_id = 1;
}

void GCMonitor::add(RawObjType type, int64_t diff)
{
auto _ = genLockGuard();
Expand Down
47 changes: 35 additions & 12 deletions dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ struct MockRaftStoreProxy : MutexLockWrap

size_t size() const;

void wake();
void wakeNotifier();

void testRunNormal(const std::atomic_bool & over);

/// Handle one read index task.
void runOneRound();

void unsafeInvokeForTest(std::function<void(MockRaftStoreProxy &)> && cb);
Expand All @@ -183,27 +184,33 @@ struct MockRaftStoreProxy : MutexLockWrap
Type type = NORMAL;
};

/// boostrap a region.
void bootstrap(
/// Boostrap with a given region.
/// Similar to TiKV's `bootstrap_region`.
void bootstrapWithRegion(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
std::optional<std::pair<std::string, std::string>> maybe_range);

/// boostrap a table, since applying snapshot needs table schema.
TableID bootstrap_table(
/// Boostrap a table.
/// Must be called if:
/// 1. Applying snapshot which needs table schema
/// 2. Doing row2col.
TableID bootstrapTable(
Context & ctx,
KVStore & kvs,
TMTContext & tmt);
TMTContext & tmt,
bool drop_at_first = true);

/// clear tables.
void clear_tables(
Context & ctx,
/// Manually add a region.
void debugAddRegions(
KVStore & kvs,
TMTContext & tmt);
TMTContext & tmt,
std::vector<UInt64> region_ids,
std::vector<std::pair<std::string, std::string>> && ranges);

/// We assume that we generate one command, and immediately commit.
/// normal write to a region.
/// Normal write to a region.
std::tuple<uint64_t, uint64_t> normalWrite(
UInt64 region_id,
std::vector<HandleID> && keys,
Expand All @@ -221,6 +228,14 @@ struct MockRaftStoreProxy : MutexLockWrap
/// Create a compactLog admin command, returns (index, term) of the admin command itself.
std::tuple<uint64_t, uint64_t> compactLog(UInt64 region_id, UInt64 compact_index);

std::tuple<uint64_t, uint64_t> adminCommand(UInt64 region_id, raft_cmdpb::AdminRequest &&, raft_cmdpb::AdminResponse &&);

static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeChangePeer(metapb::Region && meta, std::vector<UInt64> peer_ids, bool is_v2 = true);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composePrepareMerge(metapb::Region && target, UInt64 min_index);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeCommitMerge(metapb::Region && source, UInt64 commit);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeRollbackMerge(UInt64 commit);
static std::tuple<raft_cmdpb::AdminRequest, raft_cmdpb::AdminResponse> composeBatchSplit(std::vector<UInt64> && region_ids, std::vector<std::pair<std::string, std::string>> && ranges, metapb::RegionEpoch old_epoch);

struct Cf
{
Cf(UInt64 region_id_, TableID table_id_, ColumnFamilyType type_);
Expand Down Expand Up @@ -273,16 +288,24 @@ struct MockRaftStoreProxy : MutexLockWrap
uint64_t region_id,
uint64_t to);

void clear()
{
auto _ = genLockGuard();
regions.clear();
}

MockRaftStoreProxy()
{
log = Logger::get("MockRaftStoreProxy");
table_id = 1;
}

// Mock Proxy will drop read index requests to these regions
std::unordered_set<uint64_t> region_id_to_drop;
// Mock Proxy will return error read index response to these regions
std::unordered_set<uint64_t> region_id_to_error;
std::map<uint64_t, MockProxyRegionPtr> regions;
std::list<std::shared_ptr<RawMockReadIndexTask>> tasks;
std::list<std::shared_ptr<RawMockReadIndexTask>> read_index_tasks;
AsyncWaker::Notifier notifier;
TableID table_id;
LoggerPtr log;
Expand Down
Loading

0 comments on commit cb951a3

Please sign in to comment.