Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: unify segment_id for data sharing #5800

Merged
merged 9 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,9 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read);
};
size_t final_num_stream = std::min(num_streams, tasks.size());
String req_info;
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
req_info = db_context.getDAGContext()->getMPPTaskId().toString();
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
dm_context,
Expand All @@ -1248,11 +1251,9 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
/* is_raw = */ true,
/* do_delete_mark_filter_for_raw = */ false,
std::move(tasks),
after_segment_read);
after_segment_read,
req_info);

String req_info;
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
req_info = db_context.getDAGContext()->getMPPTaskId().toString();
BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
{
Expand Down Expand Up @@ -1311,12 +1312,12 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
// SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled.
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
// Also, too many read tasks of a segment with different samll ranges is not good for data sharing cache.
// Also, too many read tasks of a segment with different small ranges is not good for data sharing cache.
SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /*try_split_task =*/!enable_read_thread);

auto tracing_logger = Logger::get(log->name(), dm_context->tracing_id);
LOG_FMT_DEBUG(tracing_logger,
"Read create segment snapshot done keep_order {} dt_enable_read_thread {} => enable_read_thread {}",
"Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread);
Expand All @@ -1338,7 +1339,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
/* is_raw = */ is_fast_scan,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
std::move(tasks),
after_segment_read);
after_segment_read,
tracing_id);

String req_info;
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
Expand Down
37 changes: 15 additions & 22 deletions dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,39 +71,31 @@ std::vector<std::vector<int>> getLinuxNumaNodes()
if (!nodes.exists() || !nodes.isDirectory())
{
auto cpus = getCPUs(cpus_dir_name);
if (cpus.empty())
{
throw Exception("Not recognize CPU: " + cpus_dir_name);
}
RUNTIME_CHECK_MSG(!cpus.empty(), "Not recognize CPU: {}", cpus_dir_name);
numa_nodes.push_back(std::move(cpus));
return numa_nodes;
}
else

// get the cpu id from each NUMA node
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator iter(nodes); iter != end; ++iter)
{
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator iter(nodes); iter != end; ++iter)
if (!isNodeDir(iter.name()))
{
if (!isNodeDir(iter.name()))
{
continue;
}
auto dir_name = nodes_dir_name + "/" + iter.name();
auto cpus = getCPUs(dir_name);
if (cpus.empty())
{
throw Exception("Not recognize CPU: " + nodes_dir_name);
}
numa_nodes.push_back(std::move(cpus));
continue;
}
auto dir_name = nodes_dir_name + "/" + iter.name();
auto cpus = getCPUs(dir_name);
RUNTIME_CHECK_MSG(!cpus.empty(), "Not recognize CPU: {}", nodes_dir_name);
numa_nodes.push_back(std::move(cpus));
}
if (numa_nodes.empty())
{
throw Exception("Not recognize CPU");
}
RUNTIME_CHECK_MSG(!numa_nodes.empty(), "Not recognize CPU");
return numa_nodes;
}

std::vector<std::vector<int>> getNumaNodes(Poco::Logger * log)
{
#ifndef __APPLE__ // Apple MacBooks do not support NUMA
try
{
return getLinuxNumaNodes();
Expand All @@ -120,6 +112,7 @@ std::vector<std::vector<int>> getNumaNodes(Poco::Logger * log)
{
LOG_FMT_WARNING(log, "Unknow Error");
}
#endif
LOG_FMT_WARNING(log, "Cannot recognize the CPU NUMA infomation, use the CPU as 'one numa node'");
std::vector<std::vector<int>> numa_nodes(1); // "One numa node"
return numa_nodes;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class ColumnSharingCacheMap
auto add_total = add_count + add_stale;
auto get_cached = get_hit + get_copy;
auto get_total = get_miss + get_part + get_hit + get_copy;
return fmt::format("add_count {} add_stale {} add_ratio {} get_miss {} get_part {} get_hit {} get_copy {} cached_ratio {}",
return fmt::format("add_count={} add_stale={} add_ratio={} get_miss={} get_part={} get_hit={} get_copy={} cached_ratio={}",
add_count,
add_stale,
add_total > 0 ? add_count * 1.0 / add_total : 0,
Expand All @@ -218,17 +218,19 @@ class DMFileReaderPool
{
public:
static DMFileReaderPool & instance();
DMFileReaderPool() = default;
~DMFileReaderPool() = default;
DISALLOW_COPY_AND_MOVE(DMFileReaderPool);

void add(DMFileReader & reader);
void del(DMFileReader & reader);
void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);

private:
DMFileReaderPool() = default;

private:
std::mutex mtx;
std::unordered_map<std::string, std::unordered_set<DMFileReader *>> readers;
};

} // namespace DB::DM
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ SegmentReadTaskScheduler::SegmentReadTaskScheduler()
: stop(false)
, log(&Poco::Logger::get("SegmentReadTaskScheduler"))
{
sched_thread = std::thread(&SegmentReadTaskScheduler::schedThread, this);
sched_thread = std::thread(&SegmentReadTaskScheduler::schedLoop, this);
}

SegmentReadTaskScheduler::~SegmentReadTaskScheduler()
Expand All @@ -43,12 +43,12 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
merging_segments[pool->tableId()][seg_id].push_back(pool->poolId());
if (!seg_ids.insert(seg_id).second)
{
throw DB::Exception(fmt::format("Not support split segment task. seg_ids {} => seg_id {} already exist.", seg_ids, seg_id));
throw DB::Exception(fmt::format("Not support split segment task. segment_ids={} => segment_id={} already exist.", seg_ids, seg_id));
}
}
auto block_slots = pool->getFreeBlockSlots();
auto [unexpired, expired] = read_pools.count(pool->tableId());
LOG_FMT_DEBUG(log, "add pool {} table {} block_slots {} segment count {} segments {} unexpired pool {} expired pool {}", //
LOG_FMT_DEBUG(log, "Added, pool_id={} table_id={} block_slots={} segment_count={} segments={} unexpired_pool={} expired_pool={}", //
pool->poolId(),
pool->tableId(),
block_slots,
Expand Down Expand Up @@ -200,13 +200,13 @@ bool SegmentReadTaskScheduler::schedule()
auto [merged_task, run_sche] = scheduleMergedTask();
if (merged_task != nullptr)
{
LOG_FMT_DEBUG(log, "scheduleMergedTask seg_id {} pools {} => {} ms", merged_task->getSegmentId(), merged_task->getPoolIds(), sw.elapsedMilliseconds());
LOG_FMT_DEBUG(log, "scheduleMergedTask segment_id={} pool_ids={} cost={}ms", merged_task->getSegmentId(), merged_task->getPoolIds(), sw.elapsedMilliseconds());
SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
}
return run_sche;
}

void SegmentReadTaskScheduler::schedThread()
void SegmentReadTaskScheduler::schedLoop()
{
while (!isStop())
{
Expand All @@ -218,4 +218,4 @@ void SegmentReadTaskScheduler::schedThread()
}
}

} // namespace DB::DM
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ class SegmentReadTaskScheduler
SegmentReadTaskScheduler();

// Choose segment to read.
// Returns <MergedTaskPtr, run_next_schedule_immediatly>
// Returns <MergedTaskPtr, run_next_schedule_immediately>
std::pair<MergedTaskPtr, bool> scheduleMergedTask();

void setStop();
bool isStop() const;
bool schedule();
void schedThread();
void schedLoop();

SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids);
// <seg_id, pool_ids>
Expand All @@ -80,4 +80,4 @@ class SegmentReadTaskScheduler

Poco::Logger * log;
};
} // namespace DB::DM
} // namespace DB::DM
24 changes: 14 additions & 10 deletions dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ class SegmentReader

~SegmentReader()
{
LOG_FMT_DEBUG(log, "SegmentReader stop begin");
LOG_FMT_DEBUG(log, "Stop begin");
t.join();
LOG_FMT_DEBUG(log, "SegmentReader stop end");
LOG_FMT_DEBUG(log, "Stop end");
}

std::thread::id getId() const
Expand All @@ -70,11 +70,11 @@ class SegmentReader
if (ret != 0)
{
// It can be failed due to some CPU core cannot access, such as CPU offline.
LOG_FMT_WARNING(log, "sched_setaffinity cpus {} fail: {}", cpus, std::strerror(errno));
LOG_FMT_WARNING(log, "sched_setaffinity fail, cpus={} errno={}", cpus, std::strerror(errno));
}
else
{
LOG_FMT_DEBUG(log, "sched_setaffinity cpus {} succ", cpus);
LOG_FMT_DEBUG(log, "sched_setaffinity succ, cpus={}", cpus);
}
#endif
}
Expand All @@ -91,7 +91,7 @@ class SegmentReader
{
if (!task_queue.pop(merged_task))
{
LOG_FMT_INFO(log, "pop fail, stop {}", isStop());
LOG_FMT_INFO(log, "Pop fail, stop={}", isStop());
return;
}

Expand All @@ -114,7 +114,7 @@ class SegmentReader
}
if (read_count <= 0)
{
LOG_FMT_DEBUG(log, "pool {} seg_id {} read_count {}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count);
LOG_FMT_DEBUG(log, "All finished, pool_ids={} segment_id={} read_count={}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count);
}
}
catch (DB::Exception & e)
Expand Down Expand Up @@ -160,6 +160,8 @@ class SegmentReader
std::vector<int> cpus;
};

// ===== SegmentReaderPool ===== //

void SegmentReaderPool::addTask(MergedTaskPtr && task)
{
if (!task_queue.push(std::forward<MergedTaskPtr>(task), nullptr))
Expand All @@ -171,12 +173,12 @@ void SegmentReaderPool::addTask(MergedTaskPtr && task)
SegmentReaderPool::SegmentReaderPool(int thread_count, const std::vector<int> & cpus)
: log(&Poco::Logger::get("SegmentReaderPool"))
{
LOG_FMT_INFO(log, "Create SegmentReaderPool thread_count {} cpus {} start", thread_count, cpus);
LOG_FMT_INFO(log, "Create start, thread_count={} cpus={}", thread_count, cpus);
for (int i = 0; i < thread_count; i++)
{
readers.push_back(std::make_unique<SegmentReader>(task_queue, cpus));
}
LOG_FMT_INFO(log, "Create SegmentReaderPool thread_count {} cpus {} end", thread_count, cpus);
LOG_FMT_INFO(log, "Create end, thread_count={} cpus={}", thread_count, cpus);
}

SegmentReaderPool::~SegmentReaderPool()
Expand All @@ -198,6 +200,8 @@ std::vector<std::thread::id> SegmentReaderPool::getReaderIds() const
return ids;
}

// ===== SegmentReaderPoolManager ===== //

SegmentReaderPoolManager::SegmentReaderPoolManager()
: log(&Poco::Logger::get("SegmentReaderPoolManager"))
{}
Expand All @@ -215,7 +219,7 @@ void SegmentReaderPoolManager::init(const ServerInfo & server_info)
auto ids = reader_pools.back()->getReaderIds();
reader_ids.insert(ids.begin(), ids.end());
}
LOG_FMT_INFO(log, "readers count {}", reader_ids.size());
LOG_FMT_INFO(log, "num_readers={}", reader_ids.size());
}

void SegmentReaderPoolManager::addTask(MergedTaskPtr && task)
Expand All @@ -233,4 +237,4 @@ bool SegmentReaderPoolManager::isSegmentReader() const
{
return reader_ids.find(std::this_thread::get_id()) != reader_ids.end();
}
} // namespace DB::DM
} // namespace DB::DM
9 changes: 4 additions & 5 deletions dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
#pragma once

#include <Common/nocopyable.h>
#include <Server/ServerInfo.h>
#include <Storages/DeltaMerge/ReadThread/WorkQueue.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
Expand All @@ -31,10 +32,8 @@ class SegmentReaderPool
public:
SegmentReaderPool(int thread_count, const std::vector<int> & cpus);
~SegmentReaderPool();
SegmentReaderPool(const SegmentReaderPool &) = delete;
SegmentReaderPool & operator=(const SegmentReaderPool &) = delete;
SegmentReaderPool(SegmentReaderPool &&) = delete;
SegmentReaderPool & operator=(SegmentReaderPool &&) = delete;

DISALLOW_COPY_AND_MOVE(SegmentReaderPool);

void addTask(MergedTaskPtr && task);
std::vector<std::thread::id> getReaderIds() const;
Expand Down Expand Up @@ -74,4 +73,4 @@ class SegmentReaderPoolManager
Poco::Logger * log;
};

} // namespace DB::DM
} // namespace DB::DM
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ class UnorderedInputStream : public IProfilingBlockInputStream
{
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
auto & extra_table_id_col_define = getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value};
header.insert(extra_table_id_index, col);
}
ref_no = task_pool->increaseUnorderedInputStreamRefCount();
LOG_FMT_DEBUG(log, "pool {} ref {} created", task_pool->poolId(), ref_no);
LOG_FMT_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
}

~UnorderedInputStream()
{
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_FMT_DEBUG(log, "pool {} ref {} destroy", task_pool->poolId(), ref_no);
LOG_FMT_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
}

String getName() const override { return NAME; }
Expand All @@ -70,7 +70,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
return readImpl(filter_ignored, false);
}

// Currently, res_fiter and return_filter is unused.
// Currently, res_filter and return_filter are unused.
Block readImpl(FilterPtr & /*res_filter*/, bool /*return_filter*/) override
{
if (done)
Expand All @@ -87,7 +87,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
{
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
auto & extra_table_id_col_define = getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id};
size_t row_number = res.rows();
auto col_data = col.type->createColumnConst(row_number, Field(physical_table_id));
Expand All @@ -114,7 +114,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream

void readSuffixImpl() override
{
LOG_FMT_DEBUG(log, "pool {} ref {} finish read {} rows from storage", task_pool->poolId(), ref_no, total_rows);
LOG_FMT_DEBUG(log, "Finish read from storage, pool_id={} ref_no={} rows={}", task_pool->poolId(), ref_no, total_rows);
}

private:
Expand All @@ -128,4 +128,4 @@ class UnorderedInputStream : public IProfilingBlockInputStream
int64_t ref_no;
size_t total_rows = 0;
};
} // namespace DB::DM
} // namespace DB::DM
Loading