From 61754ed0be2022e2bc3286cfa00c19b2e099eea4 Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
Date: Tue, 16 May 2023 16:43:18 +0800
Subject: [PATCH 1/3] This is an automated cherry-pick of #7469

Signed-off-by: ti-chi-bot
---
 .../Coprocessor/DAGStorageInterpreter.cpp     | 167 ++++++++++++++++--
 .../Flash/Coprocessor/DAGStorageInterpreter.h |   3 +-
 .../Storages/DeltaMerge/DeltaMergeStore.cpp   |   4 +
 .../DeltaMerge/Filter/PushDownFilter.h        |  15 ++
 dbms/src/Storages/StorageDeltaMerge.cpp       |  18 ++
 dbms/src/Storages/StorageDisaggregated.cpp    |  50 ++++++
 .../Storages/StorageDisaggregatedRemote.cpp   |  50 +----
 .../duration_filter_late_materialization.test |  47 +++++
 .../late_materialization_generate_column.test |   2 +-
 9 files changed, 290 insertions(+), 66 deletions(-)
 create mode 100644 tests/fullstack-test/expr/duration_filter_late_materialization.test

diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
index ad6ecfbb654..c78ff804821 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -274,6 +274,106 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
     executeImpl(pipeline);
 }
 
+<<<<<<< HEAD
+=======
+SourceOps DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status)
+{
+    prepare(); // learner read
+
+    return executeImpl(exec_status);
+}
+
+SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status)
+{
+    auto & dag_context = dagContext();
+
+    auto scan_context = std::make_shared<DM::ScanContext>();
+    dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
+    mvcc_query_info->scan_context = scan_context;
+
+    SourceOps source_ops;
+    if (!mvcc_query_info->regions_query_info.empty())
+    {
+        source_ops = buildLocalSourceOps(exec_status, context.getSettingsRef().max_block_size);
+    }
+
+    // Should build `remote_requests` and the `NullSourceOp` under the protection of `table_structure_lock`.
+    if (source_ops.empty())
+    {
+        source_ops.emplace_back(std::make_unique<NullSourceOp>(
+            exec_status,
+            storage_for_logical_table->getSampleBlockForColumns(required_columns),
+            log->identifier()));
+    }
+
+    // Note that `buildRemoteRequests` must be called after `buildLocalSourceOps` because
+    // `buildLocalSourceOps` will set up `region_retry_from_local_region`, and we must
+    // retry those regions or there will be data loss.
+    auto remote_requests = buildRemoteRequests(scan_context);
+
+    if (dag_context.is_disaggregated_task && !remote_requests.empty())
+    {
+        // This means the RN is sending requests with stale region info; we simply reject the request
+        // and ask the RN to send the requests again with correct region info. After the RN updates
+        // its region info, it may send the requests to other WNs.
+
+        RegionException::UnavailableRegions region_ids;
+        for (const auto & info : context.getDAGContext()->retry_regions)
+            region_ids.insert(info.region_id);
+
+        throw RegionException(
+            std::move(region_ids),
+            RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
+    }
+
+    FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
+
+    // Release alter locks.
+    // The DeltaTree engine ensures that once the source ops are created, the caller can get a consistent result
+    // from those source ops even if DDL operations are applied. Release the alter lock so that reading does not
+    // block DDL operations; keep the drop lock so that the storage will not be dropped during reading.
+    const TableLockHolders drop_locks = releaseAlterLocks();
+
+    remote_read_sources_start_index = source_ops.size();
+
+    if (!remote_requests.empty())
+        buildRemoteSourceOps(source_ops, exec_status, remote_requests);
+
+    for (const auto & lock : drop_locks)
+        dagContext().addTableLock(lock);
+
+    FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
+    FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);
+
+    return source_ops;
+}
+
+void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
+{
+    /// handle generated column if necessary.
+    executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);
+    NamesAndTypes source_columns;
+    source_columns.reserve(table_scan.getColumnSize());
+    const auto table_scan_output_header = group_builder.getCurrentHeader();
+    for (const auto & col : table_scan_output_header)
+        source_columns.emplace_back(col.name, col.type);
+    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
+    /// If there is no local source, there is no need to execute the cast and push-down filter; return directly.
+    /// But we should make sure that the analyzer is initialized before returning.
+    if (remote_read_sources_start_index == 0)
+        return;
+    /// handle timezone/duration cast for local table scan.
+    executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index);
+
+    /// handle filter conditions for local and remote table scan.
+    if (filter_conditions.hasValue())
+    {
+        ::DB::executePushedDownFilter(exec_status, group_builder, remote_read_sources_start_index, filter_conditions, *analyzer, log);
+        /// TODO: record profile
+    }
+}
+
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
 void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
 {
     auto & dag_context = dagContext();
@@ -340,6 +440,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
 
     FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
     FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);
+<<<<<<< HEAD
 
     if (!table_scan.getPushedDownFilters().empty())
     {
@@ -357,8 +458,27 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
 
     /// handle timezone/duration cast for local and remote table scan.
     executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
+=======
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
     /// handle generated column if necessary.
     executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
+    NamesAndTypes source_columns;
+    source_columns.reserve(table_scan.getColumnSize());
+    const auto table_scan_output_header = pipeline.firstStream()->getHeader();
+    for (const auto & col : table_scan_output_header)
+        source_columns.emplace_back(col.name, col.type);
+    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
+    /// If there is no local stream, there is no need to execute the cast and push-down filter; return directly.
+    /// But we should make sure that the analyzer is initialized before returning.
+    if (remote_read_streams_start_index == 0)
+    {
+        recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());
+        if (filter_conditions.hasValue())
+            recordProfileStreams(pipeline, filter_conditions.executor_id);
+        return;
+    }
+    /// handle timezone/duration cast for local and remote table scan.
+    executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
     recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());
 
     /// handle filter conditions for local and remote table scan.
@@ -401,7 +521,31 @@ void DAGStorageInterpreter::prepare()
     assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
     storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;
 
+<<<<<<< HEAD
     std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();
+=======
+    std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
+}
+
+void DAGStorageInterpreter::executeCastAfterTableScan(
+    PipelineExecutorStatus & exec_status,
+    PipelineExecGroupBuilder & group_builder,
+    size_t remote_read_sources_start_index)
+{
+    // execute timezone cast or duration cast if needed for local table scan
+    auto [has_cast, extra_cast] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan);
+    if (has_cast)
+    {
+        RUNTIME_CHECK(remote_read_sources_start_index <= group_builder.group.size());
+        size_t i = 0;
+        // local sources
+        while (i < remote_read_sources_start_index)
+        {
+            auto & group = group_builder.group[i++];
+            group.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), extra_cast));
+        }
+    }
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
 }
 
 void DAGStorageInterpreter::executeCastAfterTableScan(
@@ -1001,12 +1145,10 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
     return storages_with_lock;
 }
 
-std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
+std::tuple<Names, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
 {
     Names required_columns_tmp;
     required_columns_tmp.reserve(table_scan.getColumnSize());
-    NamesAndTypes source_columns_tmp;
-    source_columns_tmp.reserve(table_scan.getColumnSize());
     std::vector<ExtraCastAfterTSMode> need_cast_column;
     need_cast_column.reserve(table_scan.getColumnSize());
     String handle_column_name = MutableSupport::tidb_pk_column_name;
@@ -1024,7 +1166,6 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
             const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
             const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
             generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
-            source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
             continue;
         }
         // Column ID -1 returns the handle column
@@ -1035,16 +1176,6 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
             name = MutableSupport::extra_table_id_column_name;
         else
             name = storage_for_logical_table->getTableInfo().getColumnName(cid);
-        if (cid == ExtraTableIDColumnID)
-        {
-            NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type};
-            source_columns_tmp.emplace_back(std::move(extra_table_id_column_pair));
-        }
-        else
-        {
-            auto pair = storage_for_logical_table->getColumns().getPhysical(name);
-            source_columns_tmp.emplace_back(std::move(pair));
-        }
         required_columns_tmp.emplace_back(std::move(name));
     }
 
@@ -1055,6 +1186,12 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
     }
     for (const auto & col : table_scan.getColumns())
     {
+        if (col.hasGeneratedColumnFlag())
+        {
+            need_cast_column.push_back(ExtraCastAfterTSMode::None);
+            continue;
+        }
+
         if (col_id_set.contains(col.id))
         {
             need_cast_column.push_back(ExtraCastAfterTSMode::None);
@@ -1070,7 +1207,7 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
         }
     }
 
-    return {required_columns_tmp, source_columns_tmp, need_cast_column};
+    return {required_columns_tmp, need_cast_column};
 }
 
 // Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions`
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
index fa8e81ffd44..dfbfcc70076 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -82,7 +82,7 @@ class DAGStorageInterpreter
 
     std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);
 
-    std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();
+    std::tuple<Names, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();
 
     std::vector<RemoteRequest> buildRemoteRequests(const DM::ScanContextPtr & scan_context);
 
@@ -136,7 +136,6 @@ class DAGStorageInterpreter
     std::unordered_map<TableID, StorageWithStructureLock> storages_with_structure_lock;
     ManageableStoragePtr storage_for_logical_table;
     Names required_columns;
-    NamesAndTypes source_columns;
     // For a generated column, we just need a placeholder, and TiDB will fill this column.
     std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
 };
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index e3ee1c05dcf..0036a47b578 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1086,7 +1086,11 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
     {
         stream = std::make_shared<UnorderedInputStream>(
             read_task_pool,
+<<<<<<< HEAD
             filter && filter->before_where ? filter->columns_after_cast : columns_to_read,
+=======
+            filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
             extra_table_id_index,
             physical_table_id,
             log_tracing_id);
diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
index af6d2b3322a..39f7b35c1cf 100644
--- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
+++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
@@ -32,21 +32,32 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
         const ColumnDefines & filter_columns_,
         const String filter_column_name_,
         const ExpressionActionsPtr & extra_cast_,
+<<<<<<< HEAD
         const ColumnDefines & columns_after_cast_)
+=======
+        const ColumnDefinesPtr & columns_after_cast_)
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
         : rs_operator(rs_operator_)
         , before_where(beofre_where_)
         , filter_column_name(std::move(filter_column_name_))
         , filter_columns(std::move(filter_columns_))
         , extra_cast(extra_cast_)
+<<<<<<< HEAD
         , columns_after_cast(std::move(columns_after_cast_))
+=======
+        , columns_after_cast(columns_after_cast_)
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
     {}
 
     explicit PushDownFilter(const RSOperatorPtr & rs_operator_)
         : rs_operator(rs_operator_)
+<<<<<<< HEAD
         , before_where(nullptr)
         , filter_columns({})
         , extra_cast(nullptr)
         , columns_after_cast({})
+=======
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
     {}
 
     // Rough set operator
@@ -60,7 +71,11 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
     // The expression actions used to cast the timestamp/datetime column
     ExpressionActionsPtr extra_cast;
     // If the extra_cast is not null, the types of the columns may be changed
+<<<<<<< HEAD
     ColumnDefines columns_after_cast;
+=======
+    ColumnDefinesPtr columns_after_cast;
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
 };
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index 4036e5f6493..ea7590d7a74 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -839,6 +839,7 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
         auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
         LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());
 
+<<<<<<< HEAD
         ColumnDefines columns_after_cast;
         columns_after_cast.reserve(columns_to_read.size());
         const auto & source_columns = analyzer->getCurrentInputColumns();
         for (size_t i = 0; i < table_scan_column_info.size(); ++i)
         {
@@ -852,6 +853,23 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
             columns_after_cast.back().type = source_columns[i].type;
         }
 
+=======
+        auto columns_after_cast = std::make_shared<ColumnDefines>();
+        if (extra_cast != nullptr)
+        {
+            columns_after_cast->reserve(columns_to_read.size());
+            const auto & current_names_and_types = analyzer->getCurrentInputColumns();
+            for (size_t i = 0; i < table_scan_column_info.size(); ++i)
+            {
+                if (table_scan_column_info[i].hasGeneratedColumnFlag())
+                    continue;
+                auto col = columns_to_read_map.at(table_scan_column_info[i].id);
+                RUNTIME_CHECK_MSG(col.name == current_names_and_types[i].name, "Column name mismatch, expect: {}, actual: {}", col.name, current_names_and_types[i].name);
+                columns_after_cast->push_back(col);
+                columns_after_cast->back().type = current_names_and_types[i].type;
+            }
+        }
+>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
         return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
     }
     LOG_DEBUG(tracing_logger, "Push down filter is empty");
diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp
index 3c90c1e1481..c71a58b685f 100644
--- a/dbms/src/Storages/StorageDisaggregated.cpp
+++ b/dbms/src/Storages/StorageDisaggregated.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include
 #include
 #include
 #include
@@ -303,4 +304,53 @@ void StorageDisaggregated::filterConditions(DAGExpressionAnalyzer & analyzer, DA
         pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
     }
 }
+
+void StorageDisaggregated::extraCast(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline)
+{
+    // If the column is not in the columns of the pushed-down filter, append a cast to the column.
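+    // NOTE: columns already referenced by the pushed-down filter conditions are given
+    // ExtraCastAfterTSMode::None below; their cast is expected to be handled together
+    // with the pushed-down filter itself (see PushDownFilter::extra_cast).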
+    std::vector<ExtraCastAfterTSMode> need_cast_column;
+    need_cast_column.reserve(table_scan.getColumnSize());
+    std::unordered_set<ColumnID> col_id_set;
+    for (const auto & expr : table_scan.getPushedDownFilters())
+    {
+        getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
+    }
+    bool has_need_cast_column = false;
+    for (const auto & col : table_scan.getColumns())
+    {
+        if (col_id_set.contains(col.id))
+        {
+            need_cast_column.push_back(ExtraCastAfterTSMode::None);
+        }
+        else
+        {
+            if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
+            {
+                need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
+                has_need_cast_column = true;
+            }
+            else if (col.id != -1 && col.tp == TiDB::TypeTime)
+            {
+                need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
+                has_need_cast_column = true;
+            }
+            else
+            {
+                need_cast_column.push_back(ExtraCastAfterTSMode::None);
+            }
+        }
+    }
+    ExpressionActionsChain chain;
+    if (has_need_cast_column && analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
+    {
+        ExpressionActionsPtr extra_cast = chain.getLastActions();
+        chain.finalize();
+        chain.clear();
+        for (auto & stream : pipeline.streams)
+        {
+            stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
+            stream->setExtraInfo("cast after local tableScan");
+        }
+    }
+}
 } // namespace DB
diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
index a06a5ba34d2..17a481e6ba7 100644
--- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp
+++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -17,7 +17,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -156,56 +155,11 @@ BlockInputStreams StorageDisaggregated::readThroughS3(
     {
         source_columns.emplace_back(col.name, col.type);
     }
-
     analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
 
     // Handle duration type column
-    // If the column is not in the columns of pushed down filter, append a cast to the column.
-    std::vector<ExtraCastAfterTSMode> need_cast_column;
-    need_cast_column.reserve(table_scan.getColumnSize());
-    std::unordered_set<ColumnID> col_id_set;
-    for (const auto & expr : table_scan.getPushedDownFilters())
-    {
-        getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
-    }
-    bool has_need_cast_column = false;
-    for (const auto & col : table_scan.getColumns())
-    {
-        if (col_id_set.contains(col.id))
-        {
-            need_cast_column.push_back(ExtraCastAfterTSMode::None);
-        }
-        else
-        {
-            if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
-            {
-                need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
-                has_need_cast_column = true;
-            }
-            else if (col.id != -1 && col.tp == TiDB::TypeTime)
-            {
-                need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
-                has_need_cast_column = true;
-            }
-            else
-            {
-                need_cast_column.push_back(ExtraCastAfterTSMode::None);
-            }
-        }
-    }
-    ExpressionActionsChain chain;
-    if (has_need_cast_column && analyzer->appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
-    {
-        ExpressionActionsPtr extra_cast = chain.getLastActions();
-        chain.finalize();
-        chain.clear();
-        for (auto & stream : pipeline.streams)
-        {
-            stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
-            stream->setExtraInfo("cast after local tableScan");
-        }
-    }
-
+    extraCast(*analyzer, pipeline);
+
     // Handle filter
     filterConditions(*analyzer, pipeline);
     return pipeline.streams;
 }
diff --git a/tests/fullstack-test/expr/duration_filter_late_materialization.test b/tests/fullstack-test/expr/duration_filter_late_materialization.test
new file mode 100644
index 00000000000..63b496c3f5d
--- /dev/null
+++ b/tests/fullstack-test/expr/duration_filter_late_materialization.test
@@ -0,0 +1,47 @@
+# Copyright 2023 PingCAP, Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+mysql> drop table if exists test.t;
+mysql> create table if not exists test.t(a time(4), i int);
+
+# insert more than 8192 rows to make sure filter conditions can be pushed down.
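+# (The 3 seed rows are doubled by each of the 13 self-inserts below: 3 * 2^13 = 24576 rows in total.)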
+mysql> insert into test.t values('-700:10:10.123456', 1), ('700:11:11.123500', 2), ('600:11:11.123500', 3);
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+mysql> insert into test.t select * from test.t;
+
+mysql> alter table test.t set tiflash replica 1;
+
+func> wait_table test t
+
+mysql> select * from test.t where a = '500:11:11.123500';
+# success, but the result is empty
+mysql> select hour(a), i from test.t where a = '500:11:11.123500';
+mysql> select minute(a), i from test.t where a = '500:11:11.123500';
+mysql> select second(a), i from test.t where a = '500:11:11.123500';
+mysql> select a, i from test.t where hour(a) = 500;
+mysql> select a, i from test.t where minute(a) = 13;
+mysql> select a, i from test.t where second(a) = 14;
+
+mysql> drop table test.t;
\ No newline at end of file
diff --git a/tests/fullstack-test/mpp/late_materialization_generate_column.test b/tests/fullstack-test/mpp/late_materialization_generate_column.test
index c2471e69cc4..cb027f51c1f 100644
--- a/tests/fullstack-test/mpp/late_materialization_generate_column.test
+++ b/tests/fullstack-test/mpp/late_materialization_generate_column.test
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 
-mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` float DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+mysql> CREATE TABLE test.`IDT_26539` (`COL102` float DEFAULT NULL, `COL103` float DEFAULT NULL, `COL1` float GENERATED ALWAYS AS ((`COL102` DIV 10)) VIRTUAL, `COL2` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `COL4` datetime DEFAULT NULL, `COL3` bigint DEFAULT NULL, `COL5` time DEFAULT NULL, KEY `UK_COL1` (`COL1`) /*!80000 INVISIBLE */) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
 mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) values (NULL, NULL, NULL, NULL, NULL, NULL);
 mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;
 mysql> insert into test.IDT_26539 (COL102, COL103, COL2, COL4, COL3, COL5) select COL102, COL103, COL2, COL4, COL3, COL5 from test.IDT_26539;

From 939c6a1486c2eb874ee187e6bd4471b3da705b9c Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger
Date: Tue, 16 May 2023 16:57:53 +0800
Subject: [PATCH 2/3] fix conflict

Signed-off-by: Lloyd-Pottiger
---
 .../Coprocessor/DAGStorageInterpreter.cpp     | 143 ------------------
 .../Storages/DeltaMerge/DeltaMergeStore.cpp   |   4 -
 .../DeltaMerge/Filter/PushDownFilter.h        |  19 ---
 dbms/src/Storages/StorageDeltaMerge.cpp       |  16 --
 4 files changed, 182 deletions(-)

diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
index c78ff804821..ad03cf37f24 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -274,106 +274,6 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
     executeImpl(pipeline);
 }
 
-<<<<<<< HEAD
-=======
-SourceOps DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status)
-{
-    prepare(); // learner read
-
-    return executeImpl(exec_status);
-}
-
-SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status)
-{
-    auto & dag_context = dagContext();
-
-    auto scan_context = std::make_shared<DM::ScanContext>();
-    dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
-    mvcc_query_info->scan_context = scan_context;
-
-    SourceOps source_ops;
-    if (!mvcc_query_info->regions_query_info.empty())
-    {
-        source_ops = buildLocalSourceOps(exec_status, context.getSettingsRef().max_block_size);
-    }
-
-    // Should build `remote_requests` and the `NullSourceOp` under the protection of `table_structure_lock`.
-    if (source_ops.empty())
-    {
-        source_ops.emplace_back(std::make_unique<NullSourceOp>(
-            exec_status,
-            storage_for_logical_table->getSampleBlockForColumns(required_columns),
-            log->identifier()));
-    }
-
-    // Note that `buildRemoteRequests` must be called after `buildLocalSourceOps` because
-    // `buildLocalSourceOps` will set up `region_retry_from_local_region`, and we must
-    // retry those regions or there will be data loss.
-    auto remote_requests = buildRemoteRequests(scan_context);
-
-    if (dag_context.is_disaggregated_task && !remote_requests.empty())
-    {
-        // This means the RN is sending requests with stale region info; we simply reject the request
-        // and ask the RN to send the requests again with correct region info. After the RN updates
-        // its region info, it may send the requests to other WNs.
-
-        RegionException::UnavailableRegions region_ids;
-        for (const auto & info : context.getDAGContext()->retry_regions)
-            region_ids.insert(info.region_id);
-
-        throw RegionException(
-            std::move(region_ids),
-            RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
-    }
-
-    FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
-
-    // Release alter locks.
-    // The DeltaTree engine ensures that once the source ops are created, the caller can get a consistent result
-    // from those source ops even if DDL operations are applied. Release the alter lock so that reading does not
-    // block DDL operations; keep the drop lock so that the storage will not be dropped during reading.
-    const TableLockHolders drop_locks = releaseAlterLocks();
-
-    remote_read_sources_start_index = source_ops.size();
-
-    if (!remote_requests.empty())
-        buildRemoteSourceOps(source_ops, exec_status, remote_requests);
-
-    for (const auto & lock : drop_locks)
-        dagContext().addTableLock(lock);
-
-    FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
-    FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);
-
-    return source_ops;
-}
-
-void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
-{
-    /// handle generated column if necessary.
-    executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);
-    NamesAndTypes source_columns;
-    source_columns.reserve(table_scan.getColumnSize());
-    const auto table_scan_output_header = group_builder.getCurrentHeader();
-    for (const auto & col : table_scan_output_header)
-        source_columns.emplace_back(col.name, col.type);
-    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
-    /// If there is no local source, there is no need to execute the cast and push-down filter; return directly.
-    /// But we should make sure that the analyzer is initialized before returning.
-    if (remote_read_sources_start_index == 0)
-        return;
-    /// handle timezone/duration cast for local table scan.
-    executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index);
-
-    /// handle filter conditions for local and remote table scan.
-    if (filter_conditions.hasValue())
-    {
-        ::DB::executePushedDownFilter(exec_status, group_builder, remote_read_sources_start_index, filter_conditions, *analyzer, log);
-        /// TODO: record profile
-    }
-}
-
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
 void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
 {
     auto & dag_context = dagContext();
@@ -440,26 +340,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
 
     FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
     FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);
-<<<<<<< HEAD
-
-    if (!table_scan.getPushedDownFilters().empty())
-    {
-        /// If there are duration type pushed down filters, the types of columns may be changed.
-        const auto table_scan_output_header = pipeline.firstStream()->getHeader();
-        for (const auto & col : table_scan_output_header)
-        {
-            const auto & name = col.name;
-            auto it = std::find_if(source_columns.begin(), source_columns.end(), [&name](const auto & c) { return c.name == name; });
-            if (it != source_columns.end() && it->type != col.type)
-                it->type = col.type;
-        }
-    }
-    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
-
-    /// handle timezone/duration cast for local and remote table scan.
-    executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
-=======
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
     /// handle generated column if necessary.
     executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
     NamesAndTypes source_columns;
@@ -521,33 +402,9 @@ void DAGStorageInterpreter::prepare()
     assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
     storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;
 
-<<<<<<< HEAD
-    std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();
-=======
     std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
 }
 
-void DAGStorageInterpreter::executeCastAfterTableScan(
-    PipelineExecutorStatus & exec_status,
-    PipelineExecGroupBuilder & group_builder,
-    size_t remote_read_sources_start_index)
-{
-    // execute timezone cast or duration cast if needed for local table scan
-    auto [has_cast, extra_cast] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan);
-    if (has_cast)
-    {
-        RUNTIME_CHECK(remote_read_sources_start_index <= group_builder.group.size());
-        size_t i = 0;
-        // local sources
-        while (i < remote_read_sources_start_index)
-        {
-            auto & group = group_builder.group[i++];
-            group.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), extra_cast));
-        }
-    }
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
-}
-
 void DAGStorageInterpreter::executeCastAfterTableScan(
     size_t remote_read_streams_start_index,
     DAGPipeline & pipeline)
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index 0036a47b578..5e8b821f4f8 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1086,11 +1086,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
     {
         stream = std::make_shared<UnorderedInputStream>(
             read_task_pool,
-<<<<<<< HEAD
-            filter && filter->before_where ? filter->columns_after_cast : columns_to_read,
-=======
             filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
             extra_table_id_index,
             physical_table_id,
             log_tracing_id);
diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
index 39f7b35c1cf..fd910a9a6ba 100644
--- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
+++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
@@ -32,32 +32,17 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
         const ColumnDefines & filter_columns_,
         const String filter_column_name_,
         const ExpressionActionsPtr & extra_cast_,
-<<<<<<< HEAD
-        const ColumnDefines & columns_after_cast_)
-=======
         const ColumnDefinesPtr & columns_after_cast_)
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
         : rs_operator(rs_operator_)
         , before_where(beofre_where_)
         , filter_column_name(std::move(filter_column_name_))
         , filter_columns(std::move(filter_columns_))
         , extra_cast(extra_cast_)
-<<<<<<< HEAD
-        , columns_after_cast(std::move(columns_after_cast_))
-=======
         , columns_after_cast(columns_after_cast_)
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
     {}
 
     explicit PushDownFilter(const RSOperatorPtr & rs_operator_)
         : rs_operator(rs_operator_)
-<<<<<<< HEAD
-        , before_where(nullptr)
-        , filter_columns({})
-        , extra_cast(nullptr)
-        , columns_after_cast({})
-=======
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
     {}
 
     // Rough set operator
@@ -71,11 +56,7 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
     // The expression actions used to cast the timestamp/datetime column
     ExpressionActionsPtr extra_cast;
     // If the extra_cast is not null, the types of the columns may be changed
-<<<<<<< HEAD
-    ColumnDefines columns_after_cast;
-=======
     ColumnDefinesPtr columns_after_cast;
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
 };
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index ea7590d7a74..d05e7e314d5 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -839,21 +839,6 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
         auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
         LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());
 
-<<<<<<< HEAD
-        ColumnDefines columns_after_cast;
-        columns_after_cast.reserve(columns_to_read.size());
-        const auto & source_columns = analyzer->getCurrentInputColumns();
-        for (size_t i = 0; i < table_scan_column_info.size(); ++i)
-        {
-            if (table_scan_column_info[i].hasGeneratedColumnFlag())
-                continue;
-            auto it = columns_to_read_map.at(table_scan_column_info[i].id);
-            RUNTIME_CHECK(it.name == source_columns[i].name);
-            columns_after_cast.push_back(std::move(it));
-            columns_after_cast.back().type = source_columns[i].type;
-        }
-
-=======
         auto columns_after_cast = std::make_shared<ColumnDefines>();
         if (extra_cast != nullptr)
         {
@@ -869,7 +854,6 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
             }
         }
->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
         return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
     }
     LOG_DEBUG(tracing_logger, "Push down filter is empty");

From d74a5139e0234191ccde8d376a75c3a17cc109f0 Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger
Date: Tue, 16 May 2023 17:00:45 +0800
Subject: [PATCH 3/3] move tests

Signed-off-by: Lloyd-Pottiger
---
 ...ration_filter_push_down_to_table_scan.test | 47 -------------------
 1 file changed, 47 deletions(-)
 delete mode 100644 tests/fullstack-test-dt/expr/duration_filter_push_down_to_table_scan.test

diff --git a/tests/fullstack-test-dt/expr/duration_filter_push_down_to_table_scan.test b/tests/fullstack-test-dt/expr/duration_filter_push_down_to_table_scan.test
deleted file mode 100644
index 58e8c4b5208..00000000000
--- a/tests/fullstack-test-dt/expr/duration_filter_push_down_to_table_scan.test
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2023 PingCAP, Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-mysql> drop table if exists test.t;
-mysql> create table if not exists test.t(a time(4), i int);
-
-# insert more than 8192 rows to make sure filter conditions can be pushed down.
-mysql> insert into test.t values('-700:10:10.123456', 1), ('700:11:11.123500', 2), ('600:11:11.123500', 3);
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-mysql> insert into test.t select * from test.t;
-
-mysql> alter table test.t set tiflash replica 1;
-
-func> wait_table test t
-
-mysql> select * from test.t where a = '500:11:11.123500';
-# success, but the result is empty
-mysql> select hour(a), i from test.t where a = '500:11:11.123500';
-mysql> select minute(a), i from test.t where a = '500:11:11.123500';
-mysql> select second(a), i from test.t where a = '500:11:11.123500';
-mysql> select a, i from test.t where hour(a) = 500;
-mysql> select a, i from test.t where minute(a) = 13;
-mysql> select a, i from test.t where second(a) = 14;
-
-mysql> drop table test.t;