Skip to content

Commit

Permalink
Batch read-index request by start-ts to reduce time cost under high c…
Browse files Browse the repository at this point in the history
…oncurrency scenarios (#3971)
  • Loading branch information
solotzg authored Feb 17, 2022
1 parent 9975570 commit 68c41c7
Show file tree
Hide file tree
Showing 28 changed files with 3,366 additions and 135 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Common/FmtUtils.h>
#include <DataStreams/StringStreamBlockInputStream.h>
#include <Debug/DBGInvoker.h>
#include <Debug/ReadIndexStressTest.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Debug/dbgFuncFailPoint.h>
#include <Debug/dbgFuncMisc.h>
Expand Down Expand Up @@ -104,6 +105,8 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("search_log_for_key", dbgFuncSearchLogForKey);
regSchemalessFunc("tidb_dag", dbgFuncTiDBQueryFromNaturalDag);

regSchemalessFunc("read_index_stress_test", ReadIndexStressTest::dbgFuncStressTest);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down
277 changes: 277 additions & 0 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
#include <Common/Exception.h>
#include <Debug/MockRaftStoreProxy.h>

namespace DB
{
kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t start_ts)
{
kvrpcpb::ReadIndexRequest req;
req.set_start_ts(start_ts);
req.mutable_context()->set_region_id(region_id);
return req;
}

MockRaftStoreProxy & as_ref(RaftStoreProxyPtr ptr)
{
return *reinterpret_cast<MockRaftStoreProxy *>(reinterpret_cast<size_t>(ptr.inner));
}

extern void mock_set_rust_gc_helper(void (*)(RawVoidPtr, RawRustPtrType));

RawRustPtr fn_make_read_index_task(RaftStoreProxyPtr ptr, BaseBuffView view)
{
auto & x = as_ref(ptr);
kvrpcpb::ReadIndexRequest req;
req.ParseFromArray(view.data, view.len);
auto * task = x.makeReadIndexTask(req);
if (task)
GCMonitor::instance().add(RawObjType::MockReadIndexTask, 1);
return RawRustPtr{task, static_cast<uint32_t>(RawObjType::MockReadIndexTask)};
}

RawRustPtr fn_make_async_waker(void (*wake_fn)(RawVoidPtr),
RawCppPtr data)
{
auto * p = new MockAsyncWaker{std::make_shared<MockAsyncNotifier>()};
p->data->data = data;
p->data->wake_fn = wake_fn;
GCMonitor::instance().add(RawObjType::MockAsyncWaker, 1);
return RawRustPtr{p, static_cast<uint32_t>(RawObjType::MockAsyncWaker)};
}

uint8_t fn_poll_read_index_task(RaftStoreProxyPtr, RawVoidPtr task, RawVoidPtr resp, RawVoidPtr waker)
{
auto & read_index_task = *reinterpret_cast<MockReadIndexTask *>(task);
auto * async_waker = reinterpret_cast<MockAsyncWaker *>(waker);
auto res = read_index_task.data->poll(async_waker ? async_waker->data : nullptr);
if (res)
{
auto * s = reinterpret_cast<kvrpcpb::ReadIndexResponse *>(resp);
*s = *res;
return 1;
}
else
{
return 0;
}
}

void fn_gc_rust_ptr(RawVoidPtr ptr, RawRustPtrType type_)
{
if (!ptr)
return;
auto type = static_cast<RawObjType>(type_);
GCMonitor::instance().add(type, -1);
switch (type)
{
case RawObjType::None:
break;
case RawObjType::MockReadIndexTask:
delete reinterpret_cast<MockReadIndexTask *>(ptr);
break;
case RawObjType::MockAsyncWaker:
delete reinterpret_cast<MockAsyncWaker *>(ptr);
break;
}
}

void fn_handle_batch_read_index(RaftStoreProxyPtr, CppStrVecView, RawVoidPtr, uint64_t)
{
throw Exception("`fn_handle_batch_read_index` is deprecated");
}

TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr proxy_ptr)
{
TiFlashRaftProxyHelper res{};
res.proxy_ptr = proxy_ptr;
res.fn_make_read_index_task = fn_make_read_index_task;
res.fn_poll_read_index_task = fn_poll_read_index_task;
res.fn_make_async_waker = fn_make_async_waker;
res.fn_handle_batch_read_index = fn_handle_batch_read_index;
{
// make sure such function pointer will be set at most once.
static std::once_flag flag;
std::call_once(flag, []() { MockSetFFI::MockSetRustGcHelper(fn_gc_rust_ptr); });
}

return res;
}

raft_serverpb::RegionLocalState MockProxyRegion::getState()
{
auto _ = genLockGuard();
return state;
}

raft_serverpb::RaftApplyState MockProxyRegion::getApply()
{
auto _ = genLockGuard();
return apply;
}

uint64_t MockProxyRegion::getLatestCommitIndex()
{
return this->getApply().commit_index();
}

void MockProxyRegion::updateCommitIndex(uint64_t index)
{
auto _ = genLockGuard();
this->apply.set_commit_index(index);
}

MockProxyRegion::MockProxyRegion()
{
apply.set_commit_index(5);
}

std::optional<kvrpcpb::ReadIndexResponse> RawMockReadIndexTask::poll(std::shared_ptr<MockAsyncNotifier> waker)
{
auto _ = genLockGuard();

if (!finished)
{
if (waker != this->waker)
{
this->waker = waker;
}
return {};
}
if (has_lock)
{
resp.mutable_locked();
return resp;
}
if (has_region_error)
{
resp.mutable_region_error()->mutable_data_is_not_ready();
return resp;
}
resp.set_read_index(region->getLatestCommitIndex());
return resp;
}

void RawMockReadIndexTask::update(bool lock, bool region_error)
{
{
auto _ = genLockGuard();
if (finished)
return;
finished = true;
has_lock = lock;
has_region_error = region_error;
}
if (waker)
waker->wake();
}

MockProxyRegionPtr MockRaftStoreProxy::getRegion(uint64_t id)
{
auto _ = genLockGuard();
return doGetRegion(id);
}

MockProxyRegionPtr MockRaftStoreProxy::doGetRegion(uint64_t id)
{
if (auto it = regions.find(id); it != regions.end())
{
return it->second;
}
return nullptr;
}

MockReadIndexTask * MockRaftStoreProxy::makeReadIndexTask(kvrpcpb::ReadIndexRequest req)
{
auto _ = genLockGuard();

wake();

auto region = doGetRegion(req.context().region_id());
if (region)
{
auto * r = new MockReadIndexTask{};
r->data = std::make_shared<RawMockReadIndexTask>();
r->data->req = std::move(req);
r->data->region = region;
tasks.push_back(r->data);
return r;
}
return nullptr;
}

void MockRaftStoreProxy::init(size_t region_num)
{
auto _ = genLockGuard();
for (size_t i = 0; i < region_num; ++i)
{
regions.emplace(i, std::make_shared<MockProxyRegion>());
}
}

size_t MockRaftStoreProxy::size() const
{
auto _ = genLockGuard();
return regions.size();
}

void MockRaftStoreProxy::wake()
{
notifier.wake();
}

void MockRaftStoreProxy::testRunNormal(const std::atomic_bool & over)
{
while (!over)
{
runOneRound();
notifier.blockedWaitFor(std::chrono::seconds(1));
}
}

void MockRaftStoreProxy::runOneRound()
{
auto _ = genLockGuard();
while (!tasks.empty())
{
auto & t = *tasks.front();
if (!region_id_to_drop.count(t.req.context().region_id()))
{
if (region_id_to_error.count(t.req.context().region_id()))
t.update(false, true);
else
t.update(false, false);
}
tasks.pop_front();
}
}

void MockRaftStoreProxy::unsafeInvokeForTest(std::function<void(MockRaftStoreProxy &)> && cb)
{
auto _ = genLockGuard();
cb(*this);
}

void GCMonitor::add(RawObjType type, int64_t diff)
{
auto _ = genLockGuard();
data[type] += diff;
}

bool GCMonitor::checkClean()
{
auto _ = genLockGuard();
for (auto && d : data)
{
if (d.second)
return false;
}
return true;
}

bool GCMonitor::empty()
{
auto _ = genLockGuard();
return data.empty();
}

} // namespace DB
Loading

0 comments on commit 68c41c7

Please sign in to comment.