From e64d8ccc6b120c3ed2ee1e5987b477fe8fd82a16 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Mon, 30 Jan 2023 00:57:56 +0800 Subject: [PATCH] *: rename `PushDownFilter` to `FilterConditions` (#6679) ref pingcap/tiflash#5829 --- dbms/src/Debug/MockStorage.cpp | 8 +++---- dbms/src/Debug/MockStorage.h | 4 ++-- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 8 +++---- .../Coprocessor/DAGStorageInterpreter.cpp | 16 ++++++------- .../Flash/Coprocessor/DAGStorageInterpreter.h | 6 ++--- ...ushDownFilter.cpp => FilterConditions.cpp} | 14 +++++------ .../{PushDownFilter.h => FilterConditions.h} | 16 +++++++++---- .../Flash/Coprocessor/InterpreterUtils.cpp | 10 ++++---- dbms/src/Flash/Coprocessor/InterpreterUtils.h | 6 ++--- dbms/src/Flash/Coprocessor/RemoteRequest.cpp | 4 ++-- dbms/src/Flash/Coprocessor/RemoteRequest.h | 4 ++-- .../StorageDisaggregatedInterpreter.h | 6 ++--- dbms/src/Flash/Planner/PhysicalPlan.cpp | 8 +++---- .../Planner/Plans/PhysicalMockTableScan.cpp | 18 +++++++------- .../Planner/Plans/PhysicalMockTableScan.h | 10 ++++---- .../Flash/Planner/Plans/PhysicalTableScan.cpp | 24 +++++++++---------- .../Flash/Planner/Plans/PhysicalTableScan.h | 10 ++++---- .../tests/gtest_storage_disaggregated.cpp | 4 ++-- dbms/src/Storages/StorageDisaggregated.cpp | 16 ++++++------- dbms/src/Storages/StorageDisaggregated.h | 6 ++--- 20 files changed, 102 insertions(+), 96 deletions(-) rename dbms/src/Flash/Coprocessor/{PushDownFilter.cpp => FilterConditions.cpp} (73%) rename dbms/src/Flash/Coprocessor/{PushDownFilter.h => FilterConditions.h} (64%) diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 371344d3ad5..79be39bf0df 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -138,7 +138,7 @@ void MockStorage::addTableDataForDeltaMerge(Context & context, const String & na } } -BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter) +BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions) { assert(tableExistsForDeltaMerge(table_id)); auto storage = storage_delta_merge_map[table_id]; @@ -154,15 +154,15 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6 SelectQueryInfo query_info; query_info.query = std::make_shared(); query_info.mvcc_query_info = std::make_unique(context.getSettingsRef().resolve_locks, std::numeric_limits::max(), scan_context); - if (push_down_filter && push_down_filter->hasValue()) + if (filter_conditions && filter_conditions->hasValue()) { auto analyzer = std::make_unique(names_and_types_map_for_delta_merge[table_id], context); query_info.dag_query = std::make_unique( - push_down_filter->conditions, + filter_conditions->conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); - auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*push_down_filter, *analyzer); + auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*filter_conditions, *analyzer); BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams // TODO: set num_streams, then ins.size() != 1 BlockInputStreamPtr in = ins[0]; diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index e3436468fed..e36858472d5 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -14,7 +14,7 @@ #pragma once #include #include -#include +#include #include #include #include @@ -82,7 +82,7 @@ class MockStorage NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id); - BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter = nullptr); + BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions = nullptr); bool tableExistsForDeltaMerge(Int64 table_id); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 76fcf4471ce..ca6b7747d38 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -33,12 +33,12 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -189,17 +189,17 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline) { - const auto push_down_filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection); + const auto filter_conditions = FilterConditions::filterConditionsFrom(query_block.selection_name, query_block.selection); if (context.isDisaggregatedComputeMode()) { - StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, push_down_filter, max_streams); + StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, filter_conditions, max_streams); disaggregated_tiflash_interpreter.execute(pipeline); analyzer = std::move(disaggregated_tiflash_interpreter.analyzer); } else { - DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams); + DAGStorageInterpreter storage_interpreter(context, table_scan, filter_conditions, max_streams); storage_interpreter.execute(pipeline); analyzer = std::move(storage_interpreter.analyzer); diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 95584f02fe8..6c2cf2a2860 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -256,11 +256,11 @@ String genErrMsgForLocalRead( DAGStorageInterpreter::DAGStorageInterpreter( Context & context_, const TiDBTableScan & table_scan_, - const PushDownFilter & push_down_filter_, + const FilterConditions & filter_conditions_, size_t max_streams_) : context(context_) , table_scan(table_scan_) - , push_down_filter(push_down_filter_) + , filter_conditions(filter_conditions_) , max_streams(max_streams_) , log(Logger::get(context.getDAGContext()->log ? context.getDAGContext()->log->identifier() : "")) , logical_table_id(table_scan.getLogicalTableID()) @@ -337,11 +337,11 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) executeCastAfterTableScan(remote_read_streams_start_index, pipeline); recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); - /// handle pushed down filter for local and remote table scan. - if (push_down_filter.hasValue()) + /// handle filter conditions for local and remote table scan. + if (filter_conditions.hasValue()) { - ::DB::executePushedDownFilter(remote_read_streams_start_index, push_down_filter, *analyzer, log, pipeline); - recordProfileStreams(pipeline, push_down_filter.executor_id); + ::DB::executePushedDownFilter(remote_read_streams_start_index, filter_conditions, *analyzer, log, pipeline); + recordProfileStreams(pipeline, filter_conditions.executor_id); } } @@ -579,7 +579,7 @@ std::unordered_map DAGStorageInterpreter::generateSele /// to avoid null point exception query_info.query = dagContext().dummy_ast; query_info.dag_query = std::make_unique( - push_down_filter.conditions, + filter_conditions.conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); @@ -1006,7 +1006,7 @@ std::vector DAGStorageInterpreter::buildRemoteRequests(const DM:: *context.getDAGContext(), table_scan, storages_with_structure_lock[physical_table_id].storage->getTableInfo(), - push_down_filter, + filter_conditions, log)); } return remote_requests; diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index edb0665026f..177a5558d7f 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include #include @@ -44,7 +44,7 @@ class DAGStorageInterpreter DAGStorageInterpreter( Context & context_, const TiDBTableScan & table_scan, - const PushDownFilter & push_down_filter_, + const FilterConditions & filter_conditions_, size_t max_streams_); DISALLOW_MOVE(DAGStorageInterpreter); @@ -111,7 +111,7 @@ class DAGStorageInterpreter Context & context; const TiDBTableScan & table_scan; - const PushDownFilter & push_down_filter; + const FilterConditions & filter_conditions; const size_t max_streams; LoggerPtr log; diff --git a/dbms/src/Flash/Coprocessor/PushDownFilter.cpp b/dbms/src/Flash/Coprocessor/FilterConditions.cpp similarity index 73% rename from dbms/src/Flash/Coprocessor/PushDownFilter.cpp rename to dbms/src/Flash/Coprocessor/FilterConditions.cpp index 09691e3f015..52f9b9c7e2b 100644 --- a/dbms/src/Flash/Coprocessor/PushDownFilter.cpp +++ b/dbms/src/Flash/Coprocessor/FilterConditions.cpp @@ -13,12 +13,12 @@ // limitations under the License. #include -#include +#include #include namespace DB { -PushDownFilter::PushDownFilter( +FilterConditions::FilterConditions( const String & executor_id_, const std::vector & conditions_) : executor_id(executor_id_) @@ -27,12 +27,12 @@ PushDownFilter::PushDownFilter( if (unlikely(conditions.empty() != executor_id.empty())) { throw TiFlashException( - "for PushDownFilter, conditions and executor_id should both be empty or neither should be empty", + "for FilterConditions, conditions and executor_id should both be empty or neither should be empty", Errors::Coprocessor::BadRequest); } } -tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const +tipb::Executor * FilterConditions::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const { if (hasValue()) { @@ -49,17 +49,17 @@ tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor } } -PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor) +FilterConditions FilterConditions::filterConditionsFrom(const String & executor_id, const tipb::Executor * executor) { if (!executor || !executor->has_selection()) { return {"", {}}; } - return pushDownFilterFrom(executor_id, executor->selection()); + return filterConditionsFrom(executor_id, executor->selection()); } -PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection) +FilterConditions FilterConditions::filterConditionsFrom(const String & executor_id, const tipb::Selection & selection) { std::vector conditions; for (const auto & condition : selection.conditions()) diff --git a/dbms/src/Flash/Coprocessor/PushDownFilter.h b/dbms/src/Flash/Coprocessor/FilterConditions.h similarity index 64% rename from dbms/src/Flash/Coprocessor/PushDownFilter.h rename to dbms/src/Flash/Coprocessor/FilterConditions.h index 47183c5115b..cf2f9afcea9 100644 --- a/dbms/src/Flash/Coprocessor/PushDownFilter.h +++ b/dbms/src/Flash/Coprocessor/FilterConditions.h @@ -21,15 +21,20 @@ namespace DB { -struct PushDownFilter + +/** This struct FilterConditions is used to store the filter conditions of the selection whose child is a table scan. + * Those conditions will be used to construct rough index in storage engine. + * And those conditions will be pushed down to the remote read request. + */ +struct FilterConditions { - static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor); + static FilterConditions filterConditionsFrom(const String & executor_id, const tipb::Executor * executor); - static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection); + static FilterConditions filterConditionsFrom(const String & executor_id, const tipb::Selection & selection); - PushDownFilter() = default; + FilterConditions() = default; - PushDownFilter( + FilterConditions( const String & executor_id_, const std::vector & conditions_); @@ -40,4 +45,5 @@ struct PushDownFilter String executor_id; std::vector conditions; }; + } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 8fed1a5db1d..adfa0f08ac5 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -173,14 +173,14 @@ void executeCreatingSets( } std::tuple buildPushDownFilter( - const PushDownFilter & push_down_filter, + const FilterConditions & filter_conditions, DAGExpressionAnalyzer & analyzer) { - assert(push_down_filter.hasValue()); + assert(filter_conditions.hasValue()); ExpressionActionsChain chain; analyzer.initChain(chain); - String filter_column_name = analyzer.appendWhere(chain, push_down_filter.conditions); + String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions); ExpressionActionsPtr before_where = chain.getLastActions(); chain.addStep(); @@ -201,12 +201,12 @@ std::tuple buildPushDownFilt void executePushedDownFilter( size_t remote_read_streams_start_index, - const PushDownFilter & push_down_filter, + const FilterConditions & filter_conditions, DAGExpressionAnalyzer & analyzer, LoggerPtr log, DAGPipeline & pipeline) { - auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(push_down_filter, analyzer); + auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions, analyzer); assert(remote_read_streams_start_index <= pipeline.streams.size()); // for remote read, filter had been pushed down, don't need to execute again. diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 56f7a72f0b5..f842654d162 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include namespace DB @@ -69,12 +69,12 @@ void executeCreatingSets( const LoggerPtr & log); std::tuple buildPushDownFilter( - const PushDownFilter & push_down_filter, + const FilterConditions & filter_conditions, DAGExpressionAnalyzer & analyzer); void executePushedDownFilter( size_t remote_read_streams_start_index, - const PushDownFilter & push_down_filter, + const FilterConditions & filter_conditions, DAGExpressionAnalyzer & analyzer, LoggerPtr log, DAGPipeline & pipeline); diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp index 0180f0d5900..f125b5b9898 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp @@ -26,14 +26,14 @@ RemoteRequest RemoteRequest::build( DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, - const PushDownFilter & push_down_filter, + const FilterConditions & filter_conditions, const LoggerPtr & log) { LOG_INFO(log, "{}", printRetryRegions(retry_regions, table_info.id)); DAGSchema schema; tipb::DAGRequest dag_req; - auto * executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor()); + auto * executor = filter_conditions.constructSelectionForRemoteRead(dag_req.mutable_root_executor()); { tipb::Executor * ts_exec = executor; diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.h b/dbms/src/Flash/Coprocessor/RemoteRequest.h index 7973d48b492..adb85d4c6d9 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.h +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include @@ -51,7 +51,7 @@ struct RemoteRequest DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, - const PushDownFilter & push_down_filter, + const FilterConditions & filter_conditions, const LoggerPtr & log); static std::vector buildKeyRanges(const RegionRetryList & retry_regions); static std::string printRetryRegions(const RegionRetryList & retry_regions, TableID table_id); diff --git a/dbms/src/Flash/Coprocessor/StorageDisaggregatedInterpreter.h b/dbms/src/Flash/Coprocessor/StorageDisaggregatedInterpreter.h index 2818baeac67..9749ff256f4 100644 --- a/dbms/src/Flash/Coprocessor/StorageDisaggregatedInterpreter.h +++ b/dbms/src/Flash/Coprocessor/StorageDisaggregatedInterpreter.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include @@ -32,10 +32,10 @@ class StorageDisaggregatedInterpreter StorageDisaggregatedInterpreter( Context & context_, const TiDBTableScan & table_scan_, - const PushDownFilter & push_down_filter_, + const FilterConditions & filter_conditions_, size_t max_streams_) : context(context_) - , storage(std::make_unique(context_, table_scan_, push_down_filter_)) + , storage(std::make_unique(context_, table_scan_, filter_conditions_)) , max_streams(max_streams_) {} diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index f69a5a6d71e..8aabeef3885 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -48,14 +48,14 @@ bool pushDownSelection(Context & context, const PhysicalPlanNodePtr & plan, cons if (plan->tp() == PlanType::TableScan) { auto physical_table_scan = std::static_pointer_cast(plan); - return physical_table_scan->pushDownFilter(executor_id, selection); + return physical_table_scan->setFilterConditions(executor_id, selection); } if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest())) { auto physical_mock_table_scan = std::static_pointer_cast(plan); if (context.mockStorage()->useDeltaMerge() && context.mockStorage()->tableExistsForDeltaMerge(physical_mock_table_scan->getLogicalTableID())) { - return physical_mock_table_scan->pushDownFilter(context, executor_id, selection); + return physical_mock_table_scan->setFilterConditions(context, executor_id, selection); } } return false; @@ -71,8 +71,8 @@ void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlan if (plan->tp() == PlanType::TableScan) { auto physical_table_scan = std::static_pointer_cast(plan); - if (physical_table_scan->hasPushDownFilter()) - list_based_executors_order.push_back(physical_table_scan->getPushDownFilterId()); + if (physical_table_scan->hasFilterConditions()) + list_based_executors_order.push_back(physical_table_scan->getFilterConditionsId()); list_based_executors_order.push_back(physical_table_scan->execId()); } else diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index 8bdfe53d504..c84af203d46 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -132,29 +132,29 @@ void PhysicalMockTableScan::updateStreams(Context & context) { mock_streams.clear(); assert(context.mockStorage()->tableExistsForDeltaMerge(table_id)); - mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &push_down_filter)); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions)); } -bool PhysicalMockTableScan::pushDownFilter(Context & context, const String & filter_executor_id, const tipb::Selection & selection) +bool PhysicalMockTableScan::setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection) { - if (unlikely(hasPushDownFilter())) + if (unlikely(hasFilterConditions())) { return false; } - push_down_filter = PushDownFilter::pushDownFilterFrom(filter_executor_id, selection); + filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); updateStreams(context); return true; } -bool PhysicalMockTableScan::hasPushDownFilter() const +bool PhysicalMockTableScan::hasFilterConditions() const { - return push_down_filter.hasValue(); + return filter_conditions.hasValue(); } -const String & PhysicalMockTableScan::getPushDownFilterId() const +const String & PhysicalMockTableScan::getFilterConditionsId() const { - assert(hasPushDownFilter()); - return push_down_filter.executor_id; + assert(hasFilterConditions()); + return filter_conditions.executor_id; } Int64 PhysicalMockTableScan::getLogicalTableID() const diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h index 4a961c8904b..7330b5d16c4 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.h @@ -15,7 +15,7 @@ #pragma once #include -#include +#include #include #include #include @@ -53,11 +53,11 @@ class PhysicalMockTableScan : public PhysicalLeaf void initStreams(Context & context); // for delta-merge test - bool pushDownFilter(Context & context, const String & filter_executor_id, const tipb::Selection & selection); + bool setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection); - bool hasPushDownFilter() const; + bool hasFilterConditions() const; - const String & getPushDownFilterId() const; + const String & getFilterConditionsId() const; Int64 getLogicalTableID() const; @@ -67,7 +67,7 @@ class PhysicalMockTableScan : public PhysicalLeaf void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) override; private: - PushDownFilter push_down_filter; + FilterConditions filter_conditions; Block sample_block; BlockInputStreams mock_streams; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp index ffae24ccc9f..27e9d8564b9 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalTableScan.cpp @@ -57,13 +57,13 @@ void PhysicalTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Contex if (context.isDisaggregatedComputeMode()) { - StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, tidb_table_scan, push_down_filter, max_streams); + StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, tidb_table_scan, filter_conditions, max_streams); disaggregated_tiflash_interpreter.execute(pipeline); buildProjection(pipeline, disaggregated_tiflash_interpreter.analyzer->getCurrentInputColumns()); } else { - DAGStorageInterpreter storage_interpreter(context, tidb_table_scan, push_down_filter, max_streams); + DAGStorageInterpreter storage_interpreter(context, tidb_table_scan, filter_conditions, max_streams); storage_interpreter.execute(pipeline); buildProjection(pipeline, storage_interpreter.analyzer->getCurrentInputColumns()); } @@ -103,27 +103,27 @@ const Block & PhysicalTableScan::getSampleBlock() const return sample_block; } -bool PhysicalTableScan::pushDownFilter(const String & filter_executor_id, const tipb::Selection & selection) +bool PhysicalTableScan::setFilterConditions(const String & filter_executor_id, const tipb::Selection & selection) { - /// Since there is at most one selection on the table scan, pushDownFilter will only be called at most once. - /// So in this case hasPushDownFilter() is always false. - if (unlikely(hasPushDownFilter())) + /// Since there is at most one selection on the table scan, setFilterConditions() will only be called at most once. + /// So in this case hasFilterConditions() is always false. + if (unlikely(hasFilterConditions())) { return false; } - push_down_filter = PushDownFilter::pushDownFilterFrom(filter_executor_id, selection); + filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); return true; } -bool PhysicalTableScan::hasPushDownFilter() const +bool PhysicalTableScan::hasFilterConditions() const { - return push_down_filter.hasValue(); + return filter_conditions.hasValue(); } -const String & PhysicalTableScan::getPushDownFilterId() const +const String & PhysicalTableScan::getFilterConditionsId() const { - assert(hasPushDownFilter()); - return push_down_filter.executor_id; + assert(hasFilterConditions()); + return filter_conditions.executor_id; } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalTableScan.h b/dbms/src/Flash/Planner/Plans/PhysicalTableScan.h index 3998e531b82..00c68e783cf 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalTableScan.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalTableScan.h @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include #include #include @@ -38,18 +38,18 @@ class PhysicalTableScan : public PhysicalLeaf const Block & getSampleBlock() const override; - bool pushDownFilter(const String & filter_executor_id, const tipb::Selection & selection); + bool setFilterConditions(const String & filter_executor_id, const tipb::Selection & selection); - bool hasPushDownFilter() const; + bool hasFilterConditions() const; - const String & getPushDownFilterId() const; + const String & getFilterConditionsId() const; private: void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; void buildProjection(DAGPipeline & pipeline, const NamesAndTypes & storage_schema); private: - PushDownFilter push_down_filter; + FilterConditions filter_conditions; TiDBTableScan tidb_table_scan; diff --git a/dbms/src/Flash/tests/gtest_storage_disaggregated.cpp b/dbms/src/Flash/tests/gtest_storage_disaggregated.cpp index 6837e1cc8fc..88d19ce866e 100644 --- a/dbms/src/Flash/tests/gtest_storage_disaggregated.cpp +++ b/dbms/src/Flash/tests/gtest_storage_disaggregated.cpp @@ -70,8 +70,8 @@ try TiFlashTestEnv::getGlobalContext().setDAGContext(dag_context.get()); TiDBTableScan tidb_table_scan(&table_scan, table_scan.executor_id(), *dag_context); - PushDownFilter filter; - StorageDisaggregated storage(TiFlashTestEnv::getGlobalContext(), tidb_table_scan, filter); + FilterConditions filter_conditions; + StorageDisaggregated storage(TiFlashTestEnv::getGlobalContext(), tidb_table_scan, filter_conditions); uint64_t store_id; std::vector region_ids; diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 6cdb210b45a..8761d7ce3a2 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -25,13 +25,13 @@ const String StorageDisaggregated::ExecIDPrefixForTiFlashStorageSender = "exec_i StorageDisaggregated::StorageDisaggregated( Context & context_, const TiDBTableScan & table_scan_, - const PushDownFilter & push_down_filter_) + const FilterConditions & filter_conditions_) : IStorage() , context(context_) , table_scan(table_scan_) , log(Logger::get(context_.getDAGContext()->log ? context_.getDAGContext()->log->identifier() : "")) , sender_target_mpp_task_id(context_.getDAGContext()->getMPPTaskMeta()) - , push_down_filter(push_down_filter_) + , filter_conditions(filter_conditions_) { } @@ -55,7 +55,7 @@ BlockInputStreams StorageDisaggregated::read( DAGPipeline pipeline; buildReceiverStreams(dispatch_reqs, num_streams, pipeline); - pushDownFilter(pipeline); + filterConditions(pipeline); return pipeline.streams; } @@ -173,7 +173,7 @@ tipb::Executor StorageDisaggregated::buildTableScanTiPB() // TODO: For now, to avoid versions of tiflash_compute nodes and tiflash_storage being different, // disable filter push down to avoid unsupported expression in tiflash_storage. // Uncomment this when we are sure versions are same. - // executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor()); + // executor = filter_conditions.constructSelectionForRemoteRead(dag_req.mutable_root_executor()); tipb::Executor ts_exec; ts_exec.set_tp(tipb::ExecType::TypeTableScan); @@ -255,7 +255,7 @@ void StorageDisaggregated::buildReceiverStreams(const std::vectorgetOutputSchema(); @@ -264,12 +264,12 @@ void StorageDisaggregated::pushDownFilter(DAGPipeline & pipeline) analyzer = std::make_unique(std::move(source_columns), context); - if (push_down_filter.hasValue()) + if (filter_conditions.hasValue()) { // No need to cast, because already done by tiflash_storage node. - ::DB::executePushedDownFilter(/*remote_read_streams_start_index=*/pipeline.streams.size(), push_down_filter, *analyzer, log, pipeline); + ::DB::executePushedDownFilter(/*remote_read_streams_start_index=*/pipeline.streams.size(), filter_conditions, *analyzer, log, pipeline); - auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[push_down_filter.executor_id]; + auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[filter_conditions.executor_id]; pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); } } diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index 2414d5828b2..57edfa4e0f9 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -40,7 +40,7 @@ class StorageDisaggregated : public IStorage StorageDisaggregated( Context & context_, const TiDBTableScan & table_scan_, - const PushDownFilter & push_down_filter_); + const FilterConditions & filter_conditions_); std::string getName() const override { @@ -73,14 +73,14 @@ class StorageDisaggregated : public IStorage std::vector buildRemoteTableRanges(); std::vector buildBatchCopTasks(const std::vector & remote_table_ranges); void buildReceiverStreams(const std::vector & dispatch_reqs, unsigned num_streams, DAGPipeline & pipeline); - void pushDownFilter(DAGPipeline & pipeline); + void filterConditions(DAGPipeline & pipeline); tipb::Executor buildTableScanTiPB(); Context & context; const TiDBTableScan & table_scan; LoggerPtr log; MPPTaskId sender_target_mpp_task_id; - const PushDownFilter & push_down_filter; + const FilterConditions & filter_conditions; std::shared_ptr exchange_receiver; };