storage: Support pipeline model for disaggregated S3 read (#7552)
ref #6827
breezewish authored Jun 28, 2023
1 parent 874c2c4 commit 2402b48
Showing 7 changed files with 385 additions and 45 deletions.
6 changes: 1 addition & 5 deletions dbms/src/Flash/executeQuery.cpp
@@ -175,16 +175,12 @@ QueryExecutorPtr queryExecute(Context & context, bool internal)
RUNTIME_CHECK_MSG(
TaskScheduler::instance,
"The task scheduler of the pipeline model has not been initialized, which is an exception. It is necessary to restart the TiFlash node.");
RUNTIME_CHECK_MSG(
context.getSharedContextDisagg()->notDisaggregatedMode() || !S3::ClientFactory::instance().isEnabled(),
"The pipeline model does not support storage-computing separation with S3 mode, and an error is reported because the setting enforce_enable_pipeline is true.");
auto res = executeAsPipeline(context, internal);
RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting enforce_enable_pipeline is true.");
return std::move(*res);
}
if (context.getSettingsRef().enable_planner
&& context.getSettingsRef().enable_pipeline
&& (context.getSharedContextDisagg()->notDisaggregatedMode() || !S3::ClientFactory::instance().isEnabled()))
&& context.getSettingsRef().enable_pipeline)
{
if (auto res = executeAsPipeline(context, internal); res)
return std::move(*res);
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNSegmentInputStream.h
@@ -14,7 +14,6 @@

#pragma once

#include <Common/MPMCQueue.h>
#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
139 changes: 139 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNSegmentSourceOp.cpp
@@ -0,0 +1,139 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/Remote/RNReadTask.h>
#include <Storages/DeltaMerge/Remote/RNSegmentSourceOp.h>
#include <Storages/DeltaMerge/Remote/RNWorkers.h>

#include <magic_enum.hpp>

namespace DB::DM::Remote
{

void RNSegmentSourceOp::operateSuffixImpl()
{
LOG_INFO(
log,
"Finished reading remote segments, rows={} read_segments={} total_wait_ready_task={:.3f}s total_read={:.3f}s",
action.totalRows(),
processed_seg_tasks,
duration_wait_ready_task_sec,
duration_read_sec);

// This metric is per-stream.
GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_stream_wait_next_task).Observe(duration_wait_ready_task_sec);
// This metric is per-stream.
GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_stream_read).Observe(duration_read_sec);
}

void RNSegmentSourceOp::operatePrefixImpl()
{
workers->startInBackground();
}

OperatorStatus RNSegmentSourceOp::startGettingNextReadyTask()
{
// Start timing the time of get next ready task.
wait_stop_watch.start();
// A quick try to get the next task to reduce the overhead of switching to WaitReactor.
return awaitImpl();
}

OperatorStatus RNSegmentSourceOp::readImpl(Block & block)
{
if unlikely (done)
{
block = {};
return OperatorStatus::HAS_OUTPUT;
}

if (t_block.has_value())
{
std::swap(block, t_block.value());
action.transform(block, current_seg_task->meta.physical_table_id);
t_block.reset();
return OperatorStatus::HAS_OUTPUT;
}

return current_seg_task ? OperatorStatus::IO : startGettingNextReadyTask();
}

OperatorStatus RNSegmentSourceOp::awaitImpl()
{
if unlikely (done || t_block.has_value())
{
duration_wait_ready_task_sec += wait_stop_watch.elapsedSeconds();
return OperatorStatus::HAS_OUTPUT;
}

if unlikely (current_seg_task)
{
duration_wait_ready_task_sec += wait_stop_watch.elapsedSeconds();
return OperatorStatus::IO;
}

auto pop_result = workers->getReadyChannel()->tryPop(current_seg_task);
switch (pop_result)
{
case MPMCQueueResult::OK:
processed_seg_tasks += 1;
RUNTIME_CHECK(current_seg_task != nullptr);
duration_wait_ready_task_sec += wait_stop_watch.elapsedSeconds();
return OperatorStatus::IO;
case MPMCQueueResult::EMPTY:
return OperatorStatus::WAITING;
case MPMCQueueResult::FINISHED:
current_seg_task = nullptr;
done = true;
duration_wait_ready_task_sec += wait_stop_watch.elapsedSeconds();
return OperatorStatus::HAS_OUTPUT;
case MPMCQueueResult::CANCELLED:
current_seg_task = nullptr;
done = true;
duration_wait_ready_task_sec += wait_stop_watch.elapsedSeconds();
throw Exception(workers->getReadyChannel()->getCancelReason());
default:
current_seg_task = nullptr;
done = true;
duration_wait_ready_task_sec += wait_stop_watch.elapsedSeconds();
throw Exception(fmt::format("Unexpected pop result {}", magic_enum::enum_name(pop_result)));
}
}

OperatorStatus RNSegmentSourceOp::executeIOImpl()
{
if unlikely (done || t_block.has_value())
return OperatorStatus::HAS_OUTPUT;

if unlikely (!current_seg_task)
return startGettingNextReadyTask();

FilterPtr filter_ignored = nullptr;
Stopwatch w{CLOCK_MONOTONIC_COARSE};
Block res = current_seg_task->getInputStream()->read(filter_ignored, false);
duration_read_sec += w.elapsedSeconds();
if likely (res)
{
t_block.emplace(std::move(res));
return OperatorStatus::HAS_OUTPUT;
}
else
{
// Current stream is drained, try to get next ready task.
current_seg_task = nullptr;
return startGettingNextReadyTask();
}
}
} // namespace DB::DM::Remote
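Taken together, the three overrides above form a small non-blocking state machine: readImpl either emits the block buffered in t_block, or reports that IO (a blocking read from the current segment's input stream) or waiting (for RNWorkers to push the next ready segment task onto the ready channel) is still needed. The toy model below is only a sketch of that protocol under simplified assumptions; every name in it is illustrative, and it deliberately ignores the real executor, scheduling, and cancellation paths.

```cpp
// Toy model (illustrative names only; this is NOT TiFlash code) of the protocol implemented
// above: read() hands out a buffered block or asks for IO/waiting, executeIO() performs the
// blocking per-segment read, await() pops the next ready task from the workers' channel.
#include <deque>
#include <optional>
#include <queue>

enum class Status { HAS_OUTPUT, IO, WAITING };

struct ToySegmentSource
{
    std::queue<int> ready_tasks{std::deque<int>{3, 2}}; // stand-in for workers->getReadyChannel(); each int = blocks left in one segment
    std::optional<int> current_task;                    // stand-in for current_seg_task
    std::optional<int> buffered_block;                  // stand-in for t_block
    bool done = false;

    Status await()
    {
        if (done || buffered_block)
            return Status::HAS_OUTPUT;
        if (current_task)
            return Status::IO;
        if (ready_tasks.empty()) // like MPMCQueueResult::FINISHED
        {
            done = true;
            return Status::HAS_OUTPUT;
        }
        current_task = ready_tasks.front(); // like MPMCQueueResult::OK
        ready_tasks.pop();
        return Status::IO;
    }

    Status executeIO()
    {
        if (done || buffered_block)
            return Status::HAS_OUTPUT;
        if (!current_task)
            return await();
        if (*current_task == 0) // segment drained -> fetch the next ready task
        {
            current_task.reset();
            return await();
        }
        buffered_block = (*current_task)--; // "read" one block from the segment
        return Status::HAS_OUTPUT;
    }

    Status read(std::optional<int> & block)
    {
        if (done)
        {
            block.reset();
            return Status::HAS_OUTPUT;
        }
        if (buffered_block)
        {
            block = buffered_block; // hand the buffered block downstream (cf. action.transform)
            buffered_block.reset();
            return Status::HAS_OUTPUT;
        }
        return current_task ? Status::IO : await();
    }
};

int main()
{
    ToySegmentSource source;
    std::optional<int> block;
    while (true)
    {
        switch (source.read(block))
        {
        case Status::HAS_OUTPUT:
            if (!block && source.done)
                return 0;  // all segments consumed
            block.reset(); // a downstream operator would consume the block here
            break;
        case Status::IO:
            source.executeIO(); // runs on an IO task in the real pipeline model
            break;
        case Status::WAITING:
            source.await();     // the WaitReactor polls awaitImpl in the real pipeline model
            break;
        }
    }
}
```

Roughly speaking, the real operator differs mainly in who runs each branch: IO is executed by the pipeline executor's IO tasks and WAITING is polled by the WaitReactor, so no worker thread blocks while RNWorkers keeps prefetching segments from S3 in the background.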
91 changes: 91 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNSegmentSourceOp.h
@@ -0,0 +1,91 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/Remote/RNReadTask_fwd.h>
#include <Storages/DeltaMerge/Remote/RNWorkers_fwd.h>

namespace DB::DM::Remote
{

class RNSegmentSourceOp : public SourceOp
{
static constexpr auto NAME = "RNSegment";

public:
struct Options
{
std::string_view debug_tag;
PipelineExecutorStatus & exec_status;
const RNWorkersPtr & workers;
const ColumnDefines & columns_to_read;
int extra_table_id_index;
};

explicit RNSegmentSourceOp(const Options & options)
: SourceOp(options.exec_status, String(options.debug_tag))
, workers(options.workers)
, action(options.columns_to_read, options.extra_table_id_index)
{
setHeader(action.getHeader());
}

static SourceOpPtr create(const Options & options)
{
return std::make_unique<RNSegmentSourceOp>(options);
}

String getName() const override { return NAME; }

IOProfileInfoPtr getIOProfileInfo() const override { return IOProfileInfo::createForLocal(profile_info_ptr); }

protected:
void operateSuffixImpl() override;

void operatePrefixImpl() override;

OperatorStatus readImpl(Block & block) override;

OperatorStatus awaitImpl() override;

OperatorStatus executeIOImpl() override;

private:
OperatorStatus startGettingNextReadyTask();

private:
const RNWorkersPtr workers;
AddExtraTableIDColumnTransformAction action;

// Temporarily store the block read from current_seg_task->stream and pass it to downstream operators in readImpl.
std::optional<Block> t_block = std::nullopt;

RNReadSegmentTaskPtr current_seg_task = nullptr;
bool done = false;

// Count the number of segment tasks obtained.
size_t processed_seg_tasks = 0;

// Count the time spent waiting for segment tasks to be ready.
double duration_wait_ready_task_sec = 0;
Stopwatch wait_stop_watch{CLOCK_MONOTONIC_COARSE};

// Count the time consumed by reading blocks in the stream of segment tasks.
double duration_read_sec = 0;
};

} // namespace DB::DM::Remote
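For orientation, a hypothetical call-site sketch follows; it is not part of this commit's diff (the S3 read path in StorageDisaggregated that actually builds these operators is not fully shown on this page). It only mirrors the Options struct declared above; group_builder, exec_status, workers, columns_to_read, extra_table_id_index, num_streams, and the addConcurrency call are assumed to exist at the call site.

```cpp
// Hypothetical usage sketch: one RNSegmentSourceOp per stream, all sharing the same
// RNWorkers instance; the exact builder API (addConcurrency) is an assumption here.
for (size_t i = 0; i < num_streams; ++i)
{
    group_builder.addConcurrency(DB::DM::Remote::RNSegmentSourceOp::create({
        .debug_tag = log->identifier(),
        .exec_status = exec_status,                   // PipelineExecutorStatus of this query
        .workers = workers,                           // RNWorkersPtr shared by all source ops
        .columns_to_read = columns_to_read,           // ColumnDefines to read from the segments
        .extra_table_id_index = extra_table_id_index, // position of the extra table-id column
    }));
}
```

The design suggests that all source operators of a query share one RNWorkers instance, so segment prefetching is pooled and each operator simply pops whichever segment becomes ready first.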
37 changes: 31 additions & 6 deletions dbms/src/Storages/StorageDisaggregated.cpp
@@ -19,6 +19,7 @@
#include <Flash/Coprocessor/RequestUtils.h>
#include <Interpreters/Context.h>
#include <Operators/ExchangeReceiverSourceOp.h>
#include <Operators/ExpressionTransformOp.h>
#include <Storages/S3/S3Common.h>
#include <Storages/StorageDisaggregated.h>
#include <Storages/Transaction/TMTContext.h>
@@ -45,7 +46,7 @@ StorageDisaggregated::StorageDisaggregated(

BlockInputStreams StorageDisaggregated::read(
const Names &,
const SelectQueryInfo & query_info,
const SelectQueryInfo & /*query_info*/,
const Context & db_context,
QueryProcessingStage::Enum &,
size_t,
@@ -54,7 +55,7 @@ BlockInputStreams StorageDisaggregated::read(
/// S3 config is enabled on the TiFlash compute node, let's read data from S3.
bool remote_data_read = S3::ClientFactory::instance().isEnabled();
if (remote_data_read)
return readThroughS3(db_context, query_info, num_streams);
return readThroughS3(db_context, num_streams);

/// Fetch all data from write node through MPP exchange sender/receiver
return readThroughExchange(num_streams);
@@ -65,12 +66,13 @@ void StorageDisaggregated::read(
PipelineExecGroupBuilder & group_builder,
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
const Context & db_context,
size_t /*max_block_size*/,
unsigned num_streams)
{
// TODO support S3
RUNTIME_CHECK(!S3::ClientFactory::instance().isEnabled());
bool remote_data_read = S3::ClientFactory::instance().isEnabled();
if (remote_data_read)
return readThroughS3(exec_status, group_builder, db_context, num_streams);

/// Fetch all data from write node through MPP exchange sender/receiver
readThroughExchange(exec_status, group_builder, num_streams);
@@ -388,7 +390,7 @@ void StorageDisaggregated::filterConditions(
}
}

void StorageDisaggregated::extraCast(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline)
ExpressionActionsPtr StorageDisaggregated::getExtraCastExpr(DAGExpressionAnalyzer & analyzer)
{
// If the column is not in the columns of pushed down filter, append a cast to the column.
std::vector<UInt8> may_need_add_cast_column;
@@ -407,11 +409,34 @@
ExpressionActionsPtr extra_cast = chain.getLastActions();
chain.finalize();
chain.clear();
return extra_cast;
}
else
{
return nullptr;
}
}

void StorageDisaggregated::extraCast(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline)
{
if (auto extra_cast = getExtraCastExpr(analyzer); extra_cast)
{
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
stream->setExtraInfo("cast after local tableScan");
}
}
}

void StorageDisaggregated::extraCast(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder, DAGExpressionAnalyzer & analyzer)
{
if (auto extra_cast = getExtraCastExpr(analyzer); extra_cast)
{
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), extra_cast));
});
}
}

} // namespace DB