diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
index e2ad4a9108b..cfc55e704c5 100644
--- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
@@ -466,8 +466,7 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
     RUNTIME_CHECK_MSG(
         current_truncated_ts > start_limit,
         "current pk decreases as reader advances, skipped_times={} start_raw={} start_pk={} current_pk={} "
-        "current_raw={} cf={} split_id={}, "
-        "region_id={}",
+        "current_raw={} cf={} split_id={} region_id={}",
         skipped_times,
         soft_limit.value().raw_start.toDebugString(),
         start_limit.value().toDebugString(),
diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
index e872dbac6d2..abf6c229d13 100644
--- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
@@ -23,11 +23,6 @@
 #include
 #include
 
-namespace Poco
-{
-class Logger;
-}
-
 namespace DB
 {
 class TMTContext;
@@ -58,8 +53,8 @@ struct SSTScanSoftLimit
     std::optional<RawTiDBPK> start_limit;
     std::optional<RawTiDBPK> end_limit;
 
-    SSTScanSoftLimit(size_t extra_id, TiKVKey && raw_start_, TiKVKey && raw_end_)
-        : split_id(extra_id)
+    SSTScanSoftLimit(size_t split_id_, TiKVKey && raw_start_, TiKVKey && raw_end_)
+        : split_id(split_id_)
         , raw_start(std::move(raw_start_))
         , raw_end(std::move(raw_end_))
     {
@@ -71,19 +66,21 @@ struct SSTScanSoftLimit
         {
             decoded_end = RecordKVFormat::decodeTiKVKey(raw_end);
         }
-        if (decoded_start.size())
+        if (!decoded_start.empty())
        {
             start_limit = RecordKVFormat::getRawTiDBPK(decoded_start);
         }
-        if (decoded_end.size())
+        if (!decoded_end.empty())
         {
             end_limit = RecordKVFormat::getRawTiDBPK(decoded_end);
         }
     }
 
-    const std::optional<RawTiDBPK> & getStartLimit() { return start_limit; }
+    SSTScanSoftLimit clone() const { return SSTScanSoftLimit(split_id, raw_start.toString(), raw_end.toString()); }
+
+    const std::optional<RawTiDBPK> & getStartLimit() const { return start_limit; }
 
-    const std::optional<RawTiDBPK> & getEndLimit() { return end_limit; }
+    const std::optional<RawTiDBPK> & getEndLimit() const { return end_limit; }
 
     std::string toDebugString() const
     {
diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
index 29595e69790..ffaa688ea45 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
+++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -75,6 +76,7 @@ struct PrehandleTransformCtx
     StorageDeltaMergePtr storage;
     const DM::SSTFilesToBlockInputStreamOpts & opts;
     TMTContext & tmt;
+    UInt64 snapshot_index;
 };
 
 void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit)
@@ -224,7 +226,9 @@ static inline std::tuple<ReadFromStreamResult, PrehandleResult> executeTransform
                 .write_cf_keys = sst_stream->getProcessKeys().write_cf,
                 .lock_cf_keys = sst_stream->getProcessKeys().lock_cf,
                 .default_cf_keys = sst_stream->getProcessKeys().default_cf,
-                .max_split_write_cf_keys = sst_stream->getProcessKeys().write_cf}});
+                .max_split_write_cf_keys = sst_stream->getProcessKeys().write_cf,
+            },
+        });
 }
 catch (DB::Exception & e)
 {
@@ -432,16 +436,16 @@ static void runInParallel(
     RegionPtr new_region,
     const SSTViewVec & snaps,
     const TiFlashRaftProxyHelper * proxy_helper,
-    uint64_t index,
-    uint64_t extra_id,
     ParallelPrehandleCtxPtr parallel_ctx,
     DM::SSTScanSoftLimit && part_limit)
 {
-    std::string limit_tag = part_limit.toDebugString();
+    const String limit_tag = part_limit.toDebugString();
+    const size_t split_id = part_limit.split_id;
+
     auto part_new_region = std::make_shared<Region>(new_region->getMeta().clone(), proxy_helper);
     auto part_sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
         part_new_region,
-        index,
+        prehandle_ctx.snapshot_index,
         snaps,
         proxy_helper,
         prehandle_ctx.tmt,
@@ -454,7 +458,7 @@ static void runInParallel(
             = executeTransform(log, prehandle_ctx, part_new_region, part_sst_stream);
         LOG_INFO(
             log,
-            "Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
+            "Finished extra parallel prehandle task limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
             "split_id={} region_id={}",
             limit_tag,
             part_prehandle_result.stats.write_cf_keys,
@@ -462,7 +466,7 @@ static void runInParallel(
             part_prehandle_result.stats.default_cf_keys,
             part_prehandle_result.ingest_ids.size(),
             magic_enum::enum_name(part_result.error),
-            extra_id,
+            split_id,
             part_new_region->id());
         if (part_result.error == PrehandleTransformStatus::ErrUpdateSchema)
         {
@@ -470,8 +474,8 @@ static void runInParallel(
         }
         {
             std::scoped_lock l(parallel_ctx->mut);
-            parallel_ctx->gather_res[extra_id] = std::move(part_result);
-            parallel_ctx->gather_prehandle_res[extra_id] = std::move(part_prehandle_result);
+            parallel_ctx->gather_res[split_id] = std::move(part_result);
+            parallel_ctx->gather_prehandle_res[split_id] = std::move(part_prehandle_result);
         }
     }
     catch (Exception & e)
@@ -485,24 +489,21 @@ static void runInParallel(
             " write_cf_off={} split_id={} region_id={}",
             e.message(),
             processed_keys.write_cf,
-            extra_id,
+            split_id,
             part_new_region->id());
         prehandle_ctx.prehandle_task->abortFor(PrehandleTransformStatus::Aborted);
         throw;
     }
 }
 
-void executeParallelTransform(
+std::tuple<ReadFromStreamResult, PrehandleResult> executeParallelTransform(
     LoggerPtr log,
     PrehandleTransformCtx & prehandle_ctx,
     RegionPtr new_region,
-    ReadFromStreamResult & result,
-    PrehandleResult & prehandle_result,
     const std::vector<std::string> & split_keys,
     std::shared_ptr<DM::SSTFilesToBlockInputStream> sst_stream,
     const SSTViewVec & snaps,
-    const TiFlashRaftProxyHelper * proxy_helper,
-    uint64_t index)
+    const TiFlashRaftProxyHelper * proxy_helper)
 {
     CurrentMetrics::add(CurrentMetrics::RaftNumParallelPrehandlingTasks);
     SCOPE_EXIT({ CurrentMetrics::sub(CurrentMetrics::RaftNumParallelPrehandlingTasks); });
@@ -522,107 +523,107 @@ void executeParallelTransform(
     Stopwatch watch;
     // Make sure the queue is bigger than `split_key_count`, otherwise `addTask` may fail.
     auto async_tasks = SingleSnapshotAsyncTasks(split_key_count, split_key_count, split_key_count + 5);
-    sst_stream->resetSoftLimit(
-        DM::SSTScanSoftLimit(DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT, std::string(""), std::string(split_keys[0])));
+
+    const DM::SSTScanSoftLimit head_soft_limit(
+        DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
+        std::string(""),
+        std::string(split_keys[0]));
+    sst_stream->resetSoftLimit(head_soft_limit.clone());
 
     ParallelPrehandleCtxPtr parallel_ctx = std::make_shared<ParallelPrehandleCtx>();
-    for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
+    for (size_t split_id = 0; split_id < split_key_count; ++split_id)
     {
-        auto add_result = async_tasks.addTask(extra_id, [&, extra_id]() {
+        auto add_result = async_tasks.addTask(split_id, [&, split_id]() {
             std::string origin_name = getThreadName();
             SCOPE_EXIT({ setThreadName(origin_name.c_str()); });
             setThreadName("para-pre-snap");
-            auto limit = DM::SSTScanSoftLimit(
-                extra_id,
-                std::string(split_keys[extra_id]),
-                extra_id + 1 == split_key_count ? std::string("") : std::string(split_keys[extra_id + 1]));
-            runInParallel(
-                log,
-                prehandle_ctx,
-                new_region,
-                snaps,
-                proxy_helper,
-                index,
-                extra_id,
-                parallel_ctx,
-                std::move(limit));
+            auto part_limit = DM::SSTScanSoftLimit(
+                split_id,
+                std::string(split_keys[split_id]),
+                split_id + 1 == split_key_count ? std::string("") : std::string(split_keys[split_id + 1]));
+            runInParallel(log, prehandle_ctx, new_region, snaps, proxy_helper, parallel_ctx, std::move(part_limit));
             return true;
         });
         RUNTIME_CHECK_MSG(
             add_result,
             "Failed when adding {}-th task for prehandling region_id={}",
-            extra_id,
+            split_id,
             new_region->id());
     }
+
+    // This will read the keys from the beginning to the first split key
     auto [head_result, head_prehandle_result] = executeTransform(log, prehandle_ctx, new_region, sst_stream);
     LOG_INFO(
         log,
         "Finished extra parallel prehandle task, limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} "
         "error={} split_id={} region_id={}",
-        sst_stream->getSoftLimit()->toDebugString(),
+        head_soft_limit.toDebugString(),
         head_prehandle_result.stats.write_cf_keys,
         head_prehandle_result.stats.lock_cf_keys,
         head_prehandle_result.stats.default_cf_keys,
         head_prehandle_result.ingest_ids.size(),
         magic_enum::enum_name(head_result.error),
-        DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
+        head_soft_limit.split_id,
         new_region->id());
 
     // Wait all threads to join. May throw.
     // If one thread throws, then all result is useless, so `async_tasks` is released directly.
-    for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
+    for (size_t split_id = 0; split_id < split_key_count; ++split_id)
     {
         // May get exception.
-        LOG_DEBUG(log, "Try fetch prehandle task split_id={}, region_id={}", extra_id, new_region->id());
-        async_tasks.fetchResult(extra_id);
-    }
-    if (head_result.error == PrehandleTransformStatus::Ok)
-    {
-        prehandle_result = std::move(head_prehandle_result);
-        // Aggregate results.
-        for (size_t extra_id = 0; extra_id < split_key_count; extra_id++)
-        {
-            std::scoped_lock l(parallel_ctx->mut);
-            if (parallel_ctx->gather_res[extra_id].error == PrehandleTransformStatus::Ok)
-            {
-                result.error = PrehandleTransformStatus::Ok;
-                auto & v = parallel_ctx->gather_prehandle_res[extra_id];
-                prehandle_result.ingest_ids.insert(
-                    prehandle_result.ingest_ids.end(),
-                    std::make_move_iterator(v.ingest_ids.begin()),
-                    std::make_move_iterator(v.ingest_ids.end()));
-                v.ingest_ids.clear();
-                prehandle_result.stats.mergeFrom(v.stats);
-                // Merge all uncommitted data in different splits.
-                new_region->mergeDataFrom(*parallel_ctx->gather_res[extra_id].region);
-            }
-            else
-            {
-                // Once a prehandle has non-ok result, we quit further loop
-                result = parallel_ctx->gather_res[extra_id];
-                result.extra_msg = fmt::format(", from {}", extra_id);
-                break;
-            }
-        }
-        LOG_INFO(
-            log,
-            "Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s "
-            "region_id={}",
-            prehandle_result.stats.write_cf_keys,
-            prehandle_result.ingest_ids.size(),
-            magic_enum::enum_name(head_result.error),
-            split_key_count,
-            watch.elapsedSeconds(),
-            new_region->id());
+        LOG_DEBUG(log, "Try fetch prehandle task split_id={}, region_id={}", split_id, new_region->id());
+        async_tasks.fetchResult(split_id);
     }
-    else
+
+    ReadFromStreamResult result;
+    PrehandleResult prehandle_result;
+    if (head_result.error != PrehandleTransformStatus::Ok)
     {
         // Otherwise, fallback to error handling or exception handling.
         result = head_result;
         result.extra_msg = fmt::format(", from {}", DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT);
+        return {result, prehandle_result};
+    }
+
+    assert(head_result.error == PrehandleTransformStatus::Ok);
+    prehandle_result = std::move(head_prehandle_result);
+    // Aggregate results.
+    for (size_t split_id = 0; split_id < split_key_count; ++split_id)
+    {
+        std::scoped_lock l(parallel_ctx->mut);
+        if (parallel_ctx->gather_res[split_id].error == PrehandleTransformStatus::Ok)
+        {
+            result.error = PrehandleTransformStatus::Ok;
+            auto & v = parallel_ctx->gather_prehandle_res[split_id];
+            prehandle_result.ingest_ids.insert(
+                prehandle_result.ingest_ids.end(),
+                std::make_move_iterator(v.ingest_ids.begin()),
+                std::make_move_iterator(v.ingest_ids.end()));
+            v.ingest_ids.clear();
+            prehandle_result.stats.mergeFrom(v.stats);
+            // Merge all uncommitted data in different splits.
+            new_region->mergeDataFrom(*parallel_ctx->gather_res[split_id].region);
+        }
+        else
+        {
+            // Once a prehandle has non-ok result, we quit further loop
+            result = parallel_ctx->gather_res[split_id];
+            result.extra_msg = fmt::format(", from {}", split_id);
+            break;
+        }
     }
+    LOG_INFO(
+        log,
+        "Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s "
+        "region_id={}",
+        prehandle_result.stats.write_cf_keys,
+        prehandle_result.ingest_ids.size(),
+        magic_enum::enum_name(head_result.error),
+        split_key_count,
+        watch.elapsedSeconds(),
+        new_region->id());
+    return {result, prehandle_result};
 }
 
 /// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data
@@ -694,7 +695,8 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
         .schema_snap = schema_snap,
         .gc_safepoint = gc_safepoint,
         .force_decode = force_decode,
-        .expected_size = expected_block_size};
+        .expected_size = expected_block_size,
+    };
 
     auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
         new_region,
@@ -712,7 +714,9 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
         .job_type = job_type,
         .storage = storage,
         .opts = opt,
-        .tmt = tmt};
+        .tmt = tmt,
+        .snapshot_index = index,
+    };
 
     // `split_keys` do not begin with 'z'.
     auto [split_keys, approx_bytes] = getSplitKey(log, this, new_region, sst_stream);
@@ -730,17 +734,14 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
     }
     else
     {
-        executeParallelTransform(
+        std::tie(result, prehandle_result) = executeParallelTransform(
             log,
             prehandle_ctx,
             new_region,
-            result,
-            prehandle_result,
             split_keys,
             sst_stream,
             snaps,
-            proxy_helper,
-            index);
+            proxy_helper);
     }
 
     prehandle_result.stats.approx_raft_snapshot_size = approx_bytes;
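Note on the range layout used above: with N split keys, `executeParallelTransform` scans N+1 disjoint key ranges. The head range ["", split_keys[0]) keeps the split id `HEAD_OR_ONLY_SPLIT` and is consumed by the calling thread via `executeTransform`, while subtask i scans [split_keys[i], split_keys[i+1]), an empty end key meaning "to the end" for the last subtask. Below is a minimal sketch of just that partitioning, assuming only what the diff shows; `SoftLimitRange`, `makeSoftLimits`, and the sentinel value are hypothetical stand-ins for illustration, not part of the patch.

#include <cstddef>
#include <limits>
#include <string>
#include <vector>

// Hypothetical stand-in for DM::SSTScanSoftLimit, for illustration only.
struct SoftLimitRange
{
    size_t split_id;       // HEAD_OR_ONLY_SPLIT marks the head range
    std::string raw_start; // inclusive; empty means "from the beginning"
    std::string raw_end;   // exclusive; empty means "to the end"
};

// Assumed sentinel; the real value of DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT is not shown in the diff.
constexpr size_t HEAD_OR_ONLY_SPLIT = std::numeric_limits<size_t>::max();

// Mirrors the loop in executeParallelTransform: one head range for the caller,
// plus one range per split key for the parallel subtasks. Assumes split_keys is
// non-empty, as in the parallel branch of preHandleSSTsToDTFiles.
std::vector<SoftLimitRange> makeSoftLimits(const std::vector<std::string> & split_keys)
{
    std::vector<SoftLimitRange> ranges{{HEAD_OR_ONLY_SPLIT, "", split_keys.front()}};
    for (size_t split_id = 0; split_id < split_keys.size(); ++split_id)
    {
        const bool is_last = (split_id + 1 == split_keys.size());
        ranges.push_back({split_id, split_keys[split_id], is_last ? std::string() : split_keys[split_id + 1]});
    }
    return ranges;
}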