Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Refactor disaggregated read flow #7530

Merged
merged 35 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5ec3412
wip
breezewish May 23, 2023
ef5077e
Merge remote-tracking branch 'origin/master' into wenxuan/parallel-is
breezewish May 23, 2023
acf80ba
wip2
breezewish May 23, 2023
0bb57ab
Fix
breezewish May 23, 2023
fef2ae7
Fix again
breezewish May 23, 2023
bf09bd4
Remove some logs
breezewish May 23, 2023
a2d81ea
Remove unused comments
breezewish May 23, 2023
d0fb6b7
Fix worker finish early
breezewish May 23, 2023
97e0bd6
Don't count DMFileReader ctor, because it is now called in Workers
breezewish May 23, 2023
0898aae
More timing info
breezewish May 23, 2023
b82df30
Some debug info
breezewish May 24, 2023
42ae3d3
Improve cancel
breezewish May 25, 2023
b6b7e99
Populate last_exception
breezewish May 25, 2023
2f0c324
Update dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp
breezewish May 25, 2023
fd97e25
Add some unit tests
breezewish May 25, 2023
da19b92
Merge branch 'wenxuan/parallel-is' of github.com:breezewish/tiflash i…
breezewish May 25, 2023
c604fd8
Use latest client-c
breezewish May 25, 2023
c432d42
Fix comment
breezewish May 25, 2023
818d630
Merge remote-tracking branch 'origin/master' into wenxuan/parallel-is
breezewish May 25, 2023
0a9e5fa
Fix merge issue
breezewish May 25, 2023
387612e
Fix more
breezewish May 25, 2023
ad3faef
Add / update a couple of metrics
breezewish May 25, 2023
89f1ce2
Merge branch 'master' into wenxuan/parallel-is
breezewish May 25, 2023
013dd76
Fix unit test
breezewish May 26, 2023
c852e1d
Fix a bug and add more tests
breezewish May 27, 2023
cb54f96
Address comments
breezewish May 27, 2023
35f3fc5
Address comments
breezewish May 27, 2023
ae72da3
Fix keyspace ID
breezewish May 28, 2023
6ccd6c1
Merge branch 'master' into wenxuan/parallel-is
breezewish May 28, 2023
3397bc0
Apply suggestions from code review
breezewish May 31, 2023
9213983
Add failpoints
JaySon-Huang Jun 1, 2023
65542bd
Fix a restart bug
JaySon-Huang Jun 1, 2023
4813649
Add warnings when delta_rate==0.0
JaySon-Huang Jun 1, 2023
1ff1d13
Merge remote-tracking branch 'upstream/master' into wenxuan/parallel-is
JaySon-Huang Jun 1, 2023
bbe92d8
Merge branch 'master' into wenxuan/parallel-is
ti-chi-bot[bot] Jun 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/client-c
204 changes: 204 additions & 0 deletions dbms/src/Common/ThreadedWorker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Common/MPMCQueue.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
#include <common/logger_useful.h>

#include <atomic>
#include <ext/scope_guard.h>
#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <utility>

namespace DB
{

/// A ThreadedWorker spawns n (n=concurrency) threads to process tasks.
/// Each thread takes a task from the source queue, does some work, and puts the result into the
/// result queue.
/// ThreadedWorker manages the state of its result queue: for example, when all tasks are finished
/// or when there are errors, the result queue will be finished or cancelled.
template <typename Src, typename Dest>
class ThreadedWorker
{
public:
virtual ~ThreadedWorker()
{
wait();
}

void startInBackground() noexcept
{
std::call_once(start_flag, [this] {
watch_start.restart();

LOG_DEBUG(log, "Starting {} workers, concurrency={}", getName(), concurrency);
active_workers = concurrency;

for (size_t index = 0; index < concurrency; ++index)
{
thread_manager->schedule(true, getName(), [this, index] {
total_wait_schedule_ms += watch_start.elapsedMilliseconds();

workerLoop(index);
handleWorkerFinished();
});
}
});
}

void wait() noexcept
{
std::call_once(wait_flag, [this] {
try
{
// thread_manager->wait can be only called once.
thread_manager->wait();
}
catch (...)
{
// This should not occur, as we should have caught all exceptions in workerLoop.
auto error = getCurrentExceptionMessage(false);
LOG_WARNING(log, "{} meet unexepcted error: {}", getName(), error);
}
});
}

public:
const size_t concurrency;
const std::shared_ptr<MPMCQueue<Src>> source_queue;
const std::shared_ptr<MPMCQueue<Dest>> result_queue;

protected:
const LoggerPtr log;
std::shared_ptr<ThreadManager> thread_manager;
std::atomic<Int64> active_workers = 0;

ThreadedWorker(
std::shared_ptr<MPMCQueue<Src>> source_queue_,
std::shared_ptr<MPMCQueue<Dest>> result_queue_,
LoggerPtr log_,
size_t concurrency_)
: concurrency(concurrency_)
, source_queue(source_queue_)
, result_queue(result_queue_)
, log(log_)
, thread_manager(newThreadManager())
{
RUNTIME_CHECK(concurrency > 0);
}

virtual String getName() const noexcept = 0;

virtual Dest doWork(const Src & task) = 0;

private:
void handleWorkerFinished()
{
active_workers--;
if (active_workers == 0)
{
std::call_once(finish_flag, [this] {
LOG_DEBUG(
log,
"{} workers finished, total_processed_tasks={} concurrency={} elapsed={:.3f}s total_wait_schedule={:.3f}s total_wait_upstream={:.3f}s",
getName(),
total_processed_tasks,
concurrency,
watch_start.elapsedSeconds(),
total_wait_schedule_ms / 1000.0,
total_wait_upstream_ms / 1000.0);
// Note: the result queue may be already cancelled, but it is fine.
result_queue->finish();
});
}
}

void workerLoop(size_t thread_idx)
breezewish marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
while (true)
{
Src task;

Stopwatch w;
auto pop_result = source_queue->pop(task);
total_wait_upstream_ms += w.elapsedMilliseconds();

if (pop_result != MPMCQueueResult::OK)
{
if (pop_result == MPMCQueueResult::FINISHED)
{
// No more work to do, just exit.
// The FINISH signal will be passed to downstreams when
// all workers are exited.
break;
}
else if (pop_result == MPMCQueueResult::CANCELLED)
{
// There are errors, populate the error to downstreams
// immediately.
auto cancel_reason = source_queue->getCancelReason();
LOG_WARNING(log, "{}#{} meeting error from upstream: {}", getName(), thread_idx, cancel_reason);
result_queue->cancelWith(cancel_reason);
break;
}
else
{
RUNTIME_CHECK_MSG(false, "Unexpected pop MPMCQueueResult: {}", magic_enum::enum_name(pop_result));
}
}

auto work_result = doWork(task);
auto push_result = result_queue->push(work_result);
if (push_result != MPMCQueueResult::OK)
{
if (push_result == MPMCQueueResult::CANCELLED)
{
// The result_queue has been closed by other workers, so we
// no longer process any tasks from the source.
break;
}
else
{
RUNTIME_CHECK_MSG(false, "Unexpected push MPMCQueueResult: {}", magic_enum::enum_name(push_result));
}
}

total_processed_tasks++;
}
}
catch (...)
{
auto error = getCurrentExceptionMessage(false);
LOG_ERROR(log, "{}#{} meet error: {}", getName(), thread_idx, error);
result_queue->cancelWith(fmt::format("{} failed: {}", getName(), error));
}
}

private:
Stopwatch watch_start;
std::once_flag start_flag;
std::once_flag wait_flag;
std::once_flag finish_flag;
std::atomic<Int64> total_processed_tasks = 0;
std::atomic<Int64> total_wait_schedule_ms = 0;
std::atomic<Int64> total_wait_upstream_ms = 0;
};

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ namespace DB
F(type_cache_occupy, {{"type", "cache_occupy"}}, ExpBuckets{0.01, 2, 20}), \
F(type_build_read_task, {{"type", "build_read_task"}}, ExpBuckets{0.01, 2, 20}), \
F(type_seg_next_task, {{"type", "seg_next_task"}}, ExpBuckets{0.01, 2, 20}), \
F(type_seg_build_stream, {{"type", "seg_build_stream"}}, ExpBuckets{0.01, 2, 20})) \
F(type_seg_build_stream, {{"type", "seg_build_stream"}}, ExpBuckets{0.01, 2, 20}), \
F(type_seg_read_prefix, {{"type", "seg_read_prefix"}}, ExpBuckets{0.01, 2, 20})) \
M(tiflash_disaggregated_details, "", Counter, \
F(type_cftiny_read, {{"type", "cftiny_read"}}), \
F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ namespace DB
AddExtraTableIDColumnInputStream::AddExtraTableIDColumnInputStream(
BlockInputStreamPtr input,
int extra_table_id_index,
TableID physical_table_id)
: action(input->getHeader(), extra_table_id_index, physical_table_id)
TableID physical_table_id_)
: physical_table_id(physical_table_id_)
, action(input->getHeader(), extra_table_id_index)
{
children.push_back(input);
}
Expand All @@ -32,7 +33,7 @@ Block AddExtraTableIDColumnInputStream::readImpl()
if (!res)
return res;

auto ok = action.transform(res);
auto ok = action.transform(res, physical_table_id);
if (!ok)
return {};

Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class AddExtraTableIDColumnInputStream : public IProfilingBlockInputStream
Block readImpl() override;

private:
const TableID physical_table_id;
AddExtraTableIDColumnTransformAction action;
};

Expand Down
10 changes: 3 additions & 7 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,17 @@ Block AddExtraTableIDColumnTransformAction::buildHeader(

AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction(
const Block & inner_header_,
int extra_table_id_index_,
TableID physical_table_id_)
int extra_table_id_index_)
: header(buildHeader(inner_header_, extra_table_id_index_))
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}

AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_,
TableID physical_table_id_)
int extra_table_id_index_)
: header(buildHeader(columns_to_read_, extra_table_id_index_))
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}

Expand All @@ -70,7 +66,7 @@ Block AddExtraTableIDColumnTransformAction::getHeader() const
return header;
}

bool AddExtraTableIDColumnTransformAction::transform(Block & block)
bool AddExtraTableIDColumnTransformAction::transform(Block & block, TableID physical_table_id)
{
if (unlikely(!block))
return true;
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ struct AddExtraTableIDColumnTransformAction

AddExtraTableIDColumnTransformAction(
const Block & inner_header_,
int extra_table_id_index_,
TableID physical_table_id_);
int extra_table_id_index_);

AddExtraTableIDColumnTransformAction(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_,
TableID physical_table_id_);
int extra_table_id_index_);

bool transform(Block & block);
bool transform(Block & block, TableID physical_table_id);

Block getHeader() const;

Expand All @@ -48,12 +46,10 @@ struct AddExtraTableIDColumnTransformAction
return total_rows;
}


private:
Block header;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;
const TableID physical_table_id;

size_t total_rows = 0;
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
return genNamesAndTypes(table_scan.getColumns(), column_prefix);
}

std::tuple<DM::ColumnDefinesPtr, size_t> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
{
auto column_defines = std::make_shared<DM::ColumnDefines>();
size_t extra_table_id_index = InvalidColumnID;
int extra_table_id_index = InvalidColumnID;
column_defines->reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ using ColumnDefinesPtr = std::shared_ptr<std::vector<ColumnDefine>>;
} // namespace DM

// The column defines and `extra table id index`
std::tuple<DM::ColumnDefinesPtr, size_t>
std::tuple<DM::ColumnDefinesPtr, int>
genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan);

} // namespace DB
Loading