Skip to content

Commit

Permalink
Support disaggregated tiflash mode (#6248)
Browse files Browse the repository at this point in the history
close #6441
  • Loading branch information
guo-shaoge authored Dec 8, 2022
1 parent f248fac commit d8c369c
Show file tree
Hide file tree
Showing 42 changed files with 1,094 additions and 132 deletions.
57 changes: 57 additions & 0 deletions dbms/src/Core/TiFlashDisaggregatedMode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Core/TiFlashDisaggregatedMode.h>

namespace DB
{
/// Resolves this node's disaggregated mode from the `flash.disaggregated_mode` configuration key.
///
/// @param config  Layered configuration the key is looked up in.
/// @return DisaggregatedMode::None when the key is absent; otherwise DisaggregatedMode::Compute
///         when the value equals DISAGGREGATED_MODE_COMPUTE, or DisaggregatedMode::Storage when
///         it equals DISAGGREGATED_MODE_STORAGE. Any other value trips the RUNTIME_ASSERT below.
DisaggregatedMode getDisaggregatedMode(const Poco::Util::LayeredConfiguration & config)
{
    static const std::string config_key = "flash.disaggregated_mode";

    // Key not configured: nothing to parse, the mode stays None.
    if (!config.has(config_key))
        return DisaggregatedMode::None;

    std::string mode_str = config.getString(config_key);
    // NOTE(review): the condition expression is kept verbatim in case RUNTIME_ASSERT
    // includes the condition text in its diagnostics — confirm against the macro definition.
    RUNTIME_ASSERT(mode_str == DISAGGREGATED_MODE_STORAGE || mode_str == DISAGGREGATED_MODE_COMPUTE,
                   "Expect disaggregated_mode is {} or {}, got: {}",
                   DISAGGREGATED_MODE_STORAGE,
                   DISAGGREGATED_MODE_COMPUTE,
                   mode_str);

    // Everything that is not the compute label is mapped to Storage, exactly as before.
    return mode_str == DISAGGREGATED_MODE_COMPUTE ? DisaggregatedMode::Compute : DisaggregatedMode::Storage;
}

/// Maps a disaggregated mode to the proxy label string.
///
/// @param mode  Mode to translate.
/// @return DISAGGREGATED_MODE_COMPUTE_PROXY_LABEL for Compute; DEF_PROXY_LABEL for both
///         Storage and None.
std::string getProxyLabelByDisaggregatedMode(DisaggregatedMode mode)
{
    if (mode == DisaggregatedMode::Compute)
        return DISAGGREGATED_MODE_COMPUTE_PROXY_LABEL;

    // Storage and None share the default label.
    if (mode == DisaggregatedMode::Storage || mode == DisaggregatedMode::None)
        return DEF_PROXY_LABEL;

    // No other enumerator exists; a value outside the enum is not handled.
    __builtin_unreachable();
}
} // namespace DB
37 changes: 37 additions & 0 deletions dbms/src/Core/TiFlashDisaggregatedMode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Poco/Util/LayeredConfiguration.h>

#include <string>

// Values accepted for the `flash.disaggregated_mode` configuration key.
#define DISAGGREGATED_MODE_STORAGE "tiflash_storage"
#define DISAGGREGATED_MODE_COMPUTE "tiflash_compute"

// Proxy labels returned by getProxyLabelByDisaggregatedMode(): the compute label is the
// compute mode string itself; every other mode uses the default label.
#define DEF_PROXY_LABEL "tiflash"
#define DISAGGREGATED_MODE_COMPUTE_PROXY_LABEL DISAGGREGATED_MODE_COMPUTE

namespace DB
{
/// Disaggregated role of a node, as resolved by getDisaggregatedMode().
/// The explicit values below equal the ones the compiler assigned implicitly.
enum class DisaggregatedMode
{
    /// `flash.disaggregated_mode` is not configured.
    None = 0,
    /// `flash.disaggregated_mode` equals DISAGGREGATED_MODE_COMPUTE.
    Compute = 1,
    /// `flash.disaggregated_mode` equals DISAGGREGATED_MODE_STORAGE.
    Storage = 2,
};

DisaggregatedMode getDisaggregatedMode(const Poco::Util::LayeredConfiguration & config);
std::string getProxyLabelByDisaggregatedMode(DisaggregatedMode mode);
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,4 @@ bool runAndCompareDagReq(const coprocessor::Request & req, const coprocessor::Re
}
return unequal_flag;
}
} // namespace DB
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ grpc::Status BatchCoprocessorHandler::execute()

try
{
RUNTIME_CHECK_MSG(!cop_context.db_context.isDisaggregatedComputeMode(), "cannot run cop or batchCop request on tiflash_compute node");

switch (cop_request->tp())
{
case COP_REQ_TYPE_DAG:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class CoprocessorReader
CoprocessorReader(
const DAGSchema & schema_,
pingcap::kv::Cluster * cluster,
std::vector<pingcap::coprocessor::copTask> tasks,
std::vector<pingcap::coprocessor::CopTask> tasks,
bool has_enforce_encode_type_,
int concurrency)
: schema(schema_)
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,15 @@ class DAGContext
}
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
std::vector<CoprocessorReaderPtr> & getCoprocessorReaders();
void setDisaggregatedComputeExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & receiver)
{
disaggregated_compute_exchange_receiver = std::make_pair(executor_id, receiver);
}
// Returns, by value, the current content of disaggregated_compute_exchange_receiver:
// the (executor_id, receiver) pair last stored by setDisaggregatedComputeExchangeReceiver().
// The member has no initializer, so the optional holds no value until something stores one.
std::optional<std::pair<String, ExchangeReceiverPtr>> getDisaggregatedComputeExchangeReceiver()
{
return disaggregated_compute_exchange_receiver;
}


void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
Expand Down Expand Up @@ -395,6 +404,9 @@ class DAGContext
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
// In disaggregated tiflash mode, the table_scan on a tiflash_compute node is converted to an ExchangeReceiver.
// It is recorded here so that it can be added to receiver_set and cancelled/closed.
std::optional<std::pair<String, ExchangeReceiverPtr>> disaggregated_compute_exchange_receiver;
};

} // namespace DB
21 changes: 16 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StorageDisaggregatedInterpreter.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/newMPPExchangeWriter.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Join.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/Transaction/TMTContext.h>

namespace DB
{
Expand Down Expand Up @@ -98,8 +100,8 @@ AnalysisResult analyzeExpressions(
{
AnalysisResult res;
ExpressionActionsChain chain;
// selection on table scan had been executed in handleTableScan
// In test mode, filter is not pushed down to table scan
// selection on table scan had been executed in handleTableScan.
// In test mode, filter is not pushed down to table scan.
if (query_block.selection && (!query_block.isTableScanSource() || context.isTest()))
{
std::vector<const tipb::Expr *> where_conditions;
Expand Down Expand Up @@ -185,10 +187,19 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,
{
const auto push_down_filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection);

DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
storage_interpreter.execute(pipeline);
if (context.isDisaggregatedComputeMode())
{
StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, push_down_filter, max_streams);
disaggregated_tiflash_interpreter.execute(pipeline);
analyzer = std::move(disaggregated_tiflash_interpreter.analyzer);
}
else
{
DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
storage_interpreter.execute(pipeline);

analyzer = std::move(storage_interpreter.analyzer);
analyzer = std::move(storage_interpreter.analyzer);
}
}

void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query, size_t fine_grained_shuffle_count)
Expand Down
55 changes: 5 additions & 50 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
/// handle pushed down filter for local and remote table scan.
if (push_down_filter.hasValue())
{
executePushedDownFilter(remote_read_streams_start_index, pipeline);
::DB::executePushedDownFilter(remote_read_streams_start_index, push_down_filter, *analyzer, log, pipeline);
recordProfileStreams(pipeline, push_down_filter.executor_id);
}
}
Expand Down Expand Up @@ -419,51 +419,6 @@ void DAGStorageInterpreter::prepare()
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

// Builds the expression actions needed to apply the pushed-down filter.
// Returns {before_where, filter_column_name, project_after_where}:
//   - before_where: the chain's last actions right after appendWhere() on push_down_filter.conditions;
//   - filter_column_name: the column name returned by appendWhere();
//   - project_after_where: a projection of every current input column onto itself.
// Precondition (checked with assert only): push_down_filter.hasValue().
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGStorageInterpreter::buildPushDownFilter()
{
assert(push_down_filter.hasValue());

ExpressionActionsChain chain;
// Start the chain from the analyzer's current input columns, then append the filter conditions.
analyzer->initChain(chain, analyzer->getCurrentInputColumns());
String filter_column_name = analyzer->appendWhere(chain, push_down_filter.conditions);
// Capture the filter step before a new step is opened for the projection.
ExpressionActionsPtr before_where = chain.getLastActions();
chain.addStep();

// remove useless tmp column and keep the schema of local streams and remote streams the same.
NamesWithAliases project_cols;
for (const auto & col : analyzer->getCurrentInputColumns())
{
// Each column is both required as output of the new step and projected under its own name.
chain.getLastStep().required_output.push_back(col.name);
project_cols.emplace_back(col.name, col.name);
}
chain.getLastActions()->add(ExpressionAction::project(project_cols));
ExpressionActionsPtr project_after_where = chain.getLastActions();
// The action pointers were taken above; the chain itself is finalized and then cleared.
chain.finalize();
chain.clear();

return {before_where, filter_column_name, project_after_where};
}

/// Applies the pushed-down filter to the streams of `pipeline` located before
/// `remote_read_streams_start_index`; streams at or after that index are left untouched.
///
/// @param remote_read_streams_start_index  Index of the first remote-read stream in pipeline.streams;
///                                         asserted to be <= pipeline.streams.size().
/// @param pipeline                         Pipeline whose leading streams are wrapped in place;
///                                         asserted to have no streams_with_non_joined_data.
void DAGStorageInterpreter::executePushedDownFilter(
    size_t remote_read_streams_start_index,
    DAGPipeline & pipeline)
{
    auto [before_where, filter_column_name, project_after_where] = buildPushDownFilter();

    assert(pipeline.streams_with_non_joined_data.empty());
    assert(remote_read_streams_start_index <= pipeline.streams.size());

    // for remote read, filter had been pushed down, don't need to execute again.
    const auto local_streams_end = pipeline.streams.begin() + remote_read_streams_start_index;
    for (auto stream_it = pipeline.streams.begin(); stream_it != local_streams_end; ++stream_it)
    {
        auto & local_stream = *stream_it;

        local_stream = std::make_shared<FilterBlockInputStream>(local_stream, before_where, filter_column_name, log->identifier());
        local_stream->setExtraInfo("push down filter");

        // after filter, do project action to keep the schema of local streams and remote streams the same.
        local_stream = std::make_shared<ExpressionBlockInputStream>(local_stream, project_after_where, log->identifier());
        local_stream->setExtraInfo("projection after push down filter");
    }
}

void DAGStorageInterpreter::executeCastAfterTableScan(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline)
Expand Down Expand Up @@ -492,7 +447,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan(
}
}

std::vector<pingcap::coprocessor::copTask> DAGStorageInterpreter::buildCopTasks(const std::vector<RemoteRequest> & remote_requests)
std::vector<pingcap::coprocessor::CopTask> DAGStorageInterpreter::buildCopTasks(const std::vector<RemoteRequest> & remote_requests)
{
assert(!remote_requests.empty());
#ifndef NDEBUG
Expand All @@ -514,7 +469,7 @@ std::vector<pingcap::coprocessor::copTask> DAGStorageInterpreter::buildCopTasks(
}
#endif
pingcap::kv::Cluster * cluster = tmt.getKVCluster();
std::vector<pingcap::coprocessor::copTask> all_tasks;
std::vector<pingcap::coprocessor::CopTask> all_tasks;
for (const auto & remote_request : remote_requests)
{
pingcap::coprocessor::RequestPtr req = std::make_shared<pingcap::coprocessor::Request>();
Expand All @@ -539,7 +494,7 @@ std::vector<pingcap::coprocessor::copTask> DAGStorageInterpreter::buildCopTasks(

void DAGStorageInterpreter::buildRemoteStreams(const std::vector<RemoteRequest> & remote_requests, DAGPipeline & pipeline)
{
std::vector<pingcap::coprocessor::copTask> all_tasks = buildCopTasks(remote_requests);
std::vector<pingcap::coprocessor::CopTask> all_tasks = buildCopTasks(remote_requests);

const DAGSchema & schema = remote_requests[0].schema;
pingcap::kv::Cluster * cluster = tmt.getKVCluster();
Expand All @@ -554,7 +509,7 @@ void DAGStorageInterpreter::buildRemoteStreams(const std::vector<RemoteRequest>
task_end++;
if (task_end == task_start)
continue;
std::vector<pingcap::coprocessor::copTask> tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end);
std::vector<pingcap::coprocessor::CopTask> tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end);

auto coprocessor_reader = std::make_shared<CoprocessorReader>(schema, cluster, tasks, has_enforce_encode_type, 1);
context.getDAGContext()->addCoprocessorReader(coprocessor_reader);
Expand Down
8 changes: 1 addition & 7 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,13 @@ class DAGStorageInterpreter

void recordProfileStreams(DAGPipeline & pipeline, const String & key);

std::vector<pingcap::coprocessor::copTask> buildCopTasks(const std::vector<RemoteRequest> & remote_requests);
std::vector<pingcap::coprocessor::CopTask> buildCopTasks(const std::vector<RemoteRequest> & remote_requests);
void buildRemoteStreams(const std::vector<RemoteRequest> & remote_requests, DAGPipeline & pipeline);

void executeCastAfterTableScan(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);

// before_where, filter_column_name, after_where
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter();
void executePushedDownFilter(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);

void prepare();

void executeImpl(DAGPipeline & pipeline);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ struct FineGrainedShuffle
const UInt64 stream_count;
const UInt64 batch_size;
};
} // namespace DB
} // namespace DB
22 changes: 22 additions & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ DataTypePtr getPkType(const ColumnInfo & column_info)
}
} // namespace

// Convenience wrapper: forwards to genNamesAndTypes() with the column prefix "table_scan".
NamesAndTypes genNamesAndTypesForTableScan(const TiDBTableScan & table_scan)
{
return genNamesAndTypes(table_scan, "table_scan");
}

/// Builds one (name, type) entry per column of `table_scan`, in column order.
/// The name of entry i is genNameForExchangeReceiver(i); its type is
/// getDataTypeByColumnInfoForComputingLayer() applied to the i-th column info.
NamesAndTypes genNamesAndTypesForExchangeReceiver(const TiDBTableScan & table_scan)
{
    const auto column_count = table_scan.getColumnSize();

    NamesAndTypes result;
    result.reserve(column_count);
    for (Int32 col_index = 0; col_index < column_count; ++col_index)
    {
        auto column_name = genNameForExchangeReceiver(col_index);
        auto column_type = getDataTypeByColumnInfoForComputingLayer(table_scan.getColumns()[col_index]);
        result.emplace_back(std::move(column_name), std::move(column_type));
    }
    return result;
}

/// Returns the column name used for the `col_index`-th exchange receiver column:
/// the fixed prefix "exchange_receiver_" followed by the decimal index.
String genNameForExchangeReceiver(Int32 col_index)
{
    String name = "exchange_receiver_";
    name += std::to_string(col_index);
    return name;
}

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
{
NamesAndTypes names_and_types;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

namespace DB
{
NamesAndTypes genNamesAndTypesForExchangeReceiver(const TiDBTableScan & table_scan);
NamesAndTypes genNamesAndTypesForTableScan(const TiDBTableScan & table_scan);
String genNameForExchangeReceiver(Int32 col_index);

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
Expand Down
Loading

0 comments on commit d8c369c

Please sign in to comment.