Skip to content

Commit

Permalink
*: rename PushDownFilter to FilterConditions (pingcap#6679)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored Jan 29, 2023
1 parent a1d8a8f commit e64d8cc
Show file tree
Hide file tree
Showing 20 changed files with 102 additions and 96 deletions.
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void MockStorage::addTableDataForDeltaMerge(Context & context, const String & na
}
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter)
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions)
{
assert(tableExistsForDeltaMerge(table_id));
auto storage = storage_delta_merge_map[table_id];
Expand All @@ -154,15 +154,15 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context);
if (push_down_filter && push_down_filter->hasValue())
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
push_down_filter->conditions,
filter_conditions->conditions,
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*push_down_filter, *analyzer);
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*filter_conditions, *analyzer);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
// TODO: set num_streams, then ins.size() != 1
BlockInputStreamPtr in = ins[0];
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#pragma once
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/types.h>
Expand Down Expand Up @@ -82,7 +82,7 @@ class MockStorage

NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id);

BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter = nullptr);
BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions = nullptr);

bool tableExistsForDeltaMerge(Int64 table_id);

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StorageDisaggregatedInterpreter.h>
#include <Flash/Mpp/newMPPExchangeWriter.h>
#include <Interpreters/Aggregator.h>
Expand Down Expand Up @@ -189,17 +189,17 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
const auto push_down_filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection);
const auto filter_conditions = FilterConditions::filterConditionsFrom(query_block.selection_name, query_block.selection);

if (context.isDisaggregatedComputeMode())
{
StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, push_down_filter, max_streams);
StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, filter_conditions, max_streams);
disaggregated_tiflash_interpreter.execute(pipeline);
analyzer = std::move(disaggregated_tiflash_interpreter.analyzer);
}
else
{
DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
DAGStorageInterpreter storage_interpreter(context, table_scan, filter_conditions, max_streams);
storage_interpreter.execute(pipeline);

analyzer = std::move(storage_interpreter.analyzer);
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ String genErrMsgForLocalRead(
DAGStorageInterpreter::DAGStorageInterpreter(
Context & context_,
const TiDBTableScan & table_scan_,
const PushDownFilter & push_down_filter_,
const FilterConditions & filter_conditions_,
size_t max_streams_)
: context(context_)
, table_scan(table_scan_)
, push_down_filter(push_down_filter_)
, filter_conditions(filter_conditions_)
, max_streams(max_streams_)
, log(Logger::get(context.getDAGContext()->log ? context.getDAGContext()->log->identifier() : ""))
, logical_table_id(table_scan.getLogicalTableID())
Expand Down Expand Up @@ -337,11 +337,11 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle pushed down filter for local and remote table scan.
if (push_down_filter.hasValue())
/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
{
::DB::executePushedDownFilter(remote_read_streams_start_index, push_down_filter, *analyzer, log, pipeline);
recordProfileStreams(pipeline, push_down_filter.executor_id);
::DB::executePushedDownFilter(remote_read_streams_start_index, filter_conditions, *analyzer, log, pipeline);
recordProfileStreams(pipeline, filter_conditions.executor_id);
}
}

Expand Down Expand Up @@ -579,7 +579,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
/// to avoid null point exception
query_info.query = dagContext().dummy_ast;
query_info.dag_query = std::make_unique<DAGQueryInfo>(
push_down_filter.conditions,
filter_conditions.conditions,
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
Expand Down Expand Up @@ -1006,7 +1006,7 @@ std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests(const DM::
*context.getDAGContext(),
table_scan,
storages_with_structure_lock[physical_table_id].storage->getTableInfo(),
push_down_filter,
filter_conditions,
log));
}
return remote_requests;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/nocopyable.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/RegionQueryInfo.h>
Expand All @@ -44,7 +44,7 @@ class DAGStorageInterpreter
DAGStorageInterpreter(
Context & context_,
const TiDBTableScan & table_scan,
const PushDownFilter & push_down_filter_,
const FilterConditions & filter_conditions_,
size_t max_streams_);

DISALLOW_MOVE(DAGStorageInterpreter);
Expand Down Expand Up @@ -111,7 +111,7 @@ class DAGStorageInterpreter

Context & context;
const TiDBTableScan & table_scan;
const PushDownFilter & push_down_filter;
const FilterConditions & filter_conditions;
const size_t max_streams;
LoggerPtr log;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <common/likely.h>

namespace DB
{
PushDownFilter::PushDownFilter(
FilterConditions::FilterConditions(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_)
: executor_id(executor_id_)
Expand All @@ -27,12 +27,12 @@ PushDownFilter::PushDownFilter(
if (unlikely(conditions.empty() != executor_id.empty()))
{
throw TiFlashException(
"for PushDownFilter, conditions and executor_id should both be empty or neither should be empty",
"for FilterConditions, conditions and executor_id should both be empty or neither should be empty",
Errors::Coprocessor::BadRequest);
}
}

tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const
tipb::Executor * FilterConditions::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const
{
if (hasValue())
{
Expand All @@ -49,17 +49,17 @@ tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor
}
}

PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor)
FilterConditions FilterConditions::filterConditionsFrom(const String & executor_id, const tipb::Executor * executor)
{
if (!executor || !executor->has_selection())
{
return {"", {}};
}

return pushDownFilterFrom(executor_id, executor->selection());
return filterConditionsFrom(executor_id, executor->selection());
}

PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection)
FilterConditions FilterConditions::filterConditionsFrom(const String & executor_id, const tipb::Selection & selection)
{
std::vector<const tipb::Expr *> conditions;
for (const auto & condition : selection.conditions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@

namespace DB
{
struct PushDownFilter

/** This struct FilterConditions is used to store the filter conditions of the selection whose child is a table scan.
* Those conditions will be used to construct rough index in storage engine.
* And those conditions will be pushed down to the remote read request.
*/
struct FilterConditions
{
static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor);
static FilterConditions filterConditionsFrom(const String & executor_id, const tipb::Executor * executor);

static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection);
static FilterConditions filterConditionsFrom(const String & executor_id, const tipb::Selection & selection);

PushDownFilter() = default;
FilterConditions() = default;

PushDownFilter(
FilterConditions(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_);

Expand All @@ -40,4 +45,5 @@ struct PushDownFilter
String executor_id;
std::vector<const tipb::Expr *> conditions;
};

} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ void executeCreatingSets(
}

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer)
{
assert(push_down_filter.hasValue());
assert(filter_conditions.hasValue());

ExpressionActionsChain chain;
analyzer.initChain(chain);
String filter_column_name = analyzer.appendWhere(chain, push_down_filter.conditions);
String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions);
ExpressionActionsPtr before_where = chain.getLastActions();
chain.addStep();

Expand All @@ -201,12 +201,12 @@ std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilt

void executePushedDownFilter(
size_t remote_read_streams_start_index,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline)
{
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(push_down_filter, analyzer);
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions, analyzer);

assert(remote_read_streams_start_index <= pipeline.streams.size());
// for remote read, filter had been pushed down, don't need to execute again.
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <Core/SortDescription.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
Expand Down Expand Up @@ -69,12 +69,12 @@ void executeCreatingSets(
const LoggerPtr & log);

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer);

void executePushedDownFilter(
size_t remote_read_streams_start_index,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ RemoteRequest RemoteRequest::build(
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
const LoggerPtr & log)
{
LOG_INFO(log, "{}", printRetryRegions(retry_regions, table_info.id));

DAGSchema schema;
tipb::DAGRequest dag_req;
auto * executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor());
auto * executor = filter_conditions.constructSelectionForRemoteRead(dag_req.mutable_root_executor());

{
tipb::Executor * ts_exec = executor;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#pragma once

#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RegionInfo.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
Expand Down Expand Up @@ -51,7 +51,7 @@ struct RemoteRequest
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
const LoggerPtr & log);
static std::vector<pingcap::coprocessor::KeyRange> buildKeyRanges(const RegionRetryList & retry_regions);
static std::string printRetryRegions(const RegionRetryList & retry_regions, TableID table_id);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/StorageDisaggregatedInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Interpreters/Context.h>
#include <Storages/StorageDisaggregated.h>

Expand All @@ -32,10 +32,10 @@ class StorageDisaggregatedInterpreter
StorageDisaggregatedInterpreter(
Context & context_,
const TiDBTableScan & table_scan_,
const PushDownFilter & push_down_filter_,
const FilterConditions & filter_conditions_,
size_t max_streams_)
: context(context_)
, storage(std::make_unique<StorageDisaggregated>(context_, table_scan_, push_down_filter_))
, storage(std::make_unique<StorageDisaggregated>(context_, table_scan_, filter_conditions_))
, max_streams(max_streams_)
{}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ bool pushDownSelection(Context & context, const PhysicalPlanNodePtr & plan, cons
if (plan->tp() == PlanType::TableScan)
{
auto physical_table_scan = std::static_pointer_cast<PhysicalTableScan>(plan);
return physical_table_scan->pushDownFilter(executor_id, selection);
return physical_table_scan->setFilterConditions(executor_id, selection);
}
if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest()))
{
auto physical_mock_table_scan = std::static_pointer_cast<PhysicalMockTableScan>(plan);
if (context.mockStorage()->useDeltaMerge() && context.mockStorage()->tableExistsForDeltaMerge(physical_mock_table_scan->getLogicalTableID()))
{
return physical_mock_table_scan->pushDownFilter(context, executor_id, selection);
return physical_mock_table_scan->setFilterConditions(context, executor_id, selection);
}
}
return false;
Expand All @@ -71,8 +71,8 @@ void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlan
if (plan->tp() == PlanType::TableScan)
{
auto physical_table_scan = std::static_pointer_cast<PhysicalTableScan>(plan);
if (physical_table_scan->hasPushDownFilter())
list_based_executors_order.push_back(physical_table_scan->getPushDownFilterId());
if (physical_table_scan->hasFilterConditions())
list_based_executors_order.push_back(physical_table_scan->getFilterConditionsId());
list_based_executors_order.push_back(physical_table_scan->execId());
}
else
Expand Down
Loading

0 comments on commit e64d8cc

Please sign in to comment.