storage: Refactor disaggregated read flow #7530

Merged: 35 commits, Jun 1, 2023

Changes from all commits

Commits:
5ec3412  wip (breezewish, May 23, 2023)
ef5077e  Merge remote-tracking branch 'origin/master' into wenxuan/parallel-is (breezewish, May 23, 2023)
acf80ba  wip2 (breezewish, May 23, 2023)
0bb57ab  Fix (breezewish, May 23, 2023)
fef2ae7  Fix again (breezewish, May 23, 2023)
bf09bd4  Remove some logs (breezewish, May 23, 2023)
a2d81ea  Remove unused comments (breezewish, May 23, 2023)
d0fb6b7  Fix worker finish early (breezewish, May 23, 2023)
97e0bd6  Don't count DMFileReader ctor, because it is now called in Workers (breezewish, May 23, 2023)
0898aae  More timing info (breezewish, May 23, 2023)
b82df30  Some debug info (breezewish, May 24, 2023)
42ae3d3  Improve cancel (breezewish, May 25, 2023)
b6b7e99  Populate last_exception (breezewish, May 25, 2023)
2f0c324  Update dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.cpp (breezewish, May 25, 2023)
fd97e25  Add some unit tests (breezewish, May 25, 2023)
da19b92  Merge branch 'wenxuan/parallel-is' of github.com:breezewish/tiflash i… (breezewish, May 25, 2023)
c604fd8  Use latest client-c (breezewish, May 25, 2023)
c432d42  Fix comment (breezewish, May 25, 2023)
818d630  Merge remote-tracking branch 'origin/master' into wenxuan/parallel-is (breezewish, May 25, 2023)
0a9e5fa  Fix merge issue (breezewish, May 25, 2023)
387612e  Fix more (breezewish, May 25, 2023)
ad3faef  Add / update a couple of metrics (breezewish, May 25, 2023)
89f1ce2  Merge branch 'master' into wenxuan/parallel-is (breezewish, May 25, 2023)
013dd76  Fix unit test (breezewish, May 26, 2023)
c852e1d  Fix a bug and add more tests (breezewish, May 27, 2023)
cb54f96  Address comments (breezewish, May 27, 2023)
35f3fc5  Address comments (breezewish, May 27, 2023)
ae72da3  Fix keyspace ID (breezewish, May 28, 2023)
6ccd6c1  Merge branch 'master' into wenxuan/parallel-is (breezewish, May 28, 2023)
3397bc0  Apply suggestions from code review (breezewish, May 31, 2023)
9213983  Add failpoints (JaySon-Huang, Jun 1, 2023)
65542bd  Fix a restart bug (JaySon-Huang, Jun 1, 2023)
4813649  Add warnings when delta_rate==0.0 (JaySon-Huang, Jun 1, 2023)
1ff1d13  Merge remote-tracking branch 'upstream/master' into wenxuan/parallel-is (JaySon-Huang, Jun 1, 2023)
bbe92d8  Merge branch 'master' into wenxuan/parallel-is (ti-chi-bot[bot], Jun 1, 2023)
6 changes: 4 additions & 2 deletions dbms/src/Common/FailPoint.cpp
@@ -71,7 +71,8 @@ namespace DB
     M(pause_before_full_gc_prepare) \
     M(force_owner_mgr_state) \
     M(exception_during_spill) \
-    M(force_fail_to_create_etcd_session)
+    M(force_fail_to_create_etcd_session) \
+    M(force_remote_read_for_batch_cop_once)
 
 #define APPLY_FOR_FAILPOINTS(M) \
     M(skip_check_segment_update) \
@@ -101,7 +102,8 @@ namespace DB
     M(force_set_mocked_s3_object_mtime) \
     M(force_stop_background_checkpoint_upload) \
     M(skip_seek_before_read_dmfile) \
-    M(exception_after_large_write_exceed)
+    M(exception_after_large_write_exceed) \
+    M(exception_when_fetch_disagg_pages)
 
 #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
     M(pause_with_alter_locks_acquired) \
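As context for the new `exception_when_fetch_disagg_pages` entry, here is a hedged sketch of how TiFlash code typically consumes such a failpoint. The function name is hypothetical; the exact call site added by this PR is not part of this excerpt:

```c++
#include <Common/FailPoint.h>

namespace DB
{
namespace FailPoints
{
// Declared here; the definition comes from the APPLY_FOR_FAILPOINTS list above.
extern const char exception_when_fetch_disagg_pages[];
} // namespace FailPoints

// Hypothetical fetch path on the read node.
void fetchDisaggPages(/* ... */)
{
    // Throws a DB::Exception when the failpoint is enabled, letting tests
    // exercise the error/cancellation path of the new read flow.
    FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_when_fetch_disagg_pages);
    // ... actual page fetching ...
}
} // namespace DB
```

Enabling the failpoint in a test (e.g. via `FailPointHelper::enableFailPoint`) then makes the fetch path throw on its next invocation.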
223 changes: 223 additions & 0 deletions dbms/src/Common/ThreadedWorker.h
@@ -0,0 +1,223 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Common/MPMCQueue.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
#include <common/logger_useful.h>

#include <ext/scope_guard.h>
#include <magic_enum.hpp>
#include <mutex>

namespace DB
{

/// A ThreadedWorker spawns n (n = concurrency) threads to process tasks.
/// Each thread takes a task from the source queue, does some work, and puts the result into
/// the result queue.
/// ThreadedWorker manages the state of its result queue: for example, when all tasks are
/// finished or when there are errors, the result queue will be finished or cancelled
/// accordingly.
template <typename Src, typename Dest>
class ThreadedWorker
{
public:
    /// WARNING: As base class destructors always run AFTER derived class destructors,
    /// derived workers must implement their own destructor, like:
    ///
    /// ```c++
    /// ~MyWorker() override { wait(); }
    /// ```
    ///
    /// Otherwise, `doWork()` may still be running and accessing derived-class
    /// members while the derived class has already been destructed.
    virtual ~ThreadedWorker() = default;

    void startInBackground() noexcept
    {
        std::call_once(start_flag, [this] {
            LOG_DEBUG(log, "Starting {} workers, concurrency={}", getName(), concurrency);
            active_workers = concurrency;

            watch_start.restart();
            for (size_t index = 0; index < concurrency; ++index)
            {
                thread_manager->schedule(true, getName(), [this, index] {
                    total_wait_schedule_ms += watch_start.elapsedMilliseconds();

                    workerLoop(index);
                    handleWorkerFinished();
                });
            }
        });
    }

    void wait() noexcept
    {
        std::call_once(wait_flag, [this] {
            try
            {
                // thread_manager->wait can only be called once.
                thread_manager->wait();
            }
            catch (...)
            {
                // This should not occur, as we should have caught all exceptions in workerLoop.
                auto error = getCurrentExceptionMessage(false);
                LOG_WARNING(log, "{} meet unexpected error: {}", getName(), error);
            }
        });
    }

    ThreadedWorker(
        std::shared_ptr<MPMCQueue<Src>> source_queue_,
        std::shared_ptr<MPMCQueue<Dest>> result_queue_,
        LoggerPtr log_,
        size_t concurrency_)
        : concurrency(concurrency_)
        , source_queue(source_queue_)
        , result_queue(result_queue_)
        , log(log_)
        , thread_manager(newThreadManager())
    {
        RUNTIME_CHECK(concurrency > 0, concurrency);
    }

public:
    const size_t concurrency;
    const std::shared_ptr<MPMCQueue<Src>> source_queue;
    const std::shared_ptr<MPMCQueue<Dest>> result_queue;

protected:
    const LoggerPtr log;
    std::shared_ptr<ThreadManager> thread_manager;
    std::atomic<Int64> active_workers = 0;

    virtual String getName() const noexcept = 0;

    virtual Dest doWork(const Src & task) = 0;

private:
    void handleWorkerFinished() noexcept
    {
        active_workers--;
        if (active_workers == 0)
        {
            std::call_once(finish_flag, [this] {
                LOG_DEBUG(
                    log,
                    "{} workers finished, total_processed_tasks={} concurrency={} elapsed={:.3f}s total_wait_schedule={:.3f}s total_wait_upstream={:.3f}s total_wait_downstream={:.3f}s",
                    getName(),
                    total_processed_tasks,
                    concurrency,
                    watch_start.elapsedSeconds(),
                    total_wait_schedule_ms / 1000.0,
                    total_wait_upstream_ms / 1000.0,
                    total_wait_downstream_ms / 1000.0);
                // Note: the result queue may be already cancelled, but it is fine.
                result_queue->finish();
            });
        }
    }

    void workerLoop(size_t thread_idx) noexcept
    {
        try
        {
            while (true)
            {
                Src task;

                Stopwatch w{CLOCK_MONOTONIC_COARSE};
                auto pop_result = source_queue->pop(task);
                total_wait_upstream_ms += w.elapsedMilliseconds();

                if (pop_result != MPMCQueueResult::OK)
                {
                    if (pop_result == MPMCQueueResult::FINISHED)
                    {
                        // No more work to do, just exit.
                        // The FINISH signal will be passed to downstreams when
                        // all workers have exited.
                        break;
                    }
                    else if (pop_result == MPMCQueueResult::CANCELLED)
                    {
                        // There are errors from the upstream; propagate the error
                        // to downstreams immediately.
                        auto cancel_reason = source_queue->getCancelReason();
                        LOG_WARNING(log, "{}#{} meet error from upstream: {}", getName(), thread_idx, cancel_reason);
                        result_queue->cancelWith(cancel_reason);
                        break;
                    }
                    else
                    {
                        RUNTIME_CHECK_MSG(false, "Unexpected pop MPMCQueueResult: {}", magic_enum::enum_name(pop_result));
                    }
                }

                auto work_result = doWork(task);

                w.restart();
                auto push_result = result_queue->push(work_result);
                total_wait_downstream_ms += w.elapsedMilliseconds();

                if (push_result != MPMCQueueResult::OK)
                {
                    if (push_result == MPMCQueueResult::CANCELLED)
                    {
                        // There are two possible cases:
                        // Case A: The upstream is cancelled and one of the workers cancelled the downstream.
                        // Case B: Something is wrong with the downstream.
                        // In case B, we need to propagate the error to the upstream, so that the whole
                        // pipeline is cancelled.
                        // Read the reason from the result queue: it is the queue that was actually
                        // cancelled, and it carries the reason in both cases.
                        auto cancel_reason = result_queue->getCancelReason();
                        LOG_WARNING(log, "{}#{} meet error from downstream: {}", getName(), thread_idx, cancel_reason);
                        source_queue->cancelWith(cancel_reason);
                        break;
                    }
                    else
                    {
                        RUNTIME_CHECK_MSG(false, "Unexpected push MPMCQueueResult: {}", magic_enum::enum_name(push_result));
                    }
                }

                total_processed_tasks++;
            }
        }
        catch (...)
        {
            auto error = getCurrentExceptionMessage(false);
            LOG_ERROR(log, "{}#{} meet error: {}", getName(), thread_idx, error);
            auto cancel_reason = fmt::format("{} failed: {}", getName(), error);
            result_queue->cancelWith(cancel_reason);
            source_queue->cancelWith(cancel_reason);
        }
    }

private:
    Stopwatch watch_start{CLOCK_MONOTONIC_COARSE};
    std::once_flag start_flag;
    std::once_flag wait_flag;
    std::once_flag finish_flag;
    std::atomic<Int64> total_processed_tasks = 0;
    std::atomic<Int64> total_wait_schedule_ms = 0;
    std::atomic<Int64> total_wait_upstream_ms = 0;
    std::atomic<Int64> total_wait_downstream_ms = 0;
};

} // namespace DB
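To make the contract above concrete, here is a minimal usage sketch of a derived worker; `SquareWorker` and all surrounding names are illustrative, not part of this PR:

```c++
#include <Common/Logger.h>
#include <Common/MPMCQueue.h>
#include <Common/ThreadedWorker.h>

// Hypothetical worker: squares integers from the source queue.
class SquareWorker : public DB::ThreadedWorker<int, long>
{
public:
    using DB::ThreadedWorker<int, long>::ThreadedWorker;

    // Required: stop worker threads before derived members are destructed.
    ~SquareWorker() override { wait(); }

protected:
    DB::String getName() const noexcept override { return "SquareWorker"; }

    long doWork(const int & task) override { return static_cast<long>(task) * task; }
};

void runExample()
{
    // Bounded MPMC queues on both sides; the worker finishes (or cancels)
    // the result queue by itself once the source queue is finished (or cancelled).
    auto source = std::make_shared<DB::MPMCQueue<int>>(16);
    auto result = std::make_shared<DB::MPMCQueue<long>>(16);
    SquareWorker worker(source, result, DB::Logger::get("SquareWorker"), /*concurrency=*/4);
    worker.startInBackground();

    for (int i = 0; i < 100; ++i)
        source->push(i);
    source->finish(); // workers drain the remaining tasks, then finish `result`

    long value;
    while (result->pop(value) == DB::MPMCQueueResult::OK)
    {
        // consume squared values
    }
}
```

The derived destructor calls `wait()` so that all worker threads stop before derived-class members are destructed, exactly as the warning on `~ThreadedWorker()` requires.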
8 changes: 5 additions & 3 deletions dbms/src/Common/TiFlashMetrics.h
@@ -229,10 +229,12 @@ namespace DB
F(type_total_establish_backoff, {{"type", "total_establish_backoff"}}, ExpBuckets{0.01, 2, 20}), \
F(type_resolve_lock, {{"type", "resolve_lock"}}, ExpBuckets{0.01, 2, 20}), \
F(type_rpc_fetch_page, {{"type", "rpc_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \
F(type_write_page_cache, {{"type", "write_page_cache"}}, ExpBuckets{0.01, 2, 20}), \
F(type_cache_occupy, {{"type", "cache_occupy"}}, ExpBuckets{0.01, 2, 20}), \
F(type_build_read_task, {{"type", "build_read_task"}}, ExpBuckets{0.01, 2, 20}), \
F(type_seg_next_task, {{"type", "seg_next_task"}}, ExpBuckets{0.01, 2, 20}), \
F(type_seg_build_stream, {{"type", "seg_build_stream"}}, ExpBuckets{0.01, 2, 20})) \
F(type_worker_fetch_page, {{"type", "worker_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \
F(type_worker_prepare_stream, {{"type", "worker_prepare_stream"}}, ExpBuckets{0.01, 2, 20}), \
F(type_stream_wait_next_task, {{"type", "stream_wait_next_task"}}, ExpBuckets{0.01, 2, 20}), \
F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20})) \
M(tiflash_disaggregated_details, "", Counter, \
F(type_cftiny_read, {{"type", "cftiny_read"}}), \
F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \
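As a usage sketch for the new histogram types: TiFlash observes such buckets via the `GET_METRIC` macro. The metric family name below is an assumption (the hunk above starts mid-macro and does not show it), and the surrounding function is hypothetical:

```c++
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>

// Hypothetical worker step: times one page fetch and records it into the
// (assumed) tiflash_disaggregated_breakdown_duration_seconds histogram family
// under the new type_worker_fetch_page bucket.
void fetchPagesForTask()
{
    Stopwatch watch;
    // ... fetch pages for one segment read task over RPC ...
    GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page)
        .Observe(watch.elapsedSeconds());
}
```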