From 3746241f1e7a712442afc4810b16f9507b5a8377 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Wed, 29 May 2024 17:08:52 +0800 Subject: [PATCH 1/5] Storages: Refine build filters in StorageDeltaMerge --- .../DeltaMerge/Filter/PushDownFilter.cpp | 197 ++++++++++++ .../DeltaMerge/Filter/PushDownFilter.h | 22 ++ .../Storages/DeltaMerge/Filter/RSOperator.cpp | 39 +++ .../Storages/DeltaMerge/Filter/RSOperator.h | 8 + dbms/src/Storages/StorageDeltaMerge.cpp | 283 ++++-------------- dbms/src/Storages/StorageDeltaMerge.h | 29 -- .../Storages/StorageDisaggregatedRemote.cpp | 17 +- .../tests/gtests_parse_push_down_filter.cpp | 9 +- 8 files changed, 327 insertions(+), 277 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp new file mode 100644 index 00000000000..53357202507 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp @@ -0,0 +1,197 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM +{ +PushDownFilterPtr PushDownFilter::build( + const RSOperatorPtr & rs_operator, + const ColumnInfos & table_scan_column_info, + const google::protobuf::RepeatedPtrField & pushed_down_filters, + const ColumnDefines & columns_to_read, + const Context & context, + const LoggerPtr & tracing_logger) +{ + if (pushed_down_filters.empty()) + { + LOG_DEBUG(tracing_logger, "Push down filter is empty"); + return std::make_shared(rs_operator); + } + std::unordered_map columns_to_read_map; + for (const auto & column : columns_to_read) + columns_to_read_map.emplace(column.id, column); + + // Get the columns of the filter, is a subset of columns_to_read + std::unordered_set filter_col_id_set; + for (const auto & expr : pushed_down_filters) + { + getColumnIDsFromExpr(expr, table_scan_column_info, filter_col_id_set); + } + auto filter_columns = std::make_shared(); + filter_columns->reserve(filter_col_id_set.size()); + for (const auto & cid : filter_col_id_set) + { + RUNTIME_CHECK_MSG( + columns_to_read_map.contains(cid), + "Filter ColumnID({}) not found in columns_to_read_map", + cid); + filter_columns->emplace_back(columns_to_read_map.at(cid)); + } + + // The source_columns_of_analyzer should be the same as the size of table_scan_column_info + // The columns_to_read is a subset of table_scan_column_info, when there are generated columns and extra table id column. + NamesAndTypes source_columns_of_analyzer; + source_columns_of_analyzer.reserve(table_scan_column_info.size()); + for (size_t i = 0; i < table_scan_column_info.size(); ++i) + { + auto const & ci = table_scan_column_info[i]; + const auto cid = ci.id; + if (ci.hasGeneratedColumnFlag()) + { + const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); + const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci); + source_columns_of_analyzer.emplace_back(col_name, data_type); + continue; + } + if (cid == EXTRA_TABLE_ID_COLUMN_ID) + { + source_columns_of_analyzer.emplace_back(EXTRA_TABLE_ID_COLUMN_NAME, EXTRA_TABLE_ID_COLUMN_TYPE); + continue; + } + RUNTIME_CHECK_MSG(columns_to_read_map.contains(cid), "ColumnID({}) not found in columns_to_read_map", cid); + source_columns_of_analyzer.emplace_back(columns_to_read_map.at(cid).name, columns_to_read_map.at(cid).type); + } + auto analyzer = std::make_unique(source_columns_of_analyzer, context); + + // Build the extra cast + ExpressionActionsPtr extra_cast = nullptr; + // need_cast_column should be the same size as table_scan_column_info and source_columns_of_analyzer + std::vector may_need_add_cast_column; + may_need_add_cast_column.reserve(table_scan_column_info.size()); + for (const auto & col : table_scan_column_info) + may_need_add_cast_column.push_back( + !col.hasGeneratedColumnFlag() && filter_col_id_set.contains(col.id) && col.id != -1); + ExpressionActionsChain chain; + auto & step = analyzer->initAndGetLastStep(chain); + auto & actions = step.actions; + if (auto [has_cast, casted_columns] + = analyzer->buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info); + has_cast) + { + NamesWithAliases project_cols; + for (size_t i = 0; i < columns_to_read.size(); ++i) + { + if (filter_col_id_set.contains(columns_to_read[i].id)) + project_cols.emplace_back(casted_columns[i], columns_to_read[i].name); + } + actions->add(ExpressionAction::project(project_cols)); + + for (const auto & col : *filter_columns) + step.required_output.push_back(col.name); + + extra_cast = chain.getLastActions(); + chain.finalize(); + chain.clear(); + LOG_DEBUG(tracing_logger, "Extra cast for filter columns: {}", extra_cast->dumpActions()); + } + + // build filter expression actions + auto [before_where, filter_column_name, project_after_where] + = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer); + LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions()); + + // record current column defines + auto columns_after_cast = std::make_shared(); + if (extra_cast != nullptr) + { + columns_after_cast->reserve(columns_to_read.size()); + const auto & current_names_and_types = analyzer->getCurrentInputColumns(); + for (size_t i = 0; i < table_scan_column_info.size(); ++i) + { + if (table_scan_column_info[i].hasGeneratedColumnFlag() + || table_scan_column_info[i].id == EXTRA_TABLE_ID_COLUMN_ID) + continue; + auto col = columns_to_read_map.at(table_scan_column_info[i].id); + RUNTIME_CHECK_MSG( + col.name == current_names_and_types[i].name, + "Column name mismatch, expect: {}, actual: {}", + col.name, + current_names_and_types[i].name); + columns_after_cast->push_back(col); + columns_after_cast->back().type = current_names_and_types[i].type; + } + } + + return std::make_shared( + rs_operator, + before_where, + project_after_where, + filter_columns, + filter_column_name, + extra_cast, + columns_after_cast); +} + +PushDownFilterPtr PushDownFilter::build( + const SelectQueryInfo & query_info, + const ColumnDefines & columns_to_read, + const ColumnDefines & table_column_defines, + const Context & context, + const LoggerPtr & tracing_logger) +{ + const auto & dag_query = query_info.dag_query; + if (unlikely(dag_query == nullptr)) + return EMPTY_FILTER; + + // build rough set operator + const auto rs_operator = RSOperator::build( + dag_query, + columns_to_read, + table_column_defines, + context.getSettingsRef().dt_enable_rough_set_filter, + tracing_logger); + // build push down filter + const auto & columns_to_read_info = dag_query->source_columns; + const auto & pushed_down_filters = dag_query->pushed_down_filters; + if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty()) + { + google::protobuf::RepeatedPtrField merged_filters{ + pushed_down_filters.begin(), + pushed_down_filters.end()}; + merged_filters.MergeFrom(dag_query->filters); + return PushDownFilter::build( + rs_operator, + columns_to_read_info, + merged_filters, + columns_to_read, + context, + tracing_logger); + } + return PushDownFilter::build( + rs_operator, + columns_to_read_info, + pushed_down_filters, + columns_to_read, + context, + tracing_logger); +} +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h index 97d69897e00..27fe0c7ee49 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h @@ -17,6 +17,11 @@ #include #include +namespace DB +{ +struct SelectQueryInfo; +} + namespace DB::DM { @@ -48,6 +53,23 @@ class PushDownFilter : rs_operator(rs_operator_) {} + // Use by StorageDisaggregated. + static PushDownFilterPtr build( + const DM::RSOperatorPtr & rs_operator, + const ColumnInfos & table_scan_column_info, + const google::protobuf::RepeatedPtrField & pushed_down_filters, + const ColumnDefines & columns_to_read, + const Context & context, + const LoggerPtr & tracing_logger); + + // Use by StorageDeltaMerge. + static DM::PushDownFilterPtr build( + const SelectQueryInfo & query_info, + const ColumnDefines & columns_to_read, + const ColumnDefines & table_column_defines, + const Context & context, + const LoggerPtr & tracing_logger); + // Rough set operator RSOperatorPtr rs_operator; // Filter expression actions and the name of the tmp filter column diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp index 9928edb16b5..fc668c559e0 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include namespace DB::DM { @@ -46,4 +48,41 @@ RSOperatorPtr createIsNull(const Attr & attr) RSOperatorPtr createUnsupported(const String & content, const String & reason) { return std::make_shared(content, reason); } // clang-format on +RSOperatorPtr RSOperator::build( + const std::unique_ptr & dag_query, + const ColumnDefines & columns_to_read, + const ColumnDefines & table_column_defines, + bool enable_rs_filter, + const LoggerPtr & tracing_logger) +{ + RUNTIME_CHECK(dag_query != nullptr); + // build rough set operator + DM::RSOperatorPtr rs_operator = DM::EMPTY_RS_OPERATOR; + if (likely(enable_rs_filter)) + { + /// Query from TiDB / TiSpark + auto create_attr_by_column_id = [&table_column_defines](ColumnID column_id) -> Attr { + auto iter = std::find_if( + table_column_defines.begin(), + table_column_defines.end(), + [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); + if (iter != table_column_defines.end()) + return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; + // Maybe throw an exception? Or check if `type` is nullptr before creating filter? + return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; + }; + rs_operator = FilterParser::parseDAGQuery( + *dag_query, + columns_to_read, + std::move(create_attr_by_column_id), + tracing_logger); + if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) + LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); + } + else + LOG_DEBUG(tracing_logger, "Rough set filter is disabled."); + + return rs_operator; +} + } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h index 4bf68ccbb68..b054269a989 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include #include @@ -48,6 +49,13 @@ class RSOperator virtual RSResults roughCheck(size_t start_pack, size_t pack_count, const RSCheckParam & param) = 0; virtual ColIds getColumnIDs() = 0; + + static RSOperatorPtr build( + const std::unique_ptr & dag_query, + const ColumnDefines & columns_to_read, + const ColumnDefines & table_column_defines, + bool enable_rs_filter, + const LoggerPtr & tracing_logger); }; class ColCmpVal : public RSOperator diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 5e017055270..387e1072e35 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -39,7 +38,6 @@ #include #include #include -#include #include #include #include @@ -57,7 +55,6 @@ #include #include #include -#include #include @@ -555,6 +552,8 @@ WriteResult StorageDeltaMerge::write( return store->write(global_context, settings, block, applied_status); } +namespace +{ std::unordered_set parseSegmentSet(const ASTPtr & ast) { if (!ast) @@ -669,14 +668,17 @@ void checkStartTs(UInt64 start_ts, const Context & context, const String & req_i } } -DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo( +DM::RowKeyRanges parseMvccQueryInfo( const DB::MvccQueryInfo & mvcc_query_info, + KeyspaceID keyspace_id, + TableID table_id, + bool is_common_handle, + size_t rowkey_column_size, unsigned num_streams, const Context & context, const String & req_id, const LoggerPtr & tracing_logger) { - auto keyspace_id = getTableInfo().getKeyspaceID(); checkStartTs(mvcc_query_info.start_ts, context, req_id, keyspace_id); FmtBuffer fmt_buf; @@ -709,7 +711,7 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo( auto ranges = getQueryRanges( mvcc_query_info.regions_query_info, - tidb_table_info.id, + table_id, is_common_handle, rowkey_column_size, num_streams, @@ -729,206 +731,22 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo( return ranges; } -DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator( - const std::unique_ptr & dag_query, - const ColumnDefines & columns_to_read, - const Context & context, - const LoggerPtr & tracing_logger) -{ - RUNTIME_CHECK(dag_query != nullptr); - // build rough set operator - DM::RSOperatorPtr rs_operator = DM::EMPTY_RS_OPERATOR; - const bool enable_rs_filter = context.getSettingsRef().dt_enable_rough_set_filter; - if (likely(enable_rs_filter)) - { - /// Query from TiDB / TiSpark - auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr { - const ColumnDefines & defines = this->getAndMaybeInitStore()->getTableColumns(); - auto iter = std::find_if(defines.begin(), defines.end(), [column_id](const ColumnDefine & d) -> bool { - return d.id == column_id; - }); - if (iter != defines.end()) - return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; - // Maybe throw an exception? Or check if `type` is nullptr before creating filter? - return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; - }; - rs_operator - = FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); - if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) - LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); - } - else - LOG_DEBUG(tracing_logger, "Rough set filter is disabled."); - - return rs_operator; -} - -DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter( - const RSOperatorPtr & rs_operator, - const ColumnInfos & table_scan_column_info, - const google::protobuf::RepeatedPtrField & pushed_down_filters, - const ColumnDefines & columns_to_read, - const Context & context, - const LoggerPtr & tracing_logger) -{ - if (pushed_down_filters.empty()) - { - LOG_DEBUG(tracing_logger, "Push down filter is empty"); - return std::make_shared(rs_operator); - } - std::unordered_map columns_to_read_map; - for (const auto & column : columns_to_read) - columns_to_read_map.emplace(column.id, column); - - // Get the columns of the filter, is a subset of columns_to_read - std::unordered_set filter_col_id_set; - for (const auto & expr : pushed_down_filters) - { - getColumnIDsFromExpr(expr, table_scan_column_info, filter_col_id_set); - } - auto filter_columns = std::make_shared(); - filter_columns->reserve(filter_col_id_set.size()); - for (const auto & cid : filter_col_id_set) - { - RUNTIME_CHECK_MSG( - columns_to_read_map.contains(cid), - "Filter ColumnID({}) not found in columns_to_read_map", - cid); - filter_columns->emplace_back(columns_to_read_map.at(cid)); - } - - // The source_columns_of_analyzer should be the same as the size of table_scan_column_info - // The columns_to_read is a subset of table_scan_column_info, when there are generated columns and extra table id column. - NamesAndTypes source_columns_of_analyzer; - source_columns_of_analyzer.reserve(table_scan_column_info.size()); - for (size_t i = 0; i < table_scan_column_info.size(); ++i) - { - auto const & ci = table_scan_column_info[i]; - const auto cid = ci.id; - if (ci.hasGeneratedColumnFlag()) - { - const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); - const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci); - source_columns_of_analyzer.emplace_back(col_name, data_type); - continue; - } - if (cid == EXTRA_TABLE_ID_COLUMN_ID) - { - source_columns_of_analyzer.emplace_back(EXTRA_TABLE_ID_COLUMN_NAME, EXTRA_TABLE_ID_COLUMN_TYPE); - continue; - } - RUNTIME_CHECK_MSG(columns_to_read_map.contains(cid), "ColumnID({}) not found in columns_to_read_map", cid); - source_columns_of_analyzer.emplace_back(columns_to_read_map.at(cid).name, columns_to_read_map.at(cid).type); - } - std::unique_ptr analyzer - = std::make_unique(source_columns_of_analyzer, context); - - // Build the extra cast - ExpressionActionsPtr extra_cast = nullptr; - // need_cast_column should be the same size as table_scan_column_info and source_columns_of_analyzer - std::vector may_need_add_cast_column; - may_need_add_cast_column.reserve(table_scan_column_info.size()); - for (const auto & col : table_scan_column_info) - may_need_add_cast_column.push_back( - !col.hasGeneratedColumnFlag() && filter_col_id_set.contains(col.id) && col.id != -1); - ExpressionActionsChain chain; - auto & step = analyzer->initAndGetLastStep(chain); - auto & actions = step.actions; - if (auto [has_cast, casted_columns] - = analyzer->buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info); - has_cast) - { - NamesWithAliases project_cols; - for (size_t i = 0; i < columns_to_read.size(); ++i) - { - if (filter_col_id_set.contains(columns_to_read[i].id)) - project_cols.emplace_back(casted_columns[i], columns_to_read[i].name); - } - actions->add(ExpressionAction::project(project_cols)); - - for (const auto & col : *filter_columns) - step.required_output.push_back(col.name); - - extra_cast = chain.getLastActions(); - chain.finalize(); - chain.clear(); - LOG_DEBUG(tracing_logger, "Extra cast for filter columns: {}", extra_cast->dumpActions()); - } - - // build filter expression actions - auto [before_where, filter_column_name, project_after_where] - = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer); - LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions()); - - // record current column defines - auto columns_after_cast = std::make_shared(); - if (extra_cast != nullptr) - { - columns_after_cast->reserve(columns_to_read.size()); - const auto & current_names_and_types = analyzer->getCurrentInputColumns(); - for (size_t i = 0; i < table_scan_column_info.size(); ++i) - { - if (table_scan_column_info[i].hasGeneratedColumnFlag() - || table_scan_column_info[i].id == EXTRA_TABLE_ID_COLUMN_ID) - continue; - auto col = columns_to_read_map.at(table_scan_column_info[i].id); - RUNTIME_CHECK_MSG( - col.name == current_names_and_types[i].name, - "Column name mismatch, expect: {}, actual: {}", - col.name, - current_names_and_types[i].name); - columns_after_cast->push_back(col); - columns_after_cast->back().type = current_names_and_types[i].type; - } - } - - return std::make_shared( - rs_operator, - before_where, - project_after_where, - filter_columns, - filter_column_name, - extra_cast, - columns_after_cast); -} - -DM::PushDownFilterPtr StorageDeltaMerge::parsePushDownFilter( +RuntimeFilteList parseRuntimeFilterList( const SelectQueryInfo & query_info, - const ColumnDefines & columns_to_read, - const Context & context, - const LoggerPtr & tracing_logger) + const Context & db_context, + const LoggerPtr & log) { - const auto & dag_query = query_info.dag_query; - if (unlikely(dag_query == nullptr)) - return EMPTY_FILTER; - - // build rough set operator - const DM::RSOperatorPtr rs_operator = buildRSOperator(dag_query, columns_to_read, context, tracing_logger); - // build push down filter - const auto & columns_to_read_info = dag_query->source_columns; - const auto & pushed_down_filters = dag_query->pushed_down_filters; - if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty()) + if (db_context.getDAGContext() == nullptr || query_info.dag_query == nullptr) { - google::protobuf::RepeatedPtrField merged_filters{ - pushed_down_filters.begin(), - pushed_down_filters.end()}; - merged_filters.MergeFrom(dag_query->filters); - return buildPushDownFilter( - rs_operator, - columns_to_read_info, - merged_filters, - columns_to_read, - context, - tracing_logger); + return std::vector{}; } - return buildPushDownFilter( - rs_operator, - columns_to_read_info, - pushed_down_filters, - columns_to_read, - context, - tracing_logger); + auto runtime_filter_list = db_context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds( + query_info.dag_query->runtime_filter_ids); + LOG_DEBUG(log, "build runtime filter in local stream, list size:{}", runtime_filter_list.size()); + return runtime_filter_list; } +} // namespace + BlockInputStreams StorageDeltaMerge::read( const Names & column_names, @@ -965,12 +783,21 @@ BlockInputStreams StorageDeltaMerge::read( // Read with MVCC filtering RUNTIME_CHECK(query_info.mvcc_query_info != nullptr); const auto & mvcc_query_info = *query_info.mvcc_query_info; + const auto keyspace_id = getTableInfo().getKeyspaceID(); + auto ranges = parseMvccQueryInfo( + mvcc_query_info, + keyspace_id, + tidb_table_info.id, + is_common_handle, + rowkey_column_size, + num_streams, + context, + query_info.req_id, + tracing_logger); - auto ranges = parseMvccQueryInfo(mvcc_query_info, num_streams, context, query_info.req_id, tracing_logger); - - auto filter = parsePushDownFilter(query_info, columns_to_read, context, tracing_logger); + auto filter = PushDownFilter::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); - auto runtime_filter_list = parseRuntimeFilterList(query_info, context); + auto runtime_filter_list = parseRuntimeFilterList(query_info, context, log); const auto & scan_context = mvcc_query_info.scan_context; @@ -992,7 +819,6 @@ BlockInputStreams StorageDeltaMerge::read( extra_table_id_index, scan_context); - auto keyspace_id = getTableInfo().getKeyspaceID(); /// Ensure start_ts info after read. checkStartTs(mvcc_query_info.start_ts, context, query_info.req_id, keyspace_id); @@ -1001,20 +827,6 @@ BlockInputStreams StorageDeltaMerge::read( return streams; } -RuntimeFilteList StorageDeltaMerge::parseRuntimeFilterList( - const SelectQueryInfo & query_info, - const Context & db_context) const -{ - if (db_context.getDAGContext() == nullptr || query_info.dag_query == nullptr) - { - return std::vector{}; - } - auto runtime_filter_list = db_context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds( - query_info.dag_query->runtime_filter_ids); - LOG_DEBUG(log, "build runtime filter in local stream, list size:{}", runtime_filter_list.size()); - return runtime_filter_list; -} - void StorageDeltaMerge::read( PipelineExecutorContext & exec_context_, PipelineExecGroupBuilder & group_builder, @@ -1054,12 +866,21 @@ void StorageDeltaMerge::read( // Read with MVCC filtering RUNTIME_CHECK(query_info.mvcc_query_info != nullptr); const auto & mvcc_query_info = *query_info.mvcc_query_info; + const auto keyspace_id = getTableInfo().getKeyspaceID(); + auto ranges = parseMvccQueryInfo( + mvcc_query_info, + keyspace_id, + tidb_table_info.id, + is_common_handle, + rowkey_column_size, + num_streams, + context, + query_info.req_id, + tracing_logger); - auto ranges = parseMvccQueryInfo(mvcc_query_info, num_streams, context, query_info.req_id, tracing_logger); - - auto filter = parsePushDownFilter(query_info, columns_to_read, context, tracing_logger); + auto filter = PushDownFilter::build(query_info, columns_to_read, store->getTableColumns(), context, tracing_logger); - auto runtime_filter_list = parseRuntimeFilterList(query_info, context); + auto runtime_filter_list = parseRuntimeFilterList(query_info, context, log); const auto & scan_context = mvcc_query_info.scan_context; @@ -1083,7 +904,6 @@ void StorageDeltaMerge::read( extra_table_id_index, scan_context); - auto keyspace_id = getTableInfo().getKeyspaceID(); /// Ensure start_ts info after read. checkStartTs(mvcc_query_info.start_ts, context, query_info.req_id, keyspace_id); @@ -1105,8 +925,18 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr StorageDeltaMerge::writeNodeBuild const ASTSelectQuery & select_query = typeid_cast(*query_info.query); RUNTIME_CHECK(query_info.mvcc_query_info != nullptr); + const auto keyspace_id = getTableInfo().getKeyspaceID(); const auto & mvcc_query_info = *query_info.mvcc_query_info; - auto ranges = parseMvccQueryInfo(mvcc_query_info, num_streams, context, query_info.req_id, tracing_logger); + auto ranges = parseMvccQueryInfo( + mvcc_query_info, + keyspace_id, + tidb_table_info.id, + is_common_handle, + rowkey_column_size, + num_streams, + context, + query_info.req_id, + tracing_logger); auto read_segments = parseSegmentSet(select_query.segment_expression_list); auto snap = store->writeNodeBuildRemoteReadSnapshot( @@ -1120,7 +950,6 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr StorageDeltaMerge::writeNodeBuild snap->column_defines = std::make_shared(columns_to_read); - auto keyspace_id = getTableInfo().getKeyspaceID(); // Ensure start_ts is valid after snapshot is built checkStartTs(mvcc_query_info.start_ts, context, query_info.req_id, keyspace_id); return snap; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4abcfe4d34d..4f3a3651117 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -208,14 +208,6 @@ class StorageDeltaMerge DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } - static DM::PushDownFilterPtr buildPushDownFilter( - const DM::RSOperatorPtr & rs_operator, - const ColumnInfos & table_scan_column_info, - const google::protobuf::RepeatedPtrField & pushed_down_filters, - const DM::ColumnDefines & columns_to_read, - const Context & context, - const LoggerPtr & tracing_logger); - #ifndef DBMS_PUBLIC_GTEST protected: #endif @@ -252,27 +244,6 @@ class StorageDeltaMerge bool dataDirExist(); void shutdownImpl(); - DM::RSOperatorPtr buildRSOperator( - const std::unique_ptr & dag_query, - const DM::ColumnDefines & columns_to_read, - const Context & context, - const LoggerPtr & tracing_logger); - /// Get filters from query to construct rough set operation and push down filters. - DM::PushDownFilterPtr parsePushDownFilter( - const SelectQueryInfo & query_info, - const DM::ColumnDefines & columns_to_read, - const Context & context, - const LoggerPtr & tracing_logger); - - DM::RowKeyRanges parseMvccQueryInfo( - const DB::MvccQueryInfo & mvcc_query_info, - unsigned num_streams, - const Context & context, - const String & req_id, - const LoggerPtr & tracing_logger); - - RuntimeFilteList parseRuntimeFilterList(const SelectQueryInfo & query_info, const Context & db_context) const; - #ifndef DBMS_PUBLIC_GTEST private: #endif diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 8346e4a9a1e..1ae0bc889e2 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -501,19 +501,8 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator( std::vector{}, 0, db_context.getTimezoneInfo()); - auto create_attr_by_column_id = [defines = columns_to_read](ColumnID column_id) -> DM::Attr { - auto iter = std::find_if(defines->begin(), defines->end(), [column_id](const DM::ColumnDefine & d) -> bool { - return d.id == column_id; - }); - if (iter != defines->end()) - return DM::Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; - return DM::Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; - }; - auto rs_operator - = DM::FilterParser::parseDAGQuery(*dag_query, *columns_to_read, std::move(create_attr_by_column_id), log); - if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) - LOG_DEBUG(log, "Rough set filter: {}", rs_operator->toDebugString()); - return rs_operator; + + return DM::RSOperator::build(dag_query, *columns_to_read, *columns_to_read, enable_rs_filter, log); } std::variant StorageDisaggregated::packSegmentReadTasks( @@ -526,7 +515,7 @@ std::variant StorageDisagg const auto & executor_id = table_scan.getTableScanExecutorID(); auto rs_operator = buildRSOperator(db_context, column_defines); - auto push_down_filter = StorageDeltaMerge::buildPushDownFilter( + auto push_down_filter = DM::PushDownFilter::build( rs_operator, table_scan.getColumns(), table_scan.getPushedDownFilters(), diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp index 0c53b8ac835..dcac19ab21b 100644 --- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp +++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp @@ -114,13 +114,8 @@ DM::PushDownFilterPtr ParsePushDownFilterTest::generatePushDownFilter( auto rs_operator = DM::FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log); - auto push_down_filter = StorageDeltaMerge::buildPushDownFilter( - rs_operator, - table_info.columns, - pushed_down_filters, - columns_to_read, - *ctx, - log); + auto push_down_filter + = DM::PushDownFilter::build(rs_operator, table_info.columns, pushed_down_filters, columns_to_read, *ctx, log); return push_down_filter; } From bc01ee6006bc4d12863d74d28f93d9290567f641 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 13 Jun 2024 17:51:25 +0800 Subject: [PATCH 2/5] Refine some logging --- .../DeltaMerge/Filter/PushDownFilter.cpp | 4 +- .../Storages/DeltaMerge/Filter/RSOperator.cpp | 42 +++++++++---------- .../Storages/DeltaMerge/Filter/RSOperator.h | 6 ++- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp index 53357202507..94178e33762 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 PingCAP, Inc. +// Copyright 2024 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -194,4 +194,4 @@ PushDownFilterPtr PushDownFilter::build( context, tracing_logger); } -} // namespace DB::DM \ No newline at end of file +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp index fc668c559e0..a242619fc83 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -57,30 +58,27 @@ RSOperatorPtr RSOperator::build( { RUNTIME_CHECK(dag_query != nullptr); // build rough set operator - DM::RSOperatorPtr rs_operator = DM::EMPTY_RS_OPERATOR; - if (likely(enable_rs_filter)) + if (unlikely(!enable_rs_filter)) { - /// Query from TiDB / TiSpark - auto create_attr_by_column_id = [&table_column_defines](ColumnID column_id) -> Attr { - auto iter = std::find_if( - table_column_defines.begin(), - table_column_defines.end(), - [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); - if (iter != table_column_defines.end()) - return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; - // Maybe throw an exception? Or check if `type` is nullptr before creating filter? - return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; - }; - rs_operator = FilterParser::parseDAGQuery( - *dag_query, - columns_to_read, - std::move(create_attr_by_column_id), - tracing_logger); - if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) - LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); - } - else LOG_DEBUG(tracing_logger, "Rough set filter is disabled."); + return EMPTY_RS_OPERATOR; + } + + /// Query from TiDB / TiSpark + auto create_attr_by_column_id = [&table_column_defines](ColumnID column_id) -> Attr { + auto iter = std::find_if( + table_column_defines.begin(), + table_column_defines.end(), + [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; }); + if (iter != table_column_defines.end()) + return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type}; + // Maybe throw an exception? Or check if `type` is nullptr before creating filter? + return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}}; + }; + DM::RSOperatorPtr rs_operator + = FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), tracing_logger); + if (likely(rs_operator != DM::EMPTY_RS_OPERATOR)) + LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); return rs_operator; } diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h index b054269a989..c7333e8dbb3 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h @@ -15,11 +15,15 @@ #pragma once #include -#include #include #include #include +namespace DB +{ +struct DAGQueryInfo; +} + namespace DB::DM { From 3886becc820ef99fad3406dc2e242e7d902be41d Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 13 Jun 2024 17:55:54 +0800 Subject: [PATCH 3/5] Add --clang_format for format-diff.py --- format-diff.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/format-diff.py b/format-diff.py index 4674d7e40c8..8f496894a65 100755 --- a/format-diff.py +++ b/format-diff.py @@ -28,6 +28,14 @@ def run_cmd(cmd, show_cmd=False): print("RUN CMD: {}".format(cmd)) return res +def try_find_clang_format(exec_path): + candidates = ['clang-format-15', 'clang-format'] + if exec_path is not None: + candidates.insert(0, exec_path) + for c in candidates: + if which(c) is not None: + return c + return candidates[-1] def main(): default_suffix = ['.cpp', '.h', '.cc', '.hpp'] @@ -39,12 +47,14 @@ def main(): help='suffix of files to format, split by space', default=' '.join(default_suffix)) parser.add_argument('--ignore_suffix', help='ignore files with suffix, split by space') - parser.add_argument( - '--diff_from', help='commit hash/branch to check git diff', default='HEAD') + parser.add_argument('--diff_from', + help='commit hash/branch to check git diff', default='HEAD') parser.add_argument('--check_formatted', help='exit -1 if NOT formatted', action='store_true') parser.add_argument('--dump_diff_files_to', help='dump diff file names to specific path', default=None) + parser.add_argument('--clang_format', + help='path to clang-format', default=None) args = parser.parse_args() default_suffix = args.suffix.strip().split(' ') if args.suffix else [] @@ -83,9 +93,7 @@ def main(): if files_to_format: print('Files to format:\n {}'.format('\n '.join(files_to_format))) - clang_format_cmd = 'clang-format-15' - if which(clang_format_cmd) is None: - clang_format_cmd = 'clang-format' + clang_format_cmd = try_find_clang_format(args.clang_format) for file in files_to_format: cmd = clang_format_cmd + ' -i {}'.format(file) if subprocess.Popen(cmd, shell=True, cwd=tiflash_repo_path).wait(): From b98eca9eebe2d5de74ebf7a27d6c595719d86217 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 13 Jun 2024 18:13:43 +0800 Subject: [PATCH 4/5] Fix compile error --- dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h index 27fe0c7ee49..0b5a43af579 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include From 047d6c9c8f46ce08d3bed8b67d5c467c62084a29 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 13 Jun 2024 18:33:58 +0800 Subject: [PATCH 5/5] Fix compile error --- dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp index 94178e33762..6f2a6497de8 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include