Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: rename PushDownFilter to FilterConditions #6679

Merged
merged 6 commits into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void MockStorage::addTableDataForDeltaMerge(Context & context, const String & na
}
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter)
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions)
{
assert(tableExistsForDeltaMerge(table_id));
auto storage = storage_delta_merge_map[table_id];
Expand All @@ -154,15 +154,15 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context);
if (push_down_filter && push_down_filter->hasValue())
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
push_down_filter->conditions,
filter_conditions->conditions,
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*push_down_filter, *analyzer);
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*filter_conditions, *analyzer);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
// TODO: set num_streams, then ins.size() != 1
BlockInputStreamPtr in = ins[0];
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#pragma once
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/types.h>
Expand Down Expand Up @@ -82,7 +82,7 @@ class MockStorage

NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id);

BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter = nullptr);
BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions = nullptr);

bool tableExistsForDeltaMerge(Int64 table_id);

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StorageDisaggregatedInterpreter.h>
#include <Flash/Mpp/newMPPExchangeWriter.h>
#include <Interpreters/Aggregator.h>
Expand Down Expand Up @@ -189,17 +189,17 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
const auto push_down_filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection);
const auto filter_conditions = FilterConditions::filterConditionsFrom(query_block.selection_name, query_block.selection);

if (context.isDisaggregatedComputeMode())
{
StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, push_down_filter, max_streams);
StorageDisaggregatedInterpreter disaggregated_tiflash_interpreter(context, table_scan, filter_conditions, max_streams);
disaggregated_tiflash_interpreter.execute(pipeline);
analyzer = std::move(disaggregated_tiflash_interpreter.analyzer);
}
else
{
DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
DAGStorageInterpreter storage_interpreter(context, table_scan, filter_conditions, max_streams);
storage_interpreter.execute(pipeline);

analyzer = std::move(storage_interpreter.analyzer);
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ String genErrMsgForLocalRead(
DAGStorageInterpreter::DAGStorageInterpreter(
Context & context_,
const TiDBTableScan & table_scan_,
const PushDownFilter & push_down_filter_,
const FilterConditions & filter_conditions_,
size_t max_streams_)
: context(context_)
, table_scan(table_scan_)
, push_down_filter(push_down_filter_)
, filter_conditions(filter_conditions_)
, max_streams(max_streams_)
, log(Logger::get(context.getDAGContext()->log ? context.getDAGContext()->log->identifier() : ""))
, logical_table_id(table_scan.getLogicalTableID())
Expand Down Expand Up @@ -337,11 +337,11 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle pushed down filter for local and remote table scan.
if (push_down_filter.hasValue())
/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
{
::DB::executePushedDownFilter(remote_read_streams_start_index, push_down_filter, *analyzer, log, pipeline);
recordProfileStreams(pipeline, push_down_filter.executor_id);
::DB::executePushedDownFilter(remote_read_streams_start_index, filter_conditions, *analyzer, log, pipeline);
recordProfileStreams(pipeline, filter_conditions.executor_id);
}
}

Expand Down Expand Up @@ -579,7 +579,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
/// to avoid null point exception
query_info.query = dagContext().dummy_ast;
query_info.dag_query = std::make_unique<DAGQueryInfo>(
push_down_filter.conditions,
filter_conditions.conditions,
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
Expand Down Expand Up @@ -1006,7 +1006,7 @@ std::vector<RemoteRequest> DAGStorageInterpreter::buildRemoteRequests(const DM::
*context.getDAGContext(),
table_scan,
storages_with_structure_lock[physical_table_id].storage->getTableInfo(),
push_down_filter,
filter_conditions,
log));
}
return remote_requests;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/nocopyable.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/RegionQueryInfo.h>
Expand All @@ -44,7 +44,7 @@ class DAGStorageInterpreter
DAGStorageInterpreter(
Context & context_,
const TiDBTableScan & table_scan,
const PushDownFilter & push_down_filter_,
const FilterConditions & filter_conditions_,
size_t max_streams_);

DISALLOW_MOVE(DAGStorageInterpreter);
Expand Down Expand Up @@ -111,7 +111,7 @@ class DAGStorageInterpreter

Context & context;
const TiDBTableScan & table_scan;
const PushDownFilter & push_down_filter;
const FilterConditions & filter_conditions;
const size_t max_streams;
LoggerPtr log;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <common/likely.h>

namespace DB
{
PushDownFilter::PushDownFilter(
FilterConditions::FilterConditions(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_)
: executor_id(executor_id_)
Expand All @@ -27,12 +27,12 @@ PushDownFilter::PushDownFilter(
if (unlikely(conditions.empty() != executor_id.empty()))
{
throw TiFlashException(
"for PushDownFilter, conditions and executor_id should both be empty or neither should be empty",
"for FilterConditions, conditions and executor_id should both be empty or neither should be empty",
Errors::Coprocessor::BadRequest);
}
}

tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const
tipb::Executor * FilterConditions::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const
{
if (hasValue())
{
Expand All @@ -49,17 +49,17 @@ tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor
}
}

PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor)
FilterConditions FilterConditions::filterConditionsFrom(const String & executor_id, const tipb::Executor * executor)
{
if (!executor || !executor->has_selection())
{
return {"", {}};
}

return pushDownFilterFrom(executor_id, executor->selection());
return filterConditionsFrom(executor_id, executor->selection());
}

PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection)
FilterConditions FilterConditions::filterConditionsFrom(const String & executor_id, const tipb::Selection & selection)
{
std::vector<const tipb::Expr *> conditions;
for (const auto & condition : selection.conditions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@

namespace DB
{
struct PushDownFilter

/** This struct FilterConditions is used to store the filter conditions of the selection whose child is a table scan.
* Those conditions will be used to construct rough index in storage engine.
* And those conditions will be pushed down to the remote read request.
*/
struct FilterConditions
{
static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor);
static FilterConditions filterConditionsFrom(const String & executor_id, const tipb::Executor * executor);

static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection);
static FilterConditions filterConditionsFrom(const String & executor_id, const tipb::Selection & selection);

PushDownFilter() = default;
FilterConditions() = default;

PushDownFilter(
FilterConditions(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_);

Expand All @@ -40,4 +45,5 @@ struct PushDownFilter
String executor_id;
std::vector<const tipb::Expr *> conditions;
};

} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ void executeCreatingSets(
}

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer)
{
assert(push_down_filter.hasValue());
assert(filter_conditions.hasValue());

ExpressionActionsChain chain;
analyzer.initChain(chain);
String filter_column_name = analyzer.appendWhere(chain, push_down_filter.conditions);
String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions);
ExpressionActionsPtr before_where = chain.getLastActions();
chain.addStep();

Expand All @@ -201,12 +201,12 @@ std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilt

void executePushedDownFilter(
size_t remote_read_streams_start_index,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline)
{
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(push_down_filter, analyzer);
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions, analyzer);

assert(remote_read_streams_start_index <= pipeline.streams.size());
// for remote read, filter had been pushed down, don't need to execute again.
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <Core/SortDescription.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
Expand Down Expand Up @@ -69,12 +69,12 @@ void executeCreatingSets(
const LoggerPtr & log);

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer);

void executePushedDownFilter(
size_t remote_read_streams_start_index,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ RemoteRequest RemoteRequest::build(
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
const LoggerPtr & log)
{
LOG_INFO(log, "{}", printRetryRegions(retry_regions, table_info.id));

DAGSchema schema;
tipb::DAGRequest dag_req;
auto * executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor());
auto * executor = filter_conditions.constructSelectionForRemoteRead(dag_req.mutable_root_executor());

{
tipb::Executor * ts_exec = executor;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#pragma once

#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RegionInfo.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
Expand Down Expand Up @@ -51,7 +51,7 @@ struct RemoteRequest
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const FilterConditions & filter_conditions,
const LoggerPtr & log);
static std::vector<pingcap::coprocessor::KeyRange> buildKeyRanges(const RegionRetryList & retry_regions);
static std::string printRetryRegions(const RegionRetryList & retry_regions, TableID table_id);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/StorageDisaggregatedInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Interpreters/Context.h>
#include <Storages/StorageDisaggregated.h>

Expand All @@ -32,10 +32,10 @@ class StorageDisaggregatedInterpreter
StorageDisaggregatedInterpreter(
Context & context_,
const TiDBTableScan & table_scan_,
const PushDownFilter & push_down_filter_,
const FilterConditions & filter_conditions_,
size_t max_streams_)
: context(context_)
, storage(std::make_unique<StorageDisaggregated>(context_, table_scan_, push_down_filter_))
, storage(std::make_unique<StorageDisaggregated>(context_, table_scan_, filter_conditions_))
, max_streams(max_streams_)
{}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ bool pushDownSelection(Context & context, const PhysicalPlanNodePtr & plan, cons
if (plan->tp() == PlanType::TableScan)
{
auto physical_table_scan = std::static_pointer_cast<PhysicalTableScan>(plan);
return physical_table_scan->pushDownFilter(executor_id, selection);
return physical_table_scan->setFilterConditions(executor_id, selection);
}
if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest()))
{
auto physical_mock_table_scan = std::static_pointer_cast<PhysicalMockTableScan>(plan);
if (context.mockStorage()->useDeltaMerge() && context.mockStorage()->tableExistsForDeltaMerge(physical_mock_table_scan->getLogicalTableID()))
{
return physical_mock_table_scan->pushDownFilter(context, executor_id, selection);
return physical_mock_table_scan->setFilterConditions(context, executor_id, selection);
}
}
return false;
Expand All @@ -71,8 +71,8 @@ void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlan
if (plan->tp() == PlanType::TableScan)
{
auto physical_table_scan = std::static_pointer_cast<PhysicalTableScan>(plan);
if (physical_table_scan->hasPushDownFilter())
list_based_executors_order.push_back(physical_table_scan->getPushDownFilterId());
if (physical_table_scan->hasFilterConditions())
list_based_executors_order.push_back(physical_table_scan->getFilterConditionsId());
list_based_executors_order.push_back(physical_table_scan->execId());
}
else
Expand Down
Loading