diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index f75878ceee0..052af71b8bc 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -352,11 +352,21 @@ SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_statu void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder) { + /// handle generated column if necessary. + executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log); + NamesAndTypes source_columns; + source_columns.reserve(table_scan.getColumnSize()); + const auto table_scan_output_header = group_builder.getCurrentHeader(); + for (const auto & col : table_scan_output_header) + source_columns.emplace_back(col.name, col.type); + analyzer = std::make_unique(std::move(source_columns), context); + /// If there is no local source, there is no need to execute cast and push down filter, return directly. + /// But we should make sure that the analyzer is initialized before return. + if (remote_read_sources_start_index == 0) + return; /// handle timezone/duration cast for local table scan. executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index); - executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log); - /// handle filter conditions for local and remote table scan. if (filter_conditions.hasValue()) { @@ -431,11 +441,25 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired); FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once); - - /// handle timezone/duration cast for local and remote table scan. 
- executeCastAfterTableScan(remote_read_streams_start_index, pipeline); /// handle generated column if necessary. executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline); + NamesAndTypes source_columns; + source_columns.reserve(table_scan.getColumnSize()); + const auto table_scan_output_header = pipeline.firstStream()->getHeader(); + for (const auto & col : table_scan_output_header) + source_columns.emplace_back(col.name, col.type); + analyzer = std::make_unique(std::move(source_columns), context); + /// If there is no local stream, there is no need to execute cast and push down filter, return directly. + /// But we should make sure that the analyzer is initialized before return. + if (remote_read_streams_start_index == 0) + { + recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); + if (filter_conditions.hasValue()) + recordProfileStreams(pipeline, filter_conditions.executor_id); + return; + } + /// handle timezone/duration cast for local table scan. + executeCastAfterTableScan(remote_read_streams_start_index, pipeline); recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); /// handle filter conditions for local and remote table scan. 
@@ -478,9 +502,7 @@ void DAGStorageInterpreter::prepare() assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end()); storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage; - std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(); - - analyzer = std::make_unique(std::move(source_columns), context); + std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan(); } void DAGStorageInterpreter::executeCastAfterTableScan( @@ -1222,12 +1244,10 @@ std::unordered_map DAG return storages_with_lock; } -std::tuple> DAGStorageInterpreter::getColumnsForTableScan() +std::tuple> DAGStorageInterpreter::getColumnsForTableScan() { Names required_columns_tmp; required_columns_tmp.reserve(table_scan.getColumnSize()); - NamesAndTypes source_columns_tmp; - source_columns_tmp.reserve(table_scan.getColumnSize()); std::vector need_cast_column; need_cast_column.reserve(table_scan.getColumnSize()); String handle_column_name = MutableSupport::tidb_pk_column_name; @@ -1245,7 +1265,6 @@ std::tuple> DAGStorageIn const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci); const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); generated_column_infos.push_back(std::make_tuple(i, col_name, data_type)); - source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type}); continue; } // Column ID -1 return the handle column @@ -1256,16 +1275,6 @@ std::tuple> DAGStorageIn name = MutableSupport::extra_table_id_column_name; else name = storage_for_logical_table->getTableInfo().getColumnName(cid); - if (cid == ExtraTableIDColumnID) - { - NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type}; - source_columns_tmp.emplace_back(std::move(extra_table_id_column_pair)); - } - else - { - auto pair = storage_for_logical_table->getColumns().getPhysical(name); - 
source_columns_tmp.emplace_back(std::move(pair)); - } required_columns_tmp.emplace_back(std::move(name)); } @@ -1276,6 +1285,12 @@ std::tuple> DAGStorageIn } for (const auto & col : table_scan.getColumns()) { + if (col.hasGeneratedColumnFlag()) + { + need_cast_column.push_back(ExtraCastAfterTSMode::None); + continue; + } + if (col_id_set.contains(col.id)) { need_cast_column.push_back(ExtraCastAfterTSMode::None); @@ -1291,7 +1306,7 @@ std::tuple> DAGStorageIn } } - return {required_columns_tmp, source_columns_tmp, need_cast_column}; + return {required_columns_tmp, need_cast_column}; } // Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions` diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index 74ef84cb7a5..382594bf753 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -98,7 +98,7 @@ class DAGStorageInterpreter std::unordered_map getAndLockStorages(Int64 query_schema_version); - std::tuple> getColumnsForTableScan(); + std::tuple> getColumnsForTableScan(); std::vector buildRemoteRequests(const DM::ScanContextPtr & scan_context); @@ -164,7 +164,6 @@ class DAGStorageInterpreter std::unordered_map storages_with_structure_lock; ManageableStoragePtr storage_for_logical_table; Names required_columns; - NamesAndTypes source_columns; // For generated column, just need a placeholder, and TiDB will fill this column. std::vector> generated_column_infos; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index bef99e57b55..878e0930f92 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1087,7 +1087,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, { stream = std::make_shared( read_task_pool, - columns_to_read, + filter && filter->extra_cast ? 
*filter->columns_after_cast : columns_to_read, extra_table_id_index, physical_table_id, log_tracing_id); diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h index 0d7e21e10cd..fd910a9a6ba 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h @@ -31,19 +31,18 @@ class PushDownFilter : public std::enable_shared_from_this const ExpressionActionsPtr & beofre_where_, const ColumnDefines & filter_columns_, const String filter_column_name_, - const ExpressionActionsPtr & extra_cast_) + const ExpressionActionsPtr & extra_cast_, + const ColumnDefinesPtr & columns_after_cast_) : rs_operator(rs_operator_) , before_where(beofre_where_) , filter_column_name(std::move(filter_column_name_)) , filter_columns(std::move(filter_columns_)) , extra_cast(extra_cast_) + , columns_after_cast(columns_after_cast_) {} explicit PushDownFilter(const RSOperatorPtr & rs_operator_) : rs_operator(rs_operator_) - , before_where(nullptr) - , filter_columns({}) - , extra_cast(nullptr) {} // Rough set operator @@ -56,6 +55,8 @@ class PushDownFilter : public std::enable_shared_from_this ColumnDefines filter_columns; // The expression actions used to cast the timestamp/datetime column ExpressionActionsPtr extra_cast; + // If the extra_cast is not null, the types of the columns may be changed + ColumnDefinesPtr columns_after_cast; }; } // namespace DB::DM diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index efd8dc1d6bf..b4cf77e0aa2 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -839,7 +839,22 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer); LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions()); - return 
std::make_shared(rs_operator, before_where, filter_columns, filter_column_name, extra_cast); + auto columns_after_cast = std::make_shared(); + if (extra_cast != nullptr) + { + columns_after_cast->reserve(columns_to_read.size()); + const auto & current_names_and_types = analyzer->getCurrentInputColumns(); + for (size_t i = 0; i < table_scan_column_info.size(); ++i) + { + if (table_scan_column_info[i].hasGeneratedColumnFlag()) + continue; + auto col = columns_to_read_map.at(table_scan_column_info[i].id); + RUNTIME_CHECK_MSG(col.name == current_names_and_types[i].name, "Column name mismatch, expect: {}, actual: {}", col.name, current_names_and_types[i].name); + columns_after_cast->push_back(col); + columns_after_cast->back().type = current_names_and_types[i].type; + } + } + return std::make_shared(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast); } LOG_DEBUG(tracing_logger, "Push down filter is empty"); return std::make_shared(rs_operator); diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index f40776286d9..bc0e732477a 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -305,4 +306,53 @@ void StorageDisaggregated::filterConditions(DAGExpressionAnalyzer & analyzer, DA pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); } } + +void StorageDisaggregated::extraCast(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline) +{ + // If the column is not in the columns of pushed down filter, append a cast to the column. 
+ std::vector need_cast_column; + need_cast_column.reserve(table_scan.getColumnSize()); + std::unordered_set col_id_set; + for (const auto & expr : table_scan.getPushedDownFilters()) + { + getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set); + } + bool has_need_cast_column = false; + for (const auto & col : table_scan.getColumns()) + { + if (col_id_set.contains(col.id)) + { + need_cast_column.push_back(ExtraCastAfterTSMode::None); + } + else + { + if (col.id != -1 && col.tp == TiDB::TypeTimestamp) + { + need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast); + has_need_cast_column = true; + } + else if (col.id != -1 && col.tp == TiDB::TypeTime) + { + need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast); + has_need_cast_column = true; + } + else + { + need_cast_column.push_back(ExtraCastAfterTSMode::None); + } + } + } + ExpressionActionsChain chain; + if (has_need_cast_column && analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan)) + { + ExpressionActionsPtr extra_cast = chain.getLastActions(); + chain.finalize(); + chain.clear(); + for (auto & stream : pipeline.streams) + { + stream = std::make_shared(stream, extra_cast, log->identifier()); + stream->setExtraInfo("cast after local tableScan"); + } + } +} } // namespace DB diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index a06a5ba34d2..17a481e6ba7 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -156,56 +155,11 @@ BlockInputStreams StorageDisaggregated::readThroughS3( { source_columns.emplace_back(col.name, col.type); } - analyzer = std::make_unique(std::move(source_columns), context); // Handle duration type column - // If the column is not in the columns of pushed down filter, append a cast to the column. 
- std::vector need_cast_column; - need_cast_column.reserve(table_scan.getColumnSize()); - std::unordered_set col_id_set; - for (const auto & expr : table_scan.getPushedDownFilters()) - { - getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set); - } - bool has_need_cast_column = false; - for (const auto & col : table_scan.getColumns()) - { - if (col_id_set.contains(col.id)) - { - need_cast_column.push_back(ExtraCastAfterTSMode::None); - } - else - { - if (col.id != -1 && col.tp == TiDB::TypeTimestamp) - { - need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast); - has_need_cast_column = true; - } - else if (col.id != -1 && col.tp == TiDB::TypeTime) - { - need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast); - has_need_cast_column = true; - } - else - { - need_cast_column.push_back(ExtraCastAfterTSMode::None); - } - } - } - ExpressionActionsChain chain; - if (has_need_cast_column && analyzer->appendExtraCastsAfterTS(chain, need_cast_column, table_scan)) - { - ExpressionActionsPtr extra_cast = chain.getLastActions(); - chain.finalize(); - chain.clear(); - for (auto & stream : pipeline.streams) - { - stream = std::make_shared(stream, extra_cast, log->identifier()); - stream->setExtraInfo("cast after local tableScan"); - } - } - + extraCast(*analyzer, pipeline); + // Handle filter filterConditions(*analyzer, pipeline); return pipeline.streams; } diff --git a/tests/fullstack-test/expr/duration_filter_late_materialization.test b/tests/fullstack-test/expr/duration_filter_late_materialization.test new file mode 100644 index 00000000000..63b496c3f5d --- /dev/null +++ b/tests/fullstack-test/expr/duration_filter_late_materialization.test @@ -0,0 +1,47 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a time(4), i int); + +# insert more than 8192 rows to make sure filter conditions can be pushed down. +mysql> insert into test.t values('-700:10:10.123456', 1), ('700:11:11.123500', 2), ('600:11:11.123500', 3); +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; +mysql> insert into test.t select * from test.t; + +mysql> alter table test.t set tiflash replica 1; + +func> wait_table test t + +mysql> select * from test.t where a = '500:11:11.123500'; +# success, but the result is empty +mysql> select hour(a), i from test.t where a = '500:11:11.123500'; +mysql> select minute(a), i from test.t where a = '500:11:11.123500'; +mysql> select second(a), i from test.t where a = '500:11:11.123500'; +mysql> select a, i from test.t where hour(a) = 500; +mysql> select a, i from test.t where minute(a) = 13; +mysql> select a, i from test.t where second(a) = 14; + +mysql> drop table test.t; \ No newline at end of file diff --git 
a/tests/fullstack-test/mpp/late_materialization_generate_column.test b/tests/fullstack-test/mpp/late_materialization_generate_column.test index a3021e70944..ffc382c1cf7 100644 --- a/tests/fullstack-test/mpp/late_materialization_generate_column.test +++ b/tests/fullstack-test/mpp/late_materialization_generate_column.test @@ -13,7 +13,7 @@ # limitations under the License. -mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` float DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; +mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` time DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) values (NULL, NULL, NULL, NULL, NULL, NULL); mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539; mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;