diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h index 03e3db6ff4e..245d45f8e30 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h @@ -64,15 +64,14 @@ class RFWaitTask : public Task static void submitReadyRfsAndSegmentTaskPool( const RuntimeFilteList & ready_rf_list, - const DM::SegmentReadTaskPoolPtr & task_pool, - const LoggerPtr & log) + const DM::SegmentReadTaskPoolPtr & task_pool) { for (const RuntimeFilterPtr & rf : ready_rf_list) { auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); task_pool->appendRSOperator(rs_operator); } - DM::SegmentReadTaskScheduler::instance().add(task_pool, log); + DM::SegmentReadTaskScheduler::instance().add(task_pool); } private: @@ -83,7 +82,7 @@ class RFWaitTask : public Task filterAndMoveReadyRfs(waiting_rf_list, ready_rf_list); if (waiting_rf_list.empty() || stopwatch.elapsed() >= max_wait_time_ns) { - submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log); + submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool); return ExecTaskStatus::FINISHED; } return ExecTaskStatus::WAITING; diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index 085752a61d4..da1702ef468 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -64,7 +64,7 @@ void UnorderedSourceOp::operatePrefixImpl() std::call_once(task_pool->addToSchedulerFlag(), [&]() { if (waiting_rf_list.empty()) { - DM::SegmentReadTaskScheduler::instance().add(task_pool, log); + DM::SegmentReadTaskScheduler::instance().add(task_pool); } else { @@ -74,7 +74,7 @@ void UnorderedSourceOp::operatePrefixImpl() if (max_wait_time_ms <= 0 || waiting_rf_list.empty()) { - RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log); + RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool); } else { diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h b/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h deleted file mode 100644 index 8baf53fc23b..00000000000 --- a/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#pragma once - -#include -#include -#include - -namespace DB::DM -{ -// CircularScanList is a special circular list. -// It remembers the location of the last iteration and will check whether the object is expired. 
-template -class CircularScanList -{ -public: - using ElemPtr = std::shared_ptr; - - CircularScanList() - : last_itr(l.end()) - {} - - void add(const ElemPtr & ptr) - { - l.push_back(ptr); - m[ptr->pool_id] = --l.end(); - } - - ElemPtr next() - { - last_itr = nextItr(last_itr); - while (!l.empty()) - { - if (needScheduled(last_itr)) - { - return *last_itr; - } - else - { - m.erase((*last_itr)->pool_id); - last_itr = l.erase(last_itr); - if (last_itr == l.end()) - { - last_itr = l.begin(); - } - } - } - return nullptr; - } - - size_t size() const { return l.size(); } - - // `count` is for test - std::pair count(int64_t table_id) const - { - int64_t valid = 0; - int64_t invalid = 0; - for (const auto & p : l) - { - if (table_id == 0 || p->physical_table_id == table_id) - { - p->valid() ? valid++ : invalid++; - } - } - return {valid, invalid}; - } - - ElemPtr get(uint64_t pool_id) const - { - auto itr = m.find(pool_id); - return itr != m.end() ? *(itr->second) : nullptr; - } - -private: - using Iter = typename std::list::iterator; - Iter nextItr(Iter itr) - { - if (itr == l.end() || std::next(itr) == l.end()) - { - return l.begin(); - } - else - { - return std::next(itr); - } - } - - bool needScheduled(Iter itr) - { - // If other components hold this SegmentReadTaskPool, schedule it for read blocks or clean MergedTaskPool if necessary. - return itr->use_count() > 1; - } - - std::list l; - Iter last_itr; - std::unordered_map m; -}; - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h index 3df53630d03..a6a5193663b 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h @@ -127,7 +127,11 @@ class MergedTask } } +#ifndef DBMS_PUBLIC_GTEST private: +#else +public: +#endif void initOnce(); int readOneBlock(); void setUnitFinish(int i) { finished_count += units[i].setFinish(); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp index dce93fc197e..fa03c0a5701 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -11,34 +11,44 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include namespace DB::DM { -SegmentReadTaskScheduler::SegmentReadTaskScheduler() +SegmentReadTaskScheduler::SegmentReadTaskScheduler(bool run_sched_thread) : stop(false) , log(Logger::get()) { - sched_thread = std::thread(&SegmentReadTaskScheduler::schedLoop, this); + if (likely(run_sched_thread)) + { + sched_thread = std::thread(&SegmentReadTaskScheduler::schedLoop, this); + } } SegmentReadTaskScheduler::~SegmentReadTaskScheduler() { setStop(); - sched_thread.join(); + if (likely(sched_thread.joinable())) + { + sched_thread.join(); + } } -void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool, const LoggerPtr & req_log) +void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool) { + assert(pool != nullptr); Stopwatch sw_add; // `add_lock` is only used in this function to make all threads calling `add` to execute serially. std::lock_guard add_lock(add_mtx); + add_waittings.fetch_add(1, std::memory_order_relaxed); // `lock` is used to protect data. 
std::lock_guard lock(mtx); + add_waittings.fetch_sub(1, std::memory_order_relaxed); Stopwatch sw_do_add; - read_pools.add(pool); + read_pools.emplace(pool->pool_id, pool); const auto & tasks = pool->getTasks(); for (const auto & [seg_id, task] : tasks) @@ -46,7 +56,7 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool, const Lo merging_segments[seg_id].push_back(pool->pool_id); } LOG_INFO( - req_log, + pool->getLogger(), "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", // pool->pool_id, pool->getFreeBlockSlots(), @@ -56,28 +66,20 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool, const Lo sw_do_add.elapsed() / 1000.0); } -std::pair SegmentReadTaskScheduler::scheduleMergedTask() +MergedTaskPtr SegmentReadTaskScheduler::scheduleMergedTask(SegmentReadTaskPoolPtr & pool) { - auto pool = scheduleSegmentReadTaskPoolUnlock(); - if (pool == nullptr) - { - // No SegmentReadTaskPool to schedule. Maybe no read request or - // block queue of each SegmentReadTaskPool reaching the limit. - return {nullptr, false}; - } - // If pool->valid(), read blocks. // If !pool->valid(), read path will clean it. auto merged_task = merged_task_pool.pop(pool->pool_id); if (merged_task != nullptr) { GET_METRIC(tiflash_storage_read_thread_counter, type_sche_from_cache).Increment(); - return {merged_task, true}; + return merged_task; } if (!pool->valid()) { - return {nullptr, true}; + return nullptr; } auto segment = scheduleSegmentUnlock(pool); @@ -85,13 +87,15 @@ std::pair SegmentReadTaskScheduler::scheduleMergedTask() { // The number of active segments reaches the limit. GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment(); - return {nullptr, true}; + return nullptr; } + + RUNTIME_CHECK(!segment->second.empty()); auto pools = getPoolsUnlock(segment->second); if (pools.empty()) { // Maybe SegmentReadTaskPools are expired because of upper threads finish the request. - return {nullptr, true}; + return nullptr; } std::vector units; @@ -102,19 +106,19 @@ std::pair SegmentReadTaskScheduler::scheduleMergedTask() } GET_METRIC(tiflash_storage_read_thread_counter, type_sche_new_task).Increment(); - return {std::make_shared(segment->first, std::move(units)), true}; + return std::make_shared(segment->first, std::move(units)); } SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector & pool_ids) { SegmentReadTaskPools pools; pools.reserve(pool_ids.size()); - for (uint64_t id : pool_ids) + for (auto pool_id : pool_ids) { - auto p = read_pools.get(id); - if (p != nullptr) + auto itr = read_pools.find(pool_id); + if (likely(itr != read_pools.end())) { - pools.push_back(p); + pools.push_back(itr->second); } } return pools; @@ -157,24 +161,10 @@ bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & return false; } -SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock() +bool SegmentReadTaskScheduler::needSchedule(const SegmentReadTaskPoolPtr & pool) { - int64_t pool_count - = read_pools.size(); // All read task pool need to be scheduled, including invalid read task pool. - for (int64_t i = 0; i < pool_count; i++) - { - auto pool = read_pools.next(); - // If !pool->valid(), schedule it for clean MergedTaskPool. 
-        if (pool != nullptr && (needScheduleToRead(pool) || !pool->valid()))
-        {
-            return pool;
-        }
-    }
-    if (pool_count == 0)
-    {
-        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
-    }
-    return nullptr;
+    // If `!pool->valid()` is true, schedule it so that the `MergedTaskPool` can be cleaned.
+    return pool != nullptr && (needScheduleToRead(pool) || !pool->valid());
 }
 
 std::optional<std::pair<GlobalSegmentID, std::vector<UInt64>>> SegmentReadTaskScheduler::scheduleSegmentUnlock(
@@ -213,57 +203,95 @@ bool SegmentReadTaskScheduler::isStop() const
     return stop.load(std::memory_order_relaxed);
 }
 
-bool SegmentReadTaskScheduler::schedule()
+std::tuple<UInt64, UInt64, UInt64> SegmentReadTaskScheduler::scheduleOneRound()
 {
-    Stopwatch sw_sche_all;
-    std::lock_guard lock(mtx);
-    Stopwatch sw_do_sche_all;
-    static constexpr size_t max_sche_count = 8;
-    auto pool_count = read_pools.size();
-    auto sche_count = std::min(pool_count, max_sche_count);
-    bool run_sche = false;
-    size_t count = 0;
-    while (count < sche_count)
+    UInt64 erased_pool_count = 0;
+    UInt64 sched_null_count = 0;
+    UInt64 sched_succ_count = 0;
+    for (auto itr = read_pools.begin(); itr != read_pools.end(); /**/)
     {
-        count++;
-        Stopwatch sw_sche_once;
-        MergedTaskPtr merged_task;
-        std::tie(merged_task, run_sche) = scheduleMergedTask();
-        if (merged_task != nullptr)
+        auto & pool = itr->second;
+        // No other component or thread holds this `pool`, so we can release it.
+        // TODO: `weak_ptr` may be more suitable.
+        if (pool.use_count() == 1)
         {
-            auto elapsed_ms = sw_sche_once.elapsedMilliseconds();
-            if (elapsed_ms >= 5)
-            {
-                LOG_DEBUG(
-                    log,
-                    "scheduleMergedTask merged_task=<{}> cost={}ms pool_count={}",
-                    merged_task->toString(),
-                    elapsed_ms,
-                    pool_count);
-            }
-            SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
+            LOG_INFO(pool->getLogger(), "Erase pool_id={}", pool->pool_id);
+            ++erased_pool_count;
+            itr = read_pools.erase(itr);
+            continue;
         }
-        if (!run_sche)
+        ++itr;
+
+        if (!needSchedule(pool))
         {
-            break;
+            ++sched_null_count;
+            continue;
         }
+
+        auto merged_task = scheduleMergedTask(pool);
+        if (merged_task == nullptr)
+        {
+            ++sched_null_count;
+            continue;
+        }
+        ++sched_succ_count;
+        SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
+    }
+    return std::make_tuple(erased_pool_count, sched_null_count, sched_succ_count);
+}
+
+bool SegmentReadTaskScheduler::schedule()
+{
+    Stopwatch sw_sched_total;
+    std::lock_guard lock(mtx);
+    Stopwatch sw_do_sched;
+
+    auto pool_count = read_pools.size();
+    UInt64 erased_pool_count = 0;
+    UInt64 sched_null_count = 0;
+    UInt64 sched_succ_count = 0;
+    UInt64 sched_round = 0;
+    bool can_sched_more_tasks = false;
+    do
+    {
+        ++sched_round;
+        auto [erase, null, succ] = scheduleOneRound();
+        erased_pool_count += erase;
+        sched_null_count += null;
+        sched_succ_count += succ;
+
+        can_sched_more_tasks = succ > 0 && !read_pools.empty();
+        // If no thread is waiting to add tasks and there are still tasks that can be scheduled, run another round of scheduling.
+        // This avoids releasing and acquiring `mtx` repeatedly.
+        // This is common when query concurrency is low, but individual queries are heavy.
+    } while (add_waittings.load(std::memory_order_relaxed) <= 0 && can_sched_more_tasks);
+
+    if (read_pools.empty())
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
     }
-    auto sche_all_elapsed_ms = sw_sche_all.elapsedMilliseconds();
-    if (sche_all_elapsed_ms >= 100)
+
+    auto total_ms = sw_sched_total.elapsedMilliseconds();
+    if (total_ms >= 100)
     {
-        LOG_DEBUG(
+        LOG_INFO(
             log,
-            "schedule pool_count={} count={} cost={}ms do_sche_cost={}ms",
+            "schedule sched_round={} pool_count={} erased_pool_count={} sched_null_count={} sched_succ_count={} "
+            "cost={}ms do_sched_cost={}ms",
+            sched_round,
             pool_count,
-            count,
-            sche_all_elapsed_ms,
-            sw_do_sche_all.elapsedMilliseconds());
+            erased_pool_count,
+            sched_null_count,
+            sched_succ_count,
+            total_ms,
+            sw_do_sched.elapsedMilliseconds());
     }
-    return run_sche;
+    return can_sched_more_tasks;
 }
 
 void SegmentReadTaskScheduler::schedLoop()
 {
+    setThreadName("segment-sched");
     while (!isStop())
     {
         if (!schedule())
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
index a269c0812a0..09be7c4755e 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
@@ -13,21 +13,19 @@
 // limitations under the License.
 #pragma once
 
-#include <Storages/DeltaMerge/ReadThread/CircularScanList.h>
 #include
 #include
 
 namespace DB::DM
 {
-using SegmentReadTaskPoolList = CircularScanList;
 
-// SegmentReadTaskScheduler is a global singleton.
-// All SegmentReadTaskPool will be added to it and be scheduled by it.
-// 1. DeltaMergeStore::read/readRaw will call SegmentReadTaskScheduler::add to add a SegmentReadTaskPool object to the `read_pools` list and
-// index segments information into `merging_segments`.
+// `SegmentReadTaskScheduler` is a global singleton. All `SegmentReadTaskPool`s are added to it and scheduled by it.
+// 1. `UnorderedInputStream`/`UnorderedSourceOps` will call `SegmentReadTaskScheduler::add` to add a `SegmentReadTaskPool`
+//    object to the `read_pools` list and index segment information into `merging_segments`.
 // 2. A schedule-thread will scheduling read tasks:
-// a. It scans the read_pools list and choosing a SegmentReadTaskPool.
-// b. Chooses a segment of the SegmentReadTaskPool and build a MergedTask.
+//    a. It scans the `read_pools` list and checks whether each `SegmentReadTaskPool` needs to be scheduled.
+//    b. Chooses a `SegmentReadTask` of the `SegmentReadTaskPool`; if other `SegmentReadTaskPool`s will read the same
+//       `SegmentReadTask`, pops them as well and builds a `MergedTask`.
 // c. Sends the MergedTask to read threads(SegmentReader).
 class SegmentReadTaskScheduler
 {
@@ -42,22 +40,36 @@ class SegmentReadTaskScheduler
     DISALLOW_COPY_AND_MOVE(SegmentReadTaskScheduler);
 
     // Add SegmentReadTaskPool to `read_pools` and index segments into merging_segments.
-    void add(const SegmentReadTaskPoolPtr & pool, const LoggerPtr & req_log) LOCKS_EXCLUDED(add_mtx, mtx);
+    void add(const SegmentReadTaskPoolPtr & pool) LOCKS_EXCLUDED(add_mtx, mtx);
 
     void pushMergedTask(const MergedTaskPtr & p) { merged_task_pool.push(p); }
 
+#ifndef DBMS_PUBLIC_GTEST
 private:
-    SegmentReadTaskScheduler();
+#else
+public:
+#endif
+    // `run_sched_thread` is used for tests.
+    explicit SegmentReadTaskScheduler(bool run_sched_thread = true);
 
     void setStop();
     bool isStop() const;
     bool needScheduleToRead(const SegmentReadTaskPoolPtr & pool);
+    bool needSchedule(const SegmentReadTaskPoolPtr & pool);
+    // `scheduleOneRound()` traverses all pools in `read_pools` and tries to schedule the `SegmentReadTask`s of each pool.
+    // It returns summary information for a round of scheduling:
+    // `erased_pool_count` - how many stale pools have been erased.
+    // `sched_null_count` - how many pools do not require scheduling.
+    // `sched_succ_count` - how many pools were scheduled.
+    std::tuple<UInt64, UInt64, UInt64> scheduleOneRound() EXCLUSIVE_LOCKS_REQUIRED(mtx);
+    // `schedule()` calls `scheduleOneRound()` in a loop
+    // until there are no more tasks to schedule or the lock needs to be released to other threads.
     bool schedule() LOCKS_EXCLUDED(mtx);
+    // `schedLoop()` calls `schedule()` in an infinite loop.
    void schedLoop() LOCKS_EXCLUDED(mtx);
-    // Choose segment to read, returns .
-    std::pair scheduleMergedTask() EXCLUSIVE_LOCKS_REQUIRED(mtx);
-    SegmentReadTaskPoolPtr scheduleSegmentReadTaskPoolUnlock() EXCLUSIVE_LOCKS_REQUIRED(mtx);
+
+    MergedTaskPtr scheduleMergedTask(SegmentReadTaskPoolPtr & pool) EXCLUSIVE_LOCKS_REQUIRED(mtx);
     // Returns .
     std::optional<std::pair<GlobalSegmentID, std::vector<UInt64>>> scheduleSegmentUnlock(
         const SegmentReadTaskPoolPtr & pool) EXCLUSIVE_LOCKS_REQUIRED(mtx);
@@ -67,7 +79,8 @@ class SegmentReadTaskScheduler
     std::mutex add_mtx ACQUIRED_BEFORE(mtx);
     std::mutex mtx;
-    SegmentReadTaskPoolList read_pools GUARDED_BY(mtx);
+    // pool_id -> pool
+    std::unordered_map<UInt64, SegmentReadTaskPoolPtr> read_pools GUARDED_BY(mtx);
     // GlobalSegmentID -> pool_ids
     MergingSegments merging_segments GUARDED_BY(mtx);
@@ -77,5 +90,8 @@ class SegmentReadTaskScheduler
     std::thread sched_thread;
     LoggerPtr log;
+
+    // To count how many threads are waiting to add tasks.
+    std::atomic<Int64> add_waittings{0};
 };
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
index 2cfc1b2af8c..00251c0400c 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
@@ -128,7 +128,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
         }
         std::call_once(task_pool->addToSchedulerFlag(), [&]() {
             prepareRuntimeFilter();
-            SegmentReadTaskScheduler::instance().add(task_pool, log);
+            SegmentReadTaskScheduler::instance().add(task_pool);
         });
         task_pool_added = true;
     }
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
index c8de4253fab..898e460e6bb 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -189,7 +189,13 @@ class SegmentReadTaskPool : private boost::noncopyable
 
     bool isRUExhausted();
 
+    const LoggerPtr & getLogger() const { return log; }
+
+#ifndef DBMS_PUBLIC_GTEST
 private:
+#else
+public:
+#endif
     Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
     void finishSegment(const SegmentReadTaskPtr & seg);
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp
deleted file mode 100644
index fa48bfd1eeb..00000000000
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp
+++ /dev/null
@@ -1,226 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#include -#include - -#include - -namespace DB::DM::tests -{ -class Node -{ -public: - explicit Node(uint64_t id_) - : pool_id(id_) - , physical_table_id(1) - , v(true) - {} - - bool valid() const { return v; } - void setInvalid() { v = false; } - -public: - uint64_t pool_id; - int64_t physical_table_id; - -private: - bool v; -}; - -TEST(CircularScanListTest, Normal) -{ - CircularScanList lst; - - { - ASSERT_EQ(lst.next(), nullptr); - auto [valid, invalid] = lst.count(0); - ASSERT_EQ(valid, 0); - ASSERT_EQ(invalid, 0); - ASSERT_EQ(lst.get(1), nullptr); - } - - std::unordered_map> nodes; - for (uint64_t i = 0; i < 10; i++) - { - auto p = std::make_shared(i); - lst.add(p); - nodes.emplace(i, p); - } - - { - auto [valid, invalid] = lst.count(0); - ASSERT_EQ(valid, 10); - ASSERT_EQ(invalid, 0); - } - - for (uint64_t i = 0; i < 20; i++) - { - auto sp = lst.next(); - ASSERT_EQ(sp->pool_id, i % 10); - } - - lst.get(1)->setInvalid(); - lst.get(3)->setInvalid(); - lst.get(5)->setInvalid(); - - { - auto [valid, invalid] = lst.count(0); - ASSERT_EQ(valid, 7); - ASSERT_EQ(invalid, 3); - } - - // Invalid node still can be scanned if it is holded by other components. - for (uint64_t i = 0; i < 20; i++) - { - auto sp = lst.next(); - ASSERT_EQ(sp->pool_id, i % 10); - } - - nodes.erase(1); - nodes.erase(3); - nodes.erase(5); - - const std::vector valid_ids = {0, 2, 4, 6, 7, 8, 9}; - for (uint64_t i = 0; i < 20; i++) - { - auto sp = lst.next(); - ASSERT_EQ(sp->pool_id, valid_ids[i % valid_ids.size()]); - } - - { - auto [valid, invalid] = lst.count(0); - ASSERT_EQ(valid, 7); - ASSERT_EQ(invalid, 0); - } - - for (uint64_t id : valid_ids) - { - lst.get(id)->setInvalid(); - nodes.erase(id); - } - - { - auto [valid, invalid] = lst.count(0); - ASSERT_EQ(valid, 0); - ASSERT_EQ(invalid, 7); - } - - ASSERT_EQ(lst.next(), nullptr); -} - -TEST(CircularScanListTest, Valid) -{ - CircularScanList l; - auto p1 = std::make_shared(1); - l.add(p1); - - ASSERT_EQ(l.next()->pool_id, 1); - ASSERT_EQ(l.next()->pool_id, 1); - - l.next()->setInvalid(); - p1.reset(); - - ASSERT_EQ(l.next(), nullptr); - ASSERT_EQ(l.next(), nullptr); - auto p2 = std::make_shared(2); - l.add(p2); - - ASSERT_EQ(l.next()->pool_id, 2); - ASSERT_EQ(l.next()->pool_id, 2); -} - -TEST(CircularScanListTest, ScheduleInvalid) -{ - CircularScanList l; - - // Add tasks. - auto n1 = std::make_shared(1); - l.add(n1); - auto n2 = std::make_shared(2); - l.add(n2); - auto n3 = std::make_shared(3); - l.add(n3); - - // Some tasks hold the shared_ptr. - //auto n1 = l.next(); - //auto n2 = l.next(); - //auto n3 = l.next(); - - { - auto [valid, invalid] = l.count(0); - ASSERT_EQ(valid, 3); - ASSERT_EQ(invalid, 0); - } - // Make task invalid. - n1->setInvalid(); - n2->setInvalid(); - n3->setInvalid(); - - { - auto [valid, invalid] = l.count(0); - ASSERT_EQ(valid, 0); - ASSERT_EQ(invalid, 3); - } - - { - // Tasks can be scheduled. 
- auto n1_1 = l.next(); - ASSERT_NE(n1_1, nullptr); - ASSERT_EQ(n1_1->pool_id, 1); - ASSERT_FALSE(n1_1->valid()); - - auto n2_1 = l.next(); - ASSERT_NE(n2_1, nullptr); - ASSERT_EQ(n2_1->pool_id, 2); - ASSERT_FALSE(n2_1->valid()); - - auto n3_1 = l.next(); - ASSERT_NE(n3_1, nullptr); - ASSERT_EQ(n3_1->pool_id, 3); - ASSERT_FALSE(n3_1->valid()); - } - - // Reset tasks - { - n1.reset(); - n2.reset(); - n3.reset(); - } - - { - auto [valid, invalid] = l.count(0); - ASSERT_EQ(valid, 0); - ASSERT_EQ(invalid, 3); - } - - // Tasks no need to be scheduled since no task hold the shared_ptr. - { - auto n1_1 = l.next(); - ASSERT_EQ(n1_1, nullptr); - - auto n2_1 = l.next(); - ASSERT_EQ(n2_1, nullptr); - - auto n3_1 = l.next(); - ASSERT_EQ(n3_1, nullptr); - } - - { - auto [valid, invalid] = l.count(0); - ASSERT_EQ(valid, 0); - ASSERT_EQ(invalid, 0); - } -} - -} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp index 2e9d3aeeb7a..e380b1ec933 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -22,17 +23,45 @@ #include namespace DB::DM::tests { -class SegmentReadTasksWrapperTest : public SegmentTestBasic +class SegmentReadTasksPoolTest : public SegmentTestBasic { protected: - SegmentPtr createSegment(PageIdU64 seg_id) + static SegmentPtr createSegment(PageIdU64 seg_id) { return std::make_shared(Logger::get(), 0, RowKeyRange{}, seg_id, seg_id + 1, nullptr, nullptr); } + static SegmentSnapshotPtr createSegmentSnapshot() + { + auto delta_snap = std::make_shared(CurrentMetrics::Metric{}); + delta_snap->delta = std::make_shared(nullptr); + return std::make_shared(std::move(delta_snap), /*stable*/ nullptr, Logger::get()); + } + SegmentReadTaskPtr createSegmentReadTask(PageIdU64 seg_id) { - return std::make_shared(createSegment(seg_id), nullptr, createDMContext(), RowKeyRanges{}); + return std::make_shared( + createSegment(seg_id), + createSegmentSnapshot(), + createDMContext(), + RowKeyRanges{}); + } + + static Block createBlock() + { + String type_name = "Int64"; + DataTypePtr types[2]; + types[0] = DataTypeFactory::instance().get(type_name); + types[1] = makeNullable(types[0]); + ColumnsWithTypeAndName columns; + for (auto & type : types) + { + auto column = type->createColumn(); + for (size_t i = 0; i < 10; i++) + column->insertDefault(); + columns.emplace_back(std::move(column), type); + } + return Block{columns}; } SegmentReadTasks createSegmentReadTasks(const std::vector & seg_ids) @@ -57,10 +86,28 @@ class SegmentReadTasksWrapperTest : public SegmentTestBasic }; } + SegmentReadTaskPoolPtr createSegmentReadTaskPool(const std::vector & seg_ids) + { + auto dm_context = createDMContext(); + return std::make_shared( + /*extra_table_id_index_*/ dm_context->physical_table_id, + /*columns_to_read_*/ ColumnDefines{}, + /*filter_*/ nullptr, + /*max_version_*/ 0, + /*expected_block_size_*/ DEFAULT_BLOCK_SIZE, + /*read_mode_*/ ReadMode::Bitmap, + createSegmentReadTasks(seg_ids), + /*after_segment_read_*/ [&](const DMContextPtr &, const SegmentPtr &) { /*do nothing*/ }, + /*tracing_id_*/ String{}, + /*enable_read_thread_*/ true, + /*num_streams_*/ 1, + /*res_group_name_*/ String{}); + } + inline static const std::vector test_seg_ids{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; }; 
-TEST_F(SegmentReadTasksWrapperTest, Unordered) +TEST_F(SegmentReadTasksPoolTest, UnorderedWrapper) { SegmentReadTasksWrapper tasks_wrapper(true, createSegmentReadTasks(test_seg_ids)); @@ -95,7 +142,7 @@ TEST_F(SegmentReadTasksWrapperTest, Unordered) ASSERT_TRUE(tasks_wrapper.empty()); } -TEST_F(SegmentReadTasksWrapperTest, Ordered) +TEST_F(SegmentReadTasksPoolTest, OrderedWrapper) { SegmentReadTasksWrapper tasks_wrapper(false, createSegmentReadTasks(test_seg_ids)); @@ -121,4 +168,89 @@ TEST_F(SegmentReadTasksWrapperTest, Ordered) ASSERT_EQ(tasks_wrapper.nextTask(), nullptr); } +TEST_F(SegmentReadTasksPoolTest, SchedulerBasic) +{ + SegmentReadTaskScheduler scheduler{false}; + + { + // Create and add pool. + auto pool = createSegmentReadTaskPool(test_seg_ids); + pool->increaseUnorderedInputStreamRefCount(); + scheduler.add(pool); + + // Schedule segment to reach limitation. + auto active_segment_limits = pool->getFreeActiveSegments(); + ASSERT_GT(active_segment_limits, 0); + std::vector merged_tasks; + for (int i = 0; i < active_segment_limits; ++i) + { + std::lock_guard lock(scheduler.mtx); + auto merged_task = scheduler.scheduleMergedTask(pool); + ASSERT_NE(merged_task, nullptr); + merged_tasks.push_back(merged_task); + } + { + std::lock_guard lock(scheduler.mtx); + ASSERT_EQ(scheduler.scheduleMergedTask(pool), nullptr); + } + + // Make a segment finished. + { + ASSERT_FALSE(scheduler.needScheduleToRead(pool)); + auto merged_task = merged_tasks.back(); + ASSERT_EQ(merged_task->units.size(), 1); + pool->finishSegment(merged_task->units.front().task); + ASSERT_TRUE(scheduler.needScheduleToRead(pool)); + } + + // Push block to reach limitation. + { + auto free_slot_limits = pool->getFreeBlockSlots(); + ASSERT_GT(free_slot_limits, 0); + for (int i = 0; i < free_slot_limits; ++i) + { + pool->pushBlock(createBlock()); + } + ASSERT_EQ(pool->getFreeBlockSlots(), 0); + ASSERT_FALSE(scheduler.needScheduleToRead(pool)); + + Block blk; + pool->popBlock(blk); + ASSERT_TRUE(blk); + ASSERT_EQ(pool->getFreeBlockSlots(), 1); + ASSERT_TRUE(scheduler.needScheduleToRead(pool)); + + while (pool->tryPopBlock(blk)) {} + } + + // Finish + { + while (!merged_tasks.empty()) + { + auto merged_task = merged_tasks.back(); + merged_tasks.pop_back(); + pool->finishSegment(merged_task->units.front().task); + } + + for (;;) + { + std::lock_guard lock(scheduler.mtx); + auto merged_task = scheduler.scheduleMergedTask(pool); + if (merged_task == nullptr) + { + break; + } + pool->finishSegment(merged_task->units.front().task); + } + + ASSERT_EQ(pool->q.size(), 0); + Block blk; + ASSERT_FALSE(pool->q.pop(blk)); + + pool->decreaseUnorderedInputStreamRefCount(); + ASSERT_FALSE(pool->valid()); + } + } +} + } // namespace DB::DM::tests diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index b2209167cfb..78d350ab6a0 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -2738,6 +2738,136 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": null, + "description": "", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 44 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, 
+ "show": true, + "sideWidth": 250, + "sort": "max", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "Limit", + "color": "#F2495C", + "hideTooltip": true, + "legend": false, + "linewidth": 2, + "nullPointMode": "connected" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum by (instance) (rate(tiflash_proxy_thread_cpu_seconds_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", name=~\"segment_sched.*\"}[1m]))", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{name}} {{instance}}", + "refId": "A", + "step": 40 + }, + { + "exemplar": true, + "expr": "count by (instance) (tiflash_proxy_thread_cpu_seconds_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", name=~\"segment_sched.*\"})", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "Limit", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Segment Scheduler", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 1, + "format": "percentunit", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Threads CPU", @@ -7859,7 +7989,7 @@ "current": false, "max": false, "min": false, - "rightSide": false, + "rightSide": true, "show": true, "total": false, "values": false @@ -7888,7 +8018,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(tiflash_storage_read_thread_counter{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type!~\"get_cache_miss|get_cache_hit|get_cache_part|get_cache_copy|sche_no_segment\"}[1m])) by (type)", + "expr": "sum(rate(tiflash_storage_read_thread_counter{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type!~\"get_cache_miss|get_cache_hit|get_cache_part|get_cache_copy\"}[1m])) by (type)", "format": "time_series", "hide": false, "interval": "",