Skip to content

Commit

Permalink
KVStore: Enhance Parallel Prehandle (#9155)
Browse files Browse the repository at this point in the history
close #8081

Signed-off-by: CalvinNeo <[email protected]>
  • Loading branch information
CalvinNeo authored Jun 21, 2024
1 parent 9d823e3 commit 06a50ea
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 6 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
M(RaftNumSnapshotsPendingApply) \
M(RaftNumPrehandlingSubTasks) \
M(RaftNumParallelPrehandlingTasks) \
M(RaftNumWaitedParallelPrehandlingTasks) \
M(RateLimiterPendingWriteRequest) \
M(DT_SegmentReadTasks) \
M(DT_SnapshotOfRead) \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockKVStore/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeFactory.h>
#include <Debug/MockKVStore/MockFFIImpls.h>
Expand Down Expand Up @@ -654,6 +655,8 @@ std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
}
}
SSTViewVec snaps{ssts.data(), ssts.size()};

SYNC_FOR("before_MockRaftStoreProxy::snapshot_prehandle");
try
{
auto prehandle_result = kvs.preHandleSnapshotToFiles(new_kv_region, snaps, index, term, deadline_index, tmt);
Expand Down
17 changes: 12 additions & 5 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
maybeSkipBySoftLimit(cf, *reader_ptr);
}

Stopwatch sw;
auto pre = *p_process_keys_bytes;
// Simply read to the end of SST file
if (rowkey_to_be_included == nullptr)
{
Expand All @@ -282,32 +284,35 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
(*p_process_keys_bytes) += (key.len + value.len);
reader->next();
}
auto sec = sw.elapsedMilliseconds();
LOG_DEBUG(
log,
"Done loading all kvpairs, CF={} offset={} processed_bytes={} write_cf_offset={} region_id={} split_id={} "
"snapshot_index={}",
"snapshot_index={} elapsed_sec={} speed={}",
CFToName(cf),
(*p_process_keys),
(*p_process_keys_bytes),
process_keys.write_cf,
region->id(),
getSplitId(),
snapshot_index);
snapshot_index,
sec,
((*p_process_keys_bytes) - pre) * 1.0 / sec);
return;
}


size_t process_keys_offset_end = process_keys.write_cf;
while (reader && reader->remained())
{
// If we have load all keys that less than or equal to `rowkey_to_be_included`, done.
// We keep an assumption that rowkeys are memory-comparable and they are asc sorted in the SST file
if (!last_loaded_rowkey->empty() && *last_loaded_rowkey > *rowkey_to_be_included)
{
auto sec = sw.elapsedMilliseconds();
LOG_DEBUG(
log,
"Done loading, CF={} offset={} processed_bytes={} write_cf_offset={} last_loaded_rowkey={} "
"rowkey_to_be_included={} region_id={} snapshot_index={}",
"rowkey_to_be_included={} region_id={} snapshot_index={} elapsed_sec={} speed={}",
CFToName(cf),
(*p_process_keys),
(*p_process_keys_bytes),
Expand All @@ -317,7 +322,9 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
? Redact::keyToDebugString(rowkey_to_be_included->data(), rowkey_to_be_included->size())
: "<end>"),
region->id(),
snapshot_index);
snapshot_index,
sec,
((*p_process_keys_bytes) - pre) * 1.0 / sec);
break;
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ size_t KVStore::getOngoingPrehandleTaskCount() const
{
return std::max(0, ongoing_prehandle_task_count.load());
}
size_t KVStore::getOngoingPrehandleSubtaskCount() const
{
return std::max(0, prehandling_trace.ongoing_prehandle_subtask_count.load());
}

static const metapb::Peer & findPeer(const metapb::Region & region, UInt64 peer_id)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class KVStore final : private boost::noncopyable
void releasePreHandledSnapshot(const RegionPtrWrap &, TMTContext & tmt);
void abortPreHandleSnapshot(uint64_t region_id, TMTContext & tmt);
size_t getOngoingPrehandleTaskCount() const;
size_t getOngoingPrehandleSubtaskCount() const;
EngineStoreApplyRes handleIngestSST(UInt64 region_id, SSTViewVec, UInt64 index, UInt64 term, TMTContext & tmt);
size_t getMaxParallelPrehandleSize() const;

Expand Down Expand Up @@ -413,6 +414,7 @@ class KVStore final : private boost::noncopyable
// Relates to `queue_size` in `can_apply_snapshot`,
// we can't have access to these codes though.
std::atomic<int64_t> ongoing_prehandle_task_count{0};
std::atomic<int64_t> ongoing_prehandle_subtask_count{0};
ProxyConfigSummary proxy_config_summary;

JointThreadInfoJeallocMapPtr joint_memory_allocation_map;
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h>
Expand All @@ -39,6 +40,7 @@ namespace CurrentMetrics
{
extern const Metric RaftNumPrehandlingSubTasks;
extern const Metric RaftNumParallelPrehandlingTasks;
extern const Metric RaftNumWaitedParallelPrehandlingTasks;
} // namespace CurrentMetrics

namespace DB
Expand Down Expand Up @@ -99,6 +101,8 @@ void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parall
ongoing_prehandle_subtask_count.load(),
parallel,
region_id);

CurrentMetrics::add(CurrentMetrics::RaftNumWaitedParallelPrehandlingTasks);
while (true)
{
std::unique_lock<std::mutex> cpu_resource_lock{cpu_resource_mut};
Expand All @@ -120,6 +124,7 @@ void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parall
watch.elapsedSeconds(),
region_id,
parallel);
CurrentMetrics::sub(CurrentMetrics::RaftNumWaitedParallelPrehandlingTasks);
}

static inline std::tuple<ReadFromStreamResult, PrehandleResult> executeTransform(
Expand Down Expand Up @@ -346,7 +351,9 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(
}

// Get this info again, since getApproxBytes maybe take some time.
auto ongoing_count = kvstore->getOngoingPrehandleTaskCount();
// Currently, the head split has not been registered as sub task yet,
// so we must add 1 here.
auto ongoing_count = kvstore->getOngoingPrehandleSubtaskCount() + 1;
uint64_t want_split_parts = 0;
auto total_concurrency = kvstore->getMaxParallelPrehandleSize();
if (total_concurrency + 1 > ongoing_count)
Expand Down Expand Up @@ -523,6 +530,9 @@ void executeParallelTransform(
for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
{
auto add_result = async_tasks.addTask(extra_id, [&, extra_id]() {
std::string origin_name = getThreadName();
SCOPE_EXIT({ setThreadName(origin_name.c_str()); });
setThreadName("para-pre-snap");
auto limit = DM::SSTScanSoftLimit(
extra_id,
std::string(split_keys[extra_id]),
Expand Down
77 changes: 77 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,5 +970,82 @@ try
}
CATCH

TEST_F(RegionKVStoreV2Test, KVStoreSingleSnap7)
try
{
auto & ctx = TiFlashTestEnv::getGlobalContext();
proxy_instance->cluster_ver = RaftstoreVer::V2;
ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr);
ASSERT_NE(proxy_helper->fn_get_config_json, nullptr);
UInt64 region_id = 1;
TableID table_id;
FailPointHelper::enableFailPoint(FailPoints::force_set_parallel_prehandle_threshold, static_cast<size_t>(0));
SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_set_parallel_prehandle_threshold"); });
initStorages();
KVStore & kvs = getKVS();
{
region_id = 2;
RegionID region_id2 = 3;
HandleID table_limit_start = 100;
HandleID table_limit_end = 1900;
HandleID sst_limit = 2000;
table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext());
auto start = RecordKVFormat::genKey(table_id, table_limit_start);
auto end = RecordKVFormat::genKey(table_id, table_limit_end);
auto start2 = RecordKVFormat::genKey(table_id, table_limit_start + sst_limit);
auto end2 = RecordKVFormat::genKey(table_id, table_limit_end + 2 * sst_limit);
proxy_instance->bootstrapWithRegion(
kvs,
ctx.getTMTContext(),
region_id,
std::make_pair(start.toString(), end.toString()));
auto r1 = proxy_instance->getRegion(region_id);
proxy_instance->debugAddRegions(
kvs,
ctx.getTMTContext(),
{region_id2},
{std::make_pair(start2.toString(), end2.toString())});

auto [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999);
{
MockSSTReader::getMockSSTData().clear();
MockSSTGenerator default_cf{region_id, table_id, ColumnFamilyType::Default};
for (HandleID h = 1; h < sst_limit; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
default_cf.insert_raw(k, value_default);
}
default_cf.finish_file(SSTFormatKind::KIND_TABLET);
default_cf.freeze();
MockSSTGenerator write_cf{region_id, table_id, ColumnFamilyType::Write};
for (HandleID h = table_limit_start + 10; h < table_limit_end - 10; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();

auto sp = SyncPointCtl::enableInScope("before_MockRaftStoreProxy::snapshot_prehandle");
std::thread t1([&]() {
ASSERT_NO_THROW(
proxy_instance
->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf, write_cf}, 0, 0, std::nullopt));
});
sp.waitAndPause();
std::thread t2([&]() {
ASSERT_NO_THROW(
proxy_instance
->snapshot(kvs, ctx.getTMTContext(), region_id2, {default_cf, write_cf}, 0, 0, std::nullopt));
});
sp.next();
sp.disable();
t1.join();
t2.join();
}
}
}
CATCH

} // namespace tests
} // namespace DB

0 comments on commit 06a50ea

Please sign in to comment.