From 4b48b97f07d5c1905c67755d35c50767b5024ce8 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Thu, 1 Jun 2023 15:46:42 +0800 Subject: [PATCH] This is an automated cherry-pick of #7530 Signed-off-by: ti-chi-bot --- dbms/src/Common/FailPoint.cpp | 6 +- dbms/src/Common/ThreadedWorker.h | 223 +++++++ dbms/src/Common/TiFlashMetrics.h | 8 +- .../Common/tests/gtest_threaded_worker.cpp | 288 +++++++++ .../Coprocessor/DAGStorageInterpreter.cpp | 5 + .../Flash/Coprocessor/GenSchemaAndColumn.cpp | 2 +- .../Flash/Disaggregated/RNPagePreparer.cpp | 202 ------ dbms/src/Flash/Disaggregated/RNPagePreparer.h | 77 --- .../Flash/Disaggregated/RNPageReceiver.cpp | 450 ------------- dbms/src/Flash/Disaggregated/RNPageReceiver.h | 206 ------ .../Disaggregated/RNPageReceiverContext.cpp | 198 ------ .../Disaggregated/RNPageReceiverContext.h | 93 --- dbms/src/Flash/FlashService.cpp | 7 + dbms/src/Server/StorageConfigParser.cpp | 6 +- .../Storages/DeltaMerge/Remote/RNReadTask.cpp | 156 +++++ .../Storages/DeltaMerge/Remote/RNReadTask.h | 143 +++++ ...NRemoteReadTask_fwd.h => RNReadTask_fwd.h} | 18 +- .../DeltaMerge/Remote/RNRemoteReadTask.cpp | 590 ------------------ .../DeltaMerge/Remote/RNRemoteReadTask.h | 279 --------- .../Remote/RNSegmentInputStream.cpp | 103 +++ .../DeltaMerge/Remote/RNSegmentInputStream.h | 81 +++ .../DeltaMerge/Remote/RNWorkerFetchPages.cpp | 276 ++++++++ .../DeltaMerge/Remote/RNWorkerFetchPages.h | 74 +++ .../Remote/RNWorkerPrepareStreams.cpp | 39 ++ .../Remote/RNWorkerPrepareStreams.h | 82 +++ .../Storages/DeltaMerge/Remote/RNWorkers.cpp | 70 +++ .../Storages/DeltaMerge/Remote/RNWorkers.h | 72 +++ .../DeltaMerge/Remote/RNWorkers_fwd.h | 25 + .../tests/gtest_rn_remote_read_task.cpp | 234 ------- dbms/src/Storages/S3/FileCache.cpp | 2 + dbms/src/Storages/StorageDisaggregated.h | 36 +- .../Storages/StorageDisaggregatedRemote.cpp | 253 ++++---- 32 files changed, 1827 insertions(+), 2477 deletions(-) create mode 100644 dbms/src/Common/ThreadedWorker.h create mode 100644 dbms/src/Common/tests/gtest_threaded_worker.cpp delete mode 100644 dbms/src/Flash/Disaggregated/RNPagePreparer.cpp delete mode 100644 dbms/src/Flash/Disaggregated/RNPagePreparer.h delete mode 100644 dbms/src/Flash/Disaggregated/RNPageReceiver.cpp delete mode 100644 dbms/src/Flash/Disaggregated/RNPageReceiver.h delete mode 100644 dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp delete mode 100644 dbms/src/Flash/Disaggregated/RNPageReceiverContext.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNReadTask.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNReadTask.h rename dbms/src/Storages/DeltaMerge/Remote/{RNRemoteReadTask_fwd.h => RNReadTask_fwd.h} (57%) delete mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp delete mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkers_fwd.h delete mode 100644 dbms/src/Storages/DeltaMerge/Remote/tests/gtest_rn_remote_read_task.cpp diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 39b748a194c..c6f6961857c 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -71,7 +71,8 @@ namespace DB M(pause_before_full_gc_prepare) \ M(force_owner_mgr_state) \ M(exception_during_spill) \ - M(force_fail_to_create_etcd_session) + M(force_fail_to_create_etcd_session) \ + M(force_remote_read_for_batch_cop_once) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ @@ -101,7 +102,8 @@ namespace DB M(force_set_mocked_s3_object_mtime) \ M(force_stop_background_checkpoint_upload) \ M(skip_seek_before_read_dmfile) \ - M(exception_after_large_write_exceed) + M(exception_after_large_write_exceed) \ + M(exception_when_fetch_disagg_pages) #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \ M(pause_with_alter_locks_acquired) \ diff --git a/dbms/src/Common/ThreadedWorker.h b/dbms/src/Common/ThreadedWorker.h new file mode 100644 index 00000000000..3b50317f46c --- /dev/null +++ b/dbms/src/Common/ThreadedWorker.h @@ -0,0 +1,223 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ + +/// A ThreadedWorker spawns n (n=concurrency) threads to process tasks. +/// Each thread takes a task from the source queue, do some work, and put the result into the +/// result queue. +/// ThreadedWorker manages the state of its result queue, for example, when all tasks are finished +/// or when there are errors, the result queue will be finished or cancelled. +template +class ThreadedWorker +{ +public: + /// WARNING: As Base class destructors always run AFTER the derived class destructors, + /// derived workers must implement its own destructors, like: + /// + /// ```c++ + /// ~MyWorker() override { wait(); } + /// ``` + /// + /// Otherwise, the `doWork()` may be still running and accessing derived class + /// members, while derived class is already destructed. + virtual ~ThreadedWorker() = default; + + void startInBackground() noexcept + { + std::call_once(start_flag, [this] { + LOG_DEBUG(log, "Starting {} workers, concurrency={}", getName(), concurrency); + active_workers = concurrency; + + watch_start.restart(); + for (size_t index = 0; index < concurrency; ++index) + { + thread_manager->schedule(true, getName(), [this, index] { + total_wait_schedule_ms += watch_start.elapsedMilliseconds(); + + workerLoop(index); + handleWorkerFinished(); + }); + } + }); + } + + void wait() noexcept + { + std::call_once(wait_flag, [this] { + try + { + // thread_manager->wait can be only called once. + thread_manager->wait(); + } + catch (...) + { + // This should not occur, as we should have caught all exceptions in workerLoop. + auto error = getCurrentExceptionMessage(false); + LOG_WARNING(log, "{} meet unexepcted error: {}", getName(), error); + } + }); + } + + ThreadedWorker( + std::shared_ptr> source_queue_, + std::shared_ptr> result_queue_, + LoggerPtr log_, + size_t concurrency_) + : concurrency(concurrency_) + , source_queue(source_queue_) + , result_queue(result_queue_) + , log(log_) + , thread_manager(newThreadManager()) + { + RUNTIME_CHECK(concurrency > 0, concurrency); + } + +public: + const size_t concurrency; + const std::shared_ptr> source_queue; + const std::shared_ptr> result_queue; + +protected: + const LoggerPtr log; + std::shared_ptr thread_manager; + std::atomic active_workers = 0; + + virtual String getName() const noexcept = 0; + + virtual Dest doWork(const Src & task) = 0; + +private: + void handleWorkerFinished() noexcept + { + active_workers--; + if (active_workers == 0) + { + std::call_once(finish_flag, [this] { + LOG_DEBUG( + log, + "{} workers finished, total_processed_tasks={} concurrency={} elapsed={:.3f}s total_wait_schedule={:.3f}s total_wait_upstream={:.3f}s total_wait_downstream={:.3f}s", + getName(), + total_processed_tasks, + concurrency, + watch_start.elapsedSeconds(), + total_wait_schedule_ms / 1000.0, + total_wait_upstream_ms / 1000.0, + total_wait_downstream_ms / 1000.0); + // Note: the result queue may be already cancelled, but it is fine. + result_queue->finish(); + }); + } + } + + void workerLoop(size_t thread_idx) noexcept + { + try + { + while (true) + { + Src task; + + Stopwatch w{CLOCK_MONOTONIC_COARSE}; + auto pop_result = source_queue->pop(task); + total_wait_upstream_ms += w.elapsedMilliseconds(); + + if (pop_result != MPMCQueueResult::OK) + { + if (pop_result == MPMCQueueResult::FINISHED) + { + // No more work to do, just exit. + // The FINISH signal will be passed to downstreams when + // all workers are exited. + break; + } + else if (pop_result == MPMCQueueResult::CANCELLED) + { + // There are errors from upstream, populate the error to downstreams + // immediately. + auto cancel_reason = source_queue->getCancelReason(); + LOG_WARNING(log, "{}#{} meeting error from upstream: {}", getName(), thread_idx, cancel_reason); + result_queue->cancelWith(cancel_reason); + break; + } + else + { + RUNTIME_CHECK_MSG(false, "Unexpected pop MPMCQueueResult: {}", magic_enum::enum_name(pop_result)); + } + } + + auto work_result = doWork(task); + + w.restart(); + auto push_result = result_queue->push(work_result); + total_wait_downstream_ms += w.elapsedMilliseconds(); + + if (push_result != MPMCQueueResult::OK) + { + if (push_result == MPMCQueueResult::CANCELLED) + { + // There are two possible cases: + // Case A: The upstream is cancelled and one of the worker cancelled the downstream. + // Case B: There is something wrong with the downstream + // In case B, we need to populate the error to upstream, so that the whole + // pipeline is cancelled. + auto cancel_reason = source_queue->getCancelReason(); + LOG_WARNING(log, "{}#{} meeting error from downstream: {}", getName(), thread_idx, cancel_reason); + source_queue->cancelWith(cancel_reason); + break; + } + else + { + RUNTIME_CHECK_MSG(false, "Unexpected push MPMCQueueResult: {}", magic_enum::enum_name(push_result)); + } + } + + total_processed_tasks++; + } + } + catch (...) + { + auto error = getCurrentExceptionMessage(false); + LOG_ERROR(log, "{}#{} meet error: {}", getName(), thread_idx, error); + auto cancel_reason = fmt::format("{} failed: {}", getName(), error); + result_queue->cancelWith(cancel_reason); + source_queue->cancelWith(cancel_reason); + } + } + +private: + Stopwatch watch_start{CLOCK_MONOTONIC_COARSE}; + std::once_flag start_flag; + std::once_flag wait_flag; + std::once_flag finish_flag; + std::atomic total_processed_tasks = 0; + std::atomic total_wait_schedule_ms = 0; + std::atomic total_wait_upstream_ms = 0; + std::atomic total_wait_downstream_ms = 0; +}; + +} // namespace DB diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 7d9e2797c4d..aad799e48be 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -229,10 +229,12 @@ namespace DB F(type_total_establish_backoff, {{"type", "total_establish_backoff"}}, ExpBuckets{0.01, 2, 20}), \ F(type_resolve_lock, {{"type", "resolve_lock"}}, ExpBuckets{0.01, 2, 20}), \ F(type_rpc_fetch_page, {{"type", "rpc_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_write_page_cache, {{"type", "write_page_cache"}}, ExpBuckets{0.01, 2, 20}), \ F(type_cache_occupy, {{"type", "cache_occupy"}}, ExpBuckets{0.01, 2, 20}), \ - F(type_build_read_task, {{"type", "build_read_task"}}, ExpBuckets{0.01, 2, 20}), \ - F(type_seg_next_task, {{"type", "seg_next_task"}}, ExpBuckets{0.01, 2, 20}), \ - F(type_seg_build_stream, {{"type", "seg_build_stream"}}, ExpBuckets{0.01, 2, 20})) \ + F(type_worker_fetch_page, {{"type", "worker_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_worker_prepare_stream, {{"type", "worker_prepare_stream"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_stream_wait_next_task, {{"type", "stream_wait_next_task"}}, ExpBuckets{0.01, 2, 20}), \ + F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20})) \ M(tiflash_disaggregated_details, "", Counter, \ F(type_cftiny_read, {{"type", "cftiny_read"}}), \ F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \ diff --git a/dbms/src/Common/tests/gtest_threaded_worker.cpp b/dbms/src/Common/tests/gtest_threaded_worker.cpp new file mode 100644 index 00000000000..1b37eea3a19 --- /dev/null +++ b/dbms/src/Common/tests/gtest_threaded_worker.cpp @@ -0,0 +1,288 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +namespace DB +{ +namespace tests +{ + +class WorkerMultiply : public ThreadedWorker +{ +public: + using ThreadedWorker::ThreadedWorker; + + explicit WorkerMultiply( + std::shared_ptr> source_queue_, + std::shared_ptr> result_queue_, + size_t concurrency, + Int64 multiply_factor_ = 2, + std::chrono::duration sleep_duration_ = 50ms) + : ThreadedWorker(source_queue_, result_queue_, Logger::get(), concurrency) + , multiply_factor(multiply_factor_) + , sleep_duration(sleep_duration_) + { + } + + ~WorkerMultiply() override { wait(); } + +protected: + Int64 doWork(const Int64 & value) override + { + std::this_thread::sleep_for(sleep_duration); + return value * multiply_factor; + } + + String getName() const noexcept override { return "Multiply"; } + +private: + Int64 multiply_factor = 2; + std::chrono::duration sleep_duration = 50ms; +}; + +class WorkerErrorAtN : public ThreadedWorker +{ +public: + using ThreadedWorker::ThreadedWorker; + + explicit WorkerErrorAtN( + std::shared_ptr> source_queue_, + std::shared_ptr> result_queue_, + size_t concurrency, + Int64 error_at_, + std::chrono::duration sleep_duration_ = 50ms) + : ThreadedWorker(source_queue_, result_queue_, Logger::get(), concurrency) + , error_at(error_at_) + , sleep_duration(sleep_duration_) + { + } + + ~WorkerErrorAtN() override { wait(); } + +protected: + Int64 doWork(const Int64 & value) override + { + auto n = current_n.fetch_add(1); + std::this_thread::sleep_for(sleep_duration); + + if (static_cast(n) == error_at) + throw Exception("Error"); + + return value; + } + + String getName() const noexcept override { return "ErrorAtN"; } + +private: + size_t error_at = 0; + std::atomic current_n = 0; + std::chrono::duration sleep_duration = 50ms; +}; + +TEST(ThreadedWorker, SingleThread) +{ + auto w = WorkerMultiply( + /* src */ std::make_shared>(5), + /* result */ std::make_shared>(5), + 1); + + Int64 result = 0; + auto pop_result = w.result_queue->popTimeout(result, 100ms); + ASSERT_EQ(pop_result, MPMCQueueResult::TIMEOUT); + + // When worker is not started, push should not trigger any results to be produced. + w.source_queue->push(1); + w.source_queue->push(3); + w.source_queue->push(5); + + pop_result = w.result_queue->popTimeout(result, 300ms); + ASSERT_EQ(pop_result, MPMCQueueResult::TIMEOUT); + + w.startInBackground(); + + w.source_queue->push(7); + + w.result_queue->pop(result); + ASSERT_EQ(result, 2); + + w.result_queue->pop(result); + ASSERT_EQ(result, 6); + + w.result_queue->pop(result); + ASSERT_EQ(result, 10); + + w.result_queue->pop(result); + ASSERT_EQ(result, 14); + + pop_result = w.result_queue->popTimeout(result, 300ms); + ASSERT_EQ(pop_result, MPMCQueueResult::TIMEOUT); + + // When source queue is finished, result queue should be finished. + w.source_queue->finish(); + + pop_result = w.result_queue->pop(result); + ASSERT_EQ(pop_result, MPMCQueueResult::FINISHED); +} + +TEST(ThreadedWorker, MultiThread) +{ + auto w = WorkerMultiply( + /* src */ std::make_shared>(100), + /* result */ std::make_shared>(100), + 10); + + std::multiset remainings_results; + for (Int64 v : {23, 13, 41, 17, 19}) + { + remainings_results.emplace(v * 2); + w.source_queue->push(v); + } + + w.startInBackground(); + + Int64 r = 0; + for (int i = 0; i < 5; i++) + { + w.result_queue->pop(r); + ASSERT_TRUE(remainings_results.contains(r)); + remainings_results.erase(remainings_results.find(r)); + } + ASSERT_TRUE(remainings_results.empty()); + + w.source_queue->push(2); + w.result_queue->pop(r); + ASSERT_EQ(r, 4); + + w.source_queue->push(3); + w.result_queue->pop(r); + ASSERT_EQ(r, 6); + + w.source_queue->finish(); + auto pop_result = w.result_queue->pop(r); + ASSERT_EQ(pop_result, MPMCQueueResult::FINISHED); +} + +TEST(ThreadedWorker, Chainned) +{ + auto w1 = WorkerMultiply( + /* src */ std::make_shared>(100), + /* result */ std::make_shared>(100), + 2); + + auto w2 = WorkerMultiply( + /* src */ w1.result_queue, + /* result */ std::make_shared>(100), + 10, + /* multiply_factor*/ 3); + + std::multiset remainings_results; + for (Int64 v : {5, 7, 43, 47, 5, 7}) + { + remainings_results.emplace(v * 2 * 3); + w1.source_queue->push(v); + } + w1.source_queue->finish(); + + w1.startInBackground(); + w2.startInBackground(); + + Int64 r = 0; + for (int i = 0; i < 6; i++) + { + w2.result_queue->pop(r); + ASSERT_TRUE(remainings_results.contains(r)); + remainings_results.erase(remainings_results.find(r)); + } + ASSERT_TRUE(remainings_results.empty()); + + auto pop_result = w2.result_queue->pop(r); + ASSERT_EQ(pop_result, MPMCQueueResult::FINISHED); +} + +TEST(ThreadedWorker, ErrorInWorker) +{ + auto w = WorkerErrorAtN( + /* src */ std::make_shared>(100), + /* result */ std::make_shared>(100), + 2, + /* error_at */ 3); + + w.startInBackground(); + + Int64 r = 0; + + w.source_queue->push(4); + w.result_queue->pop(r); + ASSERT_EQ(r, 4); + + w.source_queue->push(3); + w.result_queue->pop(r); + ASSERT_EQ(r, 3); + + w.source_queue->push(7); + w.result_queue->pop(r); + ASSERT_EQ(r, 7); + + w.source_queue->push(1); + auto result = w.result_queue->pop(r); + ASSERT_EQ(result, MPMCQueueResult::CANCELLED); + + result = w.source_queue->push(4); + ASSERT_EQ(result, MPMCQueueResult::CANCELLED); +} + +TEST(ThreadedWorker, ErrorInWorkerWithNonEmptyQueue) +{ + auto w = WorkerErrorAtN( + /* src */ std::make_shared>(100), + /* result */ std::make_shared>(100), + 2, + /* error_at */ 3); + + w.source_queue->push(5); + w.source_queue->push(1); + w.source_queue->push(3); + w.source_queue->push(13); + w.source_queue->push(7); + w.source_queue->push(11); + + w.startInBackground(); + + Int64 r = 0; + auto result = w.result_queue->pop(r); + ASSERT_EQ(result, MPMCQueueResult::OK); + + result = w.result_queue->pop(r); + ASSERT_EQ(result, MPMCQueueResult::OK); + + result = w.result_queue->pop(r); + ASSERT_EQ(result, MPMCQueueResult::OK); + + result = w.result_queue->pop(r); + ASSERT_EQ(result, MPMCQueueResult::CANCELLED); + + result = w.source_queue->push(10); + ASSERT_EQ(result, MPMCQueueResult::CANCELLED); +} + + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index ad03cf37f24..e08d462bdba 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -60,6 +60,7 @@ extern const char region_exception_after_read_from_storage_some_error[]; extern const char region_exception_after_read_from_storage_all_error[]; extern const char pause_with_alter_locks_acquired[]; extern const char force_remote_read_for_batch_cop[]; +extern const char force_remote_read_for_batch_cop_once[]; extern const char pause_after_copr_streams_acquired[]; extern const char pause_after_copr_streams_acquired_once[]; } // namespace FailPoints @@ -117,6 +118,10 @@ MakeRegionQueryInfos( if (batch_cop) status = RegionException::RegionReadStatus::NOT_FOUND; }); + fiu_do_on(FailPoints::force_remote_read_for_batch_cop_once, { + if (batch_cop) + status = RegionException::RegionReadStatus::NOT_FOUND; + }); if (status != RegionException::RegionReadStatus::OK) { region_need_retry.emplace_back(r); diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index da5f8ed22ff..fd348ecc1d5 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -86,7 +86,7 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef std::tuple genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan) { auto column_defines = std::make_shared(); - size_t extra_table_id_index = InvalidColumnID; + int extra_table_id_index = InvalidColumnID; column_defines->reserve(table_scan.getColumnSize()); for (Int32 i = 0; i < table_scan.getColumnSize(); ++i) { diff --git a/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp b/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp deleted file mode 100644 index 0d291413a52..00000000000 --- a/dbms/src/Flash/Disaggregated/RNPagePreparer.cpp +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace DB -{ - -RNPagePreparer::RNPagePreparer( - DM::RNRemoteReadTaskPtr remote_read_tasks_, - std::shared_ptr receiver_, - const DM::ColumnDefinesPtr & columns_to_read, - size_t max_streams_, - const String & req_id, - const String & executor_id, - bool do_prepare_) - : threads_num(max_streams_ * 2) // these threads involve disk IO, 2x scale for better CPU utilization - , do_prepare(do_prepare_) - , remote_read_tasks(std::move(remote_read_tasks_)) - , receiver(std::move(receiver_)) - , live_persisters(threads_num) - , state(PageReceiverState::NORMAL) - , total_rows(0) - , total_pages(0) - , exc_log(Logger::get(req_id, executor_id)) -{ - assert(columns_to_read != nullptr); - decoder_ptr = std::make_unique(DM::toEmptyBlock(*columns_to_read)); - - try - { - for (size_t index = 0; index < threads_num; ++index) - { - auto task = std::make_shared>([this, index] { - try - { - prepareLoop(index); - } - catch (...) - { - auto ex = getCurrentExceptionMessage(true); - LOG_ERROR(Logger::get(), "PagePrepareThread#{} read loop meet exception and exited, ex={}", index, ex); - } - }); - persist_threads.emplace_back(task->get_future()); - - RNPagePreparerPool::get().scheduleOrThrowOnError([task] { (*task)(); }); - } - } - catch (...) - { - tryLogCurrentException(exc_log, __PRETTY_FUNCTION__); - throw; - } -} - -RNPagePreparer::~RNPagePreparer() noexcept -{ - for (auto & task : persist_threads) - { - try - { - task.get(); - } - catch (...) - { - tryLogCurrentException(exc_log, __PRETTY_FUNCTION__); - } - } -} - -void RNPagePreparer::prepareLoop(size_t idx) -{ - LoggerPtr log = exc_log->getChild(fmt::format("PagePrepareThread#{}", idx)); - - - bool meet_error = false; - String local_err_msg; - - while (!meet_error) - { - try - { - // no more results - if (!consumeOneResult(log)) - break; - } - catch (...) - { - meet_error = true; - local_err_msg = getCurrentExceptionMessage(false); - } - } - - downloadDone(meet_error, local_err_msg, log); - - // try to do some preparation for speed up reading - size_t num_prepared = 0; - double seconds_cost = 0.0; - if (do_prepare) - { - while (true) - { - auto seg_task = remote_read_tasks->nextTaskForPrepare(); - if (!seg_task) - break; - watch.restart(); - // do place index - seg_task->prepare(); - seconds_cost = watch.elapsedSeconds(); - num_prepared += 1; - LOG_DEBUG(log, "segment prepare done, segment_id={}", seg_task->segment_id); - // update the state - remote_read_tasks->updateTaskState(seg_task, DM::SegmentReadTaskState::DataReadyAndPrepared, false); - } - } - - remote_read_tasks->wakeAll(); - - LOG_INFO(log, "Done preparation for {} segment tasks, cost={:.3f}s", num_prepared, seconds_cost); -} - -void RNPagePreparer::downloadDone(bool meet_error, const String & local_err_msg, const LoggerPtr & log) -{ - Int32 copy_persister_num = -1; - { - std::unique_lock lock(mu); - if (meet_error) - { - if (state == PageReceiverState::NORMAL) - state = PageReceiverState::ERROR; - if (err_msg.empty()) - err_msg = local_err_msg; - } - copy_persister_num = --live_persisters; - } - - LOG_DEBUG( - log, - "persist end. meet error: {}{}, current alive persister: {}", - meet_error, - meet_error ? fmt::format(", err msg: {}", local_err_msg) : "", - copy_persister_num); - - if (copy_persister_num < 0) - { - throw Exception("live_persisters should not be less than 0!"); - } - else if (copy_persister_num == 0) - { - LOG_DEBUG(log, "All persist threads end in RNPageReceiver"); - String copy_persister_msg; - { - std::unique_lock lock(mu); - copy_persister_msg = err_msg; - } - remote_read_tasks->allDataReceive(copy_persister_msg); - } -} - -bool RNPagePreparer::consumeOneResult(const LoggerPtr & log) -{ - auto result = receiver->nextResult(decoder_ptr); - if (result.eof()) - { - LOG_DEBUG(log, "fetch reader meets eof"); - return false; - } - if (!result.ok()) - { - LOG_WARNING(log, "fetch reader meets error: {}", result.error_msg); - throw Exception(result.error_msg); - } - - const auto decode_detail = result.decode_detail; - total_rows += decode_detail.rows; - total_pages += decode_detail.pages; - - return true; -} - -} // namespace DB diff --git a/dbms/src/Flash/Disaggregated/RNPagePreparer.h b/dbms/src/Flash/Disaggregated/RNPagePreparer.h deleted file mode 100644 index aae4c1eb89d..00000000000 --- a/dbms/src/Flash/Disaggregated/RNPagePreparer.h +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include - -namespace DB -{ -class RNPagePreparer; -using RNPagePreparerPtr = std::shared_ptr; - -// `RNPagePreparer` starts background threads to keep -// - popping message from `RNPageReceiver.msg_channel` -// - persisting pages for ColumnFileTiny into RN's LocalPageCache -// - decoding blocks for ColumnFileInMemory into RN's memory -// - do some extra preparation work after all data received from -// all write nodes -class RNPagePreparer -{ -public: - RNPagePreparer( - DM::RNRemoteReadTaskPtr remote_read_tasks_, - std::shared_ptr receiver_, - const DM::ColumnDefinesPtr & columns_to_read, - size_t max_streams_, - const String & req_id, - const String & executor_id, - bool do_prepare_); - - ~RNPagePreparer() noexcept; - -private: - void prepareLoop(size_t idx); - - bool consumeOneResult(const LoggerPtr & log); - - void downloadDone(bool meet_error, const String & local_err_msg, const LoggerPtr & log); - -private: - const size_t threads_num; - const bool do_prepare; - DM::RNRemoteReadTaskPtr remote_read_tasks; - std::shared_ptr receiver; - std::vector> persist_threads; - - std::unique_ptr decoder_ptr; - - std::mutex mu; - Int32 live_persisters; - PageReceiverState state; - String err_msg; - - std::atomic total_rows; - std::atomic total_pages; - - Stopwatch watch; - LoggerPtr exc_log; -}; - -} // namespace DB diff --git a/dbms/src/Flash/Disaggregated/RNPageReceiver.cpp b/dbms/src/Flash/Disaggregated/RNPageReceiver.cpp deleted file mode 100644 index 074e4be382d..00000000000 --- a/dbms/src/Flash/Disaggregated/RNPageReceiver.cpp +++ /dev/null @@ -1,450 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -namespace DB -{ -namespace details -{ -String constructStatusString(PageReceiverState state, const String & error_message) -{ - if (error_message.empty()) - return fmt::format("Receiver state: {}", magic_enum::enum_name(state)); - return fmt::format("Receiver state: {}, error message: {}", magic_enum::enum_name(state), error_message); -} -bool pushPacket(const DM::RNRemoteSegmentReadTaskPtr & seg_task, - const String & req_info, - const TrackedPageDataPacketPtr & tracked_packet, - std::unique_ptr> & msg_channel, - LoggerPtr & log) -{ - bool push_succeed = true; - - const disaggregated::DisaggReadError * error_ptr = nullptr; - auto & packet = *tracked_packet; - if (packet.has_error()) - error_ptr = &packet.error(); - - if (!(error_ptr == nullptr // no error - && packet.chunks_size() == 0 && packet.pages_size() == 0 // empty - )) - { - auto recv_msg = std::make_shared( - req_info, - seg_task, - tracked_packet, - error_ptr); - - // We record pending msg before pushing to channel, because the msg may be consumed immediately - // after the push. - seg_task->addPendingMsg(); - push_succeed = msg_channel->push(std::move(recv_msg)) == MPMCQueueResult::OK; - } - - LOG_TRACE(log, "push recv_msg to msg_channels(size: {}) succeed:{}", 1, push_succeed); - return push_succeed; -} -} // namespace details - -template -RNPageReceiverBase::RNPageReceiverBase( - std::unique_ptr rpc_context_, - size_t source_num_, - size_t max_streams_, - const String & req_id, - const String & executor_id) - : rpc_context(std::move(rpc_context_)) - , source_num(source_num_) - , max_buffer_size(std::max(16, max_streams_ * 2)) - , thread_manager(newThreadManager()) - , live_connections(source_num) - , state(PageReceiverState::NORMAL) - , collected(false) - , thread_count(0) - , exc_log(Logger::get(req_id, executor_id)) -{ - try - { - msg_channel = std::make_unique>(max_buffer_size); - // setup fetch threads to fetch pages/blocks from write nodes - setUpConnection(); - } - catch (...) - { - try - { - cancel(); - thread_manager->wait(); - } - catch (...) - { - tryLogCurrentException(exc_log, __PRETTY_FUNCTION__); - } - throw; - } -} - -template -RNPageReceiverBase::~RNPageReceiverBase() -{ - try - { - close(); - thread_manager->wait(); - } - catch (...) - { - tryLogCurrentException(exc_log, __PRETTY_FUNCTION__); - } -} - -template -void RNPageReceiverBase::cancel() -{ - if (setEndState(PageReceiverState::CANCELED)) - { - rpc_context->cancelDisaggTaskOnTiFlashStorageNode(exc_log); - } - cancelAllMsgChannels(); -} - -template -void RNPageReceiverBase::close() -{ - setEndState(PageReceiverState::CLOSED); - finishAllMsgChannels(); -} - -template -void RNPageReceiverBase::setUpConnection() -{ - // TODO: support async - for (size_t index = 0; index < source_num; ++index) - { - thread_manager->schedule(true, "Receiver", [this] { - readLoop(); - }); - ++thread_count; - } -} - -template -PageReceiverResult RNPageReceiverBase::nextResult(std::unique_ptr & decoder_ptr) -{ - // Note that decode_ptr can not squash blocks, the blocks could comes - // from different stores, tables, segments - - std::shared_ptr recv_msg; - auto pop_res = msg_channel->pop(recv_msg); - if (pop_res != MPMCQueueResult::OK) - { - std::unique_lock lock(mu); - if (state != PageReceiverState::NORMAL) - { - return PageReceiverResult::newError(RNPageReceiverBase::name, details::constructStatusString(state, err_msg)); - } - - /// live_connections == 0, msg_channel is finished, and state is NORMAL, - /// that is the end. - return PageReceiverResult::newEOF(RNPageReceiverBase::name); - } - - assert(recv_msg != nullptr); - if (unlikely(recv_msg->error_ptr != nullptr)) - return PageReceiverResult::newError(recv_msg->req_info, recv_msg->error_ptr->msg()); - - // Decode the pages or blocks into recv_msg->seg_task - return toDecodeResult(recv_msg, decoder_ptr); -} - -template -PageReceiverResult RNPageReceiverBase::toDecodeResult( - const std::shared_ptr & recv_msg, - std::unique_ptr & decoder_ptr) -{ - assert(recv_msg != nullptr); - /// the data packets (now we ignore execution summary) - auto result = PageReceiverResult::newOk(recv_msg->req_info); - result.decode_detail = decodeChunksAndPersistPages(recv_msg, decoder_ptr); - auto has_pending_msg = recv_msg->seg_task->addConsumedMsg(); - LOG_DEBUG(exc_log, - "seg: {} state: {} received: {} pending: {}", - recv_msg->seg_task->segment_id, - magic_enum::enum_name(recv_msg->seg_task->state), - recv_msg->seg_task->num_msg_consumed, - recv_msg->seg_task->num_msg_to_consume); - if (recv_msg->seg_task->state == DM::SegmentReadTaskState::Receiving - && !has_pending_msg) - { - // All pending message of current segment task are received, - // mark the segment task is ready for reading. - rpc_context->finishTaskReceive(recv_msg->seg_task); - } - return result; -} - -template -PageDecodeDetail RNPageReceiverBase::decodeChunksAndPersistPages( - const std::shared_ptr & recv_msg, - std::unique_ptr & decoder_ptr) -{ - assert(recv_msg != nullptr); - PageDecodeDetail detail; - - LOG_DEBUG(exc_log, "decoding msg with {} chunks, {} pages", recv_msg->chunks().size(), recv_msg->pages().size()); - - if (recv_msg->empty()) - return detail; - - { - // Record total packet size - const auto & packet = *recv_msg->packet; - detail.packet_bytes = packet.ByteSizeLong(); - } - - // Note: Currently in the 2nd response memtable data is not contained. - for (const String & chunk : recv_msg->chunks()) - { - auto block = decoder_ptr->decode(chunk); - if (!block) - continue; - detail.rows += block.rows(); - if likely (block.rows() > 0) - { - recv_msg->seg_task->receiveMemTable(std::move(block)); - } - } - - // Parse the remote pages and persist them into local cache - for (const String & page : recv_msg->pages()) - { - DM::RemotePb::RemotePage remote_page; - bool parsed = remote_page.ParseFromString(page); // TODO: handle error - RUNTIME_CHECK_MSG(parsed, "Can not parse remote page"); - recv_msg->seg_task->receivePage(std::move(remote_page)); - detail.pages += 1; - } - - return detail; -} - -constexpr Int32 max_retry_times = 10; - -template -void RNPageReceiverBase::readLoop() -{ - // TODO: metrics - - CPUAffinityManager::getInstance().bindSelfQueryThread(); - bool meet_error = false; - String local_err_msg; - - // Keep popping segment fetch pages request to get the task ready - while (!meet_error) - { - auto req = rpc_context->nextFetchPagesRequest(); - if (!req.isValid()) - break; - try - { - std::tie(meet_error, local_err_msg) = taskReadLoop(req); - } - catch (...) - { - meet_error = true; - local_err_msg = getCurrentExceptionMessage(false); - } - rpc_context->finishTaskEstablish(req, meet_error); - } - connectionDone(meet_error, local_err_msg, exc_log); -} - -template -std::tuple RNPageReceiverBase::taskReadLoop(const FetchPagesRequest & req) -{ - auto status = RPCContext::getStatusOK(); - bool meet_error = false; - String local_err_msg; - - String req_info = fmt::format("tunnel{}", req.identifier()); - LoggerPtr log = exc_log->getChild(req_info); - for (int i = 0; i < max_retry_times; ++i) - { - Stopwatch w_fetch_page; - SCOPE_EXIT({ - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_rpc_fetch_page).Observe(w_fetch_page.elapsedSeconds()); - }); - - auto resp_reader = rpc_context->doRequest(req); - bool has_data = false; - // Keep reading packets and push the packet - // into msg_channels - for (;;) - { - LOG_TRACE(log, "begin next "); - TrackedPageDataPacketPtr packet = std::make_shared(); - if (bool success = resp_reader->read(packet); !success) - break; - has_data = true; - if (packet->has_error()) - { - meet_error = true; - local_err_msg = fmt::format("Read error message from page packet: {}", packet->error().msg()); - break; - } - - if (!details::pushPacket( - req.seg_task, - req_info, - packet, - msg_channel, - log)) - { - meet_error = true; - local_err_msg = fmt::format("Push page packet failed. {}", getStatusString()); - break; - } - // LOG_DEBUG(log, - // "push packet into channel, chunks: {} pages: {}, msg_size: {}", - // packet->chunks_size(), - // packet->pages_size(), - // msg_channel->size()); - } - // if meet error, such as decode packet fails, it will not retry. - if (meet_error) - { - resp_reader->cancel(local_err_msg); - break; - } - status = resp_reader->finish(); - if (status.ok()) - { - // LOG_DEBUG(log, "finish read: {}", req.debugString()); - break; - } - else - { - bool retryable = !has_data && i + 1 < max_retry_times; - LOG_WARNING( - log, - "FetchDisaggregatedPagesRequest meets rpc fail. Err code = {}, err msg = {}, retryable = {}", - status.error_code(), - status.error_message(), - retryable); - // if we have received some data, we can not retry. - if (has_data) - break; - - using namespace std::chrono_literals; - std::this_thread::sleep_for(1s); - } - } - if (!status.ok()) - { - meet_error = true; - local_err_msg = status.error_message(); - } - return {meet_error, local_err_msg}; -} - -template -bool RNPageReceiverBase::setEndState(PageReceiverState new_state) -{ - assert(new_state == PageReceiverState::CANCELED || new_state == PageReceiverState::CLOSED); - std::unique_lock lock(mu); - if (state == PageReceiverState::CANCELED || state == PageReceiverState::CLOSED) - { - return false; - } - state = new_state; - return true; -} - -template -String RNPageReceiverBase::getStatusString() -{ - std::unique_lock lock(mu); - return details::constructStatusString(state, err_msg); -} - -template -void RNPageReceiverBase::connectionDone( - bool meet_error, - const String & local_err_msg, - const LoggerPtr & log) -{ - Int32 copy_live_conn = -1; - { - std::unique_lock lock(mu); - if (meet_error) - { - if (state == PageReceiverState::NORMAL) - state = PageReceiverState::ERROR; - if (err_msg.empty()) - err_msg = local_err_msg; - } - copy_live_conn = --live_connections; - } - LOG_DEBUG( - log, - "connection end. meet error: {}{}, current alive connections: {}", - meet_error, - meet_error ? fmt::format(", err msg: {}", local_err_msg) : "", - copy_live_conn); - - if (copy_live_conn == 0) - { - LOG_DEBUG(log, "All read threads end in RNPageReceiver"); - } - else if (copy_live_conn < 0) - throw Exception("live_connections should not be less than 0!"); - - if (meet_error || copy_live_conn == 0) - finishAllMsgChannels(); -} - -template -void RNPageReceiverBase::finishAllMsgChannels() -{ - msg_channel->finish(); -} - -template -void RNPageReceiverBase::cancelAllMsgChannels() -{ - msg_channel->cancel(); -} - -/// Explicit template instantiations -template class RNPageReceiverBase; - -} // namespace DB diff --git a/dbms/src/Flash/Disaggregated/RNPageReceiver.h b/dbms/src/Flash/Disaggregated/RNPageReceiver.h deleted file mode 100644 index 99e48045080..00000000000 --- a/dbms/src/Flash/Disaggregated/RNPageReceiver.h +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -namespace DM -{ -class RNRemoteSegmentReadTask; -using RNRemoteSegmentReadTaskPtr = std::shared_ptr; -} // namespace DM - -enum class PageReceiverState -{ - NORMAL, - ERROR, - CANCELED, - CLOSED, -}; - -struct PageReceivedMessage -{ - String req_info; - DM::RNRemoteSegmentReadTaskPtr seg_task; - const TrackedPageDataPacketPtr packet; - const disaggregated::DisaggReadError * error_ptr; - - bool empty() const { return packet->pages_size() == 0 && packet->chunks_size() == 0; } - // The serialized pages to be parsed as Page - const auto & pages() const { return packet->pages(); } - // The chunks to be parsed as Block - const auto & chunks() const { return packet->chunks(); } - - PageReceivedMessage( - const String & req_info_, - const DM::RNRemoteSegmentReadTaskPtr & seg_task_, - const TrackedPageDataPacketPtr & packet_, - const disaggregated::DisaggReadError * error_ptr_) - : req_info(req_info_) - , seg_task(seg_task_) - , packet(packet_) - , error_ptr(error_ptr_) - { - } -}; -using PageReceivedMessagePtr = std::shared_ptr; - - -/// Detail of the packet that decoding in RNPageReceiverBase.decodeChunks -struct PageDecodeDetail -{ - // Responding packets count, usually be 1, be 0 when flush data before eof - Int64 packets = 1; - - // The row number of all blocks of the original packet - Int64 rows = 0; - - // Total byte size of the origin packet - Int64 packet_bytes = 0; - - // The pages of the original packet - Int64 pages = 0; -}; -struct PageReceiverResult -{ - enum class Type - { - Ok, - Eof, - Error, - }; - - Type type; - String req_info; - String error_msg; - // details to collect execution summary - PageDecodeDetail decode_detail; - - static PageReceiverResult newOk(const String & req_info_) - { - return PageReceiverResult{Type::Ok, req_info_, /*error_msg*/ ""}; - } - - static PageReceiverResult newEOF(const String & req_info_) - { - return PageReceiverResult{Type::Eof, req_info_, /*error_msg*/ ""}; - } - - static PageReceiverResult newError(const String & req_info, const String & error_msg) - { - return PageReceiverResult{Type::Error, req_info, error_msg}; - } - - bool ok() const { return type == Type::Ok; } - bool eof() const { return type == Type::Eof; } - -private: - explicit PageReceiverResult( - Type type_, - const String & req_info_ = "", - const String & error_msg_ = "") - : type(type_) - , req_info(req_info_) - , error_msg(error_msg_) - {} -}; - -// `RNPageReceiver` starts background threads to keep -// - poping ** non-ready ** segment tasks from the task pool and fetch -// the pages and mem-tables blocks through `FlashService::FetchDisaggPages` -// from write nodes -// - receiving message from different write nodes and push them into msg_channel -template -class RNPageReceiverBase -{ -public: - static constexpr auto name = "RNPageReceiver"; - -public: - RNPageReceiverBase( - std::unique_ptr rpc_context_, - size_t source_num_, - size_t max_streams_, - const String & req_id, - const String & executor_id); - - ~RNPageReceiverBase(); - - void cancel(); - - void close(); - - PageReceiverResult nextResult(std::unique_ptr & decoder_ptr); - -private: - void setUpConnection(); - void readLoop(); - std::tuple taskReadLoop(const FetchPagesRequest & req); - - bool setEndState(PageReceiverState new_state); - String getStatusString(); - - void connectionDone( - bool meet_error, - const String & local_err_msg, - const LoggerPtr & log); - - void finishAllMsgChannels(); - void cancelAllMsgChannels(); - - PageReceiverResult toDecodeResult( - const std::shared_ptr & recv_msg, - std::unique_ptr & decoder_ptr); - - PageDecodeDetail decodeChunksAndPersistPages( - const std::shared_ptr & recv_msg, - std::unique_ptr & decoder_ptr); - -private: - std::unique_ptr rpc_context; - const size_t source_num; - const size_t max_buffer_size; - - std::shared_ptr thread_manager; - std::unique_ptr> msg_channel; - - std::mutex mu; - /// should lock `mu` when visit these members - Int32 live_connections; - PageReceiverState state; - String err_msg; - - bool collected; - int thread_count; - - LoggerPtr exc_log; -}; - -class RNPageReceiver : public RNPageReceiverBase -{ -public: - using Base = RNPageReceiverBase; - using Base::Base; -}; - -} // namespace DB diff --git a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp b/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp deleted file mode 100644 index b2a14827b0d..00000000000 --- a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.cpp +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace pingcap::kv -{ -template <> -struct RpcTypeTraits -{ - using RequestType = disaggregated::FetchDisaggPagesRequest; - using ResultType = disaggregated::PagesPacket; - static std::unique_ptr> doRPCCall( - grpc::ClientContext * context, - std::shared_ptr client, - const RequestType & req) - { - return client->stub->FetchDisaggPages(context, req); - } - static std::unique_ptr> doAsyncRPCCall( - grpc::ClientContext * context, - std::shared_ptr client, - const RequestType & req, - grpc::CompletionQueue & cq, - void * call) - { - return client->stub->AsyncFetchDisaggPages(context, req, &cq, call); - } -}; - -} // namespace pingcap::kv - -namespace DB -{ -namespace -{ -struct GRPCFetchPagesResponseReader : public FetchPagesResponseReader -{ - std::shared_ptr> call; - grpc::ClientContext client_context; - std::unique_ptr> reader; - - explicit GRPCFetchPagesResponseReader(const FetchPagesRequest & req) - { - call = std::make_shared>(req.req); - } - - bool read(TrackedPageDataPacketPtr & packet) override - { - return reader->Read(packet.get()); - } - - grpc::Status finish() override - { - return reader->Finish(); - } - - void cancel(const String &) override {} -}; - -} // namespace - -GRPCPagesReceiverContext::GRPCPagesReceiverContext( - const DM::RNRemoteReadTaskPtr & remote_read_tasks_, - pingcap::kv::Cluster * cluster_) - : remote_read_tasks(remote_read_tasks_) - , cluster(cluster_) -{} - -FetchPagesRequest::FetchPagesRequest(DM::RNRemoteSegmentReadTaskPtr seg_task_) - : seg_task(std::move(seg_task_)) - , req(std::make_shared()) -{ - // Invalid task, just skip - if (!seg_task) - return; - - auto meta = seg_task->snapshot_id.toMeta(); - // The keyspace_id here is not vital, as we locate the table and segment by given - // snapshot_id. But it could be helpful for debugging. - auto keyspace_id = seg_task->ks_table_id.first; - meta.set_keyspace_id(keyspace_id); - meta.set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); - *req->mutable_snapshot_id() = meta; - req->set_table_id(seg_task->ks_table_id.second); - req->set_segment_id(seg_task->segment_id); - - { - std::vector cf_tiny_oids; - cf_tiny_oids.reserve(seg_task->delta_tinycf_page_ids.size()); - for (const auto & page_id : seg_task->delta_tinycf_page_ids) - { - auto page_oid = DM::Remote::PageOID{ - .store_id = seg_task->store_id, - .ks_table_id = seg_task->ks_table_id, - .page_id = page_id, - }; - cf_tiny_oids.emplace_back(page_oid); - } - - // Note: We must occupySpace segment by segment, because we need to read - // at least the complete data of one segment in order to drive everything forward. - // Currently we call occupySpace for each FetchPagesRequest, which is fine, - // because we send one request each seg_task. If we want to split - // FetchPagesRequest into multiples in future, then we need to change - // the moment of calling `occupySpace`. - auto page_cache = seg_task->dm_context->db_context.getSharedContextDisagg()->rn_page_cache; - - Stopwatch w_occupy; - auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->delta_tinycf_page_sizes); - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy).Observe(w_occupy.elapsedSeconds()); - - for (auto page_id : occupy_result.pages_not_in_cache) - req->add_page_ids(page_id.page_id); - - auto cftiny_total = seg_task->delta_tinycf_page_ids.size(); - auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); - LOG_INFO( - Logger::get(), - "read task local cache hit rate: {}, pages_not_in_cache={}", - cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), - occupy_result.pages_not_in_cache); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); - - seg_task->initColumnFileDataProvider(occupy_result.pages_guard); - } -} - -const String & FetchPagesRequest::address() const -{ - assert(seg_task != nullptr); - return seg_task->address; -} - -FetchPagesRequest GRPCPagesReceiverContext::nextFetchPagesRequest() const -{ - auto seg_task = remote_read_tasks->nextFetchTask(); - return FetchPagesRequest(std::move(seg_task)); -} - -void GRPCPagesReceiverContext::finishTaskEstablish(const FetchPagesRequest & req, bool meet_error) -{ - remote_read_tasks->updateTaskState(req.seg_task, DM::SegmentReadTaskState::Receiving, meet_error); -} - -void GRPCPagesReceiverContext::finishTaskReceive(const DM::RNRemoteSegmentReadTaskPtr & seg_task) -{ - remote_read_tasks->updateTaskState(seg_task, DM::SegmentReadTaskState::DataReady, false); -} - -void GRPCPagesReceiverContext::cancelDisaggTaskOnTiFlashStorageNode(LoggerPtr /*log*/) -{ - // TODO cancel -} - -FetchPagesResponseReaderPtr GRPCPagesReceiverContext::doRequest(const FetchPagesRequest & request) const -{ - auto reader = std::make_shared(request); - reader->reader = cluster->rpc_client->sendStreamRequest( - request.address(), - &reader->client_context, - *reader->call); - return reader; -} - -String FetchPagesRequest::debugString() const -{ - return req->ShortDebugString(); -} -} // namespace DB diff --git a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h b/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h deleted file mode 100644 index d4bd0c51808..00000000000 --- a/dbms/src/Flash/Disaggregated/RNPageReceiverContext.h +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include - -namespace DB -{ -using PageDataPacket = disaggregated::PagesPacket; -// TODO make the memory tracked -using TrackedPageDataPacketPtr = std::shared_ptr; -using TrackedPageDataPacketPtrs = std::vector; - -class FetchPagesResponseReader -{ -public: - virtual ~FetchPagesResponseReader() = default; - virtual bool read(TrackedPageDataPacketPtr & packet) = 0; - virtual grpc::Status finish() = 0; - virtual void cancel(const String & reason) = 0; -}; -using FetchPagesResponseReaderPtr = std::shared_ptr; - -struct FetchPagesRequest -{ - DM::RNRemoteSegmentReadTaskPtr seg_task; - std::shared_ptr req; - - explicit FetchPagesRequest(DM::RNRemoteSegmentReadTaskPtr seg_task_); - - bool isValid() const { return seg_task != nullptr; } - - const String & address() const; - - String identifier() const - { - assert(isValid()); - return fmt::format("{}+{}+{}+{}", seg_task->store_id, seg_task->ks_table_id.first, seg_task->ks_table_id.second, seg_task->segment_id); - } - - String debugString() const; -}; - -class GRPCPagesReceiverContext -{ -public: - using Status = grpc::Status; - - GRPCPagesReceiverContext( - const DM::RNRemoteReadTaskPtr & remote_read_tasks, - pingcap::kv::Cluster * cluster_); - - FetchPagesRequest nextFetchPagesRequest() const; - - FetchPagesResponseReaderPtr doRequest(const FetchPagesRequest & request) const; - - static Status getStatusOK() - { - return grpc::Status::OK; - } - - // When error happens, try cancel disagg task on the storage node side. - void cancelDisaggTaskOnTiFlashStorageNode(LoggerPtr log); - - void finishTaskEstablish(const FetchPagesRequest & req, bool meet_error); - - void finishTaskReceive(const DM::RNRemoteSegmentReadTaskPtr & seg_task); - -private: - // The remote segment task pool - DM::RNRemoteReadTaskPtr remote_read_tasks; - pingcap::kv::Cluster * cluster; -}; -} // namespace DB diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 933454c142d..2cdcc76e8ba 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -61,6 +62,10 @@ extern const int NOT_IMPLEMENTED; extern const int UNKNOWN_EXCEPTION; extern const int DISAGG_ESTABLISH_RETRYABLE_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char exception_when_fetch_disagg_pages[]; +} // namespace FailPoints #define CATCH_FLASHSERVICE_EXCEPTION \ catch (Exception & e) \ @@ -760,6 +765,8 @@ grpc::Status FlashService::FetchDisaggPages( auto task = snap->popSegTask(request->table_id(), request->segment_id()); RUNTIME_CHECK(task.isValid(), task.err_msg); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_when_fetch_disagg_pages); + PageIdU64s read_ids; read_ids.reserve(request->page_ids_size()); for (auto page_id : request->page_ids()) diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index 7291bb12872..e481f96b850 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -14,6 +14,7 @@ /// Suppress gcc warning: ‘*((void*)& +4)’ may be used uninitialized in this function #include +#include #include #include #include @@ -643,7 +644,10 @@ void StorageRemoteCacheConfig::parse(const String & content, const LoggerPtr & l readConfig(table, "dtfile_level", dtfile_level); RUNTIME_CHECK(dtfile_level <= 100); readConfig(table, "delta_rate", delta_rate); - RUNTIME_CHECK(std::isgreaterequal(delta_rate, 0.1) && std::islessequal(delta_rate, 1.0), delta_rate); + RUNTIME_CHECK(std::isgreaterequal(delta_rate, 0.0) && std::islessequal(delta_rate, 1.0), delta_rate); + if (delta_rate == 0.0) + LOG_WARNING(log, "Starting with unlimited delta page cache capacity, delta_rate={}", delta_rate); + readConfig(table, "reserved_rate", reserved_rate); RUNTIME_CHECK(std::isgreaterequal(reserved_rate, 0.0) && std::islessequal(reserved_rate, 0.5), reserved_rate); RUNTIME_CHECK(std::islessequal(delta_rate + reserved_rate, 1.0), delta_rate, reserved_rate); diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNReadTask.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNReadTask.cpp new file mode 100644 index 00000000000..7f50381cbeb --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNReadTask.cpp @@ -0,0 +1,156 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM::Remote +{ + +RNReadSegmentTaskPtr RNReadSegmentTask::buildFromEstablishResp( + const LoggerPtr & log, + const Context & db_context, + const ScanContextPtr & scan_context, + const RemotePb::RemoteSegment & proto, + const DisaggTaskId & snapshot_id, + StoreID store_id, + const String & store_address, + KeyspaceID keyspace_id, + TableID physical_table_id) +{ + RowKeyRange segment_range; + { + ReadBufferFromString rb(proto.key_range()); + segment_range = RowKeyRange::deserialize(rb); + } + RowKeyRanges read_ranges(proto.read_key_ranges_size()); + for (int i = 0; i < proto.read_key_ranges_size(); ++i) + { + ReadBufferFromString rb(proto.read_key_ranges(i)); + read_ranges[i] = RowKeyRange::deserialize(rb); + } + + auto dm_context = std::make_shared( + db_context, + /* path_pool */ nullptr, + /* storage_pool */ nullptr, + /* min_version */ 0, + keyspace_id, + physical_table_id, + /* is_common_handle */ segment_range.is_common_handle, + /* rowkey_column_size */ segment_range.rowkey_column_size, + db_context.getSettingsRef(), + scan_context); + + auto segment = std::make_shared( + log, + /*epoch*/ 0, + segment_range, + proto.segment_id(), + /*next_segment_id*/ 0, + nullptr, + nullptr); + + auto segment_snap = Serializer::deserializeSegmentSnapshotFrom( + *dm_context, + store_id, + physical_table_id, + proto); + + // Note: At this moment, we still cannot read from `task->segment_snap`, + // because they are constructed using ColumnFileDataProviderNop. + + std::vector delta_tinycf_ids; + std::vector delta_tinycf_sizes; + { + auto persisted_cfs = segment_snap->delta->getPersistedFileSetSnapshot(); + delta_tinycf_ids.reserve(persisted_cfs->getColumnFileCount()); + delta_tinycf_sizes.reserve(persisted_cfs->getColumnFileCount()); + for (const auto & cfs : persisted_cfs->getColumnFiles()) + { + if (auto * tiny = cfs->tryToTinyFile(); tiny) + { + delta_tinycf_ids.emplace_back(tiny->getDataPageId()); + delta_tinycf_sizes.emplace_back(tiny->getDataPageSize()); + } + } + } + + LOG_DEBUG( + log, + "Build RNReadSegmentTask, segment_id={} store_id={} keyspace_id={} table_id={} memtable_cfs={} persisted_cfs={}", + proto.segment_id(), + store_id, + keyspace_id, + physical_table_id, + segment_snap->delta->getMemTableSetSnapshot()->getColumnFileCount(), + segment_snap->delta->getPersistedFileSetSnapshot()->getColumnFileCount()); + + return std::shared_ptr(new RNReadSegmentTask( + RNReadSegmentMeta{ + .keyspace_id = keyspace_id, + .physical_table_id = physical_table_id, + .segment_id = proto.segment_id(), + .store_id = store_id, + + .delta_tinycf_page_ids = delta_tinycf_ids, + .delta_tinycf_page_sizes = delta_tinycf_sizes, + .segment = segment, + .segment_snap = segment_snap, + .store_address = store_address, + + .read_ranges = read_ranges, + .snapshot_id = snapshot_id, + .dm_context = dm_context, + })); +} + +void RNReadSegmentTask::initColumnFileDataProvider(const RNLocalPageCacheGuardPtr & pages_guard) +{ + auto & data_provider = meta.segment_snap->delta->getPersistedFileSetSnapshot()->data_provider; + RUNTIME_CHECK(std::dynamic_pointer_cast(data_provider)); + + auto page_cache = meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + data_provider = std::make_shared( + page_cache, + pages_guard, + meta.store_id, + KeyspaceTableID{meta.keyspace_id, meta.physical_table_id}); +} + +void RNReadSegmentTask::initInputStream( + const ColumnDefines & columns_to_read, + UInt64 read_tso, + const PushDownFilterPtr & push_down_filter, + ReadMode read_mode) +{ + RUNTIME_CHECK(input_stream == nullptr); + input_stream = meta.segment->getInputStream( + read_mode, + *meta.dm_context, + columns_to_read, + meta.segment_snap, + meta.read_ranges, + push_down_filter, + read_tso, + DEFAULT_BLOCK_SIZE); +} + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNReadTask.h b/dbms/src/Storages/DeltaMerge/Remote/RNReadTask.h new file mode 100644 index 00000000000..b29d64d692e --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNReadTask.h @@ -0,0 +1,143 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB::DM +{ +class Segment; +using SegmentPtr = std::shared_ptr; + +struct SegmentSnapshot; +using SegmentSnapshotPtr = std::shared_ptr; + +struct DMContext; +using DMContextPtr = std::shared_ptr; +} // namespace DB::DM + +namespace DB::DM::Remote +{ + +struct RNReadSegmentMeta +{ + // ======== Fields below uniquely identify a remote segment ======== + const KeyspaceID keyspace_id; + const TableID physical_table_id; + const UInt64 segment_id; + const StoreID store_id; + // ================================================================= + + // ======== Fields below are other supplementary information about the remote segment ======== + const std::vector delta_tinycf_page_ids; + const std::vector delta_tinycf_page_sizes; + const SegmentPtr segment; + const SegmentSnapshotPtr segment_snap; + const String store_address; + // =========================================================================================== + + // ======== Fields below are information about this reading ======== + const RowKeyRanges read_ranges; + const DisaggTaskId snapshot_id; + const DMContextPtr dm_context; + // ================================================================= +}; + +/// Represent a read from a remote segment. Initially it is built from information +/// returned by Write Node in EstablishDisaggTask. +/// It is a stateful object, fields may be changed when the segment is being read. +class RNReadSegmentTask : boost::noncopyable +{ +public: + // meta is assigned when this SegmentTask is initially created from info returned + // by Write Node. It is never changed. + const RNReadSegmentMeta meta; + + static RNReadSegmentTaskPtr buildFromEstablishResp( + const LoggerPtr & log, + const Context & db_context, + const ScanContextPtr & scan_context, + const RemotePb::RemoteSegment & proto, + const DisaggTaskId & snapshot_id, + StoreID store_id, + const String & store_address, + KeyspaceID keyspace_id, + TableID physical_table_id); + + String info() const + { + return fmt::format( + "ReadSegmentTask", + meta.store_id, + meta.segment_id, + meta.physical_table_id); + } + + /// Called from WorkerFetchPages. + void initColumnFileDataProvider(const RNLocalPageCacheGuardPtr & pages_guard); + + /// Called from WorkerPrepareStreams. + void initInputStream( + const ColumnDefines & columns_to_read, + UInt64 read_tso, + const PushDownFilterPtr & push_down_filter, + ReadMode read_mode); + + BlockInputStreamPtr getInputStream() const + { + RUNTIME_CHECK(input_stream != nullptr); + return input_stream; + } + +private: + explicit RNReadSegmentTask(const RNReadSegmentMeta & meta_) + : meta(meta_) + { + } + +private: + BlockInputStreamPtr input_stream; +}; + +/// "Remote read" is simply reading from these remote segments. +class RNReadTask : boost::noncopyable +{ +public: + const std::vector segment_read_tasks; + + static RNReadTaskPtr create(const std::vector & segment_read_tasks_) + { + return std::shared_ptr(new RNReadTask(segment_read_tasks_)); + } + +private: + explicit RNReadTask(const std::vector & segment_read_tasks_) + : segment_read_tasks(segment_read_tasks_) + { + } +}; + + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask_fwd.h b/dbms/src/Storages/DeltaMerge/Remote/RNReadTask_fwd.h similarity index 57% rename from dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask_fwd.h rename to dbms/src/Storages/DeltaMerge/Remote/RNReadTask_fwd.h index dedf759d40f..10475d930cc 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask_fwd.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNReadTask_fwd.h @@ -14,18 +14,16 @@ #pragma once +#include #include -namespace DB::DM +namespace DB::DM::Remote { -class RNRemoteReadTask; -using RNRemoteReadTaskPtr = std::shared_ptr; -class RNRemoteStoreReadTask; -using RNRemoteStoreReadTaskPtr = std::shared_ptr; -class RNRemotePhysicalTableReadTask; -using RNRemotePhysicalTableReadTaskPtr = std::shared_ptr; -class RNRemoteSegmentReadTask; -using RNRemoteSegmentReadTaskPtr = std::shared_ptr; +class RNReadTask; +using RNReadTaskPtr = std::shared_ptr; -} // namespace DB::DM +class RNReadSegmentTask; +using RNReadSegmentTaskPtr = std::shared_ptr; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp deleted file mode 100644 index bace75222d7..00000000000 --- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.cpp +++ /dev/null @@ -1,590 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -namespace DB::DM -{ - -RNRemoteReadTask::RNRemoteReadTask(std::vector && input_tasks_) - : num_segments(0) - , log(Logger::get()) -{ - for (const auto & store_task : input_tasks_) - { - if (!store_task) - continue; - auto res = store_tasks.emplace(store_task->store_id, store_task); - RUNTIME_CHECK_MSG(res.second, "Duplicated task from store_id={}", store_task->store_id); - num_segments += store_task->numRemainTasks(); - - // Push all inited tasks to ready queue - for (const auto & table_task : store_task->table_read_tasks) - { - for (const auto & seg_task : table_task->tasks) - { - // TODO: If all pages are ready in local - // cache, and the segment does not contains any - // blocks on write node's mem-table, then we - // can simply skip the fetch page pharse and - // push it into ready queue - ready_segment_tasks[seg_task->state].push_back(seg_task); - } - } - } - curr_store = store_tasks.begin(); -} - -RNRemoteReadTask::~RNRemoteReadTask() -{ - cv_ready_tasks.notify_all(); -} - -size_t RNRemoteReadTask::numSegments() const -{ - return num_segments; -} - -RNRemoteSegmentReadTaskPtr RNRemoteReadTask::nextFetchTask() -{ - // A simple scheduling policy that try to execute the segment tasks - // from different stores in parallel - std::lock_guard gurad(mtx_tasks); - while (true) - { - if (store_tasks.empty()) - return nullptr; - - if (curr_store->second->numRemainTasks() > 0) - { - auto task = curr_store->second->nextTask(); - // Move to next store - curr_store++; - if (curr_store == store_tasks.end()) - curr_store = store_tasks.begin(); - return task; - } - // No tasks left in this store, erase and try to pop task from the next store - curr_store = store_tasks.erase(curr_store); - if (curr_store == store_tasks.end()) - curr_store = store_tasks.begin(); - } -} - -void RNRemoteReadTask::updateTaskState(const RNRemoteSegmentReadTaskPtr & seg_task, SegmentReadTaskState target_state, bool meet_error) -{ - { - std::unique_lock ready_lock(mtx_ready_tasks); - const auto old_state = seg_task->state; - auto state_iter = ready_segment_tasks.find(old_state); - RUNTIME_CHECK(state_iter != ready_segment_tasks.end()); - - // TODO: make it an unordered_map - bool found = false; - for (auto task_iter = state_iter->second.begin(); task_iter != state_iter->second.end(); task_iter++) - { - auto & task = *task_iter; - if (task->store_id != seg_task->store_id - || task->ks_table_id != seg_task->ks_table_id - || task->segment_id != seg_task->segment_id) - { - continue; - } - seg_task->state = meet_error ? SegmentReadTaskState::Error : target_state; - found = true; - // Move it into the right state, note `task`/`task_iter` is invalid - state_iter->second.erase(task_iter); - if (state_iter->second.empty()) - ready_segment_tasks.erase(state_iter); - - insertTask(seg_task, ready_lock); - break; - } - RUNTIME_CHECK(found); - } - - cv_ready_tasks.notify_one(); -} - -void RNRemoteReadTask::allDataReceive(const String & end_err_msg) -{ - { - std::unique_lock ready_lock(mtx_ready_tasks); - // set up the error message - if (err_msg.empty() && !end_err_msg.empty()) - err_msg = end_err_msg; - - for (auto iter = ready_segment_tasks.begin(); iter != ready_segment_tasks.end(); /* empty */) - { - const auto state = iter->first; - const auto & tasks = iter->second; - if (state != SegmentReadTaskState::Init && state != SegmentReadTaskState::Receiving) - { - ++iter; - continue; - } - - // init or receiving -> all data ready - for (const auto & seg_task : tasks) - { - auto old_state = seg_task->state; - seg_task->state = SegmentReadTaskState::DataReady; - LOG_DEBUG( - log, - "seg_task: {} from {} to {}", - seg_task->segment_id, - magic_enum::enum_name(old_state), - magic_enum::enum_name(seg_task->state)); - insertTask(seg_task, ready_lock); - } - - iter = ready_segment_tasks.erase(iter); - } - } - cv_ready_tasks.notify_all(); -} - - -void RNRemoteReadTask::insertTask(const RNRemoteSegmentReadTaskPtr & seg_task, std::unique_lock &) -{ - if (auto state_iter = ready_segment_tasks.find(seg_task->state); - state_iter != ready_segment_tasks.end()) - state_iter->second.push_back(seg_task); - else - ready_segment_tasks.emplace(seg_task->state, std::list{seg_task}); -} - -RNRemoteSegmentReadTaskPtr RNRemoteReadTask::nextTaskForPrepare() -{ - std::unique_lock ready_lock(mtx_ready_tasks); - RNRemoteSegmentReadTaskPtr seg_task = nullptr; - cv_ready_tasks.wait(ready_lock, [this, &seg_task, &ready_lock] { - // All segment task are processed, return a nullptr - if (doneOrErrorHappen()) - return true; - - // Check whether there are segment task ready for place index - if (auto iter = ready_segment_tasks.find(SegmentReadTaskState::DataReady); iter != ready_segment_tasks.end()) - { - if (iter->second.empty()) - return false; // yield for another awake - seg_task = iter->second.front(); - iter->second.pop_front(); - if (iter->second.empty()) - { - ready_segment_tasks.erase(iter); - } - - const auto old_state = seg_task->state; - seg_task->state = SegmentReadTaskState::DataReadyAndPrepraring; - LOG_DEBUG( - log, - "seg_task: {} from {} to {}", - seg_task->segment_id, - magic_enum::enum_name(old_state), - magic_enum::enum_name(seg_task->state)); - insertTask(seg_task, ready_lock); - return true; - } - // If there exist some task that will become "DataReady", then we should - // wait. Else we should return true to end the wait. - bool has_more_tasks = (ready_segment_tasks.count(SegmentReadTaskState::Init) > 0 - || ready_segment_tasks.count(SegmentReadTaskState::Receiving) > 0); - return !has_more_tasks; - }); - return seg_task; -} - -RNRemoteSegmentReadTaskPtr RNRemoteReadTask::nextReadyTask() -{ - std::unique_lock ready_lock(mtx_ready_tasks); - RNRemoteSegmentReadTaskPtr seg_task = nullptr; - cv_ready_tasks.wait(ready_lock, [this, &seg_task] { - // All segment task are processed, return a nullptr - if (doneOrErrorHappen()) - return true; - - // First check whether there are prepared segment task - if (auto iter = ready_segment_tasks.find(SegmentReadTaskState::DataReadyAndPrepared); iter != ready_segment_tasks.end()) - { - if (!iter->second.empty()) - { - seg_task = iter->second.front(); - iter->second.pop_front(); - if (iter->second.empty()) - { - ready_segment_tasks.erase(iter); - } - return true; - } - } - // Else fallback to check whether there are segment task ready for reading - if (auto iter = ready_segment_tasks.find(SegmentReadTaskState::DataReady); iter != ready_segment_tasks.end()) - { - if (iter->second.empty()) - return false; // yield and wait for next check - seg_task = iter->second.front(); - iter->second.pop_front(); - if (iter->second.empty()) - { - ready_segment_tasks.erase(iter); - } - return true; - } - return false; // yield and wait for next check - }); - - return seg_task; -} - -const String & RNRemoteReadTask::getErrorMessage() const -{ - std::unique_lock ready_lock(mtx_ready_tasks); - return err_msg; -} - -bool RNRemoteReadTask::doneOrErrorHappen() const -{ - // All finished - if (ready_segment_tasks.empty()) - return true; - auto iter = ready_segment_tasks.find(SegmentReadTaskState::Error); - if (iter != ready_segment_tasks.end() && !iter->second.empty()) - { - // some tasks run into error when fetching pages - return true; // NOLINT(readability-simplify-boolean-expr) - } - return false; -} - -/// RNRemoteStoreReadTask /// - -RNRemoteStoreReadTask::RNRemoteStoreReadTask( - StoreID store_id_, - std::vector table_read_tasks_) - : store_id(store_id_) - , table_read_tasks(std::move(table_read_tasks_)) -{ - cur_table_task = table_read_tasks.begin(); -} - -size_t RNRemoteStoreReadTask::numRemainTasks() const -{ - std::lock_guard guard(mtx_tasks); - size_t num_segments = 0; - for (const auto & table_task : table_read_tasks) - { - num_segments += table_task->numRemainTasks(); - } - return num_segments; -} - -RNRemoteSegmentReadTaskPtr RNRemoteStoreReadTask::nextTask() -{ - std::lock_guard guard(mtx_tasks); - while (cur_table_task != table_read_tasks.end()) - { - if (auto seg_task = (*cur_table_task)->nextTask(); seg_task != nullptr) - return seg_task; - ++cur_table_task; - } - return {}; -} - -/// RNRemotePhysicalTableReadTask /// - -RNRemotePhysicalTableReadTaskPtr RNRemotePhysicalTableReadTask::buildFrom( - const Context & db_context, - const ScanContextPtr & scan_context, - const StoreID store_id, - const String & address, - const DisaggTaskId & snapshot_id, - const RemotePb::RemotePhysicalTable & remote_table, - const LoggerPtr & log) -{ - // Deserialize from `DisaggregatedPhysicalTable`, this should also - // ensure the local cache pages. - auto table_task = std::make_shared( - store_id, - KeyspaceTableID{remote_table.keyspace_id(), remote_table.table_id()}, - snapshot_id, - address); - - std::vector> futures; - - auto size = static_cast(remote_table.segments().size()); - for (size_t idx = 0; idx < size; ++idx) - { - const auto & remote_seg = remote_table.segments(idx); - - auto task = std::make_shared>([&, idx, size] { - Stopwatch watch; - SCOPE_EXIT({ - LOG_DEBUG(log, "Build RNRemoteSegmentReadTask finished, elapsed={}s task_idx={} task_total={} segment_id={}", watch.elapsedSeconds(), idx, size, remote_seg.segment_id()); - }); - - return RNRemoteSegmentReadTask::buildFrom( - db_context, - scan_context, - remote_seg, - snapshot_id, - table_task->store_id, - table_task->ks_table_id, - table_task->address, - log); - }); - - futures.emplace_back(task->get_future()); - RNRemoteReadTaskPool::get().scheduleOrThrowOnError([task] { (*task)(); }); - } - - for (auto & f : futures) - table_task->tasks.push_back(f.get()); - - return table_task; -} - -RNRemoteSegmentReadTaskPtr RNRemotePhysicalTableReadTask::nextTask() -{ - std::lock_guard gurad(mtx_tasks); - if (tasks.empty()) - return nullptr; - auto task = tasks.front(); - tasks.pop_front(); - return task; -} - -/** - * Remote segment - */ - -Allocator RNRemoteSegmentReadTask::allocator; - -RNRemoteSegmentReadTask::RNRemoteSegmentReadTask( - DisaggTaskId snapshot_id_, - StoreID store_id_, - KeyspaceTableID ks_table_id_, - UInt64 segment_id_, - String address_, - LoggerPtr log_) - : snapshot_id(std::move(snapshot_id_)) - , store_id(store_id_) - , ks_table_id(ks_table_id_) - , segment_id(segment_id_) - , address(std::move(address_)) - , log(std::move(log_)) -{ -} - -RNRemoteSegmentReadTaskPtr RNRemoteSegmentReadTask::buildFrom( - const Context & db_context, - const ScanContextPtr & scan_context, - const RemotePb::RemoteSegment & proto, - const DisaggTaskId & snapshot_id, - StoreID store_id, - KeyspaceTableID ks_table_id, - const String & address, - const LoggerPtr & log) -{ - RowKeyRange segment_range; - { - ReadBufferFromString rb(proto.key_range()); - segment_range = RowKeyRange::deserialize(rb); - } - RowKeyRanges read_ranges(proto.read_key_ranges_size()); - for (int i = 0; i < proto.read_key_ranges_size(); ++i) - { - ReadBufferFromString rb(proto.read_key_ranges(i)); - read_ranges[i] = RowKeyRange::deserialize(rb); - } - - auto task = std::make_shared( - snapshot_id, - store_id, - ks_table_id, - proto.segment_id(), - address, - log); - - task->dm_context = std::make_shared( - db_context, - /* path_pool */ nullptr, - /* storage_pool */ nullptr, - /* min_version */ 0, - ks_table_id.first, - ks_table_id.second, - /* is_common_handle */ segment_range.is_common_handle, - /* rowkey_column_size */ segment_range.rowkey_column_size, - db_context.getSettingsRef(), - scan_context); - - task->segment = std::make_shared( - log, - /*epoch*/ 0, - segment_range, - proto.segment_id(), - /*next_segment_id*/ 0, - nullptr, - nullptr); - task->read_ranges = std::move(read_ranges); - - task->segment_snap = Remote::Serializer::deserializeSegmentSnapshotFrom( - *(task->dm_context), - store_id, - ks_table_id.second, - proto); - - // Note: At this moment, we still cannot read from `task->segment_snap`, - // because they are constructed using ColumnFileDataProviderNop. - - { - auto persisted_cfs = task->segment_snap->delta->getPersistedFileSetSnapshot(); - std::vector persisted_ids; - std::vector persisted_sizes; - persisted_ids.reserve(persisted_cfs->getColumnFileCount()); - persisted_sizes.reserve(persisted_cfs->getColumnFileCount()); - for (const auto & cfs : persisted_cfs->getColumnFiles()) - { - if (auto * tiny = cfs->tryToTinyFile(); tiny) - { - persisted_ids.emplace_back(tiny->getDataPageId()); - persisted_sizes.emplace_back(tiny->getDataPageSize()); - } - } - - task->delta_tinycf_page_ids = persisted_ids; - task->delta_tinycf_page_sizes = persisted_sizes; - - LOG_INFO(log, - "Build RemoteSegmentReadTask, store_id={} keyspace_id={} table_id={} memtable_cfs={} persisted_cfs={}", - task->store_id, - task->ks_table_id.first, - task->ks_table_id.second, - task->segment_snap->delta->getMemTableSetSnapshot()->getColumnFileCount(), - task->segment_snap->delta->getPersistedFileSetSnapshot()->getColumnFileCount()); - } - - return task; -} - -void RNRemoteSegmentReadTask::initColumnFileDataProvider(Remote::RNLocalPageCacheGuardPtr pages_guard) -{ - auto & data_provider = segment_snap->delta->getPersistedFileSetSnapshot()->data_provider; - RUNTIME_CHECK(std::dynamic_pointer_cast(data_provider)); - - auto page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; - data_provider = std::make_shared( - page_cache, - pages_guard, - store_id, - ks_table_id); -} - -void RNRemoteSegmentReadTask::receivePage(RemotePb::RemotePage && remote_page) -{ - std::lock_guard lock(mtx_queue); - const size_t buf_size = remote_page.data().size(); - - // Use LocalPageCache - auto oid = Remote::PageOID{ - .store_id = store_id, - .ks_table_id = ks_table_id, - .page_id = remote_page.page_id(), - }; - auto read_buffer = std::make_shared(remote_page.data().data(), buf_size); - PageFieldSizes field_sizes; - field_sizes.reserve(remote_page.field_sizes_size()); - for (const auto & field_sz : remote_page.field_sizes()) - { - field_sizes.emplace_back(field_sz); - } - auto & page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; - page_cache->write(oid, std::move(read_buffer), buf_size, std::move(field_sizes)); - LOG_DEBUG(log, "receive page, oid={} segment_id={}", oid, segment->segmentId()); -} - -bool RNRemoteSegmentReadTask::addConsumedMsg() -{ - num_msg_consumed += 1; - RUNTIME_CHECK( - num_msg_consumed <= num_msg_to_consume, - num_msg_consumed, - num_msg_to_consume, - segment->segmentId()); - - // return there are more pending msg or not - return num_msg_consumed < num_msg_to_consume; -} - -void RNRemoteSegmentReadTask::prepare() -{ - // Do place index for full segment - segment->placeDeltaIndex(*dm_context, segment_snap); -} - -BlockInputStreamPtr RNRemoteSegmentReadTask::getInputStream( - const ColumnDefines & columns_to_read, - const RowKeyRanges & key_ranges, - UInt64 read_tso, - const PushDownFilterPtr & push_down_filter, - size_t expected_block_size, - ReadMode read_mode) -{ - return segment->getInputStream( - read_mode, - *dm_context, - columns_to_read, - segment_snap, - key_ranges, - push_down_filter, - read_tso, - expected_block_size); -} - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h deleted file mode 100644 index 1b546f1fedb..00000000000 --- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteReadTask.h +++ /dev/null @@ -1,279 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -namespace DB -{ -class Context; -struct FetchPagesRequest; -namespace DM -{ -namespace tests -{ -class RNRemoteReadTaskTest; -} - -enum class SegmentReadTaskState -{ - Init, - Error, - Receiving, - // All data are ready for reading - DataReady, - // The data are ready for reading, doing place index to - // speed up later reading - DataReadyAndPrepraring, - // The data are ready for reading with some preparation done - DataReadyAndPrepared, -}; - -// Represent a read tasks for one disagg task. -// The read node use it as a task pool for RNRemoteSegmentReadTask. -class RNRemoteReadTask -{ -public: - explicit RNRemoteReadTask(std::vector && input_tasks_); - - ~RNRemoteReadTask(); - - size_t numSegments() const; - - // Return a segment task that need to fetch pages from - // write node. - RNRemoteSegmentReadTaskPtr nextFetchTask(); - - // After the fetch pages done for a segment task, the - // worker thread need to update the task state. - // Then the read threads can know the segment is ready - // or there is error happened. - void updateTaskState( - const RNRemoteSegmentReadTaskPtr & seg_task, - SegmentReadTaskState target_state, - bool meet_error); - - void allDataReceive(const String & end_err_msg); - - // Return a segment read task that is ready for some preparation - // to speed up later reading - RNRemoteSegmentReadTaskPtr nextTaskForPrepare(); - - // Return a segment read task that is ready for reading. - RNRemoteSegmentReadTaskPtr nextReadyTask(); - - void wakeAll() { cv_ready_tasks.notify_all(); } - - const String & getErrorMessage() const; - - friend class tests::RNRemoteReadTaskTest; - friend struct DB::FetchPagesRequest; - -private: - void insertTask(const RNRemoteSegmentReadTaskPtr & seg_task, std::unique_lock &); - - bool doneOrErrorHappen() const; - -private: - // The original number of segment tasks - // Only assign when init - size_t num_segments; - - // A task pool for fetching data from write nodes - mutable std::mutex mtx_tasks; - std::unordered_map store_tasks; - std::unordered_map::iterator curr_store; - - // A task pool for segment tasks - // The tasks are sorted by the ready state of segment tasks - mutable std::mutex mtx_ready_tasks; - std::condition_variable cv_ready_tasks; - String err_msg; - std::map> ready_segment_tasks; - - LoggerPtr log; -}; - -// Represent a read tasks from one write node -class RNRemoteStoreReadTask -{ -public: - RNRemoteStoreReadTask( - StoreID store_id_, - std::vector table_read_tasks_); - - size_t numRemainTasks() const; - RNRemoteSegmentReadTaskPtr nextTask(); - - friend class RNRemoteReadTask; - -private: - const StoreID store_id; - mutable std::mutex mtx_tasks; - const std::vector table_read_tasks; - std::vector::const_iterator cur_table_task; -}; - -class RNRemotePhysicalTableReadTask -{ -public: - RNRemotePhysicalTableReadTask( - StoreID store_id_, - KeyspaceTableID ks_table_id_, - DisaggTaskId snap_id_, - const String & address_) - : store_id(store_id_) - , ks_table_id(ks_table_id_) - , snapshot_id(std::move(snap_id_)) - , address(address_) - {} - - static RNRemotePhysicalTableReadTaskPtr buildFrom( - const Context & db_context, - const ScanContextPtr & scan_context, - StoreID store_id, - const String & address, - const DisaggTaskId & snapshot_id, - const RemotePb::RemotePhysicalTable & table, - const LoggerPtr & log); - - size_t numRemainTasks() const - { - std::lock_guard guard(mtx_tasks); - return tasks.size(); - } - - RNRemoteSegmentReadTaskPtr nextTask(); - - friend class tests::RNRemoteReadTaskTest; - friend class RNRemoteReadTask; - -private: - const StoreID store_id; - const KeyspaceTableID ks_table_id; - const DisaggTaskId snapshot_id; - const String address; - - mutable std::mutex mtx_tasks; - // The remote segment tasks - std::list tasks; -}; - -class RNRemoteSegmentReadTask -{ -public: - static RNRemoteSegmentReadTaskPtr buildFrom( - const Context & db_context, - const ScanContextPtr & scan_context, - const RemotePb::RemoteSegment & proto, - const DisaggTaskId & snapshot_id, - StoreID store_id, - KeyspaceTableID ks_table_id, - const String & address, - const LoggerPtr & log); - - RowKeyRanges getReadRanges() const { return read_ranges; } - - BlockInputStreamPtr getInputStream( - const ColumnDefines & columns_to_read, - const RowKeyRanges & key_ranges, - UInt64 read_tso, - const PushDownFilterPtr & push_down_filter, - size_t expected_block_size, - ReadMode read_mode); - - void addPendingMsg() { num_msg_to_consume += 1; } - - /// Returns true if there are more pending messages. - bool addConsumedMsg(); - - void initColumnFileDataProvider(Remote::RNLocalPageCacheGuardPtr pages_guard); - - void receivePage(RemotePb::RemotePage && remote_page); - - void receiveMemTable(Block && block) - { - // Keep the block in memory for reading (multiple times) - std::lock_guard lock(mtx_queue); - mem_table_blocks.push(std::move(block)); - } - - void prepare(); - - friend class tests::RNRemoteReadTaskTest; - friend struct DB::FetchPagesRequest; - friend class RNRemoteReadTask; - - // Only used by buildFrom - RNRemoteSegmentReadTask( - DisaggTaskId snapshot_id_, - StoreID store_id_, - KeyspaceTableID ks_table_id_, - UInt64 segment_id_, - String address_, - LoggerPtr log_); - -public: - SegmentReadTaskState state = SegmentReadTaskState::Init; - const DisaggTaskId snapshot_id; - const StoreID store_id; - const KeyspaceTableID ks_table_id; - const UInt64 segment_id; - const String address; - -private: - std::vector delta_tinycf_page_ids; - std::vector delta_tinycf_page_sizes; - std::vector stable_files; - - DMContextPtr dm_context; - SegmentPtr segment; - RowKeyRanges read_ranges; - SegmentSnapshotPtr segment_snap; - -public: - std::atomic num_msg_to_consume{0}; - std::atomic num_msg_consumed{0}; - -private: - std::mutex mtx_queue; - - // A temporary queue for storing the blocks - // from remote mem-table - std::queue mem_table_blocks; - - static Allocator allocator; - - LoggerPtr log; -}; - -} // namespace DM -} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp new file mode 100644 index 00000000000..d1e24beec91 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp @@ -0,0 +1,103 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include + +namespace DB::DM::Remote +{ + +RNSegmentInputStream::~RNSegmentInputStream() +{ + LOG_INFO( + log, + "Finished reading remote segments, rows={} read_segments={} total_wait_ready_task={:.3f}s total_read={:.3f}s", + action.totalRows(), + processed_seg_tasks, + duration_wait_ready_task_sec, + duration_read_sec); + + // This metric is per-stream. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_stream_wait_next_task).Observe(duration_wait_ready_task_sec); + // This metric is per-stream. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_stream_read).Observe(duration_read_sec); +} + +Block RNSegmentInputStream::readImpl(FilterPtr & res_filter, bool return_filter) +{ + if (done) + return {}; + + workers->startInBackground(); + + while (true) + { + if (!current_seg_task) + { + Stopwatch w{CLOCK_MONOTONIC_COARSE}; + auto pop_result = workers->getReadyChannel()->pop(current_seg_task); + duration_wait_ready_task_sec += w.elapsedSeconds(); + + if (pop_result == MPMCQueueResult::OK) + { + processed_seg_tasks += 1; + RUNTIME_CHECK(current_seg_task != nullptr); + } + else if (pop_result == MPMCQueueResult::FINISHED) + { + current_seg_task = nullptr; + done = true; + return {}; + } + else if (pop_result == MPMCQueueResult::CANCELLED) + { + current_seg_task = nullptr; + throw Exception(workers->getReadyChannel()->getCancelReason()); + } + else + { + current_seg_task = nullptr; + RUNTIME_CHECK_MSG(false, "Unexpected pop result {}", magic_enum::enum_name(pop_result)); + } + } + + Stopwatch w{CLOCK_MONOTONIC_COARSE}; + Block res = current_seg_task->getInputStream()->read(res_filter, return_filter); + duration_read_sec += w.elapsedSeconds(); + + if (!res) + { + // Current stream is drained, try read from next stream. + current_seg_task = nullptr; + continue; + } + + if (res.rows() == 0) + { + continue; + } + else + { + action.transform(res, current_seg_task->meta.physical_table_id); + return res; + } + } +} + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h b/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h new file mode 100644 index 00000000000..02fb77c08b0 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h @@ -0,0 +1,81 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM::Remote +{ + +class RNSegmentInputStream : public IProfilingBlockInputStream +{ + static constexpr auto NAME = "RNSegment"; + +public: + ~RNSegmentInputStream() override; + + String getName() const override { return NAME; } + + Block getHeader() const override { return action.getHeader(); } + +protected: + Block readImpl() override + { + FilterPtr filter_ignored; + return readImpl(filter_ignored, false); + } + + Block readImpl(FilterPtr & res_filter, bool return_filter) override; + +public: + struct Options + { + std::string_view debug_tag; + const RNWorkersPtr & workers; + const ColumnDefines & columns_to_read; + int extra_table_id_index; + }; + + explicit RNSegmentInputStream(const Options & options) + : log(Logger::get(options.debug_tag)) + , workers(options.workers) + , action(options.columns_to_read, options.extra_table_id_index) + {} + + static BlockInputStreamPtr create(const Options & options) + { + return std::make_shared(options); + } + +private: + const LoggerPtr log; + const RNWorkersPtr workers; + AddExtraTableIDColumnTransformAction action; + + RNReadSegmentTaskPtr current_seg_task = nullptr; + bool done = false; + size_t processed_seg_tasks = 0; + + double duration_wait_ready_task_sec = 0; + double duration_read_sec = 0; +}; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp new file mode 100644 index 00000000000..0c4ecb9ca19 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp @@ -0,0 +1,276 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +namespace pingcap::kv +{ + +template <> +struct RpcTypeTraits +{ + using RequestType = disaggregated::FetchDisaggPagesRequest; + using ResultType = disaggregated::PagesPacket; + static std::unique_ptr> doRPCCall( + grpc::ClientContext * context, + std::shared_ptr client, + const RequestType & req) + { + return client->stub->FetchDisaggPages(context, req); + } + static std::unique_ptr> doAsyncRPCCall( + grpc::ClientContext * context, + std::shared_ptr client, + const RequestType & req, + grpc::CompletionQueue & cq, + void * call) + { + return client->stub->AsyncFetchDisaggPages(context, req, &cq, call); + } +}; + +} // namespace pingcap::kv + +namespace DB::DM::Remote +{ + +RNLocalPageCache::OccupySpaceResult blockingOccupySpaceForTask(const RNReadSegmentTaskPtr & seg_task) +{ + std::vector cf_tiny_oids; + { + cf_tiny_oids.reserve(seg_task->meta.delta_tinycf_page_ids.size()); + for (const auto & page_id : seg_task->meta.delta_tinycf_page_ids) + { + auto page_oid = PageOID{ + .store_id = seg_task->meta.store_id, + .ks_table_id = {seg_task->meta.keyspace_id, seg_task->meta.physical_table_id}, + .page_id = page_id, + }; + cf_tiny_oids.emplace_back(page_oid); + } + } + + // Note: We must occupySpace segment by segment, because we need to read + // at least the complete data of one segment in order to drive everything forward. + // Currently we call occupySpace for each FetchPagesRequest, which is fine, + // because we send one request each seg_task. If we want to split + // FetchPagesRequest into multiples in future, then we need to change + // the moment of calling `occupySpace`. + auto page_cache = seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + + Stopwatch w_occupy; + auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->meta.delta_tinycf_page_sizes); + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy).Observe(w_occupy.elapsedSeconds()); + + return occupy_result; +} + +std::shared_ptr buildFetchPagesRequest( + const RNReadSegmentTaskPtr & seg_task, + const std::vector & pages_not_in_cache) +{ + auto req = std::make_shared(); + auto meta = seg_task->meta.snapshot_id.toMeta(); + // The keyspace_id here is not vital, as we locate the table and segment by given + // snapshot_id. But it could be helpful for debugging. + auto keyspace_id = seg_task->meta.keyspace_id; + meta.set_keyspace_id(keyspace_id); + meta.set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); + *req->mutable_snapshot_id() = meta; + req->set_table_id(seg_task->meta.physical_table_id); + req->set_segment_id(seg_task->meta.segment_id); + + for (auto page_id : pages_not_in_cache) + req->add_page_ids(page_id.page_id); + + return req; +} + +RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task) +{ + Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page).Observe(watch_work.elapsedSeconds()); + }); + + auto occupy_result = blockingOccupySpaceForTask(seg_task); + auto req = buildFetchPagesRequest(seg_task, occupy_result.pages_not_in_cache); + { + auto cftiny_total = seg_task->meta.delta_tinycf_page_ids.size(); + auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); + LOG_DEBUG( + log, + "Ready to fetch pages, seg_task={} page_hit_rate={} pages_not_in_cache={}", + seg_task->info(), + cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), + occupy_result.pages_not_in_cache); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); + } + + const size_t max_retry_times = 3; + std::exception_ptr last_exception; + + // TODO: Maybe don't need to re-fetch all pages when retry. + for (size_t i = 0; i < max_retry_times; ++i) + { + try + { + doFetchPages(seg_task, req); + seg_task->initColumnFileDataProvider(occupy_result.pages_guard); + + // We finished fetch all pages for this seg task, just return it for downstream + // workers. If we have met any errors, page guard will not be persisted. + return seg_task; + } + catch (const pingcap::Exception & e) + { + last_exception = std::current_exception(); + LOG_WARNING( + log, + "Meet RPC client exception when fetching pages: {}, will be retried. seg_task={}", + e.displayText(), + seg_task->info()); + std::this_thread::sleep_for(1s); + } + } + + // Still failed after retry... + RUNTIME_CHECK(last_exception); + std::rethrow_exception(last_exception); +} + +void RNWorkerFetchPages::doFetchPages( + const RNReadSegmentTaskPtr & seg_task, + std::shared_ptr request) +{ + // No page need to be fetched. + if (request->page_ids_size() == 0) + return; + + Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; + bool rpc_is_observed = false; + double total_write_page_cache_sec = 0.0; + + grpc::ClientContext client_context; + auto rpc_call = std::make_shared>(request); + auto stream_resp = cluster->rpc_client->sendStreamRequest( + seg_task->meta.store_address, + &client_context, + *rpc_call); + + SCOPE_EXIT({ + // TODO: Not sure whether we really need this. Maybe RAII is already there? + stream_resp->Finish(); + }); + + // Used to verify all pages are fetched. + std::set remaining_pages_to_fetch; + for (auto p : request->page_ids()) + remaining_pages_to_fetch.insert(p); + + // Keep reading packets. + while (true) + { + auto packet = std::make_shared(); + if (bool more = stream_resp->Read(packet.get()); !more) + break; + + if (!rpc_is_observed) + { + // Count RPC time as sending request + receive first response packet. + rpc_is_observed = true; + // This metric is per-segment, because we only count once for each task. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_rpc_fetch_page).Observe(watch_rpc.elapsedSeconds()); + } + + if (packet->has_error()) + { + throw Exception(fmt::format("{} (from {})", packet->error().msg(), seg_task->info())); + } + + Stopwatch watch_write_page_cache{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ + total_write_page_cache_sec += watch_write_page_cache.elapsedSeconds(); + }); + + std::vector received_page_ids; + for (const String & page : packet->pages()) + { + DM::RemotePb::RemotePage remote_page; + bool parsed = remote_page.ParseFromString(page); + RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", seg_task->info()); + + RUNTIME_CHECK( + remaining_pages_to_fetch.contains(remote_page.page_id()), + remaining_pages_to_fetch, + remote_page.page_id()); + + received_page_ids.emplace_back(remote_page.page_id()); + remaining_pages_to_fetch.erase(remote_page.page_id()); + + const size_t buf_size = remote_page.data().size(); + + // Write page into LocalPageCache. Note that the page must be occupied. + auto oid = Remote::PageOID{ + .store_id = seg_task->meta.store_id, + .ks_table_id = {seg_task->meta.keyspace_id, seg_task->meta.physical_table_id}, + .page_id = remote_page.page_id(), + }; + auto read_buffer = std::make_shared(remote_page.data().data(), buf_size); + PageFieldSizes field_sizes; + field_sizes.reserve(remote_page.field_sizes_size()); + for (const auto & field_sz : remote_page.field_sizes()) + { + field_sizes.emplace_back(field_sz); + } + auto & page_cache = seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + page_cache->write(oid, std::move(read_buffer), buf_size, std::move(field_sizes)); + } + + LOG_DEBUG( + log, + "Cached pages data from write node, seg_task={} received_pages_id={}", + seg_task->info(), + received_page_ids); + } + + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_write_page_cache).Observe(total_write_page_cache_sec); + + // Verify all pending pages are now received. + RUNTIME_CHECK_MSG( + remaining_pages_to_fetch.empty(), + "Failed to fetch all pages (from {}), remaining_pages_to_fetch={}", + seg_task->info(), + remaining_pages_to_fetch); + + LOG_DEBUG(log, "Finished fetch pages, seg_task={}", seg_task->info()); +} + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h new file mode 100644 index 00000000000..1ba06257ffe --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.h @@ -0,0 +1,74 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include + +namespace DB::DM::Remote +{ + +class RNWorkerFetchPages; +using RNWorkerFetchPagesPtr = std::shared_ptr; + +/// This worker fetch page data from Write Node, and then write page data into the local cache. +class RNWorkerFetchPages + : private boost::noncopyable + , public ThreadedWorker +{ +protected: + RNReadSegmentTaskPtr doWork(const RNReadSegmentTaskPtr & task) override; + + String getName() const noexcept override { return "FetchPages"; } + +private: + void doFetchPages( + const RNReadSegmentTaskPtr & seg_task, + std::shared_ptr request); + +private: + const pingcap::kv::Cluster * cluster; + +public: + struct Options + { + const std::shared_ptr> & source_queue; + const std::shared_ptr> & result_queue; + const LoggerPtr & log; + const size_t concurrency; + const pingcap::kv::Cluster * cluster; + }; + + explicit RNWorkerFetchPages(const Options & options) + : ThreadedWorker( + options.source_queue, + options.result_queue, + options.log, + options.concurrency) + , cluster(options.cluster) + {} + + static RNWorkerFetchPagesPtr create(const Options & options) + { + return std::make_shared(options); + } + + ~RNWorkerFetchPages() override { wait(); } +}; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp new file mode 100644 index 00000000000..8401528f6f5 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.cpp @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB::DM::Remote +{ + +RNReadSegmentTaskPtr RNWorkerPrepareStreams::doWork(const RNReadSegmentTaskPtr & task) +{ + Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_prepare_stream).Observe(watch_work.elapsedSeconds()); + }); + + task->initInputStream( + *columns_to_read, + read_tso, + push_down_filter, + read_mode); + + return task; +} + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h new file mode 100644 index 00000000000..f3883307ae6 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h @@ -0,0 +1,82 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB::DM::Remote +{ + +class RNWorkerPrepareStreams; +using RNWorkerPrepareStreamsPtr = std::shared_ptr; + +/// This worker prepare data streams for reading. +/// For example, when S3 files of the stable layer does not exist locally, +/// they will be downloaded. +class RNWorkerPrepareStreams + : private boost::noncopyable + , public ThreadedWorker +{ +protected: + RNReadSegmentTaskPtr doWork(const RNReadSegmentTaskPtr & task) override; + + String getName() const noexcept override { return "PrepareStreams"; } + +public: + const ColumnDefinesPtr columns_to_read; + const UInt64 read_tso; + const PushDownFilterPtr push_down_filter; + const ReadMode read_mode; + +public: + struct Options + { + const std::shared_ptr> & source_queue; + const std::shared_ptr> & result_queue; + const LoggerPtr & log; + const size_t concurrency; + const ColumnDefinesPtr & columns_to_read; + const UInt64 read_tso; + const PushDownFilterPtr & push_down_filter; + const ReadMode read_mode; + }; + + static RNWorkerPrepareStreamsPtr create(const Options & options) + { + return std::make_shared(options); + } + + explicit RNWorkerPrepareStreams(const Options & options) + : ThreadedWorker( + options.source_queue, + options.result_queue, + options.log, + options.concurrency) + , columns_to_read(options.columns_to_read) + , read_tso(options.read_tso) + , push_down_filter(options.push_down_filter) + , read_mode(options.read_mode) + {} + + ~RNWorkerPrepareStreams() override { wait(); } +}; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp new file mode 100644 index 00000000000..70e95112296 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp @@ -0,0 +1,70 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB::DM::Remote +{ + +RNWorkers::RNWorkers(const Options & options) +{ + size_t n = options.read_task->segment_read_tasks.size(); + RUNTIME_CHECK(n > 0, n); + + worker_fetch_pages = RNWorkerFetchPages::create({ + .source_queue = std::make_shared(n), + .result_queue = std::make_shared(n), + .log = options.log, + .concurrency = n, + .cluster = options.cluster, + }); + + worker_prepare_streams = RNWorkerPrepareStreams::create({ + .source_queue = worker_fetch_pages->result_queue, + .result_queue = std::make_shared(n), + .log = options.log, + .concurrency = n, + .columns_to_read = options.columns_to_read, + .read_tso = options.read_tso, + .push_down_filter = options.push_down_filter, + .read_mode = options.read_mode, + }); + + // TODO: Can we push the task that all delta/stable data hit local cache first? + for (auto const & seg_task : options.read_task->segment_read_tasks) + { + auto push_result = worker_fetch_pages->source_queue->tryPush(seg_task); + RUNTIME_CHECK(push_result == MPMCQueueResult::OK, magic_enum::enum_name(push_result)); + } + worker_fetch_pages->source_queue->finish(); +} + +void RNWorkers::startInBackground() +{ + worker_fetch_pages->startInBackground(); + worker_prepare_streams->startInBackground(); +} + +void RNWorkers::wait() +{ + worker_fetch_pages->wait(); + worker_prepare_streams->wait(); +} + +RNWorkers::ChannelPtr RNWorkers::getReadyChannel() const +{ + return worker_prepare_streams->result_queue; +} +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h new file mode 100644 index 00000000000..6c8a7c4b6fa --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h @@ -0,0 +1,72 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace DB::DM::Remote +{ + +class RNWorkers + : private boost::noncopyable +{ +public: + using Channel = MPMCQueue; + using ChannelPtr = std::shared_ptr; + +public: + /// Get the channel which outputs ready-for-read segment tasks. + ChannelPtr getReadyChannel() const; + + void startInBackground(); + + void wait(); + + ~RNWorkers() + { + wait(); + } + +public: + struct Options + { + const LoggerPtr log; + const RNReadTaskPtr & read_task; + const ColumnDefinesPtr & columns_to_read; + const UInt64 read_tso; + const PushDownFilterPtr & push_down_filter; + const ReadMode read_mode; + const pingcap::kv::Cluster * cluster; + }; + + explicit RNWorkers(const Options & options); + + static RNWorkersPtr create(const Options & options) + { + return std::make_shared(options); + } + +private: + RNWorkerFetchPagesPtr worker_fetch_pages; + RNWorkerPrepareStreamsPtr worker_prepare_streams; +}; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers_fwd.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers_fwd.h new file mode 100644 index 00000000000..49888c04823 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers_fwd.h @@ -0,0 +1,25 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM::Remote +{ + +class RNWorkers; +using RNWorkersPtr = std::shared_ptr; + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/tests/gtest_rn_remote_read_task.cpp b/dbms/src/Storages/DeltaMerge/Remote/tests/gtest_rn_remote_read_task.cpp deleted file mode 100644 index f4ed532c6e0..00000000000 --- a/dbms/src/Storages/DeltaMerge/Remote/tests/gtest_rn_remote_read_task.cpp +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace DB::DM::tests -{ -using namespace DB::tests; - -class RNRemoteReadTaskTest : public ::testing::Test -{ -public: - void SetUp() override - { - log = Logger::get(); - } - - RNRemotePhysicalTableReadTaskPtr buildTableTestTask( - StoreID store_id, - KeyspaceTableID ks_table_id, - const std::vector & seg_ids) - { - DisaggTaskId snapshot_id; - String address; - auto physical_table_task = std::make_shared(store_id, ks_table_id, snapshot_id, address); - for (const auto & seg_id : seg_ids) - { - auto seg_task = std::make_shared(snapshot_id, store_id, ks_table_id, seg_id, address, log); - physical_table_task->tasks.emplace_back(std::move(seg_task)); - } - return physical_table_task; - } - - RNRemoteReadTaskPtr buildTestTask() - { - std::vector store_tasks{ - std::make_shared( - 111, - std::vector{ - // partition 0 - buildTableTestTask(111, TEST_KS_TABLE_PART0_ID, {2, 5, 100}), - // partition 1 - buildTableTestTask(111, TEST_KS_TABLE_PART1_ID, {1}), - }), - std::make_shared( - 222, - std::vector{ - // partition 0 - buildTableTestTask(222, TEST_KS_TABLE_PART0_ID, {202, 205, 300}), - }), - std::make_shared( - 333, - std::vector{ - // partition 0 - buildTableTestTask(333, TEST_KS_TABLE_PART0_ID, {400, 401, 402, 403}), - }), - }; - return std::make_shared(std::move(store_tasks)); - } - -protected: - static constexpr KeyspaceTableID TEST_KS_TABLE_PART0_ID = {NullspaceID, 100}; - static constexpr KeyspaceTableID TEST_KS_TABLE_PART1_ID = {NullspaceID, 101}; - LoggerPtr log; -}; - -TEST_F(RNRemoteReadTaskTest, popTasksWithoutPreparation) -{ - auto read_task = buildTestTask(); - const auto num_segments = read_task->numSegments(); - ASSERT_EQ(num_segments, 3 + 1 + 3 + 4); - - for (size_t i = 0; i < num_segments; ++i) - { - auto seg_task = read_task->nextFetchTask(); - - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReady, false); // mock fetch done - auto ready_seg_task = read_task->nextReadyTask(); - ASSERT_EQ(ready_seg_task->state, SegmentReadTaskState::DataReady) << magic_enum::enum_name(ready_seg_task->state); - ASSERT_EQ(seg_task->segment_id, ready_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, ready_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, ready_seg_task->ks_table_id); - } - - ASSERT_EQ(read_task->nextFetchTask(), nullptr); - ASSERT_EQ(read_task->nextReadyTask(), nullptr); -} - -TEST_F(RNRemoteReadTaskTest, popPrepareTasks) -{ - auto read_task = buildTestTask(); - const auto num_segments = read_task->numSegments(); - ASSERT_EQ(num_segments, 3 + 1 + 3 + 4); - - for (size_t i = 0; i < num_segments; ++i) - { - auto seg_task = read_task->nextFetchTask(); - - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReady, false); // mock fetch done - auto prepare_seg_task = read_task->nextTaskForPrepare(); - ASSERT_EQ(prepare_seg_task->state, SegmentReadTaskState::DataReadyAndPrepraring) << magic_enum::enum_name(prepare_seg_task->state); - ASSERT_EQ(seg_task->segment_id, prepare_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, prepare_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, prepare_seg_task->ks_table_id); - read_task->updateTaskState(prepare_seg_task, SegmentReadTaskState::DataReadyAndPrepared, false); - } - - // there is no more task for prepare, return nullptr quickly - ASSERT_EQ(read_task->nextTaskForPrepare(), nullptr); - ASSERT_EQ(read_task->nextFetchTask(), nullptr); - - for (size_t i = 0; i < num_segments; ++i) - { - auto ready_task = read_task->nextReadyTask(); - ASSERT_EQ(ready_task->state, SegmentReadTaskState::DataReadyAndPrepared) << magic_enum::enum_name(ready_task->state); - } - ASSERT_EQ(read_task->nextReadyTask(), nullptr); -} - -TEST_F(RNRemoteReadTaskTest, popTasksWithAllPrepared) -{ - auto read_task = buildTestTask(); - const auto num_segments = read_task->numSegments(); - ASSERT_EQ(num_segments, 3 + 1 + 3 + 4); - - for (size_t i = 0; i < num_segments; ++i) - { - auto seg_task = read_task->nextFetchTask(); - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReady, false); // mock fetch done - { - auto prepare_seg_task = read_task->nextTaskForPrepare(); - ASSERT_EQ(prepare_seg_task->state, SegmentReadTaskState::DataReadyAndPrepraring) << magic_enum::enum_name(prepare_seg_task->state); - ASSERT_EQ(seg_task->segment_id, prepare_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, prepare_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, prepare_seg_task->ks_table_id); - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReadyAndPrepared, false); // mock prepare done - } - { - auto ready_seg_task = read_task->nextReadyTask(); - ASSERT_EQ(ready_seg_task->state, SegmentReadTaskState::DataReadyAndPrepared) << magic_enum::enum_name(ready_seg_task->state); - ASSERT_EQ(seg_task->segment_id, ready_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, ready_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, ready_seg_task->ks_table_id); - } - } - - ASSERT_EQ(read_task->nextFetchTask(), nullptr); - ASSERT_EQ(read_task->nextTaskForPrepare(), nullptr); - ASSERT_EQ(read_task->nextReadyTask(), nullptr); -} - -TEST_F(RNRemoteReadTaskTest, popTasksWithSomePrepared) -{ - auto read_task = buildTestTask(); - const auto num_segments = read_task->numSegments(); - ASSERT_EQ(num_segments, 3 + 1 + 3 + 4); - - for (size_t i = 0; i < num_segments; ++i) - { - auto seg_task = read_task->nextFetchTask(); - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReady, false); // mock fetch done - bool do_prepare = i % 2 == 0; - if (do_prepare) - { - auto prepare_seg_task = read_task->nextTaskForPrepare(); - ASSERT_EQ(prepare_seg_task->state, SegmentReadTaskState::DataReadyAndPrepraring) << magic_enum::enum_name(prepare_seg_task->state); - ASSERT_EQ(seg_task->segment_id, prepare_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, prepare_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, prepare_seg_task->ks_table_id); - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReadyAndPrepared, false); // mock prepare done - } - { - auto ready_seg_task = read_task->nextReadyTask(); - ASSERT_EQ(ready_seg_task->state, do_prepare ? SegmentReadTaskState::DataReadyAndPrepared : SegmentReadTaskState::DataReady) << magic_enum::enum_name(ready_seg_task->state); - ASSERT_EQ(seg_task->segment_id, ready_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, ready_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, ready_seg_task->ks_table_id); - } - } - - ASSERT_EQ(read_task->nextFetchTask(), nullptr); - ASSERT_EQ(read_task->nextTaskForPrepare(), nullptr); - ASSERT_EQ(read_task->nextReadyTask(), nullptr); -} - -TEST_F(RNRemoteReadTaskTest, failTask) -{ - auto read_task = buildTestTask(); - const auto num_segments = read_task->numSegments(); - ASSERT_EQ(num_segments, 3 + 1 + 3 + 4); - - assert(num_segments >= 4); - for (size_t i = 0; i < 3; ++i) - { - auto seg_task = read_task->nextFetchTask(); - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReady, false); - auto ready_seg_task = read_task->nextReadyTask(); - ASSERT_EQ(ready_seg_task->state, SegmentReadTaskState::DataReady) << magic_enum::enum_name(ready_seg_task->state); - ASSERT_EQ(seg_task->segment_id, ready_seg_task->segment_id); - ASSERT_EQ(seg_task->store_id, ready_seg_task->store_id); - ASSERT_EQ(seg_task->ks_table_id, ready_seg_task->ks_table_id); - } - // mock meet error for this segment task - auto seg_task = read_task->nextFetchTask(); - read_task->updateTaskState(seg_task, SegmentReadTaskState::DataReady, /*meet_error*/ true); - // cancel all - ASSERT_EQ(read_task->nextReadyTask(), nullptr); - ASSERT_EQ(read_task->nextTaskForPrepare(), nullptr); -} - - -} // namespace DB::DM::tests diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index c037b958295..a35ffd01ae4 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -575,6 +575,8 @@ void FileCache::restoreWriteNode(const std::filesystem::directory_entry & write_ { RUNTIME_CHECK_MSG(write_node_entry.is_directory(), "{} is not a directory", write_node_entry.path()); auto write_node_data_path = write_node_entry.path() / "data"; + if (!std::filesystem::exists(write_node_data_path)) + return; for (const auto & table_entry : std::filesystem::directory_iterator(write_node_data_path)) { restoreTable(table_entry); diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index c3c1c487a04..38f50f5b253 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -20,7 +20,8 @@ #include #include #include -#include +#include +#include #include #pragma GCC diagnostic push @@ -88,20 +89,35 @@ class StorageDisaggregated : public IStorage const Context & db_context, const SelectQueryInfo & query_info, unsigned num_streams); +<<<<<<< HEAD /// helper functions for building the task fetch all data from write node through MPP exchange sender/receiver BlockInputStreams readThroughExchange(unsigned num_streams); DM::RNRemoteReadTaskPtr buildDisaggTasks( +======= + + DM::Remote::RNReadTaskPtr buildReadTask( +>>>>>>> edf82d2a63 (storage: Refactor disaggregated read flow (#7530)) const Context & db_context, - const DM::ScanContextPtr & scan_context, - const std::vector & batch_cop_tasks); - void buildDisaggTask( + const DM::ScanContextPtr & scan_context); + + void buildReadTaskForWriteNode( const Context & db_context, const DM::ScanContextPtr & scan_context, const pingcap::coprocessor::BatchCopTask & batch_cop_task, - std::vector & store_read_tasks, - std::mutex & store_read_tasks_lock); - std::shared_ptr - buildDisaggTaskForNode( + std::mutex & output_lock, + std::vector & output_seg_tasks); + + void buildReadTaskForWriteNodeTable( + const Context & db_context, + const DM::ScanContextPtr & scan_context, + const DM::DisaggTaskId & snapshot_id, + StoreID store_id, + const String & store_address, + const String & serialized_physical_table, + std::mutex & output_lock, + std::vector & output_seg_tasks); + + std::shared_ptr buildEstablishDisaggTaskReq( const Context & db_context, const pingcap::coprocessor::BatchCopTask & batch_cop_task); DM::RSOperatorPtr buildRSOperator( @@ -109,8 +125,8 @@ class StorageDisaggregated : public IStorage const DM::ColumnDefinesPtr & columns_to_read); void buildRemoteSegmentInputStreams( const Context & db_context, - const DM::RNRemoteReadTaskPtr & remote_read_tasks, - const SelectQueryInfo & query_info, + const DM::Remote::RNReadTaskPtr & read_task, + const SelectQueryInfo &, size_t num_streams, DAGPipeline & pipeline); diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 17a481e6ba7..be544f8f92c 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -29,9 +29,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -40,8 +37,9 @@ #include #include #include -#include -#include +#include +#include +#include #include #include #include @@ -101,10 +99,11 @@ BlockInputStreams StorageDisaggregated::readThroughS3( auto scan_context = std::make_shared(); context.getDAGContext()->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context; - DM::RNRemoteReadTaskPtr remote_read_tasks; + DM::Remote::RNReadTaskPtr read_task; double total_backoff_seconds = 0.0; SCOPE_EXIT({ + // This metric is per-read. GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_total_establish_backoff).Observe(total_backoff_seconds); }); @@ -115,18 +114,8 @@ BlockInputStreams StorageDisaggregated::readThroughS3( try { - auto remote_table_ranges = buildRemoteTableRanges(); - // only send to tiflash node with label [{"engine":"tiflash"}, {"engine-role":"write"}] - auto label_filter = pingcap::kv::labelFilterOnlyTiFlashWriteNode; - auto batch_cop_tasks = buildBatchCopTasks(remote_table_ranges, label_filter); - RUNTIME_CHECK(!batch_cop_tasks.empty()); - // Fetch the remote segment read tasks from write nodes - remote_read_tasks = buildDisaggTasks( - db_context, - scan_context, - batch_cop_tasks); - + read_task = buildReadTask(db_context, scan_context); break; } catch (DB::Exception & e) @@ -146,7 +135,12 @@ BlockInputStreams StorageDisaggregated::readThroughS3( // Build InputStream according to the remote segment read tasks DAGPipeline pipeline; - buildRemoteSegmentInputStreams(db_context, remote_read_tasks, query_info, num_streams, pipeline); + buildRemoteSegmentInputStreams( + db_context, + read_task, + query_info, + num_streams, + pipeline); NamesAndTypes source_columns; source_columns.reserve(table_scan.getColumnSize()); @@ -164,54 +158,65 @@ BlockInputStreams StorageDisaggregated::readThroughS3( return pipeline.streams; } - -DM::RNRemoteReadTaskPtr StorageDisaggregated::buildDisaggTasks( +DM::Remote::RNReadTaskPtr StorageDisaggregated::buildReadTask( const Context & db_context, - const DM::ScanContextPtr & scan_context, - const std::vector & batch_cop_tasks) + const DM::ScanContextPtr & scan_context) { - size_t tasks_n = batch_cop_tasks.size(); + std::vector batch_cop_tasks; - std::mutex store_read_tasks_lock; - std::vector store_read_tasks; - store_read_tasks.reserve(tasks_n); + // First split the read task for different write nodes. + // For each write node, a BatchCopTask is built. + { + auto remote_table_ranges = buildRemoteTableRanges(); + // only send to tiflash node with label [{"engine":"tiflash"}, {"engine-role":"write"}] + auto label_filter = pingcap::kv::labelFilterOnlyTiFlashWriteNode; + batch_cop_tasks = buildBatchCopTasks(remote_table_ranges, label_filter); + RUNTIME_CHECK(!batch_cop_tasks.empty()); + } - auto thread_manager = newThreadManager(); - const auto & executor_id = table_scan.getTableScanExecutorID(); - const DM::DisaggTaskId task_id(context.getDAGContext()->getMPPTaskId(), executor_id); + std::mutex output_lock; + std::vector output_seg_tasks; + // Then, for each BatchCopTask, let's build read tasks concurrently. + auto thread_manager = newThreadManager(); for (const auto & cop_task : batch_cop_tasks) { thread_manager->schedule( true, - "BuildDisaggTask", + "buildReadTaskForWriteNode", [&] { - buildDisaggTask( + buildReadTaskForWriteNode( db_context, scan_context, cop_task, - store_read_tasks, - store_read_tasks_lock); + output_lock, + output_seg_tasks); }); } - // The first exception will be thrown out. + // Let's wait for all threads to finish. Otherwise local variable references will be invalid. + // The first exception will be thrown out if any, after all threads are finished, which is safe. thread_manager->wait(); - return std::make_shared(std::move(store_read_tasks)); + // Do some integrity checks for the build seg tasks. For example, we should not + // ever read from the same store+table+segment multiple times. + { + // TODO + } + + return DM::Remote::RNReadTask::create(output_seg_tasks); } -/// Note: This function runs concurrently when there are multiple Write Nodes. -void StorageDisaggregated::buildDisaggTask( +void StorageDisaggregated::buildReadTaskForWriteNode( const Context & db_context, const DM::ScanContextPtr & scan_context, const pingcap::coprocessor::BatchCopTask & batch_cop_task, - std::vector & store_read_tasks, - std::mutex & store_read_tasks_lock) + std::mutex & output_lock, + std::vector & output_seg_tasks) { Stopwatch watch; - auto req = buildDisaggTaskForNode(db_context, batch_cop_task); + auto req = buildEstablishDisaggTaskReq(db_context, batch_cop_task); auto * cluster = context.getTMTContext().getKVCluster(); auto call = pingcap::kv::RpcCall(req); @@ -334,39 +339,71 @@ void StorageDisaggregated::buildDisaggTask( } } - // Parse the resp and gen tasks on read node - std::vector remote_seg_tasks; - remote_seg_tasks.reserve(resp->tables_size()); - for (const auto & physical_table : resp->tables()) + // Now we have successfully established disaggregated read for this write node. + // Let's parse the result and generate actual segment read tasks. + // There may be multiple tables, so we concurrently build tasks for these tables. + auto thread_manager = newThreadManager(); + for (const auto & serialized_physical_table : resp->tables()) { - DB::DM::RemotePb::RemotePhysicalTable table; - auto parse_ok = table.ParseFromString(physical_table); - RUNTIME_CHECK_MSG(parse_ok, "Failed to deserialize RemotePhysicalTable from response"); - - Stopwatch w_build_table_task; - - const auto task = DM::RNRemotePhysicalTableReadTask::buildFrom( - db_context, - scan_context, - resp->store_id(), - batch_cop_task.store_addr, - snapshot_id, - table, - log); - remote_seg_tasks.emplace_back(task); - - LOG_DEBUG( - log, - "Build RNRemotePhysicalTableReadTask finished, elapsed={:.3f}s store={} addr={} segments={}", - w_build_table_task.elapsedSeconds(), - resp->store_id(), - batch_cop_task.store_addr, - table.segments().size()); + thread_manager->schedule( + true, + "buildReadTaskForWriteNodeTable", + [&] { + buildReadTaskForWriteNodeTable( + db_context, + scan_context, + snapshot_id, + resp->store_id(), + req->address(), + serialized_physical_table, + output_lock, + output_seg_tasks); + }); } - std::unique_lock lock(store_read_tasks_lock); - store_read_tasks.emplace_back(std::make_shared(resp->store_id(), remote_seg_tasks)); + thread_manager->wait(); +} - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_build_read_task).Observe(watch.elapsedSeconds()); +void StorageDisaggregated::buildReadTaskForWriteNodeTable( + const Context & db_context, + const DM::ScanContextPtr & scan_context, + const DM::DisaggTaskId & snapshot_id, + StoreID store_id, + const String & store_address, + const String & serialized_physical_table, + std::mutex & output_lock, + std::vector & output_seg_tasks) +{ + DB::DM::RemotePb::RemotePhysicalTable table; + auto parse_ok = table.ParseFromString(serialized_physical_table); + RUNTIME_CHECK_MSG(parse_ok, "Failed to deserialize RemotePhysicalTable from response"); + + auto thread_manager = newThreadManager(); + auto n = static_cast(table.segments().size()); + for (size_t idx = 0; idx < n; ++idx) + { + const auto & remote_seg = table.segments(idx); + + thread_manager->schedule( + true, + "buildRNReadSegmentTask", + [&] { + auto seg_read_task = DM::Remote::RNReadSegmentTask::buildFromEstablishResp( + log, + db_context, + scan_context, + remote_seg, + snapshot_id, + store_id, + store_address, + table.keyspace_id(), + table.table_id()); + + std::lock_guard lock(output_lock); + output_seg_tasks.push_back(seg_read_task); + }); + } + + thread_manager->wait(); } /** @@ -378,7 +415,7 @@ void StorageDisaggregated::buildDisaggTask( * Similar to `StorageDisaggregated::buildDispatchMPPTaskRequest` */ std::shared_ptr -StorageDisaggregated::buildDisaggTaskForNode( +StorageDisaggregated::buildEstablishDisaggTaskReq( const Context & db_context, const pingcap::coprocessor::BatchCopTask & batch_cop_task) { @@ -410,7 +447,6 @@ StorageDisaggregated::buildDisaggTaskForNode( tipb::DAGRequest table_scan_req; table_scan_req.set_time_zone_name(dag_req->time_zone_name()); table_scan_req.set_time_zone_offset(dag_req->time_zone_offset()); - // TODO: enable exec summary collection table_scan_req.set_collect_execution_summaries(false); table_scan_req.set_flags(dag_req->flags()); table_scan_req.set_encode_type(tipb::EncodeType::TypeCHBlock); @@ -465,43 +501,15 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator( void StorageDisaggregated::buildRemoteSegmentInputStreams( const Context & db_context, - const DM::RNRemoteReadTaskPtr & remote_read_tasks, + const DM::Remote::RNReadTaskPtr & read_task, const SelectQueryInfo &, size_t num_streams, DAGPipeline & pipeline) { - auto io_concurrency = static_cast(static_cast(num_streams) * db_context.getSettingsRef().disagg_read_concurrency_scale); - LOG_DEBUG(log, "Build disagg streams with {} segment tasks, num_streams={} io_concurrency={}", remote_read_tasks->numSegments(), num_streams, io_concurrency); - // TODO: We can reduce max io_concurrency to numSegments. - const auto & executor_id = table_scan.getTableScanExecutorID(); - // Build a RNPageReceiver to fetch the pages from all write nodes - auto * kv_cluster = db_context.getTMTContext().getKVCluster(); - auto receiver_ctx = std::make_unique(remote_read_tasks, kv_cluster); - auto page_receiver = std::make_shared( - std::move(receiver_ctx), - /*source_num_=*/remote_read_tasks->numSegments(), - num_streams, - log->identifier(), - executor_id); - - bool do_prepare = false; // Build the input streams to read blocks from remote segments auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan); - auto page_preparer = std::make_shared( - remote_read_tasks, - page_receiver, - column_defines, - num_streams, - log->identifier(), - executor_id, - do_prepare); - - const UInt64 read_tso = sender_target_mpp_task_id.query_id.start_ts; - constexpr std::string_view extra_info = "disaggregated compute node remote segment reader"; - pipeline.streams.reserve(num_streams); - auto rs_operator = buildRSOperator(db_context, column_defines); auto push_down_filter = StorageDeltaMerge::buildPushDownFilter( rs_operator, @@ -511,29 +519,32 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams( db_context, log); auto read_mode = DM::DeltaMergeStore::getReadMode(db_context, table_scan.isFastScan(), table_scan.keepOrder(), push_down_filter); + const UInt64 read_tso = sender_target_mpp_task_id.query_id.start_ts; - auto sub_streams_size = io_concurrency / num_streams; + auto workers = DM::Remote::RNWorkers::create({ + .log = log->getChild(executor_id), + .read_task = read_task, + .columns_to_read = column_defines, + .read_tso = read_tso, + .push_down_filter = push_down_filter, + .read_mode = read_mode, + .cluster = db_context.getTMTContext().getKVCluster(), + }); + + RUNTIME_CHECK(num_streams > 0, num_streams); + pipeline.streams.reserve(num_streams); for (size_t stream_idx = 0; stream_idx < num_streams; ++stream_idx) { - // Build N UnionBlockInputStream, each one collects from M underlying RemoteInputStream. - // As a result, we will have N * M IO concurrency (N = num_streams, M = sub_streams_size). - - auto sub_streams = DM::RNRemoteSegmentThreadInputStream::buildInputStreams( - db_context, - remote_read_tasks, - page_preparer, - column_defines, - read_tso, - sub_streams_size, - extra_table_id_index, - push_down_filter, - extra_info, - /*tracing_id*/ log->identifier(), - read_mode); - RUNTIME_CHECK(!sub_streams.empty(), sub_streams.size(), sub_streams_size); - - auto union_stream = std::make_shared>(sub_streams, BlockInputStreams{}, sub_streams_size, /*req_id=*/""); - pipeline.streams.emplace_back(std::move(union_stream)); + auto stream = DM::Remote::RNSegmentInputStream::create({ + .debug_tag = log->identifier(), + // Note: We intentionally pass the whole worker, instead of worker->getReadyChannel() + // because we want to extend the lifetime of the WorkerPtr until read is finished. + // Also, we want to start the Worker after the read. + .workers = workers, + .columns_to_read = *column_defines, + .extra_table_id_index = extra_table_id_index, + }); + pipeline.streams.emplace_back(stream); } auto * dag_context = db_context.getDAGContext();