Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Refine parallel prehandle snapshot (part-1) #9192

Merged
merged 2 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,7 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
RUNTIME_CHECK_MSG(
current_truncated_ts > start_limit,
"current pk decreases as reader advances, skipped_times={} start_raw={} start_pk={} current_pk={} "
"current_raw={} cf={} split_id={}, "
"region_id={}",
"current_raw={} cf={} split_id={} region_id={}",
skipped_times,
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
Expand Down
19 changes: 8 additions & 11 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
#include <memory>
#include <string_view>

namespace Poco
{
class Logger;
}

namespace DB
{
class TMTContext;
Expand Down Expand Up @@ -58,8 +53,8 @@ struct SSTScanSoftLimit
std::optional<RawTiDBPK> start_limit;
std::optional<RawTiDBPK> end_limit;

SSTScanSoftLimit(size_t extra_id, TiKVKey && raw_start_, TiKVKey && raw_end_)
: split_id(extra_id)
SSTScanSoftLimit(size_t split_id_, TiKVKey && raw_start_, TiKVKey && raw_end_)
: split_id(split_id_)
, raw_start(std::move(raw_start_))
, raw_end(std::move(raw_end_))
{
Expand All @@ -71,19 +66,21 @@ struct SSTScanSoftLimit
{
decoded_end = RecordKVFormat::decodeTiKVKey(raw_end);
}
if (decoded_start.size())
if (!decoded_start.empty())
{
start_limit = RecordKVFormat::getRawTiDBPK(decoded_start);
}
if (decoded_end.size())
if (!decoded_end.empty())
{
end_limit = RecordKVFormat::getRawTiDBPK(decoded_end);
}
}

const std::optional<RawTiDBPK> & getStartLimit() { return start_limit; }
SSTScanSoftLimit clone() const { return SSTScanSoftLimit(split_id, raw_start.toString(), raw_end.toString()); }

const std::optional<RawTiDBPK> & getStartLimit() const { return start_limit; }

const std::optional<RawTiDBPK> & getEndLimit() { return end_limit; }
const std::optional<RawTiDBPK> & getEndLimit() const { return end_limit; }

std::string toDebugString() const
{
Expand Down
173 changes: 87 additions & 86 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Storages/KVStore/FFI/SSTReader.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/KVStore/MultiRaft/PreHandlingTrace.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Types.h>
Expand Down Expand Up @@ -75,6 +76,7 @@ struct PrehandleTransformCtx
StorageDeltaMergePtr storage;
const DM::SSTFilesToBlockInputStreamOpts & opts;
TMTContext & tmt;
UInt64 snapshot_index;
};

void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit)
Expand Down Expand Up @@ -224,7 +226,9 @@ static inline std::tuple<ReadFromStreamResult, PrehandleResult> executeTransform
.write_cf_keys = sst_stream->getProcessKeys().write_cf,
.lock_cf_keys = sst_stream->getProcessKeys().lock_cf,
.default_cf_keys = sst_stream->getProcessKeys().default_cf,
.max_split_write_cf_keys = sst_stream->getProcessKeys().write_cf}});
.max_split_write_cf_keys = sst_stream->getProcessKeys().write_cf,
},
});
}
catch (DB::Exception & e)
{
Expand Down Expand Up @@ -432,16 +436,16 @@ static void runInParallel(
RegionPtr new_region,
const SSTViewVec & snaps,
const TiFlashRaftProxyHelper * proxy_helper,
uint64_t index,
uint64_t extra_id,
ParallelPrehandleCtxPtr parallel_ctx,
DM::SSTScanSoftLimit && part_limit)
{
std::string limit_tag = part_limit.toDebugString();
const String limit_tag = part_limit.toDebugString();
const size_t split_id = part_limit.split_id;

auto part_new_region = std::make_shared<Region>(new_region->getMeta().clone(), proxy_helper);
auto part_sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
part_new_region,
index,
prehandle_ctx.snapshot_index,
snaps,
proxy_helper,
prehandle_ctx.tmt,
Expand All @@ -454,24 +458,24 @@ static void runInParallel(
= executeTransform(log, prehandle_ctx, part_new_region, part_sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
"Finished extra parallel prehandle task limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
"split_id={} region_id={}",
limit_tag,
part_prehandle_result.stats.write_cf_keys,
part_prehandle_result.stats.lock_cf_keys,
part_prehandle_result.stats.default_cf_keys,
part_prehandle_result.ingest_ids.size(),
magic_enum::enum_name(part_result.error),
extra_id,
split_id,
part_new_region->id());
if (part_result.error == PrehandleTransformStatus::ErrUpdateSchema)
{
prehandle_ctx.prehandle_task->abortFor(PrehandleTransformStatus::ErrUpdateSchema);
}
{
std::scoped_lock l(parallel_ctx->mut);
parallel_ctx->gather_res[extra_id] = std::move(part_result);
parallel_ctx->gather_prehandle_res[extra_id] = std::move(part_prehandle_result);
parallel_ctx->gather_res[split_id] = std::move(part_result);
parallel_ctx->gather_prehandle_res[split_id] = std::move(part_prehandle_result);
}
}
catch (Exception & e)
Expand All @@ -485,24 +489,21 @@ static void runInParallel(
" write_cf_off={} split_id={} region_id={}",
e.message(),
processed_keys.write_cf,
extra_id,
split_id,
part_new_region->id());
prehandle_ctx.prehandle_task->abortFor(PrehandleTransformStatus::Aborted);
throw;
}
}

void executeParallelTransform(
std::tuple<ReadFromStreamResult, PrehandleResult> executeParallelTransform(
LoggerPtr log,
PrehandleTransformCtx & prehandle_ctx,
RegionPtr new_region,
ReadFromStreamResult & result,
PrehandleResult & prehandle_result,
const std::vector<std::string> & split_keys,
std::shared_ptr<DM::SSTFilesToBlockInputStream> sst_stream,
const SSTViewVec & snaps,
const TiFlashRaftProxyHelper * proxy_helper,
uint64_t index)
const TiFlashRaftProxyHelper * proxy_helper)
{
CurrentMetrics::add(CurrentMetrics::RaftNumParallelPrehandlingTasks);
SCOPE_EXIT({ CurrentMetrics::sub(CurrentMetrics::RaftNumParallelPrehandlingTasks); });
Expand All @@ -522,107 +523,107 @@ void executeParallelTransform(
Stopwatch watch;
// Make sure the queue is bigger than `split_key_count`, otherwise `addTask` may fail.
auto async_tasks = SingleSnapshotAsyncTasks(split_key_count, split_key_count, split_key_count + 5);
sst_stream->resetSoftLimit(
DM::SSTScanSoftLimit(DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT, std::string(""), std::string(split_keys[0])));

const DM::SSTScanSoftLimit head_soft_limit(
DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
std::string(""),
std::string(split_keys[0]));
sst_stream->resetSoftLimit(head_soft_limit.clone());

ParallelPrehandleCtxPtr parallel_ctx = std::make_shared<ParallelPrehandleCtx>();

for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
for (size_t split_id = 0; split_id < split_key_count; ++split_id)
{
auto add_result = async_tasks.addTask(extra_id, [&, extra_id]() {
auto add_result = async_tasks.addTask(split_id, [&, split_id]() {
std::string origin_name = getThreadName();
SCOPE_EXIT({ setThreadName(origin_name.c_str()); });
setThreadName("para-pre-snap");
auto limit = DM::SSTScanSoftLimit(
extra_id,
std::string(split_keys[extra_id]),
extra_id + 1 == split_key_count ? std::string("") : std::string(split_keys[extra_id + 1]));
runInParallel(
log,
prehandle_ctx,
new_region,
snaps,
proxy_helper,
index,
extra_id,
parallel_ctx,
std::move(limit));
auto part_limit = DM::SSTScanSoftLimit(
split_id,
std::string(split_keys[split_id]),
split_id + 1 == split_key_count ? std::string("") : std::string(split_keys[split_id + 1]));
runInParallel(log, prehandle_ctx, new_region, snaps, proxy_helper, parallel_ctx, std::move(part_limit));
return true;
});
RUNTIME_CHECK_MSG(
add_result,
"Failed when adding {}-th task for prehandling region_id={}",
extra_id,
split_id,
new_region->id());
}

// This will read the keys from the beginning to the first split key
auto [head_result, head_prehandle_result] = executeTransform(log, prehandle_ctx, new_region, sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task, limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} "
"error={} split_id={} region_id={}",
sst_stream->getSoftLimit()->toDebugString(),
head_soft_limit.toDebugString(),
head_prehandle_result.stats.write_cf_keys,
head_prehandle_result.stats.lock_cf_keys,
head_prehandle_result.stats.default_cf_keys,
head_prehandle_result.ingest_ids.size(),
magic_enum::enum_name(head_result.error),
DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
head_soft_limit.split_id,
new_region->id());

// Wait all threads to join. May throw.
// If one thread throws, then all result is useless, so `async_tasks` is released directly.
for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
for (size_t split_id = 0; split_id < split_key_count; ++split_id)
{
// May get exception.
LOG_DEBUG(log, "Try fetch prehandle task split_id={}, region_id={}", extra_id, new_region->id());
async_tasks.fetchResult(extra_id);
}
if (head_result.error == PrehandleTransformStatus::Ok)
{
prehandle_result = std::move(head_prehandle_result);
// Aggregate results.
for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
{
std::scoped_lock l(parallel_ctx->mut);
if (parallel_ctx->gather_res[extra_id].error == PrehandleTransformStatus::Ok)
{
result.error = PrehandleTransformStatus::Ok;
auto & v = parallel_ctx->gather_prehandle_res[extra_id];
prehandle_result.ingest_ids.insert(
prehandle_result.ingest_ids.end(),
std::make_move_iterator(v.ingest_ids.begin()),
std::make_move_iterator(v.ingest_ids.end()));
v.ingest_ids.clear();
prehandle_result.stats.mergeFrom(v.stats);
// Merge all uncommitted data in different splits.
new_region->mergeDataFrom(*parallel_ctx->gather_res[extra_id].region);
}
else
{
// Once a prehandle has non-ok result, we quit further loop
result = parallel_ctx->gather_res[extra_id];
result.extra_msg = fmt::format(", from {}", extra_id);
break;
}
}
LOG_INFO(
log,
"Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s "
"region_id={}",
prehandle_result.stats.write_cf_keys,
prehandle_result.ingest_ids.size(),
magic_enum::enum_name(head_result.error),
split_key_count,
watch.elapsedSeconds(),
new_region->id());
LOG_DEBUG(log, "Try fetch prehandle task split_id={}, region_id={}", split_id, new_region->id());
async_tasks.fetchResult(split_id);
}
else

ReadFromStreamResult result;
PrehandleResult prehandle_result;
if (head_result.error != PrehandleTransformStatus::Ok)
{
// Otherwise, fallback to error handling or exception handling.
result = head_result;
result.extra_msg = fmt::format(", from {}", DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT);
return {result, prehandle_result};
}

assert(head_result.error == PrehandleTransformStatus::Ok);
prehandle_result = std::move(head_prehandle_result);
// Aggregate results.
for (size_t split_id = 0; split_id < split_key_count; ++split_id)
{
std::scoped_lock l(parallel_ctx->mut);
if (parallel_ctx->gather_res[split_id].error == PrehandleTransformStatus::Ok)
{
result.error = PrehandleTransformStatus::Ok;
auto & v = parallel_ctx->gather_prehandle_res[split_id];
prehandle_result.ingest_ids.insert(
prehandle_result.ingest_ids.end(),
std::make_move_iterator(v.ingest_ids.begin()),
std::make_move_iterator(v.ingest_ids.end()));
v.ingest_ids.clear();
prehandle_result.stats.mergeFrom(v.stats);
// Merge all uncommitted data in different splits.
new_region->mergeDataFrom(*parallel_ctx->gather_res[split_id].region);
}
else
{
// Once a prehandle has non-ok result, we quit further loop
result = parallel_ctx->gather_res[split_id];
result.extra_msg = fmt::format(", from {}", split_id);
break;
}
}
LOG_INFO(
log,
"Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s "
"region_id={}",
prehandle_result.stats.write_cf_keys,
prehandle_result.ingest_ids.size(),
magic_enum::enum_name(head_result.error),
split_key_count,
watch.elapsedSeconds(),
new_region->id());
return {result, prehandle_result};
}

/// `preHandleSSTsToDTFiles` reads data from SSTFiles and generates DTFile(s) for committed data
Expand Down Expand Up @@ -694,7 +695,8 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
.schema_snap = schema_snap,
.gc_safepoint = gc_safepoint,
.force_decode = force_decode,
.expected_size = expected_block_size};
.expected_size = expected_block_size,
};

auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
new_region,
Expand All @@ -712,7 +714,9 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
.job_type = job_type,
.storage = storage,
.opts = opt,
.tmt = tmt};
.tmt = tmt,
.snapshot_index = index,
};

// `split_keys` do not begin with 'z'.
auto [split_keys, approx_bytes] = getSplitKey(log, this, new_region, sst_stream);
Expand All @@ -730,17 +734,14 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
}
else
{
executeParallelTransform(
std::tie(result, prehandle_result) = executeParallelTransform(
log,
prehandle_ctx,
new_region,
result,
prehandle_result,
split_keys,
sst_stream,
snaps,
proxy_helper,
index);
proxy_helper);
}

prehandle_result.stats.approx_raft_snapshot_size = approx_bytes;
Expand Down