Skip to content

Commit

Permalink
KVStore: Refine parallel prehandle snapshot (part-1) (#9192)
Browse files — browse the repository at this point in the history
ref #8081

Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
  • Branch information
JaySon-Huang and Lloyd-Pottiger authored Jul 5, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 82d20fe commit cc1bf74
Showing 3 changed files with 96 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -466,8 +466,7 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
RUNTIME_CHECK_MSG(
current_truncated_ts > start_limit,
"current pk decreases as reader advances, skipped_times={} start_raw={} start_pk={} current_pk={} "
"current_raw={} cf={} split_id={}, "
"region_id={}",
"current_raw={} cf={} split_id={} region_id={}",
skipped_times,
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
19 changes: 8 additions & 11 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -23,11 +23,6 @@
#include <memory>
#include <string_view>

namespace Poco
{
class Logger;
}

namespace DB
{
class TMTContext;
@@ -58,8 +53,8 @@ struct SSTScanSoftLimit
std::optional<RawTiDBPK> start_limit;
std::optional<RawTiDBPK> end_limit;

SSTScanSoftLimit(size_t extra_id, TiKVKey && raw_start_, TiKVKey && raw_end_)
: split_id(extra_id)
SSTScanSoftLimit(size_t split_id_, TiKVKey && raw_start_, TiKVKey && raw_end_)
: split_id(split_id_)
, raw_start(std::move(raw_start_))
, raw_end(std::move(raw_end_))
{
@@ -71,19 +66,21 @@ struct SSTScanSoftLimit
{
decoded_end = RecordKVFormat::decodeTiKVKey(raw_end);
}
if (decoded_start.size())
if (!decoded_start.empty())
{
start_limit = RecordKVFormat::getRawTiDBPK(decoded_start);
}
if (decoded_end.size())
if (!decoded_end.empty())
{
end_limit = RecordKVFormat::getRawTiDBPK(decoded_end);
}
}

const std::optional<RawTiDBPK> & getStartLimit() { return start_limit; }
SSTScanSoftLimit clone() const { return SSTScanSoftLimit(split_id, raw_start.toString(), raw_end.toString()); }

const std::optional<RawTiDBPK> & getStartLimit() const { return start_limit; }

const std::optional<RawTiDBPK> & getEndLimit() { return end_limit; }
const std::optional<RawTiDBPK> & getEndLimit() const { return end_limit; }

std::string toDebugString() const
{
173 changes: 87 additions & 86 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
#include <Storages/KVStore/FFI/SSTReader.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/KVStore/MultiRaft/PreHandlingTrace.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Types.h>
@@ -75,6 +76,7 @@ struct PrehandleTransformCtx
StorageDeltaMergePtr storage;
const DM::SSTFilesToBlockInputStreamOpts & opts;
TMTContext & tmt;
UInt64 snapshot_index;
};

void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit)
@@ -224,7 +226,9 @@ static inline std::tuple<ReadFromStreamResult, PrehandleResult> executeTransform
.write_cf_keys = sst_stream->getProcessKeys().write_cf,
.lock_cf_keys = sst_stream->getProcessKeys().lock_cf,
.default_cf_keys = sst_stream->getProcessKeys().default_cf,
.max_split_write_cf_keys = sst_stream->getProcessKeys().write_cf}});
.max_split_write_cf_keys = sst_stream->getProcessKeys().write_cf,
},
});
}
catch (DB::Exception & e)
{
@@ -432,16 +436,16 @@ static void runInParallel(
RegionPtr new_region,
const SSTViewVec & snaps,
const TiFlashRaftProxyHelper * proxy_helper,
uint64_t index,
uint64_t extra_id,
ParallelPrehandleCtxPtr parallel_ctx,
DM::SSTScanSoftLimit && part_limit)
{
std::string limit_tag = part_limit.toDebugString();
const String limit_tag = part_limit.toDebugString();
const size_t split_id = part_limit.split_id;

auto part_new_region = std::make_shared<Region>(new_region->getMeta().clone(), proxy_helper);
auto part_sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
part_new_region,
index,
prehandle_ctx.snapshot_index,
snaps,
proxy_helper,
prehandle_ctx.tmt,
@@ -454,24 +458,24 @@ static void runInParallel(
= executeTransform(log, prehandle_ctx, part_new_region, part_sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
"Finished extra parallel prehandle task limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
"split_id={} region_id={}",
limit_tag,
part_prehandle_result.stats.write_cf_keys,
part_prehandle_result.stats.lock_cf_keys,
part_prehandle_result.stats.default_cf_keys,
part_prehandle_result.ingest_ids.size(),
magic_enum::enum_name(part_result.error),
extra_id,
split_id,
part_new_region->id());
if (part_result.error == PrehandleTransformStatus::ErrUpdateSchema)
{
prehandle_ctx.prehandle_task->abortFor(PrehandleTransformStatus::ErrUpdateSchema);
}
{
std::scoped_lock l(parallel_ctx->mut);
parallel_ctx->gather_res[extra_id] = std::move(part_result);
parallel_ctx->gather_prehandle_res[extra_id] = std::move(part_prehandle_result);
parallel_ctx->gather_res[split_id] = std::move(part_result);
parallel_ctx->gather_prehandle_res[split_id] = std::move(part_prehandle_result);
}
}
catch (Exception & e)
@@ -485,24 +489,21 @@ static void runInParallel(
" write_cf_off={} split_id={} region_id={}",
e.message(),
processed_keys.write_cf,
extra_id,
split_id,
part_new_region->id());
prehandle_ctx.prehandle_task->abortFor(PrehandleTransformStatus::Aborted);
throw;
}
}

void executeParallelTransform(
std::tuple<ReadFromStreamResult, PrehandleResult> executeParallelTransform(
LoggerPtr log,
PrehandleTransformCtx & prehandle_ctx,
RegionPtr new_region,
ReadFromStreamResult & result,
PrehandleResult & prehandle_result,
const std::vector<std::string> & split_keys,
std::shared_ptr<DM::SSTFilesToBlockInputStream> sst_stream,
const SSTViewVec & snaps,
const TiFlashRaftProxyHelper * proxy_helper,
uint64_t index)
const TiFlashRaftProxyHelper * proxy_helper)
{
CurrentMetrics::add(CurrentMetrics::RaftNumParallelPrehandlingTasks);
SCOPE_EXIT({ CurrentMetrics::sub(CurrentMetrics::RaftNumParallelPrehandlingTasks); });
@@ -522,107 +523,107 @@ void executeParallelTransform(
Stopwatch watch;
// Make sure the queue is bigger than `split_key_count`, otherwise `addTask` may fail.
auto async_tasks = SingleSnapshotAsyncTasks(split_key_count, split_key_count, split_key_count + 5);
sst_stream->resetSoftLimit(
DM::SSTScanSoftLimit(DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT, std::string(""), std::string(split_keys[0])));

const DM::SSTScanSoftLimit head_soft_limit(
DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
std::string(""),
std::string(split_keys[0]));
sst_stream->resetSoftLimit(head_soft_limit.clone());

ParallelPrehandleCtxPtr parallel_ctx = std::make_shared<ParallelPrehandleCtx>();

for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
for (size_t split_id = 0; split_id < split_key_count; ++split_id)
{
auto add_result = async_tasks.addTask(extra_id, [&, extra_id]() {
auto add_result = async_tasks.addTask(split_id, [&, split_id]() {
std::string origin_name = getThreadName();
SCOPE_EXIT({ setThreadName(origin_name.c_str()); });
setThreadName("para-pre-snap");
auto limit = DM::SSTScanSoftLimit(
extra_id,
std::string(split_keys[extra_id]),
extra_id + 1 == split_key_count ? std::string("") : std::string(split_keys[extra_id + 1]));
runInParallel(
log,
prehandle_ctx,
new_region,
snaps,
proxy_helper,
index,
extra_id,
parallel_ctx,
std::move(limit));
auto part_limit = DM::SSTScanSoftLimit(
split_id,
std::string(split_keys[split_id]),
split_id + 1 == split_key_count ? std::string("") : std::string(split_keys[split_id + 1]));
runInParallel(log, prehandle_ctx, new_region, snaps, proxy_helper, parallel_ctx, std::move(part_limit));
return true;
});
RUNTIME_CHECK_MSG(
add_result,
"Failed when adding {}-th task for prehandling region_id={}",
extra_id,
split_id,
new_region->id());
}

// This will read the keys from the beginning to the first split key
auto [head_result, head_prehandle_result] = executeTransform(log, prehandle_ctx, new_region, sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task, limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} "
"error={} split_id={} region_id={}",
sst_stream->getSoftLimit()->toDebugString(),
head_soft_limit.toDebugString(),
head_prehandle_result.stats.write_cf_keys,
head_prehandle_result.stats.lock_cf_keys,
head_prehandle_result.stats.default_cf_keys,
head_prehandle_result.ingest_ids.size(),
magic_enum::enum_name(head_result.error),
DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
head_soft_limit.split_id,
new_region->id());

// Wait all threads to join. May throw.
// If one thread throws, then all result is useless, so `async_tasks` is released directly.
for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
for (size_t split_id = 0; split_id < split_key_count; ++split_id)
{
// May get exception.
LOG_DEBUG(log, "Try fetch prehandle task split_id={}, region_id={}", extra_id, new_region->id());
async_tasks.fetchResult(extra_id);
}
if (head_result.error == PrehandleTransformStatus::Ok)
{
prehandle_result = std::move(head_prehandle_result);
// Aggregate results.
for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
{
std::scoped_lock l(parallel_ctx->mut);
if (parallel_ctx->gather_res[extra_id].error == PrehandleTransformStatus::Ok)
{
result.error = PrehandleTransformStatus::Ok;
auto & v = parallel_ctx->gather_prehandle_res[extra_id];
prehandle_result.ingest_ids.insert(
prehandle_result.ingest_ids.end(),
std::make_move_iterator(v.ingest_ids.begin()),
std::make_move_iterator(v.ingest_ids.end()));
v.ingest_ids.clear();
prehandle_result.stats.mergeFrom(v.stats);
// Merge all uncommitted data in different splits.
new_region->mergeDataFrom(*parallel_ctx->gather_res[extra_id].region);
}
else
{
// Once a prehandle has non-ok result, we quit further loop
result = parallel_ctx->gather_res[extra_id];
result.extra_msg = fmt::format(", from {}", extra_id);
break;
}
}
LOG_INFO(
log,
"Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s "
"region_id={}",
prehandle_result.stats.write_cf_keys,
prehandle_result.ingest_ids.size(),
magic_enum::enum_name(head_result.error),
split_key_count,
watch.elapsedSeconds(),
new_region->id());
LOG_DEBUG(log, "Try fetch prehandle task split_id={}, region_id={}", split_id, new_region->id());
async_tasks.fetchResult(split_id);
}
else

ReadFromStreamResult result;
PrehandleResult prehandle_result;
if (head_result.error != PrehandleTransformStatus::Ok)
{
// Otherwise, fallback to error handling or exception handling.
result = head_result;
result.extra_msg = fmt::format(", from {}", DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT);
return {result, prehandle_result};
}

assert(head_result.error == PrehandleTransformStatus::Ok);
prehandle_result = std::move(head_prehandle_result);
// Aggregate results.
for (size_t split_id = 0; split_id < split_key_count; ++split_id)
{
std::scoped_lock l(parallel_ctx->mut);
if (parallel_ctx->gather_res[split_id].error == PrehandleTransformStatus::Ok)
{
result.error = PrehandleTransformStatus::Ok;
auto & v = parallel_ctx->gather_prehandle_res[split_id];
prehandle_result.ingest_ids.insert(
prehandle_result.ingest_ids.end(),
std::make_move_iterator(v.ingest_ids.begin()),
std::make_move_iterator(v.ingest_ids.end()));
v.ingest_ids.clear();
prehandle_result.stats.mergeFrom(v.stats);
// Merge all uncommitted data in different splits.
new_region->mergeDataFrom(*parallel_ctx->gather_res[split_id].region);
}
else
{
// Once a prehandle has non-ok result, we quit further loop
result = parallel_ctx->gather_res[split_id];
result.extra_msg = fmt::format(", from {}", split_id);
break;
}
}
LOG_INFO(
log,
"Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s "
"region_id={}",
prehandle_result.stats.write_cf_keys,
prehandle_result.ingest_ids.size(),
magic_enum::enum_name(head_result.error),
split_key_count,
watch.elapsedSeconds(),
new_region->id());
return {result, prehandle_result};
}

/// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data
@@ -694,7 +695,8 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
.schema_snap = schema_snap,
.gc_safepoint = gc_safepoint,
.force_decode = force_decode,
.expected_size = expected_block_size};
.expected_size = expected_block_size,
};

auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
new_region,
@@ -712,7 +714,9 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
.job_type = job_type,
.storage = storage,
.opts = opt,
.tmt = tmt};
.tmt = tmt,
.snapshot_index = index,
};

// `split_keys` do not begin with 'z'.
auto [split_keys, approx_bytes] = getSplitKey(log, this, new_region, sst_stream);
@@ -730,17 +734,14 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
}
else
{
executeParallelTransform(
std::tie(result, prehandle_result) = executeParallelTransform(
log,
prehandle_ctx,
new_region,
result,
prehandle_result,
split_keys,
sst_stream,
snaps,
proxy_helper,
index);
proxy_helper);
}

prehandle_result.stats.approx_raft_snapshot_size = approx_bytes;

0 comments on commit cc1bf74

Please sign in to comment.