Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Refine SegmentReadTaskPool and SegmentReadTaskScheduler #8237

Merged
merged 8 commits into from
Oct 28, 2023
36 changes: 17 additions & 19 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,11 @@ BlockInputStreams DeltaMergeStore::readRaw(
if (unlikely(!segment_snap))
throw Exception("Failed to get segment snap", ErrorCodes::LOGICAL_ERROR);

tasks.push_back(
std::make_shared<SegmentReadTask>(segment, segment_snap, RowKeyRanges{segment->getRowKeyRange()}));
tasks.push_back(std::make_shared<SegmentReadTask>(
segment,
segment_snap,
dm_context,
RowKeyRanges{segment->getRowKeyRange()}));
}
}
}
Expand All @@ -948,9 +951,7 @@ BlockInputStreams DeltaMergeStore::readRaw(
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
req_info = db_context.getDAGContext()->getMPPTaskId().toString();
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
extra_table_id_index,
dm_context,
columns_to_read,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
Expand Down Expand Up @@ -1025,8 +1026,11 @@ void DeltaMergeStore::readRaw(
if (unlikely(!segment_snap))
throw Exception("Failed to get segment snap", ErrorCodes::LOGICAL_ERROR);

tasks.push_back(
std::make_shared<SegmentReadTask>(segment, segment_snap, RowKeyRanges{segment->getRowKeyRange()}));
tasks.push_back(std::make_shared<SegmentReadTask>(
segment,
segment_snap,
dm_context,
RowKeyRanges{segment->getRowKeyRange()}));
}
}
}
Expand All @@ -1050,9 +1054,7 @@ void DeltaMergeStore::readRaw(
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
req_info = db_context.getDAGContext()->getMPPTaskId().toString();
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
extra_table_id_index,
dm_context,
columns_to_read,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
Expand Down Expand Up @@ -1158,7 +1160,7 @@ BlockInputStreams DeltaMergeStore::read(
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
// Also, too many read tasks of a segment with different small ranges are not good for the data sharing cache.
SegmentReadTasks tasks = getReadTasksByRanges(
*dm_context,
dm_context,
sorted_ranges,
num_streams,
read_segments,
Expand All @@ -1184,9 +1186,7 @@ BlockInputStreams DeltaMergeStore::read(
size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size()));
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
extra_table_id_index,
dm_context,
columns_to_read,
filter,
max_version,
Expand Down Expand Up @@ -1263,7 +1263,7 @@ void DeltaMergeStore::read(
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
// Also, too many read tasks of a segment with different small ranges are not good for the data sharing cache.
SegmentReadTasks tasks = getReadTasksByRanges(
*dm_context,
dm_context,
sorted_ranges,
num_streams,
read_segments,
Expand All @@ -1287,9 +1287,7 @@ void DeltaMergeStore::read(
= enable_read_thread ? std::max(1, num_streams) : std::max(1, std::min(num_streams, tasks.size()));
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
extra_table_id_index,
dm_context,
columns_to_read,
filter,
max_version,
Expand Down Expand Up @@ -1362,7 +1360,7 @@ Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemote
// `try_split_task` is false because we need to ensure only one segment task
// for one segment.
SegmentReadTasks tasks
= getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /* try_split_task */ false);
= getReadTasksByRanges(dm_context, sorted_ranges, num_streams, read_segments, /* try_split_task */ false);
GET_METRIC(tiflash_disaggregated_read_tasks_count).Increment(tasks.size());
LOG_DEBUG(tracing_logger, "Read create segment snapshot done");

Expand Down Expand Up @@ -1967,7 +1965,7 @@ void DeltaMergeStore::restoreStableFiles() const
}

SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
DMContext & dm_context,
const DMContextPtr & dm_context,
const RowKeyRanges & sorted_ranges,
size_t expected_tasks_count,
const SegmentIdSet & read_segments,
Expand Down Expand Up @@ -2001,9 +1999,9 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
if (tasks.empty() || tasks.back()->segment != seg_it->second)
{
auto segment = seg_it->second;
auto segment_snap = segment->createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead);
auto segment_snap = segment->createSnapshot(*dm_context, false, CurrentMetrics::DT_SnapshotOfRead);
RUNTIME_CHECK_MSG(segment_snap, "Failed to get segment snap");
tasks.push_back(std::make_shared<SegmentReadTask>(segment, segment_snap));
tasks.push_back(std::make_shared<SegmentReadTask>(segment, segment_snap, dm_context));
}

tasks.back()->addRange(req_range);
Expand Down Expand Up @@ -2045,7 +2043,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
total_ranges += task->ranges.size();
}

auto tracing_logger = log->getChild(getLogTracingId(dm_context));
auto tracing_logger = log->getChild(getLogTracingId(*dm_context));
LOG_INFO(
tracing_logger,
"Segment read tasks build done, cost={}ms sorted_ranges={} n_tasks_before_split={} n_tasks_final={} "
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ class DeltaMergeStore : private boost::noncopyable
void removeLocalStableFilesIfDisagg() const;

SegmentReadTasks getReadTasksByRanges(
DMContext & dm_context,
const DMContextPtr & dm_context,
const RowKeyRanges & sorted_ranges,
size_t expected_tasks_count = 1,
const SegmentIdSet & read_segments = {},
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ int MergedTask::readOneBlock()
continue;
}

if (pool->readOneBlock(stream, task->segment))
if (pool->readOneBlock(stream, task))
{
read_block_count++;
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class MergedTask
public:
static int64_t getPassiveMergedSegments() { return passive_merged_segments.load(std::memory_order_relaxed); }

MergedTask(uint64_t seg_id_, std::vector<MergedUnit> && units_)
MergedTask(const GlobalSegmentID & seg_id_, std::vector<MergedUnit> && units_)
: seg_id(seg_id_)
, units(std::move(units_))
, inited(false)
Expand All @@ -62,7 +62,7 @@ class MergedTask

bool allStreamsFinished() const { return finished_count >= units.size(); }

uint64_t getSegmentId() const { return seg_id; }
const GlobalSegmentID & getSegmentId() const { return seg_id; }

size_t getPoolCount() const { return units.size(); }

Expand Down Expand Up @@ -125,7 +125,7 @@ class MergedTask
setStreamFinished(i);
}
}
uint64_t seg_id;
GlobalSegmentID seg_id;
std::vector<MergedUnit> units;
bool inited;
int cur_idx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,15 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
read_pools.add(pool);

const auto & tasks = pool->getTasks();
for (const auto & pa : tasks)
for (const auto & [seg_id, task] : tasks)
{
auto seg_id = pa.first;
merging_segments[pool->physical_table_id][seg_id].push_back(pool->pool_id);
merging_segments[seg_id].push_back(pool->pool_id);
}
auto block_slots = pool->getFreeBlockSlots();
LOG_DEBUG(
log,
"Added, pool_id={} table_id={} block_slots={} segment_count={} pool_count={} cost={}ns do_add_cost={}ns", //
"Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={}ns do_add_cost={}ns", //
pool->pool_id,
pool->physical_table_id,
block_slots,
tasks.size(),
read_pools.size(),
Expand Down Expand Up @@ -152,38 +150,27 @@ SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlo
return nullptr;
}

std::optional<std::pair<uint64_t, std::vector<uint64_t>>> SegmentReadTaskScheduler::scheduleSegmentUnlock(
std::optional<std::pair<GlobalSegmentID, std::vector<UInt64>>> SegmentReadTaskScheduler::scheduleSegmentUnlock(
const SegmentReadTaskPoolPtr & pool)
{
auto expected_merge_seg_count = std::min(read_pools.size(), 2); // Not accurate.
auto itr = merging_segments.find(pool->physical_table_id);
if (itr == merging_segments.end())
{
// No segment of tableId left.
return std::nullopt;
}
std::optional<std::pair<uint64_t, std::vector<uint64_t>>> result;
auto & segments = itr->second;
auto target = pool->scheduleSegment(segments, expected_merge_seg_count);
if (target != segments.end())

std::optional<std::pair<GlobalSegmentID, std::vector<uint64_t>>> result;
auto target = pool->scheduleSegment(merging_segments, expected_merge_seg_count);
if (target != merging_segments.end())
{
if (MergedTask::getPassiveMergedSegments() < 100 || target->second.size() == 1)
{
result = *target;
segments.erase(target);
if (segments.empty())
{
merging_segments.erase(itr);
}
merging_segments.erase(target);
}
else
{
result = std::pair{target->first, std::vector<uint64_t>(1, pool->pool_id)};
auto mutable_target = segments.find(target->first);
auto itr = std::find(mutable_target->second.begin(), mutable_target->second.end(), pool->pool_id);
*itr = mutable_target->second
auto itr = std::find(target->second.begin(), target->second.end(), pool->pool_id);
*itr = target->second
.back(); // SegmentReadTaskPool::scheduleSegment ensures that `pool->pool_id` exists in `target`.
mutable_target->second.resize(mutable_target->second.size() - 1);
target->second.resize(target->second.size() - 1);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
namespace DB::DM
{
using SegmentReadTaskPoolList = CircularScanList<SegmentReadTaskPool>;

// SegmentReadTaskScheduler is a global singleton.
// All SegmentReadTaskPool will be added to it and be scheduled by it.

Expand Down Expand Up @@ -62,16 +61,17 @@ class SegmentReadTaskScheduler
bool needScheduleToRead(const SegmentReadTaskPoolPtr & pool);
SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids);
// <seg_id, pool_ids>
std::optional<std::pair<uint64_t, std::vector<uint64_t>>> scheduleSegmentUnlock(
std::optional<std::pair<GlobalSegmentID, std::vector<UInt64>>> scheduleSegmentUnlock(
const SegmentReadTaskPoolPtr & pool);
SegmentReadTaskPoolPtr scheduleSegmentReadTaskPoolUnlock();

// To restrict the instantaneous concurrency of `add` and prevent `schedule` from always failing to acquire the lock.
std::mutex add_mtx;
std::mutex mtx;
SegmentReadTaskPoolList read_pools;
// table_id -> {seg_id -> pool_ids, seg_id -> pool_ids, ...}
std::unordered_map<int64_t, std::unordered_map<uint64_t, std::vector<uint64_t>>> merging_segments;

// GlobalSegmentID -> pool_ids
MergingSegments merging_segments;

MergedTaskPool merged_task_pool;

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ void SegmentReaderPoolManager::init(UInt32 logical_cpu_cores, double read_thread

void SegmentReaderPoolManager::addTask(MergedTaskPtr && task)
{
static std::hash<uint64_t> hash_func;
auto idx = hash_func(task->getSegmentId()) % reader_pools.size();
auto idx = std::hash<GlobalSegmentID>{}(task->getSegmentId()) % reader_pools.size();
reader_pools[idx]->addTask(std::move(task));
}

Expand Down
44 changes: 0 additions & 44 deletions dbms/src/Storages/DeltaMerge/Remote/RNReadTask.h

This file was deleted.

3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <Common/FailPoint.h>
#include <Common/MPMCQueue.h>
#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/Remote/RNReadTask.h>
#include <Storages/DeltaMerge/Remote/RNSegmentInputStream.h>
#include <Storages/DeltaMerge/Remote/RNWorkers.h>

Expand Down Expand Up @@ -101,7 +100,7 @@ Block RNSegmentInputStream::readImpl(FilterPtr & res_filter, bool return_filter)
}
else
{
action.transform(res, current_seg_task->extra_remote_info->remote_segment_id.physical_table_id);
action.transform(res, current_seg_task->dm_context->physical_table_id);
return res;
}
}
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Remote/RNReadTask.h>
#include <Storages/DeltaMerge/Remote/RNWorkers_fwd.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/Remote/RNSegmentSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <Common/FailPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/Remote/RNReadTask.h>
#include <Storages/DeltaMerge/Remote/RNSegmentSourceOp.h>
#include <Storages/DeltaMerge/Remote/RNWorkers.h>

Expand Down Expand Up @@ -68,7 +67,7 @@ OperatorStatus RNSegmentSourceOp::readImpl(Block & block)
if (t_block.has_value())
{
std::swap(block, t_block.value());
action.transform(block, current_seg_task->extra_remote_info->remote_segment_id.physical_table_id);
action.transform(block, current_seg_task->dm_context->physical_table_id);
t_block.reset();
return OperatorStatus::HAS_OUTPUT;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNSegmentSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/Remote/RNReadTask.h>
#include <Storages/DeltaMerge/Remote/RNWorkers_fwd.h>
#include <Storages/DeltaMerge/SegmentReadTask.h>

namespace DB::DM::Remote
{
Expand Down
Loading