Skip to content

Commit

Permalink
Add tests for learner read (pingcap#7549)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo committed Jul 3, 2023
1 parent af77857 commit 520f0df
Show file tree
Hide file tree
Showing 19 changed files with 228 additions and 63 deletions.
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ void MockRaftStoreProxy::init(size_t region_num)
}
}

std::unique_ptr<TiFlashRaftProxyHelper> MockRaftStoreProxy::generateProxyHelper()
{
auto proxy_helper = std::make_unique<TiFlashRaftProxyHelper>(MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(
RaftStoreProxyPtr{this}));
// Bind ffi to MockSSTReader.
proxy_helper->sst_reader_interfaces = make_mock_sst_reader_interface();
return proxy_helper;
}

size_t MockRaftStoreProxy::size() const
{
auto _ = genLockGuard();
Expand Down
16 changes: 15 additions & 1 deletion dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ struct MockRaftStoreProxy : MutexLockWrap
}

MockProxyRegionPtr getRegion(uint64_t id);

MockProxyRegionPtr doGetRegion(uint64_t id);

MockReadIndexTask * makeReadIndexTask(kvrpcpb::ReadIndexRequest req);

void init(size_t region_num);
std::unique_ptr<TiFlashRaftProxyHelper> generateProxyHelper();

size_t size() const;

Expand Down Expand Up @@ -332,4 +332,18 @@ struct GCMonitor : MutexLockWrap
static GCMonitor global_gc_monitor;
};

template <typename... Types>
std::vector<std::pair<std::string, std::string>> regionRangeToEncodeKeys(Types &&... args)
{
// RegionRangeKeys::RegionRange is not copy-constructible, however, initialize_list need copy construction.
// So we have to so this way, rather than create a composeXXX that accepts a vector of RegionRangeKeys::RegionRange.
std::vector<std::pair<std::string, std::string>> ranges_str;
([&] {
auto & x = args;
ranges_str.emplace_back(std::make_pair(x.first.toString(), x.second.toString()));
}(),
...);
return ranges_str;
}

} // namespace DB
15 changes: 0 additions & 15 deletions dbms/src/Debug/MockSSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,4 @@ struct MockSSTReader
};

SSTReaderInterfaces make_mock_sst_reader_interface();

class RegionMockTest final
{
public:
RegionMockTest(KVStore * kvstore_, RegionPtr region_);
~RegionMockTest();

DISALLOW_COPY_AND_MOVE(RegionMockTest);

private:
TiFlashRaftProxyHelper mock_proxy_helper{};
const TiFlashRaftProxyHelper * ori_proxy_helper{};
KVStore * kvstore;
RegionPtr region;
};
} // namespace DB
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,21 @@ void MockRaftCommand::dbgFuncRegionSnapshot(Context & context, const ASTs & args

std::map<MockSSTReader::Key, MockSSTReader::Data> MockSSTReader::MockSSTData;

class RegionMockTest final
{
public:
RegionMockTest(KVStore * kvstore_, RegionPtr region_);
~RegionMockTest();

DISALLOW_COPY_AND_MOVE(RegionMockTest);

private:
TiFlashRaftProxyHelper mock_proxy_helper{};
const TiFlashRaftProxyHelper * ori_proxy_helper{};
KVStore * kvstore;
RegionPtr region;
};

RegionMockTest::RegionMockTest(KVStore * kvstore_, RegionPtr region_)
: kvstore(kvstore_)
, region(region_)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Storages/Transaction/RegionDataRead.h>
#include <Storages/Transaction/RegionManager.h>
#include <Storages/Transaction/RegionRangeKeys.h>
#include <Storages/Transaction/StorageEngineType.h>

namespace TiDB
Expand Down Expand Up @@ -85,7 +86,7 @@ class KVStore final : private boost::noncopyable

RegionPtr getRegion(RegionID region_id) const;

using RegionRange = std::pair<TiKVRangeKey, TiKVRangeKey>;
using RegionRange = RegionRangeKeys::RegionRange;

RegionMap getRegionsByRangeOverlap(const RegionRange & range) const;

Expand Down
17 changes: 10 additions & 7 deletions dbms/src/Storages/Transaction/LearnerRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,36 +96,37 @@ class MvccQueryInfoWrap
using Base = MvccQueryInfo;
Base & inner;
std::optional<Base::RegionsQueryInfo> regions_info;
Base::RegionsQueryInfo * regions_info_ptr;
// Points to either `regions_info` or `mvcc_query_info.regions_query_info`.
Base::RegionsQueryInfo * regions_query_info_ptr;

public:
MvccQueryInfoWrap(Base & mvcc_query_info, TMTContext & tmt, const TiDB::TableID logical_table_id)
: inner(mvcc_query_info)
{
if (likely(!inner.regions_query_info.empty()))
{
regions_info_ptr = &inner.regions_query_info;
regions_query_info_ptr = &inner.regions_query_info;
}
else
{
regions_info = Base::RegionsQueryInfo();
regions_info_ptr = &*regions_info;
// Only for test, because regions_query_info should never be empty if query is from TiDB or TiSpark.
regions_query_info_ptr = &*regions_info;
// Only for (integration) test, because regions_query_info should never be empty if query is from TiDB or TiSpark.
// todo support partition table
auto regions = tmt.getRegionTable().getRegionsByTable(NullspaceID, logical_table_id);
regions_info_ptr->reserve(regions.size());
regions_query_info_ptr->reserve(regions.size());
for (const auto & [id, region] : regions)
{
if (region == nullptr)
continue;
regions_info_ptr->emplace_back(
regions_query_info_ptr->emplace_back(
RegionQueryInfo{id, region->version(), region->confVer(), logical_table_id, region->getRange()->rawKeys(), {}});
}
}
}
Base * operator->() { return &inner; }

const Base::RegionsQueryInfo & getRegionsInfo() const { return *regions_info_ptr; }
const Base::RegionsQueryInfo & getRegionsInfo() const { return *regions_query_info_ptr; }
void addReadIndexRes(RegionID region_id, UInt64 read_index)
{
inner.read_index_res[region_id] = read_index;
Expand Down Expand Up @@ -228,6 +229,7 @@ LearnerReadSnapshot doLearnerRead(
}
}
}

GET_METRIC(tiflash_stale_read_count).Increment(stats.num_stale_read);
GET_METRIC(tiflash_raft_read_index_count).Increment(batch_read_index_req.size());

Expand Down Expand Up @@ -457,6 +459,7 @@ void validateQueryInfo(
if (auto iter = regions_snapshot.find(region_query_info.region_id); //
iter == regions_snapshot.end() || iter->second != region)
{
// If snapshot is applied during learner read, we should abort with an exception later.
status = RegionException::RegionReadStatus::NOT_FOUND;
}
else if (region->version() != region_query_info.version)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,10 @@ std::tuple<WaitIndexResult, double> Region::waitIndex(UInt64 index, const UInt64
{
Stopwatch wait_index_watch;
LOG_DEBUG(log,
"{} need to wait learner index {}",
"{} need to wait learner index {} timeout {}",
toString(),
index);
index,
timeout_ms);
auto wait_idx_res = meta.waitIndex(index, timeout_ms, std::move(check_running));
auto elapsed_secs = wait_index_watch.elapsedSeconds();
switch (wait_idx_res)
Expand Down Expand Up @@ -579,7 +580,6 @@ void Region::assignRegion(Region && new_region)
std::unique_lock<std::shared_mutex> lock(mutex);

data.assignRegionData(std::move(new_region.data));

meta.assignRegionMeta(std::move(new_region.meta));
meta.notifyAll();
}
Expand Down
8 changes: 2 additions & 6 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class Region : public std::enable_shared_from_this<Region>
return region1.meta == region2.meta && region1.data == region2.data;
}

// Check if we can read by this index.
bool checkIndex(UInt64 index) const;

// Return <WaitIndexResult, time cost(seconds)> for wait-index.
Expand All @@ -178,14 +179,9 @@ class Region : public std::enable_shared_from_this<Region>

RegionMetaSnapshot dumpRegionMetaSnapshot() const;

// Assign data and meta by moving from `new_region`.
void assignRegion(Region && new_region);

using HandleMap = std::unordered_map<HandleID, std::tuple<Timestamp, UInt8>>;

/// Only can be used for applying snapshot. only can be called by single thread.
/// Try to fill record with delmark if it exists in ch but has been remove by GC in leader.
void compareAndCompleteSnapshot(HandleMap & handle_map, const Timestamp safe_point);

void tryCompactionFilter(const Timestamp safe_point);

RegionRaftCommandDelegate & makeRaftCommandDelegate(const KVStoreTaskLock &);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Storages/Transaction/RegionRangeKeys.h>
#include <Storages/Transaction/TiKVKeyValue.h>

#include <map>
Expand All @@ -22,7 +23,7 @@ namespace DB
{

struct TiKVRangeKey;
using RegionRange = std::pair<TiKVRangeKey, TiKVRangeKey>;
using RegionRange = RegionRangeKeys::RegionRange;
using RegionDataRes = size_t;

enum class DupCheck
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/Transaction/RegionRangeKeys.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct TiKVRangeKey : boost::noncopyable
TiKVRangeKey copy() const;
TiKVRangeKey & operator=(TiKVRangeKey &&);
std::string toDebugString() const;
std::string toString() const
{
return key.toString();
}

State state;
TiKVKey key;
Expand All @@ -53,9 +57,13 @@ class RegionRangeKeys : boost::noncopyable
using RegionRange = std::pair<TiKVRangeKey, TiKVRangeKey>;

const RegionRange & comparableKeys() const;
static RegionRange cloneRange(const RegionRange & from);
static RegionRange makeComparableKeys(TiKVKey && start_key, TiKVKey && end_key);
const std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr> & rawKeys() const;
explicit RegionRangeKeys(TiKVKey && start_key, TiKVKey && end_key);
explicit RegionRangeKeys(RegionRange && range)
: RegionRangeKeys(std::move(range.first.key), std::move(range.second.key))
{}
TableID getMappedTableID() const;
KeyspaceID getKeyspaceID() const;
std::string toDebugString() const;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/RegionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ RegionRangeKeys::RegionRange RegionRangeKeys::makeComparableKeys(TiKVKey && star
TiKVRangeKey::makeTiKVRangeKey<false>(std::move(end_key)));
}

RegionRangeKeys::RegionRange RegionRangeKeys::cloneRange(const RegionRange & from)
{
return std::make_pair(from.first.copy(), from.second.copy());
}

std::string TiKVRangeKey::toDebugString() const
{
if (this->state == MAX)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id,
{
// Load persisted info.
it = tables.emplace(ks_tb_id, table_id).first;
LOG_INFO(log, "get new table {}", table_id);
LOG_INFO(log, "get new table {} of keyspace {}", table_id, keyspace_id);
}
return it->second;
}
Expand Down Expand Up @@ -426,7 +426,9 @@ void RegionTable::handleInternalRegionsByTable(const KeyspaceID keyspace_id, con
std::lock_guard lock(mutex);

if (auto it = tables.find(KeyspaceTableID{keyspace_id, table_id}); it != tables.end())
{
callback(it->second.regions);
}
}

std::vector<std::pair<RegionID, RegionPtr>> RegionTable::getRegionsByTable(const KeyspaceID keyspace_id, const TableID table_id) const
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Storages/Transaction/RegionsRangeIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ bool TiKVRangeKeyCmp::operator()(const TiKVRangeKey & x, const TiKVRangeKey & y)

void RegionsRangeIndex::add(const RegionPtr & new_region)
{
auto region_range = new_region->getRange();
const auto & new_range = region_range->comparableKeys();
auto begin_it = split(new_range.first);
auto end_it = split(new_range.second);
auto new_region_range = new_region->getRange();
const auto & new_range_keys = new_region_range->comparableKeys();
auto begin_it = split(new_range_keys.first);
auto end_it = split(new_range_keys.second);
if (begin_it == end_it)
throw Exception(
std::string(__PRETTY_FUNCTION__) + ": range of region " + toString(new_region->id()) + " is empty",
Expand Down Expand Up @@ -90,6 +90,11 @@ void RegionsRangeIndex::clear()
max_it = root.emplace(TiKVRangeKey::makeTiKVRangeKey<false>(TiKVKey()), IndexNode{}).first;
}

void RegionsRangeIndex::tryMergeEmpty()
{
tryMergeEmpty(root.begin());
}

void RegionsRangeIndex::tryMergeEmpty(RootMap::iterator remove_it)
{
if (!remove_it->second.region_map.empty())
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/Transaction/RegionsRangeIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@

#pragma once

#include <Storages/Transaction/RegionRangeKeys.h>
#include <Storages/Transaction/Types.h>

#include <map>

namespace DB
{

class Region;
using RegionPtr = std::shared_ptr<Region>;
using RegionMap = std::unordered_map<RegionID, RegionPtr>;

struct TiKVRangeKey;
using RegionRange = std::pair<TiKVRangeKey, TiKVRangeKey>;
using RegionRange = RegionRangeKeys::RegionRange;

struct TiKVRangeKeyCmp
{
Expand Down Expand Up @@ -55,9 +55,12 @@ class RegionsRangeIndex : private boost::noncopyable

void clear();

// TODO Used by RegionKVStoreTest, using a friend decl here.
RootMap::iterator split(const TiKVRangeKey & new_start);
void tryMergeEmpty();

private:
void tryMergeEmpty(RootMap::iterator remove_it);
RootMap::iterator split(const TiKVRangeKey & new_start);

private:
RootMap root;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ UInt64 TMTContext::waitIndexTimeout() const
{
return wait_index_timeout_ms.load(std::memory_order_relaxed);
}
void TMTContext::debugSetWaitIndexTimeout(UInt64 timeout)
{
return wait_index_timeout_ms.store(timeout, std::memory_order_relaxed);
}
Int64 TMTContext::waitRegionReadyTimeout() const
{
return wait_region_ready_timeout_sec.load(std::memory_order_relaxed);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class TMTContext : private boost::noncopyable
public:
const KVStorePtr & getKVStore() const;
KVStorePtr & getKVStore();
void debugSetKVStore(const KVStorePtr & new_kvstore)
{
kvstore = new_kvstore;
}

const ManagedStorages & getStorages() const;
ManagedStorages & getStorages();
Expand Down Expand Up @@ -137,6 +141,7 @@ class TMTContext : private boost::noncopyable
UInt64 batchReadIndexTimeout() const;
// timeout for wait index (ms). "0" means wait infinitely
UInt64 waitIndexTimeout() const;
void debugSetWaitIndexTimeout(UInt64 timeout);
Int64 waitRegionReadyTimeout() const;
uint64_t readIndexWorkerTick() const;

Expand Down
Loading

0 comments on commit 520f0df

Please sign in to comment.