diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h b/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h
deleted file mode 100644
index 8baf53fc23b..00000000000
--- a/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-#pragma once
-
-#include <list>
-#include <memory>
-#include <unordered_map>
-
-namespace DB::DM
-{
-// CircularScanList is a special circular list.
-// It remembers the location of the last iteration and will check whether the object is expired.
-template <typename T>
-class CircularScanList
-{
-public:
-    using ElemPtr = std::shared_ptr<T>;
-
-    CircularScanList()
-        : last_itr(l.end())
-    {}
-
-    void add(const ElemPtr & ptr)
-    {
-        l.push_back(ptr);
-        m[ptr->pool_id] = --l.end();
-    }
-
-    ElemPtr next()
-    {
-        last_itr = nextItr(last_itr);
-        while (!l.empty())
-        {
-            if (needScheduled(last_itr))
-            {
-                return *last_itr;
-            }
-            else
-            {
-                m.erase((*last_itr)->pool_id);
-                last_itr = l.erase(last_itr);
-                if (last_itr == l.end())
-                {
-                    last_itr = l.begin();
-                }
-            }
-        }
-        return nullptr;
-    }
-
-    size_t size() const { return l.size(); }
-
-    // `count` is for test
-    std::pair<int64_t, int64_t> count(int64_t table_id) const
-    {
-        int64_t valid = 0;
-        int64_t invalid = 0;
-        for (const auto & p : l)
-        {
-            if (table_id == 0 || p->physical_table_id == table_id)
-            {
-                p->valid() ? valid++ : invalid++;
-            }
-        }
-        return {valid, invalid};
-    }
-
-    ElemPtr get(uint64_t pool_id) const
-    {
-        auto itr = m.find(pool_id);
-        return itr != m.end() ? *(itr->second) : nullptr;
-    }
-
-private:
-    using Iter = typename std::list<ElemPtr>::iterator;
-    Iter nextItr(Iter itr)
-    {
-        if (itr == l.end() || std::next(itr) == l.end())
-        {
-            return l.begin();
-        }
-        else
-        {
-            return std::next(itr);
-        }
-    }
-
-    bool needScheduled(Iter itr)
-    {
-        // If other components hold this SegmentReadTaskPool, schedule it for read blocks or clean MergedTaskPool if necessary.
-        return itr->use_count() > 1;
-    }
-
-    std::list<ElemPtr> l;
-    Iter last_itr;
-    std::unordered_map<uint64_t, Iter> m;
-};
-
-} // namespace DB::DM
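For context, a minimal sketch of how the removed CircularScanList behaved, with a hypothetical `Elem` type whose `pool_id`, `physical_table_id`, and `valid()` mirror the members the template expects. `next()` resumes from the last position, wraps around, and lazily erases entries once the list holds the only remaining reference:

#include <cassert>
#include <memory>

// Hypothetical element type, for illustration only.
struct Elem
{
    explicit Elem(uint64_t id)
        : pool_id(id)
    {}
    uint64_t pool_id;
    int64_t physical_table_id = 1;
    bool valid() const { return true; }
};

void circularScanListExample()
{
    // Assumes the (now deleted) header above is still available.
    DB::DM::CircularScanList<Elem> lst;
    auto e1 = std::make_shared<Elem>(1);
    auto e2 = std::make_shared<Elem>(2);
    lst.add(e1);
    lst.add(e2);

    // Round-robin scan: 1, 2, then wrap back to 1.
    assert(lst.next()->pool_id == 1);
    assert(lst.next()->pool_id == 2);
    assert(lst.next()->pool_id == 1);

    // Drop the external references: use_count() falls to 1 (only the list
    // holds each element), so next() erases them instead of returning them.
    e1.reset();
    e2.reset();
    assert(lst.next() == nullptr);
}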
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
index 0a82367fe65..e0a2d72c8a4 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
@@ -93,7 +93,11 @@ class MergedTask
     }
 
     void setException(const DB::Exception & e);
 
+#ifndef DBMS_PUBLIC_GTEST
 private:
+#else
+public:
+#endif
     void initOnce();
     int readOneBlock();
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
index 5c00bd084f7..eabb5c379fc 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -18,26 +18,46 @@
 namespace DB::DM
 {
-SegmentReadTaskScheduler::SegmentReadTaskScheduler()
+SegmentReadTaskScheduler::SegmentReadTaskScheduler(bool run_sched_thread)
     : stop(false)
     , log(Logger::get())
 {
-    sched_thread = std::thread(&SegmentReadTaskScheduler::schedLoop, this);
+    if (likely(run_sched_thread))
+    {
+        sched_thread = std::thread(&SegmentReadTaskScheduler::schedLoop, this);
+    }
 }
 
 SegmentReadTaskScheduler::~SegmentReadTaskScheduler()
 {
     setStop();
-    sched_thread.join();
+    if (likely(sched_thread.joinable()))
+    {
+        sched_thread.join();
+    }
 }
 
 void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
 {
+    assert(pool != nullptr);
+    if (pool->getPendingSegmentCount() <= 0)
+    {
+        LOG_INFO(
+            pool->getLogger(),
+            "Ignored for no segment to read, pool_id={} table_id={}",
+            pool->pool_id,
+            pool->physical_table_id);
+        return;
+    }
     Stopwatch sw_add;
+    // `add_lock` is only used in this function, so that all threads calling `add` execute serially.
     std::lock_guard add_lock(add_mtx);
+    add_waittings.fetch_add(1, std::memory_order_relaxed);
+    // `lock` protects the shared data.
     std::lock_guard lock(mtx);
+    add_waittings.fetch_sub(1, std::memory_order_relaxed);
     Stopwatch sw_do_add;
-    read_pools.add(pool);
+    read_pools.emplace(pool->pool_id, pool);
 
     const auto & tasks = pool->getTasks();
     for (const auto & pa : tasks)
@@ -46,11 +66,9 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
         merging_segments[pool->physical_table_id][seg_id].push_back(pool->pool_id);
     }
     LOG_INFO(
-        log,
-        "Added, pool_id={} table_id={} block_slots={} segment_count={} pool_count={} "
-        "cost={:.3f}us do_add_cost={:.3f}us", //
+        pool->getLogger(),
+        "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
         pool->pool_id,
-        pool->physical_table_id,
         pool->getFreeBlockSlots(),
         tasks.size(),
         read_pools.size(),
@@ -58,28 +76,20 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
         sw_do_add.elapsed() / 1000.0);
 }
 
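A standalone sketch of the `add_mtx`/`add_waittings` handshake introduced in `add` above, with simplified types (not the actual TiFlash classes): `add_mtx` serializes writers so at most one of them contends with `schedule` for `mtx`, and the waiter counter lets the scheduler release `mtx` promptly when a writer arrives:

#include <atomic>
#include <mutex>

// Simplified sketch; field names mirror the patch, logic is illustrative only.
class SchedulerSketch
{
public:
    void add()
    {
        std::lock_guard add_lock(add_mtx); // writers queue up here, one at a time
        add_waittings.fetch_add(1, std::memory_order_relaxed);
        std::lock_guard lock(mtx); // the single writer currently contending with schedule()
        add_waittings.fetch_sub(1, std::memory_order_relaxed);
        // ... mutate read_pools / merging_segments ...
    }

    void schedule()
    {
        std::lock_guard lock(mtx);
        bool more_work = true;
        do
        {
            // ... run one scheduling round ...
            more_work = false; // placeholder: the real round reports this
        } while (add_waittings.load(std::memory_order_relaxed) <= 0 && more_work);
        // Returning releases `mtx`, letting a waiting add() proceed.
    }

private:
    std::mutex add_mtx;
    std::mutex mtx;
    std::atomic<int> add_waittings{0};
};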
-std::pair<MergedTaskPtr, bool> SegmentReadTaskScheduler::scheduleMergedTask()
+MergedTaskPtr SegmentReadTaskScheduler::scheduleMergedTask(SegmentReadTaskPoolPtr & pool)
 {
-    auto pool = scheduleSegmentReadTaskPoolUnlock();
-    if (pool == nullptr)
-    {
-        // No SegmentReadTaskPool to schedule. Maybe no read request or
-        // block queue of each SegmentReadTaskPool reaching the limit.
-        return {nullptr, false};
-    }
-
     // If pool->valid(), read blocks.
     // If !pool->valid(), read path will clean it.
     auto merged_task = merged_task_pool.pop(pool->pool_id);
     if (merged_task != nullptr)
     {
         GET_METRIC(tiflash_storage_read_thread_counter, type_sche_from_cache).Increment();
-        return {merged_task, true};
+        return merged_task;
     }
 
     if (!pool->valid())
     {
-        return {nullptr, true};
+        return nullptr;
     }
 
     auto segment = scheduleSegmentUnlock(pool);
@@ -87,13 +97,15 @@ std::pair<MergedTaskPtr, bool> SegmentReadTaskScheduler::scheduleMergedTask()
     {
         // The number of active segments reaches the limit.
         GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment();
-        return {nullptr, true};
+        return nullptr;
     }
+
+    RUNTIME_CHECK(!segment->second.empty());
     auto pools = getPoolsUnlock(segment->second);
     if (pools.empty())
     {
         // Maybe SegmentReadTaskPools are expired because of upper threads finish the request.
-        return {nullptr, true};
+        return nullptr;
     }
 
     std::vector<MergedUnit> units;
     for (auto & pool : pools)
     {
         units.emplace_back(pool, pool->getTask(segment->first));
     }
     GET_METRIC(tiflash_storage_read_thread_counter, type_sche_new_task).Increment();
 
-    return {std::make_shared<MergedTask>(segment->first, std::move(units)), true};
+    return std::make_shared<MergedTask>(segment->first, std::move(units));
 }
 
 SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector<uint64_t> & pool_ids)
 {
     SegmentReadTaskPools pools;
     pools.reserve(pool_ids.size());
-    for (uint64_t id : pool_ids)
+    for (auto pool_id : pool_ids)
     {
-        auto p = read_pools.get(id);
-        if (p != nullptr)
+        auto itr = read_pools.find(pool_id);
+        if (likely(itr != read_pools.end()))
         {
-            pools.push_back(p);
+            pools.push_back(itr->second);
         }
     }
     return pools;
 }
@@ -159,24 +171,10 @@ bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr &
     return false;
 }
 
-SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock()
+bool SegmentReadTaskScheduler::needSchedule(const SegmentReadTaskPoolPtr & pool)
 {
-    int64_t pool_count
-        = read_pools.size(); // All read task pool need to be scheduled, including invalid read task pool.
-    for (int64_t i = 0; i < pool_count; i++)
-    {
-        auto pool = read_pools.next();
-        // If !pool->valid(), schedule it for clean MergedTaskPool.
-        if (pool != nullptr && (needScheduleToRead(pool) || !pool->valid()))
-        {
-            return pool;
-        }
-    }
-    if (pool_count == 0)
-    {
-        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
-    }
-    return nullptr;
+    // If `!pool->valid()` is true, schedule it for cleaning the `MergedTaskPool`.
+    return pool != nullptr && (needScheduleToRead(pool) || !pool->valid());
 }
 
 std::optional<std::pair<uint64_t, std::vector<uint64_t>>> SegmentReadTaskScheduler::scheduleSegmentUnlock(
     const SegmentReadTaskPoolPtr & pool)
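The lifetime tracking that used to live in `CircularScanList::needScheduled` (`use_count() > 1`) moves into `scheduleOneRound` below as a `use_count() == 1` erase check (with a TODO suggesting `weak_ptr`). A minimal self-contained sketch of the idea, with a simplified stand-in for `SegmentReadTaskPool`:

#include <cassert>
#include <cstdint>
#include <memory>
#include <unordered_map>

// Simplified stand-in for SegmentReadTaskPool, for illustration only.
struct PoolStub
{
    uint64_t pool_id = 1;
};

int main()
{
    std::unordered_map<uint64_t, std::shared_ptr<PoolStub>> read_pools;
    auto reader_ref = std::make_shared<PoolStub>(); // reference held by the read path
    read_pools.emplace(reader_ref->pool_id, reader_ref);

    // Two owners: the scheduler's map and the reader.
    assert(read_pools.at(1).use_count() == 2);

    reader_ref.reset(); // the query finishes and drops its reference

    // Only the map still owns the pool, so the scheduler may erase it.
    assert(read_pools.at(1).use_count() == 1);
    return 0;
}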
@@ -226,54 +224,90 @@ bool SegmentReadTaskScheduler::isStop() const
     return stop.load(std::memory_order_relaxed);
 }
 
-bool SegmentReadTaskScheduler::schedule()
+std::tuple<UInt64, UInt64, UInt64> SegmentReadTaskScheduler::scheduleOneRound()
 {
-    Stopwatch sw_sche_all;
-    std::lock_guard lock(mtx);
-    Stopwatch sw_do_sche_all;
-    static constexpr size_t max_sche_count = 8;
-    auto pool_count = read_pools.size();
-    auto sche_count = std::min(pool_count, max_sche_count);
-    bool run_sche = false;
-    size_t count = 0;
-    while (count < sche_count)
+    UInt64 erased_pool_count = 0;
+    UInt64 sched_null_count = 0;
+    UInt64 sched_succ_count = 0;
+    for (auto itr = read_pools.begin(); itr != read_pools.end(); /**/)
     {
-        count++;
-        Stopwatch sw_sche_once;
-        MergedTaskPtr merged_task;
-        std::tie(merged_task, run_sche) = scheduleMergedTask();
-        if (merged_task != nullptr)
+        auto & pool = itr->second;
+        // No other component or thread holds this `pool`, so we can release it.
+        // TODO: `weak_ptr` may be more suitable.
+        if (pool.use_count() == 1)
         {
-            auto elapsed_ms = sw_sche_once.elapsedMilliseconds();
-            if (elapsed_ms >= 5)
-            {
-                LOG_DEBUG(
-                    log,
-                    "scheduleMergedTask segment_id={} pool_ids={} cost={}ms pool_count={}",
-                    merged_task->getSegmentId(),
-                    merged_task->getPoolIds(),
-                    elapsed_ms,
-                    pool_count);
-            }
-            SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
+            LOG_INFO(pool->getLogger(), "Erase pool_id={}", pool->pool_id);
+            ++erased_pool_count;
+            itr = read_pools.erase(itr);
+            continue;
+        }
+        ++itr;
+
+        if (!needSchedule(pool))
+        {
+            ++sched_null_count;
+            continue;
         }
-        if (!run_sche)
+
+        auto merged_task = scheduleMergedTask(pool);
+        if (merged_task == nullptr)
         {
-            break;
+            ++sched_null_count;
+            continue;
         }
+        ++sched_succ_count;
+        SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
+    }
+    return std::make_tuple(erased_pool_count, sched_null_count, sched_succ_count);
+}
+
+bool SegmentReadTaskScheduler::schedule()
+{
+    Stopwatch sw_sched_total;
+    std::lock_guard lock(mtx);
+    Stopwatch sw_do_sched;
+
+    auto pool_count = read_pools.size();
+    UInt64 erased_pool_count = 0;
+    UInt64 sched_null_count = 0;
+    UInt64 sched_succ_count = 0;
+    UInt64 sched_round = 0;
+    bool can_sched_more_tasks = false;
+    do
+    {
+        ++sched_round;
+        auto [erase, null, succ] = scheduleOneRound();
+        erased_pool_count += erase;
+        sched_null_count += null;
+        sched_succ_count += succ;
+
+        can_sched_more_tasks = succ > 0 && !read_pools.empty();
+        // If no thread is waiting to add tasks and there are still tasks to be scheduled, run another round.
+        // This avoids releasing and reacquiring `mtx` repeatedly.
+        // This is common when query concurrency is low, but individual queries are heavy.
+    } while (add_waittings.load(std::memory_order_relaxed) <= 0 && can_sched_more_tasks);
+
+    if (read_pools.empty())
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
     }
-    auto sche_all_elapsed_ms = sw_sche_all.elapsedMilliseconds();
-    if (sche_all_elapsed_ms >= 100)
+
+    auto total_ms = sw_sched_total.elapsedMilliseconds();
+    if (total_ms >= 100)
     {
-        LOG_DEBUG(
+        LOG_INFO(
             log,
-            "schedule pool_count={} count={} cost={}ms do_sche_cost={}ms",
+            "schedule sched_round={} pool_count={} erased_pool_count={} sched_null_count={} sched_succ_count={} "
+            "cost={}ms do_sched_cost={}ms",
+            sched_round,
             pool_count,
-            count,
-            sche_all_elapsed_ms,
-            sw_do_sche_all.elapsedMilliseconds());
+            erased_pool_count,
+            sched_null_count,
+            sched_succ_count,
+            total_ms,
+            sw_do_sched.elapsedMilliseconds());
     }
-    return run_sche;
+    return can_sched_more_tasks;
 }
 
 void SegmentReadTaskScheduler::schedLoop()
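The loop in `scheduleOneRound` above uses the standard erase-during-iteration idiom for `std::unordered_map`: `erase(itr)` returns the iterator to the next element, and `++itr` runs only on the keep path. A minimal self-contained illustration:

#include <cassert>
#include <unordered_map>

int main()
{
    std::unordered_map<int, int> m{{1, 10}, {2, 20}, {3, 30}, {4, 40}};

    for (auto itr = m.begin(); itr != m.end(); /**/)
    {
        if (itr->second % 20 == 0)
            itr = m.erase(itr); // erase returns the next valid iterator
        else
            ++itr; // advance only when nothing was erased
    }

    assert(m.size() == 2); // {1,10} and {3,30} survive
    return 0;
}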
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
index 4f874762f0a..9e5a617425d 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
@@ -13,23 +13,19 @@
 // limitations under the License.
 #pragma once
 
-#include <Storages/DeltaMerge/ReadThread/CircularScanList.h>
 #include
 #include
-#include
 
 namespace DB::DM
 {
-using SegmentReadTaskPoolList = CircularScanList<SegmentReadTaskPool>;
-
-// SegmentReadTaskScheduler is a global singleton.
-// All SegmentReadTaskPool will be added to it and be scheduled by it.
-
-// 1. DeltaMergeStore::read/readRaw will call SegmentReadTaskScheduler::add to add a SegmentReadTaskPool object to the `read_pools` list and
-// index segments information into `merging_segments`.
+// `SegmentReadTaskScheduler` is a global singleton. All `SegmentReadTaskPool`s are added to it and scheduled by it.
+// 1. `UnorderedInputStream`/`UnorderedSourceOps` will call `SegmentReadTaskScheduler::add` to add a `SegmentReadTaskPool`
+// object to the `read_pools` list and index segments information into `merging_segments`.
 // 2. A schedule-thread will scheduling read tasks:
-// a. It scans the read_pools list and choosing a SegmentReadTaskPool.
-// b. Chooses a segment of the SegmentReadTaskPool and build a MergedTask.
+// a. It scans the `read_pools` list and checks whether each `SegmentReadTaskPool` needs to be scheduled.
+// b. Chooses a `SegmentReadTask` of the `SegmentReadTaskPool`; if other `SegmentReadTaskPool`s will read the same
+// `SegmentReadTask`, pops them as well and builds a `MergedTask`.
 // c. Sends the MergedTask to read threads(SegmentReader).
 class SegmentReadTaskScheduler
 {
@@ -48,12 +44,16 @@ class SegmentReadTaskScheduler
 
     void pushMergedTask(const MergedTaskPtr & p) { merged_task_pool.push(p); }
 
+#ifndef DBMS_PUBLIC_GTEST
 private:
-    SegmentReadTaskScheduler();
+#else
+public:
+#endif
+    // `run_sched_thread` is used for tests.
+    explicit SegmentReadTaskScheduler(bool run_sched_thread = true);
 
     // Choose segment to read.
-    // Returns
-    std::pair<MergedTaskPtr, bool> scheduleMergedTask();
+    MergedTaskPtr scheduleMergedTask(SegmentReadTaskPoolPtr & pool);
 
     void setStop();
     bool isStop() const;
@@ -65,11 +65,20 @@ class SegmentReadTaskScheduler
     std::optional<std::pair<uint64_t, std::vector<uint64_t>>> scheduleSegmentUnlock(
         const SegmentReadTaskPoolPtr & pool);
     SegmentReadTaskPoolPtr scheduleSegmentReadTaskPoolUnlock();
+    bool needSchedule(const SegmentReadTaskPoolPtr & pool);
+
+    // `scheduleOneRound()` traverses all pools in `read_pools` and tries to schedule a `SegmentReadTask` of each pool.
+    // It returns summary information for a round of scheduling:
+    // `erased_pool_count` - how many stale pools have been erased.
+    // `sched_null_count` - how many pools do not require scheduling.
+    // `sched_succ_count` - how many pools were scheduled.
+    std::tuple<UInt64, UInt64, UInt64> scheduleOneRound() EXCLUSIVE_LOCKS_REQUIRED(mtx);
 
     // To restrict the instantaneous concurrency of `add` and avoid `schedule` from always failing to acquire the lock.
     std::mutex add_mtx;
 
     std::mutex mtx;
-    SegmentReadTaskPoolList read_pools;
+    // pool_id -> pool
+    std::unordered_map<uint64_t, SegmentReadTaskPoolPtr> read_pools;
     // table_id -> {seg_id -> pool_ids, seg_id -> pool_ids, ...}
     std::unordered_map<int64_t, std::unordered_map<uint64_t, std::vector<uint64_t>>> merging_segments;
 
@@ -79,5 +88,8 @@ class SegmentReadTaskScheduler
 
     std::thread sched_thread;
     LoggerPtr log;
+
+    // Counts how many threads are waiting to add tasks.
+    std::atomic add_waittings{0};
 };
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
index b5268a750f4..722522bcf30 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -228,7 +228,13 @@ class SegmentReadTaskPool : private boost::noncopyable
 
     bool isRUExhausted();
 
+    const LoggerPtr & getLogger() const { return log; }
+
+#ifndef DBMS_PUBLIC_GTEST
 private:
+#else
+public:
+#endif
     Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
     void finishSegment(const SegmentPtr & seg);
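The `DBMS_PUBLIC_GTEST` blocks that appear throughout this patch all follow one pattern: test targets define the macro so unit tests can reach otherwise-private members (as the `SchedulerBasic` test below does with `scheduler.mtx` and `pool->q`). A minimal sketch of the trick, with a hypothetical `Widget` class:

#include <cassert>

// Test builds compile with -DDBMS_PUBLIC_GTEST; production builds do not.
class Widget
{
public:
    int visiblePart() const { return hiddenPart() + 1; }

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
    int hiddenPart() const { return 41; } // reachable from tests only when the macro is defined
};

int main()
{
    assert(Widget{}.visiblePart() == 42);
#ifdef DBMS_PUBLIC_GTEST
    assert(Widget{}.hiddenPart() == 41); // compiles only in test builds
#endif
    return 0;
}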
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp
deleted file mode 100644
index fa48bfd1eeb..00000000000
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp
+++ /dev/null
@@ -1,226 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-#include <Storages/DeltaMerge/ReadThread/CircularScanList.h>
-#include <gtest/gtest.h>
-
-#include
-
-namespace DB::DM::tests
-{
-class Node
-{
-public:
-    explicit Node(uint64_t id_)
-        : pool_id(id_)
-        , physical_table_id(1)
-        , v(true)
-    {}
-
-    bool valid() const { return v; }
-    void setInvalid() { v = false; }
-
-public:
-    uint64_t pool_id;
-    int64_t physical_table_id;
-
-private:
-    bool v;
-};
-
-TEST(CircularScanListTest, Normal)
-{
-    CircularScanList<Node> lst;
-
-    {
-        ASSERT_EQ(lst.next(), nullptr);
-        auto [valid, invalid] = lst.count(0);
-        ASSERT_EQ(valid, 0);
-        ASSERT_EQ(invalid, 0);
-        ASSERT_EQ(lst.get(1), nullptr);
-    }
-
-    std::unordered_map<uint64_t, std::shared_ptr<Node>> nodes;
-    for (uint64_t i = 0; i < 10; i++)
-    {
-        auto p = std::make_shared<Node>(i);
-        lst.add(p);
-        nodes.emplace(i, p);
-    }
-
-    {
-        auto [valid, invalid] = lst.count(0);
-        ASSERT_EQ(valid, 10);
-        ASSERT_EQ(invalid, 0);
-    }
-
-    for (uint64_t i = 0; i < 20; i++)
-    {
-        auto sp = lst.next();
-        ASSERT_EQ(sp->pool_id, i % 10);
-    }
-
-    lst.get(1)->setInvalid();
-    lst.get(3)->setInvalid();
-    lst.get(5)->setInvalid();
-
-    {
-        auto [valid, invalid] = lst.count(0);
-        ASSERT_EQ(valid, 7);
-        ASSERT_EQ(invalid, 3);
-    }
-
-    // Invalid node still can be scanned if it is holded by other components.
-    for (uint64_t i = 0; i < 20; i++)
-    {
-        auto sp = lst.next();
-        ASSERT_EQ(sp->pool_id, i % 10);
-    }
-
-    nodes.erase(1);
-    nodes.erase(3);
-    nodes.erase(5);
-
-    const std::vector<uint64_t> valid_ids = {0, 2, 4, 6, 7, 8, 9};
-    for (uint64_t i = 0; i < 20; i++)
-    {
-        auto sp = lst.next();
-        ASSERT_EQ(sp->pool_id, valid_ids[i % valid_ids.size()]);
-    }
-
-    {
-        auto [valid, invalid] = lst.count(0);
-        ASSERT_EQ(valid, 7);
-        ASSERT_EQ(invalid, 0);
-    }
-
-    for (uint64_t id : valid_ids)
-    {
-        lst.get(id)->setInvalid();
-        nodes.erase(id);
-    }
-
-    {
-        auto [valid, invalid] = lst.count(0);
-        ASSERT_EQ(valid, 0);
-        ASSERT_EQ(invalid, 7);
-    }
-
-    ASSERT_EQ(lst.next(), nullptr);
-}
-
-TEST(CircularScanListTest, Valid)
-{
-    CircularScanList<Node> l;
-    auto p1 = std::make_shared<Node>(1);
-    l.add(p1);
-
-    ASSERT_EQ(l.next()->pool_id, 1);
-    ASSERT_EQ(l.next()->pool_id, 1);
-
-    l.next()->setInvalid();
-    p1.reset();
-
-    ASSERT_EQ(l.next(), nullptr);
-    ASSERT_EQ(l.next(), nullptr);
-
-    auto p2 = std::make_shared<Node>(2);
-    l.add(p2);
-
-    ASSERT_EQ(l.next()->pool_id, 2);
-    ASSERT_EQ(l.next()->pool_id, 2);
-}
-
-TEST(CircularScanListTest, ScheduleInvalid)
-{
-    CircularScanList<Node> l;
-
-    // Add tasks.
-    auto n1 = std::make_shared<Node>(1);
-    l.add(n1);
-    auto n2 = std::make_shared<Node>(2);
-    l.add(n2);
-    auto n3 = std::make_shared<Node>(3);
-    l.add(n3);
-
-    // Some tasks hold the shared_ptr.
-    //auto n1 = l.next();
-    //auto n2 = l.next();
-    //auto n3 = l.next();
-
-    {
-        auto [valid, invalid] = l.count(0);
-        ASSERT_EQ(valid, 3);
-        ASSERT_EQ(invalid, 0);
-    }
-
-    // Make task invalid.
-    n1->setInvalid();
-    n2->setInvalid();
-    n3->setInvalid();
-
-    {
-        auto [valid, invalid] = l.count(0);
-        ASSERT_EQ(valid, 0);
-        ASSERT_EQ(invalid, 3);
-    }
-
-    {
-        // Tasks can be scheduled.
-        auto n1_1 = l.next();
-        ASSERT_NE(n1_1, nullptr);
-        ASSERT_EQ(n1_1->pool_id, 1);
-        ASSERT_FALSE(n1_1->valid());
-
-        auto n2_1 = l.next();
-        ASSERT_NE(n2_1, nullptr);
-        ASSERT_EQ(n2_1->pool_id, 2);
-        ASSERT_FALSE(n2_1->valid());
-
-        auto n3_1 = l.next();
-        ASSERT_NE(n3_1, nullptr);
-        ASSERT_EQ(n3_1->pool_id, 3);
-        ASSERT_FALSE(n3_1->valid());
-    }
-
-    // Reset tasks
-    {
-        n1.reset();
-        n2.reset();
-        n3.reset();
-    }
-
-    {
-        auto [valid, invalid] = l.count(0);
-        ASSERT_EQ(valid, 0);
-        ASSERT_EQ(invalid, 3);
-    }
-
-    // Tasks no need to be scheduled since no task hold the shared_ptr.
-    {
-        auto n1_1 = l.next();
-        ASSERT_EQ(n1_1, nullptr);
-
-        auto n2_1 = l.next();
-        ASSERT_EQ(n2_1, nullptr);
-
-        auto n3_1 = l.next();
-        ASSERT_EQ(n3_1, nullptr);
-    }
-
-    {
-        auto [valid, invalid] = l.count(0);
-        ASSERT_EQ(valid, 0);
-        ASSERT_EQ(invalid, 0);
-    }
-}
-
-} // namespace DB::DM::tests
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp
index 958cf0210a8..5811f2d2bb7 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp
@@ -12,37 +12,91 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include
+#include
 #include
 #include
+#include
+#include
 #include
 #include
-
 namespace DB::DM::tests
 {
-SegmentPtr createSegment(PageIdU64 seg_id)
+class SegmentReadTasksPoolTest : public SegmentTestBasic
 {
-    return std::make_shared<Segment>(Logger::get(), 0, RowKeyRange{}, seg_id, seg_id + 1, nullptr, nullptr);
-}
+protected:
+    static SegmentPtr createSegment(PageIdU64 seg_id)
+    {
+        return std::make_shared<Segment>(Logger::get(), 0, RowKeyRange{}, seg_id, seg_id + 1, nullptr, nullptr);
+    }
 
-SegmentReadTaskPtr createSegmentReadTask(PageIdU64 seg_id)
-{
-    return std::make_shared<SegmentReadTask>(createSegment(seg_id), nullptr, RowKeyRanges{});
-}
+    static SegmentSnapshotPtr createSegmentSnapshot()
+    {
+        auto delta_snap = std::make_shared<DeltaValueSnapshot>(CurrentMetrics::Metric{});
+        delta_snap->delta = std::make_shared<DeltaValueSpace>(nullptr);
+        return std::make_shared<SegmentSnapshot>(std::move(delta_snap), /*stable*/ nullptr, Logger::get());
+    }
 
-SegmentReadTasks createSegmentReadTasks(const std::vector<PageIdU64> & seg_ids)
-{
-    SegmentReadTasks tasks;
-    for (PageIdU64 seg_id : seg_ids)
+    SegmentReadTaskPtr createSegmentReadTask(PageIdU64 seg_id)
     {
-        tasks.push_back(createSegmentReadTask(seg_id));
+        return std::make_shared<SegmentReadTask>(
+            createSegment(seg_id),
+            createSegmentSnapshot(),
+            RowKeyRanges{});
+    }
+
+    static Block createBlock()
+    {
+        String type_name = "Int64";
+        DataTypePtr types[2];
+        types[0] = DataTypeFactory::instance().get(type_name);
+        types[1] = makeNullable(types[0]);
+        ColumnsWithTypeAndName columns;
+        for (auto & type : types)
+        {
+            auto column = type->createColumn();
+            for (size_t i = 0; i < 10; i++)
+                column->insertDefault();
+            columns.emplace_back(std::move(column), type);
+        }
+        return Block{columns};
+    }
+
+    SegmentReadTasks createSegmentReadTasks(const std::vector<PageIdU64> & seg_ids)
+    {
+        SegmentReadTasks tasks;
+        for (PageIdU64 seg_id : seg_ids)
+        {
+            tasks.push_back(createSegmentReadTask(seg_id));
+        }
+        return tasks;
+    }
+
+    SegmentReadTaskPoolPtr createSegmentReadTaskPool(const std::vector<PageIdU64> & seg_ids)
+    {
+        DMContextPtr dm_context{createDMContext()};
+        return std::make_shared<SegmentReadTaskPool>(
+            dm_context->physical_table_id,
+            /*extra_table_id_index_*/ dm_context->physical_table_id,
+            dm_context,
+            /*columns_to_read_*/ ColumnDefines{},
+            /*filter_*/ nullptr,
+            /*max_version_*/ 0,
+            /*expected_block_size_*/ DEFAULT_BLOCK_SIZE,
+            /*read_mode_*/ ReadMode::Bitmap,
+            createSegmentReadTasks(seg_ids),
+            /*after_segment_read_*/ [&](const DMContextPtr &, const SegmentPtr &) { /*do nothing*/ },
+            /*tracing_id_*/ String{},
+            /*enable_read_thread_*/ true,
+            /*num_streams_*/ 1,
+            /*res_group_name_*/ String{});
     }
-    return tasks;
-}
 
-static const std::vector<PageIdU64> test_seg_ids{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+    inline static const std::vector<PageIdU64> test_seg_ids{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+};
 
-TEST(SegmentReadTasksWrapperTest, Unordered)
+TEST_F(SegmentReadTasksPoolTest, UnorderedWrapper)
 {
     SegmentReadTasksWrapper tasks_wrapper(true, createSegmentReadTasks(test_seg_ids));
 
@@ -68,6 +122,7 @@ TEST(SegmentReadTasksWrapperTest, Unordered)
     for (PageIdU64 seg_id : v)
     {
         auto task = tasks_wrapper.getTask(seg_id);
+        ASSERT_NE(task, nullptr);
         ASSERT_EQ(task->segment->segmentId(), seg_id);
         task = tasks_wrapper.getTask(seg_id);
         ASSERT_EQ(task, nullptr);
@@ -75,7 +130,7 @@ TEST(SegmentReadTasksWrapperTest, Unordered)
     ASSERT_TRUE(tasks_wrapper.empty());
 }
 
-TEST(SegmentReadTasksWrapperTest, Ordered)
+TEST_F(SegmentReadTasksPoolTest, OrderedWrapper)
 {
     SegmentReadTasksWrapper tasks_wrapper(false, createSegmentReadTasks(test_seg_ids));
 
@@ -101,4 +156,89 @@ TEST(SegmentReadTasksWrapperTest, Ordered)
     ASSERT_EQ(tasks_wrapper.nextTask(), nullptr);
 }
 
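Before the new scheduler test below: the block-slot backpressure it exercises can be stated compactly. A pool is only schedulable while its block queue has free slots, so pushing blocks up to the limit parks it, and popping one block revives it. A hedged sketch with simplified counters (not the real `SegmentReadTaskPool`):

#include <cassert>
#include <cstdint>

// Illustrative stand-in for the pool's block-queue accounting.
struct PoolSlots
{
    int64_t block_slots = 2;   // capacity of the block queue
    int64_t queued_blocks = 0; // blocks produced but not yet consumed

    int64_t getFreeBlockSlots() const { return block_slots - queued_blocks; }
    bool needScheduleToRead() const { return getFreeBlockSlots() > 0; }
};

int main()
{
    PoolSlots pool;
    assert(pool.needScheduleToRead());
    pool.queued_blocks = 2; // push until the queue is full
    assert(!pool.needScheduleToRead());
    pool.queued_blocks = 1; // popping one block frees a slot
    assert(pool.needScheduleToRead());
    return 0;
}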
+TEST_F(SegmentReadTasksPoolTest, SchedulerBasic)
+{
+    SegmentReadTaskScheduler scheduler{false};
+
+    {
+        // Create and add pool.
+        auto pool = createSegmentReadTaskPool(test_seg_ids);
+        pool->increaseUnorderedInputStreamRefCount();
+        scheduler.add(pool);
+
+        // Schedule segments until reaching the active-segment limit.
+        auto active_segment_limits = pool->getFreeActiveSegments();
+        ASSERT_GT(active_segment_limits, 0);
+        std::vector<MergedTaskPtr> merged_tasks;
+        for (int i = 0; i < active_segment_limits; ++i)
+        {
+            std::lock_guard lock(scheduler.mtx);
+            auto merged_task = scheduler.scheduleMergedTask(pool);
+            ASSERT_NE(merged_task, nullptr);
+            merged_tasks.push_back(merged_task);
+        }
+        {
+            std::lock_guard lock(scheduler.mtx);
+            ASSERT_EQ(scheduler.scheduleMergedTask(pool), nullptr);
+        }
+
+        // Mark one segment as finished.
+        {
+            ASSERT_FALSE(scheduler.needScheduleToRead(pool));
+            auto merged_task = merged_tasks.back();
+            ASSERT_EQ(merged_task->units.size(), 1);
+            pool->finishSegment(merged_task->units.front().task->segment);
+            ASSERT_TRUE(scheduler.needScheduleToRead(pool));
+        }
+
+        // Push blocks until the block-slot limit is reached.
+        {
+            auto free_slot_limits = pool->getFreeBlockSlots();
+            ASSERT_GT(free_slot_limits, 0);
+            for (int i = 0; i < free_slot_limits; ++i)
+            {
+                pool->pushBlock(createBlock());
+            }
+            ASSERT_EQ(pool->getFreeBlockSlots(), 0);
+            ASSERT_FALSE(scheduler.needScheduleToRead(pool));
+
+            Block blk;
+            pool->popBlock(blk);
+            ASSERT_TRUE(blk);
+            ASSERT_EQ(pool->getFreeBlockSlots(), 1);
+            ASSERT_TRUE(scheduler.needScheduleToRead(pool));
+
+            while (pool->tryPopBlock(blk)) {}
+        }
+
+        // Finish
+        {
+            while (!merged_tasks.empty())
+            {
+                auto merged_task = merged_tasks.back();
+                merged_tasks.pop_back();
+                pool->finishSegment(merged_task->units.front().task->segment);
+            }
+
+            for (;;)
+            {
+                std::lock_guard lock(scheduler.mtx);
+                auto merged_task = scheduler.scheduleMergedTask(pool);
+                if (merged_task == nullptr)
+                {
+                    break;
+                }
+                pool->finishSegment(merged_task->units.front().task->segment);
+            }
+
+            ASSERT_EQ(pool->q.size(), 0);
+            Block blk;
+            ASSERT_FALSE(pool->q.pop(blk));
+
+            pool->decreaseUnorderedInputStreamRefCount();
+            ASSERT_FALSE(pool->valid());
+        }
+    }
+}
+
 } // namespace DB::DM::tests
\ No newline at end of file
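Finally, the `run_sched_thread` test hook that this test relies on (the constructor and destructor changes at the top of the patch) boils down to a start-conditionally/join-conditionally pattern: `std::thread::joinable()` is false for a default-constructed thread, so the destructor is correct whether or not the background loop was ever started. A simplified sketch, not the real scheduler:

#include <atomic>
#include <thread>

class SchedulerLike
{
public:
    explicit SchedulerLike(bool run_sched_thread = true)
    {
        if (run_sched_thread)
            worker = std::thread([this] { loop(); }); // production mode: background loop
        // test mode: no thread; tests drive the scheduling functions synchronously
    }

    ~SchedulerLike()
    {
        stop.store(true);
        if (worker.joinable()) // false when the thread was never started
            worker.join();
    }

private:
    void loop()
    {
        while (!stop.load()) { /* schedule() ... */ }
    }

    std::atomic<bool> stop{false};
    std::thread worker;
};

int main()
{
    SchedulerLike production;       // spawns and later joins its thread
    SchedulerLike test_mode{false}; // never spawns one
    return 0;
}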